mirror of
https://github.com/we-promise/sure.git
synced 2026-05-28 15:04:57 +00:00
* fix(jobs): delegate recurring-transaction sync gate to Sync.for_family

`IdentifyRecurringTransactionsJob#family_has_incomplete_syncs?` hand-rolled the list of provider `*_items` associations it polled (plaid, simplefin, lunchflow, enable_banking, sophtron), missing nine other `Syncable` provider concerns on `Family`: coinbase, binance, kraken, coinstats, snaptrade, mercury, brex, indexa_capital, ibkr. When a sync on any of those nine was in flight, the debounce gate fell through and `RecurringTransaction::Identifier` ran against a partial dataset; the follow-up re-enqueue then hit the `find_or_initialize_by` upsert path and inherited the stale `occurrence_count`. Bolting sophtron on as the 5th entry (#591) was already an iteration of the same drift pattern.

The maintainers' own `Sync.for_family` (sync.rb:61) already enumerates every `*_items` association via `Family.reflect_on_all_associations(:has_many)` filtered by inclusion of `Syncable`. That is exactly the helper the gate should delegate to so the list cannot drift again.

- Add `Sync.any_incomplete_for?(family)` class method that wraps `for_family(family).incomplete.exists?`.
- Rewrite `family_has_incomplete_syncs?` to delegate. 14 lines → 1.
- New test file `test/jobs/identify_recurring_transactions_job_test.rb` covers in-flight Coinbase + Mercury (gate fires), idle (identifier runs), missing family, and superseded-by-newer-schedule.
- `test/models/sync_test.rb` gets 2 new tests pinning `any_incomplete_for?` against a provider `_items` sync and a family-itself sync.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(jobs): stub Rails.cache.read for supersession test (NullStore in test env)

`Rails.cache` is `ActiveSupport::Cache::NullStore` in the Rails test env, so the previous test's `Rails.cache.write(cache_key, @scheduled_at + 10, ...)` was a no-op and `Rails.cache.read(cache_key)` returned `nil`. The supersession short-circuit `return if latest_scheduled && latest_scheduled > scheduled_at` then fell through, the job proceeded to invoke `RecurringTransaction::Identifier`, and the Mocha `.expects(:identify_recurring_patterns).never` failed in CI.

Switch to `Rails.cache.stubs(:read).with(cache_key).returns(...)`, the same idiom `test/models/provider/twelve_data_test.rb:186-197` already uses for the cache layer. Add an `assert_nil` on the bare `perform` return so Minitest's assertion counter sees an explicit assertion (silences the "missing assertions" warning).

No production-code change. Behavior under test is unchanged; only the test mechanism for simulating "newer scheduled run already in cache" is fixed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
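The drift described in the first commit can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not the app's code: `FakeSync`, `FakeItem`, `FakeFamily`, `HAND_ROLLED`, and both gate methods are hypothetical stand-ins, and class-level method introspection substitutes for ActiveRecord's `reflect_on_all_associations`.

```ruby
# Hypothetical stand-ins only; none of these are the app's real models.
module Syncable; end

FakeSync = Struct.new(:status) do
  # Mirrors the idea of the `incomplete` scope: pending or syncing.
  def incomplete?
    %w[pending syncing].include?(status)
  end
end

class FakeItem
  include Syncable
  attr_reader :syncs

  def initialize(*statuses)
    @syncs = statuses.map { |status| FakeSync.new(status) }
  end
end

class FakeFamily
  attr_reader :plaid_items, :coinbase_items, :mercury_items

  def initialize(plaid_items: [], coinbase_items: [], mercury_items: [])
    @plaid_items = plaid_items
    @coinbase_items = coinbase_items
    @mercury_items = mercury_items
  end

  # Stand-in for association reflection: discover every `*_items` reader
  # defined on the class instead of naming each one by hand.
  def syncable_item_collections
    self.class.public_instance_methods(false)
        .select { |name| name.to_s.end_with?("_items") }
        .map { |name| public_send(name) }
  end
end

# A hand-rolled list silently misses any provider added later.
HAND_ROLLED = %i[plaid_items].freeze

def hand_rolled_gate?(family)
  HAND_ROLLED.any? do |name|
    family.public_send(name).any? { |item| item.syncs.any?(&:incomplete?) }
  end
end

# The discovered list picks up new `*_items` collections automatically.
def reflective_gate?(family)
  family.syncable_item_collections.any? do |items|
    items.any? { |item| item.is_a?(Syncable) && item.syncs.any?(&:incomplete?) }
  end
end

family = FakeFamily.new(coinbase_items: [ FakeItem.new("syncing") ])
puts hand_rolled_gate?(family) # prints false: the in-flight Coinbase sync is missed
puts reflective_gate?(family)  # prints true: the gate fires
```

With a Coinbase sync in flight, the hand-rolled gate lets the job through while the discovered list holds it back, which is the behavior the delegation to `Sync.for_family` is meant to guarantee.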
274 lines
7.7 KiB
Ruby
class Sync < ApplicationRecord
  # We run a cron that marks any syncs that have not been resolved in 24 hours as "stale"
  # Syncs often become stale when new code is deployed and the worker restarts
  STALE_AFTER = 24.hours

  # The max time that a sync will show in the UI (after 5 minutes)
  VISIBLE_FOR = 5.minutes

  include AASM

  Error = Class.new(StandardError)

  belongs_to :syncable, polymorphic: true

  belongs_to :parent, class_name: "Sync", optional: true
  has_many :children, class_name: "Sync", foreign_key: :parent_id, dependent: :destroy

  scope :ordered, -> { order(created_at: :desc, id: :desc) }
  scope :incomplete, -> { where("syncs.status IN (?)", %w[pending syncing]) }
  scope :visible, -> { incomplete.where("syncs.created_at > ?", VISIBLE_FOR.ago) }

  after_commit :update_family_sync_timestamp

  serialize :sync_stats, coder: JSON

  validate :window_valid

  # Sync state machine
  aasm column: :status, timestamps: true do
    state :pending, initial: true
    state :syncing
    state :completed
    state :failed
    state :stale

    after_all_transitions :handle_transition

    event :start, after_commit: :handle_start_transition do
      transitions from: :pending, to: :syncing
    end

    event :complete, after_commit: :handle_completion_transition do
      transitions from: :syncing, to: :completed
    end

    event :fail do
      transitions from: :syncing, to: :failed
    end

    # Marks a sync that never completed within the expected time window
    event :mark_stale do
      transitions from: %i[pending syncing], to: :stale
    end
  end

  class << self
    def clean
      incomplete.where("syncs.created_at < ?", STALE_AFTER.ago).find_each(&:mark_stale!)
    end

    def for_family(family, resource_owner: nil)
      query = where(syncable_type: "Family", syncable_id: family.id)
      query = query.or(where(syncable_type: "Account", syncable_id: account_syncable_ids(family, resource_owner)))

      family_syncable_associations.each do |association|
        query = query.or(
          where(syncable_type: association.klass.name, syncable_id: family.public_send(association.name).select(:id))
        )
      end

      query
    end

    # True iff the family has any pending/syncing Sync across its own row,
    # its accounts, and every Syncable provider `*_items` association. Built
    # on `for_family` so new provider integrations are picked up automatically
    # via `family_syncable_associations` reflection (no hand-rolled list).
    def any_incomplete_for?(family)
      for_family(family).incomplete.exists?
    end

    private
      def account_syncable_ids(family, resource_owner)
        (resource_owner ? resource_owner.accessible_accounts : family.accounts)
          .where(family_id: family.id)
          .select(:id)
      end

      def family_syncable_associations
        Family.reflect_on_all_associations(:has_many).select do |association|
          association.name.to_s.end_with?("_items") &&
            association.klass.included_modules.include?(Syncable)
        rescue NameError
          false
        end
      end
  end

  def in_progress?
    pending? || syncing?
  end

  def terminal?
    completed? || failed? || stale?
  end

  def api_error_payload
    return unless failed? || stale?
    return if stale? && error.blank?

    {
      message: stale? ? "Sync became stale before completion" : "Sync failed"
    }
  end

  def perform
    Rails.logger.tagged("Sync", id, syncable_type, syncable_id) do
      # This can happen on server restarts or if Sidekiq enqueues a duplicate job
      unless may_start?
        Rails.logger.warn("Sync #{id} is not in a valid state (#{aasm.from_state}) to start. Skipping sync.")
        return
      end

      # Guard: syncable may have been deleted while job was queued
      unless syncable.present?
        Rails.logger.warn("Sync #{id} - syncable #{syncable_type}##{syncable_id} no longer exists. Marking as failed.")
        start! if may_start?
        fail!
        update(error: "Syncable record was deleted")
        return
      end

      # Guard: syncable may be scheduled for deletion
      if syncable.respond_to?(:scheduled_for_deletion?) && syncable.scheduled_for_deletion?
        Rails.logger.warn("Sync #{id} - syncable #{syncable_type}##{syncable_id} is scheduled for deletion. Skipping sync.")
        start! if may_start?
        fail!
        update(error: "Syncable record is scheduled for deletion")
        return
      end

      start!

      begin
        syncable.perform_sync(self)
      rescue => e
        fail!
        update(error: e.message)
        report_error(e)
      ensure
        finalize_if_all_children_finalized
      end
    end
  end

  # Finalizes the current sync AND parent (if it exists)
  def finalize_if_all_children_finalized
    Sync.transaction do
      lock!

      # If this is the "parent" and there are still children running, don't finalize.
      return unless all_children_finalized?

      if syncing?
        if has_failed_children?
          fail!
        else
          complete!
        end
      end

      # If we make it here, the sync is finalized. Run post-sync, regardless of failure/success.
      perform_post_sync
    end

    # If this sync has a parent, try to finalize it so the child status propagates up the chain.
    parent&.finalize_if_all_children_finalized
  end

  # If a sync is pending, we can adjust the window if new syncs are created with a wider window.
  def expand_window_if_needed(new_window_start_date, new_window_end_date)
    return unless pending?
    return if self.window_start_date.nil? && self.window_end_date.nil? # already as wide as possible

    earliest_start_date = if self.window_start_date && new_window_start_date
      [ self.window_start_date, new_window_start_date ].min
    else
      nil
    end

    latest_end_date = if self.window_end_date && new_window_end_date
      [ self.window_end_date, new_window_end_date ].max
    else
      nil
    end

    update(
      window_start_date: earliest_start_date,
      window_end_date: latest_end_date
    )
  end

  private
    def log_status_change
      Rails.logger.info("changing from #{aasm.from_state} to #{aasm.to_state} (event: #{aasm.current_event})")
    end

    def has_failed_children?
      children.failed.any?
    end

    def all_children_finalized?
      children.incomplete.empty?
    end

    def perform_post_sync
      Rails.logger.info("Performing post-sync for #{syncable_type} (#{syncable.id})")
      syncable.perform_post_sync
      syncable.broadcast_sync_complete
    rescue => e
      Rails.logger.error("Error performing post-sync for #{syncable_type} (#{syncable.id}): #{e.message}")
      report_error(e)
    end

    def report_error(error)
      Sentry.capture_exception(error) do |scope|
        scope.set_tags(sync_id: id)
      end
    end

    def report_warnings
      todays_sync_count = syncable.syncs.where(created_at: Date.current.all_day).count

      if todays_sync_count > 10
        Sentry.capture_exception(
          Error.new("#{syncable_type} (#{syncable.id}) has exceeded 10 syncs today (count: #{todays_sync_count})"),
          level: :warning
        )
      end
    end

    def handle_start_transition
      report_warnings
    end

    def handle_transition
      log_status_change
    end

    def handle_completion_transition
      family.touch(:latest_sync_completed_at)
    end

    def window_valid
      if window_start_date && window_end_date && window_start_date > window_end_date
        errors.add(:window_end_date, "must be greater than window_start_date")
      end
    end

    def update_family_sync_timestamp
      return unless family.persisted?

      family.touch(:latest_sync_activity_at)
    end

    def family
      if syncable.is_a?(Family)
        syncable
      else
        syncable.family
      end
    end
end