swaf/src/db/MysqlConnectionManager.ts

277 lines
9.5 KiB
TypeScript

import config from 'config';
import mysql, {Connection, FieldInfo, MysqlError, Pool, PoolConnection} from 'mysql';
import {logger} from "../Logger.js";
import {Type} from "../Utils.js";
import Migration, {MigrationType} from "./Migration.js";
export interface QueryResult {
readonly results: Record<string, unknown>[];
readonly fields: FieldInfo[];
readonly other?: Record<string, unknown>;
foundRows?: number;
}
export async function query(
queryString: string,
values?: QueryVariable[],
connection?: Connection,
): Promise<QueryResult> {
return await MysqlConnectionManager.query(queryString, values, connection);
}
export default class MysqlConnectionManager {
private static currentPool?: Pool;
private static databaseReady: boolean = false;
private static migrationsRegistered: boolean = false;
private static readonly migrations: Migration[] = [];
public static isReady(): boolean {
return this.databaseReady && this.currentPool !== undefined;
}
public static registerMigrations(migrations: MigrationType<Migration>[]): void {
if (!this.migrationsRegistered) {
this.migrationsRegistered = true;
migrations.forEach(m => this.registerMigration(v => new m(v)));
}
}
private static registerMigration(migration: (version: number) => Migration) {
this.migrations.push(migration(this.migrations.length + 1));
}
public static hasMigration(migration: Type<Migration>): boolean {
for (const m of this.migrations) {
if (m.constructor === migration) return true;
}
return false;
}
public static async prepare(runMigrations: boolean = true): Promise<void> {
if (config.get('mysql.create_database_automatically') === true) {
const dbName = config.get('mysql.database');
logger.info(`Creating database ${dbName}...`);
const connection = mysql.createConnection({
host: config.get('mysql.host'),
user: config.get('mysql.user'),
password: config.get('mysql.password'),
charset: 'utf8mb4',
});
await new Promise<void>((resolve, reject) => {
connection.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`, (error) => {
return error !== null ?
reject(error) :
resolve();
});
});
connection.end();
logger.info(`Database ${dbName} created!`);
}
this.databaseReady = true;
if (runMigrations) await this.handleMigrations();
}
public static get pool(): Pool {
if (this.currentPool === undefined) {
this.currentPool = this.createPool();
}
return this.currentPool;
}
private static createPool(): Pool {
return mysql.createPool({
connectionLimit: config.get('mysql.connectionLimit'),
host: config.get('mysql.host'),
user: config.get('mysql.user'),
password: config.get('mysql.password'),
database: config.get('mysql.database'),
charset: 'utf8mb4',
});
}
public static async endPool(): Promise<void> {
return await new Promise(resolve => {
if (this.currentPool === undefined) {
return resolve();
}
this.currentPool.end(() => {
logger.info('Mysql connection pool ended.');
resolve();
});
this.currentPool = undefined;
});
}
public static async query(
queryString: string,
values: QueryVariable[] = [],
connection?: Connection,
): Promise<QueryResult> {
return await new Promise<QueryResult>((resolve, reject) => {
logger.debug('SQL:', logger.settings.minLevel === 'trace' || logger.settings.minLevel === 'silly' ?
mysql.format(queryString, values) :
queryString);
(connection ? connection : this.pool).query(queryString, values, (error, results, fields) => {
if (error !== null) {
reject(error);
return;
}
resolve({
results: Array.isArray(results) ? results : [],
fields: fields !== undefined ? fields : [],
other: Array.isArray(results) ? null : results,
});
});
});
}
public static async wrapTransaction<T>(transaction: (connection: Connection) => Promise<T>): Promise<T> {
return await new Promise<T>((resolve, reject) => {
this.pool.getConnection((err: MysqlError | undefined, connection: PoolConnection) => {
if (err) {
reject(err);
return;
}
connection.beginTransaction((err?: MysqlError) => {
if (err) {
reject(err);
connection.release();
return;
}
transaction(connection).then(val => {
connection.commit((err?: MysqlError) => {
if (err) {
this.rejectAndRollback(connection, err, reject);
connection.release();
return;
}
connection.release();
resolve(val);
});
}).catch(err => {
this.rejectAndRollback(connection, err, reject);
connection.release();
});
});
});
});
}
private static rejectAndRollback(
connection: Connection,
err: MysqlError | undefined,
reject: (err: unknown) => void,
) {
connection.rollback((rollbackErr?: MysqlError) => {
return rollbackErr ?
reject(err + '\n' + rollbackErr) :
reject(err);
});
}
public static async getCurrentMigrationVersion(): Promise<number> {
let currentVersion = 0;
try {
const result = await query('SELECT id FROM migrations ORDER BY id DESC LIMIT 1');
currentVersion = Number(result.results[0]?.id);
} catch (e) {
if (e instanceof Error) {
const mysqlError = e as MysqlError;
if (mysqlError.code !== 'ER_NO_SUCH_TABLE') {
throw new Error('Cannot run migrations: ' + mysqlError.code);
}
} else {
throw e;
}
}
return currentVersion;
}
private static async handleMigrations() {
const currentVersion = await this.getCurrentMigrationVersion();
for (const migration of this.migrations) {
if (await migration.shouldRun(currentVersion)) {
logger.info('Running migration ', migration.version, migration.constructor.name);
await MysqlConnectionManager.wrapTransaction<void>(async c => {
migration.setCurrentConnection(c);
await migration.install();
migration.setCurrentConnection(null);
await query('INSERT INTO migrations VALUES(?, ?, NOW())', [
migration.version,
migration.constructor.name,
]);
});
}
migration.registerModels?.();
}
}
/**
* @param migrationId what migration to rollback. Use with caution. default=0 is for last registered migration.
*/
public static async rollbackMigration(migrationId: number = 0): Promise<void> {
migrationId--;
const migration = this.migrations[migrationId];
logger.info('Rolling back migration ', migration.version, migration.constructor.name);
await MysqlConnectionManager.wrapTransaction<void>(async c => {
migration.setCurrentConnection(c);
await migration.rollback();
migration.setCurrentConnection(null);
await query('DELETE FROM migrations WHERE id=?', [migration.version]);
});
}
public static async migrationCommand(args: string[]): Promise<void> {
try {
logger.info('Current migration:', await this.getCurrentMigrationVersion());
for (let i = 0; i < args.length; i++) {
if (args[i] === 'rollback') {
let migrationId = 0;
if (args.length > i + 1) {
migrationId = parseInt(args[i + 1]);
}
logger.info('Rolling back migration', migrationId);
await this.prepare(false);
await this.rollbackMigration(migrationId);
return;
}
}
} finally {
await MysqlConnectionManager.endPool();
}
}
}
export type QueryVariable =
| boolean
| string
| number
| Date
| Buffer
| null
| undefined;
export function isQueryVariable(value: unknown): value is QueryVariable {
return typeof value === 'boolean' ||
typeof value === "string" ||
typeof value === 'number' ||
value instanceof Date ||
value instanceof Buffer ||
value === null ||
value === undefined;
}