From fc6ebfea5c1a186c596040af46c76aa4e7dc2391 Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Wed, 28 Aug 2024 21:25:47 +0200 Subject: [PATCH 1/3] Debounce scheduling calculating items cost --- packages/server/package.json | 1 + .../src/services/Inventory/Inventory.ts | 22 ++++---- .../Sales/Invoices/SalesInvoicesCost.ts | 51 ++++++++++++------- .../src/subscribers/Inventory/Inventory.ts | 16 ++---- .../SaleInvoices/WriteJournalEntries.ts | 1 + pnpm-lock.yaml | 9 ++++ 6 files changed, 61 insertions(+), 39 deletions(-) diff --git a/packages/server/package.json b/packages/server/package.json index 817247e8c..d9951a0d7 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -37,6 +37,7 @@ "agendash": "^3.1.0", "app-root-path": "^3.0.0", "async": "^3.2.0", + "async-mutex": "^0.5.0", "axios": "^1.6.0", "babel-loader": "^9.1.2", "bcryptjs": "^2.4.3", diff --git a/packages/server/src/services/Inventory/Inventory.ts b/packages/server/src/services/Inventory/Inventory.ts index 33394bfe7..980ace79a 100644 --- a/packages/server/src/services/Inventory/Inventory.ts +++ b/packages/server/src/services/Inventory/Inventory.ts @@ -139,24 +139,26 @@ export default class InventoryService { ) { const agenda = Container.get('agenda'); + const commonJobsQuery = { + name: 'compute-item-cost', + lastRunAt: { $exists: false }, + 'data.tenantId': tenantId, + 'data.itemId': itemId, + }; // Cancel any `compute-item-cost` in the queue has upper starting date // with the same given item. await agenda.cancel({ - name: 'compute-item-cost', - nextRunAt: { $ne: null }, - 'data.tenantId': tenantId, - 'data.itemId': itemId, - 'data.startingDate': { $gt: startingDate }, + ...commonJobsQuery, + 'data.startingDate': { $lte: startingDate }, }); // Retrieve any `compute-item-cost` in the queue has lower starting date // with the same given item. const dependsJobs = await agenda.jobs({ - name: 'compute-item-cost', - nextRunAt: { $ne: null }, - 'data.tenantId': tenantId, - 'data.itemId': itemId, - 'data.startingDate': { $lte: startingDate }, + ...commonJobsQuery, + 'data.startingDate': { $gte: startingDate }, }); + + // If the depends jobs cleared. if (dependsJobs.length === 0) { await agenda.schedule( config.scheduleComputeItemCost, diff --git a/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts b/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts index 1faa37350..4e69e2a03 100644 --- a/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts +++ b/packages/server/src/services/Sales/Invoices/SalesInvoicesCost.ts @@ -1,3 +1,4 @@ +import { Mutex } from 'async-mutex'; import { Container, Service, Inject } from 'typedi'; import { chain } from 'lodash'; import moment from 'moment'; @@ -34,17 +35,26 @@ export class SaleInvoicesCost { inventoryItemsIds: number[], startingDate: Date ): Promise { - const asyncOpers: Promise<[]>[] = []; + const mutex = new Mutex(); - inventoryItemsIds.forEach((inventoryItemId: number) => { - const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost( - tenantId, - inventoryItemId, - startingDate - ); - asyncOpers.push(oper); - }); - await Promise.all([...asyncOpers]); + const asyncOpers = inventoryItemsIds.map( + async (inventoryItemId: number) => { + // @todo refactor the lock acquire to be distrbuted using Redis + // and run the cost schedule job after running invoice transaction. + const release = await mutex.acquire(); + + try { + await this.inventoryService.scheduleComputeItemCost( + tenantId, + inventoryItemId, + startingDate + ); + } finally { + release(); + } + } + ); + await Promise.all(asyncOpers); } /** @@ -86,17 +96,22 @@ export class SaleInvoicesCost { tenantId: number, inventoryTransactions: IInventoryTransaction[] ) { - const asyncOpers: Promise<[]>[] = []; + const mutex = new Mutex(); const reducedTransactions = this.getMaxDateInventoryTransactions( inventoryTransactions ); - reducedTransactions.forEach((transaction) => { - const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost( - tenantId, - transaction.itemId, - transaction.date - ); - asyncOpers.push(oper); + const asyncOpers = reducedTransactions.map(async (transaction) => { + const release = await mutex.acquire(); + + try { + await this.inventoryService.scheduleComputeItemCost( + tenantId, + transaction.itemId, + transaction.date + ); + } finally { + release(); + } }); await Promise.all([...asyncOpers]); } diff --git a/packages/server/src/subscribers/Inventory/Inventory.ts b/packages/server/src/subscribers/Inventory/Inventory.ts index 80c917f1f..9061a6d81 100644 --- a/packages/server/src/subscribers/Inventory/Inventory.ts +++ b/packages/server/src/subscribers/Inventory/Inventory.ts @@ -86,20 +86,14 @@ export default class InventorySubscriber { private handleScheduleItemsCostOnInventoryTransactionsCreated = async ({ tenantId, inventoryTransactions, - trx + trx, }: IInventoryTransactionsCreatedPayload) => { const inventoryItemsIds = map(inventoryTransactions, 'itemId'); - runAfterTransaction(trx, async () => { - try { - await this.saleInvoicesCost.computeItemsCostByInventoryTransactions( - tenantId, - inventoryTransactions - ); - } catch (error) { - console.error(error); - } - }); + await this.saleInvoicesCost.computeItemsCostByInventoryTransactions( + tenantId, + inventoryTransactions + ); }; /** diff --git a/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts b/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts index 4b1eab698..14a160253 100644 --- a/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts +++ b/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts @@ -6,6 +6,7 @@ import { ISaleInvoiceEditedPayload, } from '@/interfaces'; import { SaleInvoiceGLEntries } from '@/services/Sales/Invoices/InvoiceGLEntries'; +import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks'; @Service() export default class SaleInvoiceWriteGLEntriesSubscriber { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 12042e0a5..93fa17bb3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -86,6 +86,9 @@ importers: async: specifier: ^3.2.0 version: 3.2.5 + async-mutex: + specifier: ^0.5.0 + version: 0.5.0 axios: specifier: ^1.6.0 version: 1.7.2 @@ -8009,6 +8012,12 @@ packages: resolution: {integrity: sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==} dev: false + /async-mutex@0.5.0: + resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==} + dependencies: + tslib: 2.6.2 + dev: false + /async-settle@1.0.0: resolution: {integrity: sha512-VPXfB4Vk49z1LHHodrEQ6Xf7W4gg1w0dAPROHngx7qgDjqmIQ+fXmwgGXTW/ITLai0YLSvWepJOP9EVpMnEAcw==} engines: {node: '>= 0.10'} From b87321c897f7c45ae482c95184668acb1af4cd93 Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Wed, 28 Aug 2024 22:15:15 +0200 Subject: [PATCH 2/3] fix: Avoid running the cost job in import preview --- packages/server/src/interfaces/Import.ts | 2 +- .../events/TriggerRecognizedTransactions.ts | 1 - .../server/src/services/Import/ImportALS.ts | 104 ++++++++++++++++++ .../src/services/Import/ImportFilePreview.ts | 21 +++- .../Import/ImportFileProcessCommit.ts | 21 +++- .../src/subscribers/Inventory/Inventory.ts | 9 +- 6 files changed, 153 insertions(+), 5 deletions(-) create mode 100644 packages/server/src/services/Import/ImportALS.ts diff --git a/packages/server/src/interfaces/Import.ts b/packages/server/src/interfaces/Import.ts index 4f6d2e069..cf25a79fb 100644 --- a/packages/server/src/interfaces/Import.ts +++ b/packages/server/src/interfaces/Import.ts @@ -3,6 +3,6 @@ import { ImportFilePreviewPOJO } from "@/services/Import/interfaces"; export interface IImportFileCommitedEventPayload { tenantId: number; - importId: number; + importId: string; meta: ImportFilePreviewPOJO; } \ No newline at end of file diff --git a/packages/server/src/services/Banking/RegonizeTranasctions/events/TriggerRecognizedTransactions.ts b/packages/server/src/services/Banking/RegonizeTranasctions/events/TriggerRecognizedTransactions.ts index eba643481..7220be765 100644 --- a/packages/server/src/services/Banking/RegonizeTranasctions/events/TriggerRecognizedTransactions.ts +++ b/packages/server/src/services/Banking/RegonizeTranasctions/events/TriggerRecognizedTransactions.ts @@ -100,7 +100,6 @@ export class TriggerRecognizedTransactions { private async triggerRecognizeTransactionsOnImportCommitted({ tenantId, importId, - meta, }: IImportFileCommitedEventPayload) { const importFile = await Import.query().findOne({ importId }); const batch = importFile.paramsParsed.batch; diff --git a/packages/server/src/services/Import/ImportALS.ts b/packages/server/src/services/Import/ImportALS.ts new file mode 100644 index 000000000..bfa69eb9f --- /dev/null +++ b/packages/server/src/services/Import/ImportALS.ts @@ -0,0 +1,104 @@ +import { Service } from 'typedi'; +import { AsyncLocalStorage } from 'async_hooks'; + +@Service() +export class ImportAls { + private als: AsyncLocalStorage>; + + constructor() { + this.als = new AsyncLocalStorage(); + } + /** + * Runs a callback function within the context of a new AsyncLocalStorage store. + * @param callback The function to be executed within the AsyncLocalStorage context. + * @returns The result of the callback function. + */ + public run(callback: () => T): T { + return this.als.run(new Map(), callback); + } + + /** + * Runs a callback function in preview mode within the AsyncLocalStorage context. + * @param callback The function to be executed in preview mode. + * @returns The result of the callback function. + */ + public runPreview(callback: () => T): T { + return this.run(() => { + this.markAsImport(); + this.markAsImportPreview(); + return callback(); + }); + } + + /** + * Runs a callback function in commit mode within the AsyncLocalStorage context. + * @param {() => T} callback - The function to be executed in commit mode. + * @returns {T} The result of the callback function. + */ + public runCommit(callback: () => T): T { + return this.run(() => { + this.markAsImport(); + this.markAsImportCommit(); + return callback(); + }); + } + + /** + * Retrieves the current AsyncLocalStorage store. + * @returns The current store or undefined if not in a valid context. + */ + public getStore(): Map | undefined { + return this.als.getStore(); + } + + /** + * Marks the current context as an import operation. + * @param flag Boolean flag to set or unset the import status. Defaults to true. + */ + public markAsImport(flag: boolean = true): void { + const store = this.getStore(); + store?.set('isImport', flag); + } + + /** + * Marks the current context as an import commit operation. + * @param flag Boolean flag to set or unset the import commit status. Defaults to true. + */ + public markAsImportCommit(flag: boolean = true): void { + const store = this.getStore(); + store?.set('isImportCommit', flag); + } + + /** + * Marks the current context as an import preview operation. + * @param {boolean} flag - Boolean flag to set or unset the import preview status. Defaults to true. + */ + public markAsImportPreview(flag: boolean = true): void { + const store = this.getStore(); + store?.set('isImportPreview', flag); + } + + /** + * Checks if the current context is an import operation. + * @returns {boolean} True if the context is an import operation, false otherwise. + */ + public isImport(): boolean { + return !!this.getStore()?.get('isImport'); + } + + /** + * Checks if the current context is an import commit operation. + * @returns {boolean} True if the context is an import commit operation, false otherwise. + */ + public isImportCommit(): boolean { + return !!this.getStore()?.get('isImportCommit'); + } + + /** + * Checks if the current context is an import preview operation. + * @returns {boolean} True if the context is an import preview operation, false otherwise. + */ + public isImportPreview(): boolean { + return !!this.getStore()?.get('isImportPreview'); + } +} diff --git a/packages/server/src/services/Import/ImportFilePreview.ts b/packages/server/src/services/Import/ImportFilePreview.ts index b25c00a0e..e8b566bbe 100644 --- a/packages/server/src/services/Import/ImportFilePreview.ts +++ b/packages/server/src/services/Import/ImportFilePreview.ts @@ -2,6 +2,7 @@ import { Inject, Service } from 'typedi'; import HasTenancyService from '../Tenancy/TenancyService'; import { ImportFilePreviewPOJO } from './interfaces'; import { ImportFileProcess } from './ImportFileProcess'; +import { ImportAls } from './ImportALS'; @Service() export class ImportFilePreview { @@ -11,13 +12,31 @@ export class ImportFilePreview { @Inject() private importFile: ImportFileProcess; + @Inject() + private importAls: ImportAls; + + /** + * Preview the imported file results before commiting the transactions. + * @param {number} tenantId - + * @param {string} importId - + * @returns {Promise} + */ + public async preview( + tenantId: number, + importId: string + ): Promise { + return this.importAls.runPreview>(() => + this.previewAlsRun(tenantId, importId) + ); + } + /** * Preview the imported file results before commiting the transactions. * @param {number} tenantId * @param {number} importId * @returns {Promise} */ - public async preview( + public async previewAlsRun( tenantId: number, importId: string ): Promise { diff --git a/packages/server/src/services/Import/ImportFileProcessCommit.ts b/packages/server/src/services/Import/ImportFileProcessCommit.ts index 689b956c7..f9cb640fc 100644 --- a/packages/server/src/services/Import/ImportFileProcessCommit.ts +++ b/packages/server/src/services/Import/ImportFileProcessCommit.ts @@ -5,6 +5,7 @@ import { ImportFileProcess } from './ImportFileProcess'; import { EventPublisher } from '@/lib/EventPublisher/EventPublisher'; import events from '@/subscribers/events'; import { IImportFileCommitedEventPayload } from '@/interfaces/Import'; +import { ImportAls } from './ImportALS'; @Service() export class ImportFileProcessCommit { @@ -14,16 +15,34 @@ export class ImportFileProcessCommit { @Inject() private importFile: ImportFileProcess; + @Inject() + private importAls: ImportAls; + @Inject() private eventPublisher: EventPublisher; + /** + * Commits the imported file under ALS. + * @param {number} tenantId + * @param {string} importId + * @returns {Promise} + */ + public commit( + tenantId: number, + importId: string + ): Promise { + return this.importAls.runCommit>(() => + this.commitAlsRun(tenantId, importId) + ); + } + /** * Commits the imported file. * @param {number} tenantId * @param {number} importId * @returns {Promise} */ - public async commit( + public async commitAlsRun( tenantId: number, importId: string ): Promise { diff --git a/packages/server/src/subscribers/Inventory/Inventory.ts b/packages/server/src/subscribers/Inventory/Inventory.ts index 9061a6d81..f45a1a8f7 100644 --- a/packages/server/src/subscribers/Inventory/Inventory.ts +++ b/packages/server/src/subscribers/Inventory/Inventory.ts @@ -10,6 +10,7 @@ import { } from '@/interfaces'; import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks'; import { SaleInvoicesCost } from '@/services/Sales/Invoices/SalesInvoicesCost'; +import { ImportAls } from '@/services/Import/ImportALS'; @Service() export default class InventorySubscriber { @@ -25,6 +26,9 @@ export default class InventorySubscriber { @Inject('agenda') private agenda: any; + @Inject() + private importAls: ImportAls; + /** * Attaches events with handlers. */ @@ -88,7 +92,10 @@ export default class InventorySubscriber { inventoryTransactions, trx, }: IInventoryTransactionsCreatedPayload) => { - const inventoryItemsIds = map(inventoryTransactions, 'itemId'); + const inImportPreviewScope = this.importAls.isImportPreview(); + + // Avoid running the cost items job if the async process is in import preview. + if (inImportPreviewScope) return; await this.saleInvoicesCost.computeItemsCostByInventoryTransactions( tenantId, From 84a0b8f4954b146890727548a6e69b7cadad43ba Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Thu, 29 Aug 2024 10:05:38 +0200 Subject: [PATCH 3/3] fix: re-schedule the jobs have date from the current moment --- packages/server/src/services/Inventory/Inventory.ts | 8 +++++++- .../src/subscribers/SaleInvoices/WriteJournalEntries.ts | 1 - 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/server/src/services/Inventory/Inventory.ts b/packages/server/src/services/Inventory/Inventory.ts index 980ace79a..9a92685ca 100644 --- a/packages/server/src/services/Inventory/Inventory.ts +++ b/packages/server/src/services/Inventory/Inventory.ts @@ -157,7 +157,6 @@ export default class InventoryService { ...commonJobsQuery, 'data.startingDate': { $gte: startingDate }, }); - // If the depends jobs cleared. if (dependsJobs.length === 0) { await agenda.schedule( @@ -174,6 +173,13 @@ export default class InventoryService { events.inventory.onComputeItemCostJobScheduled, { startingDate, itemId, tenantId } as IInventoryItemCostScheduledPayload ); + } else { + // Re-schedule the jobs that have higher date from current moment. + await Promise.all( + dependsJobs.map((job) => + job.schedule(config.scheduleComputeItemCost).save() + ) + ); } } diff --git a/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts b/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts index 14a160253..4b1eab698 100644 --- a/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts +++ b/packages/server/src/subscribers/SaleInvoices/WriteJournalEntries.ts @@ -6,7 +6,6 @@ import { ISaleInvoiceEditedPayload, } from '@/interfaces'; import { SaleInvoiceGLEntries } from '@/services/Sales/Invoices/InvoiceGLEntries'; -import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks'; @Service() export default class SaleInvoiceWriteGLEntriesSubscriber {