refactor: inventory cost process

This commit is contained in:
Ahmed Bouhuolia
2025-03-14 03:51:45 +02:00
parent 197d173db9
commit 08de50e2b1
14 changed files with 346 additions and 96 deletions

View File

@@ -1,4 +1,4 @@
import { Module } from '@nestjs/common';
import { forwardRef, Module } from '@nestjs/common';
import { InventoryCostGLStorage } from './commands/InventoryCostGLStorage.service';
import { RegisterTenancyModel } from '../Tenancy/TenancyModels/Tenancy.module';
import { InventoryCostLotTracker } from './models/InventoryCostLotTracker';
@@ -20,6 +20,9 @@ import { BullModule } from '@nestjs/bullmq';
import { InventoryAverageCostMethodService } from './commands/InventoryAverageCostMethod.service';
import { InventoryItemCostService } from './commands/InventoryCosts.service';
import { InventoryItemOpeningAvgCostService } from './commands/InventoryItemOpeningAvgCost.service';
import { InventoryCostSubscriber } from './subscribers/InventoryCost.subscriber';
import { SaleInvoicesModule } from '../SaleInvoices/SaleInvoices.module';
import { ImportModule } from '../Import/Import.module';
const models = [
RegisterTenancyModel(InventoryCostLotTracker),
@@ -34,6 +37,8 @@ const models = [
BullModule.registerQueue({
name: WriteInventoryTransactionsGLEntriesQueue,
}),
forwardRef(() => SaleInvoicesModule),
ImportModule,
],
providers: [
InventoryCostGLBeforeWriteSubscriber,
@@ -48,7 +53,13 @@ const models = [
InventoryAverageCostMethodService,
InventoryItemCostService,
InventoryItemOpeningAvgCostService,
InventoryCostSubscriber,
],
exports: [
...models,
InventoryTransactionsService,
InventoryItemCostService,
InventoryComputeCostService,
],
exports: [...models, InventoryTransactionsService, InventoryItemCostService],
})
export class InventoryCostModule {}

View File

@@ -1,30 +1,51 @@
import { pick } from 'lodash';
import { Queue } from 'bullmq';
import { ClsService } from 'nestjs-cls';
import Redis from 'ioredis';
import { Inject, Injectable } from '@nestjs/common';
import { Knex } from 'knex';
import { UnitOfWork } from '../../Tenancy/TenancyDB/UnitOfWork.service';
import { Item } from '../../Items/models/Item';
import { SETTINGS_PROVIDER } from '../../Settings/Settings.types';
import { SettingsStore } from '../../Settings/SettingsStore';
import { InventoryTransaction } from '../models/InventoryTransaction';
import { IItemEntryTransactionType } from '../../TransactionItemEntry/ItemEntry.types';
import { ModelObject } from 'objection';
import { ItemEntry } from '../../TransactionItemEntry/models/ItemEntry';
import { TInventoryTransactionDirection } from '../types/InventoryCost.types';
import {
ComputeItemCostQueue,
ComputeItemCostQueueJob,
} from '../types/InventoryCost.types';
import { InventoryAverageCostMethodService } from './InventoryAverageCostMethod.service';
import { TenantModelProxy } from '@/modules/System/models/TenantBaseModel';
import { InjectQueue } from '@nestjs/bullmq';
import { RedisService } from '@liaoliaots/nestjs-redis';
@Injectable()
export class InventoryComputeCostService {
private readonly redisClient: Redis;
/**
* @param {UnitOfWork} uow - Unit of work.
* @param {InventoryAverageCostMethodService} inventoryAverageCostMethod - Inventory average cost method.
* @param {RedisService} redisService - Redis service.
* @param {ClsService} clsService - Cls service.
* @param {Queue} computeItemCostProcessor - Compute item cost processor.
* @param {TenantModelProxy<typeof Item>} itemModel - Item model.
* @param {() => SettingsStore} settingsStore - Settings store.
*/
constructor(
private readonly uow: UnitOfWork,
private readonly inventoryAverageCostMethod: InventoryAverageCostMethodService,
private readonly clsService: ClsService,
private readonly redisService: RedisService,
@InjectQueue(ComputeItemCostQueue)
private readonly computeItemCostProcessor: Queue,
@Inject(Item.name)
private readonly itemModel: TenantModelProxy<typeof Item>,
@Inject(SETTINGS_PROVIDER)
private readonly settingsStore: () => SettingsStore,
) {}
) {
this.redisClient = this.redisService.getOrThrow();
}
/**
* Compute item cost.
@@ -67,63 +88,45 @@ export class InventoryComputeCostService {
/**
* Schedule item cost compute job.
* @param {number} tenantId
* @param {number} itemId
* @param {Date} startingDate
*/
async scheduleComputeItemCost(
itemId: number,
startingDate: Date | string,
) {
// const agenda = Container.get('agenda');
// const commonJobsQuery = {
// name: 'compute-item-cost',
// lastRunAt: { $exists: false },
// 'data.tenantId': tenantId,
// 'data.itemId': itemId,
// };
// // Cancel any `compute-item-cost` in the queue has upper starting date
// // with the same given item.
// await agenda.cancel({
// ...commonJobsQuery,
// 'data.startingDate': { $lte: startingDate },
// });
// // Retrieve any `compute-item-cost` in the queue has lower starting date
// // with the same given item.
// const dependsJobs = await agenda.jobs({
// ...commonJobsQuery,
// 'data.startingDate': { $gte: startingDate },
// });
// // If the depends jobs cleared.
// if (dependsJobs.length === 0) {
// await agenda.schedule(
// this.config.get('inventory.scheduleComputeItemCost'),
// 'compute-item-cost',
// {
// startingDate,
// itemId,
// tenantId,
// },
// );
// // Triggers `onComputeItemCostJobScheduled` event.
// await this.eventEmitter.emitAsync(
// events.inventory.onComputeItemCostJobScheduled,
// {
// startingDate,
// itemId,
// tenantId,
// } as IInventoryItemCostScheduledPayload,
// );
// } else {
// // Re-schedule the jobs that have higher date from current moment.
// await Promise.all(
// dependsJobs.map((job) =>
// job
// .schedule(this.config.get('inventory.scheduleComputeItemCost'))
// .save(),
// ),
// );
// }
async scheduleComputeItemCost(itemId: number, startingDate: Date | string) {
const debounceKey = `inventory-cost-compute-debounce:${itemId}`;
const debounceTime = 1000 * 60; // 1 minute
// Generate a unique job ID or use a custom identifier
const jobId = `task-${Date.now()}-${Math.random().toString(36).substring(2)}`;
// Check if there's an existing debounced job
const existingJobId = await this.redisClient.get(debounceKey);
if (existingJobId) {
// Attempt to remove or mark the previous job as skippable
const existingJob =
await this.computeItemCostProcessor.getJob(existingJobId);
const state = await existingJob?.getState();
if (existingJob && ['waiting', 'delayed'].includes(state)) {
await existingJob.remove(); // Remove the previous job if it's still waiting
}
}
const organizationId = this.clsService.get('organizationId');
const userId = this.clsService.get('userId');
// Add the new job with a delay (debounce period)
const job = await this.computeItemCostProcessor.add(
ComputeItemCostQueueJob,
{ itemId, startingDate, jobId, organizationId, userId },
{
jobId, // Custom job ID
delay: debounceTime, // Delay execution by 1 minute
},
);
// Store the latest job ID in Redis with an expiration
await this.redisClient.set(debounceKey, jobId, 'PX', debounceTime);
return { jobId, message: 'Task added with debounce' };
}
/**

View File

@@ -92,7 +92,7 @@ export class InventoryItemsQuantitySyncService {
const changeQuantityOper = this.itemModel()
.query(trx)
.where({ id: itemQuantity.itemId, type: 'inventory' })
.modify('quantityOnHand', itemQuantity.balanceChange);
.modify('updateQuantityOnHand', itemQuantity.balanceChange);
opers.push(changeQuantityOper);
});

View File

@@ -6,14 +6,18 @@ import { ClsService } from 'nestjs-cls';
import { TenantJobPayload } from '@/interfaces/Tenant';
import { InventoryComputeCostService } from '../commands/InventoryComputeCost.service';
import { events } from '@/common/events/events';
import { ComputeItemCostQueueJob } from '../types/InventoryCost.types';
import {
ComputeItemCostQueue,
ComputeItemCostQueueJob,
} from '../types/InventoryCost.types';
import { Process } from '@nestjs/bull';
interface ComputeItemCostJobPayload extends TenantJobPayload {
itemId: number;
startingDate: Date;
}
@Processor({
name: ComputeItemCostQueueJob,
name: ComputeItemCostQueue,
scope: Scope.REQUEST,
})
export class ComputeItemCostProcessor extends WorkerHost {
@@ -34,9 +38,12 @@ export class ComputeItemCostProcessor extends WorkerHost {
* Process the compute item cost job.
* @param {Job<ComputeItemCostJobPayload>} job - The job to process
*/
@Process(ComputeItemCostQueueJob)
async process(job: Job<ComputeItemCostJobPayload>) {
const { itemId, startingDate, organizationId, userId } = job.data;
console.log(`Compute item cost for item ${itemId} started`);
this.clsService.set('organizationId', organizationId);
this.clsService.set('userId', userId);
@@ -50,6 +57,8 @@ export class ComputeItemCostProcessor extends WorkerHost {
events.inventory.onComputeItemCostJobCompleted,
{ startingDate, itemId, organizationId, userId },
);
console.log(`Compute item cost for item ${itemId} completed`);
} catch (error) {
console.error('Error computing item cost:', error);
throw error;

View File

@@ -14,7 +14,7 @@ import { Injectable } from '@nestjs/common';
import { InventoryComputeCostService } from '../commands/InventoryComputeCost.service';
@Injectable()
export default class InventorySubscriber {
export class InventoryCostSubscriber {
constructor(
private readonly saleInvoicesCost: SaleInvoicesCost,
private readonly itemsQuantitySync: InventoryItemsQuantitySyncService,