Files
sure/lib/generators/provider/family/templates/importer.rb.tt

317 lines
12 KiB
Plaintext

# frozen_string_literal: true
class <%= class_name %>Item::Importer
# Mixins defined elsewhere in the app.
# NOTE(review): presumably DataHelpers supplies sdk_object_to_hash, which the
# import_* methods below call — confirm against the generated helpers module.
include SyncStats::Collector
include <%= class_name %>Account::DataHelpers
<% if investment_provider? -%>
# Chunk size for fetching activities
ACTIVITY_CHUNK_DAYS = 365
MAX_ACTIVITY_CHUNKS = 3 # Up to 3 years of history
# Minimum existing activities required before using incremental sync
MINIMUM_HISTORY_FOR_INCREMENTAL = 10
<% end -%>
# Readers for the three collaborators stored by #initialize
attr_reader :<%= file_name %>_item, :<%= file_name %>_provider, :sync
# Builds an importer for a single item.
#
# The item, the provider object and the optional sync record are stored
# unchanged in instance variables and exposed through the class's attr_readers.
# `sync` may be nil; persist_stats! is a no-op in that case.
def initialize(<%= file_name %>_item, <%= file_name %>_provider:, sync: nil)
@<%= file_name %>_item = <%= file_name %>_item
@<%= file_name %>_provider = <%= file_name %>_provider
@sync = sync
end
# Raised by #import when the item has no credentials configured.
class CredentialsError < StandardError; end
def import
Rails.logger.info "<%= class_name %>Item::Importer - Starting import for item #{<%= file_name %>_item.id}"
credentials = <%= file_name %>_item.<%= file_name %>_credentials
unless credentials
raise CredentialsError, "No <%= class_name %> credentials configured for item #{<%= file_name %>_item.id}"
end
# Step 1: Fetch and store all accounts
import_accounts(credentials)
# Step 2: For LINKED accounts only, fetch data
# Unlinked accounts just need basic info (name, balance) for the setup modal
linked_accounts = <%= class_name %>Account
.where(<%= file_name %>_item_id: <%= file_name %>_item.id)
.joins(:account_provider)
Rails.logger.info "<%= class_name %>Item::Importer - Found #{linked_accounts.count} linked accounts to process"
linked_accounts.each do |<%= file_name %>_account|
Rails.logger.info "<%= class_name %>Item::Importer - Processing linked account #{<%= file_name %>_account.id}"
import_account_data(<%= file_name %>_account, credentials)
end
# Update raw payload on the item
<%= file_name %>_item.upsert_<%= file_name %>_snapshot!(stats)
rescue Provider::<%= class_name %>::AuthenticationError => e
<%= file_name %>_item.update!(status: :requires_update)
raise
end
private
# Lazily-created hash that accumulates counters and errors for this import run.
# The same hash instance is returned on every call.
def stats
  return @stats if @stats

  @stats = {}
end
# Writes the stats collected so far onto the sync record.
#
# Does nothing when there is no sync or it does not expose sync_stats.
# Existing sync_stats are kept; keys collected here win on conflict.
def persist_stats!
  record = sync
  return if record.nil? || !record.respond_to?(:sync_stats)

  previous = record.sync_stats || {}
  record.update_columns(sync_stats: previous.merge(stats))
end
# Fetches the account list from the provider and stores each account.
#
# Records "api_requests" and "total_accounts" in the stats, imports every
# account (a failure on one account is logged, counted in "accounts_skipped",
# registered as an error, and does not stop the loop), persists the stats and
# finally prunes local accounts that are no longer reported upstream.
def import_accounts(credentials)
  Rails.logger.info "<%= class_name %>Item::Importer - Fetching accounts"

  # TODO: Implement API call to fetch accounts
  # accounts_data = <%= file_name %>_provider.list_accounts(...)
  accounts_data = []

  stats["api_requests"] = stats.fetch("api_requests", 0) + 1
  stats["total_accounts"] = accounts_data.size

  # Track upstream account IDs to detect removed accounts
  upstream_account_ids = []

  accounts_data.each do |account_data|
    # TODO: Extract account ID from your provider's response format
    # Record the ID BEFORE importing: an account whose import raises still
    # exists upstream, so it must stay in this list or pruning would destroy it.
    # upstream_account_ids << account_data[:id].to_s if account_data[:id]

    import_account(account_data, credentials)
  rescue => e
    Rails.logger.error "<%= class_name %>Item::Importer - Failed to import account: #{e.message}"
    stats["accounts_skipped"] = stats.fetch("accounts_skipped", 0) + 1
    register_error(e, account_data: account_data)
  end

  persist_stats!

  # Clean up accounts that no longer exist upstream
  prune_removed_accounts(upstream_account_ids)
end
# Stores a single upstream account and bumps the "accounts_imported" counter.
# The persistence itself is left as a TODO for the generated provider.
def import_account(account_data, credentials)
  # TODO: Customize based on your provider's account ID field
  # <%= file_name %>_account_id = account_data[:id].to_s
  # return if <%= file_name %>_account_id.blank?
  # <%= file_name %>_account = <%= file_name %>_item.<%= file_name %>_accounts.find_or_initialize_by(
  # <%= file_name %>_account_id: <%= file_name %>_account_id
  # )
  # Update from API data
  # <%= file_name %>_account.upsert_from_<%= file_name %>!(account_data)
  imported_so_far = stats.fetch("accounts_imported", 0)
  stats["accounts_imported"] = imported_so_far + 1
end
# Fetches the detailed data for one linked account.
# Which importers run is decided when the template is rendered: investment
# providers get holdings followed by activities, all others get transactions.
def import_account_data(<%= file_name %>_account, credentials)
<% if investment_provider? -%>
# Import holdings
import_holdings(<%= file_name %>_account, credentials)
# Import activities
import_activities(<%= file_name %>_account, credentials)
<% else -%>
# Import transactions
import_transactions(<%= file_name %>_account, credentials)
<% end -%>
end
<% if investment_provider? -%>
def import_holdings(<%= file_name %>_account, credentials)
Rails.logger.info "<%= class_name %>Item::Importer - Fetching holdings for account #{<%= file_name %>_account.id}"
begin
# TODO: Implement API call to fetch holdings
# holdings_data = <%= file_name %>_provider.get_holdings(account_id: <%= file_name %>_account.<%= file_name %>_account_id)
holdings_data = []
stats["api_requests"] = stats.fetch("api_requests", 0) + 1
if holdings_data.any?
# Convert SDK objects to hashes for storage
holdings_hashes = holdings_data.map { |h| sdk_object_to_hash(h) }
<%= file_name %>_account.upsert_holdings_snapshot!(holdings_hashes)
stats["holdings_found"] = stats.fetch("holdings_found", 0) + holdings_data.size
end
rescue => e
Rails.logger.warn "<%= class_name %>Item::Importer - Failed to fetch holdings: #{e.message}"
register_error(e, context: "holdings", account_id: <%= file_name %>_account.id)
end
end
def import_activities(<%= file_name %>_account, credentials)
Rails.logger.info "<%= class_name %>Item::Importer - Fetching activities for account #{<%= file_name %>_account.id}"
begin
# Determine date range
start_date = calculate_start_date(<%= file_name %>_account)
end_date = Date.current
# TODO: Implement API call to fetch activities
# activities_data = <%= file_name %>_provider.get_activities(
# account_id: <%= file_name %>_account.<%= file_name %>_account_id,
# start_date: start_date,
# end_date: end_date
# )
activities_data = []
stats["api_requests"] = stats.fetch("api_requests", 0) + 1
if activities_data.any?
# Convert SDK objects to hashes and merge with existing
activities_hashes = activities_data.map { |a| sdk_object_to_hash(a) }
merged = merge_activities(<%= file_name %>_account.raw_activities_payload || [], activities_hashes)
<%= file_name %>_account.upsert_activities_snapshot!(merged)
stats["activities_found"] = stats.fetch("activities_found", 0) + activities_data.size
elsif fresh_linked_account?(<%= file_name %>_account)
# Fresh account with no activities - schedule background fetch
schedule_background_activities_fetch(<%= file_name %>_account, start_date)
end
rescue => e
Rails.logger.warn "<%= class_name %>Item::Importer - Failed to fetch activities: #{e.message}"
register_error(e, context: "activities", account_id: <%= file_name %>_account.id)
end
end
def calculate_start_date(<%= file_name %>_account)
# Use user-specified start date if available
user_start = <%= file_name %>_account.sync_start_date
return user_start if user_start.present?
# For accounts with existing history, use incremental sync
existing_count = (<%= file_name %>_account.raw_activities_payload || []).size
if existing_count >= MINIMUM_HISTORY_FOR_INCREMENTAL && <%= file_name %>_account.last_activities_sync.present?
# Incremental: go back 30 days from last sync to catch updates
(<%= file_name %>_account.last_activities_sync - 30.days).to_date
else
# Full sync: go back up to 3 years
(ACTIVITY_CHUNK_DAYS * MAX_ACTIVITY_CHUNKS).days.ago.to_date
end
end
def fresh_linked_account?(<%= file_name %>_account)
# Account was just linked and has no activity history yet
<%= file_name %>_account.last_activities_sync.nil? &&
(<%= file_name %>_account.raw_activities_payload || []).empty?
end
def schedule_background_activities_fetch(<%= file_name %>_account, start_date)
return if <%= file_name %>_account.activities_fetch_pending?
Rails.logger.info "<%= class_name %>Item::Importer - Scheduling background activities fetch for account #{<%= file_name %>_account.id}"
<%= file_name %>_account.update!(activities_fetch_pending: true)
<%= class_name %>ActivitiesFetchJob.perform_later(<%= file_name %>_account, start_date: start_date)
end
# Combines stored and freshly fetched activities into one list.
# Entries are keyed with activity_key; a new entry replaces a stored one with
# the same key while keeping the stored entry's position in the result.
def merge_activities(existing, new_activities)
  [ existing, new_activities ].each_with_object({}) do |batch, merged|
    batch.each { |activity| merged[activity_key(activity)] = activity }
  end.values
end
# Returns the key used to de-duplicate an activity when merging.
#
# Uses the activity's id when it has a non-blank one; otherwise builds a
# composite key from date, type, amount and symbol. A blank id (e.g. "") is
# treated as missing: it is truthy in Ruby, so without this check every
# id-less activity would share the key "" and overwrite the others on merge.
def activity_key(activity)
  activity = activity.with_indifferent_access if activity.is_a?(Hash)

  # Use ID if available, otherwise generate key from date/type/amount
  (activity[:id] || activity["id"]).presence ||
    [ activity[:date], activity[:type], activity[:amount], activity[:symbol] ].join("-")
end
<% else -%>
def import_transactions(<%= file_name %>_account, credentials)
Rails.logger.info "<%= class_name %>Item::Importer - Fetching transactions for account #{<%= file_name %>_account.id}"
begin
# Determine date range
start_date = calculate_transaction_start_date(<%= file_name %>_account)
end_date = Date.current
# TODO: Implement API call to fetch transactions
# transactions_data = <%= file_name %>_provider.get_transactions(
# account_id: <%= file_name %>_account.<%= file_name %>_account_id,
# start_date: start_date,
# end_date: end_date
# )
transactions_data = []
stats["api_requests"] = stats.fetch("api_requests", 0) + 1
if transactions_data.any?
# Convert SDK objects to hashes and merge with existing
transactions_hashes = transactions_data.map { |t| sdk_object_to_hash(t) }
merged = merge_transactions(<%= file_name %>_account.raw_transactions_payload || [], transactions_hashes)
<%= file_name %>_account.upsert_<%= file_name %>_transactions_snapshot!(merged)
stats["transactions_found"] = stats.fetch("transactions_found", 0) + transactions_data.size
end
rescue => e
Rails.logger.warn "<%= class_name %>Item::Importer - Failed to fetch transactions: #{e.message}"
register_error(e, context: "transactions", account_id: <%= file_name %>_account.id)
end
end
def calculate_transaction_start_date(<%= file_name %>_account)
# Use user-specified start date if available
user_start = <%= file_name %>_account.sync_start_date
return user_start if user_start.present?
# For accounts with existing transactions, use incremental sync
existing_count = (<%= file_name %>_account.raw_transactions_payload || []).size
if existing_count >= 10 && <%= file_name %>_item.last_synced_at.present?
# Incremental: go back 7 days from last sync to catch updates
(<%= file_name %>_item.last_synced_at - 7.days).to_date
else
# Full sync: go back 90 days
90.days.ago.to_date
end
end
# Combines stored and freshly fetched transactions into one list.
# Entries are keyed with transaction_key; a new entry replaces a stored one
# with the same key while keeping the stored entry's position in the result.
def merge_transactions(existing, new_transactions)
  merged = {}
  [ existing, new_transactions ].each do |batch|
    batch.each do |transaction|
      merged[transaction_key(transaction)] = transaction
    end
  end
  merged.values
end
# Returns the key used to de-duplicate a transaction when merging.
#
# Uses the transaction's id when it has a non-blank one; otherwise builds a
# composite key from date, amount and description. A blank id (e.g. "") is
# treated as missing: it is truthy in Ruby, so without this check every
# id-less transaction would share the key "" and overwrite the others on merge.
def transaction_key(transaction)
  transaction = transaction.with_indifferent_access if transaction.is_a?(Hash)

  # Use ID if available, otherwise generate key from date/amount/description
  (transaction[:id] || transaction["id"]).presence ||
    [ transaction[:date], transaction[:amount], transaction[:description] ].join("-")
end
<% end -%>
def prune_removed_accounts(upstream_account_ids)
return if upstream_account_ids.empty?
# Find accounts that exist locally but not upstream
removed = <%= file_name %>_item.<%= file_name %>_accounts
.where.not(<%= file_name %>_account_id: upstream_account_ids)
if removed.any?
Rails.logger.info "<%= class_name %>Item::Importer - Pruning #{removed.count} removed accounts"
removed.destroy_all
end
end
# Appends an error entry to stats["errors"], creating the list on first use.
# Each entry holds the error message, the keyword context rendered with to_s,
# and the current time as an ISO 8601 string. Returns the errors list.
def register_error(error, **context)
  errors = (stats["errors"] ||= [])
  entry = {
    message: error.message,
    context: context.to_s,
    timestamp: Time.current.iso8601
  }
  errors << entry
end
end