refactor(nestjs): plaid banking syncing

This commit is contained in:
Ahmed Bouhuolia
2025-06-01 18:38:29 +02:00
parent 66a2261e50
commit deadd5ac80
19 changed files with 267 additions and 132 deletions

View File

@@ -1,3 +1,4 @@
import { BullModule } from '@nestjs/bullmq';
import { Module } from '@nestjs/common';
import { PlaidUpdateTransactionsOnItemCreatedSubscriber } from './subscribers/PlaidUpdateTransactionsOnItemCreatedSubscriber';
import { PlaidUpdateTransactions } from './command/PlaidUpdateTransactions';
@@ -16,6 +17,10 @@ import { TenancyContext } from '../Tenancy/TenancyContext.service';
import { InjectSystemModel } from '../System/SystemModels/SystemModels.module';
import { SystemPlaidItem } from './models/SystemPlaidItem';
import { BankingPlaidController } from './BankingPlaid.controller';
import { BankingPlaidWebhooksController } from './BankingPlaidWebhooks.controller';
import { SetupPlaidItemTenantService } from './command/SetupPlaidItemTenant.service';
import { UpdateBankingPlaidTransitionsQueueJob } from './types/BankingPlaid.types';
import { PlaidFetchTransactionsProcessor } from './jobs/PlaidFetchTransactionsJob';
const models = [RegisterTenancyModel(PlaidItem)];
@@ -25,6 +30,7 @@ const models = [RegisterTenancyModel(PlaidItem)];
AccountsModule,
BankingCategorizeModule,
BankingTransactionsModule,
BullModule.registerQueue({ name: UpdateBankingPlaidTransitionsQueueJob }),
...models,
],
providers: [
@@ -35,10 +41,12 @@ const models = [RegisterTenancyModel(PlaidItem)];
PlaidWebooks,
PlaidLinkTokenService,
PlaidApplication,
PlaidUpdateTransactionsOnItemCreatedSubscriber,
SetupPlaidItemTenantService,
TenancyContext,
PlaidFetchTransactionsProcessor,
PlaidUpdateTransactionsOnItemCreatedSubscriber,
],
exports: [...models],
controllers: [BankingPlaidController]
controllers: [BankingPlaidController, BankingPlaidWebhooksController],
})
export class BankingPlaidModule {}

View File

@@ -0,0 +1,30 @@
import { Body, Controller, Post } from '@nestjs/common';
import { PlaidWebhookDto } from './dtos/PlaidItem.dto';
import { ApiOperation } from '@nestjs/swagger';
import { PlaidApplication } from './PlaidApplication';
import { PublicRoute } from '../Auth/guards/jwt.guard';
import { SetupPlaidItemTenantService } from './command/SetupPlaidItemTenant.service';
@Controller('banking/plaid')
@PublicRoute()
export class BankingPlaidWebhooksController {
constructor(
private readonly plaidApplication: PlaidApplication,
private readonly setupPlaidItemTenantService: SetupPlaidItemTenantService,
) {}
@Post('webhooks')
@ApiOperation({ summary: 'Listen to Plaid webhooks' })
webhooks(@Body() { itemId, webhookType, webhookCode }: PlaidWebhookDto) {
return this.setupPlaidItemTenantService.setupPlaidTenant(
itemId,
() => {
return this.plaidApplication.webhooks(
itemId,
webhookType,
webhookCode,
);
},
);
}
}

View File

@@ -1,9 +1,12 @@
import { ClsService } from 'nestjs-cls';
import { Inject, Injectable } from '@nestjs/common';
import { PlaidLinkTokenService } from './queries/GetPlaidLinkToken.service';
import { PlaidItemService } from './command/PlaidItem';
import { PlaidWebooks } from './command/PlaidWebhooks';
import { Injectable } from '@nestjs/common';
import { PlaidItemDTO } from './types/BankingPlaid.types';
import { PlaidItemDto } from './dtos/PlaidItem.dto';
import { SystemPlaidItem } from './models/SystemPlaidItem';
import { TenantModel } from '../System/models/TenantModel';
import { SystemUser } from '../System/models/SystemUser';
@Injectable()
export class PlaidApplication {
@@ -11,6 +14,16 @@ export class PlaidApplication {
private readonly getLinkTokenService: PlaidLinkTokenService,
private readonly plaidItemService: PlaidItemService,
private readonly plaidWebhooks: PlaidWebooks,
private readonly clsService: ClsService,
@Inject(SystemPlaidItem.name)
private readonly systemPlaidItemModel: typeof SystemPlaidItem,
@Inject(TenantModel.name)
private readonly tenantModel: typeof TenantModel,
@Inject(SystemUser.name)
private readonly systemUserModel: typeof SystemUser,
) {}
/**
@@ -42,10 +55,33 @@ export class PlaidApplication {
webhookType: string,
webhookCode: string,
): Promise<void> {
return this.plaidWebhooks.webhooks(
plaidItemId,
webhookType,
webhookCode,
);
return this.plaidWebhooks.webhooks(plaidItemId, webhookType, webhookCode);
}
public async setupPlaidTenant(plaidItemId: string, callback: () => void) {
const plaidItem = await this.systemPlaidItemModel
.query()
.findOne({ plaidItemId });
if (!plaidItem) {
throw new Error('Plaid item not found');
}
const tenant = await this.tenantModel
.query()
.findOne({ id: plaidItem.tenantId })
.throwIfNotFound();
const user = await this.systemUserModel
.query()
.findOne({
tenantId: tenant.id,
})
.modify('active')
.throwIfNotFound();
this.clsService.set('organizationId', tenant.organizationId);
this.clsService.set('userId', user.id);
return callback();
}
}

View File

@@ -1,32 +0,0 @@
// import { Request, Response, NextFunction } from 'express';
// import { SystemPlaidItem, Tenant } from '@/system/models';
// import tenantDependencyInjection from '@/api/middleware/TenantDependencyInjection';
// export const PlaidWebhookTenantBootMiddleware = async (
// req: Request,
// res: Response,
// next: NextFunction
// ) => {
// const { item_id: plaidItemId } = req.body;
// const plaidItem = await SystemPlaidItem.query().findOne({ plaidItemId });
// const notFoundOrganization = () => {
// return res.boom.unauthorized('Organization identication not found.', {
// errors: [{ type: 'ORGANIZATION.ID.NOT.FOUND', code: 100 }],
// });
// };
// // In case the given organization not found.
// if (!plaidItem) {
// return notFoundOrganization();
// }
// const tenant = await Tenant.query()
// .findById(plaidItem.tenantId)
// .withGraphFetched('metadata');
// // When the given organization id not found on the system storage.
// if (!tenant) {
// return notFoundOrganization();
// }
// tenantDependencyInjection(req, tenant);
// next();
// };

View File

@@ -17,9 +17,7 @@ export class PlaidItemService {
private readonly tenancyContext: TenancyContext,
@Inject(SystemPlaidItem.name)
private readonly systemPlaidItemModel: TenantModelProxy<
typeof SystemPlaidItem
>,
private readonly systemPlaidItemModel: typeof SystemPlaidItem,
@Inject(PlaidItem.name)
private readonly plaidItemModel: TenantModelProxy<typeof PlaidItem>,
@@ -55,7 +53,7 @@ export class PlaidItemService {
plaidInstitutionId: institutionId,
});
// Stores the Plaid item id on system scope.
await this.systemPlaidItemModel().query().insert({ tenantId, plaidItemId });
await this.systemPlaidItemModel.query().insert({ tenantId, plaidItemId });
// Triggers `onPlaidItemCreated` event.
await this.eventEmitter.emitAsync(events.plaid.onItemCreated, {

View File

@@ -1,5 +1,6 @@
import * as R from 'ramda';
import bluebird from 'bluebird';
import * as bluebird from 'bluebird';
import * as uniqid from 'uniqid';
import { entries, groupBy } from 'lodash';
import {
AccountBase as PlaidAccountBase,
@@ -12,7 +13,6 @@ import {
transformPlaidTrxsToCashflowCreate,
} from '../utils';
import { Knex } from 'knex';
import uniqid from 'uniqid';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { RemovePendingUncategorizedTransaction } from '../../BankingTransactions/commands/RemovePendingUncategorizedTransaction.service';
import { CreateAccountService } from '../../Accounts/CreateAccount.service';

View File

@@ -103,7 +103,6 @@ export class PlaidUpdateTransactions {
/**
* Fetches transactions from the `Plaid API` for a given item.
* @param {number} tenantId - Tenant ID.
* @param {string} plaidItemId - The Plaid ID for the item.
* @returns {Promise<PlaidFetchedTransactionsUpdates>}
*/

View File

@@ -1,3 +1,4 @@
import { TenantModelProxy } from '@/modules/System/models/TenantBaseModel';
import { PlaidItem } from '../models/PlaidItem';
import { PlaidUpdateTransactions } from './PlaidUpdateTransactions';
import { Inject, Injectable } from '@nestjs/common';
@@ -8,7 +9,7 @@ export class PlaidWebooks {
private readonly updateTransactionsService: PlaidUpdateTransactions,
@Inject(PlaidItem.name)
private readonly plaidItemModel: typeof PlaidItem,
private readonly plaidItemModel: TenantModelProxy<typeof PlaidItem>,
) {}
/**
@@ -76,11 +77,10 @@ export class PlaidWebooks {
* @returns {Promise<void>}
*/
public async handleTransactionsWebooks(
tenantId: number,
plaidItemId: string,
webhookCode: string,
): Promise<void> {
const plaidItem = await this.plaidItemModel
const plaidItem = await this.plaidItemModel()
.query()
.findOne({ plaidItemId })
.throwIfNotFound();
@@ -122,9 +122,8 @@ export class PlaidWebooks {
/**
* Handles all Item webhook events.
* @param {number} tenantId - Tenant ID
* @param {string} webhookCode - The webhook code
* @param {string} plaidItemId - The Plaid ID for the item
* @param {string} webhookCode - The webhook code
* @returns {Promise<void>}
*/
public async itemsHandler(

View File

@@ -0,0 +1,54 @@
import { ClsService } from 'nestjs-cls';
import { Inject, Injectable } from '@nestjs/common';
import { SystemPlaidItem } from '../models/SystemPlaidItem';
import { TenantModel } from '@/modules/System/models/TenantModel';
import { SystemUser } from '@/modules/System/models/SystemUser';
@Injectable()
export class SetupPlaidItemTenantService {
constructor(
private readonly clsService: ClsService,
@Inject(SystemPlaidItem.name)
private readonly systemPlaidItemModel: typeof SystemPlaidItem,
@Inject(TenantModel.name)
private readonly tenantModel: typeof TenantModel,
@Inject(SystemUser.name)
private readonly systemUserModel: typeof SystemUser,
) {}
/**
* Sets up the Plaid tenant.
* @param {string} plaidItemId - The Plaid item id.
* @param {() => void} callback - The callback function to execute after setting up the Plaid tenant.
* @returns {Promise<void>}
*/
public async setupPlaidTenant(plaidItemId: string, callback: () => void) {
const plaidItem = await this.systemPlaidItemModel
.query()
.findOne({ plaidItemId });
if (!plaidItem) {
throw new Error('Plaid item not found');
}
const tenant = await this.tenantModel
.query()
.findOne({ id: plaidItem.tenantId })
.throwIfNotFound();
const user = await this.systemUserModel
.query()
.findOne({
tenantId: tenant.id,
})
.modify('active')
.throwIfNotFound();
this.clsService.set('organizationId', tenant.organizationId);
this.clsService.set('userId', user.id);
return callback();
}
}

View File

@@ -9,3 +9,18 @@ export class PlaidItemDto {
@IsNotEmpty()
institutionId: string;
}
export class PlaidWebhookDto {
@IsString()
@IsNotEmpty()
itemId: string;
@IsString()
@IsNotEmpty()
webhookType: string;
@IsString()
@IsNotEmpty()
webhookCode: string;
}

View File

@@ -1,43 +1,46 @@
// import Container, { Service } from 'typedi';
// import { PlaidUpdateTransactions } from './PlaidUpdateTransactions';
// import { IPlaidItemCreatedEventPayload } from '@/interfaces';
import { Process } from '@nestjs/bull';
import { UseCls } from 'nestjs-cls';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Scope } from '@nestjs/common';
import { Job } from 'bullmq';
import {
PlaidFetchTransitonsEventPayload,
UpdateBankingPlaidTransitionsJob,
UpdateBankingPlaidTransitionsQueueJob,
} from '../types/BankingPlaid.types';
import { PlaidUpdateTransactions } from '../command/PlaidUpdateTransactions';
import { SetupPlaidItemTenantService } from '../command/SetupPlaidItemTenant.service';
// @Service()
// export class PlaidFetchTransactionsJob {
// /**
// * Constructor method.
// */
// constructor(agenda) {
// agenda.define(
// 'plaid-update-account-transactions',
// { priority: 'high', concurrency: 2 },
// this.handler
// );
// }
@Processor({
name: UpdateBankingPlaidTransitionsQueueJob,
scope: Scope.REQUEST,
})
export class PlaidFetchTransactionsProcessor extends WorkerHost {
constructor(
private readonly plaidFetchTransactionsService: PlaidUpdateTransactions,
private readonly setupPlaidItemService: SetupPlaidItemTenantService,
) {
super();
}
// /**
// * Triggers the function.
// */
// private handler = async (job, done: Function) => {
// const { tenantId, plaidItemId } = job.attrs
// .data as IPlaidItemCreatedEventPayload;
/**
* Triggers the function.
*/
@Process(UpdateBankingPlaidTransitionsJob)
@UseCls()
async process(job: Job<PlaidFetchTransitonsEventPayload>) {
const { plaidItemId } = job.data;
// const plaidFetchTransactionsService = Container.get(
// PlaidUpdateTransactions
// );
// const io = Container.get('socket');
// try {
// await plaidFetchTransactionsService.updateTransactions(
// tenantId,
// plaidItemId
// );
// // Notify the frontend to reflect the new transactions changes.
// io.emit('NEW_TRANSACTIONS_DATA', { plaidItemId });
// done();
// } catch (error) {
// console.log(error);
// done(error);
// }
// };
// }
try {
await this.setupPlaidItemService.setupPlaidTenant(plaidItemId, () => {
return this.plaidFetchTransactionsService.updateTransactions(
plaidItemId,
);
});
// Notify the frontend to reflect the new transactions changes.
// io.emit('NEW_TRANSACTIONS_DATA', { plaidItemId });
} catch (error) {
console.log(error);
}
}
}

View File

@@ -30,7 +30,7 @@ export class SystemPlaidItem extends BaseModel {
* Relationship mapping.
*/
static get relationMappings() {
const Tenant = require('system/models/Tenant');
const { TenantModel } = require('../../System/models/TenantModel');
return {
/**
@@ -38,7 +38,7 @@ export class SystemPlaidItem extends BaseModel {
*/
tenant: {
relation: Model.BelongsToOneRelation,
modelClass: Tenant.default,
modelClass: TenantModel,
join: {
from: 'users.tenantId',
to: 'tenants.id',

View File

@@ -1,22 +1,34 @@
import { events } from '@/common/events/events';
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { IPlaidItemCreatedEventPayload } from '../types/BankingPlaid.types';
import {
IPlaidItemCreatedEventPayload,
UpdateBankingPlaidTransitionsJob,
UpdateBankingPlaidTransitionsQueueJob,
} from '../types/BankingPlaid.types';
import { Queue } from 'bullmq';
import { InjectQueue } from '@nestjs/bullmq';
@Injectable()
export class PlaidUpdateTransactionsOnItemCreatedSubscriber {
constructor(
@InjectQueue(UpdateBankingPlaidTransitionsQueueJob)
private readonly updateTransitionsQueue: Queue,
) {}
/**
* Updates the Plaid item transactions
* @param {IPlaidItemCreatedEventPayload} payload - Event payload.
*/
@OnEvent(events.plaid.onItemCreated)
public async handleUpdateTransactionsOnItemCreated({
tenantId,
plaidItemId,
plaidAccessToken,
plaidInstitutionId,
}: IPlaidItemCreatedEventPayload) {
const payload = { tenantId, plaidItemId };
// await this.agenda.now('plaid-update-account-transactions', payload);
};
const payload = { plaidItemId };
await this.updateTransitionsQueue.add(
UpdateBankingPlaidTransitionsJob,
payload,
);
}
}

View File

@@ -1,11 +1,11 @@
import { Knex } from "knex";
import { RemovedTransaction, Transaction } from "plaid";
import { Knex } from 'knex';
import { RemovedTransaction, Transaction } from 'plaid';
export interface IPlaidTransactionsSyncedEventPayload {
// tenantId: number;
plaidAccountId: number;
batch: string;
trx?: Knex.Transaction
trx?: Knex.Transaction;
}
export interface PlaidItemDTO {
@@ -13,7 +13,6 @@ export interface PlaidItemDTO {
institutionId: string;
}
export interface PlaidFetchedTransactionsUpdates {
added: Transaction[];
modified: Transaction[];
@@ -22,11 +21,20 @@ export interface PlaidFetchedTransactionsUpdates {
cursor: string;
}
export interface IPlaidItemCreatedEventPayload {
tenantId: number;
plaidAccessToken: string;
plaidItemId: string;
plaidInstitutionId: string;
}
export const UpdateBankingPlaidTransitionsJob =
'update-banking-plaid-transitions-job';
export const UpdateBankingPlaidTransitionsQueueJob =
'update-banking-plaid-transitions-query';
export interface PlaidFetchTransitonsEventPayload {
plaidItemId: string;
}

View File

@@ -8,12 +8,13 @@ import { Inject, Injectable } from '@nestjs/common';
import { events } from '@/common/events/events';
import { Account } from '@/modules/Accounts/models/Account.model';
import { UncategorizedBankTransaction } from '../models/UncategorizedBankTransaction';
import { TenantModelProxy } from '@/modules/System/models/TenantBaseModel';
@Injectable()
export class DecrementUncategorizedTransactionOnCategorizeSubscriber {
constructor(
@Inject(Account.name)
private readonly accountModel: typeof Account,
private readonly accountModel: TenantModelProxy<typeof Account>,
) {}
/**
@@ -33,7 +34,7 @@ export class DecrementUncategorizedTransactionOnCategorizeSubscriber {
if (uncategorizedTransaction.isPending) {
return;
}
await this.accountModel
await this.accountModel()
.query(trx)
.findById(uncategorizedTransaction.accountId)
.decrement('uncategorizedTransactions', 1);
@@ -58,7 +59,7 @@ export class DecrementUncategorizedTransactionOnCategorizeSubscriber {
if (uncategorizedTransaction.isPending) {
return;
}
await this.accountModel
await this.accountModel()
.query(trx)
.findById(uncategorizedTransaction.accountId)
.increment('uncategorizedTransactions', 1);
@@ -80,7 +81,7 @@ export class DecrementUncategorizedTransactionOnCategorizeSubscriber {
// Cannot continue if the transaction is still pending.
if (uncategorizedTransaction.isPending) return;
await this.accountModel
await this.accountModel()
.query(trx)
.findById(uncategorizedTransaction.accountId)
.increment('uncategorizedTransactions', 1);

View File

@@ -76,7 +76,7 @@ export class PaymentReceived extends TenantBaseModel {
const { Customer } = require('../../Customers/models/Customer');
const { Account } = require('../../Accounts/models/Account.model');
const { Branch } = require('../../Branches/models/Branch.model');
// const Document = require('../../Documents/models/Document');
const { DocumentModel } = require('../../Attachments/models/Document.model');
return {
customer: {
@@ -139,21 +139,21 @@ export class PaymentReceived extends TenantBaseModel {
/**
* Payment transaction may has many attached attachments.
*/
// attachments: {
// relation: Model.ManyToManyRelation,
// modelClass: Document.default,
// join: {
// from: 'payment_receives.id',
// through: {
// from: 'document_links.modelId',
// to: 'document_links.documentId',
// },
// to: 'documents.id',
// },
// filter(query) {
// query.where('model_ref', 'PaymentReceive');
// },
// },
attachments: {
relation: Model.ManyToManyRelation,
modelClass: DocumentModel,
join: {
from: 'payment_receives.id',
through: {
from: 'document_links.modelId',
to: 'document_links.documentId',
},
to: 'documents.id',
},
filter(query) {
query.where('model_ref', 'PaymentReceive');
},
},
};
}

View File

@@ -1,9 +1,9 @@
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Inject, Injectable } from '@nestjs/common';
import { SaleEstimateTransfromer } from './SaleEstimate.transformer';
import { SaleEstimateValidators } from '../commands/SaleEstimateValidators.service';
import { TransformerInjectable } from '@/modules/Transformer/TransformerInjectable.service';
import { SaleEstimate } from '../models/SaleEstimate';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { events } from '@/common/events/events';
import { TenantModelProxy } from '@/modules/System/models/TenantBaseModel';

View File

@@ -28,6 +28,10 @@ export class SystemUser extends BaseModel {
inviteAccepted(query) {
query.whereNotNull('invite_accepted_at');
},
active(query) {
query.where('active', true);
},
};
}

View File

@@ -552,7 +552,7 @@ export function useGetRecognizedBankTransaction(
() =>
apiRequest
.get(`/banking/recognized/transactions/${uncategorizedTransactionId}`)
.then((res) => transformToCamelCase(res.data?.data)),
.then((res) => transformToCamelCase(res.data)),
options,
);
}
@@ -580,7 +580,7 @@ export function useGetBankAccountSummaryMeta(
() =>
apiRequest
.get(`/banking/accounts/${bankAccountId}/summary`)
.then((res) => transformToCamelCase(res.data?.data)),
.then((res) => transformToCamelCase(res.data)),
{ ...options },
);
}
@@ -616,7 +616,7 @@ export function useGetAutofillCategorizeTransaction(
.get(`/banking/categorize/autofill`, {
params: { uncategorizedTransactionIds },
})
.then((res) => transformToCamelCase(res.data?.data)),
.then((res) => transformToCamelCase(res.data)),
{ ...options },
);
}