From fc6ebfea5c1a186c596040af46c76aa4e7dc2391 Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Wed, 28 Aug 2024 21:25:47 +0200 Subject: [PATCH] Debounce scheduling calculating items cost --- packages/server/package.json | 1 + .../src/services/Inventory/Inventory.ts | 22 ++++---- .../Sales/Invoices/SalesInvoicesCost.ts | 51 ++++++++++++------- .../src/subscribers/Inventory/Inventory.ts | 16 ++---- .../SaleInvoices/WriteJournalEntries.ts | 1 + pnpm-lock.yaml | 9 ++++ 6 files changed, 61 insertions(+), 39 deletions(-) diff --git a/packages/server/package.json b/packages/server/package.json index 817247e8c..d9951a0d7 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -37,6 +37,7 @@ "agendash": "^3.1.0", "app-root-path": "^3.0.0", "async": "^3.2.0", + "async-mutex": "^0.5.0", "axios": "^1.6.0", "babel-loader": "^9.1.2", "bcryptjs": "^2.4.3", diff --git a/packages/server/src/services/Inventory/Inventory.ts b/packages/server/src/services/Inventory/Inventory.ts index 33394bfe7..980ace79a 100644 --- a/packages/server/src/services/Inventory/Inventory.ts +++ b/packages/server/src/services/Inventory/Inventory.ts @@ -139,24 +139,26 @@ export default class InventoryService { ) { const agenda = Container.get('agenda'); + const commonJobsQuery = { + name: 'compute-item-cost', + lastRunAt: { $exists: false }, + 'data.tenantId': tenantId, + 'data.itemId': itemId, + }; // Cancel any `compute-item-cost` in the queue has upper starting date // with the same given item. await agenda.cancel({ - name: 'compute-item-cost', - nextRunAt: { $ne: null }, - 'data.tenantId': tenantId, - 'data.itemId': itemId, - 'data.startingDate': { $gt: startingDate }, + ...commonJobsQuery, + 'data.startingDate': { $lte: startingDate }, }); // Retrieve any `compute-item-cost` in the queue has lower starting date // with the same given item. const dependsJobs = await agenda.jobs({ - name: 'compute-item-cost', - nextRunAt: { $ne: null }, - 'data.tenantId': tenantId, - 'data.itemId': itemId, - 'data.startingDate': { $lte: startingDate }, + ...commonJobsQuery, + 'data.startingDate': { $gte: startingDate }, }); + + // If the depends jobs cleared. if (dependsJobs.length === 0) { await agenda.schedule( config.scheduleComputeItemCost, diff --git a/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts b/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts index 1faa37350..4e69e2a03 100644 --- a/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts +++ b/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts @@ -1,3 +1,4 @@ +import { Mutex } from 'async-mutex'; import { Container, Service, Inject } from 'typedi'; import { chain } from 'lodash'; import moment from 'moment'; @@ -34,17 +35,26 @@ export class SaleInvoicesCost { inventoryItemsIds: number[], startingDate: Date ): Promise { - const asyncOpers: Promise<[]>[] = []; + const mutex = new Mutex(); - inventoryItemsIds.forEach((inventoryItemId: number) => { - const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost( - tenantId, - inventoryItemId, - startingDate - ); - asyncOpers.push(oper); - }); - await Promise.all([...asyncOpers]); + const asyncOpers = inventoryItemsIds.map( + async (inventoryItemId: number) => { + // @todo refactor the lock acquire to be distrbuted using Redis + // and run the cost schedule job after running invoice transaction. + const release = await mutex.acquire(); + + try { + await this.inventoryService.scheduleComputeItemCost( + tenantId, + inventoryItemId, + startingDate + ); + } finally { + release(); + } + } + ); + await Promise.all(asyncOpers); } /** @@ -86,17 +96,22 @@ export class SaleInvoicesCost { tenantId: number, inventoryTransactions: IInventoryTransaction[] ) { - const asyncOpers: Promise<[]>[] = []; + const mutex = new Mutex(); const reducedTransactions = this.getMaxDateInventoryTransactions( inventoryTransactions ); - reducedTransactions.forEach((transaction) => { - const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost( - tenantId, - transaction.itemId, - transaction.date - ); - asyncOpers.push(oper); + const asyncOpers = reducedTransactions.map(async (transaction) => { + const release = await mutex.acquire(); + + try { + await this.inventoryService.scheduleComputeItemCost( + tenantId, + transaction.itemId, + transaction.date + ); + } finally { + release(); + } }); await Promise.all([...asyncOpers]); } diff --git a/packages/server/src/subscribers/Inventory/Inventory.ts b/packages/server/src/subscribers/Inventory/Inventory.ts index 80c917f1f..9061a6d81 100644 --- a/packages/server/src/subscribers/Inventory/Inventory.ts +++ b/packages/server/src/subscribers/Inventory/Inventory.ts @@ -86,20 +86,14 @@ export default class InventorySubscriber { private handleScheduleItemsCostOnInventoryTransactionsCreated = async ({ tenantId, inventoryTransactions, - trx + trx, }: IInventoryTransactionsCreatedPayload) => { const inventoryItemsIds = map(inventoryTransactions, 'itemId'); - runAfterTransaction(trx, async () => { - try { - await this.saleInvoicesCost.computeItemsCostByInventoryTransactions( - tenantId, - inventoryTransactions - ); - } catch (error) { - console.error(error); - } - }); + await this.saleInvoicesCost.computeItemsCostByInventoryTransactions( + tenantId, + inventoryTransactions + ); }; /** diff --git a/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts b/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts index 4b1eab698..14a160253 100644 --- a/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts +++ b/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts @@ -6,6 +6,7 @@ import { ISaleInvoiceEditedPayload, } from '@/interfaces'; import { SaleInvoiceGLEntries } from '@/services/Sales/Invoices/InvoiceGLEntries'; +import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks'; @Service() export default class SaleInvoiceWriteGLEntriesSubscriber { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 12042e0a5..93fa17bb3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -86,6 +86,9 @@ importers: async: specifier: ^3.2.0 version: 3.2.5 + async-mutex: + specifier: ^0.5.0 + version: 0.5.0 axios: specifier: ^1.6.0 version: 1.7.2 @@ -8009,6 +8012,12 @@ packages: resolution: {integrity: sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==} dev: false + /async-mutex@0.5.0: + resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==} + dependencies: + tslib: 2.6.2 + dev: false + /async-settle@1.0.0: resolution: {integrity: sha512-VPXfB4Vk49z1LHHodrEQ6Xf7W4gg1w0dAPROHngx7qgDjqmIQ+fXmwgGXTW/ITLai0YLSvWepJOP9EVpMnEAcw==} engines: {node: '>= 0.10'}