feat(workspaces): enhance workspace creation and job management

- Introduced a transaction for creating tenants, linking users, and saving metadata in the `CreateWorkspaceService`.
- Moved tenant marking as building to the `markAsBuilding` method for better separation of concerns.
- Added `WorkspaceBuildJobResponseDto` for improved API response structure.
- Updated `GetWorkspacesService` to utilize a transformer for cleaner data mapping.
- Added unit tests for `CreateWorkspaceService` to ensure robust functionality and error handling.
This commit is contained in:
Ahmed Bouhuolia
2026-04-01 23:32:28 +02:00
parent e968cf646c
commit 1a73d499ee
9 changed files with 525 additions and 55 deletions

View File

@@ -103,8 +103,6 @@ export class BuildOrganizationService {
buildDto: transformedBuildDTO,
} as OrganizationBuildQueueJobPayload,
);
// Marks the tenant as currently building.
await this.tenantRepository.markAsBuilding(jobMeta.id).findById(tenant.id);
return {
delay: jobMeta.delay,
@@ -113,6 +111,15 @@ export class BuildOrganizationService {
};
}
/**
* Marks the tenant as building.
* @param {string} buildJobId - The build job id.
*/
public async markAsBuilding(buildJobId: string) {
const tenant = await this.tenancyContext.getTenant();
await this.tenantRepository.markAsBuilding(buildJobId).findById(tenant.id);
}
/**
* Unlocks tenant build run job.
*/

View File

@@ -24,16 +24,19 @@ export class OrganizationBuildProcessor extends WorkerHost {
async process(job: Job<OrganizationBuildQueueJobPayload>) {
console.log('Processing organization build job:', job.id);
this.clsService.set('organizationId', job.data.organizationId);
this.clsService.set('userId', job.data.userId);
this.clsService.set('organizationId', job.data.organizationId);
this.clsService.set('userId', job.data.userId);
try {
await this.organizationBuildService.build(job.data.buildDto);
} catch (e) {
// Unlock build status of the tenant.
await this.organizationBuildService.revertBuildRunJob();
console.error('Error processing organization build job:', e);
throw e; // Re-throw to mark job as failed
}
// Mark as building INSIDE the job - ensures it only happens when job actually runs
await this.organizationBuildService.markAsBuilding(job.id!);
try {
await this.organizationBuildService.build(job.data.buildDto);
} catch (e) {
// Unlock build status of the tenant.
await this.organizationBuildService.revertBuildRunJob();
console.error('Error processing organization build job:', e);
throw e; // Re-throw to mark job as failed
}
}
}

View File

@@ -1,5 +1,6 @@
import { Inject, Injectable } from '@nestjs/common';
import { Knex } from 'knex';
import { Transaction } from 'objection';
import * as uniqid from 'uniqid';
import * as moment from 'moment';
import { TenantRepository as TenantBaseRepository } from '@/common/repository/TenantRepository';
@@ -31,10 +32,13 @@ export class TenantRepository extends TenantBaseRepository {
/**
* Creates a new tenant with random organization id.
* @param {string} uniqId - Unique id.
* @param {Knex.Transaction} trx - Knex transaction.
*/
createWithUniqueOrgId(uniqId?: string) {
createWithUniqueOrgId(uniqId?: string, trx?: Knex.Transaction) {
const organizationId = uniqid() || uniqId;
return this.model.query().insert({ organizationId });
const query = this.model.query(trx);
return query.insert({ organizationId });
}
/**
@@ -104,15 +108,16 @@ export class TenantRepository extends TenantBaseRepository {
* Saves the metadata of the given tenant.
* @param {number} tenantId - The tenant id.
* @param {Record<string, any>} metadata - The metadata to save.
* @param {Knex.Transaction} trx - Knex transaction.
*/
async saveMetadata(tenantId: number, metadata: Record<string, any>) {
async saveMetadata(tenantId: number, metadata: Record<string, any>, trx?: Knex.Transaction) {
const foundMetadata = await this.tenantMetadataModel
.query()
.query(trx)
.findOne({ tenantId });
const updateOrInsert = foundMetadata ? 'patch' : 'insert';
return this.tenantMetadataModel
.query()
.query(trx)
[updateOrInsert]({
tenantId,
...metadata,

View File

@@ -7,7 +7,7 @@ import {
Param,
Post,
} from '@nestjs/common';
import { ApiOperation, ApiTags } from '@nestjs/swagger';
import { ApiExtraModels, ApiOperation, ApiResponse, ApiTags, getSchemaPath } from '@nestjs/swagger';
import { ClsService } from 'nestjs-cls';
import { TenantAgnosticRoute } from '@/modules/Tenancy/TenancyGlobal.guard';
import { IgnoreUserVerifiedRoute } from '@/modules/Auth/guards/EnsureUserVerified.guard';
@@ -23,9 +23,11 @@ import {
CreateWorkspaceResponseDto,
WorkspaceDto,
} from './dtos/WorkspaceResponse.dto';
import { WorkspaceBuildJobResponseDto } from './dtos/WorkspaceBuildJobResponse.dto';
@ApiTags('Workspaces')
@Controller('workspaces')
@ApiExtraModels(WorkspaceDto, CreateWorkspaceResponseDto, WorkspaceBuildJobResponseDto)
export class WorkspacesController {
constructor(
private readonly createWorkspaceService: CreateWorkspaceService,
@@ -43,6 +45,14 @@ export class WorkspacesController {
@TenantAgnosticRoute()
@IgnoreUserVerifiedRoute()
@ApiOperation({ summary: 'List workspaces the authenticated user belongs to' })
@ApiResponse({
status: 200,
description: 'Returns the list of workspaces',
schema: {
type: 'array',
items: { $ref: getSchemaPath(WorkspaceDto) },
},
})
async listWorkspaces(): Promise<WorkspaceDto[]> {
const userId = this.cls.get<number>('userId');
return this.getWorkspacesService.getWorkspaces(userId);
@@ -58,6 +68,13 @@ export class WorkspacesController {
@IgnoreUserVerifiedRoute()
@HttpCode(200)
@ApiOperation({ summary: 'Create a new workspace' })
@ApiResponse({
status: 200,
description: 'Returns the created workspace details',
schema: {
$ref: getSchemaPath(CreateWorkspaceResponseDto),
},
})
async createWorkspace(
@Body() dto: CreateWorkspaceDto,
): Promise<CreateWorkspaceResponseDto> {
@@ -75,6 +92,10 @@ export class WorkspacesController {
@IgnoreTenantModelsInitialize()
@HttpCode(200)
@ApiOperation({ summary: 'Delete a workspace (owner only)' })
@ApiResponse({
status: 200,
description: 'Workspace deleted successfully',
})
async deleteWorkspace(
@Param('organizationId') organizationId: string,
): Promise<void> {
@@ -89,7 +110,14 @@ export class WorkspacesController {
@Get('build/:buildJobId')
@TenantAgnosticRoute()
@ApiOperation({ summary: 'Get workspace build job status' })
async buildJobStatus(@Param('buildJobId') buildJobId: string) {
@ApiResponse({
status: 200,
description: 'Returns the workspace build job details',
schema: {
$ref: getSchemaPath(WorkspaceBuildJobResponseDto),
},
})
async buildJobStatus(@Param('buildJobId') buildJobId: string): Promise<WorkspaceBuildJobResponseDto> {
return this.getWorkspaceBuildJobService.getJobDetails(buildJobId);
}
}

View File

@@ -0,0 +1,334 @@
import { Test, TestingModule } from '@nestjs/testing';
import { getQueueToken } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { CreateWorkspaceService } from './CreateWorkspace.service';
import { UserTenant } from '@/modules/System/models/UserTenant.model';
import { TenantRepository } from '@/modules/System/repositories/Tenant.repository';
import { OrganizationBuildQueue, OrganizationBuildQueueJob } from '@/modules/Organization/Organization.types';
import { SystemKnexConnection } from '@/modules/System/SystemDB/SystemDB.constants';
// Mock the Organization.utils module
jest.mock('@/modules/Organization/Organization.utils', () => ({
transformBuildDto: jest.fn((dto) => ({
...dto,
dateFormat: dto.dateFormat || 'DD MMM YYYY',
})),
}));
describe('CreateWorkspaceService', () => {
let service: CreateWorkspaceService;
let tenantRepository: jest.Mocked<any>;
let userTenantModel: jest.Mocked<any>;
let organizationBuildQueue: jest.Mocked<Queue>;
let mockKnexTransaction: jest.Mock;
const mockTenant = {
id: 1,
organizationId: 'org_abc123',
initializedAt: null,
seededAt: null,
builtAt: null,
buildJobId: null,
};
const mockJob = {
id: 'job_123',
name: 'organization-build',
data: {},
opts: {},
};
const createMockQuery = () => ({
insert: jest.fn().mockResolvedValue({ id: 1, userId: 1, tenantId: 1, role: 'owner' }),
findById: jest.fn().mockResolvedValue(mockTenant),
update: jest.fn().mockReturnThis(),
where: jest.fn().mockReturnThis(),
});
beforeEach(async () => {
const mockQuery = createMockQuery();
const mockUserTenantModel = {
query: jest.fn().mockReturnValue(mockQuery),
};
const mockTenantRepository = {
createWithUniqueOrgId: jest.fn().mockResolvedValue(mockTenant),
saveMetadata: jest.fn().mockResolvedValue(undefined),
markAsBuilding: jest.fn().mockReturnThis(),
findById: jest.fn().mockResolvedValue(mockTenant),
};
const mockQueue = {
add: jest.fn().mockResolvedValue(mockJob),
};
// Mock knex transaction
mockKnexTransaction = jest.fn(async (callback) => {
const trx = {};
return callback(trx);
});
const mockSystemKnex = {
transaction: mockKnexTransaction,
};
const module: TestingModule = await Test.createTestingModule({
providers: [
CreateWorkspaceService,
{
provide: UserTenant.name,
useValue: mockUserTenantModel,
},
{
provide: TenantRepository,
useValue: mockTenantRepository,
},
{
provide: getQueueToken(OrganizationBuildQueue),
useValue: mockQueue,
},
{
provide: SystemKnexConnection,
useValue: mockSystemKnex,
},
],
}).compile();
service = module.get<CreateWorkspaceService>(CreateWorkspaceService);
tenantRepository = module.get(TenantRepository);
userTenantModel = module.get(UserTenant.name);
organizationBuildQueue = module.get(getQueueToken(OrganizationBuildQueue));
});
afterEach(() => {
jest.clearAllMocks();
});
describe('createWorkspace', () => {
const userId = 1;
const dto = {
name: 'Test Organization',
baseCurrency: 'USD',
location: 'US',
timezone: 'America/New_York',
fiscalYear: 'January',
language: 'en-US',
industry: 'Technology',
};
it('should create a workspace successfully', async () => {
const result = await service.createWorkspace(userId, dto);
expect(result).toEqual({
organizationId: mockTenant.organizationId,
jobId: mockJob.id,
});
});
it('should wrap database operations in a transaction', async () => {
await service.createWorkspace(userId, dto);
expect(mockKnexTransaction).toHaveBeenCalledTimes(1);
});
it('should create a new tenant with unique organization id within transaction', async () => {
await service.createWorkspace(userId, dto);
expect(tenantRepository.createWithUniqueOrgId).toHaveBeenCalledTimes(1);
expect(tenantRepository.createWithUniqueOrgId).toHaveBeenCalledWith(undefined, expect.anything());
});
it('should link the user as owner of the workspace within transaction', async () => {
await service.createWorkspace(userId, dto);
expect(userTenantModel.query).toHaveBeenCalled();
// First call should be with the transaction object
expect(userTenantModel.query.mock.calls[0][0]).toBeDefined();
});
it('should save organization metadata within transaction', async () => {
await service.createWorkspace(userId, dto);
expect(tenantRepository.saveMetadata).toHaveBeenCalledWith(
mockTenant.id,
expect.objectContaining({
name: dto.name,
baseCurrency: dto.baseCurrency,
location: dto.location,
timezone: dto.timezone,
fiscalYear: dto.fiscalYear,
language: dto.language,
industry: dto.industry,
dateFormat: 'DD MMM YYYY',
}),
expect.anything(), // transaction object
);
});
it('should enqueue the organization build job outside the transaction', async () => {
const callOrder: string[] = [];
mockKnexTransaction.mockImplementationOnce(async (callback) => {
const trx = {};
const result = await callback(trx);
callOrder.push('transactionCommitted');
return result;
});
(organizationBuildQueue.add as jest.Mock).mockImplementationOnce(async () => {
callOrder.push('enqueueJob');
return mockJob;
});
await service.createWorkspace(userId, dto);
expect(callOrder).toEqual(['transactionCommitted', 'enqueueJob']);
});
it('should return organization id and job id', async () => {
const result = await service.createWorkspace(userId, dto);
expect(result).toHaveProperty('organizationId');
expect(result).toHaveProperty('jobId');
expect(result.organizationId).toBe(mockTenant.organizationId);
expect(result.jobId).toBe(mockJob.id);
});
it('should handle tenant creation failure and rollback transaction', async () => {
tenantRepository.createWithUniqueOrgId.mockRejectedValueOnce(
new Error('Database error'),
);
await expect(service.createWorkspace(userId, dto)).rejects.toThrow(
'Database error',
);
});
it('should handle user tenant linking failure and rollback transaction', async () => {
const mockQuery = createMockQuery();
mockQuery.insert.mockRejectedValueOnce(new Error('Linking error'));
userTenantModel.query.mockReturnValueOnce(mockQuery);
await expect(service.createWorkspace(userId, dto)).rejects.toThrow(
'Linking error',
);
});
it('should handle metadata save failure and rollback transaction', async () => {
tenantRepository.saveMetadata.mockRejectedValueOnce(
new Error('Metadata save error'),
);
await expect(service.createWorkspace(userId, dto)).rejects.toThrow(
'Metadata save error',
);
});
it('should not enqueue job if transaction fails', async () => {
tenantRepository.createWithUniqueOrgId.mockRejectedValueOnce(
new Error('Database error'),
);
await expect(service.createWorkspace(userId, dto)).rejects.toThrow(
'Database error',
);
expect(organizationBuildQueue.add).not.toHaveBeenCalled();
});
it('should handle queue add failure after successful transaction', async () => {
organizationBuildQueue.add.mockRejectedValueOnce(
new Error('Queue error'),
);
// Transaction should succeed but then queue add should fail
await expect(service.createWorkspace(userId, dto)).rejects.toThrow(
'Queue error',
);
// Transaction should have completed
expect(tenantRepository.createWithUniqueOrgId).toHaveBeenCalled();
});
it('should work with minimal DTO (only required fields)', async () => {
const minimalDto = {
name: 'Minimal Org',
baseCurrency: 'EUR',
location: 'DE',
timezone: 'Europe/Berlin',
fiscalYear: 'January',
language: 'en-US',
};
const result = await service.createWorkspace(userId, minimalDto);
expect(result.organizationId).toBe(mockTenant.organizationId);
expect(tenantRepository.saveMetadata).toHaveBeenCalledWith(
mockTenant.id,
expect.objectContaining({
name: minimalDto.name,
baseCurrency: minimalDto.baseCurrency,
location: minimalDto.location,
timezone: minimalDto.timezone,
fiscalYear: minimalDto.fiscalYear,
language: minimalDto.language,
dateFormat: 'DD MMM YYYY',
}),
expect.anything(),
);
});
it('should preserve custom date format if provided', async () => {
const dtoWithDateFormat = {
...dto,
dateFormat: 'MM/DD/YYYY',
};
await service.createWorkspace(userId, dtoWithDateFormat);
expect(tenantRepository.saveMetadata).toHaveBeenCalledWith(
mockTenant.id,
expect.objectContaining({
dateFormat: 'MM/DD/YYYY',
}),
expect.anything(),
);
});
it('should call all operations in correct sequence', async () => {
const callOrder: string[] = [];
(tenantRepository.createWithUniqueOrgId as jest.Mock).mockImplementationOnce(async () => {
callOrder.push('createTenant');
return mockTenant;
});
(userTenantModel.query as jest.Mock).mockImplementationOnce(() => {
callOrder.push('linkUser');
return {
insert: jest.fn().mockResolvedValue({ id: 1 }),
};
});
(tenantRepository.saveMetadata as jest.Mock).mockImplementationOnce(async () => {
callOrder.push('saveMetadata');
return 1;
});
mockKnexTransaction.mockImplementationOnce(async (callback) => {
const trx = {};
await callback(trx);
callOrder.push('transactionCommitted');
return mockTenant;
});
(organizationBuildQueue.add as jest.Mock).mockImplementationOnce(async () => {
callOrder.push('enqueueJob');
return mockJob;
});
await service.createWorkspace(userId, dto);
expect(callOrder).toEqual(['createTenant', 'linkUser', 'saveMetadata', 'transactionCommitted', 'enqueueJob']);
});
});
});

View File

@@ -1,6 +1,7 @@
import { Queue } from 'bullmq';
import { Inject, Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Knex } from 'knex';
import { UserTenant } from '@/modules/System/models/UserTenant.model';
import { TenantRepository } from '@/modules/System/repositories/Tenant.repository';
import {
@@ -9,6 +10,7 @@ import {
OrganizationBuildQueueJobPayload,
} from '@/modules/Organization/Organization.types';
import { transformBuildDto } from '@/modules/Organization/Organization.utils';
import { SystemKnexConnection } from '@/modules/System/SystemDB/SystemDB.constants';
import { CreateWorkspaceDto } from '../dtos/CreateWorkspace.dto';
import { CreateWorkspaceResponseDto } from '../dtos/WorkspaceResponse.dto';
@@ -22,34 +24,44 @@ export class CreateWorkspaceService {
@InjectQueue(OrganizationBuildQueue)
private readonly organizationBuildQueue: Queue,
@Inject(SystemKnexConnection)
private readonly systemKnex: Knex,
) {}
/**
* Creates a new workspace (organization) for the authenticated user.
* - Creates a new tenant row with a unique organizationId.
* - Links the user as owner via user_tenants.
* - Saves organization metadata.
* - Enqueues the tenant database build job.
* - Creates a new tenant row with a unique organizationId (in transaction).
* - Links the user as owner via user_tenants (in transaction).
* - Saves organization metadata (in transaction).
* - Enqueues the tenant database build job (outside transaction).
*/
async createWorkspace(
userId: number,
dto: CreateWorkspaceDto,
): Promise<CreateWorkspaceResponseDto> {
// Create the new tenant row.
const tenant = await this.tenantRepository.createWithUniqueOrgId();
const transformedDto = transformBuildDto(dto);
// Link the authenticated user as the owner of this new workspace.
await this.userTenantModel.query().insert({
userId,
tenantId: tenant.id,
role: 'owner',
// Wrap tenant creation, user linking, and metadata save in a transaction.
// The job enqueue happens outside the transaction since it's async.
const tenant = await this.systemKnex.transaction(async (trx) => {
// Create the new tenant row.
const tenant = await this.tenantRepository.createWithUniqueOrgId(undefined, trx);
// Link the authenticated user as the owner of this new workspace.
await this.userTenantModel.query(trx).insert({
userId,
tenantId: tenant.id,
role: 'owner',
});
// Persist the organization metadata.
await this.tenantRepository.saveMetadata(tenant.id, transformedDto, trx);
return tenant;
});
// Transform and persist the organization metadata.
const transformedDto = transformBuildDto(dto);
await this.tenantRepository.saveMetadata(tenant.id, transformedDto);
// Enqueue the build job using the same queue and processor as the existing flow.
// Enqueue the build job outside the transaction.
// This ensures the DB changes are committed before the job starts processing.
const jobMeta = await this.organizationBuildQueue.add(
OrganizationBuildQueueJob,
{
@@ -59,9 +71,6 @@ export class CreateWorkspaceService {
} as OrganizationBuildQueueJobPayload,
);
// Mark the tenant as currently building.
await this.tenantRepository.markAsBuilding(jobMeta.id).findById(tenant.id);
return {
organizationId: tenant.organizationId,
jobId: jobMeta.id,

View File

@@ -0,0 +1,24 @@
import { ApiProperty } from '@nestjs/swagger';
export class WorkspaceBuildJobResponseDto {
@ApiProperty({ example: '123' })
id: string;
@ApiProperty({ example: 'active' })
state: string;
@ApiProperty({ example: 50 })
progress: number | Record<string, unknown>;
@ApiProperty({ example: false })
isCompleted: boolean;
@ApiProperty({ example: true })
isRunning: boolean;
@ApiProperty({ example: false })
isWaiting: boolean;
@ApiProperty({ example: false })
isFailed: boolean;
}

View File

@@ -1,6 +1,7 @@
import { Inject, Injectable } from '@nestjs/common';
import { UserTenant } from '@/modules/System/models/UserTenant.model';
import { WorkspaceDto } from '../dtos/WorkspaceResponse.dto';
import { WorkspaceTransformer } from '../transformers/WorkspaceTransformer';
@Injectable()
export class GetWorkspacesService {
@@ -19,22 +20,7 @@ export class GetWorkspacesService {
.where('userId', userId)
.withGraphFetched('tenant.metadata');
return memberships.map((m) => ({
organizationId: m.tenant.organizationId,
isReady: m.tenant.isReady,
isBuildRunning: m.tenant.isBuildRunning,
buildJobId: m.tenant.buildJobId ?? undefined,
role: m.role,
metadata: m.tenant.metadata
? {
name: m.tenant.metadata.name,
baseCurrency: m.tenant.metadata.baseCurrency,
industry: m.tenant.metadata.industry,
location: m.tenant.metadata.location,
timezone: m.tenant.metadata.timezone,
language: m.tenant.metadata.language,
}
: undefined,
}));
const transformer = new WorkspaceTransformer();
return memberships.map((membership) => transformer.transform(membership));
}
}

View File

@@ -0,0 +1,74 @@
import { Transformer } from '@/modules/Transformer/Transformer';
import { UserTenant } from '@/modules/System/models/UserTenant.model';
import { WorkspaceDto } from '../dtos/WorkspaceResponse.dto';
/**
* Transforms UserTenant (workspace membership) to WorkspaceDto.
*/
export class WorkspaceTransformer extends Transformer<UserTenant> {
/**
* Include these attributes in the transformed output.
*/
public includeAttributes = (): string[] => {
return ['organizationId', 'isReady', 'isBuildRunning', 'buildJobId', 'role', 'metadata'];
};
/**
* Extract organizationId from tenant relation.
*/
protected organizationId = (membership: UserTenant): string => {
return membership.tenant?.organizationId;
};
/**
* Extract isReady from tenant relation.
*/
protected isReady = (membership: UserTenant): boolean => {
return membership.tenant?.isReady ?? false;
};
/**
* Extract isBuildRunning from tenant relation.
*/
protected isBuildRunning = (membership: UserTenant): boolean => {
return membership.tenant?.isBuildRunning ?? false;
};
/**
* Extract buildJobId from tenant relation.
*/
protected buildJobId = (membership: UserTenant): string | undefined => {
return membership.tenant?.buildJobId ?? undefined;
};
/**
* Transform metadata from tenant relation.
*/
protected metadata = (membership: UserTenant) => {
const metadata = membership.tenant?.metadata;
if (!metadata) return undefined;
return {
name: metadata.name,
baseCurrency: metadata.baseCurrency,
industry: metadata.industry,
location: metadata.location,
timezone: metadata.timezone,
language: metadata.language,
};
};
/**
* Transform single membership to WorkspaceDto.
*/
transform = (membership: UserTenant): WorkspaceDto => {
return {
organizationId: this.organizationId(membership),
isReady: this.isReady(membership),
isBuildRunning: this.isBuildRunning(membership),
buildJobId: this.buildJobId(membership),
role: membership.role,
metadata: this.metadata(membership),
};
};
}