From abb6adb5d20eca45b53722f5f2c399c483c6146a Mon Sep 17 00:00:00 2001 From: Hare Date: Wed, 3 Jun 2026 11:08:06 +0900 Subject: [PATCH] fix: preserve openai reasoning live stops --- .../scheme/openai_responses/events.rs | 235 ++++++++++++++---- crates/llm-worker/src/timeline/timeline.rs | 44 ++-- .../artifacts/implementation-report.md | 41 +++ 3 files changed, 254 insertions(+), 66 deletions(-) create mode 100644 work-items/open/20260603-001124-unify-reasoning-block-lifecycle/artifacts/implementation-report.md diff --git a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs index 322e27e9..24ac3f26 100644 --- a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs @@ -25,8 +25,8 @@ pub struct OpenAIResponsesState { next_index: usize, /// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で /// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で - /// 蓄積、`output_item.done`(Reasoning) で metadata-only Thinking block を - /// 完了させて reasoning persistence material を渡す。 + /// 蓄積、`output_item.done`(Reasoning) で既存 reasoning_text block または + /// metadata-only Thinking block に reasoning persistence material を載せる。 pending_reasoning: HashMap, } @@ -38,6 +38,24 @@ struct PendingReasoning { text: String, /// `reasoning_summary_text.delta` を summary_index 順に蓄積。 summary: Vec, + /// `response.content_part.done` が先に到着した reasoning/thinking block。 + /// + /// `response.output_item.done` まで待たないと encrypted_content や最終 + /// summary が揃わないため、live-visible な余分な synthetic Thinking block + /// を作らず、既存 block の stop に persistence metadata を載せる。 + deferred_thinking_stops: Vec, +} + +impl PendingReasoning { + fn into_reasoning_data(self, encrypted_content: Option) -> ReasoningBlockData { + ReasoningBlockData { + id: self.id, + text: Some(self.text), + summary: self.summary.into_iter().filter(|s| !s.is_empty()).collect(), + encrypted_content, + signature: None, + } + } } impl OpenAIResponsesState { @@ -73,6 +91,31 @@ impl OpenAIResponsesState { } entry.summary[summary_index].push_str(text); } + + fn defer_reasoning_stop(&mut self, output_index: usize, info: SlotInfo) { + self.ensure_reasoning(output_index) + .deferred_thinking_stops + .push(info); + } + + fn take_active_reasoning_slots(&mut self, output_index: usize) -> Vec { + let mut keys: Vec<_> = self + .slots + .iter() + .filter_map(|(key, info)| match key { + SlotKey::ContentPart { output, content } + if *output == output_index && info.block_type == BlockType::Thinking => + { + Some((*content, *key)) + } + _ => None, + }) + .collect(); + keys.sort_by_key(|(content, _)| *content); + keys.into_iter() + .filter_map(|(_, key)| self.slots.remove(&key)) + .collect() + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -380,9 +423,10 @@ pub(crate) fn parse_sse( "response.output_item.done" => { let ev: OutputItemDone = from_json(data)?; - // Reasoning wrapper の done で蓄積分を metadata-only Thinking block - // stop に載せる。これは `slots` の OutputItem slot とは独立している - // (FunctionCall は slots、Reasoning は pending_reasoning)。 + // Reasoning wrapper の done で蓄積分を既存 reasoning_text block の + // stop に載せる。content_part.done が先に来た場合は stop を defer + // しておき、ここで encrypted_content / summary と一緒に完了させる。 + // reasoning_text が無い metadata-only item だけ synthetic block を作る。 if let OutputItem::Reasoning { id, encrypted_content, @@ -396,32 +440,43 @@ pub(crate) fn parse_sse( if pending.id.is_none() { pending.id = id; } - let info = - state.allocate(SlotKey::OutputItem(ev.output_index), BlockType::Thinking); - state.slots.remove(&SlotKey::OutputItem(ev.output_index)); - return Ok(vec![ - Event::BlockStart(BlockStart { - index: info.flat_index, - block_type: BlockType::Thinking, - metadata: BlockMetadata::Thinking, - }), - Event::BlockStop(BlockStop { - index: info.flat_index, - block_type: BlockType::Thinking, - stop_reason: None, - reasoning: Some(ReasoningBlockData { - id: pending.id, - text: Some(pending.text), - summary: pending - .summary - .into_iter() - .filter(|s| !s.is_empty()) - .collect(), - encrypted_content, - signature: None, + + let mut stop_blocks = std::mem::take(&mut pending.deferred_thinking_stops); + stop_blocks.extend(state.take_active_reasoning_slots(ev.output_index)); + let reasoning = pending.into_reasoning_data(encrypted_content); + + if stop_blocks.is_empty() { + let info = + state.allocate(SlotKey::OutputItem(ev.output_index), BlockType::Thinking); + state.slots.remove(&SlotKey::OutputItem(ev.output_index)); + return Ok(vec![ + Event::BlockStart(BlockStart { + index: info.flat_index, + block_type: BlockType::Thinking, + metadata: BlockMetadata::Thinking, }), - }), - ]); + Event::BlockStop(BlockStop { + index: info.flat_index, + block_type: BlockType::Thinking, + stop_reason: None, + reasoning: Some(reasoning), + }), + ]); + } + + let last = stop_blocks.len() - 1; + return Ok(stop_blocks + .into_iter() + .enumerate() + .map(|(idx, info)| { + Event::BlockStop(BlockStop { + index: info.flat_index, + block_type: info.block_type, + stop_reason: None, + reasoning: (idx == last).then(|| reasoning.clone()), + }) + }) + .collect()); } if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) { Ok(vec![Event::BlockStop(BlockStop { @@ -462,12 +517,19 @@ pub(crate) fn parse_sse( output: ev.output_index, content: ev.content_index, }) { - Ok(vec![Event::BlockStop(BlockStop { - index: info.flat_index, - block_type: info.block_type, - stop_reason: None, - reasoning: None, - })]) + if matches!(ev.part, ContentPart::ReasoningText { .. }) + || info.block_type == BlockType::Thinking + { + state.defer_reasoning_stop(ev.output_index, info); + Ok(Vec::new()) + } else { + Ok(vec![Event::BlockStop(BlockStop { + index: info.flat_index, + block_type: info.block_type, + stop_reason: None, + reasoning: None, + })]) + } } else { Ok(Vec::new()) } @@ -1146,11 +1208,15 @@ mod tests { r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#, ); // 内側の reasoning_text 用 content_part - with( + let start = with( &mut state, "response.content_part.added", r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#, ); + let start_index = match start.as_slice() { + [Event::BlockStart(start)] => start.index, + other => panic!("expected one BlockStart, got {other:?}"), + }; with( &mut state, "response.reasoning_text.delta", @@ -1161,11 +1227,12 @@ mod tests { "response.reasoning_text.delta", r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"world"}"#, ); - with( + let part_done = with( &mut state, "response.content_part.done", r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"hello world"}}"#, ); + assert!(part_done.is_empty()); // summary 1 件 with( &mut state, @@ -1189,11 +1256,11 @@ mod tests { "response.output_item.done", r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#, ); - assert_eq!(evs.len(), 2); - assert!(matches!(evs[0], Event::BlockStart(_))); - let Event::BlockStop(stop) = &evs[1] else { - panic!("expected BlockStop, got {:?}", evs[1]); + assert_eq!(evs.len(), 1); + let Event::BlockStop(stop) = &evs[0] else { + panic!("expected BlockStop, got {:?}", evs[0]); }; + assert_eq!(stop.index, start_index); let reasoning = stop.reasoning.as_ref().expect("reasoning metadata"); assert_eq!(reasoning.id.as_deref(), Some("r1")); assert_eq!(reasoning.text.as_deref(), Some("hello world")); @@ -1204,6 +1271,90 @@ mod tests { assert!(state.pending_reasoning.is_empty()); } + #[test] + fn reasoning_text_done_then_output_done_emits_single_existing_block_stop() { + let mut state = OpenAIResponsesState::default(); + with( + &mut state, + "response.output_item.added", + r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#, + ); + + let mut lifecycle = Vec::new(); + lifecycle.extend(with( + &mut state, + "response.content_part.added", + r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#, + )); + lifecycle.extend(with( + &mut state, + "response.reasoning_text.delta", + r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"think"}"#, + )); + lifecycle.extend(with( + &mut state, + "response.content_part.done", + r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"think"}}"#, + )); + lifecycle.extend(with( + &mut state, + "response.output_item.done", + r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC"}}"#, + )); + + let starts: Vec<_> = lifecycle + .iter() + .filter_map(|event| match event { + Event::BlockStart(start) => Some(start.index), + _ => None, + }) + .collect(); + let stops: Vec<_> = lifecycle + .iter() + .filter_map(|event| match event { + Event::BlockStop(stop) => Some(stop), + _ => None, + }) + .collect(); + + assert_eq!(starts.len(), 1, "no synthetic second Thinking start"); + assert_eq!(stops.len(), 1, "no duplicate empty Thinking stop"); + assert_eq!(stops[0].index, starts[0]); + let reasoning = stops[0].reasoning.as_ref().expect("reasoning metadata"); + assert_eq!(reasoning.text.as_deref(), Some("think")); + assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC")); + + struct StopRecorder(std::sync::Arc>>); + impl crate::handler::Handler for StopRecorder { + type Scope = String; + + fn on_event( + &mut self, + scope: &mut Self::Scope, + event: &crate::handler::ThinkingBlockEvent, + ) { + match event { + crate::handler::ThinkingBlockEvent::Start(_) => scope.clear(), + crate::handler::ThinkingBlockEvent::Delta(delta) => scope.push_str(delta), + crate::handler::ThinkingBlockEvent::Stop(stop) => self + .0 + .lock() + .unwrap() + .push((stop.index, scope.clone(), stop.reasoning.is_some())), + } + } + } + + let stops = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let mut timeline = crate::timeline::Timeline::new(); + timeline.on_thinking_block(StopRecorder(stops.clone())); + for event in &lifecycle { + timeline.dispatch(event); + } + let stops = stops.lock().unwrap().clone(); + assert_eq!(stops, vec![(starts[0], "think".to_string(), true)]); + } + #[test] fn reasoning_wrapper_without_inner_content_emits_empty_text() { // encrypted_content だけ届く(reasoning_text 無し)ケースでも diff --git a/crates/llm-worker/src/timeline/timeline.rs b/crates/llm-worker/src/timeline/timeline.rs index 61b2cb06..b28259ff 100644 --- a/crates/llm-worker/src/timeline/timeline.rs +++ b/crates/llm-worker/src/timeline/timeline.rs @@ -3,7 +3,7 @@ //! LLMからのイベントストリームを受信し、登録されたHandlerにディスパッチします。 //! 通常はWorker経由で使用しますが、直接使用することも可能です。 -use std::marker::PhantomData; +use std::{collections::HashMap, marker::PhantomData}; use super::event::*; use crate::handler::*; @@ -189,7 +189,7 @@ where H: Handler, { handler: H, - scope: Option, + scopes: HashMap, } impl ThinkingBlockHandlerWrapper @@ -199,7 +199,7 @@ where fn new(handler: H) -> Self { Self { handler, - scope: None, + scopes: HashMap::new(), } } } @@ -210,27 +210,25 @@ where H::Scope: Send + Sync, { fn dispatch_start(&mut self, start: &BlockStart) { - if let Some(scope) = &mut self.scope { - self.handler.on_event( - scope, - &ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }), - ); - } + let scope = self.scopes.entry(start.index).or_default(); + self.handler.on_event( + scope, + &ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }), + ); } fn dispatch_delta(&mut self, delta: &BlockDelta) { - if let Some(scope) = &mut self.scope { - if let DeltaContent::Thinking(text) = &delta.delta { - self.handler - .on_event(scope, &ThinkingBlockEvent::Delta(text.clone())); - } + if let DeltaContent::Thinking(text) = &delta.delta { + let scope = self.scopes.entry(delta.index).or_default(); + self.handler + .on_event(scope, &ThinkingBlockEvent::Delta(text.clone())); } } fn dispatch_stop(&mut self, stop: &BlockStop) { - if let Some(scope) = &mut self.scope { + if let Some(mut scope) = self.scopes.remove(&stop.index) { self.handler.on_event( - scope, + &mut scope, &ThinkingBlockEvent::Stop(ThinkingBlockStop { index: stop.index, reasoning: stop.reasoning.clone(), @@ -239,18 +237,16 @@ where } } - fn dispatch_abort(&mut self, _abort: &BlockAbort) {} - - fn start_scope(&mut self) { - self.scope = Some(H::Scope::default()); + fn dispatch_abort(&mut self, _abort: &BlockAbort) { + self.scopes.clear(); } - fn end_scope(&mut self) { - self.scope = None; - } + fn start_scope(&mut self) {} + + fn end_scope(&mut self) {} fn has_scope(&self) -> bool { - self.scope.is_some() + !self.scopes.is_empty() } } diff --git a/work-items/open/20260603-001124-unify-reasoning-block-lifecycle/artifacts/implementation-report.md b/work-items/open/20260603-001124-unify-reasoning-block-lifecycle/artifacts/implementation-report.md new file mode 100644 index 00000000..2efbe606 --- /dev/null +++ b/work-items/open/20260603-001124-unify-reasoning-block-lifecycle/artifacts/implementation-report.md @@ -0,0 +1,41 @@ +# Implementation report: reasoning block lifecycle + +## Investigation + +Initial implementation unified reasoning persistence through `BlockStop.reasoning`, but OpenAI Responses text-bearing reasoning items still had two Thinking lifecycles: + +1. `response.reasoning_text.delta` streamed through the real reasoning content-part block. +2. `response.content_part.done` stopped that block with no persistence metadata. +3. `response.output_item.done` emitted a second synthetic metadata-only Thinking `BlockStart`/`BlockStop` pair. + +That preserved persistence but changed live callback semantics: UI/trace consumers that listen to Thinking block stop callbacks could observe an extra empty Thinking stop after the real streamed reasoning block. + +## Fix summary + +OpenAI Responses now defers the stop for reasoning `content_part.done` when the part is a Thinking/reasoning-text content block. At `response.output_item.done`, the provider finalizes the deferred existing block with `ReasoningBlockData` instead of creating a second synthetic live-visible block. + +Thinking block handler scopes are also keyed by block index, so a deferred reasoning-text stop still uses its original streamed buffer even if another Thinking block (for example a reasoning summary block) starts and stops before `output_item.done`. + +Metadata-only reasoning items with no reasoning content-part still emit a synthetic metadata-bearing Thinking block so encrypted/id-only reasoning can be persisted and round-tripped. + +The fix preserves: + +- live `reasoning_text.delta` Thinking deltas; +- OpenAI Responses `id`, `summary`, and `encrypted_content` persistence; +- a single Thinking lifecycle for text-bearing reasoning items; +- metadata-only reasoning coverage. + +## Validation + +Passed: + +- `cargo test -p llm-worker openai_responses::events::tests::reasoning --lib` +- `cargo test -p llm-worker --lib` +- `cargo check --workspace --all-targets` +- `./tickets.sh doctor` +- `git diff --check` +- `nix build .#yoi` + +## Residual risk + +The provider delays the stop event for OpenAI Responses reasoning text blocks until `response.output_item.done` so final encrypted/summary metadata can be attached to the same block. This avoids duplicate live stops but means the block stop is slightly later than the raw `content_part.done` SSE for reasoning text. This is intentional for the unified persistence model and covered by focused provider tests for the reviewed sequence.