Merge pull request #635 from bigcapitalhq/fix-avoid-cost-job-import-preview

fix: Avoid running the cost job in import preview
This commit is contained in:
Ahmed Bouhuolia
2024-08-29 10:09:13 +02:00
committed by GitHub
10 changed files with 219 additions and 44 deletions

View File

@@ -37,6 +37,7 @@
"agendash": "^3.1.0", "agendash": "^3.1.0",
"app-root-path": "^3.0.0", "app-root-path": "^3.0.0",
"async": "^3.2.0", "async": "^3.2.0",
"async-mutex": "^0.5.0",
"axios": "^1.6.0", "axios": "^1.6.0",
"babel-loader": "^9.1.2", "babel-loader": "^9.1.2",
"bcryptjs": "^2.4.3", "bcryptjs": "^2.4.3",

View File

@@ -3,6 +3,6 @@ import { ImportFilePreviewPOJO } from "@/services/Import/interfaces";
export interface IImportFileCommitedEventPayload { export interface IImportFileCommitedEventPayload {
tenantId: number; tenantId: number;
importId: number; importId: string;
meta: ImportFilePreviewPOJO; meta: ImportFilePreviewPOJO;
} }

View File

@@ -100,7 +100,6 @@ export class TriggerRecognizedTransactions {
private async triggerRecognizeTransactionsOnImportCommitted({ private async triggerRecognizeTransactionsOnImportCommitted({
tenantId, tenantId,
importId, importId,
meta,
}: IImportFileCommitedEventPayload) { }: IImportFileCommitedEventPayload) {
const importFile = await Import.query().findOne({ importId }); const importFile = await Import.query().findOne({ importId });
const batch = importFile.paramsParsed.batch; const batch = importFile.paramsParsed.batch;

View File

@@ -0,0 +1,104 @@
import { Service } from 'typedi';
import { AsyncLocalStorage } from 'async_hooks';
@Service()
export class ImportAls {
private als: AsyncLocalStorage<Map<string, any>>;
constructor() {
this.als = new AsyncLocalStorage();
}
/**
* Runs a callback function within the context of a new AsyncLocalStorage store.
* @param callback The function to be executed within the AsyncLocalStorage context.
* @returns The result of the callback function.
*/
public run<T>(callback: () => T): T {
return this.als.run<T>(new Map(), callback);
}
/**
* Runs a callback function in preview mode within the AsyncLocalStorage context.
* @param callback The function to be executed in preview mode.
* @returns The result of the callback function.
*/
public runPreview<T>(callback: () => T): T {
return this.run(() => {
this.markAsImport();
this.markAsImportPreview();
return callback();
});
}
/**
* Runs a callback function in commit mode within the AsyncLocalStorage context.
* @param {() => T} callback - The function to be executed in commit mode.
* @returns {T} The result of the callback function.
*/
public runCommit<T>(callback: () => T): T {
return this.run(() => {
this.markAsImport();
this.markAsImportCommit();
return callback();
});
}
/**
* Retrieves the current AsyncLocalStorage store.
* @returns The current store or undefined if not in a valid context.
*/
public getStore(): Map<string, any> | undefined {
return this.als.getStore();
}
/**
* Marks the current context as an import operation.
* @param flag Boolean flag to set or unset the import status. Defaults to true.
*/
public markAsImport(flag: boolean = true): void {
const store = this.getStore();
store?.set('isImport', flag);
}
/**
* Marks the current context as an import commit operation.
* @param flag Boolean flag to set or unset the import commit status. Defaults to true.
*/
public markAsImportCommit(flag: boolean = true): void {
const store = this.getStore();
store?.set('isImportCommit', flag);
}
/**
* Marks the current context as an import preview operation.
* @param {boolean} flag - Boolean flag to set or unset the import preview status. Defaults to true.
*/
public markAsImportPreview(flag: boolean = true): void {
const store = this.getStore();
store?.set('isImportPreview', flag);
}
/**
* Checks if the current context is an import operation.
* @returns {boolean} True if the context is an import operation, false otherwise.
*/
public isImport(): boolean {
return !!this.getStore()?.get('isImport');
}
/**
* Checks if the current context is an import commit operation.
* @returns {boolean} True if the context is an import commit operation, false otherwise.
*/
public isImportCommit(): boolean {
return !!this.getStore()?.get('isImportCommit');
}
/**
* Checks if the current context is an import preview operation.
* @returns {boolean} True if the context is an import preview operation, false otherwise.
*/
public isImportPreview(): boolean {
return !!this.getStore()?.get('isImportPreview');
}
}

View File

@@ -2,6 +2,7 @@ import { Inject, Service } from 'typedi';
import HasTenancyService from '../Tenancy/TenancyService'; import HasTenancyService from '../Tenancy/TenancyService';
import { ImportFilePreviewPOJO } from './interfaces'; import { ImportFilePreviewPOJO } from './interfaces';
import { ImportFileProcess } from './ImportFileProcess'; import { ImportFileProcess } from './ImportFileProcess';
import { ImportAls } from './ImportALS';
@Service() @Service()
export class ImportFilePreview { export class ImportFilePreview {
@@ -11,13 +12,31 @@ export class ImportFilePreview {
@Inject() @Inject()
private importFile: ImportFileProcess; private importFile: ImportFileProcess;
@Inject()
private importAls: ImportAls;
/**
* Preview the imported file results before commiting the transactions.
* @param {number} tenantId -
* @param {string} importId -
* @returns {Promise<ImportFilePreviewPOJO>}
*/
public async preview(
tenantId: number,
importId: string
): Promise<ImportFilePreviewPOJO> {
return this.importAls.runPreview<Promise<ImportFilePreviewPOJO>>(() =>
this.previewAlsRun(tenantId, importId)
);
}
/** /**
* Preview the imported file results before commiting the transactions. * Preview the imported file results before commiting the transactions.
* @param {number} tenantId * @param {number} tenantId
* @param {number} importId * @param {number} importId
* @returns {Promise<ImportFilePreviewPOJO>} * @returns {Promise<ImportFilePreviewPOJO>}
*/ */
public async preview( public async previewAlsRun(
tenantId: number, tenantId: number,
importId: string importId: string
): Promise<ImportFilePreviewPOJO> { ): Promise<ImportFilePreviewPOJO> {

View File

@@ -5,6 +5,7 @@ import { ImportFileProcess } from './ImportFileProcess';
import { EventPublisher } from '@/lib/EventPublisher/EventPublisher'; import { EventPublisher } from '@/lib/EventPublisher/EventPublisher';
import events from '@/subscribers/events'; import events from '@/subscribers/events';
import { IImportFileCommitedEventPayload } from '@/interfaces/Import'; import { IImportFileCommitedEventPayload } from '@/interfaces/Import';
import { ImportAls } from './ImportALS';
@Service() @Service()
export class ImportFileProcessCommit { export class ImportFileProcessCommit {
@@ -14,16 +15,34 @@ export class ImportFileProcessCommit {
@Inject() @Inject()
private importFile: ImportFileProcess; private importFile: ImportFileProcess;
@Inject()
private importAls: ImportAls;
@Inject() @Inject()
private eventPublisher: EventPublisher; private eventPublisher: EventPublisher;
/**
* Commits the imported file under ALS.
* @param {number} tenantId
* @param {string} importId
* @returns {Promise<ImportFilePreviewPOJO>}
*/
public commit(
tenantId: number,
importId: string
): Promise<ImportFilePreviewPOJO> {
return this.importAls.runCommit<Promise<ImportFilePreviewPOJO>>(() =>
this.commitAlsRun(tenantId, importId)
);
}
/** /**
* Commits the imported file. * Commits the imported file.
* @param {number} tenantId * @param {number} tenantId
* @param {number} importId * @param {number} importId
* @returns {Promise<ImportFilePreviewPOJO>} * @returns {Promise<ImportFilePreviewPOJO>}
*/ */
public async commit( public async commitAlsRun(
tenantId: number, tenantId: number,
importId: string importId: string
): Promise<ImportFilePreviewPOJO> { ): Promise<ImportFilePreviewPOJO> {

View File

@@ -139,24 +139,25 @@ export default class InventoryService {
) { ) {
const agenda = Container.get('agenda'); const agenda = Container.get('agenda');
const commonJobsQuery = {
name: 'compute-item-cost',
lastRunAt: { $exists: false },
'data.tenantId': tenantId,
'data.itemId': itemId,
};
// Cancel any `compute-item-cost` in the queue has upper starting date // Cancel any `compute-item-cost` in the queue has upper starting date
// with the same given item. // with the same given item.
await agenda.cancel({ await agenda.cancel({
name: 'compute-item-cost', ...commonJobsQuery,
nextRunAt: { $ne: null }, 'data.startingDate': { $lte: startingDate },
'data.tenantId': tenantId,
'data.itemId': itemId,
'data.startingDate': { $gt: startingDate },
}); });
// Retrieve any `compute-item-cost` in the queue has lower starting date // Retrieve any `compute-item-cost` in the queue has lower starting date
// with the same given item. // with the same given item.
const dependsJobs = await agenda.jobs({ const dependsJobs = await agenda.jobs({
name: 'compute-item-cost', ...commonJobsQuery,
nextRunAt: { $ne: null }, 'data.startingDate': { $gte: startingDate },
'data.tenantId': tenantId,
'data.itemId': itemId,
'data.startingDate': { $lte: startingDate },
}); });
// If the depends jobs cleared.
if (dependsJobs.length === 0) { if (dependsJobs.length === 0) {
await agenda.schedule( await agenda.schedule(
config.scheduleComputeItemCost, config.scheduleComputeItemCost,
@@ -172,6 +173,13 @@ export default class InventoryService {
events.inventory.onComputeItemCostJobScheduled, events.inventory.onComputeItemCostJobScheduled,
{ startingDate, itemId, tenantId } as IInventoryItemCostScheduledPayload { startingDate, itemId, tenantId } as IInventoryItemCostScheduledPayload
); );
} else {
// Re-schedule the jobs that have higher date from current moment.
await Promise.all(
dependsJobs.map((job) =>
job.schedule(config.scheduleComputeItemCost).save()
)
);
} }
} }

View File

@@ -1,3 +1,4 @@
import { Mutex } from 'async-mutex';
import { Container, Service, Inject } from 'typedi'; import { Container, Service, Inject } from 'typedi';
import { chain } from 'lodash'; import { chain } from 'lodash';
import moment from 'moment'; import moment from 'moment';
@@ -34,17 +35,26 @@ export class SaleInvoicesCost {
inventoryItemsIds: number[], inventoryItemsIds: number[],
startingDate: Date startingDate: Date
): Promise<void> { ): Promise<void> {
const asyncOpers: Promise<[]>[] = []; const mutex = new Mutex();
inventoryItemsIds.forEach((inventoryItemId: number) => { const asyncOpers = inventoryItemsIds.map(
const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost( async (inventoryItemId: number) => {
// @todo refactor the lock acquire to be distrbuted using Redis
// and run the cost schedule job after running invoice transaction.
const release = await mutex.acquire();
try {
await this.inventoryService.scheduleComputeItemCost(
tenantId, tenantId,
inventoryItemId, inventoryItemId,
startingDate startingDate
); );
asyncOpers.push(oper); } finally {
}); release();
await Promise.all([...asyncOpers]); }
}
);
await Promise.all(asyncOpers);
} }
/** /**
@@ -86,17 +96,22 @@ export class SaleInvoicesCost {
tenantId: number, tenantId: number,
inventoryTransactions: IInventoryTransaction[] inventoryTransactions: IInventoryTransaction[]
) { ) {
const asyncOpers: Promise<[]>[] = []; const mutex = new Mutex();
const reducedTransactions = this.getMaxDateInventoryTransactions( const reducedTransactions = this.getMaxDateInventoryTransactions(
inventoryTransactions inventoryTransactions
); );
reducedTransactions.forEach((transaction) => { const asyncOpers = reducedTransactions.map(async (transaction) => {
const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost( const release = await mutex.acquire();
try {
await this.inventoryService.scheduleComputeItemCost(
tenantId, tenantId,
transaction.itemId, transaction.itemId,
transaction.date transaction.date
); );
asyncOpers.push(oper); } finally {
release();
}
}); });
await Promise.all([...asyncOpers]); await Promise.all([...asyncOpers]);
} }

View File

@@ -10,6 +10,7 @@ import {
} from '@/interfaces'; } from '@/interfaces';
import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks'; import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks';
import { SaleInvoicesCost } from '@/services/Sales/Invoices/SalesInvoicesCost'; import { SaleInvoicesCost } from '@/services/Sales/Invoices/SalesInvoicesCost';
import { ImportAls } from '@/services/Import/ImportALS';
@Service() @Service()
export default class InventorySubscriber { export default class InventorySubscriber {
@@ -25,6 +26,9 @@ export default class InventorySubscriber {
@Inject('agenda') @Inject('agenda')
private agenda: any; private agenda: any;
@Inject()
private importAls: ImportAls;
/** /**
* Attaches events with handlers. * Attaches events with handlers.
*/ */
@@ -86,20 +90,17 @@ export default class InventorySubscriber {
private handleScheduleItemsCostOnInventoryTransactionsCreated = async ({ private handleScheduleItemsCostOnInventoryTransactionsCreated = async ({
tenantId, tenantId,
inventoryTransactions, inventoryTransactions,
trx trx,
}: IInventoryTransactionsCreatedPayload) => { }: IInventoryTransactionsCreatedPayload) => {
const inventoryItemsIds = map(inventoryTransactions, 'itemId'); const inImportPreviewScope = this.importAls.isImportPreview();
// Avoid running the cost items job if the async process is in import preview.
if (inImportPreviewScope) return;
runAfterTransaction(trx, async () => {
try {
await this.saleInvoicesCost.computeItemsCostByInventoryTransactions( await this.saleInvoicesCost.computeItemsCostByInventoryTransactions(
tenantId, tenantId,
inventoryTransactions inventoryTransactions
); );
} catch (error) {
console.error(error);
}
});
}; };
/** /**

9
pnpm-lock.yaml generated
View File

@@ -86,6 +86,9 @@ importers:
async: async:
specifier: ^3.2.0 specifier: ^3.2.0
version: 3.2.5 version: 3.2.5
async-mutex:
specifier: ^0.5.0
version: 0.5.0
axios: axios:
specifier: ^1.6.0 specifier: ^1.6.0
version: 1.7.2 version: 1.7.2
@@ -8009,6 +8012,12 @@ packages:
resolution: {integrity: sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==} resolution: {integrity: sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==}
dev: false dev: false
/async-mutex@0.5.0:
resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==}
dependencies:
tslib: 2.6.2
dev: false
/async-settle@1.0.0: /async-settle@1.0.0:
resolution: {integrity: sha512-VPXfB4Vk49z1LHHodrEQ6Xf7W4gg1w0dAPROHngx7qgDjqmIQ+fXmwgGXTW/ITLai0YLSvWepJOP9EVpMnEAcw==} resolution: {integrity: sha512-VPXfB4Vk49z1LHHodrEQ6Xf7W4gg1w0dAPROHngx7qgDjqmIQ+fXmwgGXTW/ITLai0YLSvWepJOP9EVpMnEAcw==}
engines: {node: '>= 0.10'} engines: {node: '>= 0.10'}