fix: preserve openai reasoning live stops

This commit is contained in:
Keisuke Hirata 2026-06-03 11:08:06 +09:00
parent 42066f1e00
commit abb6adb5d2
No known key found for this signature in database
3 changed files with 254 additions and 66 deletions

View File

@ -25,8 +25,8 @@ pub struct OpenAIResponsesState {
next_index: usize,
/// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で
/// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で
/// 蓄積、`output_item.done`(Reasoning) で metadata-only Thinking block を
/// 完了させて reasoning persistence material を渡す
/// 蓄積、`output_item.done`(Reasoning) で既存 reasoning_text block または
/// metadata-only Thinking block に reasoning persistence material を載せる
pending_reasoning: HashMap<usize, PendingReasoning>,
}
@ -38,6 +38,24 @@ struct PendingReasoning {
text: String,
/// `reasoning_summary_text.delta` を summary_index 順に蓄積。
summary: Vec<String>,
/// `response.content_part.done` が先に到着した reasoning/thinking block。
///
/// `response.output_item.done` まで待たないと encrypted_content や最終
/// summary が揃わないため、live-visible な余分な synthetic Thinking block
/// を作らず、既存 block の stop に persistence metadata を載せる。
deferred_thinking_stops: Vec<SlotInfo>,
}
impl PendingReasoning {
fn into_reasoning_data(self, encrypted_content: Option<String>) -> ReasoningBlockData {
ReasoningBlockData {
id: self.id,
text: Some(self.text),
summary: self.summary.into_iter().filter(|s| !s.is_empty()).collect(),
encrypted_content,
signature: None,
}
}
}
impl OpenAIResponsesState {
@ -73,6 +91,31 @@ impl OpenAIResponsesState {
}
entry.summary[summary_index].push_str(text);
}
fn defer_reasoning_stop(&mut self, output_index: usize, info: SlotInfo) {
self.ensure_reasoning(output_index)
.deferred_thinking_stops
.push(info);
}
fn take_active_reasoning_slots(&mut self, output_index: usize) -> Vec<SlotInfo> {
let mut keys: Vec<_> = self
.slots
.iter()
.filter_map(|(key, info)| match key {
SlotKey::ContentPart { output, content }
if *output == output_index && info.block_type == BlockType::Thinking =>
{
Some((*content, *key))
}
_ => None,
})
.collect();
keys.sort_by_key(|(content, _)| *content);
keys.into_iter()
.filter_map(|(_, key)| self.slots.remove(&key))
.collect()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -380,9 +423,10 @@ pub(crate) fn parse_sse(
"response.output_item.done" => {
let ev: OutputItemDone = from_json(data)?;
// Reasoning wrapper の done で蓄積分を metadata-only Thinking block
// stop に載せる。これは `slots` の OutputItem slot とは独立している
// (FunctionCall は slots、Reasoning は pending_reasoning)。
// Reasoning wrapper の done で蓄積分を既存 reasoning_text block の
// stop に載せる。content_part.done が先に来た場合は stop を defer
// しておき、ここで encrypted_content / summary と一緒に完了させる。
// reasoning_text が無い metadata-only item だけ synthetic block を作る。
if let OutputItem::Reasoning {
id,
encrypted_content,
@ -396,32 +440,43 @@ pub(crate) fn parse_sse(
if pending.id.is_none() {
pending.id = id;
}
let info =
state.allocate(SlotKey::OutputItem(ev.output_index), BlockType::Thinking);
state.slots.remove(&SlotKey::OutputItem(ev.output_index));
return Ok(vec![
Event::BlockStart(BlockStart {
index: info.flat_index,
block_type: BlockType::Thinking,
metadata: BlockMetadata::Thinking,
}),
Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: BlockType::Thinking,
stop_reason: None,
reasoning: Some(ReasoningBlockData {
id: pending.id,
text: Some(pending.text),
summary: pending
.summary
.into_iter()
.filter(|s| !s.is_empty())
.collect(),
encrypted_content,
signature: None,
let mut stop_blocks = std::mem::take(&mut pending.deferred_thinking_stops);
stop_blocks.extend(state.take_active_reasoning_slots(ev.output_index));
let reasoning = pending.into_reasoning_data(encrypted_content);
if stop_blocks.is_empty() {
let info =
state.allocate(SlotKey::OutputItem(ev.output_index), BlockType::Thinking);
state.slots.remove(&SlotKey::OutputItem(ev.output_index));
return Ok(vec![
Event::BlockStart(BlockStart {
index: info.flat_index,
block_type: BlockType::Thinking,
metadata: BlockMetadata::Thinking,
}),
}),
]);
Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: BlockType::Thinking,
stop_reason: None,
reasoning: Some(reasoning),
}),
]);
}
let last = stop_blocks.len() - 1;
return Ok(stop_blocks
.into_iter()
.enumerate()
.map(|(idx, info)| {
Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: info.block_type,
stop_reason: None,
reasoning: (idx == last).then(|| reasoning.clone()),
})
})
.collect());
}
if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) {
Ok(vec![Event::BlockStop(BlockStop {
@ -462,12 +517,19 @@ pub(crate) fn parse_sse(
output: ev.output_index,
content: ev.content_index,
}) {
Ok(vec![Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: info.block_type,
stop_reason: None,
reasoning: None,
})])
if matches!(ev.part, ContentPart::ReasoningText { .. })
|| info.block_type == BlockType::Thinking
{
state.defer_reasoning_stop(ev.output_index, info);
Ok(Vec::new())
} else {
Ok(vec![Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: info.block_type,
stop_reason: None,
reasoning: None,
})])
}
} else {
Ok(Vec::new())
}
@ -1146,11 +1208,15 @@ mod tests {
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#,
);
// 内側の reasoning_text 用 content_part
with(
let start = with(
&mut state,
"response.content_part.added",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#,
);
let start_index = match start.as_slice() {
[Event::BlockStart(start)] => start.index,
other => panic!("expected one BlockStart, got {other:?}"),
};
with(
&mut state,
"response.reasoning_text.delta",
@ -1161,11 +1227,12 @@ mod tests {
"response.reasoning_text.delta",
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"world"}"#,
);
with(
let part_done = with(
&mut state,
"response.content_part.done",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"hello world"}}"#,
);
assert!(part_done.is_empty());
// summary 1 件
with(
&mut state,
@ -1189,11 +1256,11 @@ mod tests {
"response.output_item.done",
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#,
);
assert_eq!(evs.len(), 2);
assert!(matches!(evs[0], Event::BlockStart(_)));
let Event::BlockStop(stop) = &evs[1] else {
panic!("expected BlockStop, got {:?}", evs[1]);
assert_eq!(evs.len(), 1);
let Event::BlockStop(stop) = &evs[0] else {
panic!("expected BlockStop, got {:?}", evs[0]);
};
assert_eq!(stop.index, start_index);
let reasoning = stop.reasoning.as_ref().expect("reasoning metadata");
assert_eq!(reasoning.id.as_deref(), Some("r1"));
assert_eq!(reasoning.text.as_deref(), Some("hello world"));
@ -1204,6 +1271,90 @@ mod tests {
assert!(state.pending_reasoning.is_empty());
}
#[test]
fn reasoning_text_done_then_output_done_emits_single_existing_block_stop() {
let mut state = OpenAIResponsesState::default();
with(
&mut state,
"response.output_item.added",
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#,
);
let mut lifecycle = Vec::new();
lifecycle.extend(with(
&mut state,
"response.content_part.added",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#,
));
lifecycle.extend(with(
&mut state,
"response.reasoning_text.delta",
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"think"}"#,
));
lifecycle.extend(with(
&mut state,
"response.content_part.done",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"think"}}"#,
));
lifecycle.extend(with(
&mut state,
"response.output_item.done",
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC"}}"#,
));
let starts: Vec<_> = lifecycle
.iter()
.filter_map(|event| match event {
Event::BlockStart(start) => Some(start.index),
_ => None,
})
.collect();
let stops: Vec<_> = lifecycle
.iter()
.filter_map(|event| match event {
Event::BlockStop(stop) => Some(stop),
_ => None,
})
.collect();
assert_eq!(starts.len(), 1, "no synthetic second Thinking start");
assert_eq!(stops.len(), 1, "no duplicate empty Thinking stop");
assert_eq!(stops[0].index, starts[0]);
let reasoning = stops[0].reasoning.as_ref().expect("reasoning metadata");
assert_eq!(reasoning.text.as_deref(), Some("think"));
assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC"));
struct StopRecorder(std::sync::Arc<std::sync::Mutex<Vec<(usize, String, bool)>>>);
impl crate::handler::Handler<crate::handler::ThinkingBlockKind> for StopRecorder {
type Scope = String;
fn on_event(
&mut self,
scope: &mut Self::Scope,
event: &crate::handler::ThinkingBlockEvent,
) {
match event {
crate::handler::ThinkingBlockEvent::Start(_) => scope.clear(),
crate::handler::ThinkingBlockEvent::Delta(delta) => scope.push_str(delta),
crate::handler::ThinkingBlockEvent::Stop(stop) => self
.0
.lock()
.unwrap()
.push((stop.index, scope.clone(), stop.reasoning.is_some())),
}
}
}
let stops = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let mut timeline = crate::timeline::Timeline::new();
timeline.on_thinking_block(StopRecorder(stops.clone()));
for event in &lifecycle {
timeline.dispatch(event);
}
let stops = stops.lock().unwrap().clone();
assert_eq!(stops, vec![(starts[0], "think".to_string(), true)]);
}
#[test]
fn reasoning_wrapper_without_inner_content_emits_empty_text() {
// encrypted_content だけ届くreasoning_text 無し)ケースでも

View File

@ -3,7 +3,7 @@
//! LLMからのイベントストリームを受信し、登録されたHandlerにディスパッチします。
//! 通常はWorker経由で使用しますが、直接使用することも可能です。
use std::marker::PhantomData;
use std::{collections::HashMap, marker::PhantomData};
use super::event::*;
use crate::handler::*;
@ -189,7 +189,7 @@ where
H: Handler<ThinkingBlockKind>,
{
handler: H,
scope: Option<H::Scope>,
scopes: HashMap<usize, H::Scope>,
}
impl<H> ThinkingBlockHandlerWrapper<H>
@ -199,7 +199,7 @@ where
fn new(handler: H) -> Self {
Self {
handler,
scope: None,
scopes: HashMap::new(),
}
}
}
@ -210,27 +210,25 @@ where
H::Scope: Send + Sync,
{
fn dispatch_start(&mut self, start: &BlockStart) {
if let Some(scope) = &mut self.scope {
self.handler.on_event(
scope,
&ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }),
);
}
let scope = self.scopes.entry(start.index).or_default();
self.handler.on_event(
scope,
&ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }),
);
}
fn dispatch_delta(&mut self, delta: &BlockDelta) {
if let Some(scope) = &mut self.scope {
if let DeltaContent::Thinking(text) = &delta.delta {
self.handler
.on_event(scope, &ThinkingBlockEvent::Delta(text.clone()));
}
if let DeltaContent::Thinking(text) = &delta.delta {
let scope = self.scopes.entry(delta.index).or_default();
self.handler
.on_event(scope, &ThinkingBlockEvent::Delta(text.clone()));
}
}
fn dispatch_stop(&mut self, stop: &BlockStop) {
if let Some(scope) = &mut self.scope {
if let Some(mut scope) = self.scopes.remove(&stop.index) {
self.handler.on_event(
scope,
&mut scope,
&ThinkingBlockEvent::Stop(ThinkingBlockStop {
index: stop.index,
reasoning: stop.reasoning.clone(),
@ -239,18 +237,16 @@ where
}
}
fn dispatch_abort(&mut self, _abort: &BlockAbort) {}
fn start_scope(&mut self) {
self.scope = Some(H::Scope::default());
fn dispatch_abort(&mut self, _abort: &BlockAbort) {
self.scopes.clear();
}
fn end_scope(&mut self) {
self.scope = None;
}
fn start_scope(&mut self) {}
fn end_scope(&mut self) {}
fn has_scope(&self) -> bool {
self.scope.is_some()
!self.scopes.is_empty()
}
}

View File

@ -0,0 +1,41 @@
# Implementation report: reasoning block lifecycle
## Investigation
Initial implementation unified reasoning persistence through `BlockStop.reasoning`, but OpenAI Responses text-bearing reasoning items still had two Thinking lifecycles:
1. `response.reasoning_text.delta` streamed through the real reasoning content-part block.
2. `response.content_part.done` stopped that block with no persistence metadata.
3. `response.output_item.done` emitted a second synthetic metadata-only Thinking `BlockStart`/`BlockStop` pair.
That preserved persistence but changed live callback semantics: UI/trace consumers that listen to Thinking block stop callbacks could observe an extra empty Thinking stop after the real streamed reasoning block.
## Fix summary
OpenAI Responses now defers the stop for reasoning `content_part.done` when the part is a Thinking/reasoning-text content block. At `response.output_item.done`, the provider finalizes the deferred existing block with `ReasoningBlockData` instead of creating a second synthetic live-visible block.
Thinking block handler scopes are also keyed by block index, so a deferred reasoning-text stop still uses its original streamed buffer even if another Thinking block (for example a reasoning summary block) starts and stops before `output_item.done`.
Metadata-only reasoning items with no reasoning content-part still emit a synthetic metadata-bearing Thinking block so encrypted/id-only reasoning can be persisted and round-tripped.
The fix preserves:
- live `reasoning_text.delta` Thinking deltas;
- OpenAI Responses `id`, `summary`, and `encrypted_content` persistence;
- a single Thinking lifecycle for text-bearing reasoning items;
- metadata-only reasoning coverage.
## Validation
Passed:
- `cargo test -p llm-worker openai_responses::events::tests::reasoning --lib`
- `cargo test -p llm-worker --lib`
- `cargo check --workspace --all-targets`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Residual risk
The provider delays the stop event for OpenAI Responses reasoning text blocks until `response.output_item.done` so final encrypted/summary metadata can be attached to the same block. This avoids duplicate live stops but means the block stop is slightly later than the raw `content_part.done` SSE for reasoning text. This is intentional for the unified persistence model and covered by focused provider tests for the reviewed sequence.