import mysql, {Connection, FieldInfo, Pool} from 'mysql';
import config from 'config';
import Migration from "./Migration";
import Logger from "../Logger";
import {Type} from "../Utils";

/**
 * Normalized outcome of a query executed through {@link MysqlConnectionManager.query}.
 */
export interface QueryResult {
    /** Rows returned by the driver; an empty array when the driver result was not an array. */
    readonly results: any[];
    /** Column metadata reported by the driver; an empty array when none was provided. */
    readonly fields: FieldInfo[];
    /** The raw driver result when it was not an array; `null` otherwise. */
    readonly other?: any;
    /** Mutable slot for callers; nothing in this file assigns it. */
    foundRows?: number;
}

/**
 * Shorthand for {@link MysqlConnectionManager.query}.
 *
 * @param queryString SQL text, optionally containing `?` placeholders.
 * @param values Values bound to the placeholders.
 * @param connection Connection to run on; the shared pool is used when omitted.
 * @returns The normalized query result.
 */
export async function query(queryString: string, values?: any, connection?: Connection): Promise<QueryResult> {
    return await MysqlConnectionManager.query(queryString, values, connection);
}

/**
 * Static facade over a lazily created MySQL connection pool, plus registration
 * and execution of versioned migrations.
 */
export default class MysqlConnectionManager {
    private static currentPool?: Pool;
    private static databaseReady: boolean = false;
    private static migrationsRegistered: boolean = false;
    private static readonly migrations: Migration[] = [];

    /**
     * Registers the migration classes, in order. Only the first call has an
     * effect; later calls are ignored.
     *
     * @param migrations Migration constructors; each is instantiated with its
     *                   1-based position as version number.
     */
    public static registerMigrations(migrations: Type<Migration>[]) {
        if (!this.migrationsRegistered) {
            this.migrationsRegistered = true;
            migrations.forEach(m => this.registerMigration(v => new m(v)));
        }
    }

    /**
     * Instantiates a migration with the next version number (current count + 1)
     * and appends it to the registered list.
     */
    private static registerMigration(migration: (version: number) => Migration) {
        this.migrations.push(migration(this.migrations.length + 1));
    }

    /**
     * Optionally creates the configured database (when
     * `mysql.create_database_automatically` is `true`), marks the database as
     * ready and runs pending migrations.
     *
     * @throws The driver error when database creation fails, or whatever
     *         `handleMigrations` throws.
     */
    public static async prepare(): Promise<void> {
        if (config.get('mysql.create_database_automatically') === true) {
            const dbName = config.get<string>('mysql.database');
            Logger.info(`Creating database ${dbName}...`);

            // Dedicated connection without a default schema: the database may not exist yet.
            const connection = mysql.createConnection({
                host: config.get('mysql.host'),
                user: config.get('mysql.user'),
                password: config.get('mysql.password'),
            });

            try {
                await new Promise<void>((resolve, reject) => {
                    // Quote the identifier so names such as `my-db` are accepted;
                    // `true` forbids interpreting dots as qualified names.
                    connection.query(`CREATE DATABASE IF NOT EXISTS ${mysql.escapeId(dbName, true)}`, (error) => {
                        if (error) {
                            reject(error);
                        } else {
                            resolve();
                        }
                    });
                });
            } finally {
                // Close the bootstrap connection on failure too, so it is not leaked.
                connection.end();
            }

            Logger.info(`Database ${dbName} created!`);
        }

        this.databaseReady = true;
        await this.handleMigrations();
    }

    /**
     * The shared connection pool, created from configuration on first access.
     */
    public static get pool(): Pool {
        if (this.currentPool === undefined) {
            this.currentPool = this.createPool();
        }
        return this.currentPool;
    }

    /**
     * Builds a pool from the `mysql.*` configuration keys.
     */
    private static createPool(): Pool {
        return mysql.createPool({
            connectionLimit: config.get('mysql.connectionLimit'),
            host: config.get('mysql.host'),
            user: config.get('mysql.user'),
            password: config.get('mysql.password'),
            database: config.get('mysql.database'),
        });
    }

    /**
     * Ends the current pool, if any, and forgets it so that the next access to
     * {@link pool} creates a fresh one. Resolves immediately when no pool exists.
     */
    public static async endPool(): Promise<void> {
        return new Promise<void>(resolve => {
            if (this.currentPool !== undefined) {
                this.currentPool.end(() => {
                    Logger.info('Mysql connection pool ended.');
                    resolve();
                });
                this.currentPool = undefined;
            } else {
                resolve();
            }
        });
    }

    /**
     * Runs a query on the given connection, or on the shared pool when none is given.
     *
     * @param queryString SQL text, optionally containing `?` placeholders.
     * @param values Values bound to the placeholders.
     * @param connection Connection to run on (e.g. the one of an open transaction).
     * @returns Rows in `results` when the driver returned an array; otherwise the
     *          raw driver result in `other` and an empty `results`.
     * @throws The driver error when the query fails.
     */
    public static async query(queryString: string, values?: any, connection?: Connection): Promise<QueryResult> {
        return await new Promise<QueryResult>((resolve, reject) => {
            Logger.dev('Mysql query:', queryString, '; values:', values);

            (connection ? connection : this.pool).query(queryString, values, (error, results, fields) => {
                if (error !== null) {
                    reject(error);
                    return;
                }

                resolve({
                    results: Array.isArray(results) ? results : [],
                    fields: fields !== undefined ? fields : [],
                    other: Array.isArray(results) ? null : results
                });
            });
        });
    }

    /**
     * Runs `transaction` inside a database transaction on a pooled connection.
     * Commits when the returned promise fulfils; rolls back when it rejects,
     * when it throws synchronously, or when the commit itself fails. The
     * connection is handed back to the pool once commit or rollback has finished.
     *
     * @param transaction Work to perform; it must issue its queries on the
     *                    connection it receives to take part in the transaction.
     * @returns The value the transaction resolved with.
     * @throws The error that caused the failure (see `rejectAndRollback` for the
     *         shape when the rollback fails as well).
     */
    public static async wrapTransaction<T>(transaction: (connection: Connection) => Promise<T>): Promise<T> {
        return await new Promise<T>((resolve, reject) => {
            this.pool.getConnection((err, connection) => {
                if (err) {
                    reject(err);
                    return;
                }

                const release = () => this.pool.releaseConnection(connection);

                // Roll back first, then release, then report the failure.
                const fail = (cause: any) => {
                    this.rejectAndRollback(connection, cause, (reason) => {
                        release();
                        reject(reason);
                    });
                };

                connection.beginTransaction((err) => {
                    if (err) {
                        reject(err);
                        release();
                        return;
                    }

                    let pending: Promise<T>;
                    try {
                        pending = transaction(connection);
                    } catch (syncErr) {
                        // A non-async callback may throw before returning a promise.
                        fail(syncErr);
                        return;
                    }

                    pending.then(val => {
                        connection.commit((err) => {
                            if (err) {
                                fail(err);
                                return;
                            }

                            release();
                            resolve(val);
                        });
                    }).catch(fail);
                });
            });
        });
    }

    /**
     * Rolls the connection's transaction back and then calls `reject`.
     *
     * @param connection Connection holding the open transaction.
     * @param err The error that triggered the rollback.
     * @param reject Called with `err`, or with the string `err + '\n' + rollbackErr`
     *               when the rollback itself failed.
     */
    private static rejectAndRollback(connection: Connection, err: any, reject: (err: any) => void) {
        connection.rollback((rollbackErr) => {
            if (rollbackErr) {
                reject(err + '\n' + rollbackErr);
            } else {
                reject(err);
            }
        });
    }

    /**
     * Reads the highest applied migration id from the `migrations` table
     * (0 when the table is missing or empty) and runs every registered migration
     * whose `shouldRun` accepts that version, each in its own transaction.
     *
     * @throws Error `Cannot run migrations: <code>` when the version lookup fails
     *         for any reason other than the table not existing.
     */
    private static async handleMigrations(): Promise<void> {
        let currentVersion = 0;

        try {
            const result = await query('SELECT id FROM migrations ORDER BY id DESC LIMIT 1');
            // An existing but empty table means nothing has been applied yet.
            if (result.results.length > 0) {
                currentVersion = result.results[0].id;
            }
        } catch (e) {
            const code = e !== null && typeof e === 'object' ? (e as { code?: unknown }).code : undefined;
            // A missing table is expected on first run; anything else is fatal.
            if (code !== 'ER_NO_SUCH_TABLE') {
                throw new Error('Cannot run migrations: ' + String(code));
            }
        }

        for (const migration of this.migrations) {
            if (await migration.shouldRun(currentVersion)) {
                Logger.info('Running migration ', migration.version, migration.constructor.name);
                await MysqlConnectionManager.wrapTransaction(async c => {
                    await migration.install(c);
                    // Record the migration on the transaction's own connection so it
                    // is committed/rolled back together with the migration, and so it
                    // does not wait on the pool for a second connection.
                    await query('INSERT INTO migrations VALUES(?, ?, NOW())', [
                        migration.version,
                        migration.constructor.name,
                    ], c);
                });
            }
        }
    }
}