From d74e9caf7b243ec8210a80fbcae712f27d392bef Mon Sep 17 00:00:00 2001 From: wps260 <278816799+wps260@users.noreply.github.com> Date: Fri, 1 May 2026 16:40:33 -0500 Subject: [PATCH] Optimize and Fix provider price fetches for sold securities and batch queries (#1580) * Performance and bug fixes in provider price fetches Three distinct bugs caused the price provider API to be called unnecessarily on every investment account sync. 1. Sold securities triggered a provider call on every sync forever import_security_prices passed end_date: Date.current for every security ever traded. Security::Price::Importer short-circuits via all_prices_exist? only when persisted_count == expected_count, where: expected_count = (clamped_start_date..Date.current).count This range increases daily, so a security closed two years ago would have all historical prices in the DB unnecessarily. This also causes any closed securities to fetch prices daily, forever. Fix: separate currently-held securities (end_date: Date.current) from historical-only securities (end_date: last holding date for that security). Once a closed position's price range is complete through its last holding date, all_prices_exist? becomes permanently stable and no further provider calls occur for that security. "Currently held" is defined as appearing in account.current_holdings, which returns the most recent holding per security with qty != 0. On the first sync after a sell, the pre-sale holding is still the most recent, so the security correctly receives end_date: Date.current for one final sync before the new qty=0 holding is materialised. 2. Offline securities were not filtered account.trades.map(&:security) returned all securities regardless of the offline flag. This results in fetching of securities even if the provider cannot serve them, or if the user don't want them served for some reason (eg when there are symbol collisions that causes the wrong prices to be returned) The global MarketDataImporter correctly uses Security.online; the account-scoped importer did not. Fix: Security.online.where(id: all_security_ids) matches the established contract. Offline IDs still pass through the pluck step but resolve to nil in the securities hash and are skipped by the existing `next unless security` guard. 3. N+1 queries for security loading and per-security start dates - account.trades.map(&:security): triggered one SQL query per trade to load the security association (N+1). - first_required_price_date(security): issued 2 DB queries per security - one MIN(entries.date) and one EXISTS - so S securities = 2S queries. Fix: replace with batch queries totalling 4 regardless of security count: - account.current_holdings.pluck(:security_id) - current security IDs - account.trades.pluck(:security_id).uniq - traded security IDs - Security.online.where(id: ...) - load all security records at once - batch_first_required_price_dates: one GROUP BY security_id MIN(entries.date) over trades, one pluck for provider-holding security IDs, one GROUP BY security_id MAX(date) over holdings for historical end dates * fix(market-data-importer): fetch prices through today for reopened positions Account::Syncer runs import_market_data before materialize_balances, so current_holdings reflects the last materialized snapshot rather than the current trade state. If a security was previously sold (stale holdings show qty=0) and then repurchased in the same sync cycle, it landed in historical_ids and had its end_date capped at the old last_holding_date. This caused all_prices_exist? to short-circuit, skipping the price fetch through today, and leaving the forthcoming holding materialization without a price for the repurchase period. Fix: compare the latest trade date against the last holding date for each historical security. If the trade is newer, the position was reopened before holdings were rematerialized; treat end_date as Date.current for that sync. The cap still applies on subsequent syncs once materialize_balances has updated the holdings table. Adds a regression test covering the repurchase scenario. * hoist account.start_date out of per-security loop Account#start_date issues SELECT MIN(date) FROM entries on every call. Inside batch_first_required_price_dates it was called up to twice per security (holding_date assignment + fallback), producing up to 2N extra queries for an account with N provider-held securities. Cache the result in account_start_date before the loop. * assert offline securities are skipped Adds a regression test verifying that Account::MarketDataImporter never calls fetch_security_prices for a security with offline: true, covering the Security.online filter on line 54 of the importer. --- app/models/account/market_data_importer.rb | 73 +++++++--- .../account/market_data_importer_test.rb | 135 ++++++++++++++++++ 2 files changed, 190 insertions(+), 18 deletions(-) diff --git a/app/models/account/market_data_importer.rb b/app/models/account/market_data_importer.rb index 1c380c0dc..5e7ae6182 100644 --- a/app/models/account/market_data_importer.rb +++ b/app/models/account/market_data_importer.rb @@ -45,34 +45,71 @@ class Account::MarketDataImporter def import_security_prices return unless Security.provider - account_securities = (account.trades.map(&:security) + account.current_holdings.map(&:security)).uniq + current_security_ids = account.current_holdings.pluck(:security_id).to_set + traded_security_ids = account.trades.pluck(:security_id).uniq - return if account_securities.empty? + all_security_ids = (current_security_ids | traded_security_ids) + return if all_security_ids.empty? - account_securities.each do |security| - security.import_provider_prices( - start_date: first_required_price_date(security), - end_date: Date.current - ) + securities = Security.online.where(id: all_security_ids).index_by(&:id) + start_dates = batch_first_required_price_dates(all_security_ids) + historical_ids = traded_security_ids - current_security_ids.to_a + + # For securities no longer held, cap end_date at the last holding date so + # all_prices_exist? stays stable and we don't call the provider every sync. + last_holding_date = account.holdings + .where(security_id: historical_ids) + .group(:security_id) + .maximum(:date) + + # import_market_data runs before materialize_balances in Account::Syncer, so + # current_holdings can reflect a stale pre-trade snapshot. If a historical + # security has a trade newer than its last holding date the position was + # reopened this sync; fetch prices through today so the forthcoming + # materialization has a price available. + latest_trade_date = account.trades + .where(security_id: historical_ids) + .group(:security_id) + .maximum("entries.date") + + all_security_ids.each do |security_id| + security = securities[security_id] + next unless security + + end_date = if current_security_ids.include?(security_id) + Date.current + else + holding_date = last_holding_date[security_id] + trade_date = latest_trade_date[security_id] + reopened = trade_date && holding_date && trade_date > holding_date + reopened ? Date.current : (holding_date || Date.current) + end + + security.import_provider_prices(start_date: start_dates[security_id], end_date: end_date) security.import_provider_details end end private - # Calculates the first date we require a price for the given security scoped to this account - def first_required_price_date(security) - trade_start_date = account.trades.with_entry - .where(security: security) - .where(entries: { account_id: account.id }) - .minimum("entries.date") + # Replaces 2-queries-per-security with 3 queries total. + def batch_first_required_price_dates(security_ids) + # account.trades is a has_many :through :entries, so entries is already joined + trade_start_dates = account.trades.group(:security_id).minimum("entries.date") - holding_start_date = - if account.holdings.where(security: security).where.not(account_provider_id: nil).exists? - account.start_date - end + provider_holding_security_ids = account.holdings + .where(security_id: security_ids) + .where.not(account_provider_id: nil) + .pluck(:security_id) + .to_set - [ trade_start_date, holding_start_date ].compact.min + account_start_date = account.start_date + + security_ids.each_with_object({}) do |security_id, hash| + trade_date = trade_start_dates[security_id] + holding_date = provider_holding_security_ids.include?(security_id) ? account_start_date : nil + hash[security_id] = [ trade_date, holding_date ].compact.min || account_start_date + end end def needs_exchange_rates? diff --git a/test/models/account/market_data_importer_test.rb b/test/models/account/market_data_importer_test.rb index 520185f73..8717c8cdb 100644 --- a/test/models/account/market_data_importer_test.rb +++ b/test/models/account/market_data_importer_test.rb @@ -112,6 +112,141 @@ class Account::MarketDataImporterTest < ActiveSupport::TestCase assert_equal 1, Security::Price.where(security: security, date: trade_date).count end + test "caps end_date at last holding date for securities no longer held" do + family = Family.create!(name: "Smith", currency: "USD") + + account = family.accounts.create!( + name: "Brokerage", + currency: "USD", + balance: 0, + accountable: Investment.new + ) + + current_sec = Security.create!(ticker: "CURR", exchange_operating_mic: "XNAS") + historical_sec = Security.create!(ticker: "HIST", exchange_operating_mic: "XNAS") + + trade_date = 30.days.ago.to_date + sold_date = 5.days.ago.to_date + + [ current_sec, historical_sec ].each do |sec| + trade = Trade.new(security: sec, qty: 10, price: 100, currency: "USD", investment_activity_label: "Buy") + account.entries.create!(name: "Buy #{sec.ticker}", date: trade_date, amount: 1000, currency: "USD", entryable: trade) + end + + # Current: most-recent holding has qty > 0 — shows up in current_holdings + account.holdings.create!(security: current_sec, date: Date.current, qty: 10, price: 110, amount: 1100, currency: "USD") + + # Historical: most-recent holding has qty == 0 (sold) — excluded from current_holdings + account.holdings.create!(security: historical_sec, date: 10.days.ago.to_date, qty: 10, price: 105, amount: 1050, currency: "USD") + account.holdings.create!(security: historical_sec, date: sold_date, qty: 0, price: 0, amount: 0, currency: "USD") + + expected_start_date = trade_date - SECURITY_PRICE_BUFFER + + @provider.expects(:fetch_security_prices) + .with(symbol: current_sec.ticker, + exchange_operating_mic: current_sec.exchange_operating_mic, + start_date: expected_start_date, + end_date: Date.current.in_time_zone("America/New_York").to_date) + .returns(provider_success_response([])) + + @provider.expects(:fetch_security_prices) + .with(symbol: historical_sec.ticker, + exchange_operating_mic: historical_sec.exchange_operating_mic, + start_date: expected_start_date, + end_date: sold_date) + .returns(provider_success_response([])) + + @provider.stubs(:fetch_security_info).returns(provider_success_response(OpenStruct.new(name: "Test", logo_url: nil))) + @provider.stubs(:fetch_exchange_rates).returns(provider_success_response([])) + + Account::MarketDataImporter.new(account).import_all + end + + test "fetches prices through today when a sold security is repurchased before holdings are rematerialized" do + family = Family.create!(name: "Smith", currency: "USD") + + account = family.accounts.create!( + name: "Brokerage", + currency: "USD", + balance: 0, + accountable: Investment.new + ) + + security = Security.create!(ticker: "AAPL", exchange_operating_mic: "XNAS") + + original_buy_date = 60.days.ago.to_date + sold_date = 30.days.ago.to_date + repurchase_date = 2.days.ago.to_date + + # Original buy + buy_trade = Trade.new(security: security, qty: 10, price: 100, currency: "USD", investment_activity_label: "Buy") + account.entries.create!(name: "Buy AAPL", date: original_buy_date, amount: 1000, currency: "USD", entryable: buy_trade) + + # Sell trade + sell_trade = Trade.new(security: security, qty: -10, price: 120, currency: "USD", investment_activity_label: "Sell") + account.entries.create!(name: "Sell AAPL", date: sold_date, amount: 1200, currency: "USD", entryable: sell_trade) + + # Stale materialized holdings — qty=0 means current_holdings excludes this security + account.holdings.create!(security: security, date: sold_date, qty: 0, price: 0, amount: 0, currency: "USD") + + # Repurchase trade added after last materialization — holdings haven't been rematerialized yet + repurchase = Trade.new(security: security, qty: 5, price: 130, currency: "USD", investment_activity_label: "Buy") + account.entries.create!(name: "Rebuy AAPL", date: repurchase_date, amount: 650, currency: "USD", entryable: repurchase) + + expected_start_date = original_buy_date - SECURITY_PRICE_BUFFER + + # end_date must be Date.current, not sold_date, because the position was reopened + @provider.expects(:fetch_security_prices) + .with(symbol: security.ticker, + exchange_operating_mic: security.exchange_operating_mic, + start_date: expected_start_date, + end_date: Date.current.in_time_zone("America/New_York").to_date) + .returns(provider_success_response([])) + + @provider.stubs(:fetch_security_info).returns(provider_success_response(OpenStruct.new(name: "Apple", logo_url: nil))) + @provider.stubs(:fetch_exchange_rates).returns(provider_success_response([])) + + Account::MarketDataImporter.new(account).import_all + end + + test "skips price fetching for offline securities" do + family = Family.create!(name: "Smith", currency: "USD") + + account = family.accounts.create!( + name: "Brokerage", + currency: "USD", + balance: 0, + accountable: Investment.new + ) + + online_sec = Security.create!(ticker: "ONLN", exchange_operating_mic: "XNAS") + offline_sec = Security.create!(ticker: "OFF", exchange_operating_mic: "XNAS", offline: true) + + trade_date = 10.days.ago.to_date + + [ online_sec, offline_sec ].each do |sec| + trade = Trade.new(security: sec, qty: 1, price: 100, currency: "USD", investment_activity_label: "Buy") + account.entries.create!(name: "Buy #{sec.ticker}", date: trade_date, amount: 100, currency: "USD", entryable: trade) + end + + @provider.expects(:fetch_security_prices) + .with(symbol: "OFF", exchange_operating_mic: anything, + start_date: anything, end_date: anything) + .never + + @provider.expects(:fetch_security_prices) + .with(symbol: online_sec.ticker, + exchange_operating_mic: online_sec.exchange_operating_mic, + start_date: anything, + end_date: anything) + .returns(provider_success_response([])) + + @provider.stubs(:fetch_security_info).returns(provider_success_response(OpenStruct.new(name: "Test", logo_url: nil))) + @provider.stubs(:fetch_exchange_rates).returns(provider_success_response([])) + + Account::MarketDataImporter.new(account).import_all + end + test "handles provider error response gracefully for security prices" do family = Family.create!(name: "Smith", currency: "USD")