feat: import resources from csv/xlsx

Ahmed Bouhuolia
2024-03-11 00:21:36 +02:00
parent 1fc6445123
commit 90b4f3ef6d
16 changed files with 467 additions and 184 deletions
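For orientation, a minimal usage sketch of the import flow this commit wires up, pieced together from the services in the hunks below; the import path, the container resolution, and the sheet header names are illustrative assumptions, and the upload step (which produces the import id) is omitted because its method name is elided in the ImportResourceApplication hunk.

import { Container } from 'typedi';
import { ImportResourceApplication } from './ImportResourceApplication';

async function importAccountsFromSheet(tenantId: number, importId: number) {
  const importApp = Container.get(ImportResourceApplication);

  // Map the uploaded sheet headers to the account resource attributes.
  await importApp.mapping(tenantId, importId, [
    { from: 'Account Name', to: 'name' },
    { from: 'Type', to: 'accountType' },
  ]);
  // Preview the mapped rows, then import them under one unit-of-work transaction.
  await importApp.preview(tenantId, importId);
  await importApp.process(tenantId, importId);
}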

View File

@@ -16,6 +16,7 @@ import { ActivateAccount } from './ActivateAccount';
import { GetAccounts } from './GetAccounts';
import { GetAccount } from './GetAccount';
import { GetAccountTransactions } from './GetAccountTransactions';
import { Knex } from 'knex';
@Service()
export class AccountsApplication {
@@ -48,9 +49,10 @@ export class AccountsApplication {
*/
public createAccount = (
tenantId: number,
accountDTO: IAccountCreateDTO
accountDTO: IAccountCreateDTO,
trx?: Knex.Transaction
): Promise<IAccount> => {
return this.createAccountService.createAccount(tenantId, accountDTO);
return this.createAccountService.createAccount(tenantId, accountDTO, trx);
};
/**

View File

@@ -97,13 +97,14 @@ export class CreateAccount {
/**
* Creates a new account on the storage.
* @param {number} tenantId
* @param {IAccountCreateDTO} accountDTO
* @param {number} tenantId
* @param {IAccountCreateDTO} accountDTO
* @returns {Promise<IAccount>}
*/
public createAccount = async (
tenantId: number,
accountDTO: IAccountCreateDTO
accountDTO: IAccountCreateDTO,
trx?: Knex.Transaction
): Promise<IAccount> => {
const { Account } = this.tenancy.models(tenantId);
@@ -119,27 +120,31 @@ export class CreateAccount {
tenantMeta.baseCurrency
);
// Creates a new account with associated transactions under the unit-of-work environment.
return this.uow.withTransaction(tenantId, async (trx: Knex.Transaction) => {
// Triggers `onAccountCreating` event.
await this.eventPublisher.emitAsync(events.accounts.onCreating, {
tenantId,
accountDTO,
trx,
} as IAccountEventCreatingPayload);
return this.uow.withTransaction(
tenantId,
async (trx: Knex.Transaction) => {
// Triggers `onAccountCreating` event.
await this.eventPublisher.emitAsync(events.accounts.onCreating, {
tenantId,
accountDTO,
trx,
} as IAccountEventCreatingPayload);
// Inserts account to the storage.
const account = await Account.query(trx).insertAndFetch({
...accountInputModel,
});
// Triggers `onAccountCreated` event.
await this.eventPublisher.emitAsync(events.accounts.onCreated, {
tenantId,
account,
accountId: account.id,
trx,
} as IAccountEventCreatedPayload);
// Inserts account to the storage.
const account = await Account.query(trx).insertAndFetch({
...accountInputModel,
});
// Triggers `onAccountCreated` event.
await this.eventPublisher.emitAsync(events.accounts.onCreated, {
tenantId,
account,
accountId: account.id,
trx,
} as IAccountEventCreatedPayload);
return account;
});
return account;
},
trx
);
};
}

View File

@@ -1,15 +1,15 @@
import { DATATYPES_LENGTH } from '@/data/DataTypes';
import { IsInt, IsOptional, IsString, Length, Min, Max } from 'class-validator';
import { IsInt, IsOptional, IsString, Length, Min, Max, IsNotEmpty } from 'class-validator';
export class AccountDTOSchema {
@IsString()
@Length(3, DATATYPES_LENGTH.STRING)
@IsNotEmpty()
name: string;
// @IsString()
// @IsInt()
@IsString()
@IsOptional()
// @Length(3, 6)
@Length(3, 6)
code?: string;
@IsOptional()
@@ -17,6 +17,7 @@ export class AccountDTOSchema {
@IsString()
@Length(3, DATATYPES_LENGTH.STRING)
@IsNotEmpty()
accountType: string;
@IsString()

View File

@@ -0,0 +1,49 @@
import { IAccountCreateDTO } from '@/interfaces';
import { AccountsApplication } from '../Accounts/AccountsApplication';
import { AccountDTOSchema } from '../Accounts/CreateAccountDTOSchema';
import { Inject, Service } from 'typedi';
import { Knex } from 'knex';
@Service()
export class AccountsImportable {
@Inject()
private accountsApp: AccountsApplication;
/**
* Imports the given account create DTO to the storage.
* @param {number} tenantId
* @param {IAccountCreateDTO} createAccountDTO
* @returns {Promise<IAccount>}
*/
public importable(
tenantId: number,
createAccountDTO: IAccountCreateDTO,
trx?: Knex.Transaction
) {
return this.accountsApp.createAccount(tenantId, createAccountDTO, trx);
}
/**
* Retrieves the validation schema of the account DTO.
* @returns {typeof AccountDTOSchema}
*/
public validation() {
return AccountDTOSchema;
}
/**
* Transforms the mapped sheet row before validation and import.
* @param {Record<string, any>} data
* @returns {Record<string, any>}
*/
public transform(data) {
return {
...data,
accountType: this.mapAccountType(data.accountType),
};
}
/**
* Maps the sheet account type label to the system account type key.
* Placeholder that currently maps every row to 'Cash'.
*/
mapAccountType(accountType: string) {
return 'Cash';
}
}

View File

@@ -0,0 +1,36 @@
import { Inject, Service } from 'typedi';
import HasTenancyService from '../Tenancy/TenancyService';
import { ImportMappingAttr } from './interfaces';
@Service()
export class ImportFileMapping {
@Inject()
private tenancy: HasTenancyService;
/**
* Maps the imported sheet columns to the resource columns.
* @param {number} tenantId
* @param {number} importId
* @param {ImportMappingAttr[]} maps
*/
public async mapping(
tenantId: number,
importId: number,
maps: ImportMappingAttr[]
) {
const { Import } = this.tenancy.models(tenantId);
const importFile = await Import.query()
.findOne('importId', importId)
.throwIfNotFound();
// @todo validate the resource columns.
// @todo validate the sheet columns.
const mappingStringified = JSON.stringify(maps);
await Import.query().findById(importFile.id).patch({
mapping: mappingStringified,
});
}
}
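For clarity, each ImportMappingAttr pairs a sheet header (`from`) with a resource attribute (`to`), and mapping() persists the array as a JSON string on the import record; a small sketch with illustrative header names:

// Mapping payload passed to ImportFileMapping.mapping(); header names are examples.
const maps: ImportMappingAttr[] = [
  { from: 'Account Name', to: 'name' },
  { from: 'Type', to: 'accountType' },
];
// Stored on the Import row's `mapping` column:
// '[{"from":"Account Name","to":"name"},{"from":"Type","to":"accountType"}]'
const mappingStringified = JSON.stringify(maps);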

View File

@@ -0,0 +1,11 @@
import { Service } from 'typedi';
@Service()
export class ImportFilePreview {
/**
* Previews the mapped import results before the import is processed.
* @param {number} tenantId
* @param {number} importId
*/
public preview(tenantId: number, importId: number) {}
}

View File

@@ -0,0 +1,160 @@
import { Inject, Service } from 'typedi';
import * as R from 'ramda';
import XLSX from 'xlsx';
import { first, isUndefined } from 'lodash';
import bluebird from 'bluebird';
import HasTenancyService from '../Tenancy/TenancyService';
import { trimObject } from './_utils';
import { ImportMappingAttr, ImportValidationError } from './interfaces';
import { AccountsImportable } from './AccountsImportable';
import { plainToInstance } from 'class-transformer';
import { validate } from 'class-validator';
import UnitOfWork from '../UnitOfWork';
import { Knex } from 'knex';
import { promises as fs } from 'fs';
@Service()
export class ImportFileProcess {
@Inject()
private tenancy: HasTenancyService;
@Inject()
private importable: AccountsImportable;
@Inject()
private uow: UnitOfWork;
/**
* Reads the import file.
* @param {string} filename
* @returns {Promise<Buffer>}
*/
public readImportFile(filename: string) {
return fs.readFile(`public/imports/${filename}`);
}
/**
* Parses the first worksheet of the given workbook buffer into JSON rows.
* @param {Buffer} buffer - The imported xlsx/csv file buffer.
* @returns {Record<string, any>[]} - The parsed sheet rows.
*/
public parseXlsxSheet(buffer: Buffer) {
const workbook = XLSX.read(buffer, { type: 'buffer' });
const firstSheetName = workbook.SheetNames[0];
const worksheet = workbook.Sheets[firstSheetName];
return XLSX.utils.sheet_to_json(worksheet);
}
/**
* Extracts the trimmed column keys of each row in the imported sheet.
* @param json - The JSON data representing the imported sheet.
* @returns {string[][]} - The trimmed column keys of each row.
*/
public sanitizeSheetData(json) {
return R.compose(R.map(Object.keys), R.map(trimObject))(json);
}
/**
* Maps the columns of the imported data based on the provided mapping attributes.
* @param {Record<string, any>[]} body - The array of data objects to map.
* @param {ImportMappingAttr[]} map - The mapping attributes.
* @returns {Record<string, any>[]} - The mapped data objects.
*/
private mapSheetColumns(
body: Record<string, any>[],
map: ImportMappingAttr[]
): Record<string, any>[] {
return body.map((item) => {
const newItem = {};
map
.filter((mapping) => !isUndefined(item[mapping.from]))
.forEach((mapping) => {
newItem[mapping.to] = item[mapping.from];
});
return newItem;
});
}
/**
* Validates the given mapped DTOs and returns errors with their index.
* @param {Record<string, any>[]} mappedDTOs
* @returns {Promise<ImportValidationError[][]>}
*/
private async validateData(
mappedDTOs: Record<string, any>[]
): Promise<ImportValidationError[][]> {
const validateData = async (data, index: number) => {
const account = { ...data };
const accountClass = plainToInstance(
this.importable.validation(),
account
);
const errors = await validate(accountClass);
if (errors?.length > 0) {
return errors.map((error) => ({
index,
property: error.property,
constraints: error.constraints,
}));
}
return false;
};
const errors = await bluebird.map(mappedDTOs, validateData, {
concurrency: 20,
});
return errors.filter((error) => error !== false);
}
/**
* Transforms the mapped DTOs before validation and import.
* @param {Record<string, any>[]} DTOs
* @returns {Record<string, any>[]}
*/
private transformDTOs(DTOs) {
return DTOs.map((DTO) => this.importable.transform(DTO));
}
/**
* Processes the import file: parses, maps, validates and imports its rows to the storage.
* @param {number} tenantId
* @param {number} importId
*/
public async process(
tenantId: number,
importId: number,
settings = { skipErrors: true }
) {
const { Import } = this.tenancy.models(tenantId);
const importFile = await Import.query()
.findOne('importId', importId)
.throwIfNotFound();
const buffer = await this.readImportFile(importFile.filename);
const jsonData = this.parseXlsxSheet(buffer);
const data = this.sanitizeSheetData(jsonData);
const header = first(data);
const body = jsonData;
const mappedDTOs = this.mapSheetColumns(body, importFile.mappingParsed);
const transformedDTOs = this.transformDTOs(mappedDTOs);
// Validate the mapped DTOs.
const errors = await this.validateData(transformedDTOs);
return this.uow.withTransaction(tenantId, async (trx: Knex.Transaction) => {
await bluebird.map(
transformedDTOs,
(transformedDTO) =>
this.importable.importable(tenantId, transformedDTO, trx),
{ concurrency: 10 }
);
});
}
}
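To make the column-mapping step concrete, a worked example of what the private mapSheetColumns helper produces for one parsed row (the values are illustrative):

// A parsed sheet row (keys are the sheet headers) and the stored mapping.
const row = { 'Account Name': 'Petty Cash', 'Type': 'CASH', 'Balance': 100 };
const maps: ImportMappingAttr[] = [
  { from: 'Account Name', to: 'name' },
  { from: 'Type', to: 'accountType' },
];
// mapSheetColumns([row], maps) keeps only the mapped headers, renamed to the
// resource attributes; unmapped columns such as 'Balance' are dropped:
// => [{ name: 'Petty Cash', accountType: 'CASH' }]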

View File

@@ -26,6 +26,7 @@ export class ImportFileUploadService {
filename: string
) {
const { Import } = this.tenancy.models(tenantId);
const buffer = await fs.readFile(filePath);
const workbook = XLSX.read(buffer, { type: 'buffer' });
@@ -33,6 +34,7 @@ export class ImportFileUploadService {
const worksheet = workbook.Sheets[firstSheetName];
const jsonData = XLSX.utils.sheet_to_json(worksheet);
// @todo validate the resource.
const _resource = upperFirst(snakeCase(resource));
const exportFile = await Import.query().insert({
@@ -42,8 +44,9 @@ export class ImportFileUploadService {
});
const columns = this.getColumns(jsonData);
// @todo return the resource importable columns.
return {
...exportFile,
export: exportFile,
columns,
};
}

View File

@@ -1,11 +1,24 @@
import { Inject, Service } from 'typedi';
import { ImportFileUploadService } from './ImportFileUpload';
import { ImportFileMapping } from './ImportFileMapping';
import { ImportMappingAttr } from './interfaces';
import { ImportFileProcess } from './ImportFileProcess';
import { ImportFilePreview } from './ImportFilePreview';
@Service()
export class ImportResourceApplication {
@Inject()
private importFileService: ImportFileUploadService;
@Inject()
private importMappingService: ImportFileMapping;
@Inject()
private importProcessService: ImportFileProcess;
@Inject()
private importFilePreviewService: ImportFilePreview;
/**
* Reads the imported file and stores the import file meta under a unique id.
* @param {number} tenantId -
@@ -26,4 +39,38 @@ export class ImportResourceApplication {
filename
);
}
/**
* Maps the imported sheet columns to the resource columns.
* @param {number} tenantId
* @param {number} importId
* @param {ImportMappingAttr[]} maps
*/
public async mapping(
tenantId: number,
importId: number,
maps: ImportMappingAttr[]
) {
return this.importMappingService.mapping(tenantId, importId, maps);
}
/**
* Previews the mapped results before the import is processed.
* @param {number} tenantId
* @param {number} importId
*/
public async preview(tenantId: number, importId: number) {
return this.importFilePreviewService.preview(tenantId, importId);
}
/**
* Processes the import file and imports the mapped rows to the storage.
* @param {number} tenantId
* @param {number} importId
*/
public async process(tenantId: number, importId: number) {
return this.importProcessService.process(tenantId, importId);
}
}

View File

@@ -1,120 +0,0 @@
import XLSX, { readFile } from 'xlsx';
import * as R from 'ramda';
import async from 'async';
import { camelCase, snakeCase, upperFirst } from 'lodash';
import HasTenancyService from '../Tenancy/TenancyService';
import { Inject, Service } from 'typedi';
import { first } from 'lodash';
import { ServiceError } from '@/exceptions';
import { validate } from 'class-validator';
import { AccountDTOSchema } from '../Accounts/CreateAccountDTOSchema';
import { AccountsApplication } from '../Accounts/AccountsApplication';
import { plainToClass, plainToInstance } from 'class-transformer';
const fs = require('fs').promises;
const ERRORS = {
IMPORT_ID_NOT_FOUND: 'IMPORT_ID_NOT_FOUND',
};
@Service()
export class ImportResourceInjectable {
@Inject()
private tenancy: HasTenancyService;
@Inject()
private accountsApplication: AccountsApplication;
public async mapping(
tenantId: number,
importId: number,
maps: { from: string; to: string }[]
) {
const { Import } = this.tenancy.models(tenantId);
const importFile = await Import.query().find('filename', importId);
if (!importFile) {
throw new ServiceError(ERRORS.IMPORT_ID_NOT_FOUND);
}
//
await Import.query()
.findById(importFile.id)
.update({
maps: JSON.stringify(maps),
});
// - Validate the to columns.
// - Store the mapping in the import table.
// -
}
public async preview(tenantId: number, importId: string) {}
/**
*
* @param tenantId
* @param importId
*/
public async importFile(tenantId: number, importId: string) {
const { Import } = this.tenancy.models(tenantId);
const importFile = await Import.query().where('importId', importId).first();
if (!importFile) {
throw new ServiceError(ERRORS.IMPORT_ID_NOT_FOUND);
}
const buffer = await fs.readFile(`public/imports/${importFile.filename}`);
const workbook = XLSX.read(buffer, { type: 'buffer' });
const firstSheetName = workbook.SheetNames[0];
const worksheet = workbook.Sheets[firstSheetName];
const jsonData = XLSX.utils.sheet_to_json(worksheet);
const data = R.compose(R.map(Object.keys), R.map(trimObject))(jsonData);
const header = first(data);
const body = jsonData;
const mapping = JSON.parse(importFile.mapping) || [];
const newData = [];
const findToAttr = (from: string) => {
const found = mapping.find((item) => {
return item.from === from;
});
return found?.to;
};
body.forEach((row) => {
const obj = {};
header.forEach((key, index) => {
const toIndex = camelCase(findToAttr(key));
obj[toIndex] = row[key];
});
newData.push(obj);
});
const saveJob = async (data) => {
const account = {};
Object.keys(data).map((key) => {
account[key] = data[key];
});
const accountClass = plainToInstance(AccountDTOSchema, account);
const errors = await validate(accountClass);
if (errors.length > 0) {
console.log('validation failed. errors: ', errors);
} else {
return this.accountsApplication.createAccount(tenantId, account);
}
};
const saveDataQueue = async.queue(saveJob, 10);
newData.forEach((data) => {
saveDataQueue.push(data);
});
await saveDataQueue.drain();
}
}

View File

@@ -0,0 +1,7 @@
abstract class Importable {
}
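The abstract class above is still an empty stub in this commit; one possible shape for the contract, inferred from the methods AccountsImportable already exposes (an assumption, not part of the commit):

import { Knex } from 'knex';

export abstract class Importable {
  /** Inserts one transformed row into the storage, inside the given transaction. */
  public abstract importable(
    tenantId: number,
    createDTO: Record<string, any>,
    trx?: Knex.Transaction
  ): Promise<unknown>;

  /** Returns the class-validator schema used to validate each mapped row. */
  public abstract validation(): new (...args: any[]) => object;

  /** Transforms a mapped sheet row before validation and import. */
  public transform(data: Record<string, any>): Record<string, any> {
    return data;
  }
}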

View File

@@ -0,0 +1,10 @@
export interface ImportMappingAttr {
from: string;
to: string;
}
export interface ImportValidationError {
index: number;
property: string;
constraints: Record<string, string>;
}

View File

@@ -1,5 +1,6 @@
import { Service, Inject } from 'typedi';
import TenancyService from '@/services/Tenancy/TenancyService';
import { Transaction } from 'objection';
/**
* Enumeration that represents transaction isolation levels for use with the {@link Transactional} annotation
@@ -38,18 +39,22 @@ export default class UnitOfWork {
public withTransaction = async (
tenantId: number,
work,
trx?: Transaction,
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED
) => {
const knex = this.tenancy.knex(tenantId);
const trx = await knex.transaction({ isolationLevel });
let _trx = trx;
const isSelfInitiated = !_trx;
// Initializes a new transaction only when no outer transaction is given.
if (!_trx) {
_trx = await knex.transaction({ isolationLevel });
}
try {
const result = await work(trx);
trx.commit();
const result = await work(_trx);
// Commits only a self-initiated transaction; a given outer transaction is
// committed by its owner.
if (isSelfInitiated) {
await _trx.commit();
}
return result;
} catch (error) {
trx.rollback();
if (isSelfInitiated) {
await _trx.rollback();
}
throw error;
}
};
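With the optional `trx` parameter, nested service calls can share a single transaction; a minimal sketch of the intended use, where the surrounding importer service and its injected `uow` and `accountsApp` dependencies are assumed for illustration:

// The importer opens one transaction and threads it into each account insert;
// the nested withTransaction call inside CreateAccount then reuses the given
// `trx` instead of opening and committing its own.
public importAccounts = (tenantId: number, accountDTOs: IAccountCreateDTO[]) => {
  return this.uow.withTransaction(tenantId, async (trx: Knex.Transaction) => {
    for (const accountDTO of accountDTOs) {
      await this.accountsApp.createAccount(tenantId, accountDTO, trx);
    }
  });
};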