# sure/app/models/snaptrade_item/importer.rb
# 444 lines, 18 KiB, Ruby
# Imports SnapTrade data for a SnaptradeItem: accounts and balances first,
# then holdings and activities for accounts linked to a Sure account.
# Collects per-run stats and stores them as a snapshot on the item.
class SnaptradeItem::Importer
  include SyncStats::Collector          # stats collection support -- TODO confirm contract
  include SnaptradeAccount::DataHelpers # provides sdk_object_to_hash used below

  # Collaborators injected via the constructor (read-only).
  attr_reader :snaptrade_item, :snaptrade_provider, :sync

  # Chunk size for fetching activities (365 days per chunk)
  ACTIVITY_CHUNK_DAYS = 365
  MAX_ACTIVITY_CHUNKS = 3 # Up to 3 years of history
  # Minimum existing activities required before using incremental sync
  # Prevents treating a partially synced account as "caught up"
  MINIMUM_HISTORY_FOR_INCREMENTAL = 10
# @param snaptrade_item [SnaptradeItem] connection record to import data for
# @param snaptrade_provider [Object] SnapTrade API client wrapper
# @param sync [Object, nil] optional sync record used to persist run stats
def initialize(snaptrade_item, snaptrade_provider:, sync: nil)
  @sync = sync
  @snaptrade_provider = snaptrade_provider
  @snaptrade_item = snaptrade_item
end
class CredentialsError < StandardError; end
# Entry point: imports accounts, then holdings/activities for linked
# accounts, and finally stores the collected stats snapshot on the item.
#
# @raise [CredentialsError] when no SnapTrade credentials are configured
# @raise [Provider::Snaptrade::AuthenticationError] re-raised after the
#   item has been flagged as requiring re-authentication
def import
  Rails.logger.info "SnaptradeItem::Importer - Starting import for item #{snaptrade_item.id}"
  credentials = snaptrade_item.snaptrade_credentials
  unless credentials
    raise CredentialsError, "No SnapTrade credentials configured for item #{snaptrade_item.id}"
  end
  # Step 1: Fetch and store all accounts
  import_accounts(credentials)
  # Step 2: For LINKED accounts only, fetch holdings and activities
  # Unlinked accounts just need basic info (name, balance) for the setup modal
  # Query directly to avoid any association caching issues
  linked_accounts = SnaptradeAccount
    .where(snaptrade_item_id: snaptrade_item.id)
    .joins(:account_provider)
  Rails.logger.info "SnaptradeItem::Importer - Found #{linked_accounts.count} linked accounts to process"
  linked_accounts.each do |snaptrade_account|
    Rails.logger.info "SnaptradeItem::Importer - Processing linked account #{snaptrade_account.id} (#{snaptrade_account.snaptrade_account_id})"
    import_account_data(snaptrade_account, credentials)
  end
  # Update raw payload on the item
  snaptrade_item.upsert_snaptrade_snapshot!(stats)
rescue Provider::Snaptrade::AuthenticationError
  # Credentials no longer valid: flag the item for re-auth, then re-raise
  # (bare `raise` re-raises the current exception; binding was unused).
  snaptrade_item.update!(status: :requires_update)
  raise
end

private
# Extract activities array from API response
# get_account_activities returns a paginated object with .data accessor
# This handles both paginated responses and plain arrays
def extract_activities_from_response(response)
  if response.respond_to?(:data)
    # Paginated response (e.g., SnapTrade::PaginatedUniversalActivity)
    Rails.logger.info "SnaptradeItem::Importer - Paginated response, extracting .data (#{response.data&.size || 0} items)"
    return response.data || []
  end
  if response.is_a?(Array)
    # Direct array response
    Rails.logger.info "SnaptradeItem::Importer - Array response (#{response.size} items)"
    return response
  end
  # Anything else is unexpected; log and degrade to an empty list
  Rails.logger.warn "SnaptradeItem::Importer - Unexpected response type: #{response.class}"
  []
end
# Memoized mutable hash that accumulates counters/errors for the run.
def stats
  @stats = {} if @stats.nil?
  @stats
end
# Merge in-memory stats into the sync record's stored sync_stats.
# No-op when there is no sync or it does not expose sync_stats.
def persist_stats!
  return unless sync&.respond_to?(:sync_stats)
  combined = (sync.sync_stats || {}).merge(stats)
  sync.update_columns(sync_stats: combined)
end
# Fetch every account from SnapTrade and upsert each locally.
# A failure on one account is logged/counted and does not abort the rest.
# Stats are persisted, then accounts missing upstream are pruned.
def import_accounts(credentials)
  Rails.logger.info "SnaptradeItem::Importer - Fetching accounts"
  accounts_data = snaptrade_provider.list_accounts(
    user_id: credentials[:user_id],
    user_secret: credentials[:user_secret]
  )
  stats["api_requests"] = stats.fetch("api_requests", 0) + 1
  stats["total_accounts"] = accounts_data.size
  # Track upstream account IDs to detect removed accounts
  upstream_account_ids = []
  accounts_data.each do |account_data|
    begin
      import_account(account_data, credentials)
      # Only track IDs that imported successfully, so a failed account is
      # not treated as "present upstream" during pruning
      upstream_account_ids << account_data.id.to_s if account_data.id
    rescue => e
      Rails.logger.error "SnaptradeItem::Importer - Failed to import account: #{e.message}"
      stats["accounts_skipped"] = stats.fetch("accounts_skipped", 0) + 1
      register_error(e, account_data: account_data)
    end
  end
  persist_stats!
  # Clean up accounts that no longer exist upstream
  prune_removed_accounts(upstream_account_ids)
end
# Upsert a single SnapTrade account record, then fetch and store its
# balances. Balance failures are logged and tolerated; the account still
# counts as imported.
def import_account(account_data, credentials)
  # Find or create the SnaptradeAccount by SnapTrade's account ID
  external_id = account_data.id.to_s
  return if external_id.blank?
  record = snaptrade_item.snaptrade_accounts.find_or_initialize_by(
    snaptrade_account_id: external_id
  )
  # Update from API data - pass raw SDK object, model handles conversion
  record.upsert_from_snaptrade!(account_data)
  # Fetch and store balances
  begin
    balances = snaptrade_provider.get_balances(
      user_id: credentials[:user_id],
      user_secret: credentials[:user_secret],
      account_id: external_id
    )
    stats["api_requests"] = stats.fetch("api_requests", 0) + 1
    # Pass raw SDK objects - model handles conversion
    record.upsert_balances!(balances)
  rescue => e
    Rails.logger.warn "SnaptradeItem::Importer - Failed to fetch balances for account #{external_id}: #{e.message}"
  end
  stats["accounts_imported"] = stats.fetch("accounts_imported", 0) + 1
end
# Import holdings then activities for one linked account.
# Skips accounts that somehow lack a SnapTrade account ID.
def import_account_data(snaptrade_account, credentials)
  return if snaptrade_account.snaptrade_account_id.blank?
  # Import holdings
  import_holdings(snaptrade_account, credentials)
  # Import activities (chunked for history)
  import_activities(snaptrade_account, credentials)
end
# Fetch current positions for one account and store them as a holdings
# snapshot. Logs a sample holding's structure for debugging. All errors
# are logged and registered in stats but never raised.
def import_holdings(snaptrade_account, credentials)
  Rails.logger.info "SnaptradeItem::Importer - Fetching holdings for account #{snaptrade_account.id} (#{snaptrade_account.snaptrade_account_id})"
  begin
    holdings = snaptrade_provider.get_positions(
      user_id: credentials[:user_id],
      user_secret: credentials[:user_secret],
      account_id: snaptrade_account.snaptrade_account_id
    )
    stats["api_requests"] = stats.fetch("api_requests", 0) + 1
    Rails.logger.info "SnaptradeItem::Importer - Got #{holdings.size} holdings from API"
    # Convert SDK objects to plain hashes before persisting
    holdings_data = holdings.map { |h| sdk_object_to_hash(h) }
    # Log sample holding structure
    if holdings_data.first
      sample = holdings_data.first
      Rails.logger.info "SnaptradeItem::Importer - Sample holding: #{sample.keys.join(', ')}"
      if sample["symbol"]
        Rails.logger.info "SnaptradeItem::Importer - Sample symbol keys: #{sample['symbol'].keys.join(', ')}" if sample["symbol"].is_a?(Hash)
        Rails.logger.info "SnaptradeItem::Importer - Sample symbol.symbol: #{sample.dig('symbol', 'symbol')}"
        Rails.logger.info "SnaptradeItem::Importer - Sample symbol.description: #{sample.dig('symbol', 'description')}"
      end
    end
    snaptrade_account.upsert_holdings_snapshot!(holdings_data)
    stats["holdings_found"] = stats.fetch("holdings_found", 0) + holdings_data.size
  rescue => e
    # Holdings failures are non-fatal: record and continue the import
    Rails.logger.error "SnaptradeItem::Importer - Failed to fetch holdings: #{e.class} - #{e.message}"
    Rails.logger.error e.backtrace.first(5).join("\n") if e.backtrace
    register_error(e, context: "holdings", account_id: snaptrade_account.id)
  end
end
# Fetch activities for one account. Chooses between:
#   * incremental sync over a recent window, when the item has synced
#     before AND the account already holds enough history; or
#   * a full-history fetch, which on a fresh connection may dispatch a
#     background retry job if SnapTrade returns nothing yet.
def import_activities(snaptrade_account, credentials)
  Rails.logger.info "SnaptradeItem::Importer - Fetching activities for account #{snaptrade_account.id} (#{snaptrade_account.snaptrade_account_id})"
  # Determine date range for fetching activities
  # Use first_transaction_date from sync_status to know how far back history goes
  first_tx_date = extract_first_transaction_date(snaptrade_account)
  existing_count = snaptrade_account.raw_activities_payload&.size || 0
  # User-configured sync start date acts as a floor - don't fetch activities before this date
  user_sync_start = snaptrade_account.sync_start_date
  # Only do incremental sync if we already have meaningful history
  # This ensures we do a full history fetch on first sync even if timestamps are set
  can_do_incremental = snaptrade_item.last_synced_at.present? &&
    snaptrade_account.last_activities_sync.present? &&
    existing_count >= MINIMUM_HISTORY_FOR_INCREMENTAL
  if can_do_incremental
    # Incremental sync - fetch from last sync minus buffer (synchronous)
    # The 30-day overlap guards against late-posted activities; the merge
    # step deduplicates any overlap.
    start_date = snaptrade_account.last_activities_sync - 30.days
    # Respect user's sync_start_date floor
    start_date = [ start_date, user_sync_start ].compact.max
    Rails.logger.info "SnaptradeItem::Importer - Incremental activities fetch from #{start_date} (existing: #{existing_count})"
    fetch_all_activities(snaptrade_account, credentials, start_date: start_date)
  else
    # Full history - use user's sync_start_date if set, otherwise first_transaction_date
    # Default to MAX_ACTIVITY_CHUNKS years ago to match chunk size
    default_start = (MAX_ACTIVITY_CHUNKS * ACTIVITY_CHUNK_DAYS).days.ago.to_date
    start_date = user_sync_start || first_tx_date || default_start
    Rails.logger.info "SnaptradeItem::Importer - Full history fetch from #{start_date} (user_sync_start: #{user_sync_start || 'none'}, first_tx_date: #{first_tx_date || 'unknown'}, existing: #{existing_count})"
    # Try to fetch activities synchronously first
    fetched_count = fetch_all_activities(snaptrade_account, credentials, start_date: start_date)
    if fetched_count == 0 && existing_count == 0
      # On fresh connection, SnapTrade may need time to sync data from brokerage
      # Dispatch background job with retry logic instead of blocking the worker
      Rails.logger.info(
        "SnaptradeItem::Importer - No activities returned for account #{snaptrade_account.id}, " \
        "dispatching background fetch job (SnapTrade may still be syncing)"
      )
      SnaptradeActivitiesFetchJob.set(wait: 10.seconds).perform_later(
        snaptrade_account,
        start_date: start_date
      )
      # Mark the account as having pending activities
      # The background job will clear this flag when done
      snaptrade_account.update!(activities_fetch_pending: true)
    end
  end
  # Log what we have after fetching (may be 0 if job was dispatched)
  final_count = snaptrade_account.reload.raw_activities_payload&.size || 0
  Rails.logger.info "SnaptradeItem::Importer - Activities stored: #{final_count}"
  if final_count > 0 && snaptrade_account.raw_activities_payload.first
    sample = snaptrade_account.raw_activities_payload.first
    Rails.logger.info "SnaptradeItem::Importer - Sample activity keys: #{sample.keys.join(', ')}"
    Rails.logger.info "SnaptradeItem::Importer - Sample activity type: #{sample['type']}"
  end
end
# Extract first_transaction_date from account's sync_status
# Checks multiple locations: raw_payload and raw_activities_payload
def extract_first_transaction_date(snaptrade_account)
  # Try 1: Check raw_payload (from list_accounts)
  payload = snaptrade_account.raw_payload
  if payload.is_a?(Hash)
    candidate = payload.dig("sync_status", "transactions", "first_transaction_date")
    return Date.parse(candidate) if candidate.present?
  end
  # Try 2: Check activities payload (sync_status is nested in account object)
  payload = snaptrade_account.raw_activities_payload
  if payload.is_a?(Array) && payload.first.is_a?(Hash)
    candidate = payload.first.dig("account", "sync_status", "transactions", "first_transaction_date")
    return Date.parse(candidate) if candidate.present?
  end
  nil
rescue ArgumentError, TypeError
  # Unparseable date strings mean "history depth unknown"
  nil
end
# Fetch all activities using per-account endpoint with proper date range
# Uses get_account_activities which returns paginated data for the specific account
#
# Returns the number of activities fetched (0 on failure). New activities
# are merged into the stored snapshot; empty results are never saved so
# last_activities_sync is not advanced on a no-op fetch.
def fetch_all_activities(snaptrade_account, credentials, start_date:, end_date: nil)
  # Ensure dates are proper Date objects (not strings or other types)
  start_date = ensure_date(start_date) || 5.years.ago.to_date
  end_date = ensure_date(end_date) || Date.current
  all_activities = []
  Rails.logger.info "SnaptradeItem::Importer - Fetching activities from #{start_date} to #{end_date}"
  begin
    # Use get_account_activities (per-account endpoint) for better results
    response = snaptrade_provider.get_account_activities(
      user_id: credentials[:user_id],
      user_secret: credentials[:user_secret],
      account_id: snaptrade_account.snaptrade_account_id,
      start_date: start_date,
      end_date: end_date
    )
    stats["api_requests"] = stats.fetch("api_requests", 0) + 1
    # Handle paginated response
    activities = extract_activities_from_response(response)
    Rails.logger.info "SnaptradeItem::Importer - get_account_activities returned #{activities.size} items"
    activities_data = activities.map { |a| sdk_object_to_hash(a) }
    all_activities.concat(activities_data)
    # If the per-account endpoint returned few results, also try the cross-account endpoint
    # as a fallback (some brokerages may work better with one or the other)
    # Only when the requested range exceeds a year, where sparse results
    # are most suspicious.
    if activities_data.size < 10 && (end_date - start_date).to_i > 365
      Rails.logger.info "SnaptradeItem::Importer - Few results from per-account endpoint, trying cross-account endpoint"
      cross_account_activities = fetch_via_cross_account_endpoint(
        snaptrade_account, credentials, start_date: start_date, end_date: end_date
      )
      if cross_account_activities.size > activities_data.size
        Rails.logger.info "SnaptradeItem::Importer - Cross-account endpoint returned more: #{cross_account_activities.size} vs #{activities_data.size}"
        all_activities = cross_account_activities
      end
    end
    # Only save if we actually got new activities
    # Don't upsert empty arrays as this sets last_activities_sync incorrectly
    if all_activities.any?
      existing = snaptrade_account.raw_activities_payload || []
      merged = merge_activities(existing, all_activities)
      snaptrade_account.upsert_activities_snapshot!(merged)
      stats["activities_found"] = stats.fetch("activities_found", 0) + all_activities.size
    end
    all_activities.size
  rescue => e
    # Non-fatal: record and report zero so callers may retry via job
    Rails.logger.error "SnaptradeItem::Importer - Failed to fetch activities: #{e.class} - #{e.message}"
    Rails.logger.error e.backtrace.first(5).join("\n") if e.backtrace
    register_error(e, context: "activities", account_id: snaptrade_account.id)
    0
  end
end
# Fallback: try the cross-account endpoint which may work better for some brokerages
#
# @return [Array<Hash>] activities converted to plain hashes; [] on failure
def fetch_via_cross_account_endpoint(snaptrade_account, credentials, start_date:, end_date:)
  activities = snaptrade_provider.get_activities(
    user_id: credentials[:user_id],
    user_secret: credentials[:user_secret],
    start_date: start_date,
    end_date: end_date,
    accounts: snaptrade_account.snaptrade_account_id
  )
  stats["api_requests"] = stats.fetch("api_requests", 0) + 1
  # Guard against a nil response (idiomatic ||= replaces `x = x || []`)
  activities ||= []
  activities.map { |a| sdk_object_to_hash(a) }
rescue => e
  # Best-effort fallback: swallow the error and return an empty list
  Rails.logger.warn "SnaptradeItem::Importer - Cross-account endpoint fallback failed: #{e.message}"
  []
end
# Merge activities, deduplicating by ID
# Fallback key includes symbol to distinguish activities with same date/type/amount
def merge_activities(existing, new_activities)
  deduped = {}
  # Existing entries go in first, then new ones: later writes win, so
  # freshly fetched data replaces any previously stored duplicate.
  [ existing, new_activities ].each do |batch|
    batch.each do |activity|
      wrapped = activity.with_indifferent_access
      deduped[wrapped[:id] || activity_fallback_key(wrapped)] = activity
    end
  end
  deduped.values
end
# Build a dedup key for an activity that lacks an upstream ID:
# settlement date, type, amount, and symbol ticker combined.
def activity_fallback_key(activity)
  ticker = activity.dig(:symbol, :symbol)
  ticker ||= activity.dig("symbol", "symbol")
  [ activity[:settlement_date], activity[:type], activity[:amount], ticker ]
end
# Destroy local SnapTrade accounts that vanished upstream, but only those
# not linked to a Sure account. Bails out entirely when upstream returned
# no IDs, so a bad/empty response cannot wipe every account.
def prune_removed_accounts(upstream_account_ids)
  return if upstream_account_ids.blank?
  # Find accounts that no longer exist upstream
  stale = snaptrade_item.snaptrade_accounts
    .where.not(snaptrade_account_id: upstream_account_ids)
    .where.not(snaptrade_account_id: nil)
  stale.each do |snaptrade_account|
    # Only delete if not linked to a Sure account
    next if snaptrade_account.current_account.present?
    Rails.logger.info "SnaptradeItem::Importer - Pruning orphaned account #{snaptrade_account.id}"
    snaptrade_account.destroy
    stats["accounts_pruned"] = stats.fetch("accounts_pruned", 0) + 1
  end
end
# Record an error in the stats payload. Keeps only the 10 most recent
# entries while total_errors counts every occurrence.
def register_error(error, account_data: nil, context: nil, account_id: nil)
  # Extract account name safely from SDK object or hash
  entry = {
    message: error.message,
    context: context,
    account_id: account_id,
    account_name: extract_account_name(account_data)
  }.compact
  stats["errors"] ||= []
  stats["errors"] << entry
  stats["errors"] = stats["errors"].last(10)
  stats["total_errors"] = stats.fetch("total_errors", 0) + 1
end
# Best-effort extraction of a display name from an SDK object or a hash.
# Handles both symbol- and string-keyed hashes (fix: string-keyed hashes
# previously fell through `dig(:name)` and returned nil).
#
# @param account_data [Object, Hash, nil]
# @return [String, nil]
def extract_account_name(account_data)
  return nil if account_data.nil?
  if account_data.respond_to?(:name)
    # SDK objects expose #name directly
    account_data.name
  elsif account_data.respond_to?(:dig)
    # Hash-like: try symbol key first, then string key
    account_data.dig(:name) || account_data.dig("name")
  elsif account_data.respond_to?(:[])
    account_data[:name] || account_data["name"]
  end
end
# Convert various date representations to a Date object
#
# @param value [Date, DateTime, Time, String, #to_date, nil]
# @return [Date, nil] nil when the value is missing or unparseable
def ensure_date(value)
  return nil if value.nil?
  # Fix: DateTime is a subclass of Date, so the plain Date check used to
  # match first and return the DateTime unchanged. Normalize it first.
  return value.to_date if value.is_a?(DateTime)
  return value if value.is_a?(Date)
  return Date.parse(value) if value.is_a?(String)
  return value.to_date if value.is_a?(Time) || value.is_a?(ActiveSupport::TimeWithZone)
  value.respond_to?(:to_date) ? value.to_date : nil
rescue ArgumentError, TypeError
  # Unparseable input (bad date string, wrong type) is treated as "no date"
  nil
end
end