mirror of
https://github.com/we-promise/sure.git
synced 2026-04-19 03:54:08 +00:00
Add external AI assistant with Pipelock security proxy (#1069)
* feat(helm): add Pipelock ConfigMap, scanning config, and consolidate compose

  - Add ConfigMap template rendering DLP, response scanning, MCP input/tool scanning, and forward proxy settings from values
  - Mount ConfigMap as /etc/pipelock/pipelock.yaml volume in deployment
  - Add checksum/config annotation for automatic pod restart on config change
  - Gate HTTPS_PROXY/HTTP_PROXY env injection on forwardProxy.enabled (skip in MCP-only mode)
  - Use hasKey for all boolean values to prevent Helm defaulting from swallowing false
  - Single source of truth for ports (forwardProxy.port/mcpProxy.port)
  - Pipelock-specific imagePullSecrets with fallback to app secrets
  - Merge standalone compose.example.pipelock.yml into compose.example.ai.yml
  - Add pipelock.example.yaml for Docker Compose users
  - Add exclude-paths to CI workflow for locale-file false positives

* Add external assistant support (OpenAI-compatible SSE proxy)

  Allow self-hosted instances to delegate chat to an external AI agent via an OpenAI-compatible streaming endpoint. Configurable per-family through the Settings UI or an ASSISTANT_TYPE env override.

  - Assistant::External::Client: SSE streaming HTTP client (no new gems)
  - Settings UI with type selector, env lock indicator, config status
  - Helm chart and Docker Compose env var support
  - 45 tests covering client, config, routing, controller, integration

* Add session key routing, email allowlist, and config plumbing

  Route to the actual OpenClaw session via the x-openclaw-session-key header instead of creating isolated sessions. Gate external assistant access behind an email allowlist (EXTERNAL_ASSISTANT_ALLOWED_EMAILS env var). Plumb session_key and allowedEmails through the Helm chart, compose, and env template.

* Add HTTPS_PROXY support to External::Client for Pipelock integration

  Net::HTTP does not auto-read the HTTPS_PROXY/HTTP_PROXY env vars (unlike Faraday). Explicitly resolve the proxy from the environment in build_http so outbound traffic to the external assistant routes through Pipelock's forward proxy when enabled. Respects NO_PROXY for internal hosts.

* Add UI fields for external assistant config (Setting-backed with env fallback)

  Follow the same pattern as the OpenAI settings: database-backed Setting fields with env var defaults. Self-hosters can now configure the external assistant URL, token, and agent ID from the browser (Settings > Self-Hosting > AI Assistant) instead of requiring env vars. Fields are disabled when the corresponding env var is set.

* Improve external assistant UI labels and add help text

  Change the placeholder to a generic OpenAI-compatible URL pattern. Add help text under each field explaining where the values come from: URL from the agent provider, token for authentication, agent ID for multi-agent routing.

* Add external assistant docs and fix URL help text

  Add an External AI Assistant section to docs/hosting/ai.md covering setup (UI and env vars), how it works, Pipelock security scanning, access control, and a Docker Compose example. Drop "chat completions" jargon from the URL help text.

* Harden external assistant: retry logic, disconnect UI, error handling, and test coverage

  - Add retry with backoff for transient network errors (no retry after streaming starts)
  - Add disconnect button with confirmation modal in self-hosting settings
  - Narrow rescue scope with fallback logging for unexpected errors
  - Safe cleanup of partial responses on stream interruption
  - Gate ai_available? on family assistant_type instead of OR-ing all providers
  - Truncate conversation history to the last 20 messages
  - Proxy-aware HTTP client with NO_PROXY support
  - Sanitize protocol to use generic headers (X-Agent-Id, X-Session-Key)
  - Full test coverage for streaming, retries, proxy routing, config, and disconnect

* Exclude external assistant client from Pipelock scan-diff

  False positive: the `@token` instance variable is flagged as "Credential in URL". Temporary workaround until Pipelock supports inline suppression.

* Address review feedback: NO_PROXY boundary fix, SSE done flag, design tokens

  - Fix NO_PROXY matching to require a domain boundary (exact match or .suffix), case-insensitive. Prevents badexample.com matching example.com.
  - Add a done flag to SSE streaming so read_body stops after [DONE]
  - Move MAX_CONVERSATION_MESSAGES to class level
  - Use bg-success/bg-destructive design tokens for status indicators
  - Add a rationale comment for the Pipelock scan exclusion
  - Update the docs last-updated date

* Address second round of review feedback

  - Allowlist email comparison is now case-insensitive and nil-safe
  - Cap the SSE buffer at 1 MB to prevent memory blowup from malformed streams
  - Don't expose the upstream HTTP response body in user-facing errors (log it instead)
  - Fix frozen-string warning on buffer initialization
  - Fix "builtin" typo in docs (should be "built-in")

* Protect completed responses from cleanup, sanitize error messages

  - Don't destroy a fully streamed assistant message if the post-stream metadata update fails (only clean up partial responses)
  - Log raw connection/HTTP errors internally; show generic messages to users to avoid leaking network/proxy details
  - Update test assertions for the new error message wording

* Fix SSE content guard and NO_PROXY test correctness

  Use a nil check instead of present? for SSE delta content to preserve whitespace-only chunks (newlines, spaces) that can occur in code output. Fix the NO_PROXY test to use HTTP_PROXY matching the http:// client URL so the proxy resolution and NO_PROXY bypass logic are actually exercised.

* Forward proxy credentials to Net::HTTP

  Pass proxy_uri.user and proxy_uri.password to Net::HTTP.new so authenticated proxies (http://user:pass@host:port) work correctly. Without this, credentials parsed from the proxy URL were silently dropped. Nil values are safe as positional args when no creds exist.

* Update Pipelock integration to v0.3.1 with full scanning config

  Bump the Helm image tag from 0.2.7 to 0.3.1. Add the missing security sections to both the Helm ConfigMap and the compose example config: mcp_tool_policy, mcp_session_binding, and tool_chain_detection. These protect the /mcp endpoint against tool injection, session hijacking, and multi-step exfiltration chains. Add version and mode fields to the config files. Enable include_defaults for DLP and response scanning to merge user patterns with the 35 built-in patterns. Remove the redundant --mode CLI flag from the Helm deployment template since mode is now in the config file.
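The "Forward proxy credentials to Net::HTTP" entry above hinges on Ruby's URI parser exposing the userinfo parts of a proxy URL. A quick sketch of that parsing (the proxy host and credentials below are invented for illustration):

```ruby
require "uri"

# Hypothetical Pipelock forward-proxy URL with inline credentials.
proxy = URI("http://scanner:s3cret@pipelock.internal:8888")

proxy.user     # "scanner"
proxy.password # "s3cret"

# Net::HTTP does not use the userinfo from a proxy URL on its own; the
# credentials must be passed as the 5th and 6th positional arguments:
#   Net::HTTP.new(host, port, proxy.host, proxy.port, proxy.user, proxy.password)
# Passing nil for both is harmless when the proxy URL has no credentials.
```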
@@ -36,7 +36,7 @@ module Assistant
     def implementation_for(chat)
       raise Error, "chat is required" if chat.blank?

-      type = chat.user&.family&.assistant_type.presence || "builtin"
+      type = ENV["ASSISTANT_TYPE"].presence || chat.user&.family&.assistant_type.presence || "builtin"

       REGISTRY.fetch(type) { REGISTRY["builtin"] }
     end
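The hunk above changes only the precedence: an operator-wide ASSISTANT_TYPE env override now beats the per-family setting, with "builtin" as the final fallback. A standalone sketch of the lookup order, assuming a registry shaped like the app's (the symbol values here are placeholders, not the real assistant classes):

```ruby
# Placeholder registry; the real REGISTRY maps type names to assistant classes.
REGISTRY = {
  "builtin"  => :builtin_assistant,
  "external" => :external_assistant
}.freeze

# Mirrors implementation_for: env override first, then the family's stored
# type, then "builtin"; unknown types also fall back to builtin via fetch.
def resolve_implementation(env_type, family_type)
  type = [env_type, family_type].find { |t| t && !t.empty? } || "builtin"
  REGISTRY.fetch(type) { REGISTRY["builtin"] }
end

resolve_implementation("external", "builtin") # => :external_assistant
resolve_implementation(nil, "external")       # => :external_assistant
resolve_implementation(nil, nil)              # => :builtin_assistant
resolve_implementation("bogus", nil)          # => :builtin_assistant
```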
@@ -1,14 +1,110 @@
 class Assistant::External < Assistant::Base
+  Config = Struct.new(:url, :token, :agent_id, :session_key, keyword_init: true)
+  MAX_CONVERSATION_MESSAGES = 20
+
   class << self
     def for_chat(chat)
       new(chat)
     end
+
+    def configured?
+      config.url.present? && config.token.present?
+    end
+
+    def available_for?(user)
+      configured? && allowed_user?(user)
+    end
+
+    def allowed_user?(user)
+      allowed = ENV["EXTERNAL_ASSISTANT_ALLOWED_EMAILS"]
+      return true if allowed.blank?
+      return false if user&.email.blank?
+
+      allowed.split(",").map { |e| e.strip.downcase }.include?(user.email.downcase)
+    end
+
+    def config
+      Config.new(
+        url: ENV["EXTERNAL_ASSISTANT_URL"].presence || Setting.external_assistant_url,
+        token: ENV["EXTERNAL_ASSISTANT_TOKEN"].presence || Setting.external_assistant_token,
+        agent_id: ENV["EXTERNAL_ASSISTANT_AGENT_ID"].presence || Setting.external_assistant_agent_id.presence || "main",
+        session_key: ENV.fetch("EXTERNAL_ASSISTANT_SESSION_KEY", "agent:main:main")
+      )
+    end
   end

   def respond_to(message)
-    stop_thinking
-    chat.add_error(
-      StandardError.new("External assistant (OpenClaw/WebSocket) is not yet implemented.")
-    )
+    response_completed = false
+
+    unless self.class.configured?
+      raise Assistant::Error,
+        "External assistant is not configured. Set the URL and token in Settings > Self-Hosting or via environment variables."
+    end
+
+    unless self.class.allowed_user?(chat.user)
+      raise Assistant::Error, "Your account is not authorized to use the external assistant."
+    end
+
+    assistant_message = AssistantMessage.new(
+      chat: chat,
+      content: "",
+      ai_model: "external-agent"
+    )
+
+    client = build_client
+    messages = build_conversation_messages
+
+    model = client.chat(
+      messages: messages,
+      user: "sure-family-#{chat.user.family_id}"
+    ) do |text|
+      if assistant_message.content.blank?
+        stop_thinking
+        assistant_message.content = text
+        assistant_message.save!
+      else
+        assistant_message.append_text!(text)
+      end
+    end
+
+    if assistant_message.new_record?
+      stop_thinking
+      raise Assistant::Error, "External assistant returned an empty response."
+    end
+
+    response_completed = true
+    assistant_message.update!(ai_model: model) if model.present?
+  rescue Assistant::Error, ActiveRecord::ActiveRecordError => e
+    cleanup_partial_response(assistant_message) unless response_completed
+    stop_thinking
+    chat.add_error(e)
+  rescue => e
+    Rails.logger.error("[Assistant::External] Unexpected error: #{e.class} - #{e.message}")
+    cleanup_partial_response(assistant_message) unless response_completed
+    stop_thinking
+    chat.add_error(Assistant::Error.new("Something went wrong with the external assistant. Check server logs for details."))
   end
+
+  private
+
+    def cleanup_partial_response(assistant_message)
+      assistant_message&.destroy! if assistant_message&.persisted?
+    rescue ActiveRecord::ActiveRecordError => e
+      Rails.logger.warn("[Assistant::External] Failed to clean up partial response: #{e.message}")
+    end
+
+    def build_client
+      Assistant::External::Client.new(
+        url: self.class.config.url,
+        token: self.class.config.token,
+        agent_id: self.class.config.agent_id,
+        session_key: self.class.config.session_key
+      )
+    end
+
+    def build_conversation_messages
+      chat.conversation_messages.ordered.last(MAX_CONVERSATION_MESSAGES).map do |msg|
+        { role: msg.role, content: msg.content }
+      end
+    end
 end
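The allowlist check in allowed_user? above is easy to model in isolation. This standalone re-implementation (plain Ruby in place of ActiveSupport's blank? and the ENV lookup) reproduces its rules: a missing allowlist admits everyone, a missing email is rejected, and comparison is case-insensitive:

```ruby
# Standalone model of allowed_user?; allowlist is the raw comma-separated
# string the real method reads from EXTERNAL_ASSISTANT_ALLOWED_EMAILS.
def allowed_email?(email, allowlist)
  return true if allowlist.nil? || allowlist.strip.empty?  # no allowlist => open
  return false if email.nil? || email.strip.empty?         # nil-safe rejection

  allowlist.split(",").map { |e| e.strip.downcase }.include?(email.downcase)
end

allowed_email?("ada@example.com", "Ada@Example.com, bob@example.com") # => true
allowed_email?("eve@example.com", "ada@example.com")                  # => false
allowed_email?(nil, "ada@example.com")                                # => false
allowed_email?("anyone@example.com", nil)                             # => true
```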
app/models/assistant/external/client.rb (new file, 175 lines)
@@ -0,0 +1,175 @@
require "net/http"
require "uri"
require "json"

class Assistant::External::Client
  TIMEOUT_CONNECT = 10 # seconds
  TIMEOUT_READ = 120 # seconds (agent may take time to reason + call tools)
  MAX_RETRIES = 2
  RETRY_DELAY = 1 # seconds (doubles each retry)
  MAX_SSE_BUFFER = 1_048_576 # 1 MB safety cap on SSE buffer

  TRANSIENT_ERRORS = [
    Net::OpenTimeout,
    Net::ReadTimeout,
    Errno::ECONNREFUSED,
    Errno::ECONNRESET,
    Errno::EHOSTUNREACH,
    SocketError
  ].freeze

  def initialize(url:, token:, agent_id: "main", session_key: "agent:main:main")
    @url = url
    @token = token
    @agent_id = agent_id
    @session_key = session_key
  end

  # Streams text chunks from an OpenAI-compatible chat endpoint via SSE.
  #
  # messages - Array of {role:, content:} hashes (conversation history)
  # user     - Optional user identifier for session persistence
  # block    - Called with each text chunk as it arrives
  #
  # Returns the model identifier string from the response.
  def chat(messages:, user: nil, &block)
    uri = URI(@url)
    request = build_request(uri, messages, user)
    retries = 0
    streaming_started = false

    begin
      http = build_http(uri)
      model = stream_response(http, request) do |content|
        streaming_started = true
        block.call(content)
      end
      model
    rescue *TRANSIENT_ERRORS => e
      if streaming_started
        Rails.logger.warn("[External::Client] Stream interrupted: #{e.class} - #{e.message}")
        raise Assistant::Error, "External assistant connection was interrupted."
      end

      retries += 1
      if retries <= MAX_RETRIES
        Rails.logger.warn("[External::Client] Transient error (attempt #{retries}/#{MAX_RETRIES}): #{e.class} - #{e.message}")
        sleep(RETRY_DELAY * retries)
        retry
      end
      Rails.logger.error("[External::Client] Unreachable after #{MAX_RETRIES + 1} attempts: #{e.class} - #{e.message}")
      raise Assistant::Error, "External assistant is temporarily unavailable."
    end
  end

  private

    def stream_response(http, request, &block)
      model = nil
      buffer = +""
      done = false

      http.request(request) do |response|
        unless response.is_a?(Net::HTTPSuccess)
          Rails.logger.warn("[External::Client] Upstream HTTP #{response.code}: #{response.body.to_s.truncate(500)}")
          raise Assistant::Error, "External assistant returned HTTP #{response.code}."
        end

        response.read_body do |chunk|
          break if done
          buffer << chunk

          if buffer.bytesize > MAX_SSE_BUFFER
            raise Assistant::Error, "External assistant stream exceeded maximum buffer size."
          end

          while (line_end = buffer.index("\n"))
            line = buffer.slice!(0..line_end).strip
            next if line.empty?
            next unless line.start_with?("data:")

            data = line.delete_prefix("data:")
            data = data.delete_prefix(" ") # SSE spec: strip one optional leading space

            if data == "[DONE]"
              done = true
              break
            end

            parsed = parse_sse_data(data)
            next unless parsed

            model ||= parsed["model"]
            content = parsed.dig("choices", 0, "delta", "content")
            block.call(content) unless content.nil?
          end
        end
      end

      model
    end

    def build_http(uri)
      proxy_uri = resolve_proxy(uri)

      if proxy_uri
        http = Net::HTTP.new(uri.host, uri.port, proxy_uri.host, proxy_uri.port, proxy_uri.user, proxy_uri.password)
      else
        http = Net::HTTP.new(uri.host, uri.port)
      end

      http.use_ssl = (uri.scheme == "https")
      http.open_timeout = TIMEOUT_CONNECT
      http.read_timeout = TIMEOUT_READ
      http
    end

    def resolve_proxy(uri)
      proxy_env = (uri.scheme == "https") ? "HTTPS_PROXY" : "HTTP_PROXY"
      proxy_url = ENV[proxy_env] || ENV[proxy_env.downcase]
      return nil if proxy_url.blank?

      no_proxy = ENV["NO_PROXY"] || ENV["no_proxy"]
      return nil if host_bypasses_proxy?(uri.host, no_proxy)

      URI(proxy_url)
    rescue URI::InvalidURIError => e
      Rails.logger.warn("[External::Client] Invalid proxy URL ignored: #{e.message}")
      nil
    end

    def host_bypasses_proxy?(host, no_proxy)
      return false if no_proxy.blank?

      host_down = host.downcase
      no_proxy.split(",").any? do |pattern|
        pattern = pattern.strip.downcase.delete_prefix(".")
        host_down == pattern || host_down.end_with?(".#{pattern}")
      end
    end

    def build_request(uri, messages, user)
      request = Net::HTTP::Post.new(uri.request_uri)
      request["Content-Type"] = "application/json"
      request["Authorization"] = "Bearer #{@token}"
      request["Accept"] = "text/event-stream"
      request["X-Agent-Id"] = @agent_id
      request["X-Session-Key"] = @session_key

      payload = {
        model: @agent_id,
        messages: messages,
        stream: true
      }
      payload[:user] = user if user.present?

      request.body = payload.to_json
      request
    end

    def parse_sse_data(data)
      JSON.parse(data)
    rescue JSON::ParserError => e
      Rails.logger.warn("[External::Client] Unparseable SSE data: #{e.message}")
      nil
    end
end
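host_bypasses_proxy? above enforces a label boundary when matching NO_PROXY entries; the review-round fix exists precisely so that badexample.com cannot piggyback on an example.com entry. A standalone copy of the rule, testable without any environment setup:

```ruby
# Standalone copy of the NO_PROXY matching rule: a pattern matches only on
# an exact host match or at a "." label boundary, case-insensitively.
def bypasses_proxy?(host, no_proxy)
  return false if no_proxy.nil? || no_proxy.empty?

  host_down = host.downcase
  no_proxy.split(",").any? do |pattern|
    pattern = pattern.strip.downcase.delete_prefix(".")
    host_down == pattern || host_down.end_with?(".#{pattern}")
  end
end

bypasses_proxy?("example.com", "example.com")      # => true  (exact match)
bypasses_proxy?("api.example.com", ".example.com") # => true  (label boundary)
bypasses_proxy?("API.Example.COM", "example.com")  # => true  (case-insensitive)
bypasses_proxy?("badexample.com", "example.com")   # => false (the review fix)
```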
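The SSE handling in stream_response can be exercised offline by feeding canned chunks through the same buffer/line-split/"data:"-prefix/[DONE] logic. A self-contained sketch (no Net::HTTP, same parsing rules, including reassembly of a frame split across chunk boundaries):

```ruby
require "json"

# Minimal offline version of the SSE parsing loop: accumulate chunks in a
# buffer, split on newlines, handle "data:" lines, and stop at [DONE].
def parse_sse_chunks(chunks)
  buffer = +""
  contents = []
  model = nil
  done = false

  chunks.each do |chunk|
    break if done
    buffer << chunk

    while (line_end = buffer.index("\n"))
      line = buffer.slice!(0..line_end).strip
      next if line.empty?
      next unless line.start_with?("data:")

      data = line.delete_prefix("data:").delete_prefix(" ")
      if data == "[DONE]"
        done = true
        break
      end

      parsed = JSON.parse(data) rescue next
      model ||= parsed["model"]
      content = parsed.dig("choices", 0, "delta", "content")
      contents << content unless content.nil?
    end
  end

  [model, contents.join]
end

# The second frame arrives split across two chunks; the buffer reassembles it.
chunks = [
  "data: {\"model\":\"agent-1\",\"choices\":[{\"delta\":{\"content\":\"Hel\"}}]}\n",
  "data: {\"choices\":[{\"delta\":{\"content\":\"lo\"}}",
  "]}\n\ndata: [DONE]\n"
]
parse_sse_chunks(chunks) # => ["agent-1", "Hello"]
```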
@@ -10,6 +10,9 @@ class Setting < RailsSettings::Base
   field :openai_uri_base, type: :string, default: ENV["OPENAI_URI_BASE"]
   field :openai_model, type: :string, default: ENV["OPENAI_MODEL"]
   field :openai_json_mode, type: :string, default: ENV["LLM_JSON_MODE"]
+  field :external_assistant_url, type: :string, default: ENV["EXTERNAL_ASSISTANT_URL"]
+  field :external_assistant_token, type: :string, default: ENV["EXTERNAL_ASSISTANT_TOKEN"]
+  field :external_assistant_agent_id, type: :string, default: ENV.fetch("EXTERNAL_ASSISTANT_AGENT_ID", "main")
   field :brand_fetch_client_id, type: :string, default: ENV["BRAND_FETCH_CLIENT_ID"]
   field :brand_fetch_high_res_logos, type: :boolean, default: ENV.fetch("BRAND_FETCH_HIGH_RES_LOGOS", "false") == "true"
@@ -136,7 +136,16 @@ class User < ApplicationRecord
   end

   def ai_available?
-    !Rails.application.config.app_mode.self_hosted? || ENV["OPENAI_ACCESS_TOKEN"].present? || Setting.openai_access_token.present?
+    return true unless Rails.application.config.app_mode.self_hosted?
+
+    effective_type = ENV["ASSISTANT_TYPE"].presence || family&.assistant_type.presence || "builtin"
+
+    case effective_type
+    when "external"
+      Assistant::External.available_for?(self)
+    else
+      ENV["OPENAI_ACCESS_TOKEN"].present? || Setting.openai_access_token.present?
+    end
   end

   def ai_enabled?
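The rewritten ai_available? reduces to a small dispatch that is easy to model as a pure function; the boolean parameters below stand in for the real ENV/Setting lookups and the external allowlist check:

```ruby
# Pure-function model of the new ai_available? dispatch: managed mode is
# always on; self-hosted mode routes on the effective assistant type.
def ai_available_model?(self_hosted:, assistant_type:, external_ok:, openai_token_present:)
  return true unless self_hosted # managed (non-self-hosted) mode is always available

  effective_type = (assistant_type && !assistant_type.empty?) ? assistant_type : "builtin"
  case effective_type
  when "external" then external_ok
  else openai_token_present
  end
end

ai_available_model?(self_hosted: false, assistant_type: nil, external_ok: false, openai_token_present: false) # => true
ai_available_model?(self_hosted: true, assistant_type: "external", external_ok: true, openai_token_present: false) # => true
ai_available_model?(self_hosted: true, assistant_type: nil, external_ok: false, openai_token_present: false) # => false
```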