From 9cc52b9d3582d833b1f95fd2d2f60bb1c228ddc8 Mon Sep 17 00:00:00 2001
From: GermanDZ
Date: Tue, 5 May 2026 01:22:05 +0200
Subject: [PATCH] fix: handle OpenAI Responses API stream errors instead of
 crashing (#1669)

The streaming code assumed every stream produced a `response.completed`
event and dereferenced its data unconditionally, causing
`undefined method 'data' for nil` whenever OpenAI emitted
`response.failed`, `response.incomplete`, or a top-level `error` event
(e.g. expired `previous_response_id`, context-window overflow, transient
upstream failures). Surface a descriptive `Provider::Error` instead.

- Extend `ChatStreamParser` to recognise `response.failed`,
  `response.incomplete`, and `error` events and emit an `error` chunk
  with a `StreamErrorData` payload (event, message, code, details).
- In `Provider::Openai#native_chat_response`, detect the missing
  `response` chunk, build a user-facing error message from the collected
  error chunk, and raise `Provider::Error`.
- Add unit tests for the parser (8 cases) and integration tests for the
  error path in the chat response flow.

Co-authored-by: Claude Opus 4.7 (1M context)
---
 app/models/provider/openai.rb                 | 32 +++++++
 .../provider/openai/chat_stream_parser.rb     | 35 +++++++
 .../openai/chat_stream_parser_test.rb         | 95 +++++++++++++++++++
 test/models/provider/openai_test.rb           | 48 ++++++++++
 4 files changed, 210 insertions(+)
 create mode 100644 test/models/provider/openai/chat_stream_parser_test.rb

diff --git a/app/models/provider/openai.rb b/app/models/provider/openai.rb
index 052829cc1..0c04f63e1 100644
--- a/app/models/provider/openai.rb
+++ b/app/models/provider/openai.rb
@@ -373,7 +373,16 @@ class Provider::Openai < Provider
       # If streaming, Ruby OpenAI does not return anything, so to normalize this method's API, we search
       # for the "response chunk" in the stream and return it (it is already parsed)
       if stream_proxy.present?
+        error_chunk = collected_chunks.find { |chunk| chunk.type == "error" }
         response_chunk = collected_chunks.find { |chunk| chunk.type == "response" }
+
+        if response_chunk.nil?
+          raise Error.new(
+            build_stream_error_message(error_chunk),
+            details: error_chunk&.data&.details
+          )
+        end
+
         response = response_chunk.data
         usage = response_chunk.usage
         Rails.logger.debug("Stream response usage: #{usage.inspect}")
@@ -744,4 +753,27 @@ class Provider::Openai < Provider
     rescue => e
       "(message unavailable: #{e.class})"
     end
+
+    # Builds a useful error message when the OpenAI Responses stream ended
+    # without delivering a `response.completed` event. Uses upstream details
+    # when present (e.g. `response.failed`, `response.incomplete`, top-level
+    # `error`) and falls back to a generic message that hints at the most
+    # common causes.
+    def build_stream_error_message(error_chunk)
+      if error_chunk&.data&.message.present?
+        upstream = error_chunk.data
+        prefix = case upstream.event
+        when "response.incomplete" then "OpenAI response was incomplete"
+        when "response.failed" then "OpenAI response failed"
+        else "OpenAI returned an error"
+        end
+        code_suffix = upstream.code.present? ? " [#{upstream.code}]" : ""
+        "#{prefix}#{code_suffix}: #{upstream.message}"
+      else
+        "OpenAI stream ended without a completion event. " \
+          "This usually means the upstream call was cut short — common causes: " \
+          "expired previous_response_id (Responses API state TTL), context-length overflow, " \
+          "or a transient OpenAI error."
+      end
+    end
 end
diff --git a/app/models/provider/openai/chat_stream_parser.rb b/app/models/provider/openai/chat_stream_parser.rb
index dcfe44207..294a317a8 100644
--- a/app/models/provider/openai/chat_stream_parser.rb
+++ b/app/models/provider/openai/chat_stream_parser.rb
@@ -1,6 +1,8 @@
 class Provider::Openai::ChatStreamParser
   Error = Class.new(StandardError)
 
+  StreamErrorData = Data.define(:event, :message, :code, :details)
+
   def initialize(object)
     @object = object
   end
@@ -15,6 +17,21 @@ class Provider::Openai::ChatStreamParser
       raw_response = object.dig("response")
       usage = raw_response.dig("usage")
       Chunk.new(type: "response", data: parse_response(raw_response), usage: usage)
+    when "response.failed"
+      Chunk.new(type: "error", data: build_response_error("response.failed"), usage: nil)
+    when "response.incomplete"
+      Chunk.new(type: "error", data: build_response_error("response.incomplete"), usage: nil)
+    when "error"
+      Chunk.new(
+        type: "error",
+        data: StreamErrorData.new(
+          event: "error",
+          message: object.dig("message").presence || "OpenAI stream returned an error event",
+          code: object.dig("code"),
+          details: object
+        ),
+        usage: nil
+      )
     end
   end
 
@@ -26,4 +43,22 @@ class Provider::Openai::ChatStreamParser
     def parse_response(response)
       Provider::Openai::ChatParser.new(response).parsed
     end
+
+    def build_response_error(event)
+      raw_response = object.dig("response") || {}
+      error_message =
+        raw_response.dig("error", "message").presence ||
+        raw_response.dig("incomplete_details", "reason").presence ||
+        "OpenAI stream ended with #{event}"
+      code =
+        raw_response.dig("error", "code") ||
+        raw_response.dig("incomplete_details", "reason")
+
+      StreamErrorData.new(
+        event: event,
+        message: error_message,
+        code: code,
+        details: raw_response
+      )
+    end
 end
diff --git a/test/models/provider/openai/chat_stream_parser_test.rb b/test/models/provider/openai/chat_stream_parser_test.rb
new file mode 100644
index 000000000..67cef1d98
--- /dev/null
+++ b/test/models/provider/openai/chat_stream_parser_test.rb
@@ -0,0 +1,95 @@
+require "test_helper"
+
+class Provider::Openai::ChatStreamParserTest < ActiveSupport::TestCase
+  test "parses output_text delta" do
+    chunk = Provider::Openai::ChatStreamParser.new(
+      { "type" => "response.output_text.delta", "delta" => "Hello" }
+    ).parsed
+
+    assert_equal "output_text", chunk.type
+    assert_equal "Hello", chunk.data
+  end
+
+  test "parses refusal delta as output_text" do
+    chunk = Provider::Openai::ChatStreamParser.new(
+      { "type" => "response.refusal.delta", "delta" => "I cannot..." }
+    ).parsed
+
+    assert_equal "output_text", chunk.type
+    assert_equal "I cannot...", chunk.data
+  end
+
+  test "returns nil for unknown event types" do
+    assert_nil Provider::Openai::ChatStreamParser.new({ "type" => "response.created" }).parsed
+    assert_nil Provider::Openai::ChatStreamParser.new({ "type" => "response.in_progress" }).parsed
+  end
+
+  test "response.failed produces an error chunk with upstream message and code" do
+    chunk = Provider::Openai::ChatStreamParser.new(
+      {
+        "type" => "response.failed",
+        "response" => {
+          "error" => { "message" => "Previous response not found", "code" => "previous_response_not_found" }
+        }
+      }
+    ).parsed
+
+    assert_equal "error", chunk.type
+    assert_equal "response.failed", chunk.data.event
+    assert_equal "Previous response not found", chunk.data.message
+    assert_equal "previous_response_not_found", chunk.data.code
+  end
+
+  test "response.incomplete produces an error chunk using incomplete_details.reason" do
+    chunk = Provider::Openai::ChatStreamParser.new(
+      {
+        "type" => "response.incomplete",
+        "response" => {
+          "incomplete_details" => { "reason" => "max_output_tokens" }
+        }
+      }
+    ).parsed
+
+    assert_equal "error", chunk.type
+    assert_equal "response.incomplete", chunk.data.event
+    assert_equal "max_output_tokens", chunk.data.message
+    assert_equal "max_output_tokens", chunk.data.code
+  end
+
+  test "response.failed without details still surfaces an event-tagged error" do
+    chunk = Provider::Openai::ChatStreamParser.new({ "type" => "response.failed" }).parsed
+
+    assert_equal "error", chunk.type
+    assert_equal "response.failed", chunk.data.event
+    assert_match(/response\.failed/, chunk.data.message)
+  end
+
+  test "top-level error event becomes an error chunk" do
+    chunk = Provider::Openai::ChatStreamParser.new(
+      { "type" => "error", "message" => "Rate limit exceeded", "code" => "rate_limit_exceeded" }
+    ).parsed
+
+    assert_equal "error", chunk.type
+    assert_equal "error", chunk.data.event
+    assert_equal "Rate limit exceeded", chunk.data.message
+    assert_equal "rate_limit_exceeded", chunk.data.code
+  end
+
+  test "response.completed parses into a response chunk" do
+    chunk = Provider::Openai::ChatStreamParser.new(
+      {
+        "type" => "response.completed",
+        "response" => {
+          "id" => "resp_1",
+          "model" => "gpt-4.1",
+          "output" => [],
+          "usage" => { "total_tokens" => 5 }
+        }
+      }
+    ).parsed
+
+    assert_equal "response", chunk.type
+    assert_equal "resp_1", chunk.data.id
+    assert_equal({ "total_tokens" => 5 }, chunk.usage)
+  end
+end
diff --git a/test/models/provider/openai_test.rb b/test/models/provider/openai_test.rb
index 5d3e5e960..2db1013a4 100644
--- a/test/models/provider/openai_test.rb
+++ b/test/models/provider/openai_test.rb
@@ -445,6 +445,54 @@ class Provider::OpenaiTest < ActiveSupport::TestCase
     end
   end
 
+  test "streaming surfaces a useful error when the stream ends with response.failed and no completion" do
+    fake_responses = mock
+    fake_client = mock
+    fake_client.stubs(:responses).returns(fake_responses)
+    @subject.stubs(:client).returns(fake_client)
+
+    fake_responses.expects(:create).with do |*_args, **kwargs|
+      stream = kwargs.dig(:parameters, :stream)
+      stream.call({
+        "type" => "response.failed",
+        "response" => {
+          "error" => { "message" => "Previous response not found", "code" => "previous_response_not_found" }
+        }
+      })
+      true
+    end.returns(nil)
+
+    response = @subject.chat_response(
+      "hi",
+      model: @subject_model,
+      streamer: proc { |_| }
+    )
+
+    assert_not response.success?
+    assert_kind_of Provider::Openai::Error, response.error
+    assert_match(/Previous response not found/, response.error.message)
+    assert_match(/previous_response_not_found/, response.error.message)
+  end
+
+  test "streaming surfaces a useful error when the stream ends with no response and no error event" do
+    fake_responses = mock
+    fake_client = mock
+    fake_client.stubs(:responses).returns(fake_responses)
+    @subject.stubs(:client).returns(fake_client)
+
+    fake_responses.expects(:create).returns(nil)
+
+    response = @subject.chat_response(
+      "hi",
+      model: @subject_model,
+      streamer: proc { |_| }
+    )
+
+    assert_not response.success?
+    assert_kind_of Provider::Openai::Error, response.error
+    assert_match(/stream ended without a completion event/i, response.error.message)
+  end
+
   test "build_input no longer accepts inline messages history" do
     config = Provider::Openai::ChatConfig.new(functions: [], function_results: [])
     # Positive control: prompt works
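
Reviewer note (not part of the patch): a minimal sketch of how the new
error path is expected to read from the caller's side, based on the
assertions in the integration tests above. The `provider` variable, the
model name, and the log call are illustrative assumptions, not code from
this diff.

    # Minimal sketch: assumes `provider` is a configured Provider::Openai
    # instance; "gpt-4.1" stands in for whatever model the caller uses.
    response = provider.chat_response(
      "hi",
      model: "gpt-4.1",
      streamer: proc { |_chunk| }
    )

    unless response.success?
      # For an expired previous_response_id this now reads, e.g.:
      #   "OpenAI response failed [previous_response_not_found]: Previous response not found"
      # instead of crashing with `undefined method 'data' for nil`.
      Rails.logger.warn("Chat stream failed: #{response.error.message}")
    end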