Files
sure/test/models/import_session_test.rb
ghost 6e04c6927d feat(imports): add SureImport session batches (#1785)
* feat(imports): add SureImport session batches

Add first-class SureImport sessions for ordered multi-file NDJSON imports.

Persist source mappings across chunks, make session/chunk processing idempotent, expose progress readback, and keep existing single-file import behavior compatible.

Includes the devcontainer libvips runtime dependency needed by ActiveStorage variant tests.

Addresses #1610.

Related to #1458.

* fix(imports): avoid scanner-like API key test data

* test(imports): assert skipped balances are not persisted

* fix(imports): harden session publish retries

Validate expected import chunk sequences exactly before publish, and restore session state with error details when enqueueing the publish job fails.

* fix(imports): close session retry edge cases

Backfill expected chunk counts after client-session insert races and enqueue import-session jobs after the status transition commits. Persist a safe enqueue failure body so API readback does not expose raw queue errors.

* fix(imports): address session publish review gaps

Remove dead transaction external-id assignment, harden session publish retry/sync behavior, align session chunk status docs, and add regression coverage for partial retries and safe enqueue error readback.

* fix(imports): include sessions in family reset

Clear import sessions through the family reset job so chunk imports and source mappings do not survive a reset.

Expose import session and source mapping counts in the reset status response and regenerated OpenAPI schema so polling reflects the full reset surface.

* test(imports): cover split import mapping invariants

* test(imports): cover session verification invariants

* fix(imports): scope SureImport session reimports

* Tighten SureImport session batching

* fix(imports): export rule source ids for sessions

* test(imports): stabilize rule id export assertion

* test(imports): restore reset status session fixture
2026-06-04 11:48:44 +02:00

810 lines
26 KiB
Ruby

require "test_helper"
class ImportSessionTest < ActiveSupport::TestCase
# Each test starts from the :empty families fixture.
setup { @family = families(:empty) }
test "job requires import session" do
  # Performing the job with nil is expected to raise ArgumentError with a fixed message.
  raised = assert_raises(ArgumentError) { ImportSessionJob.perform_now(nil) }

  assert_equal "ImportSessionJob requires an import_session", raised.message
end
test "job publishes import session" do
  session_under_test = @family.import_sessions.create!
  # Expect #publish to be called exactly once on this very instance.
  session_under_test.expects(:publish).once

  ImportSessionJob.perform_now(session_under_test)
end
test "publishes ordered chunks with source mappings across files" do
  import_session = @family.import_sessions.create!(expected_chunks: 2)

  # Chunk 1 carries the entity records; chunk 2 carries the transaction that
  # refers to those entities by their source ids.
  import_session.attach_chunk!(
    sequence: 1,
    client_chunk_id: "entities",
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )
  import_session.attach_chunk!(
    sequence: 2,
    client_chunk_id: "transactions",
    content: build_ndjson(transaction_records),
    filename: "transactions.ndjson",
    content_type: "application/x-ndjson"
  )

  import_session.publish
  assert import_session.reload.complete?

  checking = @family.accounts.find_by!(name: "Session Checking")
  grocery_entry = checking.entries.find_by!(name: "Grocery Run")
  imported_txn = grocery_entry.entryable

  assert_equal "Groceries", imported_txn.category.name
  assert_equal "Market", imported_txn.merchant.name
  assert_equal [ "Weekly" ], imported_txn.tags.map(&:name)
  assert_equal "sure_import_session:#{import_session.id}", grocery_entry.source
  assert_equal "Transaction:txn-1", grocery_entry.external_id
  assert_equal 1, import_session.summary.dig("transactions", "created")

  # Each source id used in the files must map to the record created for it.
  [
    [ "Account", "acct-1", checking ],
    [ "Category", "cat-1", imported_txn.category ],
    [ "Merchant", "merchant-1", imported_txn.merchant ],
    [ "Tag", "tag-1", imported_txn.tags.first ],
    [ "Transaction", "txn-1", imported_txn ]
  ].each do |source_type, source_id, target|
    assert_source_mapping import_session, source_type, source_id, target
  end
end
test "publishing session chunks records readback verification for each chunk" do
  import_session = @family.import_sessions.create!(expected_chunks: 2)
  entities_ndjson = build_ndjson(entity_records)
  import_session.attach_chunk!(
    sequence: 1,
    content: entities_ndjson,
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )
  transactions_ndjson = build_ndjson(transaction_records)
  import_session.attach_chunk!(
    sequence: 2,
    content: transactions_ndjson,
    filename: "transactions.ndjson",
    content_type: "application/x-ndjson"
  )

  import_session.publish

  # ordered_by_sequence yields the entities chunk first, then the transactions chunk.
  entities_import, transactions_import = import_session.imports.ordered_by_sequence.to_a

  assert_equal 1, entities_import.expected_record_counts["accounts"]
  assert_equal 1, transactions_import.expected_record_counts["transactions"]

  entities_readback = entities_import.readback_verification
  assert_includes SureImport::VERIFICATION_STATUSES, entities_readback["status"]
  assert_equal 1, entities_readback.dig("checked_counts", "accounts")

  transactions_readback = transactions_import.readback_verification
  assert_equal 1, transactions_readback.dig("checked_counts", "transactions")
  assert_equal 1, transactions_readback.dig("actual_delta_counts", "transactions")
end
test "publishing the same complete session does not duplicate imported transactions" do
  import_session = @family.import_sessions.create!(expected_chunks: 2)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )
  import_session.attach_chunk!(
    sequence: 2,
    content: build_ndjson(transaction_records),
    filename: "transactions.ndjson",
    content_type: "application/x-ndjson"
  )

  # First publish does the import; the second one must leave Entry.count unchanged.
  import_session.publish
  assert_no_difference("Entry.count") { import_session.publish }
end
test "republishing failed session skips complete chunks and retries failed chunks" do
  failed_session = @family.import_sessions.create!(expected_chunks: 2)
  failed_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )
  failed_session.attach_chunk!(
    sequence: 2,
    content: build_ndjson(transaction_records),
    filename: "transactions.ndjson",
    content_type: "application/x-ndjson"
  )

  # Hand-write the state of a partially failed publish: chunk 1 complete, chunk 2
  # failed, and the session itself marked failed with chunk 1's summary.
  finished_chunk = failed_session.imports.find_by!(sequence: 1)
  broken_chunk = failed_session.imports.find_by!(sequence: 2)
  finished_chunk.update!(status: :complete, summary: { "accounts" => { "created" => 1 } }, error_details: {})
  broken_chunk.update!(status: :failed, error: "transient failure", error_details: { "code" => "import_failed" })
  failed_session.update!(
    status: :failed,
    summary: finished_chunk.summary,
    error_details: { "code" => "import_failed", "message" => "transient failure" }
  )

  seen_sequences = []
  # Stand-in for Family::DataImporter.new: records which chunk it was built for
  # and returns an object whose #import! reports one created transaction.
  fake_importer_factory = lambda do |_family, _content, import_session:, import:|
    seen_sequences << import.sequence
    flunk "completed chunk was reprocessed" if import.sequence == 1
    assert_equal failed_session, import_session

    fake_importer = Object.new
    fake_importer.define_singleton_method(:import!) do
      {
        accounts: [],
        entries: [],
        summary: { "transactions" => { "created" => 1 } }
      }
    end
    fake_importer
  end

  Family::DataImporter.stub(:new, fake_importer_factory) { failed_session.publish }

  assert_equal [ 2 ], seen_sequences
  assert finished_chunk.reload.complete?
  assert broken_chunk.reload.complete?
  assert failed_session.reload.complete?
  assert_equal 1, failed_session.summary.dig("accounts", "created")
  assert_equal 1, failed_session.summary.dig("transactions", "created")
end
test "publish keeps session complete and records safe error when family sync enqueue fails" do
  import_session = @family.import_sessions.create!(expected_chunks: 1)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )

  # The raised message contains "secret" so the final assertion can detect a leak.
  Family.any_instance.stubs(:sync_later).raises(StandardError, "redis://secret.local/0")

  import_session.publish

  assert import_session.reload.complete?
  recorded = import_session.error_details
  assert_equal "family_sync_enqueue_failed", recorded["code"]
  assert_equal "Family sync could not be queued after import completion.", recorded["message"]
  assert_no_match(/secret/, recorded.to_json)
end
test "publish stores generic error details for unexpected import failures" do
  import_session = @family.import_sessions.create!(expected_chunks: 1)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )

  # Building the importer blows up with a message that must not be persisted verbatim.
  exploding_factory = ->(*) { raise StandardError, "redis://secret.local/0" }
  Family::DataImporter.stub(:new, exploding_factory) { import_session.publish }

  assert import_session.reload.failed?
  assert_equal "Import session failed.", import_session.imports.first.error

  recorded = import_session.error_details
  assert_equal "import_failed", recorded["code"]
  assert_equal "Import session failed.", recorded["message"]
  assert_no_match(/secret/, recorded.to_json)
end
test "publish later requires the exact expected chunk sequences" do
  # Two chunks are expected but only sequence 1 is uploaded.
  import_session = @family.import_sessions.create!(expected_chunks: 2)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )

  conflict = assert_raises(ImportSession::ConflictError) { import_session.publish_later }

  assert_equal "import session chunks do not match expected sequences (missing sequences: 2)", conflict.message
  assert import_session.reload.pending?
end
test "chunk upload rejects sequences beyond the expected chunk count" do
  import_session = @family.import_sessions.create!(expected_chunks: 1)
  # Sequence 2 is outside the single expected chunk.
  out_of_range_upload = {
    sequence: 2,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  }

  conflict = assert_raises(ImportSession::ConflictError) do
    import_session.attach_chunk!(**out_of_range_upload)
  end

  assert_equal "sequence exceeds expected_chunks", conflict.message
  assert_empty import_session.imports
end
test "publish later restores status and records enqueue failures" do
  import_session = @family.import_sessions.create!(expected_chunks: 1)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )

  # Replace perform_later with a callable that always raises.
  offline_queue = ->(_import_session) { raise StandardError, "queue offline" }
  ImportSessionJob.stub(:perform_later, offline_queue) do
    enqueue_error = assert_raises(ImportSession::EnqueueError) { import_session.publish_later }
    assert_equal "Import session could not be queued.", enqueue_error.message
  end

  assert import_session.reload.pending?
  recorded = import_session.error_details
  assert_equal "import_enqueue_failed", recorded["code"]
  assert_equal "Import session could not be queued.", recorded["message"]
end
test "publish later syncs chunk row counts before enforcing row limit" do
  import_session = @family.import_sessions.create!(expected_chunks: 1)
  # Four entity records plus one transaction record: five rows in one chunk.
  combined_ndjson = build_ndjson(entity_records + transaction_records)
  import_session.attach_chunk!(
    sequence: 1,
    content: combined_ndjson,
    filename: "session.ndjson",
    content_type: "application/x-ndjson"
  )

  # Zero out the stored count so the limit only trips if counts are recomputed.
  import_session.imports.update_all(rows_count: 0)

  SureImport.stub(:max_row_count, 1) do
    assert_raises(Import::MaxRowCountExceededError) do
      import_session.publish_later
    end
  end

  assert import_session.reload.pending?
  assert_equal 5, import_session.imports.reload.first.rows_count
end
test "fails loudly when a later chunk references a missing source id" do
  import_session = @family.import_sessions.create!(expected_chunks: 1)
  # Only the transaction records are uploaded, so "acct-1" was never defined
  # in this session.
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(transaction_records),
    filename: "transactions.ndjson",
    content_type: "application/x-ndjson"
  )

  import_session.publish

  assert import_session.reload.failed?
  failed_import = import_session.imports.first
  assert failed_import.failed?
  assert_equal "missing_source_reference", failed_import.error_details["code"]
  assert_equal "acct-1", failed_import.error_details["source_id"]
  assert_equal 0, @family.entries.count
end
test "source mappings from another family cannot satisfy missing references" do
  # Import the entity records into a different family first, so a mapping for
  # "acct-1" exists there.
  foreign_family = Family.create!(name: "Other Family", currency: "USD", locale: "en")
  foreign_session = foreign_family.import_sessions.create!(expected_chunks: 1)
  foreign_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "other-entities.ndjson",
    content_type: "application/x-ndjson"
  )
  foreign_session.publish

  assert foreign_session.reload.complete?
  foreign_account_mappings = foreign_session.source_mappings.where(source_type: "Account", source_id: "acct-1")
  assert_equal 1, foreign_account_mappings.count

  # The same source id referenced from @family must still be unresolved.
  import_session = @family.import_sessions.create!(expected_chunks: 1)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson(transaction_records),
    filename: "transactions.ndjson",
    content_type: "application/x-ndjson"
  )
  import_session.publish

  assert import_session.reload.failed?
  failure = import_session.imports.first.error_details
  assert_equal "missing_source_reference", failure["code"]
  assert_equal "acct-1", failure["source_id"]
  assert_equal 0, @family.entries.count
end
test "session mode rejects invalid account accountable types" do
  # "Kernel" names a real Ruby constant; the assertions below expect it to be rejected.
  invalid_account = {
    type: "Account",
    data: {
      id: "acct-invalid",
      name: "Invalid Account",
      balance: "100.00",
      currency: "USD",
      accountable_type: "Kernel"
    }
  }

  import_session = @family.import_sessions.create!(expected_chunks: 1)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson([ invalid_account ]),
    filename: "accounts.ndjson",
    content_type: "application/x-ndjson"
  )
  import_session.publish

  assert import_session.reload.failed?
  assert_equal 0, @family.accounts.count

  failure = import_session.imports.first.error_details
  assert_equal "invalid_import_record", failure["code"]
  assert_equal "Account", failure["record_type"]
  assert_equal "accountable_type", failure["field"]
  assert_equal "Kernel", failure["value"]
end
test "chunk upload is idempotent by sequence and checksum" do
  import_session = @family.import_sessions.create!
  upload = {
    sequence: 1,
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  }

  # The same content uploaded twice under one sequence must resolve to one chunk.
  original_chunk = import_session.attach_chunk!(**upload)
  retried_chunk = import_session.attach_chunk!(**upload)
  assert_equal original_chunk.id, retried_chunk.id

  # Different content under the same sequence is a conflict.
  assert_raises(ImportSession::ConflictError) do
    import_session.attach_chunk!(
      sequence: 1,
      content: build_ndjson(transaction_records),
      filename: "different.ndjson",
      content_type: "application/x-ndjson"
    )
  end
end
test "chunk upload repairs incomplete existing chunk before accepting retry" do
  import_session = @family.import_sessions.create!
  ndjson = build_ndjson(transaction_records)

  # A chunk row with the matching checksum already exists, but no file is attached.
  bare_chunk = import_session.imports.create!(
    family: @family,
    type: "SureImport",
    sequence: 1,
    client_chunk_id: "entities",
    checksum: Digest::SHA256.hexdigest(ndjson)
  )

  retried = import_session.attach_chunk!(
    sequence: 1,
    client_chunk_id: "entities",
    content: ndjson,
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )

  assert_equal bare_chunk.id, retried.id
  assert retried.reload.ndjson_file.attached?
  assert_equal 1, retried.rows_count
end
test "chunk upload resyncs attached existing chunk before accepting retry" do
  import_session = @family.import_sessions.create!
  ndjson = build_ndjson(transaction_records)

  # The existing chunk has its file attached directly, without going through
  # attach_chunk!.
  attached_chunk = import_session.imports.create!(
    family: @family,
    type: "SureImport",
    sequence: 1,
    client_chunk_id: "entities",
    checksum: Digest::SHA256.hexdigest(ndjson)
  )
  attached_chunk.ndjson_file.attach(
    io: StringIO.new(ndjson),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )

  retried = import_session.attach_chunk!(
    sequence: 1,
    client_chunk_id: "entities",
    content: ndjson,
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )

  assert_equal attached_chunk.id, retried.id
  assert_equal 1, retried.rows_count
end
test "chunk upload rejects inconsistent sequence and client chunk keys" do
  import_session = @family.import_sessions.create!
  transactions_ndjson = build_ndjson(transaction_records)

  import_session.attach_chunk!(
    sequence: 1,
    client_chunk_id: "entities",
    content: build_ndjson(entity_records),
    filename: "entities.ndjson",
    content_type: "application/x-ndjson"
  )
  import_session.attach_chunk!(
    sequence: 2,
    client_chunk_id: "transactions",
    content: transactions_ndjson,
    filename: "transactions.ndjson",
    content_type: "application/x-ndjson"
  )

  # Sequence 1 belongs to "entities" while "transactions" belongs to sequence 2,
  # so this combination points at two different chunks.
  conflict = assert_raises(ImportSession::ConflictError) do
    import_session.attach_chunk!(
      sequence: 1,
      client_chunk_id: "transactions",
      content: build_ndjson(transaction_records),
      filename: "transactions.ndjson",
      content_type: "application/x-ndjson"
    )
  end

  assert_equal "sequence and client_chunk_id refer to different chunks", conflict.message
end
# Simulates a concurrent upload: the first lookup finds nothing, the insert hits
# a unique-constraint error, and the second lookup finds the row that "won".
test "chunk upload treats duplicate insert races as idempotent retries" do
session = @family.import_sessions.create!
content = build_ndjson(entity_records)
# The chunk the competing request is pretending to have created already.
existing = session.imports.create!(
family: @family,
type: "SureImport",
sequence: 1,
client_chunk_id: "entities",
checksum: Digest::SHA256.hexdigest(content)
)
existing.ndjson_file.attach(
io: StringIO.new(content),
filename: "entities.ndjson",
content_type: "application/x-ndjson"
)
existing.sync_ndjson_rows_count!
lookup_count = 0
# First call returns nil (nothing found yet); every later call returns `existing`.
session.stub(:existing_chunk_for!, ->(**) {
lookup_count += 1
lookup_count == 1 ? nil : existing
}) do
# Creating the chunk always fails as a duplicate insert would.
session.stub(:create_chunk!, ->(**) { raise ActiveRecord::RecordNotUnique, "duplicate chunk" }) do
assert_equal existing, session.attach_chunk!(
sequence: 1,
client_chunk_id: "entities",
content: content,
filename: "entities.ndjson",
content_type: "application/x-ndjson"
)
end
end
# Exactly two lookups: one before the failed insert and one after it.
assert_equal 2, lookup_count
end
test "client session creation treats duplicate insert races as idempotent retries" do
  winner = @family.import_sessions.create!(client_session_id: "race-session", expected_chunks: 2)
  # From here on, save! on any ImportSession raises RecordNotUnique.
  ImportSession.any_instance.stubs(:save!).raises(ActiveRecord::RecordNotUnique)

  resolved = ImportSession.create_or_find_for!(
    family: @family,
    import_type: "SureImport",
    client_session_id: "race-session",
    expected_chunks: 2
  )

  assert_equal winner, resolved
end
test "client session creation race backfills missing expected chunks" do
  # The persisted session has no expected_chunks; the racing request supplies 2.
  winner = @family.import_sessions.create!(client_session_id: "race-session")
  loser = @family.import_sessions.build(client_session_id: "race-session")
  loser.stubs(:save!).raises(ActiveRecord::RecordNotUnique)

  @family.import_sessions.stub(:find_or_initialize_by, loser) do
    resolved = ImportSession.create_or_find_for!(
      family: @family,
      import_type: "SureImport",
      client_session_id: "race-session",
      expected_chunks: 2
    )
    assert_equal winner, resolved
  end

  assert_equal 2, winner.reload.expected_chunks
end
test "client session creation race preserves expected chunks conflict" do
  @family.import_sessions.create!(client_session_id: "race-session", expected_chunks: 2)
  ImportSession.any_instance.stubs(:save!).raises(ActiveRecord::RecordNotUnique)

  # Same client_session_id, but 3 expected chunks instead of the stored 2.
  conflicting_request = {
    family: @family,
    import_type: "SureImport",
    client_session_id: "race-session",
    expected_chunks: 3
  }
  conflict = assert_raises(ImportSession::ConflictError) do
    ImportSession.create_or_find_for!(**conflicting_request)
  end

  assert_equal "client_session_id already exists with a different expected_chunks value", conflict.message
end
test "session mode rejects rule records without source ids" do
  # The rule's data deliberately has no :id key.
  rule_without_id = {
    type: "Rule",
    data: {
      name: "Missing Source Rule",
      resource_type: "transaction",
      active: true
    }
  }

  import_session = @family.import_sessions.create!(expected_chunks: 1)
  import_session.attach_chunk!(
    sequence: 1,
    content: build_ndjson([ rule_without_id ]),
    filename: "rules.ndjson",
    content_type: "application/x-ndjson"
  )
  import_session.publish

  assert import_session.reload.failed?
  assert_equal 0, @family.rules.count

  failure = import_session.imports.first.error_details
  assert_equal "missing_source_reference", failure["code"]
  assert_equal "Rule", failure["record_type"]
  assert_equal "(blank)", failure["source_id"]
end
test "session mode imports rule records exported by Sure packages" do
  # Build a rule (one condition, one category action) in a separate family,
  # export that family, and import the exported NDJSON into @family.
  exporting_family = Family.create!(name: "Rule Export Source", currency: "USD", locale: "en")
  exported_category = exporting_family.categories.create!(
    name: "Exported Category",
    color: "#00AA00",
    lucide_icon: "shapes"
  )

  exported_rule = exporting_family.rules.build(
    name: "Exported Rule",
    resource_type: "transaction",
    active: true
  )
  exported_rule.conditions.build(
    condition_type: "transaction_name",
    operator: "like",
    value: "Coffee"
  )
  exported_rule.actions.build(
    action_type: "set_transaction_category",
    value: exported_category.id
  )
  exported_rule.save!

  import_session = @family.import_sessions.create!(expected_chunks: 1)
  import_session.attach_chunk!(
    sequence: 1,
    content: exported_ndjson_for(exporting_family),
    filename: "all.ndjson",
    content_type: "application/x-ndjson"
  )
  import_session.publish

  assert import_session.reload.complete?

  rule_in_target = @family.rules.find_by!(name: "Exported Rule")
  category_in_target = @family.categories.find_by!(name: "Exported Category")

  assert_equal 1, import_session.summary.dig("rules", "created")
  # The action must point at the category created in @family.
  assert_equal category_in_target.id, rule_in_target.actions.first.value
  assert_source_mapping import_session, "Rule", exported_rule.id, rule_in_target
end
# Walks through validation errors on sessions, chunk imports and source mappings.
# The same `import` and `mapping` objects are mutated and re-validated step by
# step, so the order of the statements below matters.
test "client idempotency keys are bounded before indexed writes" do
# 256 characters: one more than the 255-character maximum asserted below.
session = @family.import_sessions.build(client_session_id: "x" * 256)
assert_not session.valid?
assert_includes session.errors[:client_session_id], "is too long (maximum is 255 characters)"
import = @family.imports.build(type: "SureImport", client_chunk_id: "x" * 256)
assert_not import.valid?
assert_includes import.errors[:client_chunk_id], "is too long (maximum is 255 characters)"
# Reuse the same import to check the sequence and checksum constraints.
import.sequence = 0
import.checksum = "short"
assert_not import.valid?
assert_includes import.errors[:sequence], "must be greater than 0"
assert_includes import.errors[:checksum], "is the wrong length (should be 64 characters)"
# Point the import at a session built on a different family and clear
# sequence/checksum to trigger the ownership and presence errors.
other_family = Family.create!(name: "Other Import Family", currency: "USD", locale: "en")
import.import_session = other_family.import_sessions.build
import.sequence = nil
import.checksum = nil
assert_not import.valid?
assert_includes import.errors[:import_session], "must belong to your family"
assert_includes import.errors[:sequence], "must be present for import session chunks"
assert_includes import.errors[:checksum], "must be present for import session chunks"
# 65 and 256 characters exceed the 64/255 maximums asserted below.
mapping = @family.import_source_mappings.build(
import_session: @family.import_sessions.build,
source_type: "x" * 65,
source_id: "x" * 256,
target_type: "Account",
target_id: SecureRandom.uuid
)
assert_not mapping.valid?
assert_includes mapping.errors[:source_type], "is too long (maximum is 64 characters)"
assert_includes mapping.errors[:source_id], "is too long (maximum is 255 characters)"
# Lengths are now valid, so the remaining failure is the inclusion check.
mapping.source_type = "Unsupported"
mapping.source_id = "acct-1"
assert_not mapping.valid?
assert_includes mapping.errors[:source_type], "is not included in the list"
# Same inclusion check, this time on target_type.
mapping.source_type = "Account"
mapping.target_type = "Unsupported"
assert_not mapping.valid?
assert_includes mapping.errors[:target_type], "is not included in the list"
end
test "client idempotency keys are stripped before validation" do
  # Each key is written with surrounding spaces and read back without them.
  padded_session = @family.import_sessions.create!(client_session_id: " session-1 ")
  padded_import = @family.imports.create!(type: "SureImport", client_chunk_id: " chunk-1 ")
  mapped_category = @family.categories.create!(name: "Mapping Category")
  padded_mapping = padded_session.source_mappings.create!(
    family: @family,
    source_type: "Category",
    source_id: " cat-1 ",
    target: mapped_category
  )

  assert_equal "session-1", padded_session.client_session_id
  assert_equal "chunk-1", padded_import.client_chunk_id
  assert_equal "cat-1", padded_mapping.source_id
end
test "session status payloads must remain JSON objects" do
  # An array and a string are both non-object payloads.
  bad_session = @family.import_sessions.build(summary: [], error_details: "failed")
  bad_import = @family.imports.build(type: "SureImport", summary: [], error_details: "failed")

  assert_not bad_session.valid?
  assert_includes bad_session.errors[:summary], "must be an object"
  assert_includes bad_session.errors[:error_details], "must be an object"

  assert_not bad_import.valid?
  assert_includes bad_import.errors[:summary], "must be an object"
  assert_includes bad_import.errors[:error_details], "must be an object"
end
test "source mappings must belong to the same family as their import session" do
  outsider = Family.create!(name: "Other Mapping Family", currency: "USD", locale: "en")
  # The mapping is built on `outsider` but references a session built on @family.
  mismatched = outsider.import_source_mappings.build(
    import_session: @family.import_sessions.build,
    source_type: "Account",
    source_id: "acct-1",
    target: @family.accounts.build(name: "Session Checking")
  )

  assert_not mismatched.valid?
  assert_includes mismatched.errors[:family], "must match import session"
end
test "source mapping targets must not cross family boundaries" do
  outsider = Family.create!(name: "Other Mapping Target Family", currency: "USD", locale: "en")
  # Mapping and session are on @family; only the target account is on `outsider`.
  foreign_target = outsider.accounts.build(name: "Other Checking")
  crossing = @family.import_source_mappings.build(
    import_session: @family.import_sessions.build,
    source_type: " Account ",
    source_id: "acct-1",
    target: foreign_target
  )

  assert_not crossing.valid?
  # The padded source_type reads back without the surrounding spaces.
  assert_equal "Account", crossing.source_type
  assert_includes crossing.errors[:target], "must belong to your family"
end
private
# Fixture records for an "entities" chunk: one Account, Category, Merchant and
# Tag, each wrapped as { type:, data: }. The ids ("acct-1", "cat-1",
# "merchant-1", "tag-1") are the ones transaction_records refers to.
def entity_records
  account = {
    id: "acct-1",
    name: "Session Checking",
    balance: "100.00",
    currency: "USD",
    accountable_type: "Depository",
    accountable: { subtype: "checking" }
  }
  category = { id: "cat-1", name: "Groceries", color: "#407706", classification: "expense" }
  merchant = { id: "merchant-1", name: "Market", color: "#111111" }
  tag = { id: "tag-1", name: "Weekly", color: "#222222" }

  [
    [ "Account", account ],
    [ "Category", category ],
    [ "Merchant", merchant ],
    [ "Tag", tag ]
  ].map { |type, data| { type: type, data: data } }
end
# Fixture records for a "transactions" chunk: a single Transaction whose
# account, category, merchant and tag ids match those in entity_records.
def transaction_records
  grocery_run = {
    id: "txn-1",
    account_id: "acct-1",
    category_id: "cat-1",
    merchant_id: "merchant-1",
    tag_ids: [ "tag-1" ],
    date: "2024-01-15",
    amount: "-12.34",
    currency: "USD",
    name: "Grocery Run"
  }

  [ { type: "Transaction", data: grocery_run } ]
end
# Serialises each record with #to_json and joins the lines with "\n".
# No trailing newline is added; an empty list yields an empty string.
def build_ndjson(records)
  json_lines = records.map { |record| record.to_json }
  json_lines.join("\n")
end
# Generates an export for +family+ with Family::DataExporter, opens the result
# as a zip buffer and returns the contents of its "all.ndjson" entry.
def exported_ndjson_for(family)
  archive = Family::DataExporter.new(family).generate_export

  # Capture the entry in an outer variable; the return value of open_buffer
  # itself is not used.
  contents = nil
  Zip::File.open_buffer(archive) { |zip| contents = zip.read("all.ndjson") }
  contents
end
# Looks up the session's source mapping for (source_type, source_id) with
# find_by! and asserts that it belongs to @family and points at +target+.
def assert_source_mapping(session, source_type, source_id, target)
  lookup = { source_type: source_type, source_id: source_id }
  found = session.source_mappings.find_by!(**lookup)

  assert_equal @family, found.family
  assert_equal target, found.target
end
end