feat(server): wip syncing Plaid transactions

This commit is contained in:
Ahmed Bouhuolia
2024-02-02 02:23:49 +02:00
parent b43cd26ecc
commit b940c6dd17
15 changed files with 491 additions and 7 deletions

View File

@@ -15,10 +15,17 @@ export class PlaidBankingController extends BaseController {
const router = Router();
router.post('/link-token', this.linkToken.bind(this));
router.post('/exchange-token', this.exchangeToken.bind(this));
return router;
}
/**
* Retrieves the Plaid link token.
* @param {Request} req
* @param {response} res
* @returns {Response}
*/
private async linkToken(req: Request, res: Response) {
const { tenantId } = req;
@@ -26,4 +33,21 @@ export class PlaidBankingController extends BaseController {
return res.status(200).send(linkToken);
}
/**
*
* @param {Request} req
* @param {response} res
* @returns {Response}
*/
public async exchangeToken(req: Request, res: Response) {
const { tenantId } = req;
const { public_token, institution_id } = req.body;
await this.plaidApp.exchangeToken(tenantId, {
institutionId: institution_id,
publicToken: public_token,
});
return res.status(200).send({});
}
}

View File

@@ -0,0 +1,14 @@
exports.up = function (knex) {
return knex.schema.createTable('plaid_items', (table) => {
table.increments('id');
table.integer('tenant_id').unsigned();
table.string('plaid_item_id');
table.string('plaid_institution_id');
table.string('plaid_access_token');
table.string('last_cursor');
table.string('status');
table.timestamps();
});
};
exports.down = function (knex) {};

View File

@@ -0,0 +1,7 @@
exports.up = function (knex) {
return knex.schema.table('accounts', (table) => {
table.string('plaid_account_id');
});
};
exports.down = function (knex) {};

View File

@@ -6,12 +6,13 @@ export interface IAccountDTO {
code: string;
description: string;
accountType: string;
parentAccountId: number;
parentAccountId?: number;
active: boolean;
}
export interface IAccountCreateDTO extends IAccountDTO {
currencyCode?: string;
plaidAccountId?: string;
}
export interface IAccountEditDTO extends IAccountDTO {}

View File

@@ -0,0 +1,29 @@
import Container, { Service } from 'typedi';
@Service()
export class PlaidFetchTransactionsJob {
/**
* Constructor method.
*/
constructor(agenda) {
agenda.define(
'plaid-update-account-transactions',
{ priority: 'high', concurrency: 2 },
this.handler
);
}
/**
* Triggers the function.
*/
private handler = async (job, done: Function) => {
const {} = job.attrs.data;
try {
done();
} catch (error) {
console.log(error);
done(error);
}
};
}

View File

@@ -0,0 +1,135 @@
import * as R from 'ramda';
import { Inject, Service } from 'typedi';
import async from 'async';
import { forOwn, groupBy } from 'lodash';
import { CreateAccount } from '@/services/Accounts/CreateAccount';
import {
PlaidAccount,
PlaidTransaction,
SyncAccountsTransactionsTask,
} from './_types';
import {
transformPlaidAccountToCreateAccount,
transformPlaidTrxsToCashflowCreate,
} from './utils';
import NewCashflowTransactionService from '@/services/Cashflow/NewCashflowTransactionService';
import HasTenancyService from '@/services/Tenancy/TenancyService';
import { ICashflowNewCommandDTO } from '@/interfaces';
@Service()
export class PlaidSyncDb {
@Inject()
private tenancy: HasTenancyService;
@Inject()
private createAccountService: CreateAccount;
@Inject()
private createCashflowTransactionService: NewCashflowTransactionService;
/**
* Syncs the plaid accounts to the system accounts.
* @param {number} tenantId Tenant ID.
* @param {PlaidAccount[]} plaidAccounts
*/
public syncBankAccounts(tenantId: number, plaidAccounts: PlaidAccount[]) {
const accountCreateDTOs = R.map(transformPlaidAccountToCreateAccount)(
plaidAccounts
);
accountCreateDTOs.map((createDTO) => {
return this.createAccountService.createAccount(tenantId, createDTO);
});
}
/**
* Synsc the Plaid transactions to the system GL entries.
* @param {number} tenantId - Tenant ID.
* @param {number} plaidAccountId - Plaid account ID.
* @param {PlaidTransaction[]} plaidTranasctions - Plaid transactions
*/
public async syncAccountTranactions(
tenantId: number,
plaidAccountId: number,
plaidTranasctions: PlaidTransaction[]
): Promise<void> {
const { Account } = await this.tenancy.models(tenantId);
const cashflowAccount = await Account.query()
.findOne('plaidAccountId', plaidAccountId)
.throwIfNotFound();
const openingEquityBalance = await Account.query().findOne(
'slug',
'opening-balance-equity'
);
// Transformes the Plaid transactions to cashflow create DTOs.
const transformTransaction = transformPlaidTrxsToCashflowCreate(
cashflowAccount.id,
openingEquityBalance.id
);
const accountsCashflowDTO = R.map(transformTransaction)(plaidTranasctions);
// Creating account transaction queue.
const createAccountTransactionsQueue = async.queue(
(cashflowDTO: ICashflowNewCommandDTO) =>
this.createCashflowTransactionService.newCashflowTransaction(
tenantId,
cashflowDTO
),
10
);
accountsCashflowDTO.forEach((cashflowDTO) => {
createAccountTransactionsQueue.push(cashflowDTO);
});
await createAccountTransactionsQueue.drain();
}
/**
* Syncs the accounts transactions in paraller under controlled concurrency.
* @param {number} tenantId
* @param {PlaidTransaction[]} plaidTransactions
*/
public async syncAccountsTransactions(
tenantId: number,
plaidAccountsTransactions: PlaidTransaction[]
): Promise<void> {
const groupedTrnsxByAccountId = groupBy(
plaidAccountsTransactions,
'account_id'
);
const syncAccountsTrnsx = async.queue(
({
tenantId,
plaidAccountId,
plaidTransactions,
}: SyncAccountsTransactionsTask) => {
return this.syncAccountTranactions(
tenantId,
plaidAccountId,
plaidTransactions
);
},
2
);
forOwn(groupedTrnsxByAccountId, (plaidTransactions, plaidAccountId) => {
syncAccountsTrnsx.push({ tenantId, plaidAccountId, plaidTransactions });
});
await syncAccountsTrnsx.drain();
}
/**
* Syncs the Plaid item last transaction cursor.
* @param {number} tenantId -
* @param {string} itemId -
* @param {string} lastCursor -
*/
public async syncTransactionsCursor(
tenantId: number,
plaidItemId: string,
lastCursor: string
) {
const { PlaidItem } = this.tenancy.models(tenantId);
await PlaidItem.query().findById(plaidItemId).patch({ lastCursor });
}
}

View File

@@ -0,0 +1,104 @@
import HasTenancyService from '@/services/Tenancy/TenancyService';
import { Inject, Service } from 'typedi';
import { PlaidClientWrapper } from './Plaid';
import { PlaidSyncDb } from './PlaidSyncDB';
import { PlaidFetchedTransactionsUpdates } from './_types';
@Service()
export class PlaidUpdateTransactions {
@Inject()
private tenancy: HasTenancyService;
@Inject()
private plaidSync: PlaidSyncDb;
/**
* Handles the fetching and storing of new, modified, or removed transactions
* @param {number} tenantId Tenant ID.
* @param {string} plaidItemId the Plaid ID for the item.
*/
public async updateTransactions(tenantId: number, plaidItemId: string) {
// Fetch new transactions from plaid api.
const { added, modified, removed, cursor, accessToken } =
await this.fetchTransactionUpdates(tenantId, plaidItemId);
const request = { access_token: accessToken };
const plaidInstance = new PlaidClientWrapper();
const {
data: { accounts },
} = await plaidInstance.accountsGet(request);
// Update the DB.
await this.plaidSync.syncBankAccounts(tenantId, accounts);
await this.plaidSync.syncAccountsTransactions(
tenantId,
added.concat(modified)
);
await this.plaidSync.syncTransactionsCursor(tenantId, plaidItemId, cursor);
return {
addedCount: added.length,
modifiedCount: modified.length,
removedCount: removed.length,
};
}
/**
* Fetches transactions from the `Plaid API` for a given item.
* @param {number} tenantId - Tenant ID.
* @param {string} plaidItemId - The Plaid ID for the item.
* @returns {Promise<PlaidFetchedTransactionsUpdates>}
*/
public async fetchTransactionUpdates(
tenantId: number,
plaidItemId: string
): Promise<PlaidFetchedTransactionsUpdates> {
// the transactions endpoint is paginated, so we may need to hit it multiple times to
// retrieve all available transactions.
const { PlaidItem } = this.tenancy.models(tenantId);
const plaidItem = await PlaidItem.query().findOne(
'plaidItemId',
plaidItemId
);
if (!plaidItem) {
throw new Error('The given Plaid item id is not found.');
}
const { plaidAccessToken, lastCursor } = plaidItem;
let cursor = lastCursor;
// New transaction updates since "cursor"
let added = [];
let modified = [];
// Removed transaction ids
let removed = [];
let hasMore = true;
const batchSize = 100;
try {
// Iterate through each page of new transaction updates for item
/* eslint-disable no-await-in-loop */
while (hasMore) {
const request = {
access_token: plaidAccessToken,
cursor: cursor,
count: batchSize,
};
const plaidInstance = new PlaidClientWrapper();
const response = await plaidInstance.transactionsSync(request);
const data = response.data;
// Add this page of results
added = added.concat(data.added);
modified = modified.concat(data.modified);
removed = removed.concat(data.removed);
hasMore = data.has_more;
// Update cursor to the next cursor
cursor = data.next_cursor;
}
} catch (err) {
console.error(`Error fetching transactions: ${err.message}`);
cursor = lastCursor;
}
return { added, modified, removed, cursor, accessToken: plaidAccessToken };
}
}

View File

@@ -0,0 +1,41 @@
export interface PlaidAccount {
account_id: string;
balances: {
available: number;
current: number;
iso_currency_code: string;
limit: null;
unofficial_currency_code: null;
};
mask: string;
name: string;
official_name: string;
persistent_account_id: string;
subtype: string;
type: string;
}
export interface PlaidTransaction {
account_id: string;
amount: number;
authorized_data: string;
category: string[];
check_number: number | null;
iso_currency_code: string;
transaction_id: string;
transaction_type: string;
}
export interface PlaidFetchedTransactionsUpdates {
added: any[];
modified: any[];
removed: any[];
accessToken: string;
cursor: string;
}
export interface SyncAccountsTransactionsTask {
tenantId: number;
plaidAccountId: number;
plaidTransactions: PlaidTransaction[];
}

View File

@@ -0,0 +1,41 @@
import * as R from 'ramda';
import { IAccountCreateDTO, ICashflowNewCommandDTO } from '@/interfaces';
import { PlaidAccount, PlaidTransaction } from './_types';
export const transformPlaidAccountToCreateAccount = (
plaidAccount: PlaidAccount
): IAccountCreateDTO => {
return {
name: plaidAccount.name,
code: '',
description: plaidAccount.official_name,
currencyCode: plaidAccount.balances.iso_currency_code,
accountType: 'cash',
active: true,
plaidAccountId: plaidAccount.account_id,
};
};
export const transformPlaidTrxsToCashflowCreate = R.curry(
(
cashflowAccountId: number,
creditAccountId: number,
plaidTranasction: PlaidTransaction,
): ICashflowNewCommandDTO => {
return {
date: plaidTranasction.authorized_data,
transactionType: '',
description: '',
amount: plaidTranasction.amount,
exchangeRate: 1,
currencyCode: plaidTranasction.iso_currency_code,
creditAccountId,
cashflowAccountId,
// transactionNumber: string;
// referenceNo: string;
};
}
);

View File

@@ -61,6 +61,7 @@ import Task from 'models/Task';
import TaxRate from 'models/TaxRate';
import TaxRateTransaction from 'models/TaxRateTransaction';
import Attachment from 'models/Attachment';
import PlaidItem from 'models/PlaidItem';
export default (knex) => {
const models = {
@@ -124,7 +125,8 @@ export default (knex) => {
Task,
TaxRate,
TaxRateTransaction,
Attachment
Attachment,
PlaidItem
};
return mapValues(models, (model) => model.bindKnex(knex));
};

View File

@@ -0,0 +1,24 @@
import TenantModel from 'models/TenantModel';
export default class PlaidItem extends TenantModel {
/**
* Table name.
*/
static get tableName() {
return 'plaid_items';
}
/**
* Timestamps columns.
*/
get timestamps() {
return [];
}
/**
* Relationship mapping.
*/
static get relationMappings() {
return {};
}
}

View File

@@ -1,18 +1,33 @@
import { Inject, Service } from 'typedi';
import { PlaidLinkTokenService } from './PlaidLinkToken';
import { PlaidItemService } from './PlaidItem';
import { PlaidItemDTO } from './_types';
@Service()
export class PlaidApplication {
@Inject()
private getLinkTokenService: PlaidLinkTokenService;
@Inject()
private plaidItemService: PlaidItemService;
/**
*
* @param tenantId
* @param itemId
* @returns
* Retrieves the Plaid link token.
* @param {number} tenantId
* @param {number} itemId
* @returns
*/
public getLinkToken(tenantId: number) {
return this.getLinkTokenService.getLinkToken(tenantId);
}
/**
* Exchanges the Plaid access token.
* @param {number} tenantId
* @param {PlaidItemDTO} itemDTO
* @returns
*/
public exchangeToken(tenantId: number, itemDTO: PlaidItemDTO) {
return this.plaidItemService.item(tenantId, itemDTO);
}
}

View File

@@ -0,0 +1,42 @@
import { Inject, Service } from 'typedi';
import { PlaidClientWrapper } from '@/lib/Plaid';
import { PlaidItemDTO } from './_types';
import HasTenancyService from '@/services/Tenancy/TenancyService';
import { PlaidUpdateTransactions } from '@/lib/Plaid/PlaidUpdateTransactions';
@Service()
export class PlaidItemService {
@Inject()
private tenancy: HasTenancyService;
@Inject()
private plaidUpdateTranasctions: PlaidUpdateTransactions;
/**
*
* @param {number} tenantId
* @param {PlaidItemDTO} itemDTO
*/
public async item(tenantId: number, itemDTO: PlaidItemDTO) {
const { PlaidItem } = this.tenancy.models(tenantId);
const { publicToken, institutionId } = itemDTO;
const plaidInstance = new PlaidClientWrapper();
// exchange the public token for a private access token and store with the item.
const response = await plaidInstance.itemPublicTokenExchange({
public_token: publicToken,
});
const plaidAccessToken = response.data.access_token;
const plaidItemId = response.data.item_id;
const plaidItem = await PlaidItem.query().insertAndFetch({
tenantId,
plaidAccessToken,
plaidItemId,
plaidInstitutionId: institutionId,
});
this.plaidUpdateTranasctions.updateTransactions(tenantId, plaidItemId);
}
}

View File

@@ -0,0 +1,4 @@
export interface PlaidItemDTO {
publicToken: string;
institutionId: string;
}

View File

@@ -86,6 +86,7 @@ export default class NewCashflowTransactionService {
'cashflowAccountId',
'creditAccountId',
'branchId',
'plaidAccountId'
]);
// Retreive the next invoice number.
const autoNextNumber =
@@ -124,7 +125,7 @@ export default class NewCashflowTransactionService {
public newCashflowTransaction = async (
tenantId: number,
newTransactionDTO: ICashflowNewCommandDTO,
userId: number
userId?: number
): Promise<{ cashflowTransaction: ICashflowTransaction }> => {
const { CashflowTransaction, Account } = this.tenancy.models(tenantId);