feat(server): move updating plaid transactions to background job

This commit is contained in:
Ahmed Bouhuolia
2024-02-03 13:59:46 +02:00
parent b940c6dd17
commit e0ddcb022a
16 changed files with 150 additions and 68 deletions

View File

@@ -1,7 +1,7 @@
import { Inject, Service } from 'typedi';
import { PlaidLinkTokenService } from './PlaidLinkToken';
import { PlaidItemService } from './PlaidItem';
import { PlaidItemDTO } from './_types';
import { PlaidItemDTO } from '@/interfaces';
@Service()
export class PlaidApplication {

View File

@@ -0,0 +1,39 @@
import Container, { Service } from 'typedi';
import { PlaidUpdateTransactions } from './PlaidUpdateTransactions';
import { IPlaidItemCreatedEventPayload } from '@/interfaces';
@Service()
export class PlaidFetchTransactionsJob {
/**
* Constructor method.
*/
constructor(agenda) {
agenda.define(
'plaid-update-account-transactions',
{ priority: 'high', concurrency: 2 },
this.handler
);
}
/**
* Triggers the function.
*/
private handler = async (job, done: Function) => {
const { tenantId, plaidItemId } = job.attrs
.data as IPlaidItemCreatedEventPayload;
const plaidFetchTransactionsService = Container.get(
PlaidUpdateTransactions
);
try {
await plaidFetchTransactionsService.updateTransactions(
tenantId,
plaidItemId
);
done();
} catch (error) {
console.log(error);
done(error);
}
};
}

View File

@@ -1,8 +1,12 @@
import { Inject, Service } from 'typedi';
import { PlaidClientWrapper } from '@/lib/Plaid';
import { PlaidItemDTO } from './_types';
import HasTenancyService from '@/services/Tenancy/TenancyService';
import { PlaidUpdateTransactions } from '@/lib/Plaid/PlaidUpdateTransactions';
import { EventPublisher } from '@/lib/EventPublisher/EventPublisher';
import events from '@/subscribers/events';
import {
IPlaidItemCreatedEventPayload,
PlaidItemDTO,
} from '@/interfaces/Plaid';
@Service()
export class PlaidItemService {
@@ -10,12 +14,14 @@ export class PlaidItemService {
private tenancy: HasTenancyService;
@Inject()
private plaidUpdateTranasctions: PlaidUpdateTransactions;
private eventPublisher: EventPublisher;
/**
*
* @param {number} tenantId
* @param {PlaidItemDTO} itemDTO
* Exchanges the public token to get access token and item id and then creates
* a new Plaid item.
* @param {number} tenantId
* @param {PlaidItemDTO} itemDTO
* @returns {Promise<void>}
*/
public async item(tenantId: number, itemDTO: PlaidItemDTO) {
const { PlaidItem } = this.tenancy.models(tenantId);
@@ -36,7 +42,12 @@ export class PlaidItemService {
plaidItemId,
plaidInstitutionId: institutionId,
});
this.plaidUpdateTranasctions.updateTransactions(tenantId, plaidItemId);
// Triggers `onPlaidItemCreated` event.
await this.eventPublisher.emitAsync(events.plaid.onItemCreated, {
tenantId,
plaidAccessToken,
plaidItemId,
plaidInstitutionId: institutionId,
} as IPlaidItemCreatedEventPayload);
}
}

View File

@@ -0,0 +1,126 @@
import * as R from 'ramda';
import { Inject, Service } from 'typedi';
import bluebird from 'bluebird';
import { entries, groupBy } from 'lodash';
import { CreateAccount } from '@/services/Accounts/CreateAccount';
import { PlaidAccount, PlaidTransaction } from '@/interfaces';
import {
transformPlaidAccountToCreateAccount,
transformPlaidTrxsToCashflowCreate,
} from './utils';
import NewCashflowTransactionService from '@/services/Cashflow/NewCashflowTransactionService';
import HasTenancyService from '@/services/Tenancy/TenancyService';
@Service()
export class PlaidSyncDb {
@Inject()
private tenancy: HasTenancyService;
@Inject()
private createAccountService: CreateAccount;
@Inject()
private createCashflowTransactionService: NewCashflowTransactionService;
/**
* Syncs the plaid accounts to the system accounts.
* @param {number} tenantId Tenant ID.
* @param {PlaidAccount[]} plaidAccounts
* @returns {Promise<void>}
*/
public async syncBankAccounts(
tenantId: number,
plaidAccounts: PlaidAccount[]
): Promise<void> {
const accountCreateDTOs = R.map(transformPlaidAccountToCreateAccount)(
plaidAccounts
);
await bluebird.map(
accountCreateDTOs,
(createAccountDTO: any) =>
this.createAccountService.createAccount(tenantId, createAccountDTO),
{ concurrency: 10 }
);
}
/**
* Synsc the Plaid transactions to the system GL entries.
* @param {number} tenantId - Tenant ID.
* @param {number} plaidAccountId - Plaid account ID.
* @param {PlaidTransaction[]} plaidTranasctions - Plaid transactions
*/
public async syncAccountTranactions(
tenantId: number,
plaidAccountId: number,
plaidTranasctions: PlaidTransaction[]
): Promise<void> {
const { Account } = this.tenancy.models(tenantId);
const cashflowAccount = await Account.query()
.findOne({ plaidAccountId })
.throwIfNotFound();
const openingEquityBalance = await Account.query().findOne(
'slug',
'opening-balance-equity'
);
// Transformes the Plaid transactions to cashflow create DTOs.
const transformTransaction = transformPlaidTrxsToCashflowCreate(
cashflowAccount.id,
openingEquityBalance.id
);
const accountsCashflowDTO = R.map(transformTransaction)(plaidTranasctions);
// Creating account transaction queue.
await bluebird.map(
accountsCashflowDTO,
(cashflowDTO) =>
this.createCashflowTransactionService.newCashflowTransaction(
tenantId,
cashflowDTO
),
{ concurrency: 10 }
);
}
/**
* Syncs the accounts transactions in paraller under controlled concurrency.
* @param {number} tenantId
* @param {PlaidTransaction[]} plaidTransactions
*/
public async syncAccountsTransactions(
tenantId: number,
plaidAccountsTransactions: PlaidTransaction[]
): Promise<void> {
const groupedTrnsxByAccountId = entries(
groupBy(plaidAccountsTransactions, 'account_id')
);
await bluebird.map(
groupedTrnsxByAccountId,
([plaidAccountId, plaidTransactions]: [number, PlaidTransaction[]]) => {
return this.syncAccountTranactions(
tenantId,
plaidAccountId,
plaidTransactions
);
},
{ concurrency: 10 }
);
}
/**
* Syncs the Plaid item last transaction cursor.
* @param {number} tenantId - Tenant ID.
* @param {string} itemId - Plaid item ID.
* @param {string} lastCursor - Last transaction cursor.
*/
public async syncTransactionsCursor(
tenantId: number,
plaidItemId: string,
lastCursor: string
) {
const { PlaidItem } = this.tenancy.models(tenantId);
await PlaidItem.query().findById(plaidItemId).patch({ lastCursor });
}
}

View File

@@ -0,0 +1,104 @@
import HasTenancyService from '@/services/Tenancy/TenancyService';
import { Inject, Service } from 'typedi';
import { PlaidClientWrapper } from '@/lib/Plaid/Plaid';
import { PlaidSyncDb } from './PlaidSyncDB';
import { PlaidFetchedTransactionsUpdates } from '@/interfaces';
@Service()
export class PlaidUpdateTransactions {
@Inject()
private tenancy: HasTenancyService;
@Inject()
private plaidSync: PlaidSyncDb;
/**
* Handles the fetching and storing of new, modified, or removed transactions
* @param {number} tenantId Tenant ID.
* @param {string} plaidItemId the Plaid ID for the item.
*/
public async updateTransactions(tenantId: number, plaidItemId: string) {
// Fetch new transactions from plaid api.
const { added, modified, removed, cursor, accessToken } =
await this.fetchTransactionUpdates(tenantId, plaidItemId);
const request = { access_token: accessToken };
const plaidInstance = new PlaidClientWrapper();
const {
data: { accounts },
} = await plaidInstance.accountsGet(request);
// Update the DB.
await this.plaidSync.syncBankAccounts(tenantId, accounts);
await this.plaidSync.syncAccountsTransactions(
tenantId,
added.concat(modified)
);
await this.plaidSync.syncTransactionsCursor(tenantId, plaidItemId, cursor);
return {
addedCount: added.length,
modifiedCount: modified.length,
removedCount: removed.length,
};
}
/**
* Fetches transactions from the `Plaid API` for a given item.
* @param {number} tenantId - Tenant ID.
* @param {string} plaidItemId - The Plaid ID for the item.
* @returns {Promise<PlaidFetchedTransactionsUpdates>}
*/
private async fetchTransactionUpdates(
tenantId: number,
plaidItemId: string
): Promise<PlaidFetchedTransactionsUpdates> {
// the transactions endpoint is paginated, so we may need to hit it multiple times to
// retrieve all available transactions.
const { PlaidItem } = this.tenancy.models(tenantId);
const plaidItem = await PlaidItem.query().findOne(
'plaidItemId',
plaidItemId
);
if (!plaidItem) {
throw new Error('The given Plaid item id is not found.');
}
const { plaidAccessToken, lastCursor } = plaidItem;
let cursor = lastCursor;
// New transaction updates since "cursor"
let added = [];
let modified = [];
// Removed transaction ids
let removed = [];
let hasMore = true;
const batchSize = 100;
try {
// Iterate through each page of new transaction updates for item
/* eslint-disable no-await-in-loop */
while (hasMore) {
const request = {
access_token: plaidAccessToken,
cursor: cursor,
count: batchSize,
};
const plaidInstance = new PlaidClientWrapper();
const response = await plaidInstance.transactionsSync(request);
const data = response.data;
// Add this page of results
added = added.concat(data.added);
modified = modified.concat(data.modified);
removed = removed.concat(data.removed);
hasMore = data.has_more;
// Update cursor to the next cursor
cursor = data.next_cursor;
}
} catch (err) {
console.error(`Error fetching transactions: ${err.message}`);
cursor = lastCursor;
}
return { added, modified, removed, cursor, accessToken: plaidAccessToken };
}
}

View File

@@ -1,4 +0,0 @@
export interface PlaidItemDTO {
publicToken: string;
institutionId: string;
}

View File

@@ -0,0 +1,34 @@
import { Inject, Service } from 'typedi';
import { EventSubscriber } from '@/lib/EventPublisher/EventPublisher';
import { IPlaidItemCreatedEventPayload } from '@/interfaces/Plaid';
import events from '@/subscribers/events';
@Service()
export class PlaidUpdateTransactionsOnItemCreatedSubscriber extends EventSubscriber {
@Inject('agenda')
private agenda: any;
/**
* Constructor method.
*/
public attach(bus) {
bus.subscribe(
events.plaid.onItemCreated,
this.handleUpdateTransactionsOnItemCreated
);
}
/**
* Updates the Plaid item transactions
* @param {IPlaidItemCreatedEventPayload} payload - Event payload.
*/
private handleUpdateTransactionsOnItemCreated = async ({
tenantId,
plaidItemId,
plaidAccessToken,
plaidInstitutionId,
}: IPlaidItemCreatedEventPayload) => {
const payload = { tenantId, plaidItemId };
await this.agenda.now('plaid-update-account-transactions', payload);
};
}

View File

@@ -0,0 +1,54 @@
import * as R from 'ramda';
import { IAccountCreateDTO, ICashflowNewCommandDTO } from '@/interfaces';
import { PlaidAccount, PlaidTransaction } from './_types';
/**
* Transformes the Plaid account to create cashflow account DTO.
* @param {PlaidAccount} plaidAccount
* @returns {IAccountCreateDTO}
*/
export const transformPlaidAccountToCreateAccount = (
plaidAccount: PlaidAccount
): IAccountCreateDTO => {
return {
name: plaidAccount.name,
code: '',
description: plaidAccount.official_name,
currencyCode: plaidAccount.balances.iso_currency_code,
accountType: 'cash',
active: true,
plaidAccountId: plaidAccount.account_id,
};
};
/**
* Transformes the plaid transaction to cashflow create DTO.
* @param {number} cashflowAccountId - Cashflow account ID.
* @param {number} creditAccountId - Credit account ID.
* @param {PlaidTransaction} plaidTranasction - Plaid transaction.
* @returns {ICashflowNewCommandDTO}
*/
export const transformPlaidTrxsToCashflowCreate = R.curry(
(
cashflowAccountId: number,
creditAccountId: number,
plaidTranasction: PlaidTransaction
): ICashflowNewCommandDTO => {
return {
date: plaidTranasction.date,
transactionType: 'OwnerContribution',
description: plaidTranasction.name,
amount: plaidTranasction.amount,
exchangeRate: 1,
currencyCode: plaidTranasction.iso_currency_code,
creditAccountId,
cashflowAccountId,
// transactionNumber: string;
// referenceNo: string;
publish: true,
};
}
);