Debounce scheduling calculating items cost

This commit is contained in:
Ahmed Bouhuolia
2024-08-28 21:25:47 +02:00
parent 161d60393a
commit fc6ebfea5c
6 changed files with 61 additions and 39 deletions

View File

@@ -37,6 +37,7 @@
"agendash": "^3.1.0",
"app-root-path": "^3.0.0",
"async": "^3.2.0",
"async-mutex": "^0.5.0",
"axios": "^1.6.0",
"babel-loader": "^9.1.2",
"bcryptjs": "^2.4.3",

View File

@@ -139,24 +139,26 @@ export default class InventoryService {
) {
const agenda = Container.get('agenda');
const commonJobsQuery = {
name: 'compute-item-cost',
lastRunAt: { $exists: false },
'data.tenantId': tenantId,
'data.itemId': itemId,
};
// Cancel any `compute-item-cost` in the queue has upper starting date
// with the same given item.
await agenda.cancel({
name: 'compute-item-cost',
nextRunAt: { $ne: null },
'data.tenantId': tenantId,
'data.itemId': itemId,
'data.startingDate': { $gt: startingDate },
...commonJobsQuery,
'data.startingDate': { $lte: startingDate },
});
// Retrieve any `compute-item-cost` in the queue has lower starting date
// with the same given item.
const dependsJobs = await agenda.jobs({
name: 'compute-item-cost',
nextRunAt: { $ne: null },
'data.tenantId': tenantId,
'data.itemId': itemId,
'data.startingDate': { $lte: startingDate },
...commonJobsQuery,
'data.startingDate': { $gte: startingDate },
});
// If the depends jobs cleared.
if (dependsJobs.length === 0) {
await agenda.schedule(
config.scheduleComputeItemCost,

View File

@@ -1,3 +1,4 @@
import { Mutex } from 'async-mutex';
import { Container, Service, Inject } from 'typedi';
import { chain } from 'lodash';
import moment from 'moment';
@@ -34,17 +35,26 @@ export class SaleInvoicesCost {
inventoryItemsIds: number[],
startingDate: Date
): Promise<void> {
const asyncOpers: Promise<[]>[] = [];
const mutex = new Mutex();
inventoryItemsIds.forEach((inventoryItemId: number) => {
const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost(
tenantId,
inventoryItemId,
startingDate
);
asyncOpers.push(oper);
});
await Promise.all([...asyncOpers]);
const asyncOpers = inventoryItemsIds.map(
async (inventoryItemId: number) => {
// @todo refactor the lock acquire to be distrbuted using Redis
// and run the cost schedule job after running invoice transaction.
const release = await mutex.acquire();
try {
await this.inventoryService.scheduleComputeItemCost(
tenantId,
inventoryItemId,
startingDate
);
} finally {
release();
}
}
);
await Promise.all(asyncOpers);
}
/**
@@ -86,17 +96,22 @@ export class SaleInvoicesCost {
tenantId: number,
inventoryTransactions: IInventoryTransaction[]
) {
const asyncOpers: Promise<[]>[] = [];
const mutex = new Mutex();
const reducedTransactions = this.getMaxDateInventoryTransactions(
inventoryTransactions
);
reducedTransactions.forEach((transaction) => {
const oper: Promise<[]> = this.inventoryService.scheduleComputeItemCost(
tenantId,
transaction.itemId,
transaction.date
);
asyncOpers.push(oper);
const asyncOpers = reducedTransactions.map(async (transaction) => {
const release = await mutex.acquire();
try {
await this.inventoryService.scheduleComputeItemCost(
tenantId,
transaction.itemId,
transaction.date
);
} finally {
release();
}
});
await Promise.all([...asyncOpers]);
}

View File

@@ -86,20 +86,14 @@ export default class InventorySubscriber {
private handleScheduleItemsCostOnInventoryTransactionsCreated = async ({
tenantId,
inventoryTransactions,
trx
trx,
}: IInventoryTransactionsCreatedPayload) => {
const inventoryItemsIds = map(inventoryTransactions, 'itemId');
runAfterTransaction(trx, async () => {
try {
await this.saleInvoicesCost.computeItemsCostByInventoryTransactions(
tenantId,
inventoryTransactions
);
} catch (error) {
console.error(error);
}
});
await this.saleInvoicesCost.computeItemsCostByInventoryTransactions(
tenantId,
inventoryTransactions
);
};
/**

View File

@@ -6,6 +6,7 @@ import {
ISaleInvoiceEditedPayload,
} from '@/interfaces';
import { SaleInvoiceGLEntries } from '@/services/Sales/Invoices/InvoiceGLEntries';
import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks';
@Service()
export default class SaleInvoiceWriteGLEntriesSubscriber {

9
pnpm-lock.yaml generated
View File

@@ -86,6 +86,9 @@ importers:
async:
specifier: ^3.2.0
version: 3.2.5
async-mutex:
specifier: ^0.5.0
version: 0.5.0
axios:
specifier: ^1.6.0
version: 1.7.2
@@ -8009,6 +8012,12 @@ packages:
resolution: {integrity: sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==}
dev: false
/async-mutex@0.5.0:
resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==}
dependencies:
tslib: 2.6.2
dev: false
/async-settle@1.0.0:
resolution: {integrity: sha512-VPXfB4Vk49z1LHHodrEQ6Xf7W4gg1w0dAPROHngx7qgDjqmIQ+fXmwgGXTW/ITLai0YLSvWepJOP9EVpMnEAcw==}
engines: {node: '>= 0.10'}