fix: Plaid data available syncing

This commit is contained in:
Ahmed Bouhuolia
2024-06-07 01:07:17 +02:00
parent fc9995c4da
commit d27562bd43
7 changed files with 209 additions and 85 deletions

View File

@@ -3,7 +3,11 @@ import { Inject, Service } from 'typedi';
import bluebird from 'bluebird';
import { entries, groupBy } from 'lodash';
import { CreateAccount } from '@/services/Accounts/CreateAccount';
import { PlaidAccount, PlaidTransaction } from '@/interfaces';
import {
IAccountCreateDTO,
PlaidAccount,
PlaidTransaction,
} from '@/interfaces';
import {
transformPlaidAccountToCreateAccount,
transformPlaidTrxsToCashflowCreate,
@@ -11,6 +15,7 @@ import {
import { DeleteCashflowTransaction } from '@/services/Cashflow/DeleteCashflowTransactionService';
import HasTenancyService from '@/services/Tenancy/TenancyService';
import { CashflowApplication } from '@/services/Cashflow/CashflowApplication';
import { Knex } from 'knex';
const CONCURRENCY_ASYNC = 10;
@@ -28,6 +33,35 @@ export class PlaidSyncDb {
@Inject()
private deleteCashflowTransactionService: DeleteCashflowTransaction;
/**
* Syncs the Plaid bank account.
* @param {number} tenantId
* @param {IAccountCreateDTO} createBankAccountDTO
* @param {Knex.Transaction} trx
* @returns {Promise<void>}
*/
public async syncBankAccount(
tenantId: number,
createBankAccountDTO: IAccountCreateDTO,
trx?: Knex.Transaction
) {
const { Account } = this.tenancy.models(tenantId);
const plaidAccount = Account.query().findOne(
'plaidAccountId',
createBankAccountDTO.plaidAccountId
);
// Can't continue if the Plaid account is already created.
if (plaidAccount) {
return;
}
await this.createAccountService.createAccount(
tenantId,
createBankAccountDTO,
trx,
{ ignoreUniqueName: true }
);
}
/**
* Syncs the plaid accounts to the system accounts.
* @param {number} tenantId Tenant ID.
@@ -37,7 +71,8 @@ export class PlaidSyncDb {
public async syncBankAccounts(
tenantId: number,
plaidAccounts: PlaidAccount[],
institution: any
institution: any,
trx?: Knex.Transaction
): Promise<void> {
const transformToPlaidAccounts =
transformPlaidAccountToCreateAccount(institution);
@@ -47,7 +82,7 @@ export class PlaidSyncDb {
await bluebird.map(
accountCreateDTOs,
(createAccountDTO: any) =>
this.createAccountService.createAccount(tenantId, createAccountDTO),
this.syncBankAccount(tenantId, createAccountDTO, trx),
{ concurrency: CONCURRENCY_ASYNC }
);
}
@@ -61,7 +96,8 @@ export class PlaidSyncDb {
public async syncAccountTranactions(
tenantId: number,
plaidAccountId: number,
plaidTranasctions: PlaidTransaction[]
plaidTranasctions: PlaidTransaction[],
trx?: Knex.Transaction
): Promise<void> {
const { Account } = this.tenancy.models(tenantId);
@@ -87,7 +123,8 @@ export class PlaidSyncDb {
(uncategoriedDTO) =>
this.cashflowApp.createUncategorizedTransaction(
tenantId,
uncategoriedDTO
uncategoriedDTO,
trx
),
{ concurrency: 1 }
);
@@ -100,7 +137,8 @@ export class PlaidSyncDb {
*/
public async syncAccountsTransactions(
tenantId: number,
plaidAccountsTransactions: PlaidTransaction[]
plaidAccountsTransactions: PlaidTransaction[],
trx?: Knex.Transaction
): Promise<void> {
const groupedTrnsxByAccountId = entries(
groupBy(plaidAccountsTransactions, 'account_id')
@@ -111,7 +149,8 @@ export class PlaidSyncDb {
return this.syncAccountTranactions(
tenantId,
plaidAccountId,
plaidTransactions
plaidTransactions,
trx
);
},
{ concurrency: CONCURRENCY_ASYNC }
@@ -124,11 +163,12 @@ export class PlaidSyncDb {
*/
public async syncRemoveTransactions(
tenantId: number,
plaidTransactionsIds: string[]
plaidTransactionsIds: string[],
trx?: Knex.Transaction
) {
const { CashflowTransaction } = this.tenancy.models(tenantId);
const cashflowTransactions = await CashflowTransaction.query().whereIn(
const cashflowTransactions = await CashflowTransaction.query(trx).whereIn(
'plaidTransactionId',
plaidTransactionsIds
);
@@ -140,7 +180,8 @@ export class PlaidSyncDb {
(transactionId: number) =>
this.deleteCashflowTransactionService.deleteCashflowTransaction(
tenantId,
transactionId
transactionId,
trx
),
{ concurrency: CONCURRENCY_ASYNC }
);
@@ -155,11 +196,12 @@ export class PlaidSyncDb {
public async syncTransactionsCursor(
tenantId: number,
plaidItemId: string,
lastCursor: string
lastCursor: string,
trx?: Knex.Transaction
) {
const { PlaidItem } = this.tenancy.models(tenantId);
await PlaidItem.query().findOne({ plaidItemId }).patch({ lastCursor });
await PlaidItem.query(trx).findOne({ plaidItemId }).patch({ lastCursor });
}
/**
@@ -169,13 +211,16 @@ export class PlaidSyncDb {
*/
public async updateLastFeedsUpdatedAt(
tenantId: number,
plaidAccountIds: string[]
plaidAccountIds: string[],
trx?: Knex.Transaction
) {
const { Account } = this.tenancy.models(tenantId);
await Account.query().whereIn('plaid_account_id', plaidAccountIds).patch({
lastFeedsUpdatedAt: new Date(),
});
await Account.query(trx)
.whereIn('plaid_account_id', plaidAccountIds)
.patch({
lastFeedsUpdatedAt: new Date(),
});
}
/**
@@ -187,12 +232,15 @@ export class PlaidSyncDb {
public async updateAccountsFeedsActive(
tenantId: number,
plaidAccountIds: string[],
isFeedsActive: boolean = true
isFeedsActive: boolean = true,
trx?: Knex.Transaction
) {
const { Account } = this.tenancy.models(tenantId);
await Account.query().whereIn('plaid_account_id', plaidAccountIds).patch({
isFeedsActive,
});
await Account.query(trx)
.whereIn('plaid_account_id', plaidAccountIds)
.patch({
isFeedsActive,
});
}
}

View File

@@ -3,6 +3,8 @@ import { Inject, Service } from 'typedi';
import { PlaidClientWrapper } from '@/lib/Plaid/Plaid';
import { PlaidSyncDb } from './PlaidSyncDB';
import { PlaidFetchedTransactionsUpdates } from '@/interfaces';
import UnitOfWork from '@/services/UnitOfWork';
import { Knex } from 'knex';
@Service()
export class PlaidUpdateTransactions {
@@ -12,12 +14,40 @@ export class PlaidUpdateTransactions {
@Inject()
private plaidSync: PlaidSyncDb;
@Inject()
private uow: UnitOfWork;
/**
* Handles the fetching and storing of new, modified, or removed transactions
* @param {number} tenantId Tenant ID.
* @param {string} plaidItemId the Plaid ID for the item.
* Handles sync the Plaid item to Bigcaptial under UOW.
* @param {number} tenantId
* @param {number} plaidItemId
* @returns {Promise<{ addedCount: number; modifiedCount: number; removedCount: number; }>}
*/
public async updateTransactions(tenantId: number, plaidItemId: string) {
return this.uow.withTransaction(tenantId, (trx: Knex.Transaction) => {
return this.updateTransactionsWork(tenantId, plaidItemId, trx);
});
}
/**
* Handles the fetching and storing the following:
* - New, modified, or removed transactions.
* - New bank accounts.
* - Last accounts feeds updated at.
* - Turn on the accounts feed flag.
* @param {number} tenantId - Tenant ID.
* @param {string} plaidItemId - The Plaid ID for the item.
* @returns {Promise<{ addedCount: number; modifiedCount: number; removedCount: number; }>}
*/
public async updateTransactionsWork(
tenantId: number,
plaidItemId: string,
trx?: Knex.Transaction
): Promise<{
addedCount: number;
modifiedCount: number;
removedCount: number;
}> {
// Fetch new transactions from plaid api.
const { added, modified, removed, cursor, accessToken } =
await this.fetchTransactionUpdates(tenantId, plaidItemId);
@@ -29,28 +59,42 @@ export class PlaidUpdateTransactions {
} = await plaidInstance.accountsGet(request);
const plaidAccountsIds = accounts.map((a) => a.account_id);
const {
data: { institution },
} = await plaidInstance.institutionsGetById({
institution_id: item.institution_id,
country_codes: ['US', 'UK'],
});
// Update the DB.
await this.plaidSync.syncBankAccounts(tenantId, accounts, institution);
// Sync bank accounts.
await this.plaidSync.syncBankAccounts(tenantId, accounts, institution, trx);
// Sync bank account transactions.
await this.plaidSync.syncAccountsTransactions(
tenantId,
added.concat(modified)
added.concat(modified),
trx
);
// Sync removed transactions.
await this.plaidSync.syncRemoveTransactions(tenantId, removed, trx);
// Sync transactions cursor.
await this.plaidSync.syncTransactionsCursor(
tenantId,
plaidItemId,
cursor,
trx
);
await this.plaidSync.syncRemoveTransactions(tenantId, removed);
await this.plaidSync.syncTransactionsCursor(tenantId, plaidItemId, cursor);
// Update the last feeds updated at of the updated accounts.
await this.plaidSync.updateLastFeedsUpdatedAt(tenantId, plaidAccountsIds);
await this.plaidSync.updateLastFeedsUpdatedAt(
tenantId,
plaidAccountsIds,
trx
);
// Turn on the accounts feeds flag.
await this.plaidSync.updateAccountsFeedsActive(tenantId, plaidAccountsIds);
await this.plaidSync.updateAccountsFeedsActive(
tenantId,
plaidAccountsIds,
true,
trx
);
return {
addedCount: added.length,
modifiedCount: modified.length,