swaf/src/db/MysqlConnectionManager.ts

169 lines
6.0 KiB
TypeScript

import mysql, {Connection, FieldInfo, Pool} from 'mysql';
import config from 'config';
import Migration from "./Migration";
import Logger from "../Logger";
export interface QueryResult {
readonly results: any[];
readonly fields: FieldInfo[];
readonly other?: any;
foundRows?: number;
}
export async function query(queryString: string, values?: any, connection?: Connection): Promise<QueryResult> {
return await MysqlConnectionManager.query(queryString, values, connection);
}
export default class MysqlConnectionManager {
private static currentPool?: Pool;
private static databaseReady: boolean = false;
private static readonly migrations: Migration[] = [];
public static registerMigration(migration: (version: number) => Migration) {
this.migrations.push(migration(this.migrations.length + 1));
}
public static async prepare() {
if (config.get('mysql.create_database_automatically') === true) {
const dbName = config.get('mysql.database');
Logger.info(`Creating database ${dbName}...`);
const connection = mysql.createConnection({
host: config.get('mysql.host'),
user: config.get('mysql.user'),
password: config.get('mysql.password'),
});
await new Promise((resolve, reject) => {
connection.query(`CREATE DATABASE IF NOT EXISTS ${dbName}`, (error) => {
if (error !== null) {
reject(error);
} else {
resolve();
}
});
});
connection.end();
Logger.info(`Database ${dbName} created!`);
}
this.databaseReady = true;
await this.handleMigrations();
}
public static get pool(): Pool {
if (this.currentPool === undefined) {
this.currentPool = this.createPool();
}
return this.currentPool;
}
private static createPool(): Pool {
return mysql.createPool({
connectionLimit: config.get('mysql.connectionLimit'),
host: config.get('mysql.host'),
user: config.get('mysql.user'),
password: config.get('mysql.password'),
database: config.get('mysql.database'),
});
}
public static async endPool(): Promise<void> {
return new Promise(resolve => {
if (this.currentPool !== undefined) {
this.currentPool.end(() => {
Logger.info('Mysql connection pool ended.');
resolve();
});
this.currentPool = undefined;
} else {
resolve();
}
});
}
public static async query(queryString: string, values?: any, connection?: Connection): Promise<QueryResult> {
return await new Promise<QueryResult>((resolve, reject) => {
Logger.dev('Mysql query:', queryString, '; values:', values);
(connection ? connection : this.pool).query(queryString, values, (error, results, fields) => {
if (error !== null) {
reject(error);
return;
}
resolve({
results: Array.isArray(results) ? results : [],
fields: fields !== undefined ? fields : [],
other: Array.isArray(results) ? null : results
});
});
});
}
public static async wrapTransaction<T>(transaction: (connection: Connection) => Promise<T>): Promise<T> {
return await new Promise<T>((resolve, reject) => {
this.pool.getConnection((err, connection) => {
if (err) {
reject(err);
return;
}
connection.beginTransaction((err) => {
if (err) {
reject(err);
this.pool.releaseConnection(connection);
return;
}
transaction(connection).then(val => {
connection.commit((err) => {
if (err) {
this.rejectAndRollback(connection, err, reject);
this.pool.releaseConnection(connection);
return;
}
this.pool.releaseConnection(connection);
resolve(val);
});
}).catch(err => {
this.rejectAndRollback(connection, err, reject);
this.pool.releaseConnection(connection);
});
});
});
});
}
private static rejectAndRollback(connection: Connection, err: any, reject: (err: any) => void) {
connection.rollback((rollbackErr) => {
if (rollbackErr) {
reject(err + '\n' + rollbackErr);
} else {
reject(err);
}
});
}
private static async handleMigrations() {
let currentVersion = 0;
try {
const result = await query('SELECT id FROM migrations ORDER BY id DESC LIMIT 1');
currentVersion = result.results[0].id;
} catch (e) {
if (e.code === 'ECONNREFUSED' || e.code !== 'ER_NO_SUCH_TABLE') {
throw new Error('Cannot run migrations: ' + e.code);
}
}
for (const migration of this.migrations) {
if (await migration.shouldRun(currentVersion)) {
Logger.info('Running migration ', migration.version, migration.constructor.name);
await migration.install();
await query('INSERT INTO migrations VALUES(?, ?, NOW())', [
migration.version,
migration.constructor.name,
]);
}
}
}
}