merge: openai unhandled sse observability

This commit is contained in:
Keisuke Hirata 2026-05-28 05:44:14 +09:00
commit dbfdf6aa6c
3 changed files with 109 additions and 10 deletions

View File

@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
///
/// # イベントの種類
///
/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`
/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`, `UnhandledSse`
/// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort`
/// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み
/// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と
@ -35,6 +35,10 @@ pub enum Event {
Status(StatusEvent),
/// エラー発生
Error(ErrorEvent),
/// Scheme が生成内容として解釈しない未対応 SSE イベント。
///
/// stream trace 用の観測イベントであり、timeline / history には反映しない。
UnhandledSse(UnhandledSseEvent),
/// ブロック開始(テキスト、ツール使用等)
BlockStart(BlockStart),
@ -119,6 +123,18 @@ pub struct ErrorEvent {
pub message: String,
}
/// 未対応 SSE イベントの観測用メタイベント。
///
/// `data_preview` は provider から受け取った raw SSE data の bounded preview、
/// `data_len` は preview 前の raw data byte length。
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct UnhandledSseEvent {
pub provider: String,
pub event_type: String,
pub data_preview: String,
pub data_len: usize,
}
// =============================================================================
// Block Types
// =============================================================================

View File

@ -14,7 +14,7 @@ use crate::llm_client::{
ClientError,
event::{
BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent,
Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UsageEvent,
Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent,
},
};
@ -305,9 +305,9 @@ struct TopLevelError {
/// SSE フレーム 1 件をパースし、0 個以上の [`Event`] に変換する。
///
/// `event_type` は SSE の `event:` フィールド。未対応の event
/// 静かに無視する。`data` が JSON でない / 必要なフィールドが抜けて
/// いる等は [`ClientError::Api`] で返す。
/// `event_type` は SSE の `event:` フィールド。未対応の event type
/// [`Event::UnhandledSse`] として観測可能にする。`data` が JSON でない /
/// 必要なフィールドが抜けている等は [`ClientError::Api`] で返す。
pub(crate) fn parse_sse(
event_type: &str,
data: &str,
@ -581,8 +581,8 @@ pub(crate) fn parse_sse(
Ok(vec![Event::Error(ErrorEvent { code, message })])
}
// 未対応 / 情報系イベントは無視
_ => Ok(Vec::new()),
// 未対応 / 情報系 event type は生成 semantics からは無視しつつ trace に残す。
_ => Ok(vec![unhandled_sse_event(event_type, data)]),
}
}
@ -674,6 +674,32 @@ fn top_level_error_diagnostic(ev: TopLevelErrorEnvelope) -> (Option<String>, Str
}
const DIAGNOSTIC_VALUE_LIMIT: usize = 512;
const UNHANDLED_SSE_DATA_PREVIEW_LIMIT: usize = 512;
fn capped_unhandled_sse_data_preview(data: &str) -> String {
if data.len() <= UNHANDLED_SSE_DATA_PREVIEW_LIMIT {
return data.to_string();
}
let mut end = 0;
for (idx, ch) in data.char_indices() {
let next = idx + ch.len_utf8();
if next > UNHANDLED_SSE_DATA_PREVIEW_LIMIT {
break;
}
end = next;
}
data[..end].to_string()
}
fn unhandled_sse_event(event_type: &str, data: &str) -> Event {
Event::UnhandledSse(UnhandledSseEvent {
provider: "openai_responses".to_string(),
event_type: event_type.to_string(),
data_preview: capped_unhandled_sse_data_preview(data),
data_len: data.len(),
})
}
fn diagnostic_object(extra: BTreeMap<String, Value>, value_limit: usize) -> Value {
Value::Object(
@ -1182,8 +1208,33 @@ mod tests {
}
#[test]
fn unknown_event_is_ignored() {
let (events, _) = run("response.in_progress", "{}");
assert!(events.is_empty());
fn unknown_event_emits_trace_visible_unhandled_sse() {
let data = r#"{"sequence_number":7,"note":"debug me"}"#;
let (events, _) = run("response.mystery", data);
assert_eq!(events.len(), 1);
let Event::UnhandledSse(unhandled) = &events[0] else {
panic!("expected UnhandledSse, got {:?}", events[0]);
};
assert_eq!(unhandled.provider, "openai_responses");
assert_eq!(unhandled.event_type, "response.mystery");
assert_eq!(unhandled.data_preview, data);
assert_eq!(unhandled.data_len, data.len());
}
#[test]
fn unknown_event_data_preview_is_bounded_and_data_len_is_original_bytes() {
let data = format!("{}終端", "x".repeat(UNHANDLED_SSE_DATA_PREVIEW_LIMIT + 32));
let (events, _) = run("response.mystery.large", &data);
assert_eq!(events.len(), 1);
let Event::UnhandledSse(unhandled) = &events[0] else {
panic!("expected UnhandledSse, got {:?}", events[0]);
};
assert_eq!(unhandled.data_len, data.len());
assert!(unhandled.data_preview.len() <= UNHANDLED_SSE_DATA_PREVIEW_LIMIT);
assert_eq!(
unhandled.data_preview,
"x".repeat(UNHANDLED_SSE_DATA_PREVIEW_LIMIT)
);
assert!(unhandled.data_preview.len() < unhandled.data_len);
}
}

View File

@ -530,6 +530,8 @@ impl Timeline {
Event::Ping(p) => self.dispatch_ping(p),
Event::Status(s) => self.dispatch_status(s),
Event::Error(e) => self.dispatch_error(e),
// Observability-only event: stream trace records it before timeline dispatch.
Event::UnhandledSse(_) => {}
// Block系: スコープ管理しながらディスパッチ
Event::BlockStart(s) => self.handle_block_start(s),
@ -678,6 +680,36 @@ mod tests {
assert!(timeline.current_block().is_none());
}
#[test]
fn unhandled_sse_is_ignored_by_timeline_handlers() {
struct TestTextHandler {
calls: Arc<Mutex<Vec<TextBlockEvent>>>,
}
impl Handler<TextBlockKind> for TestTextHandler {
type Scope = ();
fn on_event(&mut self, _scope: &mut (), event: &TextBlockEvent) {
self.calls.lock().unwrap().push(event.clone());
}
}
let calls = Arc::new(Mutex::new(Vec::new()));
let mut timeline = Timeline::new();
timeline.on_text_block(TestTextHandler {
calls: calls.clone(),
});
timeline.dispatch(&Event::UnhandledSse(UnhandledSseEvent {
provider: "openai_responses".to_string(),
event_type: "response.mystery".to_string(),
data_preview: "{}".to_string(),
data_len: 2,
}));
assert!(timeline.current_block().is_none());
assert!(calls.lock().unwrap().is_empty());
}
#[test]
fn test_meta_event_dispatch() {
// シンプルなテスト用構造体