From f7fcfefc78b9837e554aa7abdc139b6865572d7a Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Sun, 9 Jun 2024 22:52:56 +0200 Subject: [PATCH] fix: Concurrency controlling multiple processes in Bigcapital CLI commands --- packages/server/package.json | 1 + packages/server/src/commands/bigcapital.ts | 67 +++++++++++++--------- pnpm-lock.yaml | 10 ++++ 3 files changed, 50 insertions(+), 28 deletions(-) diff --git a/packages/server/package.json b/packages/server/package.json index ef6682eca..07329cb60 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -25,6 +25,7 @@ "@casl/ability": "^5.4.3", "@hapi/boom": "^7.4.3", "@lemonsqueezy/lemonsqueezy.js": "^2.2.0", + "@supercharge/promise-pool": "^3.2.0", "@types/express": "^4.17.21", "@types/i18n": "^0.8.7", "@types/knex": "^0.16.1", diff --git a/packages/server/src/commands/bigcapital.ts b/packages/server/src/commands/bigcapital.ts index 10a1390a8..7e9b6cc88 100644 --- a/packages/server/src/commands/bigcapital.ts +++ b/packages/server/src/commands/bigcapital.ts @@ -4,6 +4,7 @@ import color from 'colorette'; import argv from 'getopts'; import Knex from 'knex'; import { knexSnakeCaseMappers } from 'objection'; +import { PromisePool } from '@supercharge/promise-pool'; import '../before'; import config from '../config'; @@ -28,7 +29,7 @@ function initSystemKnex() { }); } -function initTenantKnex(organizationId) { +function initTenantKnex(organizationId: string = '') { return Knex({ client: config.tenant.db_client, connection: { @@ -71,10 +72,12 @@ function getAllSystemTenants(knex) { return knex('tenants'); } -function getAllInitializedSystemTenants(knex) { +function getAllInitializedTenants(knex) { return knex('tenants').whereNotNull('initializedAt'); } +const MIGRATION_CONCURRENCY = 10; + // module.exports = { // log, // success, @@ -91,6 +94,7 @@ function getAllInitializedSystemTenants(knex) { // - bigcapital tenants:migrate:make // - bigcapital system:migrate:make // - bigcapital tenants:list +// - bigcapital tenants:list --all commander .command('system:migrate:rollback') @@ -149,10 +153,13 @@ commander commander .command('tenants:list') .description('Retrieve a list of all system tenants databases.') + .option('-a, --all', 'All tenants even are not initialized.') .action(async (cmd) => { try { const sysKnex = await initSystemKnex(); - const tenants = await getAllSystemTenants(sysKnex); + const tenants = cmd?.all + ? await getAllSystemTenants(sysKnex) + : await getAllInitializedTenants(sysKnex); tenants.forEach((tenant) => { const dbName = `${config.tenant.db_name_prefix}${tenant.organizationId}`; @@ -183,18 +190,20 @@ commander commander .command('tenants:migrate:latest') .description('Migrate all tenants or the given tenant id.') - .option('-t, --tenant_id [tenant_id]', 'Which tenant id do you migrate.') + .option( + '-t, --tenant_id [tenant_id]', + 'Which organization id do you migrate.' + ) .action(async (cmd) => { try { const sysKnex = await initSystemKnex(); - const tenants = await getAllInitializedSystemTenants(sysKnex); + const tenants = await getAllInitializedTenants(sysKnex); const tenantsOrgsIds = tenants.map((tenant) => tenant.organizationId); if (cmd.tenant_id && tenantsOrgsIds.indexOf(cmd.tenant_id) === -1) { exit(`The given tenant id ${cmd.tenant_id} is not exists.`); } // Validate the tenant id exist first of all. - const migrateOpers = []; const migrateTenant = async (organizationId) => { try { const tenantKnex = await initTenantKnex(organizationId); @@ -216,17 +225,17 @@ commander } }; if (!cmd.tenant_id) { - tenants.forEach((tenant) => { - const oper = migrateTenant(tenant.organizationId); - migrateOpers.push(oper); - }); + await PromisePool.withConcurrency(MIGRATION_CONCURRENCY) + .for(tenants) + .process((tenant, index, pool) => { + return migrateTenant(tenant.organizationId); + }) + .then(() => { + success('All tenants are migrated.'); + }); } else { - const oper = migrateTenant(cmd.tenant_id); - migrateOpers.push(oper); + await migrateTenant(cmd.tenant_id); } - Promise.all(migrateOpers).then(() => { - success('All tenants are migrated.'); - }); } catch (error) { exit(error); } @@ -235,19 +244,21 @@ commander commander .command('tenants:migrate:rollback') .description('Rollback the last batch of tenants migrations.') - .option('-t, --tenant_id [tenant_id]', 'Which tenant id do you migrate.') + .option( + '-t, --tenant_id [tenant_id]', + 'Which organization id do you migrate.' + ) .action(async (cmd) => { try { const sysKnex = await initSystemKnex(); - const tenants = await getAllSystemTenants(sysKnex); + const tenants = await getAllInitializedTenants(sysKnex); const tenantsOrgsIds = tenants.map((tenant) => tenant.organizationId); if (cmd.tenant_id && tenantsOrgsIds.indexOf(cmd.tenant_id) === -1) { exit(`The given tenant id ${cmd.tenant_id} is not exists.`); } - const migrateOpers = []; - const migrateTenant = async (organizationId) => { + const migrateTenant = async (organizationId: string) => { try { const tenantKnex = await initTenantKnex(organizationId); const [batchNo, _log] = await tenantKnex.migrate.rollback(); @@ -268,17 +279,17 @@ commander }; if (!cmd.tenant_id) { - tenants.forEach((tenant) => { - const oper = migrateTenant(tenant.organizationId); - migrateOpers.push(oper); - }); + await PromisePool.withConcurrency(MIGRATION_CONCURRENCY) + .for(tenants) + .process((tenant, index, pool) => { + return migrateTenant(tenant.organizationId); + }) + .then(() => { + success('All tenants are rollbacked.'); + }); } else { - const oper = migrateTenant(cmd.tenant_id); - migrateOpers.push(oper); + await migrateTenant(cmd.tenant_id); } - Promise.all(migrateOpers).then(() => { - success('All tenants are rollbacked.'); - }); } catch (error) { exit(error); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d770b0795..9767dc9d6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -50,6 +50,9 @@ importers: '@lemonsqueezy/lemonsqueezy.js': specifier: ^2.2.0 version: 2.2.0 + '@supercharge/promise-pool': + specifier: ^3.2.0 + version: 3.2.0 '@types/express': specifier: ^4.17.21 version: 4.17.21 @@ -5751,6 +5754,11 @@ packages: resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==} dev: false + /@supercharge/promise-pool@3.2.0: + resolution: {integrity: sha512-pj0cAALblTZBPtMltWOlZTQSLT07jIaFNeM8TWoJD1cQMgDB9mcMlVMoetiB35OzNJpqQ2b+QEtwiR9f20mADg==} + engines: {node: '>=8'} + dev: false + /@surma/rollup-plugin-off-main-thread@2.2.3: resolution: {integrity: sha512-lR8q/9W7hZpMWweNiAKU7NQerBnzQQLvi8qnTDU/fxItPhtZVMbPV3lbCwjhIlNBe9Bbr5V+KHshvWmVSG9cxQ==} dependencies: @@ -17382,6 +17390,7 @@ packages: /memory-pager@1.5.0: resolution: {integrity: sha512-ZS4Bp4r/Zoeq6+NLJpP+0Zzm0pR8whtGPf1XExKLJBAczGMnSi3It14OiNCStjQjM6NU1okjQGSxgEZN8eBYKg==} + requiresBuild: true dev: false /memorystream@0.3.1: @@ -23472,6 +23481,7 @@ packages: /sparse-bitfield@3.0.3: resolution: {integrity: sha512-kvzhi7vqKTfkh0PZU+2D2PIllw2ymqJKujUcyPMd9Y75Nv4nPbGJZXNhxsgdQab2BmlDct1YnfQCguEvHr7VsQ==} + requiresBuild: true dependencies: memory-pager: 1.5.0 dev: false