Add DeFi via Coinstats (#1417)

* feat: handle defi account with coinstats provider

* chore: refactor to follow project conventions

* fix: fixing codex/coderabbit findings

* fix: fixing coderabbit findings

* fix: fixing coderabbit findings

* fix: fixing coderabbit findings

* fix: fixing coderabbit findings

* fix: fixing coderabbit findings
This commit is contained in:
Romain Brucker
2026-04-11 21:37:07 +02:00
committed by GitHub
parent 7427b753e5
commit 16a0fa08f8
6 changed files with 502 additions and 0 deletions

View File

@@ -3,9 +3,15 @@ module CoinstatsAccount::SourceClassification
def wallet_source?
payload = raw_payload.to_h.with_indifferent_access
return false if payload[:source] == "defi"
payload[:source] == "wallet" || (payload[:address].present? && payload[:blockchain].present?)
end
def defi_source?
payload = raw_payload.to_h.with_indifferent_access
payload[:source] == "defi"
end
def exchange_source?
exchange_source_for?(raw_payload)
end

View File

@@ -0,0 +1,206 @@
# frozen_string_literal: true
# Manages DeFi/staking accounts for a CoinStats wallet connection.
# Discovers staking, LP, and yield farming positions via the CoinStats DeFi API
# and keeps the corresponding CoinstatsAccounts up to date.
class CoinstatsItem::DefiAccountManager
attr_reader :coinstats_item
def initialize(coinstats_item)
@coinstats_item = coinstats_item
end
# Fetches DeFi positions for the given wallet and creates/updates CoinstatsAccounts.
# Positions that disappear from the API (fully unstaked) are zeroed out.
# Returns true on success, false on failure.
def sync_wallet!(address:, blockchain:, provider:)
normalized_address = address.to_s.downcase
normalized_blockchain = blockchain.to_s.downcase
response = provider.get_wallet_defi(address: address, connection_id: blockchain)
unless response.success?
Rails.logger.warn "CoinstatsItem::DefiAccountManager - DeFi fetch failed for #{normalized_blockchain}:#{normalized_address}"
return false
end
defi_data = response.data.to_h.with_indifferent_access
protocols = Array(defi_data[:protocols])
active_defi_ids = []
had_upsert_failures = false
protocols.each do |protocol|
protocol = protocol.with_indifferent_access
Array(protocol[:investments]).each do |investment|
investment = investment.with_indifferent_access
Array(investment[:assets]).each do |asset|
asset = asset.with_indifferent_access
next if asset[:amount].to_f.zero?
next if asset[:coinId].blank? && asset[:symbol].blank?
account_id = build_account_id(protocol, investment, asset, blockchain: normalized_blockchain)
if upsert_account!(address: normalized_address, blockchain: normalized_blockchain, protocol: protocol, investment: investment, asset: asset, account_id: account_id)
active_defi_ids << account_id
else
had_upsert_failures = true
end
end
end
end
# Skip zero-out when upserts failed — active_defi_ids is incomplete and we'd risk
# zeroing accounts that are still active but failed to save this cycle.
return false if had_upsert_failures
zero_out_inactive_accounts!(normalized_address, normalized_blockchain, active_defi_ids)
true
rescue => e
Rails.logger.warn "CoinstatsItem::DefiAccountManager - Sync failed for #{blockchain}:#{address}: #{e.message}"
false
end
# Creates the local Account for a DeFi CoinstatsAccount if it doesn't exist yet.
def ensure_local_account!(coinstats_account)
return false if coinstats_account.account.present?
account = Account.create_and_sync({
family: coinstats_item.family,
name: coinstats_account.name,
balance: coinstats_account.current_balance || 0,
cash_balance: 0,
currency: coinstats_account.currency,
accountable_type: "Crypto",
accountable_attributes: {
subtype: "wallet",
tax_treatment: "taxable"
}
}, skip_initial_sync: true)
AccountProvider.create!(account: account, provider: coinstats_account)
true
rescue ActiveRecord::RecordNotUnique
# Another concurrent sync created the AccountProvider; destroy the orphaned Account we just created.
account&.destroy
false
end
private
# Builds a stable, unique account_id for a DeFi asset position.
# Format: "defi:<blockchain>:<protocol_id>:<investment_type>:<coin_id>:<asset_title>"
# Blockchain is included to avoid collisions when the same wallet address exists on
# multiple EVM-compatible chains (e.g. Ethereum and Polygon).
def build_account_id(protocol, investment, asset, blockchain:)
chain = blockchain.to_s.downcase.gsub(/\s+/, "_").presence || "unknown"
protocol_id = protocol[:id].to_s.downcase.gsub(/\s+/, "_").presence || "unknown"
coin_id = (asset[:coinId] || asset[:symbol]).to_s.downcase
title = asset[:title].to_s.downcase.gsub(/\s+/, "_").presence || "position"
investment_type = investment[:name].to_s.downcase.gsub(/\s+/, "_").presence
parts = [ "defi", chain, protocol_id, coin_id, title ]
parts.insert(3, investment_type) if investment_type.present?
parts.join(":")
end
def build_account_name(protocol, asset)
protocol_name = protocol[:name].to_s
symbol = asset[:symbol].to_s.upcase
case asset[:title].to_s.downcase
when "deposit", "supplied"
"#{symbol} (#{protocol_name} Staking)"
when "reward", "yield"
"#{symbol} (#{protocol_name} Rewards)"
else
label = asset[:title].to_s.presence || "Position"
"#{symbol} (#{protocol_name} #{label})"
end
end
# Returns true on success, false on failure (so the caller can track active positions correctly).
def upsert_account!(address:, blockchain:, protocol:, investment:, asset:, account_id:)
coinstats_account = coinstats_item.coinstats_accounts.find_or_initialize_by(
account_id: account_id,
wallet_address: address
)
# The DeFi API returns asset.price as a TotalValueDto (total position value, not per-token price).
# Store it as `balance` so inferred_current_balance uses it directly instead of quantity * price.
# Guard against a missing USD key falling back to the whole hash (which would raise on .to_f).
total_balance_usd = if asset[:price].is_a?(Hash)
price_hash = asset[:price].with_indifferent_access
(price_hash[:USD] || price_hash["USD"] || 0).to_f
else
asset[:price].to_f
end
# Convert the USD balance to the family's base currency for consistent portfolio reporting.
# convert_usd_balance returns the actual currency used — it may fall back to "USD" if the
# exchange rate is unavailable, so we use the returned currency rather than assuming success.
balance, actual_currency = convert_usd_balance(total_balance_usd, family_currency)
quantity = asset[:amount].to_f
per_token_price = quantity > 0 ? balance / quantity : 0
snapshot = {
source: "defi",
id: account_id,
address: address,
blockchain: blockchain,
protocol_id: protocol[:id],
protocol_name: protocol[:name],
protocol_logo: protocol[:logo],
investment_type: investment[:name],
coinId: asset[:coinId],
symbol: asset[:symbol],
name: asset[:symbol].to_s.upcase,
amount: asset[:amount],
balance: balance,
priceUsd: per_token_price,
asset_title: asset[:title],
currency: actual_currency,
institution_logo: protocol[:logo]
}.compact
coinstats_account.name = build_account_name(protocol, asset) unless coinstats_account.persisted?
coinstats_account.currency = actual_currency
coinstats_account.raw_payload = snapshot
coinstats_account.current_balance = coinstats_account.inferred_current_balance(snapshot)
coinstats_account.institution_metadata = { logo: protocol[:logo] }.compact
coinstats_account.save!
ensure_local_account!(coinstats_account)
true
rescue => e
Rails.logger.warn "CoinstatsItem::DefiAccountManager - Failed to upsert account #{account_id}: #{e.message}"
false
end
# Sets balance to zero for DeFi accounts no longer present in the API response.
def zero_out_inactive_accounts!(address, blockchain, active_defi_ids)
coinstats_item.coinstats_accounts.where(wallet_address: address).each do |account|
raw = account.raw_payload.to_h.with_indifferent_access
next unless raw[:source] == "defi"
next unless raw[:blockchain].to_s.casecmp?(blockchain.to_s)
next if active_defi_ids.include?(account.account_id)
account.update!(current_balance: 0, raw_payload: raw.merge(amount: 0, balance: 0, priceUsd: 0))
end
end
def family_currency
coinstats_item.family.currency.presence || "USD"
end
# Converts a USD amount to the target currency using Money exchange rates.
# Returns [amount, currency] so the caller always knows what currency the amount is in.
# Falls back to [usd_amount, "USD"] if conversion is unavailable.
def convert_usd_balance(usd_amount, target_currency)
return [ usd_amount, "USD" ] if target_currency == "USD" || usd_amount.zero?
[ Money.new(usd_amount, "USD").exchange_to(target_currency).amount, target_currency ]
rescue => e
Rails.logger.warn "CoinstatsItem::DefiAccountManager - FX conversion USD->#{target_currency} failed: #{e.message}"
[ usd_amount, "USD" ]
end
end

View File

@@ -38,6 +38,9 @@ class CoinstatsItem::Importer
wallet_accounts = linked_accounts.select(&:wallet_source?)
exchange_accounts = linked_accounts.select(&:exchange_source?)
failed_defi_wallet_keys = sync_defi_for_wallets!(wallet_accounts)
accounts_failed += failed_defi_wallet_keys.size
bulk_balance_data = fetch_balances_for_accounts(wallet_accounts)
bulk_transactions_data = fetch_transactions_for_accounts(wallet_accounts)
portfolio_coins_data = fetch_portfolio_coins_for_exchange(exchange_accounts)
@@ -52,6 +55,12 @@ class CoinstatsItem::Importer
portfolio_coins_data: portfolio_coins_data,
portfolio_transactions_data: portfolio_transactions_data
)
elsif coinstats_account.defi_source?
# DeFi/staking accounts are kept up to date by sync_defi_for_wallets! above.
# Mark as failed if the wallet sync for this account's address didn't succeed.
raw = coinstats_account.raw_payload.to_h.with_indifferent_access
wallet_key = "#{raw[:blockchain]}:#{raw[:address]}".downcase
{ success: !failed_defi_wallet_keys.include?(wallet_key), transactions_count: 0 }
else
update_wallet_account(
coinstats_account,
@@ -576,4 +585,36 @@ class CoinstatsItem::Importer
def family_currency
coinstats_item.family.currency.presence || "USD"
end
# Syncs DeFi/staking positions for all unique wallet addresses by delegating
# to DefiAccountManager, which owns discovery, upsert, and zero-out logic.
# Returns a Set of "blockchain:address" keys for wallets that failed to sync,
# so the caller can accurately mark individual DeFi accounts as failed.
def sync_defi_for_wallets!(wallet_accounts)
# Include existing DeFi accounts as address sources so staking positions stay
# current even if the parent wallet account has been removed.
defi_accounts = coinstats_item.coinstats_accounts.joins(:account_provider).select(&:defi_source?)
unique_wallets = (wallet_accounts + defi_accounts).uniq(&:id).filter_map do |account|
raw = account.raw_payload.to_h.with_indifferent_access
next unless raw[:address].present? && raw[:blockchain].present?
{ address: raw[:address], blockchain: raw[:blockchain] }
end.uniq { |w| [ w[:address].downcase, w[:blockchain].downcase ] }
return Set.new if unique_wallets.empty?
manager = CoinstatsItem::DefiAccountManager.new(coinstats_item)
failed_wallet_keys = Set.new
unique_wallets.each do |wallet|
result = manager.sync_wallet!(address: wallet[:address], blockchain: wallet[:blockchain], provider: coinstats_provider)
failed_wallet_keys << "#{wallet[:blockchain]}:#{wallet[:address]}".downcase unless result
end
failed_wallet_keys
rescue => e
Rails.logger.warn "CoinstatsItem::Importer - DeFi sync failed: #{e.message}"
Set.new
end
end

View File

@@ -315,6 +315,25 @@ class Provider::Coinstats < Provider
raise Error, "CoinStats API request failed: #{e.message}"
end
# Get DeFi positions (staking, LP, yield farming) for a wallet address.
# https://coinstats.app/api-docs/openapi/get-wallet-defi
# @param address [String] Wallet address
# @param connection_id [String] Blockchain/connectionId identifier
# @return [Provider::Response] Response with DeFi position data
def get_wallet_defi(address:, connection_id:)
with_provider_response do
res = self.class.get(
"#{BASE_URL}/wallet/defi",
headers: auth_headers,
query: { address: address, connectionId: connection_id }
)
handle_response(res)
end
rescue SocketError, Net::OpenTimeout, Net::ReadTimeout => e
Rails.logger.error "CoinStats API: GET /wallet/defi failed: #{e.class}: #{e.message}"
raise Error, "CoinStats API request failed: #{e.message}"
end
# Get cryptocurrency balances for multiple wallets in a single request
# https://coinstats.app/api-docs/openapi/get-wallet-balances
# @param wallets [String] Comma-separated list of wallet addresses in format "blockchain:address"

View File

@@ -291,6 +291,20 @@ class CoinstatsAccountTest < ActiveSupport::TestCase
assert_nil CoinstatsAccount.find_by(id: wallet_a.id)
end
test "defi_source? returns true when source is defi" do
@coinstats_account.update!(raw_payload: { source: "defi", address: "0x123", blockchain: "ethereum" })
assert @coinstats_account.defi_source?
refute @coinstats_account.wallet_source?
end
test "wallet_source? returns true for wallet with address and blockchain, false for defi" do
@coinstats_account.update!(raw_payload: { address: "0x123", blockchain: "ethereum" })
assert @coinstats_account.wallet_source?
@coinstats_account.update!(raw_payload: { source: "defi", address: "0x123", blockchain: "ethereum" })
refute @coinstats_account.wallet_source?
end
test "portfolio exchange account derives total and cash balances from embedded coins" do
@family.update!(currency: "EUR")

View File

@@ -10,6 +10,8 @@ class CoinstatsItem::ImporterTest < ActiveSupport::TestCase
)
@mock_provider = mock("Provider::Coinstats")
# Stub DeFi endpoint globally — individual tests override if needed
@mock_provider.stubs(:get_wallet_defi).returns(success_response({ protocols: [] }))
end
# Helper to wrap data in Provider::Response
@@ -592,4 +594,218 @@ class CoinstatsItem::ImporterTest < ActiveSupport::TestCase
assert_equal 5000.0, coinstats_account1.current_balance.to_f # 2.0 * 2500
assert_equal 4500.0, coinstats_account2.current_balance.to_f # 0.1 * 45000
end
# DeFi / staking tests
test "creates DeFi account with balance equal to total position value, not quantity * price" do
crypto = Crypto.create!
account = @family.accounts.create!(
accountable: crypto,
name: "Ethereum Wallet",
balance: 1000,
currency: "USD"
)
coinstats_account = @coinstats_item.coinstats_accounts.create!(
name: "Ethereum (0x12...abc)",
currency: "USD",
account_id: "ethereum",
raw_payload: { address: "0x123abc", blockchain: "ethereum" }
)
AccountProvider.create!(account: account, provider: coinstats_account)
# DeFi response: 32 ETH staked, total position value = $70,272 (= 32 * $2196)
# The `price` field is TotalValueDto (total position value), NOT price per token.
defi_response = {
protocols: [
{
id: "lido",
name: "Lido",
logo: "https://example.com/lido.png",
investments: [
{
name: "Staking",
assets: [
{
title: "Deposit",
coinId: "ethereum",
symbol: "ETH",
amount: 32.0,
price: { USD: 70272.0 } # total value, not per-token
}
]
}
]
}
]
}
@mock_provider.expects(:get_wallet_defi)
.with(address: "0x123abc", connection_id: "ethereum")
.returns(success_response(defi_response))
@mock_provider.stubs(:get_wallet_balances).returns(success_response([]))
@mock_provider.stubs(:extract_wallet_balance).returns([])
@mock_provider.stubs(:get_wallet_transactions).returns(success_response([]))
@mock_provider.stubs(:extract_wallet_transactions).returns([])
assert_difference "CoinstatsAccount.count", 1 do
assert_difference "Account.count", 1 do
CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import
end
end
defi_account = @coinstats_item.coinstats_accounts.find_by(account_id: "defi:ethereum:lido:staking:ethereum:deposit")
assert_not_nil defi_account
assert_equal "defi", defi_account.raw_payload["source"]
# Balance must be the total position value ($70,272), NOT 32 * $70,272
assert_equal 70272.0, defi_account.current_balance.to_f
assert_equal "ETH (Lido Staking)", defi_account.name
end
test "zeros out DeFi account when staking position is no longer active" do
crypto = Crypto.create!
account = @family.accounts.create!(
accountable: crypto,
name: "Ethereum Wallet",
balance: 1000,
currency: "USD"
)
wallet_account = @coinstats_item.coinstats_accounts.create!(
name: "Ethereum (0x12...abc)",
currency: "USD",
account_id: "ethereum",
raw_payload: { address: "0x123abc", blockchain: "ethereum" }
)
AccountProvider.create!(account: account, provider: wallet_account)
# Existing DeFi account from a previous sync
defi_crypto = Crypto.create!
defi_linked_account = @family.accounts.create!(
accountable: defi_crypto,
name: "ETH (Lido Staking)",
balance: 70272,
currency: "USD"
)
defi_account = @coinstats_item.coinstats_accounts.create!(
name: "ETH (Lido Staking)",
currency: "USD",
account_id: "defi:ethereum:lido:staking:ethereum:deposit",
wallet_address: "0x123abc",
current_balance: 70272,
raw_payload: {
source: "defi",
address: "0x123abc",
blockchain: "ethereum",
protocol_id: "lido",
amount: 32.0,
balance: 70272.0
}
)
AccountProvider.create!(account: defi_linked_account, provider: defi_account)
# DeFi response returns empty — position has been fully unstaked
@mock_provider.expects(:get_wallet_defi)
.with(address: "0x123abc", connection_id: "ethereum")
.returns(success_response({ protocols: [] }))
@mock_provider.stubs(:get_wallet_balances).returns(success_response([]))
@mock_provider.stubs(:extract_wallet_balance).returns([])
@mock_provider.stubs(:get_wallet_transactions).returns(success_response([]))
@mock_provider.stubs(:extract_wallet_transactions).returns([])
CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import
defi_account.reload
assert_equal 0.0, defi_account.current_balance.to_f
end
test "defi accounts are skipped in wallet update loop" do
crypto = Crypto.create!
account = @family.accounts.create!(
accountable: crypto,
name: "Ethereum Wallet",
balance: 1000,
currency: "USD"
)
wallet_account = @coinstats_item.coinstats_accounts.create!(
name: "Ethereum",
currency: "USD",
account_id: "ethereum",
raw_payload: { address: "0x123abc", blockchain: "ethereum" }
)
AccountProvider.create!(account: account, provider: wallet_account)
defi_crypto = Crypto.create!
defi_linked_account = @family.accounts.create!(
accountable: defi_crypto,
name: "ETH (Lido Staking)",
balance: 1000,
currency: "USD"
)
defi_account = @coinstats_item.coinstats_accounts.create!(
name: "ETH (Lido Staking)",
currency: "USD",
account_id: "defi:ethereum:lido:staking:ethereum:deposit",
wallet_address: "0x123abc",
current_balance: 1000,
raw_payload: {
source: "defi",
address: "0x123abc",
blockchain: "ethereum",
amount: 0.5,
balance: 1000.0
}
)
AccountProvider.create!(account: defi_linked_account, provider: defi_account)
# get_wallet_defi called once (for the one wallet), get_wallet_balances/transactions only
# called once despite two linked accounts (DeFi account excluded from wallet fetch)
@mock_provider.expects(:get_wallet_defi)
.with(address: "0x123abc", connection_id: "ethereum")
.once
.returns(success_response({ protocols: [] }))
@mock_provider.expects(:get_wallet_balances).with("ethereum:0x123abc").once
.returns(success_response([]))
@mock_provider.stubs(:extract_wallet_balance).returns([])
@mock_provider.expects(:get_wallet_transactions).with("ethereum:0x123abc").once
.returns(success_response([]))
@mock_provider.stubs(:extract_wallet_transactions).returns([])
result = CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import
assert result[:success]
end
test "propagates DeFi sync failure into accounts_failed count" do
crypto = Crypto.create!
account = @family.accounts.create!(
accountable: crypto,
name: "Ethereum Wallet",
balance: 1000,
currency: "USD"
)
coinstats_account = @coinstats_item.coinstats_accounts.create!(
name: "Ethereum",
currency: "USD",
account_id: "ethereum",
raw_payload: { address: "0x123abc", blockchain: "ethereum" }
)
AccountProvider.create!(account: account, provider: coinstats_account)
@mock_provider.expects(:get_wallet_defi)
.with(address: "0x123abc", connection_id: "ethereum")
.raises(Provider::Coinstats::Error.new("DeFi endpoint unavailable"))
@mock_provider.stubs(:get_wallet_balances).returns(success_response([]))
@mock_provider.stubs(:extract_wallet_balance).returns([])
@mock_provider.stubs(:get_wallet_transactions).returns(success_response([]))
@mock_provider.stubs(:extract_wallet_transactions).returns([])
result = CoinstatsItem::Importer.new(@coinstats_item, coinstats_provider: @mock_provider).import
# Wallet account still updated, but DeFi failure is counted
assert_equal 1, result[:accounts_failed]
refute result[:success]
end
end