feat: recognize the syncd bank transactions

This commit is contained in:
Ahmed Bouhuolia
2024-06-23 18:49:46 +02:00
parent 589b29bbdd
commit 8dc2b18707
12 changed files with 150 additions and 29 deletions

View File

@@ -5,6 +5,7 @@ import { entries, groupBy } from 'lodash';
import { CreateAccount } from '@/services/Accounts/CreateAccount';
import {
IAccountCreateDTO,
\ IPlaidTransactionsSyncedEventPayload,
PlaidAccount,
PlaidTransaction,
} from '@/interfaces';
@@ -16,6 +17,9 @@ import { DeleteCashflowTransaction } from '@/services/Cashflow/DeleteCashflowTra
import HasTenancyService from '@/services/Tenancy/TenancyService';
import { CashflowApplication } from '@/services/Cashflow/CashflowApplication';
import { Knex } from 'knex';
import { uniqid } from 'uniqid';
import { EventPublisher } from '@/lib/EventPublisher/EventPublisher';
import events from '@/subscribers/events';
const CONCURRENCY_ASYNC = 10;
@@ -33,6 +37,9 @@ export class PlaidSyncDb {
@Inject()
private deleteCashflowTransactionService: DeleteCashflowTransaction;
@Inject()
private eventPublisher: EventPublisher;
/**
* Syncs the Plaid bank account.
* @param {number} tenantId
@@ -92,6 +99,7 @@ export class PlaidSyncDb {
* @param {number} tenantId - Tenant ID.
* @param {number} plaidAccountId - Plaid account ID.
* @param {PlaidTransaction[]} plaidTranasctions - Plaid transactions
* @return {Promise<void>}
*/
public async syncAccountTranactions(
tenantId: number,
@@ -101,18 +109,14 @@ export class PlaidSyncDb {
): Promise<void> {
const { Account } = this.tenancy.models(tenantId);
const batch = uniqid();
const cashflowAccount = await Account.query(trx)
.findOne({ plaidAccountId })
.throwIfNotFound();
const openingEquityBalance = await Account.query(trx).findOne(
'slug',
'opening-balance-equity'
);
// Transformes the Plaid transactions to cashflow create DTOs.
const transformTransaction = transformPlaidTrxsToCashflowCreate(
cashflowAccount.id,
openingEquityBalance.id
cashflowAccount.id
);
const uncategorizedTransDTOs =
R.map(transformTransaction)(plaidTranasctions);
@@ -123,20 +127,28 @@ export class PlaidSyncDb {
(uncategoriedDTO) =>
this.cashflowApp.createUncategorizedTransaction(
tenantId,
uncategoriedDTO,
{ ...uncategoriedDTO, batch },
trx
),
{ concurrency: 1 }
);
// Triggers `onPlaidTransactionsSynced` event.
await this.eventPublisher.emitAsync(events.plaid.onTransactionsSynced, {
tenantId,
plaidAccountId,
batch,
} as IPlaidTransactionsSyncedEventPayload);
}
/**
* Syncs the accounts transactions in paraller under controlled concurrency.
* @param {number} tenantId
* @param {PlaidTransaction[]} plaidTransactions
* @return {Promise<void>}
*/
public async syncAccountsTransactions(
tenantId: number,
batchNo: string,
plaidAccountsTransactions: PlaidTransaction[],
trx?: Knex.Transaction
): Promise<void> {
@@ -149,6 +161,7 @@ export class PlaidSyncDb {
return this.syncAccountTranactions(
tenantId,
plaidAccountId,
batchNo,
plaidTransactions,
trx
);
@@ -192,13 +205,14 @@ export class PlaidSyncDb {
* @param {number} tenantId - Tenant ID.
* @param {string} itemId - Plaid item ID.
* @param {string} lastCursor - Last transaction cursor.
* @return {Promise<void>}
*/
public async syncTransactionsCursor(
tenantId: number,
plaidItemId: string,
lastCursor: string,
trx?: Knex.Transaction
) {
): Promise<void> {
const { PlaidItem } = this.tenancy.models(tenantId);
await PlaidItem.query(trx).findOne({ plaidItemId }).patch({ lastCursor });
@@ -208,12 +222,13 @@ export class PlaidSyncDb {
* Updates the last feeds updated at of the given Plaid accounts ids.
* @param {number} tenantId
* @param {string[]} plaidAccountIds
* @return {Promise<void>}
*/
public async updateLastFeedsUpdatedAt(
tenantId: number,
plaidAccountIds: string[],
trx?: Knex.Transaction
) {
): Promise<void> {
const { Account } = this.tenancy.models(tenantId);
await Account.query(trx)
@@ -228,13 +243,14 @@ export class PlaidSyncDb {
* @param {number} tenantId
* @param {number[]} plaidAccountIds
* @param {boolean} isFeedsActive
* @returns {Promise<void>}
*/
public async updateAccountsFeedsActive(
tenantId: number,
plaidAccountIds: string[],
isFeedsActive: boolean = true,
trx?: Knex.Transaction
) {
): Promise<void> {
const { Account } = this.tenancy.models(tenantId);
await Account.query(trx)

View File

@@ -0,0 +1,42 @@
import { Inject, Service } from 'typedi';
import { EventSubscriber } from '@/lib/EventPublisher/EventPublisher';
import {
IPlaidItemCreatedEventPayload,
IPlaidTransactionsSyncedEventPayload,
} from '@/interfaces/Plaid';
import events from '@/subscribers/events';
import { RecognizeTranasctionsService } from '../../RegonizeTranasctions/RecognizeTranasctionsService';
import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks';
@Service()
export class RecognizeSyncedBankTranasctions extends EventSubscriber {
@Inject()
private recognizeTranasctionsService: RecognizeTranasctionsService;
/**
* Constructor method.
*/
public attach(bus) {
bus.subscribe(
events.plaid.onTransactionsSynced,
this.handleRecognizeSyncedBankTransactions.bind(this)
);
}
/**
* Updates the Plaid item transactions
* @param {IPlaidItemCreatedEventPayload} payload - Event payload.
*/
private handleRecognizeSyncedBankTransactions = async ({
tenantId,
batch,
trx,
}: IPlaidTransactionsSyncedEventPayload) => {
runAfterTransaction(trx, async () => {
await this.recognizeTranasctionsService.recognizeTransactions(
tenantId,
batch
);
});
};
}