From 60b9cb169a03032631b7d2ddfac8df260e318c23 Mon Sep 17 00:00:00 2001 From: Hare Date: Thu, 28 May 2026 05:18:57 +0900 Subject: [PATCH] fix: trace unhandled openai responses sse --- crates/llm-worker/src/llm_client/event.rs | 18 ++++- .../scheme/openai_responses/events.rs | 69 ++++++++++++++++--- crates/llm-worker/src/timeline/timeline.rs | 32 +++++++++ 3 files changed, 109 insertions(+), 10 deletions(-) diff --git a/crates/llm-worker/src/llm_client/event.rs b/crates/llm-worker/src/llm_client/event.rs index 3e478803..d38b1734 100644 --- a/crates/llm-worker/src/llm_client/event.rs +++ b/crates/llm-worker/src/llm_client/event.rs @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize}; /// /// # イベントの種類 /// -/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error` +/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`, `UnhandledSse` /// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort` /// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み /// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と @@ -35,6 +35,10 @@ pub enum Event { Status(StatusEvent), /// エラー発生 Error(ErrorEvent), + /// Scheme が生成内容として解釈しない未対応 SSE イベント。 + /// + /// stream trace 用の観測イベントであり、timeline / history には反映しない。 + UnhandledSse(UnhandledSseEvent), /// ブロック開始(テキスト、ツール使用等) BlockStart(BlockStart), @@ -119,6 +123,18 @@ pub struct ErrorEvent { pub message: String, } +/// 未対応 SSE イベントの観測用メタイベント。 +/// +/// `data_preview` は provider から受け取った raw SSE data の bounded preview、 +/// `data_len` は preview 前の raw data byte length。 +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct UnhandledSseEvent { + pub provider: String, + pub event_type: String, + pub data_preview: String, + pub data_len: usize, +} + // ============================================================================= // Block Types // ============================================================================= diff --git a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs index 1107ae56..2d17e721 100644 --- a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs @@ -14,7 +14,7 @@ use crate::llm_client::{ ClientError, event::{ BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent, - Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UsageEvent, + Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent, }, }; @@ -305,9 +305,9 @@ struct TopLevelError { /// SSE フレーム 1 件をパースし、0 個以上の [`Event`] に変換する。 /// -/// `event_type` は SSE の `event:` フィールド。未対応の event は -/// 静かに無視する。`data` が JSON でない / 必要なフィールドが抜けて -/// いる等は [`ClientError::Api`] で返す。 +/// `event_type` は SSE の `event:` フィールド。未対応の event type は +/// [`Event::UnhandledSse`] として観測可能にする。`data` が JSON でない / +/// 必要なフィールドが抜けている等は [`ClientError::Api`] で返す。 pub(crate) fn parse_sse( event_type: &str, data: &str, @@ -581,8 +581,8 @@ pub(crate) fn parse_sse( Ok(vec![Event::Error(ErrorEvent { code, message })]) } - // 未対応 / 情報系イベントは無視 - _ => Ok(Vec::new()), + // 未対応 / 情報系 event type は生成 semantics からは無視しつつ trace に残す。 + _ => Ok(vec![unhandled_sse_event(event_type, data)]), } } @@ -674,6 +674,32 @@ fn top_level_error_diagnostic(ev: TopLevelErrorEnvelope) -> (Option, Str } const DIAGNOSTIC_VALUE_LIMIT: usize = 512; +const UNHANDLED_SSE_DATA_PREVIEW_LIMIT: usize = 512; + +fn capped_unhandled_sse_data_preview(data: &str) -> String { + if data.len() <= UNHANDLED_SSE_DATA_PREVIEW_LIMIT { + return data.to_string(); + } + + let mut end = 0; + for (idx, ch) in data.char_indices() { + let next = idx + ch.len_utf8(); + if next > UNHANDLED_SSE_DATA_PREVIEW_LIMIT { + break; + } + end = next; + } + data[..end].to_string() +} + +fn unhandled_sse_event(event_type: &str, data: &str) -> Event { + Event::UnhandledSse(UnhandledSseEvent { + provider: "openai_responses".to_string(), + event_type: event_type.to_string(), + data_preview: capped_unhandled_sse_data_preview(data), + data_len: data.len(), + }) +} fn diagnostic_object(extra: BTreeMap, value_limit: usize) -> Value { Value::Object( @@ -1182,8 +1208,33 @@ mod tests { } #[test] - fn unknown_event_is_ignored() { - let (events, _) = run("response.in_progress", "{}"); - assert!(events.is_empty()); + fn unknown_event_emits_trace_visible_unhandled_sse() { + let data = r#"{"sequence_number":7,"note":"debug me"}"#; + let (events, _) = run("response.mystery", data); + assert_eq!(events.len(), 1); + let Event::UnhandledSse(unhandled) = &events[0] else { + panic!("expected UnhandledSse, got {:?}", events[0]); + }; + assert_eq!(unhandled.provider, "openai_responses"); + assert_eq!(unhandled.event_type, "response.mystery"); + assert_eq!(unhandled.data_preview, data); + assert_eq!(unhandled.data_len, data.len()); + } + + #[test] + fn unknown_event_data_preview_is_bounded_and_data_len_is_original_bytes() { + let data = format!("{}終端", "x".repeat(UNHANDLED_SSE_DATA_PREVIEW_LIMIT + 32)); + let (events, _) = run("response.mystery.large", &data); + assert_eq!(events.len(), 1); + let Event::UnhandledSse(unhandled) = &events[0] else { + panic!("expected UnhandledSse, got {:?}", events[0]); + }; + assert_eq!(unhandled.data_len, data.len()); + assert!(unhandled.data_preview.len() <= UNHANDLED_SSE_DATA_PREVIEW_LIMIT); + assert_eq!( + unhandled.data_preview, + "x".repeat(UNHANDLED_SSE_DATA_PREVIEW_LIMIT) + ); + assert!(unhandled.data_preview.len() < unhandled.data_len); } } diff --git a/crates/llm-worker/src/timeline/timeline.rs b/crates/llm-worker/src/timeline/timeline.rs index 4739f8c9..06826592 100644 --- a/crates/llm-worker/src/timeline/timeline.rs +++ b/crates/llm-worker/src/timeline/timeline.rs @@ -530,6 +530,8 @@ impl Timeline { Event::Ping(p) => self.dispatch_ping(p), Event::Status(s) => self.dispatch_status(s), Event::Error(e) => self.dispatch_error(e), + // Observability-only event: stream trace records it before timeline dispatch. + Event::UnhandledSse(_) => {} // Block系: スコープ管理しながらディスパッチ Event::BlockStart(s) => self.handle_block_start(s), @@ -678,6 +680,36 @@ mod tests { assert!(timeline.current_block().is_none()); } + #[test] + fn unhandled_sse_is_ignored_by_timeline_handlers() { + struct TestTextHandler { + calls: Arc>>, + } + + impl Handler for TestTextHandler { + type Scope = (); + fn on_event(&mut self, _scope: &mut (), event: &TextBlockEvent) { + self.calls.lock().unwrap().push(event.clone()); + } + } + + let calls = Arc::new(Mutex::new(Vec::new())); + let mut timeline = Timeline::new(); + timeline.on_text_block(TestTextHandler { + calls: calls.clone(), + }); + + timeline.dispatch(&Event::UnhandledSse(UnhandledSseEvent { + provider: "openai_responses".to_string(), + event_type: "response.mystery".to_string(), + data_preview: "{}".to_string(), + data_len: 2, + })); + + assert!(timeline.current_block().is_none()); + assert!(calls.lock().unwrap().is_empty()); + } + #[test] fn test_meta_event_dispatch() { // シンプルなテスト用構造体