mirror of
https://github.com/we-promise/sure.git
synced 2026-05-29 15:34:58 +00:00
* feat(binance): add full account sync and transaction processing - Fixed a bug that hindered Account setup - Wire up Binance accounts, sync statistics, and unlinked account tracking in the accounts dashboard. - Support setting a sync_start_date during Binance account setup. - Set Binance accounts' opening balance to zero to ensure the ledger builds cleanly from the actual trade history. - Expand the Binance importer and processor to handle Spot, Margin, Earn, P2P, and Futures trades and assets. - Implement TransactionBuilder to parse raw Binance trades, accurately calculating fees, base/quote asset amounts, and market values for proper ledger integration. - Update Binance API timeout (`recvWindow`) to 60,000ms to prevent connection drops. These changes provide comprehensive support for tracking Binance portfolios, ensuring accurate historical ledgers and proper visibility of sync statuses in the frontend dashboard. * refactor(binance): enforce strong params, double-entry safety, and native fiat currency support - Implement strong parameters in BinanceItemsController#complete_account_setup to satisfy Rails security guidelines. - Add robust date parsing with a grace fallback to prevent controller crashes on malformed sync start dates. - Wrap P2P transaction creations inside a database transaction block to guarantee ledger integrity and prevent orphan records. - Optimize P2P deduplication queries by batching checks for both transaction and funding external IDs. - Shift P2P entry persistence from forced USD tracking to native fiat values extracted directly from the Binance API payload. - Update BinanceAccount::ProcessorTest assertions and fixtures to validate native fiat and fee calculation logic. * fix(binance): process sync trades before caching transaction payload - Reorder Binance processor execution to insert trade records into the database prior to updating the `raw_transactions_payload` cache. This guarantees that if a database insertion fails, the cache won't prematurely mark the sync as successful, ensuring the data is retried on the next run. - Move `set_opening_anchor_balance(balance: 0)` out of the generic crypto exchange account builder and apply it specifically during Binance account creation. - Refactor date parsing in BinanceItemsController to explicitly catch `ArgumentError` via a block instead of using a blanket inline `rescue`. - Clean up the `setup_accounts` view template by removing hardcoded default translation strings. * fix(binance): enhance trade sync logic and error propagation - Pass `startTime` (from `sync_start_date`) to spot and futures trade endpoints on initial sync to optimize data fetching. - Include previously synced futures pairs alongside spot pairs when resolving relevant symbols to properly recover sold-out assets. - Re-raise exceptions in processor rescue blocks to prevent silent failures and ensure errors are correctly propagated to background jobs. - Decrease Binance API `recvWindow` from 60000ms to 5000ms to align with recommended default timeout values.
468 lines
18 KiB
Ruby
468 lines
18 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Updates account balance and imports spot trades.
|
|
class BinanceAccount::Processor
|
|
include BinanceAccount::UsdConverter
|
|
|
|
# Quote currencies probed when fetching trade history. Ordered by prevalence so
|
|
# the most common pairs are tried first and rate-limit weight is front-loaded.
|
|
TRADE_QUOTE_CURRENCIES = %w[USDT BUSD FDUSD BTC ETH BNB].freeze
|
|
|
|
attr_reader :binance_account
|
|
|
|
def initialize(binance_account)
|
|
@binance_account = binance_account
|
|
end
|
|
|
|
def process
|
|
unless binance_account.current_account.present?
|
|
Rails.logger.info "BinanceAccount::Processor - no linked account for #{binance_account.id}, skipping"
|
|
return
|
|
end
|
|
|
|
begin
|
|
BinanceAccount::HoldingsProcessor.new(binance_account).process
|
|
rescue StandardError => e
|
|
Rails.logger.error "BinanceAccount::Processor - holdings failed for #{binance_account.id}: #{e.message}"
|
|
end
|
|
|
|
begin
|
|
process_account!
|
|
rescue StandardError => e
|
|
Rails.logger.error "BinanceAccount::Processor - account update failed for #{binance_account.id}: #{e.message}"
|
|
raise
|
|
end
|
|
|
|
fetch_and_process_trades
|
|
end
|
|
|
|
private
|
|
|
|
def target_currency
|
|
binance_account.binance_item.family.currency
|
|
end
|
|
|
|
def process_account!
|
|
account = binance_account.current_account
|
|
raw_usd = (binance_account.current_balance || 0).to_d
|
|
amount, stale, rate_date = convert_from_usd(raw_usd, date: Date.current)
|
|
stale_extra = build_stale_extra(stale, rate_date, Date.current)
|
|
|
|
account.update!(
|
|
balance: amount,
|
|
cash_balance: 0,
|
|
currency: target_currency
|
|
)
|
|
|
|
binance_account.update!(extra: binance_account.extra.to_h.deep_merge(stale_extra))
|
|
end
|
|
|
|
def fetch_and_process_trades
|
|
provider = binance_account.binance_item&.binance_provider
|
|
return unless provider
|
|
|
|
# 1. Initialize data from existing payload
|
|
existing_spot = binance_account.raw_transactions_payload&.dig("spot") || {}
|
|
existing_futures = binance_account.raw_transactions_payload&.dig("futures") || {}
|
|
existing_p2p = binance_account.raw_transactions_payload&.dig("p2p") || []
|
|
|
|
# 2. Fetch P2P Trades (This now runs even if you have no spot assets)
|
|
new_p2p = fetch_new_p2p_trades(provider, existing_p2p)
|
|
|
|
# 3. Handle Spot & Futures symbols
|
|
symbols = extract_trade_symbols
|
|
new_trades_by_symbol = {}
|
|
new_futures_by_symbol = {}
|
|
|
|
# Only attempt to loop if we actually have symbols (e.g., BTC, ETH)
|
|
if symbols.any?
|
|
symbols.each do |symbol|
|
|
TRADE_QUOTE_CURRENCIES.each do |quote|
|
|
pair = "#{symbol}#{quote}"
|
|
begin
|
|
new_trades = fetch_new_trades(provider, pair, existing_spot[pair], :spot)
|
|
new_trades_by_symbol[pair] = new_trades if new_trades.present?
|
|
rescue Provider::Binance::InvalidSymbolError => e
|
|
Rails.logger.debug "BinanceAccount::Processor - skipping spot #{pair}: #{e.message}"
|
|
end
|
|
|
|
begin
|
|
new_futures = fetch_new_trades(provider, pair, existing_futures[pair], :futures)
|
|
new_futures_by_symbol[pair] = new_futures if new_futures.present?
|
|
rescue Provider::Binance::InvalidSymbolError => e
|
|
Rails.logger.debug "BinanceAccount::Processor - skipping futures #{pair}: #{e.message}"
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
# 4. Process New Records into Database Entries FIRST
|
|
# We process these into the DB first. If they fail or raise an error,
|
|
# the method halts before updating the raw_transactions_payload cache,
|
|
# ensuring a retry happens on the next sync execution.
|
|
process_trades(new_trades_by_symbol, :spot) if new_trades_by_symbol.any?
|
|
process_trades(new_futures_by_symbol, :futures) if new_futures_by_symbol.any?
|
|
process_p2p_trades(new_p2p) if new_p2p.any?
|
|
|
|
# 5. Merge Results ONLY after successful DB insertion
|
|
merged_spot = existing_spot.merge(new_trades_by_symbol) { |_pair, old, new_t| old + new_t }
|
|
merged_futures = existing_futures.merge(new_futures_by_symbol) { |_pair, old, new_t| old + new_t }
|
|
merged_p2p = existing_p2p + new_p2p
|
|
|
|
# 6. Update the Account Payload LAST (Safe Caching Boundary)
|
|
binance_account.update!(raw_transactions_payload: {
|
|
"spot" => merged_spot,
|
|
"futures" => merged_futures,
|
|
"p2p" => merged_p2p,
|
|
"fetched_at" => Time.current.iso8601
|
|
})
|
|
end
|
|
|
|
# Fetches only trades newer than what is already cached for the given pair.
|
|
# On the first sync (no cached trades) fetches the most recent page.
|
|
# On subsequent syncs starts from max_cached_id + 1 and paginates forward.
|
|
def fetch_new_trades(provider, pair, cached_trades, market_type)
|
|
limit = 1000
|
|
max_cached_id = cached_trades&.map { |t| t["id"].to_i }&.max
|
|
|
|
from_id = max_cached_id ? max_cached_id + 1 : nil
|
|
start_time = nil
|
|
unless max_cached_id
|
|
start_time = binance_account.binance_item&.sync_start_date&.to_time&.to_i&.*(1000)
|
|
end
|
|
all_new = []
|
|
|
|
loop do
|
|
page = if market_type == :spot
|
|
provider.get_spot_trades(pair, limit: limit, from_id: from_id, startTime: start_time)
|
|
else
|
|
provider.get_futures_trades(pair, limit: limit, from_id: from_id, startTime: start_time)
|
|
end
|
|
break if page.blank?
|
|
|
|
all_new.concat(page)
|
|
break if page.size < limit
|
|
|
|
from_id = page.map { |t| t["id"].to_i }.max + 1
|
|
end
|
|
|
|
all_new
|
|
end
|
|
|
|
def fetch_new_p2p_trades(provider, cached_p2p)
|
|
# Binance P2P history endpoint only supports max 30-day windows.
|
|
# If no cache exists, we fetch back to sync_start_date (or default 30 days).
|
|
# If cache exists, we fetch from the last cached trade timestamp.
|
|
max_cached_timestamp = cached_p2p&.map { |t| t["createTime"].to_i }&.max
|
|
|
|
start_time = if max_cached_timestamp
|
|
max_cached_timestamp
|
|
elsif binance_account.binance_item&.sync_start_date
|
|
binance_account.binance_item.sync_start_date.to_time.to_i * 1000
|
|
else
|
|
(Time.current - 30.days).to_i * 1000
|
|
end
|
|
|
|
all_new = []
|
|
current_start = start_time
|
|
|
|
loop do
|
|
current_end = [ current_start + 30.days.to_i * 1000, Time.current.to_i * 1000 ].min
|
|
|
|
page = provider.get_all_p2p_trades(start_timestamp: current_start, end_timestamp: current_end)
|
|
|
|
# We might fetch overlapping trades if they share the exact timestamp, filter by unique orderNumber
|
|
if page.present?
|
|
cached_order_numbers = cached_p2p&.map { |t| t["orderNumber"] } || []
|
|
new_order_numbers = all_new.map { |t| t["orderNumber"] }
|
|
|
|
unique_page = page.reject do |t|
|
|
cached_order_numbers.include?(t["orderNumber"]) || new_order_numbers.include?(t["orderNumber"])
|
|
end
|
|
|
|
all_new.concat(unique_page)
|
|
end
|
|
|
|
break if current_end >= Time.current.to_i * 1000
|
|
current_start = current_end + 1
|
|
end
|
|
|
|
all_new
|
|
end
|
|
|
|
def extract_trade_symbols
|
|
stablecoins = BinanceAccount::STABLECOINS
|
|
quote_re = /(#{TRADE_QUOTE_CURRENCIES.join("|")})$/
|
|
|
|
# Base symbols from today's asset snapshot
|
|
assets = binance_account.raw_payload&.dig("assets") || []
|
|
current = assets.map { |a| a["symbol"] || a[:symbol] }.compact
|
|
|
|
# Base symbols from previously fetched pairs (recovers sold-out assets)
|
|
prev_spot = binance_account.raw_transactions_payload&.dig("spot")&.keys || []
|
|
prev_futures = binance_account.raw_transactions_payload&.dig("futures")&.keys || []
|
|
prev_pairs = (prev_spot + prev_futures).uniq
|
|
previous = prev_pairs.map { |pair| pair.gsub(quote_re, "") }
|
|
|
|
(current + previous).uniq.compact.reject { |s| s.blank? || stablecoins.include?(s) }
|
|
end
|
|
|
|
def process_trades(trades_by_symbol, market_type)
|
|
trades_by_symbol.each do |pair, trades|
|
|
trades.each { |trade| process_trade(trade, pair, market_type) }
|
|
end
|
|
rescue StandardError => e
|
|
Rails.logger.error "BinanceAccount::Processor - trade processing failed: #{e.message}"
|
|
raise
|
|
end
|
|
|
|
def process_trade(trade, pair, market_type)
|
|
account = binance_account.current_account
|
|
return unless account
|
|
|
|
quote_suffix = TRADE_QUOTE_CURRENCIES.find { |q| pair.end_with?(q) }
|
|
base_symbol = quote_suffix ? pair.delete_suffix(quote_suffix) : pair
|
|
return if base_symbol.blank?
|
|
|
|
ticker = "CRYPTO:#{base_symbol}"
|
|
security = BinanceAccount::SecurityResolver.resolve(ticker, base_symbol)
|
|
|
|
return unless security
|
|
|
|
prefix = market_type == :spot ? "spot" : "futures"
|
|
external_id = "binance_#{prefix}_#{pair}_#{trade["id"]}"
|
|
return if account.entries.exists?(external_id: external_id)
|
|
|
|
date = Time.zone.at(trade["time"].to_i / 1000).to_date
|
|
qty = trade["qty"].to_d
|
|
price_raw = trade["price"].to_d
|
|
quote_qty = trade["quoteQty"].to_d
|
|
|
|
# quoteQty and price are denominated in the quote currency (e.g. BTC for ETHBTC).
|
|
# Convert to USD so all entries and cost-basis calculations share a common currency.
|
|
quote_symbol = quote_suffix || "USDT"
|
|
amount_usd_raw = quote_to_usd(quote_qty, quote_symbol, date: date)
|
|
price_usd = quote_to_usd(price_raw, quote_symbol, date: date)
|
|
|
|
if amount_usd_raw.nil? || price_usd.nil?
|
|
Rails.logger.warn "BinanceAccount::Processor - skipping trade #{trade["id"]} for #{pair}: could not convert #{quote_symbol} to USD"
|
|
return
|
|
end
|
|
|
|
amount_usd = amount_usd_raw.round(2)
|
|
commission = commission_in_usd(trade, base_symbol, price_usd, date: date)
|
|
is_buyer = trade.key?("isBuyer") ? trade["isBuyer"] : trade["buyer"]
|
|
|
|
if is_buyer
|
|
account.entries.create!(
|
|
date: date,
|
|
name: "Buy #{qty.round(8)} #{base_symbol}",
|
|
amount: -amount_usd,
|
|
currency: "USD",
|
|
external_id: external_id,
|
|
source: "binance",
|
|
entryable: Trade.new(
|
|
security: security,
|
|
qty: qty,
|
|
price: price_usd,
|
|
currency: "USD",
|
|
fee: commission,
|
|
investment_activity_label: "Buy"
|
|
)
|
|
)
|
|
else
|
|
account.entries.create!(
|
|
date: date,
|
|
name: "Sell #{qty.round(8)} #{base_symbol}",
|
|
amount: amount_usd,
|
|
currency: "USD",
|
|
external_id: external_id,
|
|
source: "binance",
|
|
entryable: Trade.new(
|
|
security: security,
|
|
qty: -qty,
|
|
price: price_usd,
|
|
currency: "USD",
|
|
fee: commission,
|
|
investment_activity_label: "Sell"
|
|
)
|
|
)
|
|
end
|
|
rescue StandardError => e
|
|
Rails.logger.error "BinanceAccount::Processor - failed to process trade #{trade["id"]}: #{e.message}"
|
|
raise
|
|
end
|
|
|
|
# Converts an amount denominated in quote_symbol to USD.
|
|
# Stablecoins are treated as 1:1.
|
|
# For fiat/crypto assets, tries Binance historical price first, falls back to internal ExchangeRate.
|
|
def quote_to_usd(amount, quote_symbol, date: nil)
|
|
return amount if BinanceAccount::STABLECOINS.include?(quote_symbol)
|
|
return amount if quote_symbol.to_s.upcase == "USD"
|
|
|
|
provider = binance_account.binance_item&.binance_provider
|
|
|
|
if provider
|
|
spot = nil
|
|
begin
|
|
spot = provider.get_historical_price("#{quote_symbol}USDT", date) if date.present? && provider.respond_to?(:get_historical_price)
|
|
spot ||= provider.get_spot_price("#{quote_symbol}USDT")
|
|
rescue Provider::Binance::InvalidSymbolError
|
|
# Fall through to ExchangeRate lookup
|
|
end
|
|
return (amount * spot.to_d).round(8) if spot.present?
|
|
end
|
|
|
|
# Fallback to internal app ExchangeRate provider (crucial for P2P fiat currencies like TZS, NGN)
|
|
fallback_rate = ExchangeRate.find_or_fetch_rate(from: quote_symbol, to: "USD", date: date || Date.current, cache: true)
|
|
if fallback_rate.present?
|
|
# Extract the numeric rate from the returned object (or use it directly if it's already a number)
|
|
rate_val = fallback_rate.respond_to?(:rate) ? fallback_rate.rate : fallback_rate
|
|
return (amount * rate_val.to_d).round(8)
|
|
end
|
|
|
|
nil
|
|
rescue StandardError => e
|
|
Rails.logger.warn "BinanceAccount::Processor - could not convert #{quote_symbol} to USD: #{e.message}"
|
|
nil
|
|
end
|
|
|
|
# Converts the trade commission to USD.
|
|
# commissionAsset can be: a stablecoin (≈ 1 USD), the base asset, or something else (e.g. BNB).
|
|
def process_p2p_trades(trades)
|
|
account = binance_account.current_account
|
|
return unless account
|
|
|
|
Rails.logger.info "BinanceAccount::Processor - found #{trades.size} P2P trades to process"
|
|
|
|
trades.each do |trade|
|
|
external_id = "binance_p2p_#{trade["orderNumber"]}"
|
|
funding_external_id = "#{external_id}_funding"
|
|
|
|
# Deduplicate by checking for either the Trade or Funding leg in a single query
|
|
if account.entries.where(external_id: [ external_id, funding_external_id ]).exists?
|
|
Rails.logger.info "BinanceAccount::Processor - skipping P2P trade #{trade["orderNumber"]}: already exists in DB"
|
|
next
|
|
end
|
|
|
|
date = Time.zone.at(trade["createTime"].to_i / 1000).to_date
|
|
trade_type = trade["tradeType"] # BUY or SELL
|
|
|
|
begin
|
|
# Grab the exact Fiat and Crypto truth straight from the payload
|
|
fiat_currency = trade["fiat"]
|
|
fiat_amount = trade["totalPrice"].to_d
|
|
fiat_price = trade["unitPrice"].to_d
|
|
|
|
crypto_asset = trade["asset"]
|
|
gross_crypto = trade["amount"].to_d
|
|
net_crypto = (trade["takerAmount"] || gross_crypto).to_d
|
|
crypto_fee = (trade["takerCommission"] || 0).to_d
|
|
|
|
ticker = "CRYPTO:#{crypto_asset}"
|
|
security = BinanceAccount::SecurityResolver.resolve(ticker, crypto_asset)
|
|
|
|
unless security
|
|
Rails.logger.warn "BinanceAccount::Processor - skipping P2P trade #{trade["orderNumber"]}: could not resolve security for #{crypto_asset}"
|
|
next
|
|
end
|
|
|
|
# Convert the crypto fee (if any) to its fiat equivalent using the trade's exact unit price
|
|
fiat_fee = (crypto_fee * fiat_price).round(2)
|
|
|
|
# 3. AI Fix: Wrap the double-entry in a transaction block to guarantee ledger integrity
|
|
account.transaction do
|
|
if trade_type == "BUY"
|
|
# BUY LOGIC: User sent Fiat from their bank, received Crypto
|
|
account.entries.create!(
|
|
date: date,
|
|
name: "P2P Payment (#{fiat_currency})",
|
|
amount: -fiat_amount, # Fiat leaving the system
|
|
currency: fiat_currency,
|
|
external_id: funding_external_id,
|
|
source: "binance",
|
|
entryable: Transaction.new
|
|
)
|
|
|
|
account.entries.create!(
|
|
date: date,
|
|
name: "P2P Buy #{gross_crypto.round(8)} #{crypto_asset}",
|
|
amount: fiat_amount, # Fiat value entering as Crypto (Cost Basis)
|
|
currency: fiat_currency,
|
|
external_id: external_id,
|
|
source: "binance",
|
|
entryable: Trade.new(
|
|
security: security,
|
|
qty: net_crypto,
|
|
price: fiat_price,
|
|
currency: fiat_currency,
|
|
fee: fiat_fee,
|
|
investment_activity_label: "Buy"
|
|
)
|
|
)
|
|
else
|
|
# SELL LOGIC: User liquidated Crypto, received Fiat to their bank
|
|
account.entries.create!(
|
|
date: date,
|
|
name: "P2P Sell #{gross_crypto.round(8)} #{crypto_asset}",
|
|
amount: -fiat_amount, # Fiat value of Crypto leaving
|
|
currency: fiat_currency,
|
|
external_id: external_id,
|
|
source: "binance",
|
|
entryable: Trade.new(
|
|
security: security,
|
|
qty: -net_crypto,
|
|
price: fiat_price,
|
|
currency: fiat_currency,
|
|
fee: fiat_fee,
|
|
investment_activity_label: "Sell"
|
|
)
|
|
)
|
|
|
|
account.entries.create!(
|
|
date: date,
|
|
name: "P2P Receipt (#{fiat_currency})",
|
|
amount: fiat_amount, # Fiat entering the system
|
|
currency: fiat_currency,
|
|
external_id: funding_external_id,
|
|
source: "binance",
|
|
entryable: Transaction.new
|
|
)
|
|
end
|
|
end
|
|
rescue => e
|
|
Rails.logger.error "BINANCE P2P SYNC CRASHED for Order #{trade["orderNumber"]}: #{e.message}"
|
|
raise
|
|
end
|
|
end
|
|
rescue StandardError => e
|
|
Rails.logger.error "BinanceAccount::Processor - P2P trade processing failed: #{e.message}"
|
|
raise
|
|
end
|
|
|
|
def commission_in_usd(trade, base_symbol, trade_price, date: nil)
|
|
raw = trade["commission"].to_d
|
|
commission_asset = trade["commissionAsset"].to_s.upcase
|
|
return 0 if raw.zero? || commission_asset.blank?
|
|
|
|
stablecoins = BinanceAccount::STABLECOINS
|
|
return raw if stablecoins.include?(commission_asset)
|
|
|
|
# Fee in base asset (e.g. BTC for BTCUSDT) — convert using trade price
|
|
return (raw * trade_price).round(8) if commission_asset == base_symbol
|
|
|
|
# Fee in another asset (typically BNB) — fetch current USDT spot price as approximation
|
|
provider = binance_account.binance_item&.binance_provider
|
|
return 0 unless provider
|
|
|
|
spot = nil
|
|
spot = provider.get_historical_price("#{commission_asset}USDT", date) if date.present? && provider.respond_to?(:get_historical_price)
|
|
spot ||= provider.get_spot_price("#{commission_asset}USDT")
|
|
|
|
(raw * spot.to_d).round(8)
|
|
rescue StandardError => e
|
|
Rails.logger.warn "BinanceAccount::Processor - could not convert commission for #{trade["id"]}: #{e.message}"
|
|
0
|
|
end
|
|
end
|