import * as R from 'ramda'; import bluebird from 'bluebird'; import { entries, groupBy } from 'lodash'; import { AccountBase as PlaidAccountBase, Item as PlaidItem, Institution as PlaidInstitution, Transaction as PlaidTransaction, } from 'plaid'; import { transformPlaidAccountToCreateAccount, transformPlaidTrxsToCashflowCreate, } from '../utils'; import { Knex } from 'knex'; import uniqid from 'uniqid'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { RemovePendingUncategorizedTransaction } from '../../BankingTransactions/commands/RemovePendingUncategorizedTransaction.service'; import { CreateAccountService } from '../../Accounts/CreateAccount.service'; import { Account } from '../../Accounts/models/Account.model'; import { events } from '@/common/events/events'; import { PlaidItem as PlaidItemModel } from '../models/PlaidItem'; import { IAccountCreateDTO } from '@/interfaces/Account'; import { IPlaidTransactionsSyncedEventPayload } from '../types/BankingPlaid.types'; import { UncategorizedBankTransaction } from '../../BankingTransactions/models/UncategorizedBankTransaction'; import { Inject, Injectable } from '@nestjs/common'; import { CreateUncategorizedTransactionService } from '@/modules/BankingCategorize/commands/CreateUncategorizedTransaction.service'; import { TenantModelProxy } from '../../System/models/TenantBaseModel'; const CONCURRENCY_ASYNC = 10; @Injectable() export class PlaidSyncDb { constructor( private readonly createAccountService: CreateAccountService, private readonly createUncategorizedTransaction: CreateUncategorizedTransactionService, private readonly removePendingTransaction: RemovePendingUncategorizedTransaction, private readonly eventPublisher: EventEmitter2, @Inject(Account.name) private readonly accountModel: TenantModelProxy, @Inject(PlaidItemModel.name) private readonly plaidItemModel: TenantModelProxy, @Inject(UncategorizedBankTransaction.name) private readonly uncategorizedBankTransactionModel: TenantModelProxy< typeof UncategorizedBankTransaction >, ) {} /** * Syncs the Plaid bank account. * @param {IAccountCreateDTO} createBankAccountDTO * @param {Knex.Transaction} trx * @returns {Promise} */ public async syncBankAccount( createBankAccountDTO: IAccountCreateDTO, trx?: Knex.Transaction, ) { const plaidAccount = await this.accountModel() .query(trx) .findOne('plaidAccountId', createBankAccountDTO.plaidAccountId); // Can't continue if the Plaid account is already created. if (plaidAccount) { return; } await this.createAccountService.createAccount(createBankAccountDTO, trx, { ignoreUniqueName: true, }); } /** * Syncs the plaid accounts to the system accounts. * @param {PlaidAccount[]} plaidAccounts * @returns {Promise} */ public async syncBankAccounts( plaidAccounts: PlaidAccountBase[], institution: PlaidInstitution, item: PlaidItem, trx?: Knex.Transaction, ): Promise { const transformToPlaidAccounts = R.curry( transformPlaidAccountToCreateAccount, )(item, institution); const accountCreateDTOs = R.map(transformToPlaidAccounts)(plaidAccounts); await bluebird.map( accountCreateDTOs, (createAccountDTO: any) => this.syncBankAccount(createAccountDTO, trx), { concurrency: CONCURRENCY_ASYNC }, ); } /** * Synsc the Plaid transactions to the system GL entries. * @param {number} plaidAccountId - Plaid account ID. * @param {PlaidTransaction[]} plaidTranasctions - Plaid transactions * @return {Promise} */ public async syncAccountTranactions( plaidAccountId: number, plaidTranasctions: PlaidTransaction[], trx?: Knex.Transaction, ): Promise { const batch = uniqid(); const cashflowAccount = await this.accountModel() .query(trx) .findOne({ plaidAccountId }) .throwIfNotFound(); // Transformes the Plaid transactions to cashflow create DTOs. const transformTransaction = R.curry(transformPlaidTrxsToCashflowCreate)( cashflowAccount.id, ); const uncategorizedTransDTOs = R.map(transformTransaction)(plaidTranasctions); // Creating account transaction queue. await bluebird.map( uncategorizedTransDTOs, (uncategoriedDTO) => this.createUncategorizedTransaction.create( { ...uncategoriedDTO, batch }, trx, ), { concurrency: 1 }, ); // Triggers `onPlaidTransactionsSynced` event. await this.eventPublisher.emitAsync(events.plaid.onTransactionsSynced, { plaidAccountId, batch, } as IPlaidTransactionsSyncedEventPayload); } /** * Syncs the accounts transactions in paraller under controlled concurrency. * @param {PlaidTransaction[]} plaidTransactions * @return {Promise} */ public async syncAccountsTransactions( plaidAccountsTransactions: PlaidTransaction[], trx?: Knex.Transaction, ): Promise { const groupedTrnsxByAccountId = entries( groupBy(plaidAccountsTransactions, 'account_id'), ); await bluebird.map( groupedTrnsxByAccountId, ([plaidAccountId, plaidTransactions]: [number, PlaidTransaction[]]) => { return this.syncAccountTranactions( plaidAccountId, plaidTransactions, trx, ); }, { concurrency: CONCURRENCY_ASYNC }, ); } /** * Syncs the removed Plaid transactions ids from the cashflow system transactions. * @param {string[]} plaidTransactionsIds - Plaid Transactions IDs. */ public async syncRemoveTransactions( plaidTransactionsIds: string[], trx?: Knex.Transaction, ) { const uncategorizedTransactions = await this.uncategorizedBankTransactionModel() .query(trx) .whereIn('plaidTransactionId', plaidTransactionsIds); const uncategorizedTransactionsIds = uncategorizedTransactions.map( (trans) => trans.id, ); await bluebird.map( uncategorizedTransactionsIds, (uncategorizedTransactionId: number) => this.removePendingTransaction.removePendingTransaction( uncategorizedTransactionId, trx, ), { concurrency: CONCURRENCY_ASYNC }, ); } /** * Syncs the Plaid item last transaction cursor. * @param {string} itemId - Plaid item ID. * @param {string} lastCursor - Last transaction cursor. * @return {Promise} */ public async syncTransactionsCursor( plaidItemId: string, lastCursor: string, trx?: Knex.Transaction, ): Promise { await this.plaidItemModel() .query(trx) .findOne({ plaidItemId }) .patch({ lastCursor }); } /** * Updates the last feeds updated at of the given Plaid accounts ids. * @param {string[]} plaidAccountIds - Plaid accounts ids. * @return {Promise} */ public async updateLastFeedsUpdatedAt( plaidAccountIds: string[], trx?: Knex.Transaction, ): Promise { await this.accountModel() .query(trx) .whereIn('plaid_account_id', plaidAccountIds) .patch({ lastFeedsUpdatedAt: new Date(), }); } /** * Updates the accounts feed active status of the given Plaid accounts ids. * @param {number[]} plaidAccountIds - Plaid accounts ids. * @param {boolean} isFeedsActive - Feeds active status. * @returns {Promise} */ public async updateAccountsFeedsActive( plaidAccountIds: string[], isFeedsActive: boolean = true, trx?: Knex.Transaction, ): Promise { await this.accountModel() .query(trx) .whereIn('plaid_account_id', plaidAccountIds) .patch({ isFeedsActive, }); } }