mirror of
https://github.com/we-promise/sure.git
synced 2026-04-19 03:54:08 +00:00
Harden SimpleFIN sync: protect user data, fix stuck syncs, optimize API calls (#671)
* Implement entry protection flags for sync overwrites - Added `user_modified` and `import_locked` flags to `entries` table to prevent provider sync from overwriting user-edited and imported data. - Introduced backfill migration to mark existing entries based on conditions. - Enhanced sync and processing logic to respect protection flags, track skipped entries, and log detailed stats. - Updated UI to display skipped/protected entries and reasons in sync summaries. * Localize error details summary text and adjust `sync_account_later` method placement * Restored schema.rb --------- Co-authored-by: luckyPipewrench <luckypipewrench@proton.me>
This commit is contained in:
@@ -37,6 +37,37 @@
|
||||
<span><%= t("provider_sync_summary.transactions.updated", count: tx_updated) %></span>
|
||||
<span><%= t("provider_sync_summary.transactions.skipped", count: tx_skipped) %></span>
|
||||
</div>
|
||||
|
||||
<%# Protected entries detail - shown when entries were skipped due to protection %>
|
||||
<% if has_skipped_entries? %>
|
||||
<div class="mt-2">
|
||||
<div class="flex items-center gap-1">
|
||||
<%= helpers.icon "shield-check", size: "sm" %>
|
||||
<span class="text-secondary"><%= t("provider_sync_summary.transactions.protected", count: tx_skipped) %></span>
|
||||
</div>
|
||||
<% if skip_summary.any? %>
|
||||
<div class="text-xs text-secondary mt-1">
|
||||
<% skip_summary.each do |reason, count| %>
|
||||
<span class="mr-2"><%= t("provider_sync_summary.skip_reasons.#{reason}", default: reason.humanize) %>: <%= count %></span>
|
||||
<% end %>
|
||||
</div>
|
||||
<% end %>
|
||||
<% if skip_details.any? %>
|
||||
<details class="mt-1">
|
||||
<summary class="text-xs cursor-pointer text-secondary hover:text-primary">
|
||||
<%= t("provider_sync_summary.transactions.view_protected") %>
|
||||
</summary>
|
||||
<div class="mt-1 pl-2 border-l-2 border-surface-inset space-y-1">
|
||||
<% skip_details.each do |detail| %>
|
||||
<p class="text-xs text-secondary">
|
||||
<%= detail["name"] %> (<%= t("provider_sync_summary.skip_reasons.#{detail["reason"]}", default: detail["reason"].humanize) %>)
|
||||
</p>
|
||||
<% end %>
|
||||
</div>
|
||||
</details>
|
||||
<% end %>
|
||||
</div>
|
||||
<% end %>
|
||||
</div>
|
||||
<% end %>
|
||||
|
||||
@@ -62,6 +93,18 @@
|
||||
<% end %>
|
||||
<% if has_errors? %>
|
||||
<span class="text-destructive"><%= t("provider_sync_summary.health.errors", count: total_errors) %></span>
|
||||
<% if error_details.any? %>
|
||||
<details class="mt-1">
|
||||
<summary class="text-xs cursor-pointer text-secondary hover:text-primary"><%= t("provider_sync_summary.health.view_error_details") %></summary>
|
||||
<div class="mt-1 pl-2 border-l-2 border-destructive/30 space-y-1">
|
||||
<% error_details.each do |detail| %>
|
||||
<p class="text-xs text-destructive">
|
||||
<% if detail["name"].present? %><strong><%= detail["name"] %>:</strong> <% end %><%= detail["message"] %>
|
||||
</p>
|
||||
<% end %>
|
||||
</div>
|
||||
</details>
|
||||
<% end %>
|
||||
<% elsif import_started? %>
|
||||
<span class="text-success"><%= t("provider_sync_summary.health.errors", count: 0) %></span>
|
||||
<% else %>
|
||||
|
||||
@@ -68,6 +68,19 @@ class ProviderSyncSummary < ViewComponent::Base
|
||||
stats.key?("tx_seen") || stats.key?("tx_imported") || stats.key?("tx_updated")
|
||||
end
|
||||
|
||||
# Skip statistics (protected entries not overwritten)
|
||||
def has_skipped_entries?
|
||||
tx_skipped > 0
|
||||
end
|
||||
|
||||
def skip_summary
|
||||
stats["skip_summary"] || {}
|
||||
end
|
||||
|
||||
def skip_details
|
||||
stats["skip_details"] || []
|
||||
end
|
||||
|
||||
# Holdings statistics
|
||||
def holdings_found
|
||||
stats["holdings_found"].to_i
|
||||
@@ -127,6 +140,14 @@ class ProviderSyncSummary < ViewComponent::Base
|
||||
total_errors > 0
|
||||
end
|
||||
|
||||
def error_details
|
||||
stats["errors"] || []
|
||||
end
|
||||
|
||||
def error_buckets
|
||||
stats["error_buckets"] || {}
|
||||
end
|
||||
|
||||
# Stale pending transactions (auto-excluded)
|
||||
def stale_pending_excluded
|
||||
stats["stale_pending_excluded"].to_i
|
||||
|
||||
@@ -30,6 +30,7 @@ class TradesController < ApplicationController
|
||||
|
||||
def update
|
||||
if @entry.update(update_entry_params)
|
||||
@entry.mark_user_modified!
|
||||
@entry.sync_account_later
|
||||
|
||||
respond_to do |format|
|
||||
|
||||
@@ -93,9 +93,10 @@ class TransactionsController < ApplicationController
|
||||
}
|
||||
end
|
||||
|
||||
@entry.sync_account_later
|
||||
@entry.lock_saved_attributes!
|
||||
@entry.mark_user_modified!
|
||||
@entry.transaction.lock_attr!(:tag_ids) if @entry.transaction.tags.any?
|
||||
@entry.sync_account_later
|
||||
|
||||
respond_to do |format|
|
||||
format.html { redirect_back_or_to account_path(@entry.account), notice: "Transaction updated" }
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
class Account::ProviderImportAdapter
|
||||
attr_reader :account
|
||||
attr_reader :account, :skipped_entries
|
||||
|
||||
def initialize(account)
|
||||
@account = account
|
||||
@skipped_entries = []
|
||||
end
|
||||
|
||||
# Resets skipped entries tracking (call at start of new sync batch)
|
||||
def reset_skipped_entries!
|
||||
@skipped_entries = []
|
||||
end
|
||||
|
||||
# Imports a transaction from a provider
|
||||
@@ -31,6 +37,24 @@ class Account::ProviderImportAdapter
|
||||
e.entryable = Transaction.new
|
||||
end
|
||||
|
||||
# === TYPE COLLISION CHECK: Must happen before protection check ===
|
||||
# If entry exists but is a different type (e.g., Trade), that's an error.
|
||||
# This prevents external_id collisions across different entryable types.
|
||||
if entry.persisted? && !entry.entryable.is_a?(Transaction)
|
||||
raise ArgumentError, "Entry with external_id '#{external_id}' already exists with different entryable type: #{entry.entryable_type}"
|
||||
end
|
||||
|
||||
# === PROTECTION CHECK: Skip entries that should not be overwritten ===
|
||||
# Check persisted Transaction entries for protection flags before making changes.
|
||||
# This prevents sync from overwriting user edits, CSV imports, or excluded entries.
|
||||
if entry.persisted?
|
||||
skip_reason = determine_skip_reason(entry)
|
||||
if skip_reason
|
||||
record_skip(entry, skip_reason)
|
||||
return entry
|
||||
end
|
||||
end
|
||||
|
||||
# If this is a new entry, check for potential duplicates from manual/CSV imports
|
||||
# This handles the case where a user manually created or CSV imported a transaction
|
||||
# before linking their account to a provider
|
||||
@@ -38,7 +62,14 @@ class Account::ProviderImportAdapter
|
||||
if entry.new_record?
|
||||
duplicate = find_duplicate_transaction(date: date, amount: amount, currency: currency)
|
||||
if duplicate
|
||||
# "Claim" the duplicate by updating its external_id and source
|
||||
# Check if duplicate is protected - if so, link but don't modify
|
||||
if duplicate.protected_from_sync?
|
||||
duplicate.update!(external_id: external_id, source: source)
|
||||
record_skip(duplicate, determine_skip_reason(duplicate) || "protected")
|
||||
return duplicate
|
||||
end
|
||||
|
||||
# "Claim" the unprotected duplicate by updating its external_id and source
|
||||
# This prevents future duplicate checks from matching it again
|
||||
entry = duplicate
|
||||
entry.assign_attributes(external_id: external_id, source: source)
|
||||
@@ -81,11 +112,6 @@ class Account::ProviderImportAdapter
|
||||
# Track if this is a new posted transaction (for fuzzy suggestion after save)
|
||||
is_new_posted = entry.new_record? && !incoming_pending
|
||||
|
||||
# Validate entryable type matches to prevent external_id collisions
|
||||
if entry.persisted? && !entry.entryable.is_a?(Transaction)
|
||||
raise ArgumentError, "Entry with external_id '#{external_id}' already exists with different entryable type: #{entry.entryable_type}"
|
||||
end
|
||||
|
||||
entry.assign_attributes(
|
||||
amount: amount,
|
||||
currency: currency,
|
||||
@@ -763,4 +789,30 @@ class Account::ProviderImportAdapter
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
# Determines why an entry should be skipped during sync.
|
||||
# Returns nil if entry should NOT be skipped.
|
||||
#
|
||||
# @param entry [Entry] The entry to check
|
||||
# @return [String, nil] Skip reason or nil if entry can be synced
|
||||
def determine_skip_reason(entry)
|
||||
return "excluded" if entry.excluded?
|
||||
return "user_modified" if entry.user_modified?
|
||||
return "import_locked" if entry.import_locked?
|
||||
nil
|
||||
end
|
||||
|
||||
# Records a skipped entry for stats collection.
|
||||
#
|
||||
# @param entry [Entry] The entry that was skipped
|
||||
# @param reason [String] Why it was skipped
|
||||
def record_skip(entry, reason)
|
||||
@skipped_entries << {
|
||||
id: entry.id,
|
||||
name: entry.name,
|
||||
reason: reason,
|
||||
external_id: entry.external_id,
|
||||
account_name: entry.account.name
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
@@ -180,6 +180,37 @@ module SyncStats
|
||||
sync.update!(sync_stats: { "cleared_at" => Time.current.iso8601 })
|
||||
end
|
||||
|
||||
# Collects statistics about entries that were skipped during sync.
|
||||
# Skipped entries are those protected from sync overwrites (user-modified,
|
||||
# import-locked, excluded, or converted to different types).
|
||||
#
|
||||
# @param sync [Sync] The sync record to update
|
||||
# @param skipped_entries [Array<Hash>] Array of skipped entry info with :id, :name, :reason, :account_name
|
||||
# @return [Hash] The skip stats that were collected
|
||||
def collect_skip_stats(sync, skipped_entries:)
|
||||
return {} unless sync.respond_to?(:sync_stats)
|
||||
return {} if skipped_entries.blank?
|
||||
|
||||
# Group by reason for summary breakdown
|
||||
by_reason = skipped_entries.group_by { |e| e[:reason] }
|
||||
|
||||
skip_stats = {
|
||||
"tx_skipped" => skipped_entries.size,
|
||||
"skip_summary" => by_reason.transform_values(&:size),
|
||||
"skip_details" => skipped_entries.first(20).map do |e|
|
||||
{
|
||||
"entry_id" => e[:id].to_s,
|
||||
"name" => e[:name],
|
||||
"reason" => e[:reason],
|
||||
"account_name" => e[:account_name]
|
||||
}
|
||||
end
|
||||
}
|
||||
|
||||
merge_sync_stats(sync, skip_stats)
|
||||
skip_stats
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# Merges new stats into the existing sync_stats hash.
|
||||
|
||||
@@ -11,13 +11,17 @@ module Syncable
|
||||
|
||||
# Schedules a sync for syncable. If there is an existing sync pending/syncing for this syncable,
|
||||
# we do not create a new sync, and attempt to expand the sync window if needed.
|
||||
#
|
||||
# NOTE: Uses `visible` scope (syncs < 5 min old) instead of `incomplete` to prevent
|
||||
# getting stuck on stale syncs after server/Sidekiq restarts. If a sync is older than
|
||||
# 5 minutes, we assume its job was lost and create a new sync.
|
||||
def sync_later(parent_sync: nil, window_start_date: nil, window_end_date: nil)
|
||||
Sync.transaction do
|
||||
with_lock do
|
||||
sync = self.syncs.incomplete.first
|
||||
sync = self.syncs.visible.first
|
||||
|
||||
if sync
|
||||
Rails.logger.info("There is an existing sync, expanding window if needed (#{sync.id})")
|
||||
Rails.logger.info("There is an existing recent sync, expanding window if needed (#{sync.id})")
|
||||
sync.expand_window_if_needed(window_start_date, window_end_date)
|
||||
|
||||
# Update parent relationship if one is provided and sync doesn't already have a parent
|
||||
|
||||
@@ -243,6 +243,23 @@ class Entry < ApplicationRecord
|
||||
external_id.present?
|
||||
end
|
||||
|
||||
# Checks if entry should be protected from provider sync overwrites.
|
||||
# This does NOT prevent user from editing - only protects from automated sync.
|
||||
#
|
||||
# @return [Boolean] true if entry should be skipped during provider sync
|
||||
def protected_from_sync?
|
||||
excluded? || user_modified? || import_locked?
|
||||
end
|
||||
|
||||
# Marks entry as user-modified after manual edit.
|
||||
# Called when user edits any field to prevent provider sync from overwriting.
|
||||
#
|
||||
# @return [Boolean] true if successfully marked
|
||||
def mark_user_modified!
|
||||
return true if user_modified?
|
||||
update!(user_modified: true)
|
||||
end
|
||||
|
||||
class << self
|
||||
def search(params)
|
||||
EntrySearch.new(params).build_query(all)
|
||||
@@ -272,6 +289,7 @@ class Entry < ApplicationRecord
|
||||
entry.update! bulk_attributes
|
||||
|
||||
entry.lock_saved_attributes!
|
||||
entry.mark_user_modified!
|
||||
entry.entryable.lock_attr!(:tag_ids) if entry.transaction? && entry.transaction.tags.any?
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
class SimplefinAccount::Processor
|
||||
include SimplefinNumericHelpers
|
||||
attr_reader :simplefin_account
|
||||
attr_reader :simplefin_account, :skipped_entries
|
||||
|
||||
def initialize(simplefin_account)
|
||||
@simplefin_account = simplefin_account
|
||||
@skipped_entries = []
|
||||
end
|
||||
|
||||
# Each step represents different SimpleFin data processing
|
||||
@@ -144,7 +145,9 @@ class SimplefinAccount::Processor
|
||||
end
|
||||
|
||||
def process_transactions
|
||||
SimplefinAccount::Transactions::Processor.new(simplefin_account).process
|
||||
processor = SimplefinAccount::Transactions::Processor.new(simplefin_account)
|
||||
processor.process
|
||||
@skipped_entries.concat(processor.skipped_entries)
|
||||
rescue => e
|
||||
report_exception(e, "transactions")
|
||||
end
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
class SimplefinAccount::Transactions::Processor
|
||||
attr_reader :simplefin_account
|
||||
attr_reader :simplefin_account, :skipped_entries
|
||||
|
||||
def initialize(simplefin_account)
|
||||
@simplefin_account = simplefin_account
|
||||
@skipped_entries = []
|
||||
end
|
||||
|
||||
def process
|
||||
@@ -24,12 +25,16 @@ class SimplefinAccount::Transactions::Processor
|
||||
processed_count = 0
|
||||
error_count = 0
|
||||
|
||||
# Use a shared adapter to accumulate skipped entries across all transactions
|
||||
adapter = Account::ProviderImportAdapter.new(acct) if acct
|
||||
|
||||
# Each entry is processed inside a transaction, but to avoid locking up the DB when
|
||||
# there are hundreds or thousands of transactions, we process them individually.
|
||||
transactions.each do |transaction_data|
|
||||
SimplefinEntry::Processor.new(
|
||||
transaction_data,
|
||||
simplefin_account: simplefin_account
|
||||
simplefin_account: simplefin_account,
|
||||
import_adapter: adapter
|
||||
).process
|
||||
processed_count += 1
|
||||
rescue => e
|
||||
@@ -39,7 +44,10 @@ class SimplefinAccount::Transactions::Processor
|
||||
Rails.logger.error e.backtrace.first(5).join("\n") if e.backtrace
|
||||
end
|
||||
|
||||
Rails.logger.info "SimplefinAccount::Transactions::Processor - Completed for simplefin_account #{simplefin_account.id}: #{processed_count} processed, #{error_count} errors"
|
||||
# Collect skipped entries from shared adapter
|
||||
@skipped_entries = adapter&.skipped_entries || []
|
||||
|
||||
Rails.logger.info "SimplefinAccount::Transactions::Processor - Completed for simplefin_account #{simplefin_account.id}: #{processed_count} processed, #{error_count} errors, #{@skipped_entries.size} skipped (protected)"
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
@@ -3,9 +3,11 @@ require "digest/md5"
|
||||
class SimplefinEntry::Processor
|
||||
include CurrencyNormalizable
|
||||
# simplefin_transaction is the raw hash fetched from SimpleFin API and converted to JSONB
|
||||
def initialize(simplefin_transaction, simplefin_account:)
|
||||
# @param import_adapter [Account::ProviderImportAdapter, nil] Optional shared adapter for accumulating skipped entries
|
||||
def initialize(simplefin_transaction, simplefin_account:, import_adapter: nil)
|
||||
@simplefin_transaction = simplefin_transaction
|
||||
@simplefin_account = simplefin_account
|
||||
@shared_import_adapter = import_adapter
|
||||
end
|
||||
|
||||
def process
|
||||
@@ -76,7 +78,8 @@ class SimplefinEntry::Processor
|
||||
end
|
||||
|
||||
def import_adapter
|
||||
@import_adapter ||= Account::ProviderImportAdapter.new(account)
|
||||
# Use shared adapter if provided, otherwise create new one
|
||||
@import_adapter ||= @shared_import_adapter || Account::ProviderImportAdapter.new(account)
|
||||
end
|
||||
|
||||
def account
|
||||
|
||||
@@ -106,14 +106,20 @@ class SimplefinItem < ApplicationRecord
|
||||
end
|
||||
end
|
||||
|
||||
all_skipped_entries = []
|
||||
|
||||
linked.each do |simplefin_account|
|
||||
acct = simplefin_account.current_account
|
||||
Rails.logger.info "SimplefinItem#process_accounts - Processing: SimplefinAccount id=#{simplefin_account.id} name='#{simplefin_account.name}' -> Account id=#{acct.id} name='#{acct.name}' type=#{acct.accountable_type}"
|
||||
SimplefinAccount::Processor.new(simplefin_account).process
|
||||
processor = SimplefinAccount::Processor.new(simplefin_account)
|
||||
processor.process
|
||||
all_skipped_entries.concat(processor.skipped_entries)
|
||||
end
|
||||
|
||||
Rails.logger.info "SimplefinItem#process_accounts END"
|
||||
Rails.logger.info "SimplefinItem#process_accounts END - #{all_skipped_entries.size} entries skipped (protected)"
|
||||
Rails.logger.info "=" * 60
|
||||
|
||||
all_skipped_entries
|
||||
end
|
||||
|
||||
# Repairs stale linkages when user re-adds institution in SimpleFIN.
|
||||
|
||||
@@ -342,10 +342,12 @@ class SimplefinItem::Importer
|
||||
end
|
||||
|
||||
def import_with_chunked_history
|
||||
# SimpleFin's actual limit is 60 days (not 365 as documented)
|
||||
# Use 60-day chunks to stay within limits
|
||||
# SimpleFin's actual limit is 60 days per request (not 365 as documented).
|
||||
# SimpleFin typically only provides 60-90 days of history (bank-dependent).
|
||||
# Use adaptive chunking: start with 2 chunks, continue if new data found,
|
||||
# stop after 2 consecutive empty chunks. Max 6 chunks (360 days) for safety.
|
||||
chunk_size_days = 60
|
||||
max_requests = 22
|
||||
max_requests = 6 # Down from 22 - SimpleFIN rarely provides >90 days anyway
|
||||
current_end_date = Time.current
|
||||
|
||||
# Decide how far back to walk:
|
||||
@@ -359,11 +361,11 @@ class SimplefinItem::Importer
|
||||
default_start_date = implied_max_lookback_days.days.ago
|
||||
target_start_date = user_start_date ? user_start_date.beginning_of_day : default_start_date
|
||||
|
||||
# Enforce maximum 3-year lookback to respect SimpleFin's actual 60-day limit per request
|
||||
# With 22 requests max: 60 days × 22 = 1,320 days = 3.6 years, so 3 years is safe
|
||||
max_lookback_date = 3.years.ago.beginning_of_day
|
||||
# Enforce maximum 1-year lookback since SimpleFIN rarely provides more than 90 days
|
||||
# This saves unnecessary API calls while still covering edge cases
|
||||
max_lookback_date = 1.year.ago.beginning_of_day
|
||||
if target_start_date < max_lookback_date
|
||||
Rails.logger.info "SimpleFin: Limiting sync start date from #{target_start_date.strftime('%Y-%m-%d')} to #{max_lookback_date.strftime('%Y-%m-%d')} due to rate limits"
|
||||
Rails.logger.info "SimpleFin: Limiting sync start date from #{target_start_date.strftime('%Y-%m-%d')} to #{max_lookback_date.strftime('%Y-%m-%d')} (SimpleFIN typically provides 60-90 days max)"
|
||||
target_start_date = max_lookback_date
|
||||
end
|
||||
|
||||
@@ -373,8 +375,11 @@ class SimplefinItem::Importer
|
||||
|
||||
total_accounts_imported = 0
|
||||
chunk_count = 0
|
||||
consecutive_empty_chunks = 0
|
||||
total_new_transactions = 0
|
||||
stopped_early = false
|
||||
|
||||
Rails.logger.info "SimpleFin chunked sync: syncing from #{target_start_date.strftime('%Y-%m-%d')} to #{current_end_date.strftime('%Y-%m-%d')}"
|
||||
Rails.logger.info "SimpleFin chunked sync: syncing from #{target_start_date.strftime('%Y-%m-%d')} to #{current_end_date.strftime('%Y-%m-%d')} (max #{max_requests} chunks)"
|
||||
|
||||
# Walk backwards from current_end_date in proper chunks
|
||||
chunk_end_date = current_end_date
|
||||
@@ -399,6 +404,9 @@ class SimplefinItem::Importer
|
||||
|
||||
Rails.logger.info "SimpleFin chunked sync: fetching chunk #{chunk_count}/#{max_requests} (#{chunk_start_date.strftime('%Y-%m-%d')} to #{chunk_end_date.strftime('%Y-%m-%d')}) - #{actual_days} days"
|
||||
|
||||
# Count transactions before this chunk
|
||||
pre_chunk_tx_count = count_linked_transactions
|
||||
|
||||
accounts_data = fetch_accounts_data(start_date: chunk_start_date, end_date: chunk_end_date)
|
||||
return if accounts_data.nil? # Error already handled
|
||||
|
||||
@@ -432,6 +440,26 @@ class SimplefinItem::Importer
|
||||
end
|
||||
end
|
||||
|
||||
# Count new transactions in this chunk (adaptive stopping)
|
||||
post_chunk_tx_count = count_linked_transactions
|
||||
new_txns_in_chunk = [ post_chunk_tx_count - pre_chunk_tx_count, 0 ].max
|
||||
total_new_transactions += new_txns_in_chunk
|
||||
|
||||
Rails.logger.info "SimpleFin chunked sync: chunk #{chunk_count} added #{new_txns_in_chunk} new transactions"
|
||||
|
||||
# Adaptive stopping: if chunk returned no new transactions, increment counter
|
||||
if new_txns_in_chunk.zero?
|
||||
consecutive_empty_chunks += 1
|
||||
# Stop after 2 consecutive empty chunks (allow for gaps in bank data)
|
||||
if consecutive_empty_chunks >= 2
|
||||
Rails.logger.info "SimpleFin chunked sync: stopping early - #{consecutive_empty_chunks} consecutive empty chunks (SimpleFIN likely doesn't have more history)"
|
||||
stopped_early = true
|
||||
break
|
||||
end
|
||||
else
|
||||
consecutive_empty_chunks = 0
|
||||
end
|
||||
|
||||
# Stop if we've reached our target start date
|
||||
if chunk_start_date <= target_start_date
|
||||
Rails.logger.info "SimpleFin chunked sync: reached target start date, stopping"
|
||||
@@ -442,7 +470,23 @@ class SimplefinItem::Importer
|
||||
chunk_end_date = chunk_start_date
|
||||
end
|
||||
|
||||
Rails.logger.info "SimpleFin chunked sync completed: #{chunk_count} chunks processed, #{total_accounts_imported} account records imported"
|
||||
# Record chunked history stats for observability
|
||||
stats["chunked_history"] = {
|
||||
"chunks_processed" => chunk_count,
|
||||
"total_new_transactions" => total_new_transactions,
|
||||
"stopped_early" => stopped_early,
|
||||
"reason" => stopped_early ? "no_new_data" : (chunk_count >= max_requests ? "max_chunks" : "reached_target")
|
||||
}
|
||||
persist_stats!
|
||||
|
||||
Rails.logger.info "SimpleFin chunked sync completed: #{chunk_count} chunks processed, #{total_accounts_imported} account records, #{total_new_transactions} new transactions#{stopped_early ? " (stopped early)" : ""}"
|
||||
end
|
||||
|
||||
# Count total transactions in linked SimpleFIN accounts (for adaptive chunking)
|
||||
def count_linked_transactions
|
||||
simplefin_item.simplefin_accounts
|
||||
.select { |sfa| sfa.current_account.present? }
|
||||
.sum { |sfa| sfa.raw_transactions_payload.to_a.size }
|
||||
end
|
||||
|
||||
def import_regular_sync
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
class SimplefinItem::Syncer
|
||||
include SyncStats::Collector
|
||||
|
||||
attr_reader :simplefin_item
|
||||
|
||||
def initialize(simplefin_item)
|
||||
@@ -12,11 +14,7 @@ class SimplefinItem::Syncer
|
||||
begin
|
||||
# Check for linked accounts via BOTH legacy FK (accounts.simplefin_account_id) AND
|
||||
# the new AccountProvider system. An account is "linked" if either association exists.
|
||||
linked_via_legacy = simplefin_item.simplefin_accounts.joins(:account).count
|
||||
linked_via_provider = simplefin_item.simplefin_accounts.joins(:account_provider).count
|
||||
total_linked = simplefin_item.simplefin_accounts.select { |sfa| sfa.current_account.present? }.count
|
||||
|
||||
Rails.logger.info("SimplefinItem::Syncer - linked check: legacy=#{linked_via_legacy}, provider=#{linked_via_provider}, total=#{total_linked}")
|
||||
total_linked = simplefin_item.simplefin_accounts.count { |sfa| sfa.current_account.present? }
|
||||
|
||||
if total_linked == 0
|
||||
sync.update!(status_text: "Discovering accounts (balances only)...") if sync.respond_to?(:status_text)
|
||||
@@ -64,7 +62,12 @@ class SimplefinItem::Syncer
|
||||
linked_simplefin_accounts = simplefin_item.simplefin_accounts.select { |sfa| sfa.current_account.present? }
|
||||
if linked_simplefin_accounts.any?
|
||||
sync.update!(status_text: "Processing transactions and holdings...") if sync.respond_to?(:status_text)
|
||||
simplefin_item.process_accounts
|
||||
skipped_entries = simplefin_item.process_accounts
|
||||
|
||||
# Collect skip stats for protected entries (user-modified, import-locked, etc.)
|
||||
if skipped_entries.any?
|
||||
collect_skip_stats(sync, skipped_entries: skipped_entries)
|
||||
end
|
||||
|
||||
sync.update!(status_text: "Calculating balances...") if sync.respond_to?(:status_text)
|
||||
simplefin_item.schedule_account_syncs(
|
||||
|
||||
@@ -28,7 +28,8 @@ class TradeImport < Import
|
||||
amount: row.signed_amount,
|
||||
name: row.name,
|
||||
currency: row.currency.presence || mapped_account.currency,
|
||||
import: self
|
||||
import: self,
|
||||
import_locked: true # Protect from provider sync overwrites
|
||||
),
|
||||
)
|
||||
end
|
||||
|
||||
@@ -48,10 +48,12 @@ class TransactionImport < Import
|
||||
duplicate_entry.transaction.tags = tags if tags.any?
|
||||
duplicate_entry.notes = row.notes if row.notes.present?
|
||||
duplicate_entry.import = self
|
||||
duplicate_entry.import_locked = true # Protect from provider sync overwrites
|
||||
updated_entries << duplicate_entry
|
||||
claimed_entry_ids.add(duplicate_entry.id)
|
||||
else
|
||||
# Create new transaction (no duplicate found)
|
||||
# Mark as import_locked to protect from provider sync overwrites
|
||||
new_transactions << Transaction.new(
|
||||
category: category,
|
||||
tags: tags,
|
||||
@@ -62,7 +64,8 @@ class TransactionImport < Import
|
||||
name: row.name,
|
||||
currency: effective_currency,
|
||||
notes: row.notes,
|
||||
import: self
|
||||
import: self,
|
||||
import_locked: true
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
@@ -15,12 +15,22 @@ en:
|
||||
imported: "Imported: %{count}"
|
||||
updated: "Updated: %{count}"
|
||||
skipped: "Skipped: %{count}"
|
||||
protected:
|
||||
one: "%{count} entry protected (not overwritten)"
|
||||
other: "%{count} entries protected (not overwritten)"
|
||||
view_protected: View protected entries
|
||||
skip_reasons:
|
||||
excluded: Excluded
|
||||
user_modified: User modified
|
||||
import_locked: CSV import
|
||||
protected: Protected
|
||||
holdings:
|
||||
title: Holdings
|
||||
found: "Found: %{count}"
|
||||
processed: "Processed: %{count}"
|
||||
health:
|
||||
title: Health
|
||||
view_error_details: View error details
|
||||
rate_limited: "Rate limited %{time_ago}"
|
||||
recently: recently
|
||||
errors: "Errors: %{count}"
|
||||
|
||||
17
db/migrate/20260115100000_add_entry_protection_flags.rb
Normal file
17
db/migrate/20260115100000_add_entry_protection_flags.rb
Normal file
@@ -0,0 +1,17 @@
|
||||
class AddEntryProtectionFlags < ActiveRecord::Migration[7.2]
|
||||
def change
|
||||
# user_modified: Set when user manually edits any field on an entry.
|
||||
# Prevents provider sync from overwriting user's intentional changes.
|
||||
# Does NOT prevent user from editing - only protects from automated overwrites.
|
||||
add_column :entries, :user_modified, :boolean, default: false, null: false
|
||||
|
||||
# import_locked: Set when entry is created via CSV/manual import.
|
||||
# Prevents provider sync from overwriting imported data.
|
||||
# Does NOT prevent user from editing - only protects from automated overwrites.
|
||||
add_column :entries, :import_locked, :boolean, default: false, null: false
|
||||
|
||||
# Partial indexes for efficient queries when filtering protected entries
|
||||
add_index :entries, :user_modified, where: "user_modified = true", name: "index_entries_on_user_modified_true"
|
||||
add_index :entries, :import_locked, where: "import_locked = true", name: "index_entries_on_import_locked_true"
|
||||
end
|
||||
end
|
||||
34
db/migrate/20260115100001_backfill_entry_protection_flags.rb
Normal file
34
db/migrate/20260115100001_backfill_entry_protection_flags.rb
Normal file
@@ -0,0 +1,34 @@
|
||||
class BackfillEntryProtectionFlags < ActiveRecord::Migration[7.2]
|
||||
disable_ddl_transaction!
|
||||
|
||||
def up
|
||||
# Backfill import_locked for entries that came from CSV/manual imports
|
||||
# These entries have import_id set but typically no external_id or source
|
||||
say_with_time "Marking CSV-imported entries as import_locked" do
|
||||
execute <<-SQL.squish
|
||||
UPDATE entries
|
||||
SET import_locked = true
|
||||
WHERE import_id IS NOT NULL
|
||||
AND import_locked = false
|
||||
SQL
|
||||
end
|
||||
|
||||
# Backfill user_modified for entries where user has manually edited fields
|
||||
# These entries have non-empty locked_attributes (set when user edits)
|
||||
say_with_time "Marking user-edited entries as user_modified" do
|
||||
execute <<-SQL.squish
|
||||
UPDATE entries
|
||||
SET user_modified = true
|
||||
WHERE locked_attributes != '{}'::jsonb
|
||||
AND locked_attributes IS NOT NULL
|
||||
AND user_modified = false
|
||||
SQL
|
||||
end
|
||||
end
|
||||
|
||||
def down
|
||||
# Reversible but generally not needed
|
||||
execute "UPDATE entries SET import_locked = false WHERE import_locked = true"
|
||||
execute "UPDATE entries SET user_modified = false WHERE user_modified = true"
|
||||
end
|
||||
end
|
||||
6
db/schema.rb
generated
6
db/schema.rb
generated
@@ -10,7 +10,7 @@
|
||||
#
|
||||
# It's strongly recommended that you check this file into your version control system.
|
||||
|
||||
ActiveRecord::Schema[7.2].define(version: 2026_01_12_065106) do
|
||||
ActiveRecord::Schema[7.2].define(version: 2026_01_15_100001) do
|
||||
# These are extensions that must be enabled in order to support this database
|
||||
enable_extension "pgcrypto"
|
||||
enable_extension "plpgsql"
|
||||
@@ -342,6 +342,8 @@ ActiveRecord::Schema[7.2].define(version: 2026_01_12_065106) do
|
||||
t.jsonb "locked_attributes", default: {}
|
||||
t.string "external_id"
|
||||
t.string "source"
|
||||
t.boolean "user_modified", default: false, null: false
|
||||
t.boolean "import_locked", default: false, null: false
|
||||
t.index "lower((name)::text)", name: "index_entries_on_lower_name"
|
||||
t.index ["account_id", "date"], name: "index_entries_on_account_id_and_date"
|
||||
t.index ["account_id", "source", "external_id"], name: "index_entries_on_account_source_and_external_id", unique: true, where: "((external_id IS NOT NULL) AND (source IS NOT NULL))"
|
||||
@@ -349,6 +351,8 @@ ActiveRecord::Schema[7.2].define(version: 2026_01_12_065106) do
|
||||
t.index ["date"], name: "index_entries_on_date"
|
||||
t.index ["entryable_type"], name: "index_entries_on_entryable_type"
|
||||
t.index ["import_id"], name: "index_entries_on_import_id"
|
||||
t.index ["import_locked"], name: "index_entries_on_import_locked_true", where: "(import_locked = true)"
|
||||
t.index ["user_modified"], name: "index_entries_on_user_modified_true", where: "(user_modified = true)"
|
||||
end
|
||||
|
||||
create_table "eval_datasets", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
|
||||
|
||||
Reference in New Issue
Block a user