mirror of
https://github.com/bigcapitalhq/bigcapital.git
synced 2026-02-16 04:40:32 +00:00
fix: Concurrency controlling multiple processes in Bigcapital CLI commands
This commit is contained in:
@@ -4,6 +4,7 @@ import color from 'colorette';
|
||||
import argv from 'getopts';
|
||||
import Knex from 'knex';
|
||||
import { knexSnakeCaseMappers } from 'objection';
|
||||
import { PromisePool } from '@supercharge/promise-pool';
|
||||
import '../before';
|
||||
import config from '../config';
|
||||
|
||||
@@ -28,7 +29,7 @@ function initSystemKnex() {
|
||||
});
|
||||
}
|
||||
|
||||
function initTenantKnex(organizationId) {
|
||||
function initTenantKnex(organizationId: string = '') {
|
||||
return Knex({
|
||||
client: config.tenant.db_client,
|
||||
connection: {
|
||||
@@ -71,10 +72,12 @@ function getAllSystemTenants(knex) {
|
||||
return knex('tenants');
|
||||
}
|
||||
|
||||
function getAllInitializedSystemTenants(knex) {
|
||||
function getAllInitializedTenants(knex) {
|
||||
return knex('tenants').whereNotNull('initializedAt');
|
||||
}
|
||||
|
||||
const MIGRATION_CONCURRENCY = 10;
|
||||
|
||||
// module.exports = {
|
||||
// log,
|
||||
// success,
|
||||
@@ -91,6 +94,7 @@ function getAllInitializedSystemTenants(knex) {
|
||||
// - bigcapital tenants:migrate:make
|
||||
// - bigcapital system:migrate:make
|
||||
// - bigcapital tenants:list
|
||||
// - bigcapital tenants:list --all
|
||||
|
||||
commander
|
||||
.command('system:migrate:rollback')
|
||||
@@ -149,10 +153,13 @@ commander
|
||||
commander
|
||||
.command('tenants:list')
|
||||
.description('Retrieve a list of all system tenants databases.')
|
||||
.option('-a, --all', 'All tenants even are not initialized.')
|
||||
.action(async (cmd) => {
|
||||
try {
|
||||
const sysKnex = await initSystemKnex();
|
||||
const tenants = await getAllSystemTenants(sysKnex);
|
||||
const tenants = cmd?.all
|
||||
? await getAllSystemTenants(sysKnex)
|
||||
: await getAllInitializedTenants(sysKnex);
|
||||
|
||||
tenants.forEach((tenant) => {
|
||||
const dbName = `${config.tenant.db_name_prefix}${tenant.organizationId}`;
|
||||
@@ -183,18 +190,20 @@ commander
|
||||
commander
|
||||
.command('tenants:migrate:latest')
|
||||
.description('Migrate all tenants or the given tenant id.')
|
||||
.option('-t, --tenant_id [tenant_id]', 'Which tenant id do you migrate.')
|
||||
.option(
|
||||
'-t, --tenant_id [tenant_id]',
|
||||
'Which organization id do you migrate.'
|
||||
)
|
||||
.action(async (cmd) => {
|
||||
try {
|
||||
const sysKnex = await initSystemKnex();
|
||||
const tenants = await getAllInitializedSystemTenants(sysKnex);
|
||||
const tenants = await getAllInitializedTenants(sysKnex);
|
||||
const tenantsOrgsIds = tenants.map((tenant) => tenant.organizationId);
|
||||
|
||||
if (cmd.tenant_id && tenantsOrgsIds.indexOf(cmd.tenant_id) === -1) {
|
||||
exit(`The given tenant id ${cmd.tenant_id} is not exists.`);
|
||||
}
|
||||
// Validate the tenant id exist first of all.
|
||||
const migrateOpers = [];
|
||||
const migrateTenant = async (organizationId) => {
|
||||
try {
|
||||
const tenantKnex = await initTenantKnex(organizationId);
|
||||
@@ -216,17 +225,17 @@ commander
|
||||
}
|
||||
};
|
||||
if (!cmd.tenant_id) {
|
||||
tenants.forEach((tenant) => {
|
||||
const oper = migrateTenant(tenant.organizationId);
|
||||
migrateOpers.push(oper);
|
||||
});
|
||||
await PromisePool.withConcurrency(MIGRATION_CONCURRENCY)
|
||||
.for(tenants)
|
||||
.process((tenant, index, pool) => {
|
||||
return migrateTenant(tenant.organizationId);
|
||||
})
|
||||
.then(() => {
|
||||
success('All tenants are migrated.');
|
||||
});
|
||||
} else {
|
||||
const oper = migrateTenant(cmd.tenant_id);
|
||||
migrateOpers.push(oper);
|
||||
await migrateTenant(cmd.tenant_id);
|
||||
}
|
||||
Promise.all(migrateOpers).then(() => {
|
||||
success('All tenants are migrated.');
|
||||
});
|
||||
} catch (error) {
|
||||
exit(error);
|
||||
}
|
||||
@@ -235,19 +244,21 @@ commander
|
||||
commander
|
||||
.command('tenants:migrate:rollback')
|
||||
.description('Rollback the last batch of tenants migrations.')
|
||||
.option('-t, --tenant_id [tenant_id]', 'Which tenant id do you migrate.')
|
||||
.option(
|
||||
'-t, --tenant_id [tenant_id]',
|
||||
'Which organization id do you migrate.'
|
||||
)
|
||||
.action(async (cmd) => {
|
||||
try {
|
||||
const sysKnex = await initSystemKnex();
|
||||
const tenants = await getAllSystemTenants(sysKnex);
|
||||
const tenants = await getAllInitializedTenants(sysKnex);
|
||||
const tenantsOrgsIds = tenants.map((tenant) => tenant.organizationId);
|
||||
|
||||
if (cmd.tenant_id && tenantsOrgsIds.indexOf(cmd.tenant_id) === -1) {
|
||||
exit(`The given tenant id ${cmd.tenant_id} is not exists.`);
|
||||
}
|
||||
|
||||
const migrateOpers = [];
|
||||
const migrateTenant = async (organizationId) => {
|
||||
const migrateTenant = async (organizationId: string) => {
|
||||
try {
|
||||
const tenantKnex = await initTenantKnex(organizationId);
|
||||
const [batchNo, _log] = await tenantKnex.migrate.rollback();
|
||||
@@ -268,17 +279,17 @@ commander
|
||||
};
|
||||
|
||||
if (!cmd.tenant_id) {
|
||||
tenants.forEach((tenant) => {
|
||||
const oper = migrateTenant(tenant.organizationId);
|
||||
migrateOpers.push(oper);
|
||||
});
|
||||
await PromisePool.withConcurrency(MIGRATION_CONCURRENCY)
|
||||
.for(tenants)
|
||||
.process((tenant, index, pool) => {
|
||||
return migrateTenant(tenant.organizationId);
|
||||
})
|
||||
.then(() => {
|
||||
success('All tenants are rollbacked.');
|
||||
});
|
||||
} else {
|
||||
const oper = migrateTenant(cmd.tenant_id);
|
||||
migrateOpers.push(oper);
|
||||
await migrateTenant(cmd.tenant_id);
|
||||
}
|
||||
Promise.all(migrateOpers).then(() => {
|
||||
success('All tenants are rollbacked.');
|
||||
});
|
||||
} catch (error) {
|
||||
exit(error);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user