feat: process pending transactions from lunchflow (#731)

* feat(config): add Lunchflow runtime configuration flags

* feat(api): add include_pending parameter to Lunchflow API

* feat(processor): add pending metadata support to Lunchflow processor

* feat(processor): generate temporary IDs for pending transactions

* feat(importer): integrate pending transaction support in sync

* fix(importer): improve deduplication for transactions without IDs

* feat(model): add Lunchflow pending support to Transaction scopes

* test: add Lunchflow processor pending metadata tests

* docs: update AGENTS.md for Lunchflow pending support

* chore: remove unused variable

* fix: simplify key check

* fix: dotenv-linter key order

* fix: avoid collapsing distinct pending transactions

* fix: prevent unbounded raw payload growth for blank IDs
This commit is contained in:
AdamWHY2K
2026-01-22 23:53:24 +00:00
committed by GitHub
parent 2c827fbc88
commit 3f5fff27ea
12 changed files with 587 additions and 15 deletions

View File

@@ -0,0 +1,28 @@
# Shared concern for generating content-based hashes for Lunchflow transactions
# Used by both the importer (for deduplication) and processor (for temporary external IDs)
module LunchflowTransactionHash
extend ActiveSupport::Concern
private
# Generate a content-based hash for a transaction
# This creates a deterministic identifier based on transaction attributes
# Used for:
# - Deduplicating blank-ID transactions in the importer
# - Generating temporary external IDs in the processor
#
# @param tx [Hash] Transaction data with indifferent access
# @return [String] MD5 hash of transaction attributes
def content_hash_for_transaction(tx)
attributes = [
tx[:accountId],
tx[:amount],
tx[:currency],
tx[:date],
tx[:merchant],
tx[:description]
].compact.join("|")
Digest::MD5.hexdigest(attributes)
end
end

View File

@@ -2,8 +2,10 @@ require "digest/md5"
class LunchflowEntry::Processor
include CurrencyNormalizable
include LunchflowTransactionHash
# lunchflow_transaction is the raw hash fetched from Lunchflow API and converted to JSONB
# Transaction structure: { id, accountId, amount, currency, date, merchant, description }
# Transaction structure: { id, accountId, amount, currency, date, merchant, description, isPending }
def initialize(lunchflow_transaction, lunchflow_account:)
@lunchflow_transaction = lunchflow_transaction
@lunchflow_account = lunchflow_account
@@ -26,7 +28,8 @@ class LunchflowEntry::Processor
name: name,
source: "lunchflow",
merchant: merchant,
notes: notes
notes: notes,
extra: extra_metadata
)
rescue ArgumentError => e
# Re-raise validation errors (missing required fields, invalid data)
@@ -61,10 +64,46 @@ class LunchflowEntry::Processor
def external_id
id = data[:id].presence
raise ArgumentError, "Lunchflow transaction missing required field 'id'" unless id
# For pending transactions, Lunchflow may return blank/nil IDs
# Generate a stable temporary ID based on transaction attributes
if id.blank?
# Create a deterministic hash from key transaction attributes
# This ensures the same pending transaction gets the same ID across syncs
base_temp_id = content_hash_for_transaction(data)
temp_id_with_prefix = "lunchflow_pending_#{base_temp_id}"
# Handle collisions: if this external_id already exists for this account,
# append a counter to make it unique. This prevents multiple pending transactions
# with identical attributes (e.g., two same-day Uber rides) from colliding.
# We check both the account's entries and the current raw payload being processed.
final_id = temp_id_with_prefix
counter = 1
while entry_exists_with_external_id?(final_id)
final_id = "#{temp_id_with_prefix}_#{counter}"
counter += 1
end
if counter > 1
Rails.logger.debug "Lunchflow: Collision detected, using #{final_id} for pending transaction: #{data[:merchant]} #{data[:amount]} #{data[:currency]}"
else
Rails.logger.debug "Lunchflow: Generated temporary ID #{final_id} for pending transaction: #{data[:merchant]} #{data[:amount]} #{data[:currency]}"
end
return final_id
end
"lunchflow_#{id}"
end
def entry_exists_with_external_id?(external_id)
return false unless account.present?
# Check if an entry with this external_id already exists in the account
account.entries.exists?(external_id: external_id, source: "lunchflow")
end
def name
data[:merchant].presence || "Unknown transaction"
end
@@ -141,4 +180,17 @@ class LunchflowEntry::Processor
Rails.logger.error("Failed to parse Lunchflow transaction date '#{data[:date]}': #{e.message}")
raise ArgumentError, "Unable to parse transaction date: #{data[:date].inspect}"
end
# Build extra metadata hash with pending status
# Lunchflow API field: isPending (boolean)
def extra_metadata
metadata = {}
# Store pending status from Lunchflow API when present
if data.key?(:isPending)
metadata[:lunchflow] = { pending: ActiveModel::Type::Boolean.new.cast(data[:isPending]) }
end
metadata
end
end

View File

@@ -1,4 +1,6 @@
class LunchflowItem::Importer
include LunchflowTransactionHash
attr_reader :lunchflow_item, :lunchflow_provider
def initialize(lunchflow_item, lunchflow_provider:)
@@ -183,15 +185,23 @@ class LunchflowItem::Importer
def fetch_and_store_transactions(lunchflow_account)
start_date = determine_sync_start_date(lunchflow_account)
Rails.logger.info "LunchflowItem::Importer - Fetching transactions for account #{lunchflow_account.account_id} from #{start_date}"
include_pending = Rails.configuration.x.lunchflow.include_pending
Rails.logger.info "LunchflowItem::Importer - Fetching transactions for account #{lunchflow_account.account_id} from #{start_date} (include_pending=#{include_pending})"
begin
# Fetch transactions
transactions_data = lunchflow_provider.get_account_transactions(
lunchflow_account.account_id,
start_date: start_date
start_date: start_date,
include_pending: include_pending
)
# Optional: Debug logging
if Rails.configuration.x.lunchflow.debug_raw
Rails.logger.debug "Lunchflow raw response: #{transactions_data.to_json}"
end
# Validate response structure
unless transactions_data.is_a?(Hash)
Rails.logger.error "LunchflowItem::Importer - Invalid transactions_data format for account #{lunchflow_account.account_id}"
@@ -207,17 +217,38 @@ class LunchflowItem::Importer
existing_transactions = lunchflow_account.raw_transactions_payload.to_a
# Build set of existing transaction IDs for efficient lookup
# For transactions with IDs: use the ID directly
# For transactions without IDs (blank/nil): use content hash to prevent duplicate storage
existing_ids = existing_transactions.map do |tx|
tx.with_indifferent_access[:id]
end.to_set
tx_with_access = tx.with_indifferent_access
tx_id = tx_with_access[:id]
if tx_id.blank?
# Generate content hash for blank-ID transactions to detect duplicates
content_hash_for_transaction(tx_with_access)
else
tx_id
end
end.compact.to_set
# Filter to ONLY truly new transactions (skip duplicates)
# Transactions are immutable on the bank side, so we don't need to update them
# For transactions WITH IDs: skip if ID already exists (true duplicates)
# For transactions WITHOUT IDs: skip if content hash exists (prevents unbounded growth)
# Note: Pending transactions may update from pending→posted, but we treat them as immutable snapshots
new_transactions = transactions_data[:transactions].select do |tx|
next false unless tx.is_a?(Hash)
tx_id = tx.with_indifferent_access[:id]
tx_id.present? && !existing_ids.include?(tx_id)
tx_with_access = tx.with_indifferent_access
tx_id = tx_with_access[:id]
if tx_id.blank?
# Use content hash to detect if we've already stored this exact transaction
content_hash = content_hash_for_transaction(tx_with_access)
!existing_ids.include?(content_hash)
else
# If has ID, only include if not already stored
!existing_ids.include?(tx_id)
end
end
if new_transactions.any?

View File

@@ -30,8 +30,8 @@ class Provider::Lunchflow
# Get transactions for a specific account
# Returns: { transactions: [...], total: N }
# Transaction structure: { id, accountId, amount, currency, date, merchant, description }
def get_account_transactions(account_id, start_date: nil, end_date: nil)
# Transaction structure: { id, accountId, amount, currency, date, merchant, description, isPending }
def get_account_transactions(account_id, start_date: nil, end_date: nil, include_pending: false)
query_params = {}
if start_date
@@ -42,6 +42,10 @@ class Provider::Lunchflow
query_params[:end_date] = end_date.to_date.to_s
end
if include_pending
query_params[:include_pending] = true
end
path = "/accounts/#{ERB::Util.url_encode(account_id.to_s)}/transactions"
path += "?#{URI.encode_www_form(query_params)}" unless query_params.empty?

View File

@@ -35,6 +35,7 @@ class Transaction < ApplicationRecord
where(<<~SQL.squish)
(transactions.extra -> 'simplefin' ->> 'pending')::boolean = true
OR (transactions.extra -> 'plaid' ->> 'pending')::boolean = true
OR (transactions.extra -> 'lunchflow' ->> 'pending')::boolean = true
SQL
}
@@ -42,6 +43,7 @@ class Transaction < ApplicationRecord
where(<<~SQL.squish)
(transactions.extra -> 'simplefin' ->> 'pending')::boolean IS DISTINCT FROM true
AND (transactions.extra -> 'plaid' ->> 'pending')::boolean IS DISTINCT FROM true
AND (transactions.extra -> 'lunchflow' ->> 'pending')::boolean IS DISTINCT FROM true
SQL
}
@@ -63,7 +65,8 @@ class Transaction < ApplicationRecord
def pending?
extra_data = extra.is_a?(Hash) ? extra : {}
ActiveModel::Type::Boolean.new.cast(extra_data.dig("simplefin", "pending")) ||
ActiveModel::Type::Boolean.new.cast(extra_data.dig("plaid", "pending"))
ActiveModel::Type::Boolean.new.cast(extra_data.dig("plaid", "pending")) ||
ActiveModel::Type::Boolean.new.cast(extra_data.dig("lunchflow", "pending"))
rescue
false
end