fix(ai): address local review on Anthropic foundation

- Provider::Anthropic#supports_pdf_processing? bypasses prefix gate for
  custom endpoints, mirroring supports_model?
- Provider::Anthropic#initialize raises Error when custom_endpoint? AND
  model.blank?, parity with Provider::Openai
- stream_chat_response captures partial usage on mid-stream errors and
  records it via the new on_partial callback so chat_response can skip
  the duplicate error row in the outer rescue
- safe_accumulated_message swallows the secondary failure when the SDK
  cannot reconstruct a snapshot
- langfuse_client memoizes properly (||= instead of =) so repeated calls
  don't churn Langfuse instances
- MessageFormatter sorts tool_calls by created_at then id so the
  message array is deterministic across replays; skips tool_calls
  missing both provider_call_id and provider_id rather than sending
  `id: nil` and getting rejected by Anthropic
- Setting.anthropic_access_token default falls back through
  ENV["ANTHROPIC_API_KEY"].presence (was missing .presence, so an
  empty-string env value bled through)
- User#openai_configured? / #anthropic_configured? delegate to the
  Provider::* class methods — single source of truth
- Assistant::Responder renames the OpenAI-shape history builder
  conversation_history → openai_messages_payload so the kwarg name
  matches the local method name (messages: openai_messages_payload,
  conversation_history: chat_message_records)
- Assistant::Builtin stale-history comment updated to reference both
  builders

Adds a streaming chat_response test using ad-hoc subclasses of the
SDK event types so the case/when dispatch matches via is_a? without
stubbing class-level === behavior.
This commit is contained in:
Guillem Arias
2026-05-25 20:27:59 +02:00
parent a0c552cb38
commit 66753319a7
7 changed files with 133 additions and 27 deletions

View File

@@ -60,7 +60,7 @@ class Assistant::Builtin < Assistant::Base
if assistant_message.content.blank?
assistant_message.destroy
else
# Demote partially-streamed turns to `failed` so `Responder#conversation_history` excludes them.
# Demote partially-streamed turns to `failed` so the responder's history builders (`#openai_messages_payload`, `#chat_message_records`) exclude them.
assistant_message.update_columns(status: "failed")
end
end

View File

@@ -79,7 +79,7 @@ class Assistant::Responder
instructions: instructions,
functions: function_tool_caller.function_definitions,
function_results: function_results,
messages: conversation_history,
messages: openai_messages_payload,
conversation_history: chat_message_records,
streamer: streamer,
previous_response_id: previous_response_id,
@@ -119,7 +119,7 @@ class Assistant::Responder
# Raw Message records preceding the current turn — providers that build
# their own native message shape (Anthropic) consume this directly so they
# do not have to round-trip through the OpenAI-shaped `conversation_history`.
# do not have to round-trip through the OpenAI-shaped payload below.
def chat_message_records
return [] unless chat&.messages
@@ -131,7 +131,10 @@ class Assistant::Responder
.to_a
end
def conversation_history
# Builds the OpenAI-shaped messages payload (role: "user" | "assistant" |
# "tool"; tool_call_id pairing) consumed by Provider::Openai's generic
# chat path. Anthropic uses chat_message_records instead.
def openai_messages_payload
messages = []
return messages unless chat&.messages

View File

@@ -32,6 +32,11 @@ class Provider::Anthropic < Provider
@client = ::Anthropic::Client.new(**client_options)
@base_url = base_url
if custom_endpoint? && model.blank?
raise Error, "Model is required when using a custom Anthropic-compatible endpoint"
end
@default_model = model.presence || DEFAULT_MODEL
end
@@ -76,6 +81,8 @@ class Provider::Anthropic < Provider
end
def supports_pdf_processing?(model: @default_model)
return true if custom_endpoint?
VISION_CAPABLE_MODEL_PREFIXES.any? { |prefix| model.to_s.start_with?(prefix) }
end
@@ -120,10 +127,19 @@ class Provider::Anthropic < Provider
user_identifier: user_identifier
)
partial_usage_recorded = false
begin
parsed, usage =
if streamer.present?
stream_chat_response(streamer: streamer, request_params: request_params)
stream_chat_response(
streamer: streamer,
request_params: request_params,
on_partial: ->(partial_usage) {
record_llm_usage(family: family, model: model, operation: "chat", usage: partial_usage)
partial_usage_recorded = true
}
)
else
sync_chat_response(request_params: request_params)
end
@@ -147,7 +163,7 @@ class Provider::Anthropic < Provider
error: e,
trace: trace
)
record_llm_usage(family: family, model: model, operation: "chat", error: e)
record_llm_usage(family: family, model: model, operation: "chat", error: e) unless partial_usage_recorded
raise
end
end
@@ -167,22 +183,31 @@ class Provider::Anthropic < Provider
[ parsed, usage ]
end
def stream_chat_response(streamer:, request_params:)
def stream_chat_response(streamer:, request_params:, on_partial: nil)
final_message = nil
stream = client.messages.stream(**request_params)
stream.each do |event|
case event
when ::Anthropic::Streaming::TextEvent
streamer.call(
Provider::LlmConcept::ChatStreamChunk.new(type: "output_text", data: event.text, usage: nil)
)
when ::Anthropic::Streaming::MessageStopEvent
final_message = event.message
# If `stream.each` raises mid-iteration (network drop, client abort),
# we still want to surface whatever tokens accumulated so the cost
# ledger doesn't lose partial-output billing.
begin
stream.each do |event|
case event
when ::Anthropic::Streaming::TextEvent
streamer.call(
Provider::LlmConcept::ChatStreamChunk.new(type: "output_text", data: event.text, usage: nil)
)
when ::Anthropic::Streaming::MessageStopEvent
final_message = event.message
end
end
rescue => mid_stream_error
partial = safe_accumulated_message(stream)
on_partial&.call(build_usage_hash(partial&.usage)) if partial
raise mid_stream_error
end
final_message ||= stream.accumulated_message
final_message ||= safe_accumulated_message(stream)
parsed = ChatParser.new(final_message).parsed
usage = build_usage_hash(final_message.usage)
@@ -193,6 +218,12 @@ class Provider::Anthropic < Provider
[ parsed, usage ]
end
def safe_accumulated_message(stream)
stream.accumulated_message
rescue StandardError
nil
end
def build_usage_hash(raw_usage)
return {} unless raw_usage
@@ -217,7 +248,7 @@ class Provider::Anthropic < Provider
def langfuse_client
return unless ENV["LANGFUSE_PUBLIC_KEY"].present? && ENV["LANGFUSE_SECRET_KEY"].present?
@langfuse_client = Langfuse.new
@langfuse_client ||= Langfuse.new
end
def create_langfuse_trace(name:, input:, session_id: nil, user_identifier: nil)

View File

@@ -40,9 +40,19 @@ class Provider::Anthropic::MessageFormatter
end
private
# ToolCall records have no association-level order; enforce
# chronological order here so message arrays are deterministic across
# replays and Anthropic sees tool_use blocks in the order the model
# originally emitted them.
def ordered_tool_calls(assistant_message)
assistant_message.tool_calls.sort_by { |tc| [ tc.created_at || Time.zone.at(0), tc.id.to_s ] }
end
def assistant_history_blocks(assistant_message)
tool_calls = ordered_tool_calls(assistant_message).select { |tc| tool_call_id(tc).present? }
blocks = []
blocks.concat(assistant_message.tool_calls.map { |tc| tool_use_block_from_record(tc) }) if assistant_message.tool_calls.any?
blocks.concat(tool_calls.map { |tc| tool_use_block_from_record(tc) }) if tool_calls.any?
blocks << { type: "text", text: assistant_message.content.to_s } if assistant_message.content.present?
return [] if blocks.empty?
@@ -51,20 +61,26 @@ class Provider::Anthropic::MessageFormatter
# If the assistant turn used tools, Anthropic requires a user turn with
# matching tool_result blocks before the next assistant turn.
if assistant_message.tool_calls.any?
if tool_calls.any?
result << {
role: "user",
content: assistant_message.tool_calls.map { |tc| tool_result_block_from_record(tc) }
content: tool_calls.map { |tc| tool_result_block_from_record(tc) }
}
end
result
end
# tool_use_id is required; skip tool_calls missing both identifiers
# rather than sending `id: nil` and getting rejected by Anthropic.
def tool_call_id(tool_call)
tool_call.provider_call_id.presence || tool_call.provider_id.presence
end
def tool_use_block_from_record(tool_call)
{
type: "tool_use",
id: tool_call.provider_call_id || tool_call.provider_id,
id: tool_call_id(tool_call),
name: tool_call.function_name,
input: parse_arguments(tool_call.function_arguments)
}
@@ -73,7 +89,7 @@ class Provider::Anthropic::MessageFormatter
def tool_result_block_from_record(tool_call)
{
type: "tool_result",
tool_use_id: tool_call.provider_call_id || tool_call.provider_id,
tool_use_id: tool_call_id(tool_call),
content: serialize_output(tool_call.function_result)
}
end

View File

@@ -10,7 +10,7 @@ class Setting < RailsSettings::Base
field :openai_uri_base, type: :string, default: ENV["OPENAI_URI_BASE"]
field :openai_model, type: :string, default: ENV["OPENAI_MODEL"]
field :openai_json_mode, type: :string, default: ENV["LLM_JSON_MODE"]
field :anthropic_access_token, type: :string, default: ENV["ANTHROPIC_ACCESS_TOKEN"].presence || ENV["ANTHROPIC_API_KEY"]
field :anthropic_access_token, type: :string, default: ENV["ANTHROPIC_ACCESS_TOKEN"].presence || ENV["ANTHROPIC_API_KEY"].presence
field :anthropic_model, type: :string, default: ENV["ANTHROPIC_MODEL"]
field :anthropic_base_url, type: :string, default: ENV["ANTHROPIC_BASE_URL"]
field :llm_provider, type: :string, default: ENV.fetch("LLM_PROVIDER", "openai")

View File

@@ -162,13 +162,11 @@ class User < ApplicationRecord
end
def openai_configured?
ENV["OPENAI_ACCESS_TOKEN"].present? || Setting.openai_access_token.present?
Provider::Openai.configured?
end
def anthropic_configured?
ENV["ANTHROPIC_ACCESS_TOKEN"].present? ||
ENV["ANTHROPIC_API_KEY"].present? ||
Setting.anthropic_access_token.present?
Provider::Anthropic.configured?
end
def ai_enabled?

View File

@@ -144,6 +144,64 @@ class Provider::AnthropicTest < ActiveSupport::TestCase
assert_empty response.data.function_requests
end
test "chat_response streams text deltas and emits a final response chunk" do
final_message = build_anthropic_message(
id: "msg_stream",
model: @subject_model,
text_blocks: [ "Hello world" ],
tool_use_blocks: [],
usage: { input_tokens: 7, output_tokens: 3 }
)
# Use ad-hoc subclasses of the SDK event types so the case/when dispatch
# inside `stream_chat_response` matches them via `is_a?` without needing
# to stub class-level `===` behavior.
text_event_cls = Class.new(::Anthropic::Streaming::TextEvent) do
def initialize(text:, snapshot:)
@text = text
@snapshot = snapshot
end
attr_reader :text, :snapshot
end
stop_event_cls = Class.new(::Anthropic::Streaming::MessageStopEvent) do
def initialize(message:)
@message = message
end
attr_reader :message
end
events = [
text_event_cls.new(text: "Hello ", snapshot: "Hello "),
text_event_cls.new(text: "world", snapshot: "Hello world"),
stop_event_cls.new(message: final_message)
]
fake_stream = mock
fake_stream.stubs(:each).multiple_yields(*events.map { |e| [ e ] })
fake_stream.stubs(:accumulated_message).returns(final_message)
messages = mock
messages.stubs(:stream).returns(fake_stream)
client = mock
client.stubs(:messages).returns(messages)
@subject.instance_variable_set(:@client, client)
collected = []
response = @subject.chat_response(
"hi",
model: @subject_model,
streamer: ->(chunk) { collected << chunk }
)
assert response.success?
text_chunks = collected.select { |c| c.type == "output_text" }
response_chunks = collected.select { |c| c.type == "response" }
assert_equal 2, text_chunks.size
assert_equal [ "Hello ", "world" ], text_chunks.map(&:data)
assert_equal 1, response_chunks.size
assert_equal "msg_stream", response_chunks.first.data.id
assert_equal 10, response_chunks.first.usage["total_tokens"]
end
test "chat_response surfaces tool_use blocks as function_requests" do
fake_client = stub_anthropic_client_with(
build_anthropic_message(