From 16a0fa08f86a4769a54162e8d08e9c29342b8bf0 Mon Sep 17 00:00:00 2001 From: Romain Brucker Date: Sat, 11 Apr 2026 21:37:07 +0200 Subject: [PATCH] Add DeFi via Coinstats (#1417) * feat: handle defi account with coinstats provider * chore: refactor to follow project conventions * fix: fixing codex/coderabbit findings * fix: fixing coderabbit findings * fix: fixing coderabbit findings * fix: fixing coderabbit findings * fix: fixing coderabbit findings * fix: fixing coderabbit findings --- .../source_classification.rb | 6 + .../coinstats_item/defi_account_manager.rb | 206 +++++++++++++++++ app/models/coinstats_item/importer.rb | 41 ++++ app/models/provider/coinstats.rb | 19 ++ test/models/coinstats_account_test.rb | 14 ++ test/models/coinstats_item/importer_test.rb | 216 ++++++++++++++++++ 6 files changed, 502 insertions(+) create mode 100644 app/models/coinstats_item/defi_account_manager.rb diff --git a/app/models/coinstats_account/source_classification.rb b/app/models/coinstats_account/source_classification.rb index 97f867d4a..a3e2ed520 100644 --- a/app/models/coinstats_account/source_classification.rb +++ b/app/models/coinstats_account/source_classification.rb @@ -3,9 +3,15 @@ module CoinstatsAccount::SourceClassification def wallet_source? payload = raw_payload.to_h.with_indifferent_access + return false if payload[:source] == "defi" payload[:source] == "wallet" || (payload[:address].present? && payload[:blockchain].present?) end + def defi_source? + payload = raw_payload.to_h.with_indifferent_access + payload[:source] == "defi" + end + def exchange_source? exchange_source_for?(raw_payload) end diff --git a/app/models/coinstats_item/defi_account_manager.rb b/app/models/coinstats_item/defi_account_manager.rb new file mode 100644 index 000000000..a5962608f --- /dev/null +++ b/app/models/coinstats_item/defi_account_manager.rb @@ -0,0 +1,206 @@ +# frozen_string_literal: true + +# Manages DeFi/staking accounts for a CoinStats wallet connection. +# Discovers staking, LP, and yield farming positions via the CoinStats DeFi API +# and keeps the corresponding CoinstatsAccounts up to date. +class CoinstatsItem::DefiAccountManager + attr_reader :coinstats_item + + def initialize(coinstats_item) + @coinstats_item = coinstats_item + end + + # Fetches DeFi positions for the given wallet and creates/updates CoinstatsAccounts. + # Positions that disappear from the API (fully unstaked) are zeroed out. + # Returns true on success, false on failure. + def sync_wallet!(address:, blockchain:, provider:) + normalized_address = address.to_s.downcase + normalized_blockchain = blockchain.to_s.downcase + + response = provider.get_wallet_defi(address: address, connection_id: blockchain) + unless response.success? + Rails.logger.warn "CoinstatsItem::DefiAccountManager - DeFi fetch failed for #{normalized_blockchain}:#{normalized_address}" + return false + end + + defi_data = response.data.to_h.with_indifferent_access + protocols = Array(defi_data[:protocols]) + active_defi_ids = [] + had_upsert_failures = false + + protocols.each do |protocol| + protocol = protocol.with_indifferent_access + + Array(protocol[:investments]).each do |investment| + investment = investment.with_indifferent_access + + Array(investment[:assets]).each do |asset| + asset = asset.with_indifferent_access + next if asset[:amount].to_f.zero? + next if asset[:coinId].blank? && asset[:symbol].blank? + + account_id = build_account_id(protocol, investment, asset, blockchain: normalized_blockchain) + + if upsert_account!(address: normalized_address, blockchain: normalized_blockchain, protocol: protocol, investment: investment, asset: asset, account_id: account_id) + active_defi_ids << account_id + else + had_upsert_failures = true + end + end + end + end + + # Skip zero-out when upserts failed — active_defi_ids is incomplete and we'd risk + # zeroing accounts that are still active but failed to save this cycle. + return false if had_upsert_failures + + zero_out_inactive_accounts!(normalized_address, normalized_blockchain, active_defi_ids) + true + rescue => e + Rails.logger.warn "CoinstatsItem::DefiAccountManager - Sync failed for #{blockchain}:#{address}: #{e.message}" + false + end + + # Creates the local Account for a DeFi CoinstatsAccount if it doesn't exist yet. + def ensure_local_account!(coinstats_account) + return false if coinstats_account.account.present? + + account = Account.create_and_sync({ + family: coinstats_item.family, + name: coinstats_account.name, + balance: coinstats_account.current_balance || 0, + cash_balance: 0, + currency: coinstats_account.currency, + accountable_type: "Crypto", + accountable_attributes: { + subtype: "wallet", + tax_treatment: "taxable" + } + }, skip_initial_sync: true) + + AccountProvider.create!(account: account, provider: coinstats_account) + true + rescue ActiveRecord::RecordNotUnique + # Another concurrent sync created the AccountProvider; destroy the orphaned Account we just created. + account&.destroy + false + end + + private + + # Builds a stable, unique account_id for a DeFi asset position. + # Format: "defi:::::" + # Blockchain is included to avoid collisions when the same wallet address exists on + # multiple EVM-compatible chains (e.g. Ethereum and Polygon). + def build_account_id(protocol, investment, asset, blockchain:) + chain = blockchain.to_s.downcase.gsub(/\s+/, "_").presence || "unknown" + protocol_id = protocol[:id].to_s.downcase.gsub(/\s+/, "_").presence || "unknown" + coin_id = (asset[:coinId] || asset[:symbol]).to_s.downcase + title = asset[:title].to_s.downcase.gsub(/\s+/, "_").presence || "position" + investment_type = investment[:name].to_s.downcase.gsub(/\s+/, "_").presence + parts = [ "defi", chain, protocol_id, coin_id, title ] + parts.insert(3, investment_type) if investment_type.present? + parts.join(":") + end + + def build_account_name(protocol, asset) + protocol_name = protocol[:name].to_s + symbol = asset[:symbol].to_s.upcase + + case asset[:title].to_s.downcase + when "deposit", "supplied" + "#{symbol} (#{protocol_name} Staking)" + when "reward", "yield" + "#{symbol} (#{protocol_name} Rewards)" + else + label = asset[:title].to_s.presence || "Position" + "#{symbol} (#{protocol_name} #{label})" + end + end + + # Returns true on success, false on failure (so the caller can track active positions correctly). + def upsert_account!(address:, blockchain:, protocol:, investment:, asset:, account_id:) + coinstats_account = coinstats_item.coinstats_accounts.find_or_initialize_by( + account_id: account_id, + wallet_address: address + ) + + # The DeFi API returns asset.price as a TotalValueDto (total position value, not per-token price). + # Store it as `balance` so inferred_current_balance uses it directly instead of quantity * price. + # Guard against a missing USD key falling back to the whole hash (which would raise on .to_f). + total_balance_usd = if asset[:price].is_a?(Hash) + price_hash = asset[:price].with_indifferent_access + (price_hash[:USD] || price_hash["USD"] || 0).to_f + else + asset[:price].to_f + end + + # Convert the USD balance to the family's base currency for consistent portfolio reporting. + # convert_usd_balance returns the actual currency used — it may fall back to "USD" if the + # exchange rate is unavailable, so we use the returned currency rather than assuming success. + balance, actual_currency = convert_usd_balance(total_balance_usd, family_currency) + quantity = asset[:amount].to_f + per_token_price = quantity > 0 ? balance / quantity : 0 + + snapshot = { + source: "defi", + id: account_id, + address: address, + blockchain: blockchain, + protocol_id: protocol[:id], + protocol_name: protocol[:name], + protocol_logo: protocol[:logo], + investment_type: investment[:name], + coinId: asset[:coinId], + symbol: asset[:symbol], + name: asset[:symbol].to_s.upcase, + amount: asset[:amount], + balance: balance, + priceUsd: per_token_price, + asset_title: asset[:title], + currency: actual_currency, + institution_logo: protocol[:logo] + }.compact + + coinstats_account.name = build_account_name(protocol, asset) unless coinstats_account.persisted? + coinstats_account.currency = actual_currency + coinstats_account.raw_payload = snapshot + coinstats_account.current_balance = coinstats_account.inferred_current_balance(snapshot) + coinstats_account.institution_metadata = { logo: protocol[:logo] }.compact + coinstats_account.save! + + ensure_local_account!(coinstats_account) + true + rescue => e + Rails.logger.warn "CoinstatsItem::DefiAccountManager - Failed to upsert account #{account_id}: #{e.message}" + false + end + + # Sets balance to zero for DeFi accounts no longer present in the API response. + def zero_out_inactive_accounts!(address, blockchain, active_defi_ids) + coinstats_item.coinstats_accounts.where(wallet_address: address).each do |account| + raw = account.raw_payload.to_h.with_indifferent_access + next unless raw[:source] == "defi" + next unless raw[:blockchain].to_s.casecmp?(blockchain.to_s) + next if active_defi_ids.include?(account.account_id) + + account.update!(current_balance: 0, raw_payload: raw.merge(amount: 0, balance: 0, priceUsd: 0)) + end + end + + def family_currency + coinstats_item.family.currency.presence || "USD" + end + + # Converts a USD amount to the target currency using Money exchange rates. + # Returns [amount, currency] so the caller always knows what currency the amount is in. + # Falls back to [usd_amount, "USD"] if conversion is unavailable. + def convert_usd_balance(usd_amount, target_currency) + return [ usd_amount, "USD" ] if target_currency == "USD" || usd_amount.zero? + + [ Money.new(usd_amount, "USD").exchange_to(target_currency).amount, target_currency ] + rescue => e + Rails.logger.warn "CoinstatsItem::DefiAccountManager - FX conversion USD->#{target_currency} failed: #{e.message}" + [ usd_amount, "USD" ] + end +end diff --git a/app/models/coinstats_item/importer.rb b/app/models/coinstats_item/importer.rb index 9f48dd71a..b3e35250d 100644 --- a/app/models/coinstats_item/importer.rb +++ b/app/models/coinstats_item/importer.rb @@ -38,6 +38,9 @@ class CoinstatsItem::Importer wallet_accounts = linked_accounts.select(&:wallet_source?) exchange_accounts = linked_accounts.select(&:exchange_source?) + failed_defi_wallet_keys = sync_defi_for_wallets!(wallet_accounts) + accounts_failed += failed_defi_wallet_keys.size + bulk_balance_data = fetch_balances_for_accounts(wallet_accounts) bulk_transactions_data = fetch_transactions_for_accounts(wallet_accounts) portfolio_coins_data = fetch_portfolio_coins_for_exchange(exchange_accounts) @@ -52,6 +55,12 @@ class CoinstatsItem::Importer portfolio_coins_data: portfolio_coins_data, portfolio_transactions_data: portfolio_transactions_data ) + elsif coinstats_account.defi_source? + # DeFi/staking accounts are kept up to date by sync_defi_for_wallets! above. + # Mark as failed if the wallet sync for this account's address didn't succeed. + raw = coinstats_account.raw_payload.to_h.with_indifferent_access + wallet_key = "#{raw[:blockchain]}:#{raw[:address]}".downcase + { success: !failed_defi_wallet_keys.include?(wallet_key), transactions_count: 0 } else update_wallet_account( coinstats_account, @@ -576,4 +585,36 @@ class CoinstatsItem::Importer def family_currency coinstats_item.family.currency.presence || "USD" end + + # Syncs DeFi/staking positions for all unique wallet addresses by delegating + # to DefiAccountManager, which owns discovery, upsert, and zero-out logic. + # Returns a Set of "blockchain:address" keys for wallets that failed to sync, + # so the caller can accurately mark individual DeFi accounts as failed. + def sync_defi_for_wallets!(wallet_accounts) + # Include existing DeFi accounts as address sources so staking positions stay + # current even if the parent wallet account has been removed. + defi_accounts = coinstats_item.coinstats_accounts.joins(:account_provider).select(&:defi_source?) + + unique_wallets = (wallet_accounts + defi_accounts).uniq(&:id).filter_map do |account| + raw = account.raw_payload.to_h.with_indifferent_access + next unless raw[:address].present? && raw[:blockchain].present? + + { address: raw[:address], blockchain: raw[:blockchain] } + end.uniq { |w| [ w[:address].downcase, w[:blockchain].downcase ] } + + return Set.new if unique_wallets.empty? + + manager = CoinstatsItem::DefiAccountManager.new(coinstats_item) + failed_wallet_keys = Set.new + + unique_wallets.each do |wallet| + result = manager.sync_wallet!(address: wallet[:address], blockchain: wallet[:blockchain], provider: coinstats_provider) + failed_wallet_keys << "#{wallet[:blockchain]}:#{wallet[:address]}".downcase unless result + end + + failed_wallet_keys + rescue => e + Rails.logger.warn "CoinstatsItem::Importer - DeFi sync failed: #{e.message}" + Set.new + end end diff --git a/app/models/provider/coinstats.rb b/app/models/provider/coinstats.rb index 1689c6189..7dda1f5ac 100644 --- a/app/models/provider/coinstats.rb +++ b/app/models/provider/coinstats.rb @@ -315,6 +315,25 @@ class Provider::Coinstats < Provider raise Error, "CoinStats API request failed: #{e.message}" end + # Get DeFi positions (staking, LP, yield farming) for a wallet address. + # https://coinstats.app/api-docs/openapi/get-wallet-defi + # @param address [String] Wallet address + # @param connection_id [String] Blockchain/connectionId identifier + # @return [Provider::Response] Response with DeFi position data + def get_wallet_defi(address:, connection_id:) + with_provider_response do + res = self.class.get( + "#{BASE_URL}/wallet/defi", + headers: auth_headers, + query: { address: address, connectionId: connection_id } + ) + handle_response(res) + end + rescue SocketError, Net::OpenTimeout, Net::ReadTimeout => e + Rails.logger.error "CoinStats API: GET /wallet/defi failed: #{e.class}: #{e.message}" + raise Error, "CoinStats API request failed: #{e.message}" + end + # Get cryptocurrency balances for multiple wallets in a single request # https://coinstats.app/api-docs/openapi/get-wallet-balances # @param wallets [String] Comma-separated list of wallet addresses in format "blockchain:address" diff --git a/test/models/coinstats_account_test.rb b/test/models/coinstats_account_test.rb index 804f79e33..7ff88468d 100644 --- a/test/models/coinstats_account_test.rb +++ b/test/models/coinstats_account_test.rb @@ -291,6 +291,20 @@ class CoinstatsAccountTest < ActiveSupport::TestCase assert_nil CoinstatsAccount.find_by(id: wallet_a.id) end + test "defi_source? returns true when source is defi" do + @coinstats_account.update!(raw_payload: { source: "defi", address: "0x123", blockchain: "ethereum" }) + assert @coinstats_account.defi_source? + refute @coinstats_account.wallet_source? + end + + test "wallet_source? returns true for wallet with address and blockchain, false for defi" do + @coinstats_account.update!(raw_payload: { address: "0x123", blockchain: "ethereum" }) + assert @coinstats_account.wallet_source? + + @coinstats_account.update!(raw_payload: { source: "defi", address: "0x123", blockchain: "ethereum" }) + refute @coinstats_account.wallet_source? + end + test "portfolio exchange account derives total and cash balances from embedded coins" do @family.update!(currency: "EUR") diff --git a/test/models/coinstats_item/importer_test.rb b/test/models/coinstats_item/importer_test.rb index 96ccafc0d..af4436bb3 100644 --- a/test/models/coinstats_item/importer_test.rb +++ b/test/models/coinstats_item/importer_test.rb @@ -10,6 +10,8 @@ class CoinstatsItem::ImporterTest < ActiveSupport::TestCase ) @mock_provider = mock("Provider::Coinstats") + # Stub DeFi endpoint globally — individual tests override if needed + @mock_provider.stubs(:get_wallet_defi).returns(success_response({ protocols: [] })) end # Helper to wrap data in Provider::Response @@ -592,4 +594,218 @@ class CoinstatsItem::ImporterTest < ActiveSupport::TestCase assert_equal 5000.0, coinstats_account1.current_balance.to_f # 2.0 * 2500 assert_equal 4500.0, coinstats_account2.current_balance.to_f # 0.1 * 45000 end + + # DeFi / staking tests + + test "creates DeFi account with balance equal to total position value, not quantity * price" do + crypto = Crypto.create! + account = @family.accounts.create!( + accountable: crypto, + name: "Ethereum Wallet", + balance: 1000, + currency: "USD" + ) + coinstats_account = @coinstats_item.coinstats_accounts.create!( + name: "Ethereum (0x12...abc)", + currency: "USD", + account_id: "ethereum", + raw_payload: { address: "0x123abc", blockchain: "ethereum" } + ) + AccountProvider.create!(account: account, provider: coinstats_account) + + # DeFi response: 32 ETH staked, total position value = $70,272 (= 32 * $2196) + # The `price` field is TotalValueDto (total position value), NOT price per token. + defi_response = { + protocols: [ + { + id: "lido", + name: "Lido", + logo: "https://example.com/lido.png", + investments: [ + { + name: "Staking", + assets: [ + { + title: "Deposit", + coinId: "ethereum", + symbol: "ETH", + amount: 32.0, + price: { USD: 70272.0 } # total value, not per-token + } + ] + } + ] + } + ] + } + + @mock_provider.expects(:get_wallet_defi) + .with(address: "0x123abc", connection_id: "ethereum") + .returns(success_response(defi_response)) + + @mock_provider.stubs(:get_wallet_balances).returns(success_response([])) + @mock_provider.stubs(:extract_wallet_balance).returns([]) + @mock_provider.stubs(:get_wallet_transactions).returns(success_response([])) + @mock_provider.stubs(:extract_wallet_transactions).returns([]) + + assert_difference "CoinstatsAccount.count", 1 do + assert_difference "Account.count", 1 do + CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import + end + end + + defi_account = @coinstats_item.coinstats_accounts.find_by(account_id: "defi:ethereum:lido:staking:ethereum:deposit") + assert_not_nil defi_account + assert_equal "defi", defi_account.raw_payload["source"] + # Balance must be the total position value ($70,272), NOT 32 * $70,272 + assert_equal 70272.0, defi_account.current_balance.to_f + assert_equal "ETH (Lido Staking)", defi_account.name + end + + test "zeros out DeFi account when staking position is no longer active" do + crypto = Crypto.create! + account = @family.accounts.create!( + accountable: crypto, + name: "Ethereum Wallet", + balance: 1000, + currency: "USD" + ) + wallet_account = @coinstats_item.coinstats_accounts.create!( + name: "Ethereum (0x12...abc)", + currency: "USD", + account_id: "ethereum", + raw_payload: { address: "0x123abc", blockchain: "ethereum" } + ) + AccountProvider.create!(account: account, provider: wallet_account) + + # Existing DeFi account from a previous sync + defi_crypto = Crypto.create! + defi_linked_account = @family.accounts.create!( + accountable: defi_crypto, + name: "ETH (Lido Staking)", + balance: 70272, + currency: "USD" + ) + defi_account = @coinstats_item.coinstats_accounts.create!( + name: "ETH (Lido Staking)", + currency: "USD", + account_id: "defi:ethereum:lido:staking:ethereum:deposit", + wallet_address: "0x123abc", + current_balance: 70272, + raw_payload: { + source: "defi", + address: "0x123abc", + blockchain: "ethereum", + protocol_id: "lido", + amount: 32.0, + balance: 70272.0 + } + ) + AccountProvider.create!(account: defi_linked_account, provider: defi_account) + + # DeFi response returns empty — position has been fully unstaked + @mock_provider.expects(:get_wallet_defi) + .with(address: "0x123abc", connection_id: "ethereum") + .returns(success_response({ protocols: [] })) + + @mock_provider.stubs(:get_wallet_balances).returns(success_response([])) + @mock_provider.stubs(:extract_wallet_balance).returns([]) + @mock_provider.stubs(:get_wallet_transactions).returns(success_response([])) + @mock_provider.stubs(:extract_wallet_transactions).returns([]) + + CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import + + defi_account.reload + assert_equal 0.0, defi_account.current_balance.to_f + end + + test "defi accounts are skipped in wallet update loop" do + crypto = Crypto.create! + account = @family.accounts.create!( + accountable: crypto, + name: "Ethereum Wallet", + balance: 1000, + currency: "USD" + ) + wallet_account = @coinstats_item.coinstats_accounts.create!( + name: "Ethereum", + currency: "USD", + account_id: "ethereum", + raw_payload: { address: "0x123abc", blockchain: "ethereum" } + ) + AccountProvider.create!(account: account, provider: wallet_account) + + defi_crypto = Crypto.create! + defi_linked_account = @family.accounts.create!( + accountable: defi_crypto, + name: "ETH (Lido Staking)", + balance: 1000, + currency: "USD" + ) + defi_account = @coinstats_item.coinstats_accounts.create!( + name: "ETH (Lido Staking)", + currency: "USD", + account_id: "defi:ethereum:lido:staking:ethereum:deposit", + wallet_address: "0x123abc", + current_balance: 1000, + raw_payload: { + source: "defi", + address: "0x123abc", + blockchain: "ethereum", + amount: 0.5, + balance: 1000.0 + } + ) + AccountProvider.create!(account: defi_linked_account, provider: defi_account) + + # get_wallet_defi called once (for the one wallet), get_wallet_balances/transactions only + # called once despite two linked accounts (DeFi account excluded from wallet fetch) + @mock_provider.expects(:get_wallet_defi) + .with(address: "0x123abc", connection_id: "ethereum") + .once + .returns(success_response({ protocols: [] })) + + @mock_provider.expects(:get_wallet_balances).with("ethereum:0x123abc").once + .returns(success_response([])) + @mock_provider.stubs(:extract_wallet_balance).returns([]) + @mock_provider.expects(:get_wallet_transactions).with("ethereum:0x123abc").once + .returns(success_response([])) + @mock_provider.stubs(:extract_wallet_transactions).returns([]) + + result = CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import + + assert result[:success] + end + + test "propagates DeFi sync failure into accounts_failed count" do + crypto = Crypto.create! + account = @family.accounts.create!( + accountable: crypto, + name: "Ethereum Wallet", + balance: 1000, + currency: "USD" + ) + coinstats_account = @coinstats_item.coinstats_accounts.create!( + name: "Ethereum", + currency: "USD", + account_id: "ethereum", + raw_payload: { address: "0x123abc", blockchain: "ethereum" } + ) + AccountProvider.create!(account: account, provider: coinstats_account) + + @mock_provider.expects(:get_wallet_defi) + .with(address: "0x123abc", connection_id: "ethereum") + .raises(Provider::Coinstats::Error.new("DeFi endpoint unavailable")) + + @mock_provider.stubs(:get_wallet_balances).returns(success_response([])) + @mock_provider.stubs(:extract_wallet_balance).returns([]) + @mock_provider.stubs(:get_wallet_transactions).returns(success_response([])) + @mock_provider.stubs(:extract_wallet_transactions).returns([]) + + result = CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import + + # Wallet account still updated, but DeFi failure is counted + assert_equal 1, result[:accounts_failed] + refute result[:success] + end end