diff --git a/.rubocop.yml b/.rubocop.yml index 33542a43c..d313b5ab5 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -1,6 +1,6 @@ inherit_gem: rubocop-rails-omakase: rubocop.yml - + Layout/IndentationWidth: Enabled: true diff --git a/app/controllers/api/v1/import_sessions_controller.rb b/app/controllers/api/v1/import_sessions_controller.rb new file mode 100644 index 000000000..f749124d7 --- /dev/null +++ b/app/controllers/api/v1/import_sessions_controller.rb @@ -0,0 +1,195 @@ +# frozen_string_literal: true + +class Api::V1::ImportSessionsController < Api::V1::BaseController + before_action :ensure_read_scope, only: [ :show ] + before_action :ensure_write_scope, only: [ :create, :create_chunk, :publish ] + before_action :set_import_session, only: [ :show, :create_chunk, :publish ] + + def create + @import_session = ImportSession.create_or_find_for!( + family: Current.family, + import_type: params[:type].to_s, + client_session_id: params[:client_session_id].presence, + expected_chunks: expected_chunks_param + ) + + render_import_session(status: :created) + rescue ImportSession::ConflictError => e + render_import_session_conflict(e.message) + rescue ActiveRecord::RecordInvalid => e + render_error( + "validation_failed", + "Import session could not be created", + :unprocessable_entity, + errors: e.record.errors.full_messages + ) + end + + def show + render_import_session + end + + def create_chunk + content, filename, content_type = sure_import_upload_attributes + return unless content + + @import_session.attach_chunk!( + sequence: sequence_param, + client_chunk_id: params[:client_chunk_id].presence, + content: content, + filename: filename, + content_type: content_type + ) + + @import_session.reload + render_import_session(status: :created) + rescue ImportSession::ConflictError => e + render_import_session_conflict(e.message) + rescue ActiveRecord::RecordInvalid => e + render_error( + "validation_failed", + "Import chunk could not be created", + :unprocessable_entity, + errors: e.record.errors.full_messages + ) + end + + def publish + @import_session.publish_later + @import_session.reload + render_import_session(status: :accepted) + rescue Import::MaxRowCountExceededError + render_error("max_row_count_exceeded", "Import session has too many rows to publish.", :unprocessable_entity) + rescue ImportSession::EnqueueError + render_error("import_enqueue_failed", "Import session could not be queued.", :service_unavailable) + rescue ImportSession::ConflictError => e + render_import_session_conflict(e.message) + end + + private + def set_import_session + @import_session = Current.family.import_sessions.find(params[:id]) + end + + def ensure_read_scope + authorize_scope!(:read) + end + + def ensure_write_scope + authorize_scope!(:write) + end + + def expected_chunks_param + return if params[:expected_chunks].blank? + + params[:expected_chunks] + end + + def sequence_param + raise ActionController::ParameterMissing.new(:sequence) if params[:sequence].blank? + + params[:sequence] + end + + def sure_import_upload_attributes + if params[:file].present? + sure_import_file_upload_attributes(params[:file]) + elsif params[:raw_file_content].present? + sure_import_raw_content_attributes(params[:raw_file_content].to_s) + else + render_error("missing_content", "Provide a Sure NDJSON file or raw_file_content.", :unprocessable_entity) + nil + end + end + + def sure_import_file_upload_attributes(file) + if file.size > SureImport.max_ndjson_size + render_error( + "file_too_large", + "File is too large. Maximum size is #{SureImport.max_ndjson_size / 1.megabyte}MB.", + :unprocessable_entity + ) + return + end + + extension = File.extname(file.original_filename.to_s).downcase + unless SureImport::ALLOWED_NDJSON_CONTENT_TYPES.include?(file.content_type) || extension.in?(%w[.ndjson .json]) + render_error("invalid_file_type", "Invalid file type. Please upload a Sure NDJSON file.", :unprocessable_entity) + return + end + + sure_import_validated_attributes( + content: file.read, + filename: file.original_filename.presence || "sure-import.ndjson", + content_type: file.content_type.presence || "application/x-ndjson" + ) + end + + def sure_import_raw_content_attributes(content) + if content.bytesize > SureImport.max_ndjson_size + render_error( + "content_too_large", + "Content is too large. Maximum size is #{SureImport.max_ndjson_size / 1.megabyte}MB.", + :unprocessable_entity + ) + return + end + + sure_import_validated_attributes( + content: content, + filename: "sure-import.ndjson", + content_type: "application/x-ndjson" + ) + end + + def sure_import_validated_attributes(content:, filename:, content_type:) + unless SureImport.valid_ndjson_first_line?(content) + render_error("invalid_ndjson", "Invalid Sure NDJSON content.", :unprocessable_entity) + return + end + + [ content, filename, content_type ] + end + + def render_import_session_conflict(message) + render_error("import_session_conflict", message, :conflict) + end + + def render_import_session(status: :ok) + chunks = @import_session.imports.ordered_by_sequence.map do |import| + { + id: import.id, + sequence: import.sequence, + client_chunk_id: import.client_chunk_id, + status: import.status, + rows_count: import.rows_count, + summary: import.summary || {}, + error: import.error_details.presence, + created_at: import.created_at, + updated_at: import.updated_at + } + end + + render json: { + data: { + id: @import_session.id, + type: @import_session.import_type, + status: @import_session.status, + client_session_id: @import_session.client_session_id, + expected_chunks: @import_session.expected_chunks, + chunks_count: chunks.size, + summary: @import_session.summary || {}, + error: @import_session.error_details.presence, + created_at: @import_session.created_at, + updated_at: @import_session.updated_at, + chunks: chunks + } + }, status: status + end + + def render_error(error, message, status, errors: nil) + payload = { error: error, message: message } + payload[:errors] = errors if errors + render json: payload, status: status + end +end diff --git a/app/jobs/import_session_job.rb b/app/jobs/import_session_job.rb new file mode 100644 index 000000000..de7f0e377 --- /dev/null +++ b/app/jobs/import_session_job.rb @@ -0,0 +1,12 @@ +class ImportSessionJob < ApplicationJob + queue_as :high_priority + + def perform(import_session) + raise ArgumentError, "ImportSessionJob requires an import_session" if import_session.nil? + + Rails.logger.info("ImportSessionJob started import_session_id=#{import_session.id}") + import_session.publish + import_session.reload + Rails.logger.info("ImportSessionJob finished import_session_id=#{import_session.id} status=#{import_session.status}") + end +end diff --git a/app/models/family.rb b/app/models/family.rb index d4c6cf1e4..c5f8f2252 100644 --- a/app/models/family.rb +++ b/app/models/family.rb @@ -27,6 +27,8 @@ class Family < ApplicationRecord has_many :invitations, dependent: :destroy has_many :imports, dependent: :destroy + has_many :import_sessions, dependent: :destroy + has_many :import_source_mappings, dependent: :destroy has_many :family_exports, dependent: :destroy has_many :account_statements, dependent: :destroy diff --git a/app/models/family/data_exporter.rb b/app/models/family/data_exporter.rb index 404218056..20e08a70d 100644 --- a/app/models/family/data_exporter.rb +++ b/app/models/family/data_exporter.rb @@ -545,6 +545,7 @@ class Family::DataExporter def serialize_rule_for_export(rule) { + id: rule.id, name: rule.name, resource_type: rule.resource_type, active: rule.active, diff --git a/app/models/family/data_importer.rb b/app/models/family/data_importer.rb index 4de55a9bc..7a68419c5 100644 --- a/app/models/family/data_importer.rb +++ b/app/models/family/data_importer.rb @@ -1,6 +1,36 @@ require "set" class Family::DataImporter + MissingReferenceError = Class.new(StandardError) do + attr_reader :code, :details + + def initialize(record_type:, source_type:, source_id:) + @code = "missing_source_reference" + @details = { + record_type: record_type, + source_type: source_type, + source_id: source_id + } + + super("#{record_type} references missing #{source_type} source id #{source_id}") + end + end + + InvalidRecordError = Class.new(StandardError) do + attr_reader :code, :details + + def initialize(record_type:, field:, value:) + @code = "invalid_import_record" + @details = { + record_type: record_type, + field: field, + value: value + } + + super("#{record_type} has invalid #{field}: #{value.inspect}") + end + end + SUPPORTED_TYPES = %w[Account Balance Category Tag Merchant RecurringTransaction Transaction Transfer RejectedTransfer Trade Holding Valuation Budget BudgetCategory Rule].freeze ACCOUNTABLE_TYPE_CLASSES = { "Depository" => Depository, "Investment" => Investment, "Crypto" => Crypto, @@ -12,9 +42,41 @@ class Family::DataImporter ACCOUNTABLE_TYPE_CLASSES[type.to_s] end - def initialize(family, ndjson_content) + MAPPING_TYPES = { + accounts: "Account", + categories: "Category", + tags: "Tag", + merchants: "Merchant", + recurring_transactions: "RecurringTransaction", + transactions: "Transaction", + budgets: "Budget", + securities: "Security", + rules: "Rule" + }.freeze + SUMMARY_KEYS = { + "Account" => "accounts", + "Balance" => "balances", + "Category" => "categories", + "Tag" => "tags", + "Merchant" => "merchants", + "RecurringTransaction" => "recurring_transactions", + "Transaction" => "transactions", + "Transfer" => "transfers", + "RejectedTransfer" => "rejected_transfers", + "Trade" => "trades", + "Holding" => "holdings", + "Valuation" => "valuations", + "Budget" => "budgets", + "BudgetCategory" => "budget_categories", + "Rule" => "rules" + }.freeze + + def initialize(family, ndjson_content, import_session: nil, import: nil) @family = family @ndjson_content = ndjson_content + @import_session = import_session + @import = import + @strict_references = import_session.present? @id_mappings = { accounts: {}, categories: {}, @@ -23,11 +85,13 @@ class Family::DataImporter recurring_transactions: {}, transactions: {}, budgets: {}, - securities: {} + securities: {}, + rules: {} } @security_cache = {} @created_accounts = [] @created_entries = [] + @summary = Hash.new { |hash, key| hash[key] = empty_summary_bucket } end def import! @@ -54,7 +118,7 @@ class Family::DataImporter import_rules(records["Rule"] || []) end - { accounts: @created_accounts, entries: @created_entries } + { accounts: @created_accounts, entries: @created_entries, summary: compact_summary } end private @@ -79,6 +143,128 @@ class Family::DataImporter records end + def empty_summary_bucket + { "created" => 0, "updated" => 0, "skipped" => 0, "failed" => 0 } + end + + def compact_summary + @summary.select { |_entity_type, counts| counts.values.any?(&:positive?) } + end + + def increment_summary(record_type, status) + @summary[SUMMARY_KEYS.fetch(record_type)].tap do |counts| + counts[status.to_s] = counts.fetch(status.to_s, 0) + 1 + end + end + + def map_source!(mapping_key, source_id, target) + return if source_id.blank? || target.blank? + + @id_mappings[mapping_key][source_id] = target.id + return unless @import_session + + source_type = MAPPING_TYPES.fetch(mapping_key) + mapping = @import_session.source_mappings.find_or_initialize_by( + family: @family, + source_type: source_type, + source_id: source_id + ) + mapping.target = target + mapping.save! + end + + def mapped_id(mapping_key, old_id, record_type:, required: true) + if old_id.blank? + missing_reference(record_type, mapping_key, "(blank)") if required + return + end + + return @id_mappings[mapping_key][old_id] if @id_mappings[mapping_key].key?(old_id) + + source_type = MAPPING_TYPES.fetch(mapping_key) + mapping = @import_session&.source_mappings&.find_by( + family: @family, + source_type: source_type, + source_id: old_id + ) + + if mapping + @id_mappings[mapping_key][old_id] = mapping.target_id + return mapping.target_id + end + + if required && @strict_references + raise MissingReferenceError.new( + record_type: record_type, + source_type: source_type, + source_id: old_id + ) + end + + nil + end + + def mapped_record(mapping_key, old_id, scope, record_type:) + target_id = mapped_id(mapping_key, old_id, record_type: record_type, required: false) + return if target_id.blank? + + scope.find_by(id: target_id) + end + + def missing_reference(record_type, mapping_key, old_id) + if @strict_references + increment_summary(record_type, :failed) + raise MissingReferenceError.new( + record_type: record_type, + source_type: MAPPING_TYPES.fetch(mapping_key), + source_id: old_id + ) + end + + increment_summary(record_type, :skipped) + nil + end + + def require_source_id!(record_type, source_id) + return if source_id.present? || !@strict_references + + increment_summary(record_type, :failed) + raise MissingReferenceError.new( + record_type: record_type, + source_type: record_type, + source_id: "(blank)" + ) + end + + def invalid_record!(record_type, field, value) + if @strict_references + increment_summary(record_type, :failed) + raise InvalidRecordError.new(record_type: record_type, field: field, value: value) + end + + increment_summary(record_type, :skipped) + nil + end + + def session_entry_source + return unless @import_session + + "sure_import_session:#{@import_session.id}" + end + + def session_entry_external_id(record_type, source_id) + return if @import_session.blank? || source_id.blank? + + "#{record_type}:#{source_id}" + end + + def session_imported_entry(account, record_type, source_id) + external_id = session_entry_external_id(record_type, source_id) + return if external_id.blank? + + account.entries.find_by(source: session_entry_source, external_id: external_id) + end + def import_accounts(records) records.each do |record| data = record["data"] @@ -86,26 +272,41 @@ class Family::DataImporter accountable_data = data["accountable"] || {} accountable_type = data["accountable_type"] + require_source_id!("Account", old_id) + accountable_class = self.class.accountable_class_for(accountable_type) - next unless accountable_class - accountable = accountable_class.new - accountable.subtype = accountable_data["subtype"] if accountable.respond_to?(:subtype=) && accountable_data["subtype"] - - # Copy any other accountable attributes - safe_accountable_attrs = %w[subtype locked_attributes] - safe_accountable_attrs.each do |attr| - if accountable.respond_to?("#{attr}=") && accountable_data[attr].present? - accountable.send("#{attr}=", accountable_data[attr]) - end + unless accountable_class + invalid_record!("Account", "accountable_type", accountable_type) + next end - account = @family.accounts.build( + account = mapped_record(:accounts, old_id, @family.accounts, record_type: "Account") + created = account.blank? + + if account + accountable = account.accountable + else + # Build accountable + accountable = accountable_class.new + accountable.subtype = accountable_data["subtype"] if accountable.respond_to?(:subtype=) && accountable_data["subtype"] + + # Copy any other accountable attributes + safe_accountable_attrs = %w[subtype locked_attributes] + safe_accountable_attrs.each do |attr| + if accountable.respond_to?("#{attr}=") && accountable_data[attr].present? + accountable.send("#{attr}=", accountable_data[attr]) + end + end + + account = @family.accounts.build(accountable: accountable) + end + + account.assign_attributes( name: data["name"], balance: data["balance"].to_d, cash_balance: data["cash_balance"]&.to_d || data["balance"].to_d, currency: data["currency"] || @family.currency, - accountable: accountable, subtype: data["subtype"], institution_name: data["institution_name"], institution_domain: data["institution_domain"], @@ -118,7 +319,7 @@ class Family::DataImporter # Set opening balance if we have a historical balance and the import # does not provide either an explicit opening-anchor valuation or an # authoritative balance-history stream for this account. - if data["balance"].present? && !skip_opening_balance_import?(old_id, data) + if created && data["balance"].present? && !skip_opening_balance_import?(old_id, data) manager = Account::OpeningBalanceManager.new(account) result = manager.set_opening_balance( balance: data["balance"].to_d, @@ -127,8 +328,9 @@ class Family::DataImporter log_failed_opening_balance_import(account, old_id, result) unless result.success? end - @id_mappings[:accounts][old_id] = account.id - @created_accounts << account + map_source!(:accounts, old_id, account) + @created_accounts << account if created + increment_summary("Account", created ? :created : :updated) end end @@ -139,16 +341,23 @@ class Family::DataImporter def import_balances(records) records.each do |record| data = record["data"] || {} - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Balance") balance_date = parse_import_date(data["date"]) - next if new_account_id.blank? || balance_date.blank? || data["balance"].blank? + next if new_account_id.blank? + + if balance_date.blank? || data["balance"].blank? + increment_summary("Balance", :skipped) + next + end account = @family.accounts.find(new_account_id) currency = data["currency"].presence || account.currency balance = account.balances.find_or_initialize_by(date: balance_date, currency: currency) + created = balance.new_record? balance.assign_attributes(imported_balance_attributes(data)) balance.save! + increment_summary("Balance", created ? :created : :updated) end end @@ -188,24 +397,30 @@ class Family::DataImporter old_id = data["id"] parent_id = data["parent_id"] + require_source_id!("Category", old_id) + # Store parent relationship for second pass parent_mappings[old_id] = parent_id if parent_id.present? - category = @family.categories.build( + category = mapped_record(:categories, old_id, @family.categories, record_type: "Category") + created = category.blank? + category ||= @family.categories.build + + category.assign_attributes( name: data["name"], color: data["color"] || Category::UNCATEGORIZED_COLOR, classification_unused: data["classification_unused"] || data["classification"] || "expense", lucide_icon: data["lucide_icon"] || "shapes" ) category.save! - - @id_mappings[:categories][old_id] = category.id + map_source!(:categories, old_id, category) + increment_summary("Category", created ? :created : :updated) end # Second pass: establish parent relationships parent_mappings.each do |old_id, old_parent_id| - new_id = @id_mappings[:categories][old_id] - new_parent_id = @id_mappings[:categories][old_parent_id] + new_id = mapped_id(:categories, old_id, record_type: "Category") + new_parent_id = mapped_id(:categories, old_parent_id, record_type: "Category") next unless new_id && new_parent_id @@ -219,13 +434,22 @@ class Family::DataImporter data = record["data"] old_id = data["id"] - tag = @family.tags.build( + require_source_id!("Tag", old_id) + + tag = mapped_record(:tags, old_id, @family.tags, record_type: "Tag") + created = tag.blank? + tag ||= @family.tags.build + color = data["color"] || tag.color + # Keep replayed session imports deterministic when the source omits a color. + color ||= Tag::COLORS.first if created + + tag.assign_attributes( name: data["name"], - color: data["color"] || Tag::COLORS.sample + color: color ) tag.save! - - @id_mappings[:tags][old_id] = tag.id + map_source!(:tags, old_id, tag) + increment_summary("Tag", created ? :created : :updated) end end @@ -234,14 +458,20 @@ class Family::DataImporter data = record["data"] old_id = data["id"] - merchant = @family.merchants.build( + require_source_id!("Merchant", old_id) + + merchant = mapped_record(:merchants, old_id, @family.merchants, record_type: "Merchant") + created = merchant.blank? + merchant ||= @family.merchants.build + + merchant.assign_attributes( name: data["name"], color: data["color"], logo_url: data["logo_url"] ) merchant.save! - - @id_mappings[:merchants][old_id] = merchant.id + map_source!(:merchants, old_id, merchant) + increment_summary("Merchant", created ? :created : :updated) end end @@ -250,10 +480,20 @@ class Family::DataImporter data = record["data"] old_id = data["id"] - new_account_id = remap_optional_id(:accounts, data["account_id"]) + require_source_id!("RecurringTransaction", old_id) + + recurring_transaction = mapped_record( + :recurring_transactions, + old_id, + @family.recurring_transactions, + record_type: "RecurringTransaction" + ) + created = recurring_transaction.blank? + + new_account_id = remap_optional_id(:accounts, data["account_id"], record_type: "RecurringTransaction") next if data["account_id"].present? && new_account_id.blank? - new_merchant_id = remap_optional_id(:merchants, data["merchant_id"]) + new_merchant_id = remap_optional_id(:merchants, data["merchant_id"], record_type: "RecurringTransaction") next if data["merchant_id"].present? && new_merchant_id.blank? expected_day_of_month = recurring_expected_day_for(data["expected_day_of_month"]) @@ -262,7 +502,8 @@ class Family::DataImporter next_expected_date = parse_import_date(data["next_expected_date"]) next unless last_occurrence_date && next_expected_date - recurring_transaction = @family.recurring_transactions.build( + recurring_transaction ||= @family.recurring_transactions.build + recurring_transaction.assign_attributes( account_id: new_account_id, merchant_id: new_merchant_id, amount: data["amount"].to_d, @@ -280,14 +521,15 @@ class Family::DataImporter ) recurring_transaction.save! - @id_mappings[:recurring_transactions][old_id] = recurring_transaction.id + map_source!(:recurring_transactions, old_id, recurring_transaction) + increment_summary("RecurringTransaction", created ? :created : :updated) end end - def remap_optional_id(mapping_key, old_id) + def remap_optional_id(mapping_key, old_id, record_type:) return if old_id.blank? - @id_mappings[mapping_key][old_id] + mapped_id(mapping_key, old_id, record_type: record_type) end def recurring_transaction_status_for(status) @@ -312,8 +554,10 @@ class Family::DataImporter data = record["data"] old_id = data["id"] + require_source_id!("Transaction", old_id) + # Map account ID - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Transaction") next unless new_account_id account = @family.accounts.find(new_account_id) @@ -321,55 +565,69 @@ class Family::DataImporter # Map category ID (optional) new_category_id = nil if data["category_id"].present? - new_category_id = @id_mappings[:categories][data["category_id"]] + new_category_id = mapped_id(:categories, data["category_id"], record_type: "Transaction") end # Map merchant ID (optional) new_merchant_id = nil if data["merchant_id"].present? - new_merchant_id = @id_mappings[:merchants][data["merchant_id"]] + new_merchant_id = mapped_id(:merchants, data["merchant_id"], record_type: "Transaction") end # Map tag IDs (optional) - new_tag_ids = mapped_tag_ids(data["tag_ids"]) + new_tag_ids = mapped_tag_ids(data["tag_ids"], record_type: "Transaction") - transaction = Transaction.new( + entry = session_imported_entry(account, "Transaction", old_id) + transaction = entry&.entryable if entry&.entryable.is_a?(Transaction) + created = transaction.blank? + + transaction ||= Transaction.new + transaction.assign_attributes( category_id: new_category_id, merchant_id: new_merchant_id, kind: data["kind"] || "standard" ) - entry = Entry.new( + entry ||= Entry.new(entryable: transaction) + entry.assign_attributes( account: account, date: Date.parse(data["date"].to_s), amount: data["amount"].to_d, name: data["name"] || "Imported transaction", currency: data["currency"] || account.currency, notes: data["notes"], - excluded: data["excluded"] || false, - entryable: transaction + excluded: data["excluded"] || false ) + if @import_session + entry.external_id = session_entry_external_id("Transaction", old_id) + entry.source = session_entry_source + end entry.save! - @id_mappings[:transactions][old_id] = transaction.id + map_source!(:transactions, old_id, transaction) split_rows = importable_split_rows(data) if split_rows.any? - @created_entries << entry + @created_entries << entry if created import_split_lines!(entry, split_rows, fallback_tag_ids: new_tag_ids) else + transaction.taggings.destroy_all unless created new_tag_ids.each do |tag_id| transaction.taggings.create!(tag_id: tag_id) end - @created_entries << entry + @created_entries << entry if created end + + increment_summary("Transaction", created ? :created : :updated) end end - def mapped_tag_ids(old_tag_ids) - Array(old_tag_ids).map { |old_tag_id| @id_mappings[:tags][old_tag_id] }.compact + def mapped_tag_ids(old_tag_ids, record_type:) + Array(old_tag_ids).map do |old_tag_id| + mapped_id(:tags, old_tag_id, record_type: record_type) + end.compact end def importable_split_rows(data) @@ -380,8 +638,8 @@ class Family::DataImporter amount = row["amount"] || row["amount_money"] || row["amount_decimal"] next if amount.blank? - category_id = remap_optional_id(:categories, row["category_id"]) - merchant_id = remap_optional_id(:merchants, row["merchant_id"]) + category_id = remap_optional_id(:categories, row["category_id"], record_type: "Transaction") + merchant_id = remap_optional_id(:merchants, row["merchant_id"], record_type: "Transaction") { old_id: row["id"], @@ -392,7 +650,7 @@ class Family::DataImporter merchant_id_provided: row.key?("merchant_id"), notes: row["notes"], excluded: boolean_import_value(row, "excluded", default: false), - tag_ids: mapped_tag_ids(row["tag_ids"]), + tag_ids: mapped_tag_ids(row["tag_ids"], record_type: "Transaction"), tag_ids_provided: row.key?("tag_ids"), kind: row["kind"] } @@ -424,7 +682,7 @@ class Family::DataImporter transaction.taggings.create!(tag_id: tag_id) end - @id_mappings[:transactions][row[:old_id]] = transaction.id if row[:old_id].present? + map_source!(:transactions, row[:old_id], transaction) if row[:old_id].present? @created_entries << child_entry end end @@ -432,31 +690,60 @@ class Family::DataImporter def import_transfers(records) records.each do |record| data = record["data"] - inflow_transaction_id = @id_mappings[:transactions][data["inflow_transaction_id"]] - outflow_transaction_id = @id_mappings[:transactions][data["outflow_transaction_id"]] + inflow_transaction_id = mapped_id(:transactions, data["inflow_transaction_id"], record_type: "Transfer") + outflow_transaction_id = mapped_id(:transactions, data["outflow_transaction_id"], record_type: "Transfer") next unless inflow_transaction_id && outflow_transaction_id - Transfer.find_or_create_by!( + transfer = Transfer.find_or_create_by!( inflow_transaction_id: inflow_transaction_id, outflow_transaction_id: outflow_transaction_id ) do |transfer| transfer.status = transfer_status_for(data["status"]) transfer.notes = data["notes"] end + apply_transfer_transaction_kinds!(transfer) + increment_summary("Transfer", transfer.previously_new_record? ? :created : :updated) end end + def apply_transfer_transaction_kinds!(transfer) + destination_account = transfer.inflow_transaction.entry.account + outflow_kind = imported_transfer_outflow_kind(transfer) + outflow_attrs = { kind: outflow_kind } + if outflow_kind == "investment_contribution" && transfer.outflow_transaction.category_id.blank? + outflow_attrs[:category] = destination_account.family.investment_contributions_category + end + + transfer.outflow_transaction.update!(outflow_attrs) + transfer.inflow_transaction.update!(kind: "funds_movement") + end + + def imported_transfer_outflow_kind(transfer) + source_account = transfer.outflow_transaction.entry.account + destination_account = transfer.inflow_transaction.entry.account + return "loan_payment" if destination_account.loan? + return "cc_payment" if destination_account.liability? + return "investment_contribution" if investment_account?(destination_account) && !investment_account?(source_account) + + "funds_movement" + end + + def investment_account?(account) + account.investment? || account.crypto? + end + def import_rejected_transfers(records) records.each do |record| data = record["data"] - inflow_transaction_id = @id_mappings[:transactions][data["inflow_transaction_id"]] - outflow_transaction_id = @id_mappings[:transactions][data["outflow_transaction_id"]] + inflow_transaction_id = mapped_id(:transactions, data["inflow_transaction_id"], record_type: "RejectedTransfer") + outflow_transaction_id = mapped_id(:transactions, data["outflow_transaction_id"], record_type: "RejectedTransfer") next unless inflow_transaction_id && outflow_transaction_id - RejectedTransfer.find_or_create_by!( + rejected_transfer = RejectedTransfer.find_or_create_by!( inflow_transaction_id: inflow_transaction_id, outflow_transaction_id: outflow_transaction_id ) + increment_summary("RejectedTransfer", rejected_transfer.previously_new_record? ? :created : :updated) end end @@ -471,9 +758,12 @@ class Family::DataImporter def import_trades(records) records.each do |record| data = record["data"] + old_id = data["id"] + + require_source_id!("Trade", old_id) # Map account ID - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Trade") next unless new_account_id account = @family.accounts.find(new_account_id) @@ -490,34 +780,47 @@ class Family::DataImporter exchange_operating_mic: data["exchange_operating_mic"] ) - trade = Trade.new( + entry = session_imported_entry(account, "Trade", old_id) + trade = entry&.entryable if entry&.entryable.is_a?(Trade) + created = trade.blank? + + trade ||= Trade.new + trade.assign_attributes( security: security, qty: data["qty"].to_d, price: data["price"].to_d, currency: data["currency"] || account.currency ) - entry = Entry.new( + entry ||= Entry.new(entryable: trade) + entry.assign_attributes( account: account, date: Date.parse(data["date"].to_s), amount: data["amount"].to_d, name: "#{data["qty"].to_d >= 0 ? 'Buy' : 'Sell'} #{ticker}", - currency: data["currency"] || account.currency, - entryable: trade + currency: data["currency"] || account.currency ) + if @import_session + entry.external_id = session_entry_external_id("Trade", old_id) + entry.source = session_entry_source + end entry.save! - @created_entries << entry + @created_entries << entry if created + increment_summary("Trade", created ? :created : :updated) end end def import_holdings(records) - accounts_by_id = @family.accounts.where(id: records.filter_map { |record| @id_mappings[:accounts][record.dig("data", "account_id")] }).index_by(&:id) + account_ids = records.filter_map do |record| + mapped_id(:accounts, record.dig("data", "account_id"), record_type: "Holding", required: false) + end + accounts_by_id = @family.accounts.where(id: account_ids).index_by(&:id) records.each do |record| data = record["data"] - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Holding") next unless new_account_id account = accounts_by_id[new_account_id] @@ -552,33 +855,46 @@ class Family::DataImporter security_locked: truthy?(data["security_locked"]) || false } - upsert_imported_holding!(account, security, holding_date, holding_currency, holding_attributes) + created = upsert_imported_holding!(account, security, holding_date, holding_currency, holding_attributes) + increment_summary("Holding", created ? :created : :updated) end end def import_valuations(records) records.each do |record| data = record["data"] + old_id = data["id"] + + require_source_id!("Valuation", old_id) # Map account ID - new_account_id = @id_mappings[:accounts][data["account_id"]] + new_account_id = mapped_id(:accounts, data["account_id"], record_type: "Valuation") next unless new_account_id account = @family.accounts.find(new_account_id) - valuation = Valuation.new(kind: valuation_kind_for(data["kind"])) + entry = session_imported_entry(account, "Valuation", old_id) + valuation = entry&.entryable if entry&.entryable.is_a?(Valuation) + created = valuation.blank? + valuation ||= Valuation.new + valuation.kind = valuation_kind_for(data["kind"]) - entry = Entry.new( + entry ||= Entry.new(entryable: valuation) + entry.assign_attributes( account: account, date: Date.parse(data["date"].to_s), amount: data["amount"].to_d, name: data["name"] || "Valuation", - currency: data["currency"] || account.currency, - entryable: valuation + currency: data["currency"] || account.currency ) + if @import_session + entry.external_id = session_entry_external_id("Valuation", old_id) + entry.source = session_entry_source + end entry.save! - @created_entries << entry + @created_entries << entry if created + increment_summary("Valuation", created ? :created : :updated) end end @@ -650,7 +966,13 @@ class Family::DataImporter data = record["data"] old_id = data["id"] - budget = @family.budgets.build( + require_source_id!("Budget", old_id) + + budget = mapped_record(:budgets, old_id, @family.budgets, record_type: "Budget") + created = budget.blank? + budget ||= @family.budgets.build + + budget.assign_attributes( start_date: Date.parse(data["start_date"].to_s), end_date: Date.parse(data["end_date"].to_s), budgeted_spending: data["budgeted_spending"]&.to_d, @@ -659,7 +981,8 @@ class Family::DataImporter ) budget.save! - @id_mappings[:budgets][old_id] = budget.id + map_source!(:budgets, old_id, budget) + increment_summary("Budget", created ? :created : :updated) end end @@ -668,36 +991,49 @@ class Family::DataImporter data = record["data"] # Map budget ID - new_budget_id = @id_mappings[:budgets][data["budget_id"]] + new_budget_id = mapped_id(:budgets, data["budget_id"], record_type: "BudgetCategory") next unless new_budget_id # Map category ID - new_category_id = @id_mappings[:categories][data["category_id"]] + new_category_id = mapped_id(:categories, data["category_id"], record_type: "BudgetCategory") next unless new_category_id budget = @family.budgets.find(new_budget_id) - budget_category = budget.budget_categories.build( + budget_category = budget.budget_categories.find_or_initialize_by(category_id: new_category_id) + created = budget_category.new_record? + budget_category.assign_attributes( category_id: new_category_id, budgeted_spending: data["budgeted_spending"].to_d, currency: data["currency"] || budget.currency ) budget_category.save! + increment_summary("BudgetCategory", created ? :created : :updated) end end def import_rules(records) records.each do |record| data = record["data"] + old_id = data["id"] - rule = @family.rules.build( + require_source_id!("Rule", old_id) + + rule = mapped_record(:rules, old_id, @family.rules, record_type: "Rule") + created = rule.blank? + rule ||= @family.rules.build + + rule.assign_attributes( name: data["name"], resource_type: data["resource_type"] || "transaction", active: data["active"] || false, effective_date: data["effective_date"].present? ? Date.parse(data["effective_date"].to_s) : nil ) + rule.conditions.destroy_all unless created + rule.actions.destroy_all unless created + # Build conditions (data["conditions"] || []).each do |condition_data| build_rule_condition(rule, condition_data) @@ -709,6 +1045,8 @@ class Family::DataImporter end rule.save! + map_source!(:rules, old_id, rule) + increment_summary("Rule", created ? :created : :updated) end end @@ -845,8 +1183,9 @@ class Family::DataImporter return security end - if old_security_id.present? && @id_mappings[:securities][old_security_id] - security = Security.find(@id_mappings[:securities][old_security_id]) + mapped_security_id = mapped_id(:securities, old_security_id, record_type: "Security", required: false) + if old_security_id.present? && mapped_security_id + security = Security.find(mapped_security_id) apply_security_metadata(security, normalized_ticker, attributes) @security_cache[cache_key] = security return security @@ -856,7 +1195,7 @@ class Family::DataImporter apply_security_metadata(security, normalized_ticker, attributes) @security_cache[cache_key] = security - @id_mappings[:securities][old_security_id] = security.id if old_security_id.present? + map_source!(:securities, old_security_id, security) if old_security_id.present? security end @@ -901,6 +1240,7 @@ class Family::DataImporter def upsert_imported_holding!(account, security, date, currency, attributes) holding = account.holdings.find_or_initialize_by(security: security, date: date, currency: currency) + created = holding.new_record? holding.assign_attributes(attributes) begin @@ -908,7 +1248,10 @@ class Family::DataImporter rescue ActiveRecord::RecordNotUnique existing = account.holdings.find_by!(security: security, date: date, currency: currency) existing.update!(attributes) + created = false end + + created end def security_kind_for(value) diff --git a/app/models/family/financial_data_reset.rb b/app/models/family/financial_data_reset.rb index 91759e239..021f1eab9 100644 --- a/app/models/family/financial_data_reset.rb +++ b/app/models/family/financial_data_reset.rb @@ -7,6 +7,8 @@ class Family::FinancialDataReset account_statements family_exports imports + import_sessions + import_source_mappings import_rows import_mappings accounts @@ -127,6 +129,7 @@ class Family::FinancialDataReset delete_active_storage_attachments! scope(:transfers).destroy_all scope(:rejected_transfers).destroy_all + scope(:import_source_mappings).destroy_all scope(:import_mappings).destroy_all scope(:import_rows).destroy_all scope(:rule_runs).destroy_all @@ -138,6 +141,7 @@ class Family::FinancialDataReset scope(:account_statements).destroy_all scope(:family_exports).destroy_all scope(:imports).destroy_all + scope(:import_sessions).destroy_all scope(:entries).destroy_all scope(:holdings).destroy_all scope(:balances).destroy_all @@ -239,6 +243,7 @@ class Family::FinancialDataReset account_scope = Account.where(family_id: family.id) account_ids = account_scope.select(:id) import_scope = Import.where(family_id: family.id) + import_session_scope = ImportSession.where(family_id: family.id) import_ids = import_scope.select(:id) rule_scope = Rule.where(family_id: family.id) rule_ids = rule_scope.select(:id) @@ -252,6 +257,8 @@ class Family::FinancialDataReset account_statements: AccountStatement.where(family_id: family.id), family_exports: FamilyExport.where(family_id: family.id), imports: import_scope, + import_sessions: import_session_scope, + import_source_mappings: ImportSourceMapping.where(family_id: family.id), import_rows: Import::Row.where(import_id: import_ids), import_mappings: Import::Mapping.where(import_id: import_ids), accounts: account_scope, diff --git a/app/models/import.rb b/app/models/import.rb index d70a19eb2..4ea45f684 100644 --- a/app/models/import.rb +++ b/app/models/import.rb @@ -41,11 +41,14 @@ class Import < ApplicationRecord belongs_to :family belongs_to :account, optional: true belongs_to :account_statement, optional: true + belongs_to :import_session, optional: true before_validation :set_default_number_format before_validation :ensure_utf8_encoding + normalizes :client_chunk_id, with: ->(value) { value.strip.presence } scope :ordered, -> { order(created_at: :desc) } + scope :ordered_by_sequence, -> { order(:sequence, :created_at) } enum :status, { pending: "pending", @@ -61,9 +64,15 @@ class Import < ApplicationRecord validates :col_sep, inclusion: { in: SEPARATORS.map(&:last) } validates :signage_convention, inclusion: { in: SIGNAGE_CONVENTIONS }, allow_nil: true validates :number_format, presence: true, inclusion: { in: NUMBER_FORMATS.keys } + validates :sequence, numericality: { only_integer: true, greater_than: 0 }, allow_nil: true + validates :client_chunk_id, length: { maximum: 255 }, allow_blank: true + validates :checksum, length: { is: 64 }, allow_blank: true validate :custom_column_import_requires_identifier validates :rows_to_skip, numericality: { only_integer: true, greater_than_or_equal_to: 0 } validate :account_belongs_to_family + validate :import_session_belongs_to_family + validate :session_chunk_metadata + validate :session_payloads_are_json_objects validate :rows_to_skip_within_file_bounds has_many :rows, dependent: :destroy @@ -564,6 +573,25 @@ class Import < ApplicationRecord errors.add(:account, "must belong to your family") end + def import_session_belongs_to_family + return if import_session.nil? + return if import_session.family_id == family_id + + errors.add(:import_session, "must belong to your family") + end + + def session_chunk_metadata + return if import_session.nil? + + errors.add(:sequence, "must be present for import session chunks") if sequence.blank? + errors.add(:checksum, "must be present for import session chunks") if checksum.blank? + end + + def session_payloads_are_json_objects + errors.add(:summary, "must be an object") unless summary.is_a?(Hash) + errors.add(:error_details, "must be an object") unless error_details.is_a?(Hash) + end + def rows_to_skip_within_file_bounds return if raw_file_str.blank? return if rows_to_skip.to_i == 0 diff --git a/app/models/import_session.rb b/app/models/import_session.rb new file mode 100644 index 000000000..8ce8c1fd9 --- /dev/null +++ b/app/models/import_session.rb @@ -0,0 +1,425 @@ +require "digest" + +class ImportSession < ApplicationRecord + ConflictError = Class.new(StandardError) + EnqueueError = Class.new(StandardError) + + IMPORT_TYPES = %w[SureImport].freeze + STATUSES = %w[pending importing complete failed].freeze + + belongs_to :family + has_many :imports, -> { order(:sequence, :created_at) }, dependent: :destroy + has_many :source_mappings, + class_name: "ImportSourceMapping", + dependent: :destroy + + enum :status, { + pending: "pending", + importing: "importing", + complete: "complete", + failed: "failed" + }, validate: true, default: "pending" + + validates :import_type, inclusion: { in: IMPORT_TYPES } + validates :client_session_id, uniqueness: { scope: :family_id }, allow_blank: true + validates :client_session_id, length: { maximum: 255 }, allow_blank: true + normalizes :client_session_id, with: ->(value) { value.strip.presence } + validates :expected_chunks, + numericality: { only_integer: true, greater_than: 0 }, + allow_nil: true + validate :payloads_are_json_objects + + def self.create_or_find_for!(family:, import_type:, client_session_id:, expected_chunks:) + import_type = import_type.presence || "SureImport" + expected_chunks = normalize_positive_integer(expected_chunks) + unless IMPORT_TYPES.include?(import_type) + session = new(import_type: import_type) + session.errors.add(:import_type, "must be SureImport") + raise ActiveRecord::RecordInvalid.new(session) + end + + if client_session_id.present? + session = family.import_sessions.find_or_initialize_by(client_session_id: client_session_id) + if session.persisted? && + expected_chunks.present? && + session.expected_chunks.present? && + session.expected_chunks != expected_chunks + raise ConflictError, "client_session_id already exists with a different expected_chunks value" + end + else + session = family.import_sessions.build + end + + session.import_type = import_type + session.expected_chunks ||= expected_chunks + session.save! + session + rescue ActiveRecord::RecordNotUnique + raise unless client_session_id.present? + + existing = family.import_sessions.find_by(client_session_id: client_session_id) + raise unless existing + + if expected_chunks.present? && + existing.expected_chunks.present? && + existing.expected_chunks != expected_chunks + raise ConflictError, "client_session_id already exists with a different expected_chunks value" + end + if expected_chunks.present? && existing.expected_chunks.nil? + existing.update!(expected_chunks: expected_chunks) + end + + existing + end + + def self.normalize_positive_integer(value) + return if value.blank? + + Integer(value, exception: false) || 0 + end + private_class_method :normalize_positive_integer + + def attach_chunk!(sequence:, content:, filename:, content_type:, client_chunk_id: nil) + sequence = self.class.send(:normalize_positive_integer, sequence) + raise ConflictError, "sequence must be a positive integer" unless sequence.positive? + raise ConflictError, "sequence exceeds expected_chunks" if expected_chunks.present? && sequence > expected_chunks + + checksum = Digest::SHA256.hexdigest(content) + normalized_client_chunk_id = client_chunk_id.presence + chunk_needs_finalization = false + + chunk = with_lock do + raise ConflictError, "cannot add chunks after publishing starts" unless pending? || failed? + + existing = existing_chunk_for!( + sequence: sequence, + client_chunk_id: normalized_client_chunk_id, + checksum: checksum + ) + + if existing + chunk_needs_finalization = prepare_existing_chunk_for_retry!( + existing, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) + existing + else + chunk_needs_finalization = true + chunk = create_chunk!( + sequence: sequence, + client_chunk_id: normalized_client_chunk_id, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) + end + end + + finalize_chunk_for_retry!(chunk, checksum) if chunk_needs_finalization + chunk + rescue ActiveRecord::RecordNotUnique + imports.reset + existing = existing_chunk_for!( + sequence: sequence, + client_chunk_id: normalized_client_chunk_id, + checksum: checksum + ) + return prepare_and_finalize_existing_chunk!( + existing, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) if existing + + raise ConflictError, "chunk already exists with different content" + end + + def create_chunk!(sequence:, client_chunk_id:, checksum:, content:, filename:, content_type:) + imports.create!( + family: family, + type: "SureImport", + sequence: sequence, + client_chunk_id: client_chunk_id, + checksum: checksum + ).tap do |import| + import.ndjson_file.attach( + io: StringIO.new(content), + filename: filename, + content_type: content_type + ) + end + end + private :create_chunk! + + def publish_later + previous_status = nil + should_enqueue = false + + sync_chunk_row_counts! + + with_lock do + return if complete? || importing? + + validate_publishable_chunks! + + previous_status = status + update!(status: :importing, error_details: {}) + should_enqueue = true + end + + return unless should_enqueue + + begin + ImportSessionJob.perform_later(self) + rescue => error + with_lock do + reload + if importing? + update!(status: previous_status, error_details: enqueue_error_details) + end + end + Rails.logger.error("ImportSession enqueue failed import_session_id=#{id} exception=#{error.class}") + raise EnqueueError, "Import session could not be queued." + end + end + + def publish + return unless prepare_for_publish! + + Rails.logger.info("ImportSession publish started import_session_id=#{id}") + + imports.ordered_by_sequence.each do |import| + process_chunk!(import) + end + + update!(status: :complete, summary: aggregate_chunk_summaries, error_details: {}) + enqueue_family_sync + Rails.logger.info("ImportSession publish completed import_session_id=#{id}") + rescue => error + update!( + status: :failed, + error_details: error_details_for(error), + summary: aggregate_chunk_summaries + ) + Rails.logger.error("ImportSession publish failed import_session_id=#{id} exception=#{error.class}") + end + + def aggregate_chunk_summaries + imports.reload.each_with_object({}) do |import, totals| + merge_summary!(totals, import.summary || {}) + end + end + + private + def prepare_for_publish! + sync_chunk_row_counts! + + with_lock do + return false if complete? + + validate_publishable_chunks! + + update!(status: :importing, error_details: {}) unless importing? + true + end + end + + def enqueue_family_sync + family.sync_later + rescue => error + update!(error_details: sync_enqueue_error_details) + Rails.logger.error( + "ImportSession family sync enqueue failed import_session_id=#{id} exception=#{error.class}" + ) + end + + def existing_chunk_for!(sequence:, client_chunk_id:, checksum:) + sequence_match = imports.find_by(sequence: sequence) + client_chunk_match = imports.find_by(client_chunk_id: client_chunk_id) if client_chunk_id.present? + + if sequence_match && client_chunk_match && sequence_match.id != client_chunk_match.id + raise ConflictError, "sequence and client_chunk_id refer to different chunks" + end + + existing = sequence_match || client_chunk_match + return unless existing + + if existing.sequence != sequence + raise ConflictError, "client_chunk_id already exists with a different sequence" + end + + if client_chunk_id.present? && existing.client_chunk_id.present? && existing.client_chunk_id != client_chunk_id + raise ConflictError, "sequence already exists with a different client_chunk_id" + end + + raise ConflictError, "chunk already exists with different content" unless existing.checksum == checksum + + existing + end + + def prepare_and_finalize_existing_chunk!(chunk, checksum:, content:, filename:, content_type:) + needs_finalization = with_lock do + prepare_existing_chunk_for_retry!( + chunk.reload, + checksum: checksum, + content: content, + filename: filename, + content_type: content_type + ) + end + + finalize_chunk_for_retry!(chunk, checksum) if needs_finalization + chunk + end + + def prepare_existing_chunk_for_retry!(chunk, checksum:, content:, filename:, content_type:) + return false if chunk_ready_for_retry?(chunk, checksum) + return true if chunk.ndjson_file.attached? && chunk_content_checksum(chunk) == checksum + + chunk.ndjson_file.attach( + io: StringIO.new(content), + filename: filename, + content_type: content_type + ) + true + end + + def finalize_chunk_for_retry!(chunk, checksum) + chunk.sync_ndjson_rows_count! + chunk.reload + return chunk if chunk_ready_for_retry?(chunk, checksum) + + raise ConflictError, "chunk already exists but is incomplete" + rescue ActiveStorage::FileNotFoundError + raise ConflictError, "chunk already exists but is incomplete" + end + + def chunk_ready_for_retry?(chunk, checksum) + chunk.ndjson_file.attached? && + chunk.rows_count.to_i.positive? && + chunk_content_checksum(chunk) == checksum + end + + def chunk_content_checksum(chunk) + Digest::SHA256.hexdigest(chunk.ndjson_file.download) + rescue ActiveStorage::FileNotFoundError + nil + end + + def process_chunk!(import) + return if import.complete? + + import.update!(status: :importing, error: nil, error_details: {}) + result = import.import!(import_session: self) + import.update!(status: :complete, summary: result.fetch(:summary, {}), error_details: {}) + rescue => error + import.update!( + status: :failed, + error: public_error_message_for(error), + error_details: error_details_for(error), + summary: failed_summary_for(error) + ) + raise + end + + def row_count_exceeded? + imports.sum(:rows_count) > SureImport.max_row_count + end + + def validate_publishable_chunks! + raise ConflictError, "import session has no chunks" unless imports.exists? + raise Import::MaxRowCountExceededError if row_count_exceeded? + validate_expected_chunk_sequences! + end + + def sync_chunk_row_counts! + raise ConflictError, "import session has no chunks" unless imports.exists? + imports.reload.each(&:sync_ndjson_rows_count!) + rescue ActiveStorage::FileNotFoundError + raise ConflictError, "import session chunks are incomplete" + end + + def validate_expected_chunk_sequences! + return if expected_chunks.blank? + + expected_sequences = (1..expected_chunks).to_a + actual_sequences = imports.pluck(:sequence).sort + return if actual_sequences == expected_sequences + + missing_sequences = expected_sequences - actual_sequences + unexpected_sequences = actual_sequences - expected_sequences + details = [] + details << "missing sequences: #{missing_sequences.join(', ')}" if missing_sequences.any? + details << "unexpected sequences: #{unexpected_sequences.join(', ')}" if unexpected_sequences.any? + + raise ConflictError, "import session chunks do not match expected sequences (#{details.join('; ')})" + end + + def error_details_for(error) + details = { + "code" => error.respond_to?(:code) ? error.code : "import_failed", + "message" => public_error_message_for(error) + } + + if error.respond_to?(:details) + details.merge!(error.details.stringify_keys) + end + + details + end + + def public_error_message_for(error) + return error.message if error.respond_to?(:code) + + "Import session failed." + end + + def enqueue_error_details + { + "code" => "import_enqueue_failed", + "message" => "Import session could not be queued." + } + end + + def sync_enqueue_error_details + { + "code" => "family_sync_enqueue_failed", + "message" => "Family sync could not be queued after import completion." + } + end + + def merge_summary!(totals, summary) + summary.each do |entity_type, counts| + next unless counts.respond_to?(:each) + + totals[entity_type] ||= {} + counts.each do |status, count| + totals[entity_type][status] = totals[entity_type].fetch(status, 0) + count.to_i + end + end + end + + def failed_summary_for(error) + record_type = error_details_for(error)["record_type"] + return {} if record_type.blank? + + { + record_type.to_s.underscore.pluralize => { + "created" => 0, + "updated" => 0, + "skipped" => 0, + "failed" => 1 + } + } + end + + def payloads_are_json_objects + errors.add(:summary, "must be an object") unless summary.is_a?(Hash) + errors.add(:error_details, "must be an object") unless error_details.is_a?(Hash) + end +end diff --git a/app/models/import_source_mapping.rb b/app/models/import_source_mapping.rb new file mode 100644 index 000000000..c59bd8a81 --- /dev/null +++ b/app/models/import_source_mapping.rb @@ -0,0 +1,41 @@ +class ImportSourceMapping < ApplicationRecord + SOURCE_TYPES = %w[Account Category Tag Merchant RecurringTransaction Transaction Budget Security Rule].freeze + + belongs_to :family + belongs_to :import_session + belongs_to :target, polymorphic: true, optional: true + + validates :source_type, :source_id, :target_type, :target_id, presence: true + validates :source_type, inclusion: { in: SOURCE_TYPES } + validates :target_type, inclusion: { in: SOURCE_TYPES }, allow_blank: true + validates :source_type, length: { maximum: 64 } + validates :source_id, length: { maximum: 255 } + validates :source_id, uniqueness: { scope: [ :import_session_id, :source_type ] } + normalizes :source_type, :source_id, with: ->(value) { value.strip.presence } + validate :family_matches_import_session + validate :target_exists + validate :target_matches_family + + private + def family_matches_import_session + return if import_session.blank? || family_id == import_session.family_id + + errors.add(:family, "must match import session") + end + + def target_exists + return if target_type.blank? || target_id.blank? || !SOURCE_TYPES.include?(target_type) + return if target.present? + + errors.add(:target, "must exist") + end + + def target_matches_family + return if target_type.blank? || !SOURCE_TYPES.include?(target_type) + return if target.blank? + return unless target.respond_to?(:family_id) + return if target.family_id == family_id + + errors.add(:target, "must belong to your family") + end +end diff --git a/app/models/sure_import.rb b/app/models/sure_import.rb index c738e6156..096b3771d 100644 --- a/app/models/sure_import.rb +++ b/app/models/sure_import.rb @@ -129,16 +129,28 @@ class SureImport < Import self.class.dry_run_totals_from_ndjson(ndjson_blob_string) end - def import! + def import!(import_session: nil) sync_ndjson_counts! before_counts = readback_count_snapshot - importer = Family::DataImporter.new(family, ndjson_blob_string) + importer = Family::DataImporter.new(family, ndjson_blob_string, import_session: import_session, import: self) result = importer.import! - result[:accounts].each { |account| accounts << account } - result[:entries].each { |entry| entries << entry } + Import.transaction do + result[:accounts].each { |account| account.save! if account.new_record? } + result[:entries].each { |entry| entry.save! if entry.new_record? } + + account_ids = result[:accounts].filter_map(&:id) + entry_ids = result[:entries].filter_map(&:id) + existing_account_ids = accounts.where(id: account_ids).pluck(:id) + existing_entry_ids = entries.where(id: entry_ids).pluck(:id) + + accounts.concat(result[:accounts].reject { |account| existing_account_ids.include?(account.id) }) + entries.concat(result[:entries].reject { |entry| existing_entry_ids.include?(entry.id) }) + update!(summary: result[:summary]) if has_attribute?(:summary) + end record_readback_verification!(before_counts:) + result rescue => error record_failed_readback_verification!(before_counts:, error:) raise diff --git a/config/routes.rb b/config/routes.rb index fb4355deb..53bdc864a 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -536,6 +536,10 @@ Rails.application.routes.draw do post :preflight, on: :collection get :rows, on: :member end + resources :import_sessions, only: [ :show, :create ] do + post :chunks, on: :member, action: :create_chunk + post :publish, on: :member + end resource :usage, only: [ :show ], controller: :usage resource :balance_sheet, only: [ :show ], controller: :balance_sheet resource :family_settings, only: [ :show ], controller: :family_settings diff --git a/db/migrate/20260513013000_create_import_sessions.rb b/db/migrate/20260513013000_create_import_sessions.rb new file mode 100644 index 000000000..743d7f4bd --- /dev/null +++ b/db/migrate/20260513013000_create_import_sessions.rb @@ -0,0 +1,78 @@ +class CreateImportSessions < ActiveRecord::Migration[7.2] + def change + create_table :import_sessions, id: :uuid do |t| + t.references :family, null: false, foreign_key: true, type: :uuid + t.string :import_type, null: false, default: "SureImport" + t.string :status, null: false, default: "pending" + t.string :client_session_id, limit: 255 + t.integer :expected_chunks + t.jsonb :summary, null: false, default: {} + t.jsonb :error_details, null: false, default: {} + + t.timestamps + + t.index [ :family_id, :client_session_id ], + unique: true, + where: "client_session_id IS NOT NULL", + name: "idx_import_sessions_on_family_client_session" + t.index [ :family_id, :status ] + t.index [ :id, :family_id ], unique: true, name: "idx_import_sessions_on_id_family" + t.check_constraint "expected_chunks IS NULL OR expected_chunks > 0", name: "chk_import_sessions_expected_chunks_positive" + t.check_constraint "client_session_id IS NULL OR btrim(client_session_id) <> ''", + name: "chk_import_sessions_client_session_id_present" + t.check_constraint "import_type = 'SureImport'", name: "chk_import_sessions_import_type" + t.check_constraint "status IN ('pending', 'importing', 'complete', 'failed')", name: "chk_import_sessions_status" + t.check_constraint "jsonb_typeof(summary) = 'object'", name: "chk_import_sessions_summary_object" + t.check_constraint "jsonb_typeof(error_details) = 'object'", name: "chk_import_sessions_error_details_object" + end + + create_table :import_source_mappings, id: :uuid do |t| + t.references :family, null: false, foreign_key: true, type: :uuid + t.references :import_session, null: false, type: :uuid + t.string :source_type, null: false, limit: 64 + t.string :source_id, null: false, limit: 255 + t.references :target, polymorphic: true, null: false, type: :uuid, + index: { name: "idx_import_source_mappings_on_target" } + + t.timestamps + + t.index [ :import_session_id, :source_type, :source_id ], + unique: true, + name: "index_import_source_mappings_on_session_type_and_source" + t.index [ :family_id, :source_type, :source_id ], name: "idx_import_source_mappings_on_family_source" + t.check_constraint "btrim(source_type) <> ''", name: "chk_import_source_mappings_source_type_present" + t.check_constraint "source_type IN ('Account', 'Category', 'Tag', 'Merchant', 'RecurringTransaction', 'Transaction', 'Budget', 'Security', 'Rule')", + name: "chk_import_source_mappings_source_type" + t.check_constraint "btrim(source_id) <> ''", name: "chk_import_source_mappings_source_id_present" + t.check_constraint "btrim(target_type) <> ''", name: "chk_import_source_mappings_target_type_present" + t.check_constraint "target_type IN ('Account', 'Category', 'Tag', 'Merchant', 'RecurringTransaction', 'Transaction', 'Budget', 'Security', 'Rule')", + name: "chk_import_source_mappings_target_type" + end + + add_foreign_key :import_source_mappings, :import_sessions, + column: [ :import_session_id, :family_id ], primary_key: [ :id, :family_id ], + on_delete: :cascade, name: "fk_import_source_mappings_session_family" + + add_reference :imports, :import_session, type: :uuid + add_column :imports, :sequence, :integer + add_column :imports, :client_chunk_id, :string, limit: 255 + add_column :imports, :checksum, :string, limit: 64 + add_column :imports, :summary, :jsonb, null: false, default: {} + add_column :imports, :error_details, :jsonb, null: false, default: {} + + add_index :imports, [ :import_session_id, :sequence ], unique: true, + where: "import_session_id IS NOT NULL AND sequence IS NOT NULL", name: "idx_imports_on_session_sequence" + add_index :imports, [ :import_session_id, :client_chunk_id ], unique: true, + where: "import_session_id IS NOT NULL AND client_chunk_id IS NOT NULL", name: "idx_imports_on_session_client_chunk" + add_foreign_key :imports, :import_sessions, + column: [ :import_session_id, :family_id ], primary_key: [ :id, :family_id ], + on_delete: :cascade, name: "fk_imports_session_family" + add_check_constraint :imports, "sequence IS NULL OR sequence > 0", name: "chk_imports_session_sequence_positive" + add_check_constraint :imports, "client_chunk_id IS NULL OR btrim(client_chunk_id) <> ''", name: "chk_imports_client_chunk_id_present" + add_check_constraint :imports, "checksum IS NULL OR length(checksum) = 64", name: "chk_imports_checksum_sha256_length" + add_check_constraint :imports, "import_session_id IS NULL OR sequence IS NOT NULL", name: "chk_imports_session_sequence_present" + add_check_constraint :imports, "import_session_id IS NULL OR checksum IS NOT NULL", name: "chk_imports_session_checksum_present" + add_check_constraint :imports, "jsonb_typeof(summary) = 'object'", name: "chk_imports_summary_object" + add_check_constraint :imports, "jsonb_typeof(error_details) = 'object'", name: "chk_imports_error_details_object" + end +end diff --git a/db/schema.rb b/db/schema.rb index 513bd1abd..7f67f2646 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -982,6 +982,49 @@ ActiveRecord::Schema[7.2].define(version: 2026_05_31_153000) do t.check_constraint "source_row_number > 0", name: "chk_import_rows_source_row_number_positive" end + create_table "import_sessions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.uuid "family_id", null: false + t.string "import_type", default: "SureImport", null: false + t.string "status", default: "pending", null: false + t.string "client_session_id", limit: 255 + t.integer "expected_chunks" + t.jsonb "summary", default: {}, null: false + t.jsonb "error_details", default: {}, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["family_id", "client_session_id"], name: "idx_import_sessions_on_family_client_session", unique: true, where: "(client_session_id IS NOT NULL)" + t.index ["family_id", "status"], name: "index_import_sessions_on_family_id_and_status" + t.index ["family_id"], name: "index_import_sessions_on_family_id" + t.index ["id", "family_id"], name: "idx_import_sessions_on_id_family", unique: true + t.check_constraint "client_session_id IS NULL OR btrim(client_session_id::text) <> ''::text", name: "chk_import_sessions_client_session_id_present" + t.check_constraint "expected_chunks IS NULL OR expected_chunks > 0", name: "chk_import_sessions_expected_chunks_positive" + t.check_constraint "jsonb_typeof(error_details) = 'object'::text", name: "chk_import_sessions_error_details_object" + t.check_constraint "import_type::text = 'SureImport'::text", name: "chk_import_sessions_import_type" + t.check_constraint "status::text = ANY (ARRAY['pending'::character varying, 'importing'::character varying, 'complete'::character varying, 'failed'::character varying]::text[])", name: "chk_import_sessions_status" + t.check_constraint "jsonb_typeof(summary) = 'object'::text", name: "chk_import_sessions_summary_object" + end + + create_table "import_source_mappings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.uuid "family_id", null: false + t.uuid "import_session_id", null: false + t.string "source_type", limit: 64, null: false + t.string "source_id", limit: 255, null: false + t.string "target_type", null: false + t.uuid "target_id", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["family_id", "source_type", "source_id"], name: "idx_import_source_mappings_on_family_source" + t.index ["family_id"], name: "index_import_source_mappings_on_family_id" + t.index ["import_session_id", "source_type", "source_id"], name: "index_import_source_mappings_on_session_type_and_source", unique: true + t.index ["import_session_id"], name: "index_import_source_mappings_on_import_session_id" + t.index ["target_type", "target_id"], name: "idx_import_source_mappings_on_target" + t.check_constraint "btrim(source_id::text) <> ''::text", name: "chk_import_source_mappings_source_id_present" + t.check_constraint "source_type::text = ANY (ARRAY['Account'::character varying, 'Category'::character varying, 'Tag'::character varying, 'Merchant'::character varying, 'RecurringTransaction'::character varying, 'Transaction'::character varying, 'Budget'::character varying, 'Security'::character varying, 'Rule'::character varying]::text[])", name: "chk_import_source_mappings_source_type" + t.check_constraint "btrim(source_type::text) <> ''::text", name: "chk_import_source_mappings_source_type_present" + t.check_constraint "target_type::text = ANY (ARRAY['Account'::character varying, 'Category'::character varying, 'Tag'::character varying, 'Merchant'::character varying, 'RecurringTransaction'::character varying, 'Transaction'::character varying, 'Budget'::character varying, 'Security'::character varying, 'Rule'::character varying]::text[])", name: "chk_import_source_mappings_target_type" + t.check_constraint "btrim(target_type::text) <> ''::text", name: "chk_import_source_mappings_target_type_present" + end + create_table "imports", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| t.jsonb "column_mappings" t.string "status" @@ -1021,8 +1064,24 @@ ActiveRecord::Schema[7.2].define(version: 2026_05_31_153000) do t.uuid "account_statement_id" t.jsonb "expected_record_counts", default: {}, null: false t.jsonb "readback_verification", default: {}, null: false + t.uuid "import_session_id" + t.integer "sequence" + t.string "client_chunk_id", limit: 255 + t.string "checksum", limit: 64 + t.jsonb "summary", default: {}, null: false + t.jsonb "error_details", default: {}, null: false t.index ["account_statement_id"], name: "index_imports_on_account_statement_id" t.index ["family_id"], name: "index_imports_on_family_id" + t.index ["import_session_id", "client_chunk_id"], name: "idx_imports_on_session_client_chunk", unique: true, where: "((import_session_id IS NOT NULL) AND (client_chunk_id IS NOT NULL))" + t.index ["import_session_id", "sequence"], name: "idx_imports_on_session_sequence", unique: true, where: "((import_session_id IS NOT NULL) AND (sequence IS NOT NULL))" + t.index ["import_session_id"], name: "index_imports_on_import_session_id" + t.check_constraint "checksum IS NULL OR length(checksum::text) = 64", name: "chk_imports_checksum_sha256_length" + t.check_constraint "client_chunk_id IS NULL OR btrim(client_chunk_id::text) <> ''::text", name: "chk_imports_client_chunk_id_present" + t.check_constraint "jsonb_typeof(error_details) = 'object'::text", name: "chk_imports_error_details_object" + t.check_constraint "import_session_id IS NULL OR checksum IS NOT NULL", name: "chk_imports_session_checksum_present" + t.check_constraint "import_session_id IS NULL OR sequence IS NOT NULL", name: "chk_imports_session_sequence_present" + t.check_constraint "jsonb_typeof(summary) = 'object'::text", name: "chk_imports_summary_object" + t.check_constraint "sequence IS NULL OR sequence > 0", name: "chk_imports_session_sequence_positive" end create_table "indexa_capital_accounts", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| @@ -2043,8 +2102,12 @@ ActiveRecord::Schema[7.2].define(version: 2026_05_31_153000) do add_foreign_key "impersonation_sessions", "users", column: "impersonated_id" add_foreign_key "impersonation_sessions", "users", column: "impersonator_id" add_foreign_key "import_rows", "imports" + add_foreign_key "import_sessions", "families" + add_foreign_key "import_source_mappings", "families" + add_foreign_key "import_source_mappings", "import_sessions", column: ["import_session_id", "family_id"], primary_key: ["id", "family_id"], name: "fk_import_source_mappings_session_family", on_delete: :cascade add_foreign_key "imports", "account_statements", on_delete: :nullify add_foreign_key "imports", "families" + add_foreign_key "imports", "import_sessions", column: ["import_session_id", "family_id"], primary_key: ["id", "family_id"], name: "fk_imports_session_family", on_delete: :cascade add_foreign_key "indexa_capital_accounts", "indexa_capital_items" add_foreign_key "indexa_capital_items", "families" add_foreign_key "invitations", "families" diff --git a/docs/api/openapi.yaml b/docs/api/openapi.yaml index 3a1adc911..a8f224d61 100644 --- a/docs/api/openapi.yaml +++ b/docs/api/openapi.yaml @@ -2086,6 +2086,115 @@ components: properties: data: "$ref": "#/components/schemas/ImportDetail" + ImportSessionChunk: + type: object + required: + - id + - sequence + - status + - rows_count + - summary + - created_at + - updated_at + properties: + id: + type: string + format: uuid + sequence: + type: integer + minimum: 1 + client_chunk_id: + type: string + nullable: true + status: + type: string + enum: + - pending + - importing + - complete + - failed + rows_count: + type: integer + minimum: 0 + summary: + type: object + additionalProperties: + type: object + additionalProperties: + type: integer + error: + type: object + nullable: true + additionalProperties: true + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + ImportSession: + type: object + required: + - id + - type + - status + - chunks_count + - summary + - chunks + - created_at + - updated_at + properties: + id: + type: string + format: uuid + type: + type: string + enum: + - SureImport + status: + type: string + enum: + - pending + - importing + - complete + - failed + client_session_id: + type: string + nullable: true + expected_chunks: + type: integer + nullable: true + minimum: 1 + chunks_count: + type: integer + minimum: 0 + summary: + type: object + additionalProperties: + type: object + additionalProperties: + type: integer + error: + type: object + nullable: true + additionalProperties: true + chunks: + type: array + items: + "$ref": "#/components/schemas/ImportSessionChunk" + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + ImportSessionResponse: + type: object + required: + - data + properties: + data: + "$ref": "#/components/schemas/ImportSession" ProviderConnectionInstitution: type: object required: @@ -2892,6 +3001,8 @@ components: - account_statements - family_exports - imports + - import_sessions + - import_source_mappings - import_rows - import_mappings - accounts @@ -2933,6 +3044,12 @@ components: imports: type: integer minimum: 0 + import_sessions: + type: integer + minimum: 0 + import_source_mappings: + type: integer + minimum: 0 import_rows: type: integer minimum: 0 @@ -4607,6 +4724,266 @@ paths: application/json: schema: "$ref": "#/components/schemas/ErrorResponse" + "/api/v1/import_sessions": + post: + summary: Create import session + description: Create or idempotently retrieve a multi-file SureImport session + keyed by client_session_id. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + parameters: [] + responses: + '201': + description: import session created + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '409': + description: client session conflict + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '422': + description: validation error + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + requestBody: + content: + application/json: + schema: + type: object + properties: + type: + type: string + enum: + - SureImport + description: Import session type. Only SureImport is supported. + client_session_id: + type: string + nullable: true + description: Client-provided idempotency key for the full import + session. + expected_chunks: + type: integer + minimum: 1 + nullable: true + description: Expected number of ordered chunks before publish is + allowed. + "/api/v1/import_sessions/{id}": + parameters: + - name: id + in: path + required: true + description: Import session ID + schema: + type: string + get: + summary: Retrieve import session + description: Retrieve import session status, chunk status, per-entity summary + counts, and safe error details. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + responses: + '200': + description: import session retrieved + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '404': + description: import session not found + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + "/api/v1/import_sessions/{id}/chunks": + parameters: + - name: id + in: path + required: true + description: Import session ID + schema: + type: string + post: + summary: Upload import session chunk + description: Attach an ordered Sure NDJSON chunk to an import session. Chunks + are idempotent by sequence and client_chunk_id with content verification. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - sequence + - raw_file_content + properties: + sequence: + type: integer + minimum: 1 + description: One-based chunk sequence. Earlier dependency chunks + must have lower sequence numbers. + client_chunk_id: + type: string + nullable: true + description: Client-provided idempotency key for this chunk. + raw_file_content: + type: string + description: Raw Sure NDJSON content. Each chunk is limited to 10MB. + multipart/form-data: + schema: + type: object + required: + - sequence + - file + properties: + sequence: + type: integer + minimum: 1 + description: One-based chunk sequence. Earlier dependency chunks + must have lower sequence numbers. + client_chunk_id: + type: string + nullable: true + description: Client-provided idempotency key for this chunk. + file: + type: string + format: binary + description: Multipart Sure NDJSON file upload. Each chunk is limited + to 10MB. + parameters: [] + responses: + '201': + description: chunk uploaded + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '409': + description: chunk conflict + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '404': + description: import session not found + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '422': + description: missing or invalid content + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + "/api/v1/import_sessions/{id}/publish": + parameters: + - name: id + in: path + required: true + description: Import session ID + schema: + type: string + post: + summary: Publish import session + description: Queue ordered chunk processing for a SureImport session. Later + chunks can reference source IDs mapped by earlier chunks. + tags: + - Import Sessions + security: + - apiKeyAuth: [] + responses: + '202': + description: import session publish queued + content: + application/json: + schema: + "$ref": "#/components/schemas/ImportSessionResponse" + '401': + description: unauthorized + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '403': + description: insufficient scope + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '422': + description: max_row_count_exceeded + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '409': + description: missing expected chunks + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '503': + description: enqueue failed + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" + '404': + description: import session not found + content: + application/json: + schema: + "$ref": "#/components/schemas/ErrorResponse" "/api/v1/imports": get: summary: List imports diff --git a/spec/requests/api/v1/import_sessions_spec.rb b/spec/requests/api/v1/import_sessions_spec.rb new file mode 100644 index 000000000..02c9af9f9 --- /dev/null +++ b/spec/requests/api/v1/import_sessions_spec.rb @@ -0,0 +1,430 @@ +# frozen_string_literal: true + +require 'swagger_helper' + +RSpec.describe 'API V1 Import Sessions', type: :request do + let(:user) { users(:empty) } + let(:family) { user.family } + + let(:api_key) { api_keys(:active_key) } + let(:api_key_without_write_scope) { api_keys(:one) } + let(:api_key_without_read_scope) { api_keys(:expired_key) } + + let(:'X-Api-Key') { api_key.plain_key } + + let(:entity_ndjson) do + { + type: 'Account', + data: { + id: 'docs-account-1', + name: 'Docs Checking', + balance: '100.00', + currency: 'USD', + accountable_type: 'Depository' + } + }.to_json + end + + let(:transaction_ndjson) do + { + type: 'Transaction', + data: { + id: 'docs-transaction-1', + account_id: 'docs-account-1', + date: '2024-01-15', + amount: '-12.34', + currency: 'USD', + name: 'Docs Transaction' + } + }.to_json + end + + path '/api/v1/import_sessions' do + post 'Create import session' do + description 'Create or idempotently retrieve a multi-file SureImport session keyed by client_session_id.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + consumes 'application/json' + produces 'application/json' + + parameter name: :body, in: :body, required: false, schema: { + type: :object, + properties: { + type: { + type: :string, + enum: %w[SureImport], + description: 'Import session type. Only SureImport is supported.' + }, + client_session_id: { + type: :string, + nullable: true, + description: 'Client-provided idempotency key for the full import session.' + }, + expected_chunks: { + type: :integer, + minimum: 1, + nullable: true, + description: 'Expected number of ordered chunks before publish is allowed.' + } + } + } + + response '201', 'import session created' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + let(:body) do + { + type: 'SureImport', + client_session_id: 'docs-session-1', + expected_chunks: 2 + } + end + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + let(:body) { { type: 'SureImport' } } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_write_scope.plain_key } + let(:body) { { type: 'SureImport' } } + + run_test! + end + + response '409', 'client session conflict' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + before do + family.import_sessions.create!( + client_session_id: 'docs-session-conflict', + expected_chunks: 1 + ) + end + + let(:body) do + { + type: 'SureImport', + client_session_id: 'docs-session-conflict', + expected_chunks: 2 + } + end + + run_test! + end + + response '422', 'validation error' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:body) { { type: 'TransactionImport' } } + + run_test! + end + end + end + + path '/api/v1/import_sessions/{id}' do + parameter name: :id, in: :path, type: :string, required: true, description: 'Import session ID' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 1) } + + get 'Retrieve import session' do + description 'Retrieve import session status, chunk status, per-entity summary counts, and safe error details.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + produces 'application/json' + + let(:id) { import_session.id } + + response '200', 'import session retrieved' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_read_scope.plain_key } + + run_test! + end + + response '404', 'import session not found' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:id) { SecureRandom.uuid } + + run_test! + end + end + end + + path '/api/v1/import_sessions/{id}/chunks' do + parameter name: :id, in: :path, type: :string, required: true, description: 'Import session ID' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 2) } + let(:id) { import_session.id } + + post 'Upload import session chunk' do + description 'Attach an ordered Sure NDJSON chunk to an import session. Chunks are idempotent by sequence and client_chunk_id with content verification.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + consumes 'application/json', 'multipart/form-data' + produces 'application/json' + metadata[:operation][:requestBody] = { + required: true, + content: { + 'application/json' => { + schema: { + type: :object, + required: %w[sequence raw_file_content], + properties: { + sequence: { + type: :integer, + minimum: 1, + description: 'One-based chunk sequence. Earlier dependency chunks must have lower sequence numbers.' + }, + client_chunk_id: { + type: :string, + nullable: true, + description: 'Client-provided idempotency key for this chunk.' + }, + raw_file_content: { + type: :string, + description: 'Raw Sure NDJSON content. Each chunk is limited to 10MB.' + } + } + } + }, + 'multipart/form-data' => { + schema: { + type: :object, + required: %w[sequence file], + properties: { + sequence: { + type: :integer, + minimum: 1, + description: 'One-based chunk sequence. Earlier dependency chunks must have lower sequence numbers.' + }, + client_chunk_id: { + type: :string, + nullable: true, + description: 'Client-provided idempotency key for this chunk.' + }, + file: { + type: :string, + format: :binary, + description: 'Multipart Sure NDJSON file upload. Each chunk is limited to 10MB.' + } + } + } + } + } + } + + parameter name: :body, in: :body, required: false + + response '201', 'chunk uploaded' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + let(:body) do + { + sequence: 1, + client_chunk_id: 'docs-entities', + raw_file_content: entity_ndjson + } + end + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + let(:body) { { sequence: 1, raw_file_content: entity_ndjson } } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_write_scope.plain_key } + let(:body) { { sequence: 1, raw_file_content: entity_ndjson } } + + run_test! + end + + response '409', 'chunk conflict' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + end + + let(:body) do + { + sequence: 1, + client_chunk_id: 'docs-entities', + raw_file_content: transaction_ndjson + } + end + + run_test! + end + + response '404', 'import session not found' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:id) { SecureRandom.uuid } + let(:body) { { sequence: 1, raw_file_content: entity_ndjson } } + + run_test! + end + + response '422', 'missing or invalid content' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:body) { { sequence: 1 } } + + run_test! + end + end + end + + path '/api/v1/import_sessions/{id}/publish' do + parameter name: :id, in: :path, type: :string, required: true, description: 'Import session ID' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 1) } + let(:id) { import_session.id } + + post 'Publish import session' do + description 'Queue ordered chunk processing for a SureImport session. Later chunks can reference source IDs mapped by earlier chunks.' + tags 'Import Sessions' + security [ { apiKeyAuth: [] } ] + produces 'application/json' + + response '202', 'import session publish queued' do + schema '$ref' => '#/components/schemas/ImportSessionResponse' + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + end + + run_test! + end + + response '401', 'unauthorized' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { nil } + + run_test! + end + + response '403', 'insufficient scope' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:'X-Api-Key') { api_key_without_write_scope.plain_key } + + run_test! + end + + response '422', 'max_row_count_exceeded' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + import_session.imports.update_all(rows_count: SureImport.max_row_count + 1) + end + + run_test! + end + + response '409', 'missing expected chunks' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:import_session) { family.import_sessions.create!(expected_chunks: 2) } + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + end + + run_test! + end + + response '503', 'enqueue failed' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + before do + import_session.attach_chunk!( + sequence: 1, + client_chunk_id: 'docs-entities', + content: entity_ndjson, + filename: 'entities.ndjson', + content_type: 'application/x-ndjson' + ) + end + + around do |example| + ImportSessionJob.stub(:perform_later, ->(_import_session) { raise StandardError, 'queue offline' }) do + example.run + end + end + + run_test! + end + + response '404', 'import session not found' do + schema '$ref' => '#/components/schemas/ErrorResponse' + + let(:id) { SecureRandom.uuid } + + run_test! + end + end + end +end diff --git a/spec/swagger_helper.rb b/spec/swagger_helper.rb index c17ec07f2..22c3dd0cc 100644 --- a/spec/swagger_helper.rb +++ b/spec/swagger_helper.rb @@ -1172,6 +1172,68 @@ RSpec.configure do |config| data: { '$ref' => '#/components/schemas/ImportDetail' } } }, + ImportSessionChunk: { + type: :object, + required: %w[id sequence status rows_count summary created_at updated_at], + properties: { + id: { type: :string, format: :uuid }, + sequence: { type: :integer, minimum: 1 }, + client_chunk_id: { type: :string, nullable: true }, + status: { type: :string, enum: %w[pending importing complete failed] }, + rows_count: { type: :integer, minimum: 0 }, + summary: { + type: :object, + additionalProperties: { + type: :object, + additionalProperties: { type: :integer } + } + }, + error: { + type: :object, + nullable: true, + additionalProperties: true + }, + created_at: { type: :string, format: :'date-time' }, + updated_at: { type: :string, format: :'date-time' } + } + }, + ImportSession: { + type: :object, + required: %w[id type status chunks_count summary chunks created_at updated_at], + properties: { + id: { type: :string, format: :uuid }, + type: { type: :string, enum: %w[SureImport] }, + status: { type: :string, enum: %w[pending importing complete failed] }, + client_session_id: { type: :string, nullable: true }, + expected_chunks: { type: :integer, nullable: true, minimum: 1 }, + chunks_count: { type: :integer, minimum: 0 }, + summary: { + type: :object, + additionalProperties: { + type: :object, + additionalProperties: { type: :integer } + } + }, + error: { + type: :object, + nullable: true, + additionalProperties: true + }, + chunks: { + type: :array, + items: { '$ref' => '#/components/schemas/ImportSessionChunk' } + }, + created_at: { type: :string, format: :'date-time' }, + updated_at: { type: :string, format: :'date-time' } + } + }, + ImportSessionResponse: { + type: :object, + required: %w[data], + properties: { + data: { '$ref' => '#/components/schemas/ImportSession' } + } + }, ProviderConnectionInstitution: { type: :object, required: %w[name], diff --git a/test/controllers/api/v1/import_sessions_controller_test.rb b/test/controllers/api/v1/import_sessions_controller_test.rb new file mode 100644 index 000000000..3a90f361d --- /dev/null +++ b/test/controllers/api/v1/import_sessions_controller_test.rb @@ -0,0 +1,308 @@ +# frozen_string_literal: true + +require "test_helper" + +class Api::V1::ImportSessionsControllerTest < ActionDispatch::IntegrationTest + include ActiveJob::TestHelper + + setup do + @user = users(:family_admin) + @family = @user.family + @api_key = api_keys(:active_key) + @read_only_api_key = api_keys(:one) + + Redis.new.del("api_rate_limit:#{@api_key.id}") + Redis.new.del("api_rate_limit:#{@read_only_api_key.id}") + end + + test "creates an idempotent Sure import session" do + assert_difference("ImportSession.count", 1) do + post api_v1_import_sessions_url, + params: { + type: "SureImport", + client_session_id: "client-session-1", + expected_chunks: 2 + }, + headers: api_headers(@api_key) + end + + assert_response :created + first_id = JSON.parse(response.body).dig("data", "id") + + assert_no_difference("ImportSession.count") do + post api_v1_import_sessions_url, + params: { + type: "SureImport", + client_session_id: "client-session-1", + expected_chunks: 2 + }, + headers: api_headers(@api_key) + end + + assert_response :created + assert_equal first_id, JSON.parse(response.body).dig("data", "id") + end + + test "rejects unsupported import session types" do + assert_no_difference("ImportSession.count") do + post api_v1_import_sessions_url, + params: { type: "TransactionImport" }, + headers: api_headers(@api_key) + end + + assert_response :unprocessable_entity + assert_equal "validation_failed", JSON.parse(response.body)["error"] + end + + test "rejects malformed expected chunk counts" do + assert_no_difference("ImportSession.count") do + post api_v1_import_sessions_url, + params: { type: "SureImport", expected_chunks: "2abc" }, + headers: api_headers(@api_key) + end + + assert_response :unprocessable_entity + assert_equal "validation_failed", JSON.parse(response.body)["error"] + end + + test "requires authentication for session creation" do + post api_v1_import_sessions_url, params: { type: "SureImport" } + + assert_response :unauthorized + assert_equal "unauthorized", JSON.parse(response.body)["error"] + end + + test "uploads ordered chunks and publishes a full-fidelity transaction import" do + session = build_import_session + + post chunks_api_v1_import_session_url(session), + params: { + sequence: 1, + client_chunk_id: "entities", + raw_file_content: build_ndjson(entity_records) + }, + headers: api_headers(@api_key) + + assert_response :created + assert_equal 1, JSON.parse(response.body).dig("data", "chunks_count") + + post chunks_api_v1_import_session_url(session), + params: { + sequence: 2, + client_chunk_id: "transactions", + raw_file_content: build_ndjson(transaction_records) + }, + headers: api_headers(@api_key) + + assert_response :created + + perform_enqueued_jobs do + post publish_api_v1_import_session_url(session), headers: api_headers(@api_key) + end + + assert_response :accepted + session.reload + assert session.complete? + assert_equal 1, session.summary.dig("transactions", "created") + + entry = @family.accounts.find_by!(name: "API Session Checking").entries.find_by!(name: "API Grocery Run") + transaction = entry.entryable + assert_equal "API Groceries", transaction.category.name + assert_equal "API Market", transaction.merchant.name + assert_equal [ "API Weekly" ], transaction.tags.map(&:name) + end + + test "rejects replayed chunk with different content" do + session = build_import_session + params = { + sequence: 1, + client_chunk_id: "entities", + raw_file_content: build_ndjson(entity_records) + } + + post chunks_api_v1_import_session_url(session), params: params, headers: api_headers(@api_key) + assert_response :created + + post chunks_api_v1_import_session_url(session), + params: params.merge(raw_file_content: build_ndjson(transaction_records)), + headers: api_headers(@api_key) + + assert_response :conflict + assert_equal "import_session_conflict", JSON.parse(response.body)["error"] + end + + test "requires chunk sequence" do + session = build_import_session + + post chunks_api_v1_import_session_url(session), + params: { raw_file_content: build_ndjson(entity_records) }, + headers: api_headers(@api_key) + + assert_response :bad_request + assert_equal "bad_request", JSON.parse(response.body)["error"] + end + + test "rejects malformed chunk sequence values" do + session = build_import_session + + post chunks_api_v1_import_session_url(session), + params: { sequence: "1abc", raw_file_content: build_ndjson(entity_records) }, + headers: api_headers(@api_key) + + assert_response :conflict + assert_equal "import_session_conflict", JSON.parse(response.body)["error"] + end + + test "shows import session with read scope" do + session = build_import_session + + get api_v1_import_session_url(session), headers: api_headers(@read_only_api_key) + + assert_response :success + data = JSON.parse(response.body)["data"] + assert_equal session.id, data["id"] + assert_equal "SureImport", data["type"] + end + + test "shows chunks in sequence order" do + session = build_import_session + session.imports.create!( + family: @family, + type: "SureImport", + sequence: 2, + checksum: Digest::SHA256.hexdigest("two") + ) + session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + checksum: Digest::SHA256.hexdigest("one") + ) + + get api_v1_import_session_url(session), headers: api_headers(@api_key) + + assert_response :success + assert_equal [ 1, 2 ], JSON.parse(response.body).dig("data", "chunks").map { |chunk| chunk["sequence"] } + end + + test "requires write scope for session mutation" do + assert_no_difference("ImportSession.count") do + post api_v1_import_sessions_url, + params: { type: "SureImport" }, + headers: api_headers(@read_only_api_key) + end + + assert_response :forbidden + assert_equal "insufficient_scope", JSON.parse(response.body)["error"] + end + + test "rejects publishing a session with no chunks" do + session = @family.import_sessions.create! + + post publish_api_v1_import_session_url(session), headers: api_headers(@api_key) + + assert_response :conflict + assert_equal "import_session_conflict", JSON.parse(response.body)["error"] + end + + test "returns stable error when publish cannot enqueue" do + session = build_import_session + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + ImportSessionJob.stub(:perform_later, ->(_import_session) { raise StandardError, "redis://secret.local/0" }) do + post publish_api_v1_import_session_url(session), headers: api_headers(@api_key) + end + + assert_response :service_unavailable + body = JSON.parse(response.body) + assert_equal "import_enqueue_failed", body["error"] + assert_equal "Import session could not be queued.", body["message"] + assert_no_match(/secret/, response.body) + end + + test "does not expose another family's import session" do + other_family = Family.create!(name: "Other Family", currency: "USD", locale: "en") + other_session = other_family.import_sessions.create! + + get api_v1_import_session_url(other_session), headers: api_headers(@api_key) + + assert_response :not_found + end + + private + def build_import_session + @family.import_sessions.create!(expected_chunks: 2) + end + + def entity_records + [ + { + type: "Account", + data: { + id: "api-acct-1", + name: "API Session Checking", + balance: "100.00", + currency: "USD", + accountable_type: "Depository" + } + }, + { + type: "Category", + data: { + id: "api-cat-1", + name: "API Groceries", + color: "#407706", + classification: "expense" + } + }, + { + type: "Merchant", + data: { + id: "api-merchant-1", + name: "API Market" + } + }, + { + type: "Tag", + data: { + id: "api-tag-1", + name: "API Weekly" + } + } + ] + end + + def transaction_records + [ + { + type: "Transaction", + data: { + id: "api-txn-1", + account_id: "api-acct-1", + category_id: "api-cat-1", + merchant_id: "api-merchant-1", + tag_ids: [ "api-tag-1" ], + date: "2024-01-15", + amount: "-12.34", + currency: "USD", + name: "API Grocery Run" + } + } + ] + end + + def build_ndjson(records) + records.map(&:to_json).join("\n") + end +end diff --git a/test/controllers/api/v1/users_controller_test.rb b/test/controllers/api/v1/users_controller_test.rb index 570c0bd9d..32d69c5f9 100644 --- a/test/controllers/api/v1/users_controller_test.rb +++ b/test/controllers/api/v1/users_controller_test.rb @@ -116,6 +116,20 @@ class Api::V1::UsersControllerTest < ActionDispatch::IntegrationTest end test "reset status returns family data counts" do + import_session = @user.family.import_sessions.create!(expected_chunks: 1) + import_session.imports.create!( + family: @user.family, + type: "SureImport", + sequence: 1, + checksum: "a" * 64 + ) + import_session.source_mappings.create!( + family: @user.family, + source_type: "Category", + source_id: "source-category-1", + target: @user.family.categories.first + ) + get "/api/v1/users/reset/status", headers: api_headers(@read_only_api_key) assert_response :ok @@ -124,6 +138,8 @@ class Api::V1::UsersControllerTest < ActionDispatch::IntegrationTest assert_includes %w[complete data_remaining], body["status"] assert_equal body["counts"].values.sum.zero?, body["reset_complete"] assert_equal expected_reset_count_keys.sort, body["counts"].keys.sort + assert_equal 1, body["counts"]["import_sessions"] + assert_equal 1, body["counts"]["import_source_mappings"] end test "reset status ignores the follow-up family sync after reset" do diff --git a/test/jobs/family_reset_job_test.rb b/test/jobs/family_reset_job_test.rb index d5979f3f7..739df4b60 100644 --- a/test/jobs/family_reset_job_test.rb +++ b/test/jobs/family_reset_job_test.rb @@ -10,10 +10,25 @@ class FamilyResetJobTest < ActiveJob::TestCase test "resets family data successfully" do initial_account_count = @family.accounts.count initial_category_count = @family.categories.count + import_session = @family.import_sessions.create!(expected_chunks: 1) + import_session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + checksum: "a" * 64 + ) + import_session.source_mappings.create!( + family: @family, + source_type: "Category", + source_id: "source-category-1", + target: @family.categories.first + ) # Family should have existing data assert initial_account_count > 0 assert initial_category_count > 0 + assert_equal 1, @family.import_sessions.count + assert_equal 1, @family.import_source_mappings.count # Don't expect Plaid removal calls since we're using fixtures without setup @plaid_provider.stubs(:remove_item) @@ -23,6 +38,38 @@ class FamilyResetJobTest < ActiveJob::TestCase # All data should be removed assert_equal 0, @family.accounts.reload.count assert_equal 0, @family.categories.reload.count + assert_equal 0, @family.import_sessions.reload.count + assert_equal 0, @family.import_source_mappings.reload.count + assert_equal 0, @family.imports.reload.count + end + + test "reset leaves another family's imports and mappings untouched" do + other_family = Family.create!(name: "Other Family", currency: "USD", locale: "en") + other_category = other_family.categories.create!(name: "Other Category") + other_session = other_family.import_sessions.create!(expected_chunks: 1) + other_import = other_session.imports.create!( + family: other_family, + type: "SureImport", + sequence: 1, + checksum: "b" * 64 + ) + other_mapping = other_session.source_mappings.create!( + family: other_family, + source_type: "Category", + source_id: "source-category-1", + target: other_category + ) + + @family.import_sessions.create!(expected_chunks: 1) + @plaid_provider.stubs(:remove_item) + + FamilyResetJob.perform_now(@family) + + assert ImportSession.exists?(other_session.id) + assert Import.exists?(other_import.id) + assert ImportSourceMapping.exists?(other_mapping.id) + assert Category.exists?(other_category.id) + assert_equal other_category, other_mapping.reload.target end test "resets family data even when Plaid credentials are invalid" do diff --git a/test/models/family/data_exporter_test.rb b/test/models/family/data_exporter_test.rb index 83e4ea8cc..00d4759d7 100644 --- a/test/models/family/data_exporter_test.rb +++ b/test/models/family/data_exporter_test.rb @@ -381,9 +381,12 @@ class Family::DataExporterTest < ActiveSupport::TestCase assert rule_lines.any? - rule_data = JSON.parse(rule_lines.first) + rule_data = rule_lines.map { |line| JSON.parse(line) }.find { |rule| rule["data"]["name"] == "Test Rule" } + + assert_not_nil rule_data assert_equal "Rule", rule_data["type"] assert_equal 1, rule_data["version"] + assert_equal @rule.id, rule_data["data"]["id"] assert rule_data["data"].key?("name") assert rule_data["data"].key?("resource_type") assert rule_data["data"].key?("active") diff --git a/test/models/family/data_importer_test.rb b/test/models/family/data_importer_test.rb index 0ad0c9870..21ea0d4b7 100644 --- a/test/models/family/data_importer_test.rb +++ b/test/models/family/data_importer_test.rb @@ -125,6 +125,26 @@ class Family::DataImporterTest < ActiveSupport::TestCase assert_equal 1, balance.flows_factor end + test "counts skipped balance rows with blank account references once" do + ndjson = build_ndjson([ + { + type: "Balance", + data: { + id: "balance-1", + account_id: "", + date: "2024-01-31", + balance: "1200.00", + currency: "USD" + } + } + ]) + + result = Family::DataImporter.new(@family, ndjson).import! + + assert_equal 1, result.dig(:summary, "balances", "skipped") + assert_not Balance.exists?(date: Date.iso8601("2024-01-31"), currency: "USD", balance: BigDecimal("1200.00")) + end + test "imports duplicate raw balance records idempotently by account date and currency" do balance_record = { type: "Balance", @@ -442,6 +462,23 @@ class Family::DataImporterTest < ActiveSupport::TestCase assert_equal "#FF0000", tag.color end + test "imports tags with deterministic fallback color when source omits color" do + ndjson = build_ndjson([ + { + type: "Tag", + data: { + id: "tag-1", + name: "Important" + } + } + ]) + + Family::DataImporter.new(@family, ndjson).import! + + tag = @family.tags.find_by!(name: "Important") + assert_equal Tag::COLORS.first, tag.color + end + test "imports merchants" do ndjson = build_ndjson([ { @@ -945,6 +982,98 @@ class Family::DataImporterTest < ActiveSupport::TestCase assert_empty explicit_empty_child.transaction.tags end + test "session transaction reimport only replaces current family taggings" do + session = @family.import_sessions.create!(expected_chunks: 1) + account = @family.accounts.create!( + name: "Session Checking", + accountable: Depository.new, + balance: 100, + currency: "USD" + ) + original_tag = @family.tags.create!(name: "Original") + replacement_tag = @family.tags.create!(name: "Replacement") + entry = account.entries.create!( + date: Date.parse("2024-01-01"), + amount: -10, + currency: "USD", + name: "Original transaction", + source: "sure_import_session:#{session.id}", + external_id: "Transaction:txn-1", + entryable: Transaction.new(kind: "standard") + ) + transaction = entry.entryable + transaction.taggings.create!(tag: original_tag) + + other_family = Family.create!(name: "Other Family", currency: "USD") + other_session = other_family.import_sessions.create!(expected_chunks: 1) + other_account = other_family.accounts.create!( + name: "Other Checking", + accountable: Depository.new, + balance: 100, + currency: "USD" + ) + other_tag = other_family.tags.create!(name: "Other Original") + other_entry = other_account.entries.create!( + date: Date.parse("2024-01-01"), + amount: -10, + currency: "USD", + name: "Other transaction", + source: "sure_import_session:#{other_session.id}", + external_id: "Transaction:txn-1", + entryable: Transaction.new(kind: "standard") + ) + other_transaction = other_entry.entryable + other_transaction.taggings.create!(tag: other_tag) + + other_session.source_mappings.create!( + family: other_family, + source_type: "Transaction", + source_id: "txn-1", + target: other_transaction + ) + + session.source_mappings.create!( + family: @family, + source_type: "Account", + source_id: "acct-1", + target: account + ) + session.source_mappings.create!( + family: @family, + source_type: "Tag", + source_id: "tag-1", + target: replacement_tag + ) + session.source_mappings.create!( + family: @family, + source_type: "Transaction", + source_id: "txn-1", + target: transaction + ) + + ndjson = build_ndjson([ + { + type: "Transaction", + data: { + id: "txn-1", + account_id: "acct-1", + tag_ids: [ "tag-1" ], + date: "2024-02-01", + amount: "-12.34", + currency: "USD", + name: "Updated transaction" + } + } + ]) + + Family::DataImporter.new(@family, ndjson, import_session: session).import! + + assert_equal [ "Replacement" ], transaction.reload.tags.map(&:name) + assert_equal "Updated transaction", entry.reload.name + assert_equal [ "Other Original" ], other_transaction.reload.tags.map(&:name) + assert_equal "Other transaction", other_entry.reload.name + end + test "imports trades with securities" do ndjson = build_ndjson([ { @@ -1429,7 +1558,7 @@ class Family::DataImporterTest < ActiveSupport::TestCase amount: "100.00", name: "Transfer to savings", currency: "USD", - kind: "funds_movement" + kind: "standard" } }, { @@ -1441,7 +1570,7 @@ class Family::DataImporterTest < ActiveSupport::TestCase amount: "-100.00", name: "Transfer from checking", currency: "USD", - kind: "funds_movement" + kind: "standard" } }, { @@ -1496,6 +1625,8 @@ class Family::DataImporterTest < ActiveSupport::TestCase assert_equal "Confirmed by user", transfer.notes assert_equal "Transfer from checking", transfer.inflow_transaction.entry.name assert_equal "Transfer to savings", transfer.outflow_transaction.entry.name + assert_equal "funds_movement", transfer.inflow_transaction.kind + assert_equal "funds_movement", transfer.outflow_transaction.kind rejected_transfer = RejectedTransfer .joins(inflow_transaction: :entry) @@ -1711,6 +1842,61 @@ class Family::DataImporterTest < ActiveSupport::TestCase assert_equal category.id, action.value end + test "session rule reimport only replaces current family conditions and actions" do + rule = @family.rules.build(name: "Original Rule", resource_type: "transaction", active: true) + rule.conditions.build(condition_type: "transaction_name", operator: "like", value: "old") + rule.actions.build(action_type: "auto_categorize") + rule.save! + + other_family = Family.create!(name: "Other Rules Family", currency: "USD") + other_rule = other_family.rules.build(name: "Other Rule", resource_type: "transaction", active: true) + other_rule.conditions.build(condition_type: "transaction_name", operator: "like", value: "other-old") + other_rule.actions.build(action_type: "auto_categorize") + other_rule.save! + + other_session = other_family.import_sessions.create!(expected_chunks: 1) + other_session.source_mappings.create!( + family: other_family, + source_type: "Rule", + source_id: "rule-1", + target: other_rule + ) + + session = @family.import_sessions.create!(expected_chunks: 1) + session.source_mappings.create!( + family: @family, + source_type: "Rule", + source_id: "rule-1", + target: rule + ) + + ndjson = build_ndjson([ + { + type: "Rule", + version: 1, + data: { + id: "rule-1", + name: "Updated Rule", + resource_type: "transaction", + active: true, + conditions: [ + { condition_type: "transaction_name", operator: "like", value: "new" } + ], + actions: [ + { action_type: "set_transaction_name", value: "Renamed" } + ] + } + } + ]) + + Family::DataImporter.new(@family, ndjson, import_session: session).import! + + assert_equal [ "new" ], rule.reload.conditions.map(&:value) + assert_equal [ "set_transaction_name" ], rule.actions.map(&:action_type) + assert_equal [ "other-old" ], other_rule.reload.conditions.map(&:value) + assert_equal [ "auto_categorize" ], other_rule.actions.map(&:action_type) + end + test "imports rules from normalized operand value refs" do ndjson = build_ndjson([ { diff --git a/test/models/import_session_test.rb b/test/models/import_session_test.rb new file mode 100644 index 000000000..d1b80df4b --- /dev/null +++ b/test/models/import_session_test.rb @@ -0,0 +1,809 @@ +require "test_helper" + +class ImportSessionTest < ActiveSupport::TestCase + setup do + @family = families(:empty) + end + + test "job requires import session" do + error = assert_raises(ArgumentError) do + ImportSessionJob.perform_now(nil) + end + + assert_equal "ImportSessionJob requires an import_session", error.message + end + + test "job publishes import session" do + import_session = @family.import_sessions.create! + import_session.expects(:publish).once + + ImportSessionJob.perform_now(import_session) + end + + test "publishes ordered chunks with source mappings across files" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + client_chunk_id: "transactions", + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.complete? + account = @family.accounts.find_by!(name: "Session Checking") + entry = account.entries.find_by!(name: "Grocery Run") + transaction = entry.entryable + + assert_equal "Groceries", transaction.category.name + assert_equal "Market", transaction.merchant.name + assert_equal [ "Weekly" ], transaction.tags.map(&:name) + assert_equal "sure_import_session:#{session.id}", entry.source + assert_equal "Transaction:txn-1", entry.external_id + assert_equal 1, session.summary.dig("transactions", "created") + + assert_source_mapping session, "Account", "acct-1", account + assert_source_mapping session, "Category", "cat-1", transaction.category + assert_source_mapping session, "Merchant", "merchant-1", transaction.merchant + assert_source_mapping session, "Tag", "tag-1", transaction.tags.first + assert_source_mapping session, "Transaction", "txn-1", transaction + end + + test "publishing session chunks records readback verification for each chunk" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + entity_chunk, transaction_chunk = session.imports.ordered_by_sequence.to_a + assert_equal 1, entity_chunk.expected_record_counts["accounts"] + assert_equal 1, transaction_chunk.expected_record_counts["transactions"] + assert_includes SureImport::VERIFICATION_STATUSES, entity_chunk.readback_verification["status"] + assert_equal 1, entity_chunk.readback_verification.dig("checked_counts", "accounts") + assert_equal 1, transaction_chunk.readback_verification.dig("checked_counts", "transactions") + assert_equal 1, transaction_chunk.readback_verification.dig("actual_delta_counts", "transactions") + end + + test "publishing the same complete session does not duplicate imported transactions" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert_no_difference("Entry.count") do + session.publish + end + end + + test "republishing failed session skips complete chunks and retries failed chunks" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + complete_chunk = session.imports.find_by!(sequence: 1) + failed_chunk = session.imports.find_by!(sequence: 2) + complete_chunk.update!(status: :complete, summary: { "accounts" => { "created" => 1 } }, error_details: {}) + failed_chunk.update!(status: :failed, error: "transient failure", error_details: { "code" => "import_failed" }) + session.update!( + status: :failed, + summary: complete_chunk.summary, + error_details: { "code" => "import_failed", "message" => "transient failure" } + ) + processed_sequences = [] + + importer_factory = lambda do |_family, _content, import_session:, import:| + processed_sequences << import.sequence + flunk "completed chunk was reprocessed" if import.sequence == 1 + assert_equal session, import_session + + Object.new.tap do |importer| + importer.define_singleton_method(:import!) do + { + accounts: [], + entries: [], + summary: { "transactions" => { "created" => 1 } } + } + end + end + end + + Family::DataImporter.stub(:new, importer_factory) do + session.publish + end + + assert_equal [ 2 ], processed_sequences + assert complete_chunk.reload.complete? + assert failed_chunk.reload.complete? + assert session.reload.complete? + assert_equal 1, session.summary.dig("accounts", "created") + assert_equal 1, session.summary.dig("transactions", "created") + end + + test "publish keeps session complete and records safe error when family sync enqueue fails" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + Family.any_instance.stubs(:sync_later).raises(StandardError, "redis://secret.local/0") + session.publish + + assert session.reload.complete? + assert_equal "family_sync_enqueue_failed", session.error_details["code"] + assert_equal "Family sync could not be queued after import completion.", session.error_details["message"] + assert_no_match(/secret/, session.error_details.to_json) + end + + test "publish stores generic error details for unexpected import failures" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + importer_factory = ->(*) { raise StandardError, "redis://secret.local/0" } + + Family::DataImporter.stub(:new, importer_factory) do + session.publish + end + + assert session.reload.failed? + assert_equal "Import session failed.", session.imports.first.error + assert_equal "import_failed", session.error_details["code"] + assert_equal "Import session failed.", session.error_details["message"] + assert_no_match(/secret/, session.error_details.to_json) + end + + test "publish later requires the exact expected chunk sequences" do + session = @family.import_sessions.create!(expected_chunks: 2) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + error = assert_raises(ImportSession::ConflictError) do + session.publish_later + end + + expected_message = "import session chunks do not match expected sequences " \ + "(missing sequences: 2)" + assert_equal expected_message, error.message + assert session.reload.pending? + end + + test "chunk upload rejects sequences beyond the expected chunk count" do + session = @family.import_sessions.create!(expected_chunks: 1) + + error = assert_raises(ImportSession::ConflictError) do + session.attach_chunk!( + sequence: 2, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + end + + assert_equal "sequence exceeds expected_chunks", error.message + assert_empty session.imports + end + + test "publish later restores status and records enqueue failures" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + ImportSessionJob.stub(:perform_later, ->(_import_session) { raise StandardError, "queue offline" }) do + error = assert_raises(ImportSession::EnqueueError) do + session.publish_later + end + + assert_equal "Import session could not be queued.", error.message + end + + assert session.reload.pending? + assert_equal "import_enqueue_failed", session.error_details["code"] + assert_equal "Import session could not be queued.", session.error_details["message"] + end + + test "publish later syncs chunk row counts before enforcing row limit" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records + transaction_records), + filename: "session.ndjson", + content_type: "application/x-ndjson" + ) + session.imports.update_all(rows_count: 0) + + SureImport.stub(:max_row_count, 1) do + assert_raises(Import::MaxRowCountExceededError) { session.publish_later } + end + + assert session.reload.pending? + assert_equal 5, session.imports.reload.first.rows_count + end + + test "fails loudly when a later chunk references a missing source id" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.failed? + chunk = session.imports.first + assert chunk.failed? + assert_equal "missing_source_reference", chunk.error_details["code"] + assert_equal "acct-1", chunk.error_details["source_id"] + assert_equal 0, @family.entries.count + end + + test "source mappings from another family cannot satisfy missing references" do + other_family = Family.create!(name: "Other Family", currency: "USD", locale: "en") + other_session = other_family.import_sessions.create!(expected_chunks: 1) + other_session.attach_chunk!( + sequence: 1, + content: build_ndjson(entity_records), + filename: "other-entities.ndjson", + content_type: "application/x-ndjson" + ) + other_session.publish + + assert other_session.reload.complete? + assert_equal 1, other_session.source_mappings.where(source_type: "Account", source_id: "acct-1").count + + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.failed? + assert_equal "missing_source_reference", session.imports.first.error_details["code"] + assert_equal "acct-1", session.imports.first.error_details["source_id"] + assert_equal 0, @family.entries.count + end + + test "session mode rejects invalid account accountable types" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson([ + { + type: "Account", + data: { + id: "acct-invalid", + name: "Invalid Account", + balance: "100.00", + currency: "USD", + accountable_type: "Kernel" + } + } + ]), + filename: "accounts.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.failed? + assert_equal 0, @family.accounts.count + assert_equal "invalid_import_record", session.imports.first.error_details["code"] + assert_equal "Account", session.imports.first.error_details["record_type"] + assert_equal "accountable_type", session.imports.first.error_details["field"] + assert_equal "Kernel", session.imports.first.error_details["value"] + end + + test "chunk upload is idempotent by sequence and checksum" do + session = @family.import_sessions.create! + content = build_ndjson(entity_records) + + first = session.attach_chunk!( + sequence: 1, + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + second = session.attach_chunk!( + sequence: 1, + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + assert_equal first.id, second.id + assert_raises(ImportSession::ConflictError) do + session.attach_chunk!( + sequence: 1, + content: build_ndjson(transaction_records), + filename: "different.ndjson", + content_type: "application/x-ndjson" + ) + end + end + + test "chunk upload repairs incomplete existing chunk before accepting retry" do + session = @family.import_sessions.create! + content = build_ndjson(transaction_records) + chunk = session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + client_chunk_id: "entities", + checksum: Digest::SHA256.hexdigest(content) + ) + + result = session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + assert_equal chunk.id, result.id + assert result.reload.ndjson_file.attached? + assert_equal 1, result.rows_count + end + + test "chunk upload resyncs attached existing chunk before accepting retry" do + session = @family.import_sessions.create! + content = build_ndjson(transaction_records) + chunk = session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + client_chunk_id: "entities", + checksum: Digest::SHA256.hexdigest(content) + ) + chunk.ndjson_file.attach( + io: StringIO.new(content), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + result = session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + + assert_equal chunk.id, result.id + assert_equal 1, result.rows_count + end + + test "chunk upload rejects inconsistent sequence and client chunk keys" do + session = @family.import_sessions.create! + session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: build_ndjson(entity_records), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + session.attach_chunk!( + sequence: 2, + client_chunk_id: "transactions", + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + + error = assert_raises(ImportSession::ConflictError) do + session.attach_chunk!( + sequence: 1, + client_chunk_id: "transactions", + content: build_ndjson(transaction_records), + filename: "transactions.ndjson", + content_type: "application/x-ndjson" + ) + end + + assert_equal "sequence and client_chunk_id refer to different chunks", error.message + end + + test "chunk upload treats duplicate insert races as idempotent retries" do + session = @family.import_sessions.create! + content = build_ndjson(entity_records) + existing = session.imports.create!( + family: @family, + type: "SureImport", + sequence: 1, + client_chunk_id: "entities", + checksum: Digest::SHA256.hexdigest(content) + ) + existing.ndjson_file.attach( + io: StringIO.new(content), + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + existing.sync_ndjson_rows_count! + lookup_count = 0 + + session.stub(:existing_chunk_for!, ->(**) { + lookup_count += 1 + lookup_count == 1 ? nil : existing + }) do + session.stub(:create_chunk!, ->(**) { raise ActiveRecord::RecordNotUnique, "duplicate chunk" }) do + assert_equal existing, session.attach_chunk!( + sequence: 1, + client_chunk_id: "entities", + content: content, + filename: "entities.ndjson", + content_type: "application/x-ndjson" + ) + end + end + + assert_equal 2, lookup_count + end + + test "client session creation treats duplicate insert races as idempotent retries" do + existing = @family.import_sessions.create!(client_session_id: "race-session", expected_chunks: 2) + ImportSession.any_instance.stubs(:save!).raises(ActiveRecord::RecordNotUnique) + + session = ImportSession.create_or_find_for!( + family: @family, + import_type: "SureImport", + client_session_id: "race-session", + expected_chunks: 2 + ) + + assert_equal existing, session + end + + test "client session creation race backfills missing expected chunks" do + existing = @family.import_sessions.create!(client_session_id: "race-session") + racing_session = @family.import_sessions.build(client_session_id: "race-session") + racing_session.stubs(:save!).raises(ActiveRecord::RecordNotUnique) + + @family.import_sessions.stub(:find_or_initialize_by, racing_session) do + session = ImportSession.create_or_find_for!( + family: @family, + import_type: "SureImport", + client_session_id: "race-session", + expected_chunks: 2 + ) + + assert_equal existing, session + end + assert_equal 2, existing.reload.expected_chunks + end + + test "client session creation race preserves expected chunks conflict" do + @family.import_sessions.create!(client_session_id: "race-session", expected_chunks: 2) + ImportSession.any_instance.stubs(:save!).raises(ActiveRecord::RecordNotUnique) + + error = assert_raises(ImportSession::ConflictError) do + ImportSession.create_or_find_for!( + family: @family, + import_type: "SureImport", + client_session_id: "race-session", + expected_chunks: 3 + ) + end + + assert_equal "client_session_id already exists with a different expected_chunks value", error.message + end + + test "session mode rejects rule records without source ids" do + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: build_ndjson([ + { + type: "Rule", + data: { + name: "Missing Source Rule", + resource_type: "transaction", + active: true + } + } + ]), + filename: "rules.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.failed? + assert_equal 0, @family.rules.count + assert_equal "missing_source_reference", session.imports.first.error_details["code"] + assert_equal "Rule", session.imports.first.error_details["record_type"] + assert_equal "(blank)", session.imports.first.error_details["source_id"] + end + + test "session mode imports rule records exported by Sure packages" do + source_family = Family.create!(name: "Rule Export Source", currency: "USD", locale: "en") + category = source_family.categories.create!( + name: "Exported Category", + color: "#00AA00", + lucide_icon: "shapes" + ) + source_rule = source_family.rules.build( + name: "Exported Rule", + resource_type: "transaction", + active: true + ) + source_rule.conditions.build( + condition_type: "transaction_name", + operator: "like", + value: "Coffee" + ) + source_rule.actions.build( + action_type: "set_transaction_category", + value: category.id + ) + source_rule.save! + + session = @family.import_sessions.create!(expected_chunks: 1) + session.attach_chunk!( + sequence: 1, + content: exported_ndjson_for(source_family), + filename: "all.ndjson", + content_type: "application/x-ndjson" + ) + + session.publish + + assert session.reload.complete? + imported_rule = @family.rules.find_by!(name: "Exported Rule") + imported_category = @family.categories.find_by!(name: "Exported Category") + + assert_equal 1, session.summary.dig("rules", "created") + assert_equal imported_category.id, imported_rule.actions.first.value + assert_source_mapping session, "Rule", source_rule.id, imported_rule + end + + test "client idempotency keys are bounded before indexed writes" do + session = @family.import_sessions.build(client_session_id: "x" * 256) + + assert_not session.valid? + assert_includes session.errors[:client_session_id], "is too long (maximum is 255 characters)" + + import = @family.imports.build(type: "SureImport", client_chunk_id: "x" * 256) + + assert_not import.valid? + assert_includes import.errors[:client_chunk_id], "is too long (maximum is 255 characters)" + + import.sequence = 0 + import.checksum = "short" + + assert_not import.valid? + assert_includes import.errors[:sequence], "must be greater than 0" + assert_includes import.errors[:checksum], "is the wrong length (should be 64 characters)" + + other_family = Family.create!(name: "Other Import Family", currency: "USD", locale: "en") + import.import_session = other_family.import_sessions.build + import.sequence = nil + import.checksum = nil + + assert_not import.valid? + assert_includes import.errors[:import_session], "must belong to your family" + assert_includes import.errors[:sequence], "must be present for import session chunks" + assert_includes import.errors[:checksum], "must be present for import session chunks" + + mapping = @family.import_source_mappings.build( + import_session: @family.import_sessions.build, + source_type: "x" * 65, + source_id: "x" * 256, + target_type: "Account", + target_id: SecureRandom.uuid + ) + + assert_not mapping.valid? + assert_includes mapping.errors[:source_type], "is too long (maximum is 64 characters)" + assert_includes mapping.errors[:source_id], "is too long (maximum is 255 characters)" + + mapping.source_type = "Unsupported" + mapping.source_id = "acct-1" + + assert_not mapping.valid? + assert_includes mapping.errors[:source_type], "is not included in the list" + + mapping.source_type = "Account" + mapping.target_type = "Unsupported" + + assert_not mapping.valid? + assert_includes mapping.errors[:target_type], "is not included in the list" + end + + test "client idempotency keys are stripped before validation" do + session = @family.import_sessions.create!(client_session_id: " session-1 ") + import = @family.imports.create!(type: "SureImport", client_chunk_id: " chunk-1 ") + category = @family.categories.create!(name: "Mapping Category") + mapping = session.source_mappings.create!( + family: @family, + source_type: "Category", + source_id: " cat-1 ", + target: category + ) + + assert_equal "session-1", session.client_session_id + assert_equal "chunk-1", import.client_chunk_id + assert_equal "cat-1", mapping.source_id + end + + test "session status payloads must remain JSON objects" do + session = @family.import_sessions.build(summary: [], error_details: "failed") + import = @family.imports.build(type: "SureImport", summary: [], error_details: "failed") + + assert_not session.valid? + assert_includes session.errors[:summary], "must be an object" + assert_includes session.errors[:error_details], "must be an object" + + assert_not import.valid? + assert_includes import.errors[:summary], "must be an object" + assert_includes import.errors[:error_details], "must be an object" + end + + test "source mappings must belong to the same family as their import session" do + other_family = Family.create!(name: "Other Mapping Family", currency: "USD", locale: "en") + mapping = other_family.import_source_mappings.build( + import_session: @family.import_sessions.build, + source_type: "Account", + source_id: "acct-1", + target: @family.accounts.build(name: "Session Checking") + ) + + assert_not mapping.valid? + assert_includes mapping.errors[:family], "must match import session" + end + + test "source mapping targets must not cross family boundaries" do + other_family = Family.create!(name: "Other Mapping Target Family", currency: "USD", locale: "en") + mapping = @family.import_source_mappings.build( + import_session: @family.import_sessions.build, + source_type: " Account ", + source_id: "acct-1", + target: other_family.accounts.build(name: "Other Checking") + ) + + assert_not mapping.valid? + assert_equal "Account", mapping.source_type + assert_includes mapping.errors[:target], "must belong to your family" + end + + private + def entity_records + [ + { + type: "Account", + data: { + id: "acct-1", + name: "Session Checking", + balance: "100.00", + currency: "USD", + accountable_type: "Depository", + accountable: { subtype: "checking" } + } + }, + { + type: "Category", + data: { + id: "cat-1", + name: "Groceries", + color: "#407706", + classification: "expense" + } + }, + { + type: "Merchant", + data: { + id: "merchant-1", + name: "Market", + color: "#111111" + } + }, + { + type: "Tag", + data: { + id: "tag-1", + name: "Weekly", + color: "#222222" + } + } + ] + end + + def transaction_records + [ + { + type: "Transaction", + data: { + id: "txn-1", + account_id: "acct-1", + category_id: "cat-1", + merchant_id: "merchant-1", + tag_ids: [ "tag-1" ], + date: "2024-01-15", + amount: "-12.34", + currency: "USD", + name: "Grocery Run" + } + } + ] + end + + def build_ndjson(records) + records.map(&:to_json).join("\n") + end + + def exported_ndjson_for(family) + ndjson = nil + + Zip::File.open_buffer(Family::DataExporter.new(family).generate_export) do |zip| + ndjson = zip.read("all.ndjson") + end + + ndjson + end + + def assert_source_mapping(session, source_type, source_id, target) + mapping = session.source_mappings.find_by!(source_type: source_type, source_id: source_id) + + assert_equal @family, mapping.family + assert_equal target, mapping.target + end +end