import config from 'config'; import mysql, {Connection, FieldInfo, MysqlError, Pool, PoolConnection} from 'mysql'; import {logger} from "../Logger.js"; import {Type} from "../Utils.js"; import Migration, {MigrationType} from "./Migration.js"; export interface QueryResult { readonly results: Record[]; readonly fields: FieldInfo[]; readonly other?: Record; foundRows?: number; } export async function query( queryString: string, values?: QueryVariable[], connection?: Connection, ): Promise { return await MysqlConnectionManager.query(queryString, values, connection); } export default class MysqlConnectionManager { private static currentPool?: Pool; private static databaseReady: boolean = false; private static migrationsRegistered: boolean = false; private static readonly migrations: Migration[] = []; public static isReady(): boolean { return this.databaseReady && this.currentPool !== undefined; } public static registerMigrations(migrations: MigrationType[]): void { if (!this.migrationsRegistered) { this.migrationsRegistered = true; migrations.forEach(m => this.registerMigration(v => new m(v))); } } private static registerMigration(migration: (version: number) => Migration) { this.migrations.push(migration(this.migrations.length + 1)); } public static hasMigration(migration: Type): boolean { for (const m of this.migrations) { if (m.constructor === migration) return true; } return false; } public static async prepare(runMigrations: boolean = true): Promise { if (config.get('mysql.create_database_automatically') === true) { const dbName = config.get('mysql.database'); logger.info(`Creating database ${dbName}...`); const connection = mysql.createConnection({ host: config.get('mysql.host'), user: config.get('mysql.user'), password: config.get('mysql.password'), charset: 'utf8mb4', }); await new Promise((resolve, reject) => { connection.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`, (error) => { return error !== null ? reject(error) : resolve(); }); }); connection.end(); logger.info(`Database ${dbName} created!`); } this.databaseReady = true; if (runMigrations) await this.handleMigrations(); } public static get pool(): Pool { if (this.currentPool === undefined) { this.currentPool = this.createPool(); } return this.currentPool; } private static createPool(): Pool { return mysql.createPool({ connectionLimit: config.get('mysql.connectionLimit'), host: config.get('mysql.host'), user: config.get('mysql.user'), password: config.get('mysql.password'), database: config.get('mysql.database'), charset: 'utf8mb4', }); } public static async endPool(): Promise { return await new Promise(resolve => { if (this.currentPool === undefined) { return resolve(); } this.currentPool.end(() => { logger.info('Mysql connection pool ended.'); resolve(); }); this.currentPool = undefined; }); } public static async query( queryString: string, values: QueryVariable[] = [], connection?: Connection, ): Promise { return await new Promise((resolve, reject) => { logger.debug('SQL:', logger.settings.minLevel === 'trace' || logger.settings.minLevel === 'silly' ? mysql.format(queryString, values) : queryString); (connection ? connection : this.pool).query(queryString, values, (error, results, fields) => { if (error !== null) { reject(error); return; } resolve({ results: Array.isArray(results) ? results : [], fields: fields !== undefined ? fields : [], other: Array.isArray(results) ? null : results, }); }); }); } public static async wrapTransaction(transaction: (connection: Connection) => Promise): Promise { return await new Promise((resolve, reject) => { this.pool.getConnection((err: MysqlError | undefined, connection: PoolConnection) => { if (err) { reject(err); return; } connection.beginTransaction((err?: MysqlError) => { if (err) { reject(err); connection.release(); return; } transaction(connection).then(val => { connection.commit((err?: MysqlError) => { if (err) { this.rejectAndRollback(connection, err, reject); connection.release(); return; } connection.release(); resolve(val); }); }).catch(err => { this.rejectAndRollback(connection, err, reject); connection.release(); }); }); }); }); } private static rejectAndRollback( connection: Connection, err: MysqlError | undefined, reject: (err: unknown) => void, ) { connection.rollback((rollbackErr?: MysqlError) => { return rollbackErr ? reject(err + '\n' + rollbackErr) : reject(err); }); } public static async getCurrentMigrationVersion(): Promise { let currentVersion = 0; try { const result = await query('SELECT id FROM migrations ORDER BY id DESC LIMIT 1'); currentVersion = Number(result.results[0]?.id); } catch (e) { if (e.code === 'ECONNREFUSED' || e.code !== 'ER_NO_SUCH_TABLE') { throw new Error('Cannot run migrations: ' + e.code); } } return currentVersion; } private static async handleMigrations() { const currentVersion = await this.getCurrentMigrationVersion(); for (const migration of this.migrations) { if (await migration.shouldRun(currentVersion)) { logger.info('Running migration ', migration.version, migration.constructor.name); await MysqlConnectionManager.wrapTransaction(async c => { migration.setCurrentConnection(c); await migration.install(); migration.setCurrentConnection(null); await query('INSERT INTO migrations VALUES(?, ?, NOW())', [ migration.version, migration.constructor.name, ]); }); } migration.registerModels?.(); } } /** * @param migrationId what migration to rollback. Use with caution. default=0 is for last registered migration. */ public static async rollbackMigration(migrationId: number = 0): Promise { migrationId--; const migration = this.migrations[migrationId]; logger.info('Rolling back migration ', migration.version, migration.constructor.name); await MysqlConnectionManager.wrapTransaction(async c => { migration.setCurrentConnection(c); await migration.rollback(); migration.setCurrentConnection(null); await query('DELETE FROM migrations WHERE id=?', [migration.version]); }); } public static async migrationCommand(args: string[]): Promise { try { logger.info('Current migration:', await this.getCurrentMigrationVersion()); for (let i = 0; i < args.length; i++) { if (args[i] === 'rollback') { let migrationId = 0; if (args.length > i + 1) { migrationId = parseInt(args[i + 1]); } await this.prepare(false); await this.rollbackMigration(migrationId); return; } } } finally { await MysqlConnectionManager.endPool(); } } } export type QueryVariable = | boolean | string | number | Date | Buffer | null | undefined; export function isQueryVariable(value: unknown): value is QueryVariable { return typeof value === 'boolean' || typeof value === "string" || typeof value === 'number' || value instanceof Date || value instanceof Buffer || value === null || value === undefined; }