fix: running compute item cost processor

This commit is contained in:
Ahmed Bouhuolia
2025-12-28 12:30:06 +02:00
parent 872fc661ce
commit 30d8fdb4c0
2 changed files with 16 additions and 9 deletions

View File

@@ -93,7 +93,7 @@ export class InventoryComputeCostService {
*/ */
async scheduleComputeItemCost(itemId: number, startingDate: Date | string) { async scheduleComputeItemCost(itemId: number, startingDate: Date | string) {
const debounceKey = `inventory-cost-compute-debounce:${itemId}`; const debounceKey = `inventory-cost-compute-debounce:${itemId}`;
const debounceTime = 1000 * 60; // 1 minute const debounceTime = 1000 * 10; // 10 seconds
// Generate a unique job ID or use a custom identifier // Generate a unique job ID or use a custom identifier
const jobId = `task-${Date.now()}-${Math.random().toString(36).substring(2)}`; const jobId = `task-${Date.now()}-${Math.random().toString(36).substring(2)}`;

View File

@@ -2,7 +2,8 @@ import { EventEmitter2 } from '@nestjs/event-emitter';
import { Processor, WorkerHost } from '@nestjs/bullmq'; import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Scope } from '@nestjs/common'; import { Scope } from '@nestjs/common';
import { Job } from 'bullmq'; import { Job } from 'bullmq';
import { ClsService } from 'nestjs-cls'; import { ClsService, UseCls } from 'nestjs-cls';
import * as moment from 'moment';
import { TenantJobPayload } from '@/interfaces/Tenant'; import { TenantJobPayload } from '@/interfaces/Tenant';
import { InventoryComputeCostService } from '../commands/InventoryComputeCost.service'; import { InventoryComputeCostService } from '../commands/InventoryComputeCost.service';
import { events } from '@/common/events/events'; import { events } from '@/common/events/events';
@@ -14,7 +15,7 @@ import { Process } from '@nestjs/bull';
interface ComputeItemCostJobPayload extends TenantJobPayload { interface ComputeItemCostJobPayload extends TenantJobPayload {
itemId: number; itemId: number;
startingDate: Date; startingDate: Date | string;
} }
@Processor({ @Processor({
name: ComputeItemCostQueue, name: ComputeItemCostQueue,
@@ -39,28 +40,34 @@ export class ComputeItemCostProcessor extends WorkerHost {
* @param {Job<ComputeItemCostJobPayload>} job - The job to process * @param {Job<ComputeItemCostJobPayload>} job - The job to process
*/ */
@Process(ComputeItemCostQueueJob) @Process(ComputeItemCostQueueJob)
@UseCls()
async process(job: Job<ComputeItemCostJobPayload>) { async process(job: Job<ComputeItemCostJobPayload>) {
const { itemId, startingDate, organizationId, userId } = job.data; const { itemId, startingDate, organizationId, userId } = job.data;
console.log(`Compute item cost for item ${itemId} started`); // Parse startingDate using moment to handle both Date and string formats
const startingDateObj = moment(startingDate).toDate();
console.log(`[info] Compute item cost for item ${itemId} started`, {
payload: job.data,
jobId: job.id
});
this.clsService.set('organizationId', organizationId); this.clsService.set('organizationId', organizationId);
this.clsService.set('userId', userId); this.clsService.set('userId', userId);
try { try {
await this.inventoryComputeCostService.computeItemCost( await this.inventoryComputeCostService.computeItemCost(
startingDate, startingDateObj,
itemId, itemId,
); );
// Emit job completed event // Emit job completed event
await this.eventEmitter.emitAsync( await this.eventEmitter.emitAsync(
events.inventory.onComputeItemCostJobCompleted, events.inventory.onComputeItemCostJobCompleted,
{ startingDate, itemId, organizationId, userId }, { startingDate: startingDateObj, itemId, organizationId, userId },
); );
console.log(`[info] Compute item cost for item ${itemId} completed successfully`);
console.log(`Compute item cost for item ${itemId} completed`);
} catch (error) { } catch (error) {
console.error('Error computing item cost:', error); console.error(`[error] Error computing item cost for item ${itemId}:`, error);
console.error('Error stack:', error instanceof Error ? error.stack : 'No stack trace');
throw error; throw error;
} }
} }