Files
sure/app/models/provider/openai.rb
Guillem Arias Fauste 8251b7e4d6 feat(ai): add Anthropic provider with chat parity (1/5) (#1983)
* feat(ai): add Anthropic provider with chat parity (1/5)

Introduces Provider::Anthropic alongside Provider::Openai, implementing
the LlmConcept chat_response contract over the official anthropic Ruby
SDK. Batch ops, PDF, and RAG land in follow-up PRs.

- Provider::Anthropic uses Messages API for sync and streaming responses
- ChatConfig builds requests with ephemeral prompt-cache markers on the
  system prompt and the last tool definition
- MessageFormatter reconstructs multi-turn history (text + tool_use +
  tool_result blocks) from raw Message records, including the paired
  user-role tool_result turn Anthropic requires after every tool_use
- ChatParser maps Anthropic Message into the shared ChatResponse Data
- Registry, Setting, User, Chat default model wired for ANTHROPIC_*
  envs and Setting.anthropic_*; LLM_PROVIDER selects between providers
- Responder forwards raw conversation_history (Array<Message>) so
  providers without hosted conversation state can rebuild context
- OpenAI provider accepts and ignores the new kwarg (no behavior change)

Tests cover provider init, model gating, MessageFormatter for all turn
shapes, ChatConfig request building (max_tokens, system cache, tool
conversion), ChatParser for text / tool_use / mixed blocks, Registry
discovery, and mocked chat_response success / error / function_request
paths. Live VCR cassettes recorded in a follow-up with a real key.

Stacked PRs: 2/5 batch ops + cost ledger, 3/5 PDF, 4/5 pgvector RAG,
5/5 settings UI + disclosure.

* fix(ai): address PR review on Anthropic provider foundation

Surface fixes raised by Codex + CodeRabbit on PR 1/5:

- Provider::Anthropic#chat_response now accepts (and ignores) a
  `messages:` kwarg. Assistant::Responder passes both `messages:`
  (OpenAI-shape) and `conversation_history:` (raw Message records) for
  cross-provider parity, so the previous signature raised
  ArgumentError on the first chat turn through the Anthropic provider.
- Provider::Anthropic#supports_model? bypasses the `claude` prefix
  gate when a custom base_url is configured, mirroring the OpenAI
  provider. Bedrock-shaped IDs like
  `anthropic.claude-sonnet-4-5-20250929-v1:0` and
  `claude-opus-4@20250514` are otherwise rejected by
  Assistant::Provided#get_model_provider and the chat dies.
- Setting.anthropic_access_token is now in
  EncryptedSettingFields::ENCRYPTED_FIELDS so the Anthropic API key
  is encrypted at rest like every other provider secret. Previously
  plaintext while siblings (openai_access_token, twelve_data_api_key,
  external_assistant_token) were ciphertext.
- Chat.default_model falls back to whichever provider is actually
  configured. Previously, with LLM_PROVIDER=anthropic but no
  Anthropic credentials, the default model resolved to a Claude ID
  that no registered provider supported, so chats failed even when
  OpenAI was fully configured. Adds Provider::{Anthropic,Openai}#configured?
  class methods for the readable callsite.
- Provider::Anthropic.effective_model uses
  `ENV["ANTHROPIC_MODEL"].presence || Setting.anthropic_model` so the
  Setting lookup is only performed when the env var is absent — the
  previous `ENV.fetch(KEY, default)` evaluated the default arg
  eagerly on every call.
- Provider::Anthropic::ChatConfig#anthropic_input_schema strips both
  `:strict` and `"strict"` keys so JSON-decoded schemas with string
  keys cannot leak the OpenAI-only flag through to Anthropic.

Test coverage added: supports_model? bypass on custom endpoints,
chat_response messages: kwarg compatibility, default_model fallback
in the three credential combinations, configured? against ENV +
Setting, strict-flag stripping for both key types, and a
`Setting.expects(:anthropic_model).never` assertion proving the
ENV-precedence test now exercises the lazy path.

All 4365 tests pass (1 pre-existing libvips env error unrelated).

* test(chat): make default_model tests resilient to ENV model overrides

CodeRabbit flagged on PR review: the new default_model tests asserted
against Provider::*::DEFAULT_MODEL, but Chat.default_model actually
returns Provider::*.effective_model.presence (which reads
OPENAI_MODEL / ANTHROPIC_MODEL from the environment). With either env
var set, the tests would fail intermittently even though routing was
correct.

- New default_model tests now assert against the provider's
  effective_model directly, so they verify the routing decision
  (which provider's value wins) without coupling to the constant.
- Pre-existing "creates with default model" assertions had the same
  brittleness; switch them to compare against Chat.default_model so
  the chosen model is whatever the env / Setting cascade resolves to.

Verified by running `ANTHROPIC_MODEL=claude-haiku-4-5 OPENAI_MODEL=gpt-4o
bin/rails test test/models/chat_test.rb` — 16 runs, 0 failures
(previously 2 pre-existing failures + 0 from the new tests).

* fix(ai): address local review on Anthropic foundation

- Provider::Anthropic#supports_pdf_processing? bypasses prefix gate for
  custom endpoints, mirroring supports_model?
- Provider::Anthropic#initialize raises Error when custom_endpoint? AND
  model.blank?, parity with Provider::Openai
- stream_chat_response captures partial usage on mid-stream errors and
  records it via the new on_partial callback so chat_response can skip
  the duplicate error row in the outer rescue
- safe_accumulated_message swallows the secondary failure when the SDK
  cannot reconstruct a snapshot
- langfuse_client memoizes properly (||= instead of =) so repeated calls
  don't churn Langfuse instances
- MessageFormatter sorts tool_calls by created_at then id so the
  message array is deterministic across replays; skips tool_calls
  missing both provider_call_id and provider_id rather than sending
  `id: nil` and getting rejected by Anthropic
- Setting.anthropic_access_token default falls back through
  ENV["ANTHROPIC_API_KEY"].presence (was missing .presence, so an
  empty-string env value bled through)
- User#openai_configured? / #anthropic_configured? delegate to the
  Provider::* class methods — single source of truth
- Assistant::Responder renames the OpenAI-shape history builder
  conversation_history → openai_messages_payload so the kwarg name
  matches the local method name (messages: openai_messages_payload,
  conversation_history: chat_message_records)
- Assistant::Builtin stale-history comment updated to reference both
  builders

Adds a streaming chat_response test using ad-hoc subclasses of the
SDK event types so the case/when dispatch matches via is_a? without
stubbing class-level === behavior.

* test(ai): add Anthropic tool_use round-trip + multi-tool turn coverage

Addresses @jjmata's "worth confirming" note on PR #1983: tool-use turns
from prior assistant messages must round-trip correctly when retrieved
from the database.

- New `ChatParser → ToolCall::Function → MessageFormatter` test walks
  the full path: Anthropic response with a tool_use block →
  ChatFunctionRequest → ToolCall::Function.from_function_request →
  persisted on the AssistantMessage → MessageFormatter rebuild on the
  next turn. Asserts the original `tool_use.id` is preserved end-to-end
  as both `tool_use.id` and the paired `tool_result.tool_use_id`, and
  that the original `input` hash and serialized result content survive.
- New multi-tool assistant turn test confirms two tool_use blocks on a
  single assistant message render as two tool_use blocks followed by
  two paired tool_result blocks in a single user-role follow-up,
  matching Anthropic's required alternation.

Both tests exercise the existing PR1 code without behavior changes.

* test(ai): require "ostruct" explicitly in Anthropic provider tests

OpenStruct is moving out of Ruby's default load path (warning in 3.4+,
removed in 3.5+). Tests work today because ActiveSupport transitively
loads it, but that's incidental. Match the existing convention in
test/controllers/settings/hostings_controller_test.rb which explicitly
requires ostruct for the same reason.

* fix(ai): sanitize Langfuse warn logs, normalize tool_use.input, dedup history fetch

Addresses three open CodeRabbit findings on PR #1983.

- Provider::Anthropic Langfuse rescue branches no longer include
  `e.full_message` in `Rails.logger.warn`. `full_message` bundles the
  backtrace + cause chain and on some SDK error types includes the
  serialized request/response payload (prompt, model output). Logs
  now report `#{e.class}: #{e.message}` only. Three sites:
  create_langfuse_trace, log_langfuse_generation, upsert_langfuse_trace.
  Note: Provider::Openai has the same pattern (copy-pasted source) —
  harmonization deferred to a follow-up cleanup PR; this commit fixes
  only the Anthropic provider to keep PR scope tight.

- MessageFormatter#parse_arguments now coerces any non-Hash parsed
  result to `{}`. Anthropic's Messages API requires `tool_use.input`
  to be a JSON object (map); a stored ToolCall::Function record whose
  arguments parse to a scalar, bool, or array (corrupt row, legacy
  data, cross-provider bleed) would otherwise produce a payload the
  API rejects. Normal flow stores Hash arguments end-to-end so the
  fix is defensive — adds 2 tests covering scalar/array JSON strings
  and non-String non-Hash inputs.

- Assistant::Responder dedups the chat-history fetch. The previous
  layout fired two near-identical `chat.messages.where(...).includes(
  :tool_calls).ordered` queries per LLM turn (one for the OpenAI-shape
  payload, one for the raw-records kwarg). A new memoized
  `complete_chat_messages` fetches once; `chat_message_records` filters
  out the current message via `Array#reject`, `openai_messages_payload`
  iterates the cached array unchanged. One SQL query per turn instead
  of two. Memoization scope = single Responder instance (per LLM call),
  so cache invalidation is not a concern.

All 4370 tests pass (1 pre-existing libvips env error unrelated).
Rubocop + brakeman clean.

* fix(ci): replace sk-ant- prefixed test placeholders

Pipelock secret scanner pattern-matches `sk-ant-*` as a real Anthropic
API key and fails the PR security-scan check. Test stubs and
ClimateControl env values used `sk-ant-test`, `sk-ant-from-setting`,
`sk-ant-x`, `sk-ant-y` as obvious placeholders, but the scanner does
not care about value entropy.

Switched to `fake-anthropic-key-*` / `fake-token-*` strings so the
scanner stops flagging them. No production code touched, no behavior
change — Provider::Anthropic still accepts any non-blank token.
2026-05-31 16:11:28 +02:00

785 lines
26 KiB
Ruby
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
class Provider::Openai < Provider
include LlmConcept
# Subclass so errors caught in this provider are raised as Provider::Openai::Error
Error = Class.new(Provider::Error)
DEFAULT_MODEL = "gpt-4.1".freeze
SUPPORTED_MODELS = %w[gpt-4 gpt-5 o1 o3].freeze
VISION_CAPABLE_MODEL_PREFIXES = %w[gpt-4o gpt-4-turbo gpt-4.1 gpt-5 o1 o3].freeze
# Returns the effective model that would be used by the provider.
# Priority: explicit ENV > Setting > DEFAULT_MODEL.
def self.effective_model
ENV.fetch("OPENAI_MODEL") { Setting.openai_model }.presence || DEFAULT_MODEL
end
def self.configured?
ENV["OPENAI_ACCESS_TOKEN"].present? || Setting.openai_access_token.present?
end
def initialize(access_token, uri_base: nil, model: nil)
client_options = { access_token: access_token }
llm_uri_base = uri_base.presence
llm_model = model.presence
client_options[:uri_base] = llm_uri_base if llm_uri_base.present?
client_options[:request_timeout] = ENV.fetch("OPENAI_REQUEST_TIMEOUT", 60).to_i
@client = ::OpenAI::Client.new(**client_options)
@uri_base = llm_uri_base
if custom_provider? && llm_model.blank?
raise Error, "Model is required when using a custom OpenAIcompatible provider"
end
@default_model = llm_model.presence || self.class.effective_model
end
def supports_model?(model)
# If using custom uri_base, support any model
return true if custom_provider?
# Otherwise, check if model starts with any supported OpenAI prefix
SUPPORTED_MODELS.any? { |prefix| model.start_with?(prefix) }
end
def supports_responses_endpoint?
return @supports_responses_endpoint if defined?(@supports_responses_endpoint)
env_override = ENV["OPENAI_SUPPORTS_RESPONSES_ENDPOINT"]
if env_override.to_s.present?
return @supports_responses_endpoint = ActiveModel::Type::Boolean.new.cast(env_override)
end
@supports_responses_endpoint = !custom_provider?
end
def provider_name
custom_provider? ? "Custom OpenAI-compatible (#{@uri_base})" : "OpenAI"
end
def supported_models_description
if custom_provider?
@default_model.present? ? "configured model: #{@default_model}" : "any model"
else
"models starting with: #{SUPPORTED_MODELS.join(", ")}"
end
end
def custom_provider?
@uri_base.present?
end
# Token-budget knobs. Precedence: ENV > Setting > default. Defaults match
# Ollama's historical 2048-token baseline so local small-context models work
# out of the box. Users on larger-context cloud models can raise via ENV or
# via the Self-Hosting settings page.
def context_window
positive_budget(ENV["LLM_CONTEXT_WINDOW"], Setting.llm_context_window, 2048)
end
def max_response_tokens
positive_budget(ENV["LLM_MAX_RESPONSE_TOKENS"], Setting.llm_max_response_tokens, 512)
end
def system_prompt_reserve
positive_budget(ENV["LLM_SYSTEM_PROMPT_RESERVE"], nil, 256)
end
def max_history_tokens
explicit = ENV["LLM_MAX_HISTORY_TOKENS"].presence&.to_i
return explicit if explicit&.positive?
[ context_window - max_response_tokens - system_prompt_reserve, 256 ].max
end
# Budget available for a one-shot (non-chat) request's full input,
# excluding reserved response tokens AND the system/instructions prompt.
# Drives the batch slicer for the auto_categorize / auto_detect_merchants /
# enhance_provider_merchants calls — each ships ~200400 tokens of
# instructions + JSON schema that aren't counted in `fixed_tokens`.
def max_input_tokens
[ context_window - max_response_tokens - system_prompt_reserve, 256 ].max
end
def max_items_per_call
positive_budget(ENV["LLM_MAX_ITEMS_PER_CALL"], Setting.llm_max_items_per_call, 25)
end
def auto_categorize(transactions: [], user_categories: [], model: "", family: nil, json_mode: nil)
with_provider_response do
if user_categories.blank?
family_id = family&.id || "unknown"
Rails.logger.error("Cannot auto-categorize transactions for family #{family_id}: no categories available")
raise Error, "No categories available for auto-categorization"
end
effective_model = model.presence || @default_model
trace = create_langfuse_trace(
name: "openai.auto_categorize",
input: { transactions: transactions, user_categories: user_categories }
)
batches = slice_for_context(transactions, fixed: user_categories)
result = batches.flat_map do |batch|
AutoCategorizer.new(
client,
model: effective_model,
transactions: batch,
user_categories: user_categories,
custom_provider: custom_provider?,
langfuse_trace: trace,
family: family,
json_mode: json_mode
).auto_categorize
end
upsert_langfuse_trace(trace: trace, output: result.map(&:to_h))
result
end
end
def auto_detect_merchants(transactions: [], user_merchants: [], model: "", family: nil, json_mode: nil)
with_provider_response do
effective_model = model.presence || @default_model
trace = create_langfuse_trace(
name: "openai.auto_detect_merchants",
input: { transactions: transactions, user_merchants: user_merchants }
)
batches = slice_for_context(transactions, fixed: user_merchants)
result = batches.flat_map do |batch|
AutoMerchantDetector.new(
client,
model: effective_model,
transactions: batch,
user_merchants: user_merchants,
custom_provider: custom_provider?,
langfuse_trace: trace,
family: family,
json_mode: json_mode
).auto_detect_merchants
end
upsert_langfuse_trace(trace: trace, output: result.map(&:to_h))
result
end
end
def enhance_provider_merchants(merchants: [], model: "", family: nil, json_mode: nil)
with_provider_response do
effective_model = model.presence || @default_model
trace = create_langfuse_trace(
name: "openai.enhance_provider_merchants",
input: { merchants: merchants }
)
batches = slice_for_context(merchants)
result = batches.flat_map do |batch|
ProviderMerchantEnhancer.new(
client,
model: effective_model,
merchants: batch,
custom_provider: custom_provider?,
langfuse_trace: trace,
family: family,
json_mode: json_mode
).enhance_merchants
end
upsert_langfuse_trace(trace: trace, output: result.map(&:to_h))
result
end
end
# Can be disabled via ENV for OpenAI-compatible endpoints that don't support vision
# Only vision-capable models (gpt-4o, gpt-4-turbo, gpt-4.1, etc.) support PDF input
def supports_pdf_processing?(model: @default_model)
return false unless ENV.fetch("OPENAI_SUPPORTS_PDF_PROCESSING", "true").to_s.downcase.in?(%w[true 1 yes])
# Custom providers manage their own model capabilities
return true if custom_provider?
# Check if the specified model supports vision/PDF input
VISION_CAPABLE_MODEL_PREFIXES.any? { |prefix| model.start_with?(prefix) }
end
def process_pdf(pdf_content:, model: "", family: nil)
with_provider_response do
effective_model = model.presence || @default_model
raise Error, "Model does not support PDF/vision processing: #{effective_model}" unless supports_pdf_processing?(model: effective_model)
trace = create_langfuse_trace(
name: "openai.process_pdf",
input: { pdf_size: pdf_content&.bytesize }
)
result = PdfProcessor.new(
client,
model: effective_model,
pdf_content: pdf_content,
custom_provider: custom_provider?,
langfuse_trace: trace,
family: family,
max_response_tokens: max_response_tokens
).process
upsert_langfuse_trace(trace: trace, output: result.to_h)
result
end
end
def extract_bank_statement(pdf_content:, model: "", family: nil)
with_provider_response do
effective_model = model.presence || @default_model
trace = create_langfuse_trace(
name: "openai.extract_bank_statement",
input: { pdf_size: pdf_content&.bytesize }
)
result = BankStatementExtractor.new(
client: client,
pdf_content: pdf_content,
model: effective_model
).extract
upsert_langfuse_trace(trace: trace, output: { transaction_count: result[:transactions].size })
result
end
end
def chat_response(
prompt,
model:,
instructions: nil,
functions: [],
function_results: [],
messages: nil,
conversation_history: [],
streamer: nil,
previous_response_id: nil,
session_id: nil,
user_identifier: nil,
family: nil
)
if supports_responses_endpoint?
# Native path uses the Responses API which chains history via
# `previous_response_id`; it does NOT need (and must not receive)
# inline message history in the input payload.
native_chat_response(
prompt: prompt,
model: model,
instructions: instructions,
functions: functions,
function_results: function_results,
streamer: streamer,
previous_response_id: previous_response_id,
session_id: session_id,
user_identifier: user_identifier,
family: family
)
else
generic_chat_response(
prompt: prompt,
model: model,
instructions: instructions,
functions: functions,
function_results: function_results,
messages: messages,
streamer: streamer,
session_id: session_id,
user_identifier: user_identifier,
family: family
)
end
end
private
attr_reader :client
# Returns the first positive integer among env, setting, default. Treats
# zero or negative values as "unset" and falls through — a 0-token budget
# is never what the user meant.
def positive_budget(env_value, setting_value, default)
from_env = env_value.to_s.strip.to_i
return from_env if from_env.positive?
return setting_value.to_i if setting_value.to_i.positive?
default
end
# Routes one-shot (non-chat) inputs through the BatchSlicer so large
# caller batches are split to fit the model's context window. `fixed` is
# the portion of the prompt that stays constant across every sub-batch
# (e.g. user_categories, user_merchants), used for fixed-tokens accounting.
def slice_for_context(items, fixed: nil)
BatchSlicer.call(
Array(items),
max_items: max_items_per_call,
max_tokens: max_input_tokens,
fixed_tokens: fixed ? Assistant::TokenEstimator.estimate(fixed) : 0
)
end
def native_chat_response(
prompt:,
model:,
instructions: nil,
functions: [],
function_results: [],
streamer: nil,
previous_response_id: nil,
session_id: nil,
user_identifier: nil,
family: nil
)
with_provider_response do
chat_config = ChatConfig.new(
functions: functions,
function_results: function_results
)
collected_chunks = []
# Proxy that converts raw stream to "LLM Provider concept" stream
stream_proxy = if streamer.present?
proc do |chunk|
parsed_chunk = ChatStreamParser.new(chunk).parsed
unless parsed_chunk.nil?
streamer.call(parsed_chunk)
collected_chunks << parsed_chunk
end
end
else
nil
end
input_payload = chat_config.build_input(prompt: prompt)
begin
raw_response = client.responses.create(parameters: {
model: model,
input: input_payload,
instructions: instructions,
tools: chat_config.tools,
previous_response_id: previous_response_id,
stream: stream_proxy
})
# If streaming, Ruby OpenAI does not return anything, so to normalize this method's API, we search
# for the "response chunk" in the stream and return it (it is already parsed)
if stream_proxy.present?
error_chunk = collected_chunks.find { |chunk| chunk.type == "error" }
response_chunk = collected_chunks.find { |chunk| chunk.type == "response" }
if response_chunk.nil?
raise Error.new(
build_stream_error_message(error_chunk),
details: error_chunk&.data&.details
)
end
response = response_chunk.data
usage = response_chunk.usage
Rails.logger.debug("Stream response usage: #{usage.inspect}")
log_langfuse_generation(
name: "chat_response",
model: model,
input: input_payload,
output: response.messages.map(&:output_text).join("\n"),
usage: usage,
session_id: session_id,
user_identifier: user_identifier
)
record_llm_usage(family: family, model: model, operation: "chat", usage: usage)
response
else
parsed = ChatParser.new(raw_response).parsed
Rails.logger.debug("Non-stream raw_response['usage']: #{raw_response['usage'].inspect}")
log_langfuse_generation(
name: "chat_response",
model: model,
input: input_payload,
output: parsed.messages.map(&:output_text).join("\n"),
usage: raw_response["usage"],
session_id: session_id,
user_identifier: user_identifier
)
record_llm_usage(family: family, model: model, operation: "chat", usage: raw_response["usage"])
parsed
end
rescue => e
log_langfuse_generation(
name: "chat_response",
model: model,
input: input_payload,
error: e,
session_id: session_id,
user_identifier: user_identifier
)
record_llm_usage(family: family, model: model, operation: "chat", error: e)
raise
end
end
end
def generic_chat_response(
prompt:,
model:,
instructions: nil,
functions: [],
function_results: [],
messages: nil,
streamer: nil,
session_id: nil,
user_identifier: nil,
family: nil
)
with_provider_response do
messages = build_generic_messages(
prompt: prompt,
instructions: instructions,
function_results: function_results,
messages: messages
)
tools = build_generic_tools(functions)
# Force synchronous calls for generic chat (streaming not supported for custom providers)
params = {
model: model,
messages: messages
}
params[:tools] = tools if tools.present?
begin
raw_response = client.chat(parameters: params)
parsed = GenericChatParser.new(raw_response).parsed
log_langfuse_generation(
name: "chat_response",
model: model,
input: messages,
output: parsed.messages.map(&:output_text).join("\n"),
usage: raw_response["usage"],
session_id: session_id,
user_identifier: user_identifier
)
record_llm_usage(family: family, model: model, operation: "chat", usage: raw_response["usage"])
# If a streamer was provided, manually call it with the parsed response
# to maintain the same contract as the streaming version
if streamer.present?
# Emit output_text chunks for each message
parsed.messages.each do |message|
if message.output_text.present?
streamer.call(Provider::LlmConcept::ChatStreamChunk.new(type: "output_text", data: message.output_text, usage: nil))
end
end
# Emit response chunk
streamer.call(Provider::LlmConcept::ChatStreamChunk.new(type: "response", data: parsed, usage: raw_response["usage"]))
end
parsed
rescue => e
log_langfuse_generation(
name: "chat_response",
model: model,
input: messages,
error: e,
session_id: session_id,
user_identifier: user_identifier
)
record_llm_usage(family: family, model: model, operation: "chat", error: e)
raise
end
end
end
def build_generic_messages(prompt:, instructions: nil, function_results: [], messages: nil)
payload = []
# Add system message if instructions present
if instructions.present?
payload << { role: "system", content: instructions }
end
# Add conversation history or user prompt. History is trimmed to fit the
# configured token budget so small-context local models (Ollama, LM Studio,
# LocalAI) don't silently truncate. tool_call/tool_result pairs are
# preserved atomically by HistoryTrimmer.
if messages.present?
trimmed = Assistant::HistoryTrimmer.new(messages, max_tokens: max_history_tokens).call
payload.concat(trimmed)
elsif prompt.present?
payload << { role: "user", content: prompt }
end
# If there are function results, we need to add the assistant message that made the tool calls
# followed by the tool messages with the results
if function_results.any?
# Build assistant message with tool_calls
tool_calls = function_results.map do |fn_result|
# Convert arguments to JSON string if it's not already a string
arguments = fn_result[:arguments]
arguments_str = arguments.is_a?(String) ? arguments : arguments.to_json
{
id: fn_result[:call_id],
type: "function",
function: {
name: fn_result[:name],
arguments: arguments_str
}
}
end
payload << {
role: "assistant",
content: "", # Some OpenAI-compatible APIs require string, not null
tool_calls: tool_calls
}
# Add function results as tool messages
function_results.each do |fn_result|
# Convert output to JSON string if it's not already a string
# OpenAI API requires content to be either a string or array of objects
# Handle nil explicitly to avoid serializing to "null"
output = fn_result[:output]
content = if output.nil?
""
elsif output.is_a?(String)
output
else
output.to_json
end
payload << {
role: "tool",
tool_call_id: fn_result[:call_id],
name: fn_result[:name],
content: content
}
end
end
payload
end
def build_generic_tools(functions)
return [] if functions.blank?
functions.map do |fn|
{
type: "function",
function: {
name: fn[:name],
description: fn[:description],
parameters: fn[:params_schema],
strict: fn[:strict]
}
}
end
end
def langfuse_client
return unless ENV["LANGFUSE_PUBLIC_KEY"].present? && ENV["LANGFUSE_SECRET_KEY"].present?
@langfuse_client = Langfuse.new
end
def create_langfuse_trace(name:, input:, session_id: nil, user_identifier: nil)
return unless langfuse_client
langfuse_client.trace(
name: name,
input: input,
session_id: session_id,
user_id: user_identifier,
environment: Rails.env
)
rescue => e
Rails.logger.warn("Langfuse trace creation failed: #{e.message}\n#{e.full_message}")
nil
end
def log_langfuse_generation(name:, model:, input:, output: nil, usage: nil, error: nil, session_id: nil, user_identifier: nil)
return unless langfuse_client
trace = create_langfuse_trace(
name: "openai.#{name}",
input: input,
session_id: session_id,
user_identifier: user_identifier
)
generation = trace&.generation(
name: name,
model: model,
input: input
)
if error
generation&.end(
output: { error: error.message, details: error.respond_to?(:details) ? error.details : nil },
level: "ERROR"
)
upsert_langfuse_trace(
trace: trace,
output: { error: error.message },
level: "ERROR"
)
else
generation&.end(output: output, usage: usage)
upsert_langfuse_trace(trace: trace, output: output)
end
rescue => e
Rails.logger.warn("Langfuse logging failed: #{e.message}\n#{e.full_message}")
end
def upsert_langfuse_trace(trace:, output:, level: nil)
return unless langfuse_client && trace&.id
payload = {
id: trace.id,
output: output
}
payload[:level] = level if level.present?
langfuse_client.trace(**payload)
rescue => e
Rails.logger.warn("Langfuse trace upsert failed for trace_id=#{trace&.id}: #{e.message}\n#{e.full_message}")
nil
end
def record_llm_usage(family:, model:, operation:, usage: nil, error: nil)
return unless family
# For error cases, record with zero tokens
if error.present?
Rails.logger.info("Recording failed LLM usage - Error: #{safe_error_message(error)}")
# Extract HTTP status code if available from the error
http_status_code = extract_http_status_code(error)
inferred_provider = LlmUsage.infer_provider(model)
family.llm_usages.create!(
provider: inferred_provider,
model: model,
operation: operation,
prompt_tokens: 0,
completion_tokens: 0,
total_tokens: 0,
estimated_cost: nil,
metadata: {
error: safe_error_message(error),
http_status_code: http_status_code
}
)
Rails.logger.info("Failed LLM usage recorded successfully - Status: #{http_status_code}")
return
end
return unless usage
Rails.logger.info("Recording LLM usage - Raw usage data: #{usage.inspect}")
# Handle both old and new OpenAI API response formats
# Old format: prompt_tokens, completion_tokens, total_tokens
# New format: input_tokens, output_tokens, total_tokens
prompt_tokens = usage["prompt_tokens"] || usage["input_tokens"] || 0
completion_tokens = usage["completion_tokens"] || usage["output_tokens"] || 0
total_tokens = usage["total_tokens"] || 0
Rails.logger.info("Extracted tokens - prompt: #{prompt_tokens}, completion: #{completion_tokens}, total: #{total_tokens}")
estimated_cost = LlmUsage.calculate_cost(
model: model,
prompt_tokens: prompt_tokens,
completion_tokens: completion_tokens
)
# Log when we can't estimate the cost (e.g., custom/self-hosted models)
if estimated_cost.nil?
Rails.logger.info("Recording LLM usage without cost estimate for unknown model: #{model} (custom provider: #{custom_provider?})")
end
inferred_provider = LlmUsage.infer_provider(model)
family.llm_usages.create!(
provider: inferred_provider,
model: model,
operation: operation,
prompt_tokens: prompt_tokens,
completion_tokens: completion_tokens,
total_tokens: total_tokens,
estimated_cost: estimated_cost,
metadata: {}
)
Rails.logger.info("LLM usage recorded successfully - Cost: #{estimated_cost.inspect}")
rescue => e
Rails.logger.error("Failed to record LLM usage: #{e.message}")
end
def extract_http_status_code(error)
# Try to extract HTTP status code from various error types
# OpenAI gem errors may have status codes in different formats
if error.respond_to?(:code)
error.code
elsif error.respond_to?(:http_status)
error.http_status
elsif error.respond_to?(:status_code)
error.status_code
elsif error.respond_to?(:response) && error.response.respond_to?(:code)
error.response.code.to_i
elsif safe_error_message(error) =~ /(\d{3})/
# Extract 3-digit HTTP status code from error message
$1.to_i
else
nil
end
end
def safe_error_message(error)
error&.message
rescue => e
"(message unavailable: #{e.class})"
end
# Builds a useful error message when the OpenAI Responses stream ended
# without delivering a `response.completed` event. Uses upstream details
# when present (e.g. `response.failed`, `response.incomplete`, top-level
# `error`) and falls back to a generic message that hints at the most
# common causes.
def build_stream_error_message(error_chunk)
if error_chunk&.data&.message.present?
upstream = error_chunk.data
prefix = case upstream.event
when "response.incomplete" then "OpenAI response was incomplete"
when "response.failed" then "OpenAI response failed"
else "OpenAI returned an error"
end
code_suffix = upstream.code.present? ? " [#{upstream.code}]" : ""
"#{prefix}#{code_suffix}: #{upstream.message}"
else
"OpenAI stream ended without a completion event. " \
"This usually means the upstream call was cut short — common causes: " \
"expired previous_response_id (Responses API state TTL), context-length overflow, " \
"or a transient OpenAI error."
end
end
end