From 8dc2b187077bd4c6a705d76f90c6a38dc38c621c Mon Sep 17 00:00:00 2001 From: Ahmed Bouhuolia Date: Sun, 23 Jun 2024 18:49:46 +0200 Subject: [PATCH] feat: recognize the syncd bank transactions --- ...categorized_cashflow_transactions_table.js | 11 +++++ packages/server/src/interfaces/CashFlow.ts | 1 + packages/server/src/interfaces/Plaid.ts | 9 ++++ packages/server/src/loaders/eventEmitter.ts | 4 ++ .../GetMatchedTransactionsByExpenses.ts | 23 +++++++--- .../GetMatchedTransactionsByManualJournals.ts | 26 +++++++++--- .../ValidateMatchingOnCashflowDelete.ts | 2 +- .../src/services/Banking/Plaid/PlaidSyncDB.ts | 36 +++++++++++----- .../RecognizeSyncedBankTransactions.ts | 42 +++++++++++++++++++ .../RecognizeTranasctionsService.ts | 19 ++++++--- .../services/Banking/Rules/DeleteBankRule.ts | 3 +- packages/server/src/subscribers/events.ts | 3 +- 12 files changed, 150 insertions(+), 29 deletions(-) create mode 100644 packages/server/src/database/migrations/20240623154149_add_batch_column_to_uncategorized_cashflow_transactions_table.js create mode 100644 packages/server/src/services/Banking/Plaid/subscribers/RecognizeSyncedBankTransactions.ts diff --git a/packages/server/src/database/migrations/20240623154149_add_batch_column_to_uncategorized_cashflow_transactions_table.js b/packages/server/src/database/migrations/20240623154149_add_batch_column_to_uncategorized_cashflow_transactions_table.js new file mode 100644 index 000000000..7641da878 --- /dev/null +++ b/packages/server/src/database/migrations/20240623154149_add_batch_column_to_uncategorized_cashflow_transactions_table.js @@ -0,0 +1,11 @@ +exports.up = function (knex) { + return knex.schema.table('uncategorized_cashflow_transactions', (table) => { + table.string('batch'); + }); +}; + +exports.down = function (knex) { + return knex.schema.table('uncategorized_cashflow_transactions', (table) => { + table.dropColumn('batch'); + }); +}; diff --git a/packages/server/src/interfaces/CashFlow.ts b/packages/server/src/interfaces/CashFlow.ts index 499c526b0..ce9bbf5b2 100644 --- a/packages/server/src/interfaces/CashFlow.ts +++ b/packages/server/src/interfaces/CashFlow.ts @@ -267,4 +267,5 @@ export interface CreateUncategorizedTransactionDTO { description?: string; referenceNo?: string | null; plaidTransactionId?: string | null; + batch?: string; } diff --git a/packages/server/src/interfaces/Plaid.ts b/packages/server/src/interfaces/Plaid.ts index a8ad469df..6142dab93 100644 --- a/packages/server/src/interfaces/Plaid.ts +++ b/packages/server/src/interfaces/Plaid.ts @@ -1,3 +1,5 @@ +import { Knex } from "knex"; + export interface IPlaidItemCreatedEventPayload { tenantId: number; plaidAccessToken: string; @@ -54,3 +56,10 @@ export interface SyncAccountsTransactionsTask { plaidAccountId: number; plaidTransactions: PlaidTransaction[]; } + +export interface IPlaidTransactionsSyncedEventPayload { + tenantId: number; + plaidAccountId: number; + batch: string; + trx?: Knex.Transaction +} diff --git a/packages/server/src/loaders/eventEmitter.ts b/packages/server/src/loaders/eventEmitter.ts index fd1586db1..2e5f46c3f 100644 --- a/packages/server/src/loaders/eventEmitter.ts +++ b/packages/server/src/loaders/eventEmitter.ts @@ -108,6 +108,7 @@ import { ValidateMatchingOnManualJournalDelete } from '@/services/Banking/Matchi import { ValidateMatchingOnPaymentReceivedDelete } from '@/services/Banking/Matching/events/ValidateMatchingOnPaymentReceivedDelete'; import { ValidateMatchingOnPaymentMadeDelete } from '@/services/Banking/Matching/events/ValidateMatchingOnPaymentMadeDelete'; import { ValidateMatchingOnCashflowDelete } from '@/services/Banking/Matching/events/ValidateMatchingOnCashflowDelete'; +import { RecognizeSyncedBankTranasctions } from '@/services/Banking/Plaid/subscribers/RecognizeSyncedBankTransactions'; export default () => { return new EventPublisher(); @@ -262,5 +263,8 @@ export const susbcribers = () => { ValidateMatchingOnManualJournalDelete, ValidateMatchingOnPaymentReceivedDelete, ValidateMatchingOnPaymentMadeDelete, + + // Plaid + RecognizeSyncedBankTranasctions, ]; }; diff --git a/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByExpenses.ts b/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByExpenses.ts index 8480e29b2..9205ce2e8 100644 --- a/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByExpenses.ts +++ b/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByExpenses.ts @@ -1,9 +1,9 @@ import { Inject, Service } from 'typedi'; -import { GetMatchedTransactionManualJournalsTransformer } from './GetMatchedTransactionManualJournalsTransformer'; import { GetMatchedTransactionsFilter } from './types'; import { TransformerInjectable } from '@/lib/Transformer/TransformerInjectable'; import HasTenancyService from '@/services/Tenancy/TenancyService'; import { GetMatchedTransactionsByType } from './GetMatchedTransactionsByType'; +import { GetMatchedTransactionExpensesTransformer } from './GetMatchedTransactionExpensesTransformer'; @Service() export class GetMatchedTransactionsByExpenses extends GetMatchedTransactionsByType { @@ -14,7 +14,7 @@ export class GetMatchedTransactionsByExpenses extends GetMatchedTransactionsByTy protected transformer: TransformerInjectable; /** - * + * Retrieves the matched transactions of expenses. * @param {number} tenantId * @param {GetMatchedTransactionsFilter} filter * @returns @@ -25,12 +25,25 @@ export class GetMatchedTransactionsByExpenses extends GetMatchedTransactionsByTy ) { const { Expense } = this.tenancy.models(tenantId); - const expenses = await Expense.query(); - + const expenses = await Expense.query().onBuild((query) => { + query.whereNotExists(Expense.relatedQuery('matchedBankTransaction')); + if (filter.fromDate) { + query.where('payment_date', '>=', filter.fromDate); + } + if (filter.toDate) { + query.where('payment_date', '<=', filter.toDate); + } + if (filter.minAmount) { + query.where('total_amount', '>=', filter.minAmount); + } + if (filter.maxAmount) { + query.where('total_amount', '<=', filter.maxAmount); + } + }); return this.transformer.transform( tenantId, expenses, - new GetMatchedTransactionManualJournalsTransformer() + new GetMatchedTransactionExpensesTransformer() ); } } diff --git a/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByManualJournals.ts b/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByManualJournals.ts index 252ff1c82..2aa6341af 100644 --- a/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByManualJournals.ts +++ b/packages/server/src/services/Banking/Matching/GetMatchedTransactionsByManualJournals.ts @@ -1,8 +1,8 @@ +import { Inject, Service } from 'typedi'; import { TransformerInjectable } from '@/lib/Transformer/TransformerInjectable'; import { GetMatchedTransactionManualJournalsTransformer } from './GetMatchedTransactionManualJournalsTransformer'; -import { Inject, Service } from 'typedi'; -import { GetMatchedTransactionsFilter } from './types'; import { GetMatchedTransactionsByType } from './GetMatchedTransactionsByType'; +import { GetMatchedTransactionsFilter } from './types'; @Service() export class GetMatchedTransactionsByManualJournals extends GetMatchedTransactionsByType { @@ -17,12 +17,27 @@ export class GetMatchedTransactionsByManualJournals extends GetMatchedTransactio */ async getMatchedTransactions( tenantId: number, - filter: GetMatchedTransactionsFilter + filter: Omit ) { const { ManualJournal } = this.tenancy.models(tenantId); - const manualJournals = await ManualJournal.query(); - + const manualJournals = await ManualJournal.query().onBuild((query) => { + query.whereNotExists( + ManualJournal.relatedQuery('matchedBankTransaction') + ); + if (filter.fromDate) { + query.where('date', '>=', filter.fromDate); + } + if (filter.toDate) { + query.where('date', '<=', filter.toDate); + } + if (filter.minAmount) { + query.where('amount', '>=', filter.minAmount); + } + if (filter.maxAmount) { + query.where('amount', '<=', filter.maxAmount); + } + }); return this.transformer.transform( tenantId, manualJournals, @@ -41,6 +56,7 @@ export class GetMatchedTransactionsByManualJournals extends GetMatchedTransactio const manualJournal = await ManualJournal.query() .findById(transactionId) + .whereNotExists(ManualJournal.relatedQuery('matchedBankTransaction')) .throwIfNotFound(); return this.transformer.transform( diff --git a/packages/server/src/services/Banking/Matching/events/ValidateMatchingOnCashflowDelete.ts b/packages/server/src/services/Banking/Matching/events/ValidateMatchingOnCashflowDelete.ts index d41f2be11..9d9a8e965 100644 --- a/packages/server/src/services/Banking/Matching/events/ValidateMatchingOnCashflowDelete.ts +++ b/packages/server/src/services/Banking/Matching/events/ValidateMatchingOnCashflowDelete.ts @@ -1,7 +1,7 @@ +import { Inject, Service } from 'typedi'; import { IManualJournalDeletingPayload } from '@/interfaces'; import events from '@/subscribers/events'; import { ValidateTransactionMatched } from '../ValidateTransactionsMatched'; -import { Inject, Service } from 'typedi'; @Service() export class ValidateMatchingOnCashflowDelete { diff --git a/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts b/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts index 004662cfd..3070c22f9 100644 --- a/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts +++ b/packages/server/src/services/Banking/Plaid/PlaidSyncDB.ts @@ -5,6 +5,7 @@ import { entries, groupBy } from 'lodash'; import { CreateAccount } from '@/services/Accounts/CreateAccount'; import { IAccountCreateDTO, +\ IPlaidTransactionsSyncedEventPayload, PlaidAccount, PlaidTransaction, } from '@/interfaces'; @@ -16,6 +17,9 @@ import { DeleteCashflowTransaction } from '@/services/Cashflow/DeleteCashflowTra import HasTenancyService from '@/services/Tenancy/TenancyService'; import { CashflowApplication } from '@/services/Cashflow/CashflowApplication'; import { Knex } from 'knex'; +import { uniqid } from 'uniqid'; +import { EventPublisher } from '@/lib/EventPublisher/EventPublisher'; +import events from '@/subscribers/events'; const CONCURRENCY_ASYNC = 10; @@ -33,6 +37,9 @@ export class PlaidSyncDb { @Inject() private deleteCashflowTransactionService: DeleteCashflowTransaction; + @Inject() + private eventPublisher: EventPublisher; + /** * Syncs the Plaid bank account. * @param {number} tenantId @@ -92,6 +99,7 @@ export class PlaidSyncDb { * @param {number} tenantId - Tenant ID. * @param {number} plaidAccountId - Plaid account ID. * @param {PlaidTransaction[]} plaidTranasctions - Plaid transactions + * @return {Promise} */ public async syncAccountTranactions( tenantId: number, @@ -101,18 +109,14 @@ export class PlaidSyncDb { ): Promise { const { Account } = this.tenancy.models(tenantId); + const batch = uniqid(); const cashflowAccount = await Account.query(trx) .findOne({ plaidAccountId }) .throwIfNotFound(); - const openingEquityBalance = await Account.query(trx).findOne( - 'slug', - 'opening-balance-equity' - ); // Transformes the Plaid transactions to cashflow create DTOs. const transformTransaction = transformPlaidTrxsToCashflowCreate( - cashflowAccount.id, - openingEquityBalance.id + cashflowAccount.id ); const uncategorizedTransDTOs = R.map(transformTransaction)(plaidTranasctions); @@ -123,20 +127,28 @@ export class PlaidSyncDb { (uncategoriedDTO) => this.cashflowApp.createUncategorizedTransaction( tenantId, - uncategoriedDTO, + { ...uncategoriedDTO, batch }, trx ), { concurrency: 1 } ); + // Triggers `onPlaidTransactionsSynced` event. + await this.eventPublisher.emitAsync(events.plaid.onTransactionsSynced, { + tenantId, + plaidAccountId, + batch, + } as IPlaidTransactionsSyncedEventPayload); } /** * Syncs the accounts transactions in paraller under controlled concurrency. * @param {number} tenantId * @param {PlaidTransaction[]} plaidTransactions + * @return {Promise} */ public async syncAccountsTransactions( tenantId: number, + batchNo: string, plaidAccountsTransactions: PlaidTransaction[], trx?: Knex.Transaction ): Promise { @@ -149,6 +161,7 @@ export class PlaidSyncDb { return this.syncAccountTranactions( tenantId, plaidAccountId, + batchNo, plaidTransactions, trx ); @@ -192,13 +205,14 @@ export class PlaidSyncDb { * @param {number} tenantId - Tenant ID. * @param {string} itemId - Plaid item ID. * @param {string} lastCursor - Last transaction cursor. + * @return {Promise} */ public async syncTransactionsCursor( tenantId: number, plaidItemId: string, lastCursor: string, trx?: Knex.Transaction - ) { + ): Promise { const { PlaidItem } = this.tenancy.models(tenantId); await PlaidItem.query(trx).findOne({ plaidItemId }).patch({ lastCursor }); @@ -208,12 +222,13 @@ export class PlaidSyncDb { * Updates the last feeds updated at of the given Plaid accounts ids. * @param {number} tenantId * @param {string[]} plaidAccountIds + * @return {Promise} */ public async updateLastFeedsUpdatedAt( tenantId: number, plaidAccountIds: string[], trx?: Knex.Transaction - ) { + ): Promise { const { Account } = this.tenancy.models(tenantId); await Account.query(trx) @@ -228,13 +243,14 @@ export class PlaidSyncDb { * @param {number} tenantId * @param {number[]} plaidAccountIds * @param {boolean} isFeedsActive + * @returns {Promise} */ public async updateAccountsFeedsActive( tenantId: number, plaidAccountIds: string[], isFeedsActive: boolean = true, trx?: Knex.Transaction - ) { + ): Promise { const { Account } = this.tenancy.models(tenantId); await Account.query(trx) diff --git a/packages/server/src/services/Banking/Plaid/subscribers/RecognizeSyncedBankTransactions.ts b/packages/server/src/services/Banking/Plaid/subscribers/RecognizeSyncedBankTransactions.ts new file mode 100644 index 000000000..42104aafc --- /dev/null +++ b/packages/server/src/services/Banking/Plaid/subscribers/RecognizeSyncedBankTransactions.ts @@ -0,0 +1,42 @@ +import { Inject, Service } from 'typedi'; +import { EventSubscriber } from '@/lib/EventPublisher/EventPublisher'; +import { + IPlaidItemCreatedEventPayload, + IPlaidTransactionsSyncedEventPayload, +} from '@/interfaces/Plaid'; +import events from '@/subscribers/events'; +import { RecognizeTranasctionsService } from '../../RegonizeTranasctions/RecognizeTranasctionsService'; +import { runAfterTransaction } from '@/services/UnitOfWork/TransactionsHooks'; + +@Service() +export class RecognizeSyncedBankTranasctions extends EventSubscriber { + @Inject() + private recognizeTranasctionsService: RecognizeTranasctionsService; + + /** + * Constructor method. + */ + public attach(bus) { + bus.subscribe( + events.plaid.onTransactionsSynced, + this.handleRecognizeSyncedBankTransactions.bind(this) + ); + } + + /** + * Updates the Plaid item transactions + * @param {IPlaidItemCreatedEventPayload} payload - Event payload. + */ + private handleRecognizeSyncedBankTransactions = async ({ + tenantId, + batch, + trx, + }: IPlaidTransactionsSyncedEventPayload) => { + runAfterTransaction(trx, async () => { + await this.recognizeTranasctionsService.recognizeTransactions( + tenantId, + batch + ); + }); + }; +} diff --git a/packages/server/src/services/Banking/RegonizeTranasctions/RecognizeTranasctionsService.ts b/packages/server/src/services/Banking/RegonizeTranasctions/RecognizeTranasctionsService.ts index 32b8a2542..77055ad34 100644 --- a/packages/server/src/services/Banking/RegonizeTranasctions/RecognizeTranasctionsService.ts +++ b/packages/server/src/services/Banking/RegonizeTranasctions/RecognizeTranasctionsService.ts @@ -50,14 +50,21 @@ export class RecognizeTranasctionsService { * @param {number} tenantId - * @param {Knex.Transaction} trx - */ - public async recognizeTransactions(tenantId: number, trx?: Knex.Transaction) { + public async recognizeTransactions( + tenantId: number, + batch: string = '', + trx?: Knex.Transaction + ) { const { UncategorizedCashflowTransaction, BankRule } = this.tenancy.models(tenantId); const uncategorizedTranasctions = - await UncategorizedCashflowTransaction.query() - .where('recognized_transaction_id', null) - .where('categorized', false); + await UncategorizedCashflowTransaction.query().onBuild((query) => { + query.where('recognized_transaction_id', null); + query.where('categorized', false); + + if (batch) query.where('batch', batch); + }); const bankRules = await BankRule.query().withGraphFetched('conditions'); const bankRulesByAccountId = transformToMapBy( @@ -93,8 +100,8 @@ export class RecognizeTranasctionsService { } /** - * - * @param {number} uncategorizedTransaction + * + * @param {number} uncategorizedTransaction */ public async regonizeTransaction( uncategorizedTransaction: UncategorizedCashflowTransaction diff --git a/packages/server/src/services/Banking/Rules/DeleteBankRule.ts b/packages/server/src/services/Banking/Rules/DeleteBankRule.ts index 9d6ce0167..c02ab6686 100644 --- a/packages/server/src/services/Banking/Rules/DeleteBankRule.ts +++ b/packages/server/src/services/Banking/Rules/DeleteBankRule.ts @@ -27,7 +27,7 @@ export class DeleteBankRuleSerivce { * @returns {Promise} */ public async deleteBankRule(tenantId: number, ruleId: number): Promise { - const { BankRule } = this.tenancy.models(tenantId); + const { BankRule, BankRuleCondition } = this.tenancy.models(tenantId); const oldBankRule = await BankRule.query() .findById(ruleId) @@ -42,6 +42,7 @@ export class DeleteBankRuleSerivce { trx, } as IBankRuleEventDeletingPayload); + await BankRuleCondition.query(trx).where('ruleId', ruleId).delete(); await BankRule.query(trx).findById(ruleId).delete(); // Triggers `onBankRuleDeleted` event. diff --git a/packages/server/src/subscribers/events.ts b/packages/server/src/subscribers/events.ts index e139b2b41..507fe9e52 100644 --- a/packages/server/src/subscribers/events.ts +++ b/packages/server/src/subscribers/events.ts @@ -616,6 +616,7 @@ export default { plaid: { onItemCreated: 'onPlaidItemCreated', + onTransactionsSynced: 'onPlaidTransactionsSynced', }, // Bank rules. @@ -637,5 +638,5 @@ export default { onUnmatching: 'onBankTransactionUnmathcing', onUnmatched: 'onBankTransactionUnmathced', - } + }, };