mirror of
https://github.com/we-promise/sure.git
synced 2026-04-07 14:31:25 +00:00
Investment prices fixes (#559)
* Fix investments retrieval
Problem Summary
Stock prices for securities like European stocks become stale because:
1. sync_all_accounts runs at 2:22 UTC (before European markets open)
2. Provider doesn't have today's price yet, so importer gap-fills with LOCF (yesterday's price)
3. Later import_market_data at 22:00 UTC sees all prices exist and skips fetching
4. Real closing price is never retrieved
Solution Overview
Add a provisional boolean column to mark gap-filled prices that should be re-fetched.
* Update schema.rb
---------
Signed-off-by: Juan José Mata <juanjo.mata@gmail.com>
Co-authored-by: Juan José Mata <juanjo.mata@gmail.com>
This commit is contained in:
@@ -3,4 +3,14 @@ class Security::Price < ApplicationRecord
|
||||
|
||||
validates :date, :price, :currency, presence: true
|
||||
validates :date, uniqueness: { scope: %i[security_id currency] }
|
||||
|
||||
# Provisional prices from recent weekdays that should be re-fetched
|
||||
# - Must be provisional (gap-filled)
|
||||
# - Must be from the last few days (configurable, default 3)
|
||||
# - Must be a weekday (Saturday = 6, Sunday = 0 in PostgreSQL DOW)
|
||||
scope :refetchable_provisional, ->(lookback_days: 3) {
|
||||
where(provisional: true)
|
||||
.where(date: lookback_days.days.ago.to_date..Date.current)
|
||||
.where("EXTRACT(DOW FROM date) NOT IN (0, 6)")
|
||||
}
|
||||
end
|
||||
|
||||
@@ -2,6 +2,8 @@ class Security::Price::Importer
|
||||
MissingSecurityPriceError = Class.new(StandardError)
|
||||
MissingStartPriceError = Class.new(StandardError)
|
||||
|
||||
PROVISIONAL_LOOKBACK_DAYS = 3
|
||||
|
||||
def initialize(security:, security_provider:, start_date:, end_date:, clear_cache: false)
|
||||
@security = security
|
||||
@security_provider = security_provider
|
||||
@@ -40,28 +42,43 @@ class Security::Price::Importer
|
||||
end
|
||||
|
||||
gapfilled_prices = effective_start_date.upto(end_date).map do |date|
|
||||
db_price_value = db_prices[date]&.price
|
||||
provider_price_value = provider_prices[date]&.price
|
||||
provider_currency = provider_prices[date]&.currency
|
||||
db_price = db_prices[date]
|
||||
db_price_value = db_price&.price
|
||||
provider_price = provider_prices[date]
|
||||
provider_price_value = provider_price&.price
|
||||
provider_currency = provider_price&.currency
|
||||
|
||||
chosen_price = if clear_cache
|
||||
provider_price_value || db_price_value # overwrite when possible
|
||||
has_provider_price = provider_price_value.present? && provider_price_value.to_f > 0
|
||||
is_provisional = db_price&.provisional
|
||||
|
||||
chosen_price = if clear_cache || is_provisional
|
||||
provider_price_value || db_price_value # overwrite when possible (or when provisional)
|
||||
else
|
||||
db_price_value || provider_price_value # fill gaps
|
||||
end
|
||||
|
||||
# Gap-fill using LOCF (last observation carried forward)
|
||||
# Treat nil or zero prices as invalid and use previous price
|
||||
used_locf = false
|
||||
if chosen_price.nil? || chosen_price.to_f <= 0
|
||||
chosen_price = prev_price_value
|
||||
used_locf = true
|
||||
end
|
||||
prev_price_value = chosen_price
|
||||
|
||||
provisional = determine_provisional_status(
|
||||
date: date,
|
||||
has_provider_price: has_provider_price,
|
||||
used_locf: used_locf,
|
||||
existing_provisional: db_price&.provisional
|
||||
)
|
||||
|
||||
{
|
||||
security_id: security.id,
|
||||
date: date,
|
||||
price: chosen_price,
|
||||
currency: provider_currency || prev_price_currency || db_price_currency || "USD"
|
||||
currency: provider_currency || prev_price_currency || db_price_currency || "USD",
|
||||
provisional: provisional
|
||||
}
|
||||
end
|
||||
|
||||
@@ -104,18 +121,33 @@ class Security::Price::Importer
|
||||
end
|
||||
|
||||
def all_prices_exist?
|
||||
return false if has_refetchable_provisional_prices?
|
||||
db_prices.count == expected_count
|
||||
end
|
||||
|
||||
def has_refetchable_provisional_prices?
|
||||
Security::Price.where(security_id: security.id, date: start_date..end_date)
|
||||
.refetchable_provisional(lookback_days: PROVISIONAL_LOOKBACK_DAYS)
|
||||
.exists?
|
||||
end
|
||||
|
||||
def expected_count
|
||||
(start_date..end_date).count
|
||||
end
|
||||
|
||||
# Skip over ranges that already exist unless clearing cache
|
||||
# Also includes dates with refetchable provisional prices
|
||||
def effective_start_date
|
||||
return start_date if clear_cache
|
||||
|
||||
(start_date..end_date).detect { |d| !db_prices.key?(d) } || end_date
|
||||
refetchable_dates = Security::Price.where(security_id: security.id, date: start_date..end_date)
|
||||
.refetchable_provisional(lookback_days: PROVISIONAL_LOOKBACK_DAYS)
|
||||
.pluck(:date)
|
||||
.to_set
|
||||
|
||||
(start_date..end_date).detect do |d|
|
||||
!db_prices.key?(d) || refetchable_dates.include?(d)
|
||||
end || end_date
|
||||
end
|
||||
|
||||
def start_price_value
|
||||
@@ -126,11 +158,29 @@ class Security::Price::Importer
|
||||
provider_price_value || db_price_value
|
||||
end
|
||||
|
||||
def determine_provisional_status(date:, has_provider_price:, used_locf:, existing_provisional:)
|
||||
# Provider returned real price => NOT provisional
|
||||
return false if has_provider_price
|
||||
|
||||
# Gap-filled (LOCF) => provisional only if recent weekday
|
||||
if used_locf
|
||||
is_weekday = !date.saturday? && !date.sunday?
|
||||
is_recent = date >= PROVISIONAL_LOOKBACK_DAYS.days.ago.to_date
|
||||
return is_weekday && is_recent
|
||||
end
|
||||
|
||||
# Otherwise preserve existing status
|
||||
existing_provisional || false
|
||||
end
|
||||
|
||||
def upsert_rows(rows)
|
||||
batch_size = 200
|
||||
total_upsert_count = 0
|
||||
now = Time.current
|
||||
|
||||
rows.each_slice(batch_size) do |batch|
|
||||
rows_with_timestamps = rows.map { |row| row.merge(updated_at: now) }
|
||||
|
||||
rows_with_timestamps.each_slice(batch_size) do |batch|
|
||||
ids = Security::Price.upsert_all(
|
||||
batch,
|
||||
unique_by: %i[security_id date currency],
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
class AddProvisionalToSecurityPrices < ActiveRecord::Migration[7.2]
|
||||
def change
|
||||
add_column :security_prices, :provisional, :boolean, default: false, null: false
|
||||
end
|
||||
end
|
||||
3
db/schema.rb
generated
3
db/schema.rb
generated
@@ -10,7 +10,7 @@
|
||||
#
|
||||
# It's strongly recommended that you check this file into your version control system.
|
||||
|
||||
ActiveRecord::Schema[7.2].define(version: 2025_12_21_060111) do
|
||||
ActiveRecord::Schema[7.2].define(version: 2026_01_06_152346) do
|
||||
# These are extensions that must be enabled in order to support this database
|
||||
enable_extension "pgcrypto"
|
||||
enable_extension "plpgsql"
|
||||
@@ -961,6 +961,7 @@ ActiveRecord::Schema[7.2].define(version: 2025_12_21_060111) do
|
||||
t.datetime "created_at", null: false
|
||||
t.datetime "updated_at", null: false
|
||||
t.uuid "security_id"
|
||||
t.boolean "provisional", default: false, null: false
|
||||
t.index ["security_id", "date", "currency"], name: "index_security_prices_on_security_id_and_date_and_currency", unique: true
|
||||
t.index ["security_id"], name: "index_security_prices_on_security_id"
|
||||
end
|
||||
|
||||
2
test/fixtures/security/prices.yml
vendored
2
test/fixtures/security/prices.yml
vendored
@@ -3,9 +3,11 @@ one:
|
||||
date: <%= Date.current %>
|
||||
price: 215
|
||||
currency: USD
|
||||
provisional: false
|
||||
|
||||
two:
|
||||
security: aapl
|
||||
date: <%= 1.day.ago.to_date %>
|
||||
price: 214
|
||||
currency: USD
|
||||
provisional: false
|
||||
|
||||
@@ -136,6 +136,181 @@ class Security::Price::ImporterTest < ActiveSupport::TestCase
|
||||
assert_equal 1, Security::Price.count
|
||||
end
|
||||
|
||||
test "marks prices as not provisional when from provider" do
|
||||
Security::Price.delete_all
|
||||
|
||||
provider_response = provider_success_response([
|
||||
OpenStruct.new(security: @security, date: 1.day.ago.to_date, price: 150, currency: "USD"),
|
||||
OpenStruct.new(security: @security, date: Date.current, price: 155, currency: "USD")
|
||||
])
|
||||
|
||||
@provider.expects(:fetch_security_prices)
|
||||
.with(symbol: @security.ticker, exchange_operating_mic: @security.exchange_operating_mic,
|
||||
start_date: get_provider_fetch_start_date(1.day.ago.to_date), end_date: Date.current)
|
||||
.returns(provider_response)
|
||||
|
||||
Security::Price::Importer.new(
|
||||
security: @security,
|
||||
security_provider: @provider,
|
||||
start_date: 1.day.ago.to_date,
|
||||
end_date: Date.current
|
||||
).import_provider_prices
|
||||
|
||||
db_prices = Security::Price.where(security: @security).order(:date)
|
||||
assert db_prices.all? { |p| p.provisional == false }, "All prices from provider should not be provisional"
|
||||
end
|
||||
|
||||
test "marks gap-filled weekend prices as not provisional" do
|
||||
Security::Price.delete_all
|
||||
|
||||
# Find a recent Saturday
|
||||
saturday = Date.current
|
||||
saturday -= 1.day until saturday.saturday?
|
||||
friday = saturday - 1.day
|
||||
|
||||
# Provider only returns Friday's price, not Saturday
|
||||
provider_response = provider_success_response([
|
||||
OpenStruct.new(security: @security, date: friday, price: 150, currency: "USD")
|
||||
])
|
||||
|
||||
@provider.expects(:fetch_security_prices)
|
||||
.with(symbol: @security.ticker, exchange_operating_mic: @security.exchange_operating_mic,
|
||||
start_date: get_provider_fetch_start_date(friday), end_date: saturday)
|
||||
.returns(provider_response)
|
||||
|
||||
Security::Price::Importer.new(
|
||||
security: @security,
|
||||
security_provider: @provider,
|
||||
start_date: friday,
|
||||
end_date: saturday
|
||||
).import_provider_prices
|
||||
|
||||
saturday_price = Security::Price.find_by(security: @security, date: saturday)
|
||||
assert_not saturday_price.provisional, "Weekend gap-filled price should not be provisional"
|
||||
end
|
||||
|
||||
test "marks gap-filled recent weekday prices as provisional" do
|
||||
Security::Price.delete_all
|
||||
|
||||
# Find a recent weekday that's not today
|
||||
weekday = 1.day.ago.to_date
|
||||
weekday -= 1.day while weekday.saturday? || weekday.sunday?
|
||||
|
||||
# Start from 2 days before the weekday
|
||||
start_date = weekday - 1.day
|
||||
start_date -= 1.day while start_date.saturday? || start_date.sunday?
|
||||
|
||||
# Provider only returns start_date price, not the weekday
|
||||
provider_response = provider_success_response([
|
||||
OpenStruct.new(security: @security, date: start_date, price: 150, currency: "USD")
|
||||
])
|
||||
|
||||
@provider.expects(:fetch_security_prices)
|
||||
.with(symbol: @security.ticker, exchange_operating_mic: @security.exchange_operating_mic,
|
||||
start_date: get_provider_fetch_start_date(start_date), end_date: weekday)
|
||||
.returns(provider_response)
|
||||
|
||||
Security::Price::Importer.new(
|
||||
security: @security,
|
||||
security_provider: @provider,
|
||||
start_date: start_date,
|
||||
end_date: weekday
|
||||
).import_provider_prices
|
||||
|
||||
weekday_price = Security::Price.find_by(security: @security, date: weekday)
|
||||
# Only recent weekdays should be provisional
|
||||
if weekday >= 3.days.ago.to_date
|
||||
assert weekday_price.provisional, "Gap-filled recent weekday price should be provisional"
|
||||
else
|
||||
assert_not weekday_price.provisional, "Gap-filled old weekday price should not be provisional"
|
||||
end
|
||||
end
|
||||
|
||||
test "retries fetch when refetchable provisional prices exist" do
|
||||
Security::Price.delete_all
|
||||
|
||||
# Skip if today is a weekend
|
||||
return if Date.current.saturday? || Date.current.sunday?
|
||||
|
||||
# Pre-populate with provisional price for today
|
||||
Security::Price.create!(
|
||||
security: @security,
|
||||
date: Date.current,
|
||||
price: 100,
|
||||
currency: "USD",
|
||||
provisional: true
|
||||
)
|
||||
|
||||
# Provider now returns today's actual price
|
||||
provider_response = provider_success_response([
|
||||
OpenStruct.new(security: @security, date: Date.current, price: 165, currency: "USD")
|
||||
])
|
||||
|
||||
@provider.expects(:fetch_security_prices)
|
||||
.with(symbol: @security.ticker, exchange_operating_mic: @security.exchange_operating_mic,
|
||||
start_date: get_provider_fetch_start_date(Date.current), end_date: Date.current)
|
||||
.returns(provider_response)
|
||||
|
||||
Security::Price::Importer.new(
|
||||
security: @security,
|
||||
security_provider: @provider,
|
||||
start_date: Date.current,
|
||||
end_date: Date.current
|
||||
).import_provider_prices
|
||||
|
||||
db_price = Security::Price.find_by(security: @security, date: Date.current)
|
||||
assert_equal 165, db_price.price, "Price should be updated from provider"
|
||||
assert_not db_price.provisional, "Price should no longer be provisional after provider returns real price"
|
||||
end
|
||||
|
||||
test "skips fetch when all prices are non-provisional" do
|
||||
Security::Price.delete_all
|
||||
|
||||
# Create non-provisional prices for the range
|
||||
(3.days.ago.to_date..Date.current).each_with_index do |date, idx|
|
||||
Security::Price.create!(security: @security, date: date, price: 100 + idx, currency: "USD", provisional: false)
|
||||
end
|
||||
|
||||
@provider.expects(:fetch_security_prices).never
|
||||
|
||||
Security::Price::Importer.new(
|
||||
security: @security,
|
||||
security_provider: @provider,
|
||||
start_date: 3.days.ago.to_date,
|
||||
end_date: Date.current
|
||||
).import_provider_prices
|
||||
end
|
||||
|
||||
test "does not mark old gap-filled prices as provisional" do
|
||||
Security::Price.delete_all
|
||||
|
||||
# Use dates older than the lookback window
|
||||
old_date = 10.days.ago.to_date
|
||||
old_date -= 1.day while old_date.saturday? || old_date.sunday?
|
||||
start_date = old_date - 1.day
|
||||
start_date -= 1.day while start_date.saturday? || start_date.sunday?
|
||||
|
||||
# Provider only returns start_date price
|
||||
provider_response = provider_success_response([
|
||||
OpenStruct.new(security: @security, date: start_date, price: 150, currency: "USD")
|
||||
])
|
||||
|
||||
@provider.expects(:fetch_security_prices)
|
||||
.with(symbol: @security.ticker, exchange_operating_mic: @security.exchange_operating_mic,
|
||||
start_date: get_provider_fetch_start_date(start_date), end_date: old_date)
|
||||
.returns(provider_response)
|
||||
|
||||
Security::Price::Importer.new(
|
||||
security: @security,
|
||||
security_provider: @provider,
|
||||
start_date: start_date,
|
||||
end_date: old_date
|
||||
).import_provider_prices
|
||||
|
||||
old_price = Security::Price.find_by(security: @security, date: old_date)
|
||||
assert_not old_price.provisional, "Old gap-filled price should not be provisional"
|
||||
end
|
||||
|
||||
private
|
||||
def get_provider_fetch_start_date(start_date)
|
||||
start_date - 5.days
|
||||
|
||||
Reference in New Issue
Block a user