Performance improvements in holding calculation pipeline (#1579)

* Performance improvements in holding calculation pipeline

Investment accounts with large histories were pegging CPU at 100% during
sync. Root cause was a cluster of quadratic and superlinear algorithms in
the inner holding calculation loop. All are replaced with O(1) hash lookups
built from single-pass indexes over the already-loaded data.

Holding::PortfolioCache - load_prices:

  Three O(SxN) patterns inside the per-security loop:

  1. DB prices: `security.prices.where(...)` fired one SQL query per
     security (N+1). Replaced with a single bulk query before the loop:

       Security::Price.where(security_id: ..., date: ...).group_by(&:security_id)

     70 securities -> 70 queries becomes 1.

  2. Trade prices: `trades.select { |t| t.entryable.security_id == id }`
     scanned the full trades array for every security - O(SxT). Replaced
     with trades_by_security_id, pre-indexed once from the loaded array.

  3. Holding prices: `holdings.select { |h| h.security_id == id }` - same
     O(SxH) pattern. Replaced with holdings_by_security_id.

  Prices are now indexed into prices_by_date and prices_by_date_and_source
  hashes during load_prices, making get_price O(1) instead of scanning the
  flat prices array on every lookup.

Holding::PortfolioCache - get_trades / get_price:

  - get_trades(date:): `trades.select { |t| t.date == date }` (O(T) scan)
    replaced with trades_by_date hash (O(1)).

  - get_price: two `prices.select { p.date == date ... }.min_by` linear
    scans replaced with direct hash lookups into prices_by_date and
    prices_by_date_and_source.

Holding::PortfolioCache - collect_unique_securities:

  `holdings.map(&:security)` traversed the security association on every
  holding record (N+1 if not preloaded). Replaced with a pluck of
  security_ids followed by a single Security.where(id: ...) batch load.

Holding::ForwardCalculator / ReverseCalculator:

  `holdings += build_holdings(...)` allocated a new array copy on every
  iteration - O(N) per day x thousands of days = O(D^2) total allocations.
  Replaced with holdings.concat(...) which appends in place, O(1).

Holding::ReverseCalculator - precompute_cost_basis:

  Old: walked every date from account.start_date to Date.current (O(D)),
  writing a cost_basis entry for every security on every date. For an
  account with 2 trades over 9,250 days this wrote ~18,500 hash entries
  and consumed the full date range in the outer loop regardless of trade
  density.

  New: walks only buy trades (O(T)), appending one [date, avg_cost]
  snapshot per trade. cost_basis_for binary-searches the sparse snapshot
  array - O(log T) per lookup. Memory drops from O(DxS) to O(T).

Holding::Gapfillable:

  `security_holdings.find { |h| h.date == date }` was called on every
  date in the gapfill range - O(H) per date, O(HxD) total. Replaced with
  security_holdings.index_by(:date) built once before the loop, making
  each date lookup O(1).

Holding::Materializer - purge_stale_holdings:

  `account.entries.trades.map { |entry| entry.entryable.security_id }.uniq`
  loaded all trade entry records into Ruby then traversed the entryable
  association on each (N+1). Replaced with account.trades.pluck(:security_id).uniq
  (single SQL query returning only the IDs).

In testing, these changes were able to reduce sync time of an account with
25 years of history and 70 securities from about 90 minutes down to under
3 minutes.

* Lint fix

* Lint fix

* addressing the open review nits I agreed with:

* return dup'd arrays from PortfolioCache#get_trades so callers can't mutate memoized cache state
* use the precomputed security-id indexes in collect_unique_securities
* keep security-id dedupe in SQL via distinct.pluck(:security_id)
* tighten the DB price preload to select only needed columns
* harden cost-basis assertions with assert_in_delta

* Back out unnecessary AI slop

* Add back dup to trades array returned from memoized hash

trades_by_date[date] returns a live reference into the memoized hash.
Any caller that mutates the result would silently corrupt the cache for
subsequent calls on the same date within the same sync run. Add .dup to
return a shallow copy, matching the safety of the original select path.
This commit is contained in:
wps260
2026-05-04 18:24:33 -05:00
committed by GitHub
parent 9cc52b9d35
commit c294cbf54b
6 changed files with 169 additions and 51 deletions

View File

@@ -18,7 +18,7 @@ class Holding::ForwardCalculator
trades = portfolio_cache.get_trades(date: date)
update_cost_basis_tracker(trades)
next_portfolio = transform_portfolio(current_portfolio, trades, direction: :forward)
holdings += build_holdings(next_portfolio, date)
holdings.concat(build_holdings(next_portfolio, date))
current_portfolio = next_portfolio
end

View File

@@ -9,10 +9,11 @@ module Holding::Gapfillable
next if security_holdings.empty?
sorted = security_holdings.sort_by(&:date)
holdings_by_date = security_holdings.index_by(&:date)
previous_holding = sorted.first
sorted.first.date.upto(Date.current) do |date|
holding = security_holdings.find { |h| h.date == date }
holding = holdings_by_date[date]
if holding
filled_holdings << holding

View File

@@ -171,7 +171,7 @@ class Holding::Materializer
end
def purge_stale_holdings
portfolio_security_ids = account.entries.trades.map { |entry| entry.entryable.security_id }.uniq
portfolio_security_ids = account.trades.distinct.pluck(:security_id)
# Never delete provider-sourced holdings - they're authoritative from the provider
# If there are no securities in the portfolio, only delete non-provider holdings

View File

@@ -18,7 +18,7 @@ class Holding::PortfolioCache
if date.blank?
trades
else
trades.select { |t| t.date == date }
trades_by_date[date]&.dup || []
end
end
@@ -26,12 +26,15 @@ class Holding::PortfolioCache
security = @security_cache[security_id]
raise SecurityNotFound.new(security_id, account.id) unless security
if source.present?
price = security[:prices].select { |p| p.price.date == date && p.source == source }.min_by(&:priority)&.price
price_with_priority = if source.present?
security[:prices_by_date_and_source][[ date, source ]]
else
price = security[:prices].select { |p| p.price.date == date }.min_by(&:priority)&.price
security[:prices_by_date][date]
end
return nil unless price_with_priority
price = price_with_priority.price
return nil unless price
price_money = Money.new(price.price, price.currency)
@@ -61,20 +64,28 @@ class Holding::PortfolioCache
@trades ||= account.entries.includes(entryable: :security).trades.chronological.to_a
end
def trades_by_date
@trades_by_date ||= trades.group_by(&:date)
end
def trades_by_security_id
@trades_by_security_id ||= trades.group_by { |t| t.entryable.security_id }
end
def holdings
@holdings ||= account.holdings.chronological.to_a
end
def holdings_by_security_id
@holdings_by_security_id ||= holdings.group_by(&:security_id)
end
def collect_unique_securities
unique_securities_from_trades = trades.map(&:entryable).map(&:security).uniq
unique_securities_from_trades = unique_securities_from_trades.select { |s| @security_ids.include?(s.id) } if @security_ids
ids = trades_by_security_id.keys
ids |= holdings_by_security_id.keys if use_holdings
ids &= @security_ids if @security_ids
return unique_securities_from_trades unless use_holdings
unique_securities_from_holdings = holdings.map(&:security).uniq
unique_securities_from_holdings = unique_securities_from_holdings.select { |s| @security_ids.include?(s.id) } if @security_ids
(unique_securities_from_trades + unique_securities_from_holdings).uniq
Security.where(id: ids).to_a
end
# Loads all known prices for all securities in the account with priority based on source:
@@ -87,11 +98,18 @@ class Holding::PortfolioCache
Rails.logger.info "Preloading #{securities.size} securities for account #{account.id}"
security_ids = securities.map(&:id)
# Bulk-load all DB prices for all securities in one query, grouped by security_id
db_prices_by_security_id = Security::Price
.where(security_id: security_ids, date: account.start_date..Date.current)
.group_by(&:security_id)
securities.each do |security|
Rails.logger.info "Loading security: ID=#{security.id} Ticker=#{security.ticker}"
# High priority prices from DB (synced from provider)
db_prices = security.prices.where(date: account.start_date..Date.current).map do |price|
db_prices = (db_prices_by_security_id[security.id] || []).map do |price|
PriceWithPriority.new(
price: price,
priority: 1,
@@ -100,8 +118,7 @@ class Holding::PortfolioCache
end
# Medium priority prices from trades
trade_prices = trades
.select { |t| t.entryable.security_id == security.id }
trade_prices = (trades_by_security_id[security.id] || [])
.map do |trade|
PriceWithPriority.new(
price: Security::Price.new(
@@ -117,7 +134,7 @@ class Holding::PortfolioCache
# Low priority prices from holdings (if applicable)
holding_prices = if use_holdings
holdings.select { |h| h.security_id == security.id }.map do |holding|
(holdings_by_security_id[security.id] || []).map do |holding|
PriceWithPriority.new(
price: Security::Price.new(
security: security,
@@ -133,9 +150,18 @@ class Holding::PortfolioCache
[]
end
all_prices = db_prices + trade_prices + holding_prices
# Index by date for O(1) lookup in get_price instead of O(N) linear scan
prices_by_date = all_prices.group_by { |p| p.price.date }
.transform_values { |ps| ps.min_by(&:priority) }
prices_by_date_and_source = all_prices.group_by { |p| [ p.price.date, p.source ] }
.transform_values { |ps| ps.min_by(&:priority) }
@security_cache[security.id] = {
security: security,
prices: db_prices + trade_prices + holding_prices
prices_by_date: prices_by_date,
prices_by_date_and_source: prices_by_date_and_source
}
end
end

View File

@@ -35,7 +35,7 @@ class Holding::ReverseCalculator
previous_portfolio = transform_portfolio(current_portfolio, today_trades, direction: :reverse)
# If current day, always use holding prices (since that's what Plaid gives us). For historical values, use market data (since Plaid doesn't supply historical prices)
holdings += build_holdings(current_portfolio, date, price_source: date == Date.current ? "holding" : nil)
holdings.concat(build_holdings(current_portfolio, date, price_source: date == Date.current ? "holding" : nil))
current_portfolio = previous_portfolio
end
@@ -79,45 +79,46 @@ class Holding::ReverseCalculator
end.compact
end
# Pre-compute cost basis for all securities at all dates using forward pass through trades
# Stores: { security_id => { date => cost_basis } }
def precompute_cost_basis
@cost_basis_by_date = Hash.new { |h, k| h[k] = {} }
@cost_basis_snapshots = Hash.new { |h, k| h[k] = [] }
tracker = Hash.new { |h, k| h[k] = { total_cost: BigDecimal("0"), total_qty: BigDecimal("0") } }
trades = portfolio_cache.get_trades.sort_by(&:date)
trade_index = 0
portfolio_cache.get_trades.sort_by(&:date).each do |trade_entry|
trade = trade_entry.entryable
next unless trade.qty > 0
account.start_date.upto(Date.current).each do |date|
# Process all trades up to and including this date
while trade_index < trades.size && trades[trade_index].date <= date
trade_entry = trades[trade_index]
trade = trade_entry.entryable
if trade.qty > 0 # Only track buys
security_id = trade.security_id
trade_price = Money.new(trade.price, trade.currency)
begin
converted_price = trade_price.exchange_to(account.currency).amount
rescue Money::ConversionError
converted_price = trade.price
end
tracker[security_id][:total_cost] += converted_price * trade.qty
tracker[security_id][:total_qty] += trade.qty
end
trade_index += 1
security_id = trade.security_id
trade_price = Money.new(trade.price, trade.currency)
begin
converted_price = trade_price.exchange_to(account.currency).amount
rescue Money::ConversionError
converted_price = trade.price
end
# Store current cost basis snapshot for each security at this date
tracker.each do |security_id, data|
next if data[:total_qty].zero?
@cost_basis_by_date[security_id][date] = data[:total_cost] / data[:total_qty]
end
tracker[security_id][:total_cost] += converted_price * trade.qty
tracker[security_id][:total_qty] += trade.qty
@cost_basis_snapshots[security_id] << [
trade_entry.date,
tracker[security_id][:total_cost] / tracker[security_id][:total_qty]
]
end
end
def cost_basis_for(security_id, date)
@cost_basis_by_date.dig(security_id, date)
snapshots = @cost_basis_snapshots[security_id]
return nil if snapshots.empty?
lo, hi, result = 0, snapshots.size - 1, nil
while lo <= hi
mid = (lo + hi) / 2
if snapshots[mid][0] <= date
result = snapshots[mid][1]
lo = mid + 1
else
hi = mid - 1
end
end
result
end
end

View File

@@ -158,6 +158,84 @@ class Holding::ReverseCalculatorTest < ActiveSupport::TestCase
end
end
# cost_basis_for
test "cost_basis_for returns nil when there are no buy trades" do
security = Security.create!(ticker: "TST", name: "Test")
calc = calculator_with_trades(security)
assert_nil cost_basis_for(calc, security, Date.current)
end
test "cost_basis_for returns nil for dates before the first buy" do
security = Security.create!(ticker: "TST", name: "Test")
buy_date = 5.days.ago.to_date
calc = calculator_with_trades(security) do
create_trade(security, account: @account, qty: 10, price: 100, date: buy_date)
end
assert_nil cost_basis_for(calc, security, buy_date - 1)
end
test "cost_basis_for returns weighted average cost on buy date" do
security = Security.create!(ticker: "TST", name: "Test")
buy_date = 5.days.ago.to_date
calc = calculator_with_trades(security) do
create_trade(security, account: @account, qty: 10, price: 100, date: buy_date)
end
assert_in_delta 100.0, cost_basis_for(calc, security, buy_date).to_f, 1e-6
end
test "cost_basis_for carries forward to dates between buys" do
security = Security.create!(ticker: "TST", name: "Test")
first_buy = 10.days.ago.to_date
second_buy = 3.days.ago.to_date
calc = calculator_with_trades(security) do
create_trade(security, account: @account, qty: 10, price: 100, date: first_buy)
create_trade(security, account: @account, qty: 5, price: 130, date: second_buy)
end
# Between the two buys, cost basis is from the first buy only
assert_in_delta 100.0, cost_basis_for(calc, security, first_buy + 1).to_f, 1e-6
assert_in_delta 100.0, cost_basis_for(calc, security, second_buy - 1).to_f, 1e-6
# After second buy: WAC = (10*100 + 5*130) / 15 = 110.0
assert_in_delta 110.0, cost_basis_for(calc, security, second_buy).to_f, 1e-6
assert_in_delta 110.0, cost_basis_for(calc, security, Date.current).to_f, 1e-6
end
test "cost_basis_for accumulates multiple buys on the same date" do
security = Security.create!(ticker: "TST", name: "Test")
buy_date = 5.days.ago.to_date
calc = calculator_with_trades(security) do
create_trade(security, account: @account, qty: 10, price: 100, date: buy_date)
create_trade(security, account: @account, qty: 5, price: 130, date: buy_date)
end
# WAC = (10*100 + 5*130) / 15 = 110.0 — not the intermediate value after only the first trade
assert_in_delta 110.0, cost_basis_for(calc, security, buy_date).to_f, 1e-6
end
test "cost_basis_for ignores sell trades" do
security = Security.create!(ticker: "TST", name: "Test")
buy_date = 10.days.ago.to_date
sell_date = 5.days.ago.to_date
calc = calculator_with_trades(security) do
create_trade(security, account: @account, qty: 10, price: 100, date: buy_date)
create_trade(security, account: @account, qty: -5, price: 120, date: sell_date)
end
# Sell does not change cost basis
assert_in_delta 100.0, cost_basis_for(calc, security, sell_date).to_f, 1e-6
assert_in_delta 100.0, cost_basis_for(calc, security, Date.current).to_f, 1e-6
end
private
def assert_holdings(expected, calculated)
expected.each do |expected_entry|
@@ -202,6 +280,18 @@ class Holding::ReverseCalculatorTest < ActiveSupport::TestCase
# Brokerage Cash: $5,000
# Holdings Value: $15,000
# Total Balance: $20,000
def calculator_with_trades(security)
yield if block_given?
snapshot = OpenStruct.new(to_h: { security.id => 10 })
calc = Holding::ReverseCalculator.new(@account, portfolio_snapshot: snapshot)
calc.send(:precompute_cost_basis)
calc
end
def cost_basis_for(calc, security, date)
calc.send(:cost_basis_for, security.id, date)
end
def load_today_portfolio
@account.update!(cash_balance: 5000)