From 66753319a7489d538e7b582ff2f42fe56feb46dc Mon Sep 17 00:00:00 2001 From: Guillem Arias Date: Mon, 25 May 2026 20:27:59 +0200 Subject: [PATCH] fix(ai): address local review on Anthropic foundation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Provider::Anthropic#supports_pdf_processing? bypasses prefix gate for custom endpoints, mirroring supports_model? - Provider::Anthropic#initialize raises Error when custom_endpoint? AND model.blank?, parity with Provider::Openai - stream_chat_response captures partial usage on mid-stream errors and records it via the new on_partial callback so chat_response can skip the duplicate error row in the outer rescue - safe_accumulated_message swallows the secondary failure when the SDK cannot reconstruct a snapshot - langfuse_client memoizes properly (||= instead of =) so repeated calls don't churn Langfuse instances - MessageFormatter sorts tool_calls by created_at then id so the message array is deterministic across replays; skips tool_calls missing both provider_call_id and provider_id rather than sending `id: nil` and getting rejected by Anthropic - Setting.anthropic_access_token default falls back through ENV["ANTHROPIC_API_KEY"].presence (was missing .presence, so an empty-string env value bled through) - User#openai_configured? / #anthropic_configured? delegate to the Provider::* class methods — single source of truth - Assistant::Responder renames the OpenAI-shape history builder conversation_history → openai_messages_payload so the kwarg name matches the local method name (messages: openai_messages_payload, conversation_history: chat_message_records) - Assistant::Builtin stale-history comment updated to reference both builders Adds a streaming chat_response test using ad-hoc subclasses of the SDK event types so the case/when dispatch matches via is_a? without stubbing class-level === behavior. --- app/models/assistant/builtin.rb | 2 +- app/models/assistant/responder.rb | 9 ++- app/models/provider/anthropic.rb | 57 +++++++++++++----- .../provider/anthropic/message_formatter.rb | 26 +++++++-- app/models/setting.rb | 2 +- app/models/user.rb | 6 +- test/models/provider/anthropic_test.rb | 58 +++++++++++++++++++ 7 files changed, 133 insertions(+), 27 deletions(-) diff --git a/app/models/assistant/builtin.rb b/app/models/assistant/builtin.rb index 6a1ae93c9..16130bc38 100644 --- a/app/models/assistant/builtin.rb +++ b/app/models/assistant/builtin.rb @@ -60,7 +60,7 @@ class Assistant::Builtin < Assistant::Base if assistant_message.content.blank? assistant_message.destroy else - # Demote partially-streamed turns to `failed` so `Responder#conversation_history` excludes them. + # Demote partially-streamed turns to `failed` so the responder's history builders (`#openai_messages_payload`, `#chat_message_records`) exclude them. assistant_message.update_columns(status: "failed") end end diff --git a/app/models/assistant/responder.rb b/app/models/assistant/responder.rb index 406993ab7..a5950a51a 100644 --- a/app/models/assistant/responder.rb +++ b/app/models/assistant/responder.rb @@ -79,7 +79,7 @@ class Assistant::Responder instructions: instructions, functions: function_tool_caller.function_definitions, function_results: function_results, - messages: conversation_history, + messages: openai_messages_payload, conversation_history: chat_message_records, streamer: streamer, previous_response_id: previous_response_id, @@ -119,7 +119,7 @@ class Assistant::Responder # Raw Message records preceding the current turn — providers that build # their own native message shape (Anthropic) consume this directly so they - # do not have to round-trip through the OpenAI-shaped `conversation_history`. + # do not have to round-trip through the OpenAI-shaped payload below. def chat_message_records return [] unless chat&.messages @@ -131,7 +131,10 @@ class Assistant::Responder .to_a end - def conversation_history + # Builds the OpenAI-shaped messages payload (role: "user" | "assistant" | + # "tool"; tool_call_id pairing) consumed by Provider::Openai's generic + # chat path. Anthropic uses chat_message_records instead. + def openai_messages_payload messages = [] return messages unless chat&.messages diff --git a/app/models/provider/anthropic.rb b/app/models/provider/anthropic.rb index 1344ca333..181f1a76f 100644 --- a/app/models/provider/anthropic.rb +++ b/app/models/provider/anthropic.rb @@ -32,6 +32,11 @@ class Provider::Anthropic < Provider @client = ::Anthropic::Client.new(**client_options) @base_url = base_url + + if custom_endpoint? && model.blank? + raise Error, "Model is required when using a custom Anthropic-compatible endpoint" + end + @default_model = model.presence || DEFAULT_MODEL end @@ -76,6 +81,8 @@ class Provider::Anthropic < Provider end def supports_pdf_processing?(model: @default_model) + return true if custom_endpoint? + VISION_CAPABLE_MODEL_PREFIXES.any? { |prefix| model.to_s.start_with?(prefix) } end @@ -120,10 +127,19 @@ class Provider::Anthropic < Provider user_identifier: user_identifier ) + partial_usage_recorded = false + begin parsed, usage = if streamer.present? - stream_chat_response(streamer: streamer, request_params: request_params) + stream_chat_response( + streamer: streamer, + request_params: request_params, + on_partial: ->(partial_usage) { + record_llm_usage(family: family, model: model, operation: "chat", usage: partial_usage) + partial_usage_recorded = true + } + ) else sync_chat_response(request_params: request_params) end @@ -147,7 +163,7 @@ class Provider::Anthropic < Provider error: e, trace: trace ) - record_llm_usage(family: family, model: model, operation: "chat", error: e) + record_llm_usage(family: family, model: model, operation: "chat", error: e) unless partial_usage_recorded raise end end @@ -167,22 +183,31 @@ class Provider::Anthropic < Provider [ parsed, usage ] end - def stream_chat_response(streamer:, request_params:) + def stream_chat_response(streamer:, request_params:, on_partial: nil) final_message = nil stream = client.messages.stream(**request_params) - stream.each do |event| - case event - when ::Anthropic::Streaming::TextEvent - streamer.call( - Provider::LlmConcept::ChatStreamChunk.new(type: "output_text", data: event.text, usage: nil) - ) - when ::Anthropic::Streaming::MessageStopEvent - final_message = event.message + # If `stream.each` raises mid-iteration (network drop, client abort), + # we still want to surface whatever tokens accumulated so the cost + # ledger doesn't lose partial-output billing. + begin + stream.each do |event| + case event + when ::Anthropic::Streaming::TextEvent + streamer.call( + Provider::LlmConcept::ChatStreamChunk.new(type: "output_text", data: event.text, usage: nil) + ) + when ::Anthropic::Streaming::MessageStopEvent + final_message = event.message + end end + rescue => mid_stream_error + partial = safe_accumulated_message(stream) + on_partial&.call(build_usage_hash(partial&.usage)) if partial + raise mid_stream_error end - final_message ||= stream.accumulated_message + final_message ||= safe_accumulated_message(stream) parsed = ChatParser.new(final_message).parsed usage = build_usage_hash(final_message.usage) @@ -193,6 +218,12 @@ class Provider::Anthropic < Provider [ parsed, usage ] end + def safe_accumulated_message(stream) + stream.accumulated_message + rescue StandardError + nil + end + def build_usage_hash(raw_usage) return {} unless raw_usage @@ -217,7 +248,7 @@ class Provider::Anthropic < Provider def langfuse_client return unless ENV["LANGFUSE_PUBLIC_KEY"].present? && ENV["LANGFUSE_SECRET_KEY"].present? - @langfuse_client = Langfuse.new + @langfuse_client ||= Langfuse.new end def create_langfuse_trace(name:, input:, session_id: nil, user_identifier: nil) diff --git a/app/models/provider/anthropic/message_formatter.rb b/app/models/provider/anthropic/message_formatter.rb index e9288e376..b6ba2717f 100644 --- a/app/models/provider/anthropic/message_formatter.rb +++ b/app/models/provider/anthropic/message_formatter.rb @@ -40,9 +40,19 @@ class Provider::Anthropic::MessageFormatter end private + # ToolCall records have no association-level order; enforce + # chronological order here so message arrays are deterministic across + # replays and Anthropic sees tool_use blocks in the order the model + # originally emitted them. + def ordered_tool_calls(assistant_message) + assistant_message.tool_calls.sort_by { |tc| [ tc.created_at || Time.zone.at(0), tc.id.to_s ] } + end + def assistant_history_blocks(assistant_message) + tool_calls = ordered_tool_calls(assistant_message).select { |tc| tool_call_id(tc).present? } + blocks = [] - blocks.concat(assistant_message.tool_calls.map { |tc| tool_use_block_from_record(tc) }) if assistant_message.tool_calls.any? + blocks.concat(tool_calls.map { |tc| tool_use_block_from_record(tc) }) if tool_calls.any? blocks << { type: "text", text: assistant_message.content.to_s } if assistant_message.content.present? return [] if blocks.empty? @@ -51,20 +61,26 @@ class Provider::Anthropic::MessageFormatter # If the assistant turn used tools, Anthropic requires a user turn with # matching tool_result blocks before the next assistant turn. - if assistant_message.tool_calls.any? + if tool_calls.any? result << { role: "user", - content: assistant_message.tool_calls.map { |tc| tool_result_block_from_record(tc) } + content: tool_calls.map { |tc| tool_result_block_from_record(tc) } } end result end + # tool_use_id is required; skip tool_calls missing both identifiers + # rather than sending `id: nil` and getting rejected by Anthropic. + def tool_call_id(tool_call) + tool_call.provider_call_id.presence || tool_call.provider_id.presence + end + def tool_use_block_from_record(tool_call) { type: "tool_use", - id: tool_call.provider_call_id || tool_call.provider_id, + id: tool_call_id(tool_call), name: tool_call.function_name, input: parse_arguments(tool_call.function_arguments) } @@ -73,7 +89,7 @@ class Provider::Anthropic::MessageFormatter def tool_result_block_from_record(tool_call) { type: "tool_result", - tool_use_id: tool_call.provider_call_id || tool_call.provider_id, + tool_use_id: tool_call_id(tool_call), content: serialize_output(tool_call.function_result) } end diff --git a/app/models/setting.rb b/app/models/setting.rb index b4d80c007..e6879ed8d 100644 --- a/app/models/setting.rb +++ b/app/models/setting.rb @@ -10,7 +10,7 @@ class Setting < RailsSettings::Base field :openai_uri_base, type: :string, default: ENV["OPENAI_URI_BASE"] field :openai_model, type: :string, default: ENV["OPENAI_MODEL"] field :openai_json_mode, type: :string, default: ENV["LLM_JSON_MODE"] - field :anthropic_access_token, type: :string, default: ENV["ANTHROPIC_ACCESS_TOKEN"].presence || ENV["ANTHROPIC_API_KEY"] + field :anthropic_access_token, type: :string, default: ENV["ANTHROPIC_ACCESS_TOKEN"].presence || ENV["ANTHROPIC_API_KEY"].presence field :anthropic_model, type: :string, default: ENV["ANTHROPIC_MODEL"] field :anthropic_base_url, type: :string, default: ENV["ANTHROPIC_BASE_URL"] field :llm_provider, type: :string, default: ENV.fetch("LLM_PROVIDER", "openai") diff --git a/app/models/user.rb b/app/models/user.rb index f6f059eb5..e585d9d56 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -162,13 +162,11 @@ class User < ApplicationRecord end def openai_configured? - ENV["OPENAI_ACCESS_TOKEN"].present? || Setting.openai_access_token.present? + Provider::Openai.configured? end def anthropic_configured? - ENV["ANTHROPIC_ACCESS_TOKEN"].present? || - ENV["ANTHROPIC_API_KEY"].present? || - Setting.anthropic_access_token.present? + Provider::Anthropic.configured? end def ai_enabled? diff --git a/test/models/provider/anthropic_test.rb b/test/models/provider/anthropic_test.rb index c0cc158b5..24d96bb54 100644 --- a/test/models/provider/anthropic_test.rb +++ b/test/models/provider/anthropic_test.rb @@ -144,6 +144,64 @@ class Provider::AnthropicTest < ActiveSupport::TestCase assert_empty response.data.function_requests end + test "chat_response streams text deltas and emits a final response chunk" do + final_message = build_anthropic_message( + id: "msg_stream", + model: @subject_model, + text_blocks: [ "Hello world" ], + tool_use_blocks: [], + usage: { input_tokens: 7, output_tokens: 3 } + ) + # Use ad-hoc subclasses of the SDK event types so the case/when dispatch + # inside `stream_chat_response` matches them via `is_a?` without needing + # to stub class-level `===` behavior. + text_event_cls = Class.new(::Anthropic::Streaming::TextEvent) do + def initialize(text:, snapshot:) + @text = text + @snapshot = snapshot + end + attr_reader :text, :snapshot + end + stop_event_cls = Class.new(::Anthropic::Streaming::MessageStopEvent) do + def initialize(message:) + @message = message + end + attr_reader :message + end + events = [ + text_event_cls.new(text: "Hello ", snapshot: "Hello "), + text_event_cls.new(text: "world", snapshot: "Hello world"), + stop_event_cls.new(message: final_message) + ] + + fake_stream = mock + fake_stream.stubs(:each).multiple_yields(*events.map { |e| [ e ] }) + fake_stream.stubs(:accumulated_message).returns(final_message) + + messages = mock + messages.stubs(:stream).returns(fake_stream) + client = mock + client.stubs(:messages).returns(messages) + @subject.instance_variable_set(:@client, client) + + collected = [] + response = @subject.chat_response( + "hi", + model: @subject_model, + streamer: ->(chunk) { collected << chunk } + ) + + assert response.success? + text_chunks = collected.select { |c| c.type == "output_text" } + response_chunks = collected.select { |c| c.type == "response" } + + assert_equal 2, text_chunks.size + assert_equal [ "Hello ", "world" ], text_chunks.map(&:data) + assert_equal 1, response_chunks.size + assert_equal "msg_stream", response_chunks.first.data.id + assert_equal 10, response_chunks.first.usage["total_tokens"] + end + test "chat_response surfaces tool_use blocks as function_requests" do fake_client = stub_anthropic_client_with( build_anthropic_message(