import mysql, {Connection, FieldInfo, Pool} from 'mysql';
import config from 'config';
import Migration from "./Migration";
import Logger from "../Logger";
import {Type} from "../Utils";

/**
 * Normalized outcome of a SQL query as produced by {@link MysqlConnectionManager.query}.
 *
 * - `results`: the driver's result when it is an array, otherwise an empty array.
 * - `fields`: the field metadata reported by the driver, or an empty array when there is none.
 * - `other`: the raw driver result when it is NOT an array, otherwise `null`.
 * - `foundRows`: never set in this file; available for callers to fill in.
 */
export interface QueryResult {
    readonly results: any[];
    readonly fields: FieldInfo[];
    readonly other?: any;
    foundRows?: number;
}

/**
 * Shorthand for {@link MysqlConnectionManager.query}.
 *
 * @param queryString SQL statement, optionally containing `?` placeholders.
 * @param values values bound to the placeholders.
 * @param connection connection to run the statement on; the shared pool is used when omitted.
 */
export async function query(queryString: string, values?: any, connection?: Connection): Promise<QueryResult> {
    return await MysqlConnectionManager.query(queryString, values, connection);
}

/**
 * Static facade over a lazily created MySQL connection pool that also keeps track of the registered
 * migrations and runs / rolls them back.
 */
export default class MysqlConnectionManager {
    private static currentPool?: Pool;
    private static databaseReady: boolean = false;
    private static migrationsRegistered: boolean = false;
    private static readonly migrations: Migration[] = [];

    /**
     * @returns `true` once {@link prepare} has marked the database ready AND a pool currently exists.
     */
    public static isReady(): boolean {
        return this.databaseReady && this.currentPool !== undefined;
    }

    /**
     * Instantiates and registers the given migration classes, in order. Only the first call has any effect;
     * subsequent calls are ignored.
     *
     * @param migrations migration constructors; each one receives its version number (1-based position).
     */
    public static registerMigrations(migrations: Type<Migration>[]) {
        if (!this.migrationsRegistered) {
            this.migrationsRegistered = true;
            migrations.forEach(m => this.registerMigration(v => new m(v)));
        }
    }

    /**
     * Builds a migration through the given factory, handing it the next free version number
     * (current count + 1), and appends it to the registry.
     */
    private static registerMigration(migration: (version: number) => Migration) {
        this.migrations.push(migration(this.migrations.length + 1));
    }

    /**
     * @returns whether an instance constructed by exactly the given class has been registered.
     */
    public static hasMigration(migration: Type<Migration>) {
        return this.migrations.some(m => m.constructor === migration);
    }

    /**
     * Optionally creates the configured database (when `mysql.create_database_automatically` is `true`),
     * marks the database as ready and, unless disabled, runs the pending migrations.
     *
     * @param runMigrations whether to run pending migrations after the database has been prepared.
     */
    public static async prepare(runMigrations: boolean = true) {
        if (config.get<boolean>('mysql.create_database_automatically') === true) {
            const dbName = config.get<string>('mysql.database');
            Logger.info(`Creating database ${dbName}...`);

            // Dedicated connection without a default database: the database may not exist yet.
            const connection = mysql.createConnection({
                host: config.get<string>('mysql.host'),
                user: config.get<string>('mysql.user'),
                password: config.get<string>('mysql.password'),
                charset: 'utf8mb4',
            });

            try {
                await new Promise<void>((resolve, reject) => {
                    connection.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`, (error) => {
                        if (error !== null) {
                            reject(error);
                        } else {
                            resolve();
                        }
                    });
                });
            } finally {
                // Close the bootstrap connection even when the statement failed.
                connection.end();
            }

            Logger.info(`Database ${dbName} created!`);
        }

        this.databaseReady = true;

        if (runMigrations) await this.handleMigrations();
    }

    /**
     * The shared connection pool, created on first access.
     */
    public static get pool(): Pool {
        if (this.currentPool === undefined) {
            this.currentPool = this.createPool();
        }
        return this.currentPool;
    }

    /**
     * Creates a new pool from the `mysql.*` configuration entries.
     */
    private static createPool(): Pool {
        return mysql.createPool({
            connectionLimit: config.get<number>('mysql.connectionLimit'),
            host: config.get<string>('mysql.host'),
            user: config.get<string>('mysql.user'),
            password: config.get<string>('mysql.password'),
            database: config.get<string>('mysql.database'),
            charset: 'utf8mb4',
        });
    }

    /**
     * Ends the current pool, if any, and forgets it so that the next access to {@link pool} creates a new one.
     * Resolves once the pool's end callback has fired, or immediately when there is no pool.
     */
    public static async endPool(): Promise<void> {
        return new Promise<void>(resolve => {
            if (this.currentPool !== undefined) {
                this.currentPool.end(() => {
                    Logger.info('Mysql connection pool ended.');
                    resolve();
                });
                this.currentPool = undefined;
            } else {
                resolve();
            }
        });
    }

    /**
     * Runs a SQL statement and normalizes the driver's answer into a {@link QueryResult}.
     *
     * @param queryString SQL statement, optionally containing `?` placeholders.
     * @param values values bound to the placeholders.
     * @param connection connection to run the statement on; the shared pool is used when omitted.
     * @returns the normalized result; rejects with the driver's error when the statement fails.
     */
    public static async query(queryString: string, values?: any, connection?: Connection): Promise<QueryResult> {
        return await new Promise<QueryResult>((resolve, reject) => {
            // The fully formatted statement (with values) is only logged in verbose mode.
            Logger.dev('SQL:', Logger.isVerboseMode() ? mysql.format(queryString, values) : queryString);

            (connection ? connection : this.pool).query(queryString, values, (error, results, fields) => {
                if (error !== null) {
                    reject(error);
                    return;
                }

                resolve({
                    results: Array.isArray(results) ? results : [],
                    fields: fields !== undefined ? fields : [],
                    other: Array.isArray(results) ? null : results
                });
            });
        });
    }

    /**
     * Runs `transaction` inside a database transaction on a dedicated pooled connection.
     *
     * The transaction is committed when the returned promise resolves and rolled back when it rejects or
     * when the commit fails. The connection is handed back to the pool only after the final statement
     * (commit or rollback) has completed.
     *
     * @param transaction work to perform; it must issue its statements on the connection it receives.
     * @returns the value `transaction` resolved with.
     */
    public static async wrapTransaction<T>(transaction: (connection: Connection) => Promise<T>): Promise<T> {
        return await new Promise<T>((resolve, reject) => {
            this.pool.getConnection((err, connection) => {
                if (err) {
                    reject(err);
                    return;
                }

                // Roll back first, then release the connection, then report the failure.
                const rollbackAndRelease = (cause: any) => {
                    this.rejectAndRollback(connection, cause, (reason) => {
                        this.pool.releaseConnection(connection);
                        reject(reason);
                    });
                };

                connection.beginTransaction((err) => {
                    if (err) {
                        reject(err);
                        this.pool.releaseConnection(connection);
                        return;
                    }

                    transaction(connection).then(val => {
                        connection.commit((err) => {
                            if (err) {
                                rollbackAndRelease(err);
                                return;
                            }

                            this.pool.releaseConnection(connection);
                            resolve(val);
                        });
                    }).catch(rollbackAndRelease);
                });
            });
        });
    }

    /**
     * Rolls the connection's transaction back and then calls `reject` exactly once: with `err` when the
     * rollback succeeded, or with a string concatenating `err` and the rollback error when it did not.
     */
    private static rejectAndRollback(connection: Connection, err: any, reject: (err: any) => void) {
        connection.rollback((rollbackErr) => {
            if (rollbackErr) {
                reject(err + '\n' + rollbackErr);
            } else {
                reject(err);
            }
        });
    }

    /**
     * Reads the highest `id` stored in the `migrations` table.
     *
     * @returns that id, or `0` when the table does not exist or contains no row.
     * @throws Error for any query failure other than `ER_NO_SUCH_TABLE`.
     */
    public static async getCurrentMigrationVersion(): Promise<number> {
        try {
            const result = await query('SELECT id FROM migrations ORDER BY id DESC LIMIT 1');
            // An existing but empty table means that no migration is currently applied.
            return result.results.length > 0 ? result.results[0].id : 0;
        } catch (e) {
            // A missing table simply means no migration has run yet; everything else is fatal.
            if (e.code !== 'ER_NO_SUCH_TABLE') {
                throw new Error('Cannot run migrations: ' + e.code);
            }
            return 0;
        }
    }

    /**
     * Installs every registered migration whose `shouldRun(currentVersion)` resolves truthy, each one in its
     * own transaction together with its bookkeeping row, then calls `registerModels()` on all registered
     * migrations.
     */
    private static async handleMigrations() {
        const currentVersion = await this.getCurrentMigrationVersion();
        for (const migration of this.migrations) {
            if (await migration.shouldRun(currentVersion)) {
                Logger.info('Running migration ', migration.version, migration.constructor.name);
                await MysqlConnectionManager.wrapTransaction(async c => {
                    await migration.install(c);
                    // Run the bookkeeping insert on the transaction's connection so it is part of the transaction.
                    await query('INSERT INTO migrations VALUES(?, ?, NOW())', [
                        migration.version,
                        migration.constructor.name,
                    ], c);
                });
            }
        }

        for (const migration of this.migrations) {
            migration.registerModels();
        }
    }

    /**
     * Rolls back a single registered migration and removes its row from the `migrations` table, both inside
     * one transaction. Use with caution.
     *
     * @param migrationID 1-based ID (version) of the migration to roll back. The default, `0`, selects the
     * last registered migration.
     * @throws Error when no registered migration matches `migrationID`.
     */
    public static async rollbackMigration(migrationID: number = 0): Promise<void> {
        // Versions are 1-based positions in the registry; 0 is the "last registered" shortcut.
        const index = migrationID === 0 ? this.migrations.length - 1 : migrationID - 1;
        const migration = this.migrations[index];
        if (migration === undefined) {
            throw new Error(`Cannot rollback migration ${migrationID}: no such registered migration.`);
        }

        Logger.info('Rolling back migration ', migration.version, migration.constructor.name);
        await MysqlConnectionManager.wrapTransaction(async c => {
            await migration.rollback(c);
            // Run the bookkeeping delete on the transaction's connection so it is part of the transaction.
            await query('DELETE FROM migrations WHERE id=?', [migration.version], c);
        });
    }

    /**
     * Command line entry point: logs the current migration version and, when `args` contains `rollback`
     * (optionally followed by a migration ID), prepares the database without running migrations and rolls
     * that migration back. The connection pool is always ended before returning.
     *
     * @param args command line arguments.
     */
    public static async migrationCommand(args: string[]): Promise<void> {
        try {
            Logger.info('Current migration:', await this.getCurrentMigrationVersion());

            for (let i = 0; i < args.length; i++) {
                if (args[i] === 'rollback') {
                    let migrationID = 0;
                    if (args.length > i + 1) {
                        migrationID = parseInt(args[i + 1], 10);
                    }
                    await this.prepare(false);
                    await this.rollbackMigration(migrationID);
                    return;
                }
            }
        } finally {
            await MysqlConnectionManager.endPool();
        }
    }
}