feat: add socker connection between client and server

This commit is contained in:
Ahmed Bouhuolia
2024-02-24 00:18:48 +02:00
parent 1fd8a53ed1
commit 2d3544fe37
16 changed files with 357 additions and 27 deletions

View File

@@ -3,6 +3,7 @@ import { PlaidApplication } from '@/services/Banking/Plaid/PlaidApplication';
import { Request, Response } from 'express';
import { Inject, Service } from 'typedi';
import BaseController from '../BaseController';
import { PlaidWebhookTenantBootMiddleware } from '@/services/Banking/Plaid/PlaidWebhookTenantBootMiddleware';
@Service()
export class Webhooks extends BaseController {
@@ -15,6 +16,7 @@ export class Webhooks extends BaseController {
router() {
const router = Router();
router.use(PlaidWebhookTenantBootMiddleware);
router.post('/plaid', this.plaidWebhooks.bind(this));
return router;
@@ -34,8 +36,6 @@ export class Webhooks extends BaseController {
item_id: plaidItemId,
} = req.body;
console.log(req.body, 'triggered');
await this.plaidApp.webhooks(
tenantId,
plaidItemId,

View File

@@ -5,6 +5,8 @@ import boom from 'express-boom';
import errorHandler from 'errorhandler';
import bodyParser from 'body-parser';
import fileUpload from 'express-fileupload';
import { Server } from 'socket.io';
import Container from 'typedi';
import routes from 'api';
import LoggerMiddleware from '@/api/middleware/LoggerMiddleware';
import AgendashController from '@/api/controllers/Agendash';
@@ -72,4 +74,32 @@ export default ({ app }) => {
app.use((req: Request, res: Response, next: NextFunction) => {
return res.boom.notFound();
});
const server = app.listen(app.get('port'), (err) => {
if (err) {
console.log(err);
process.exit(1);
return;
}
console.log(`
################################################
Server listening on port: ${app.get('port')}
################################################
`);
});
const io = new Server(server, {});
// Set socket.io listeners.
io.on('connection', (socket) => {
console.log('SOCKET CONNECTED');
socket.on('disconnect', () => {
console.log('SOCKET DISCONNECTED');
});
});
// Middleware to pass socket to each request object.
app.use((req: Request, res: Response, next: NextFunction) => {
req.io = io;
next();
});
Container.set('socket', io);
};

View File

@@ -10,19 +10,6 @@ async function startServer() {
// Intiialize all registered loaders.
await loadersFactory({ expressApp: app });
app.listen(app.get('port'), (err) => {
if (err) {
console.log(err);
process.exit(1);
return;
}
console.log(`
################################################
Server listening on port: ${app.get('port')}
################################################
`);
});
}
startServer();

View File

@@ -25,11 +25,15 @@ export class PlaidFetchTransactionsJob {
const plaidFetchTransactionsService = Container.get(
PlaidUpdateTransactions
);
const io = Container.get('socket');
try {
await plaidFetchTransactionsService.updateTransactions(
tenantId,
plaidItemId
);
// Notify the frontend to reflect the new transactions changes.
io.emit('NEW_TRANSACTIONS_DATA', { plaidItemId });
done();
} catch (error) {
console.log(error);

View File

@@ -7,6 +7,7 @@ import {
IPlaidItemCreatedEventPayload,
PlaidItemDTO,
} from '@/interfaces/Plaid';
import SystemPlaidItem from '@/system/models/SystemPlaidItem';
@Service()
export class PlaidItemService {
@@ -29,19 +30,23 @@ export class PlaidItemService {
const plaidInstance = new PlaidClientWrapper();
// exchange the public token for a private access token and store with the item.
// Exchange the public token for a private access token and store with the item.
const response = await plaidInstance.itemPublicTokenExchange({
public_token: publicToken,
});
const plaidAccessToken = response.data.access_token;
const plaidItemId = response.data.item_id;
// Store the Plaid item metadata on tenant scope.
const plaidItem = await PlaidItem.query().insertAndFetch({
tenantId,
plaidAccessToken,
plaidItemId,
plaidInstitutionId: institutionId,
});
// Stores the Plaid item id on system scope.
await SystemPlaidItem.query().insert({ tenantId, plaidItemId });
// Triggers `onPlaidItemCreated` event.
await this.eventPublisher.emitAsync(events.plaid.onItemCreated, {
tenantId,

View File

@@ -0,0 +1,32 @@
import { Request, Response, NextFunction } from 'express';
import { SystemPlaidItem, Tenant } from '@/system/models';
import tenantDependencyInjection from '@/api/middleware/TenantDependencyInjection';
export const PlaidWebhookTenantBootMiddleware = async (
req: Request,
res: Response,
next: NextFunction
) => {
const { item_id: plaidItemId } = req.body;
const plaidItem = await SystemPlaidItem.query().findOne({ plaidItemId });
const notFoundOrganization = () => {
return res.boom.unauthorized('Organization identication not found.', {
errors: [{ type: 'ORGANIZATION.ID.NOT.FOUND', code: 100 }],
});
};
// In case the given organization not found.
if (!plaidItem) {
return notFoundOrganization();
}
const tenant = await Tenant.query()
.findById(plaidItem.tenantId)
.withGraphFetched('metadata');
// When the given organization id not found on the system storage.
if (!tenant) {
return notFoundOrganization();
}
tenantDependencyInjection(req, tenant);
next();
};

View File

@@ -8,17 +8,17 @@ export class PlaidWebooks {
/**
* Listens to Plaid webhooks
* @param {number} tenantId
* @param {string} webhookType
* @param {string} plaidItemId
* @param {string} webhookCode
* @param {number} tenantId - Tenant Id.
* @param {string} webhookType - Webhook type.
* @param {string} plaidItemId - Plaid item Id.
* @param {string} webhookCode - webhook code.
*/
public async webhooks(
tenantId: number,
plaidItemId: string,
webhookType: string,
webhookCode: string
) {
): Promise<void> {
const _webhookType = webhookType.toLowerCase();
// There are five types of webhooks: AUTH, TRANSACTIONS, ITEM, INCOME, and ASSETS.
@@ -43,7 +43,7 @@ export class PlaidWebooks {
webhookType: string,
webhookCode: string,
plaidItemId: string
) {
): Promise<void> {
console.log(
`UNHANDLED ${webhookType} WEBHOOK: ${webhookCode}: Plaid item id ${plaidItemId}: unhandled webhook type received.`
);
@@ -59,7 +59,7 @@ export class PlaidWebooks {
additionalInfo: string,
webhookCode: string,
plaidItemId: string
) {
): void {
console.log(
`WEBHOOK: TRANSACTIONS: ${webhookCode}: Plaid_item_id ${plaidItemId}: ${additionalInfo}`
);

View File

@@ -0,0 +1,15 @@
exports.up = function (knex) {
return knex.schema.createTable('plaid_items', (table) => {
table.bigIncrements('id');
table
.bigInteger('tenant_id')
.unsigned()
.index()
.references('id')
.inTable('tenants');
table.string('plaid_item_id');
table.timestamps();
});
};
exports.down = (knex) => {};

View File

@@ -0,0 +1,49 @@
import { Model } from 'objection';
import SystemModel from '@/system/models/SystemModel';
export default class SystemPlaidItem extends SystemModel {
tenantId: number;
plaidItemId: string;
/**
* Table name.
*/
static get tableName() {
return 'plaid_items';
}
/**
* Timestamps columns.
*/
get timestamps() {
return ['createdAt', 'updatedAt'];
}
/**
* Virtual attributes.
*/
static get virtualAttributes() {
return [];
}
/**
* Relationship mapping.
*/
static get relationMappings() {
const Tenant = require('system/models/Tenant');
return {
/**
* System user may belongs to tenant model.
*/
tenant: {
relation: Model.BelongsToOneRelation,
modelClass: Tenant.default,
join: {
from: 'users.tenantId',
to: 'tenants.id',
},
},
};
}
}

View File

@@ -3,5 +3,13 @@ import TenantMetadata from './TenantMetadata';
import SystemUser from './SystemUser';
import PasswordReset from './PasswordReset';
import Invite from './Invite';
import SystemPlaidItem from './SystemPlaidItem';
export { Tenant, TenantMetadata, SystemUser, PasswordReset, Invite };
export {
Tenant,
TenantMetadata,
SystemUser,
PasswordReset,
Invite,
SystemPlaidItem,
};