merge: unify reasoning block lifecycle

This commit is contained in:
Keisuke Hirata 2026-06-03 11:13:42 +09:00
commit c94e3a01e1
No known key found for this signature in database
13 changed files with 534 additions and 260 deletions

View File

@ -91,16 +91,6 @@ impl Kind for ErrorKind {
type Event = ErrorEvent; type Event = ErrorEvent;
} }
/// Reasoning item Kind - 完成済み reasoning item の永続化用
///
/// 1 reasoning item につき 1 度だけ発火する。Worker は
/// `ReasoningItemCollector` 経由で受け取り、ターン終了時に
/// `Item::Reasoning` として history に append する。
pub struct ReasoningItemKind;
impl Kind for ReasoningItemKind {
type Event = ReasoningItemEvent;
}
// ============================================================================= // =============================================================================
// Block Kind Definitions // Block Kind Definitions
// ============================================================================= // =============================================================================
@ -152,6 +142,7 @@ pub struct ThinkingBlockStart {
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub struct ThinkingBlockStop { pub struct ThinkingBlockStop {
pub index: usize, pub index: usize,
pub reasoning: Option<ReasoningBlockData>,
} }
/// ToolUseBlock Kind - for tool use blocks /// ToolUseBlock Kind - for tool use blocks

View File

@ -17,14 +17,12 @@ use serde::{Deserialize, Serialize};
/// ///
/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`, `UnhandledSse` /// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`, `UnhandledSse`
/// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort` /// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort`
/// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み
/// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と
/// は別経路で発火する)
/// ///
/// # ブロックのライフサイクル /// # ブロックのライフサイクル
/// ///
/// テキストやツール呼び出しは、`BlockStart` → `BlockDelta`(複数) → `BlockStop` /// テキスト、thinking、ツール呼び出しは、`BlockStart` → `BlockDelta`(複数) → `BlockStop`
/// の順序でイベントが発生します。 /// の順序でイベントが発生します。thinking の round-trip metadata は
/// `BlockStop.reasoning` に載ります。
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Event { pub enum Event {
/// ハートビート /// ハートビート
@ -48,18 +46,6 @@ pub enum Event {
BlockStop(BlockStop), BlockStop(BlockStop),
/// ブロック中断 /// ブロック中断
BlockAbort(BlockAbort), BlockAbort(BlockAbort),
/// Reasoning item の完成。scheme が「次の request に送り返すための
/// reasoning material が揃った」点で 1 度だけ発火する。
///
/// - Anthropic: 1 つの `thinking` content_block 完了ごと
/// - OpenAI Responses: 1 つの reasoning output_item 完了ごと
///
/// 上位層Worker / ReasoningItemCollectorはこれを `Item::Reasoning`
/// として `worker.history` に append する。streaming 表示用の
/// `BlockStart(Thinking)` / `BlockDelta(Thinking)` / `BlockStop(Thinking)`
/// は依然として並行発火するlive display と round-trip persist の責務分離)。
ReasoningItem(ReasoningItemEvent),
} }
// ============================================================================= // =============================================================================
@ -218,6 +204,12 @@ pub struct BlockStop {
pub block_type: BlockType, pub block_type: BlockType,
/// 停止理由 /// 停止理由
pub stop_reason: Option<StopReason>, pub stop_reason: Option<StopReason>,
/// Thinking block の停止時に確定した reasoning round-trip metadata。
///
/// `None` の Thinking block は live streaming / trace 用で、history に
/// `Item::Reasoning` として永続化しない。`Some` の場合は block lifecycle
/// が永続化の authoritative source になる。
pub reasoning: Option<ReasoningBlockData>,
} }
impl BlockStop { impl BlockStop {
@ -243,22 +235,17 @@ impl BlockAbort {
} }
} }
// ============================================================================= /// Thinking block stop で確定した reasoning material。
// Reasoning Item Event
// =============================================================================
/// 完成済み reasoning item。scheme が round-trip に必要なすべての
/// materialtext, summary, encrypted_content, signature, idを揃えて
/// 1 度だけ発火する。
/// ///
/// `Item::Reasoning` のフィールドを 1:1 に持つ。 /// `Item::Reasoning` の round-trip に必要な provider material を保持する。
/// `text` は deltas から収集した本文を上書きするために使うmetadata-only
/// reasoning block や provider completion event で全文が届くケース)。
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct ReasoningItemEvent { pub struct ReasoningBlockData {
/// scheme 側で観測した item idOpenAI Responses の `id`)。 /// scheme 側で観測した item idOpenAI Responses の `id`)。
pub id: Option<String>, pub id: Option<String>,
/// reasoning 本体テキスト。Anthropic は `thinking` 累積、OpenAI は /// reasoning 本体テキスト。`None` の場合は block delta 収集結果を使う。
/// `reasoning_text` 累積。redacted_thinking では空。 pub text: Option<String>,
pub text: String,
/// summary (OpenAI Responses の `summary_text[]`)。他 scheme は空。 /// summary (OpenAI Responses の `summary_text[]`)。他 scheme は空。
pub summary: Vec<String>, pub summary: Vec<String>,
/// 暗号化された opaque blobAnthropic `redacted_thinking.data` / /// 暗号化された opaque blobAnthropic `redacted_thinking.data` /
@ -309,6 +296,7 @@ impl Event {
index, index,
block_type: BlockType::Text, block_type: BlockType::Text,
stop_reason, stop_reason,
reasoning: None,
}) })
} }
@ -338,6 +326,7 @@ impl Event {
index, index,
block_type: BlockType::ToolUse, block_type: BlockType::ToolUse,
stop_reason: Some(StopReason::ToolUse), stop_reason: Some(StopReason::ToolUse),
reasoning: None,
}) })
} }

View File

@ -216,6 +216,7 @@ impl AnthropicScheme {
index: event.index, index: event.index,
block_type: BlockType::Text, // Timeline層で上書きされる block_type: BlockType::Text, // Timeline層で上書きされる
stop_reason: None, stop_reason: None,
reasoning: None,
}))) })))
} }
AnthropicEventType::MessageDelta => { AnthropicEventType::MessageDelta => {
@ -286,9 +287,9 @@ impl AnthropicScheme {
/// `parse_event` の単発 Event に加えて、以下を行う: /// `parse_event` の単発 Event に加えて、以下を行う:
/// - `content_block_stop` の `block_type` を直前の Start 値で書き戻す /// - `content_block_stop` の `block_type` を直前の Start 値で書き戻す
/// - `thinking` / `redacted_thinking` ブロックの本体・signature・data を /// - `thinking` / `redacted_thinking` ブロックの本体・signature・data を
/// `state.pending_thinking` に蓄積し、`content_block_stop` /// `state.pending_thinking` に蓄積し、`content_block_stop` の Thinking
/// `Event::ReasoningItem` を追加発火す /// BlockStop metadata に載せ
/// - `signature_delta` を蓄積Stream channel には流さず、reasoning event /// - `signature_delta` を蓄積Stream channel には流さず、reasoning metadata
/// にだけ反映する) /// にだけ反映する)
pub(crate) fn parse_with_state( pub(crate) fn parse_with_state(
&self, &self,
@ -374,16 +375,21 @@ impl AnthropicScheme {
AnthropicEventType::ContentBlockStop => { AnthropicEventType::ContentBlockStop => {
let raw: ContentBlockStopEvent = serde_json::from_str(data)?; let raw: ContentBlockStopEvent = serde_json::from_str(data)?;
let block_type = state.current_block_type.take().unwrap_or(BlockType::Text); let block_type = state.current_block_type.take().unwrap_or(BlockType::Text);
let reasoning = if matches!(block_type, BlockType::Thinking) {
state
.pending_thinking
.take()
.map(PendingThinking::into_reasoning)
} else {
state.pending_thinking.take();
None
};
emitted.push(Event::BlockStop(BlockStop { emitted.push(Event::BlockStop(BlockStop {
index: raw.index, index: raw.index,
block_type, block_type,
stop_reason: None, stop_reason: None,
reasoning,
})); }));
if matches!(block_type, BlockType::Thinking) {
if let Some(pending) = state.pending_thinking.take() {
emitted.push(Event::ReasoningItem(pending.into_event()));
}
}
} }
// 残りは state を必要としない。既存 parse_event に委譲。 // 残りは state を必要としない。既存 parse_event に委譲。
_ => { _ => {
@ -524,8 +530,8 @@ mod tests {
} }
#[test] #[test]
fn thinking_block_emits_reasoning_item_with_signature() { fn thinking_block_stop_carries_reasoning_with_signature() {
// thinking ブロックが完了したら ReasoningItem に text+signature が乗ること // thinking ブロックが完了したら reasoning metadata に text+signature が乗ること
let scheme = AnthropicScheme::new(); let scheme = AnthropicScheme::new();
let mut state = AnthropicState::default(); let mut state = AnthropicState::default();
@ -567,18 +573,18 @@ mod tests {
&mut state, &mut state,
) )
.unwrap(); .unwrap();
// BlockStop と ReasoningItem の 2 件が並ぶ assert_eq!(stop_evs.len(), 1);
assert!(matches!(stop_evs[0], Event::BlockStop(_))); let Event::BlockStop(stop) = &stop_evs[0] else {
let Event::ReasoningItem(reasoning) = &stop_evs[1] else { panic!("expected BlockStop, got {:?}", stop_evs[0]);
panic!("expected ReasoningItem, got {:?}", stop_evs[1]);
}; };
assert_eq!(reasoning.text, "hello world"); let reasoning = stop.reasoning.as_ref().expect("reasoning metadata");
assert_eq!(reasoning.text.as_deref(), Some("hello world"));
assert_eq!(reasoning.signature.as_deref(), Some("SIG-XYZ")); assert_eq!(reasoning.signature.as_deref(), Some("SIG-XYZ"));
assert!(reasoning.encrypted_content.is_none()); assert!(reasoning.encrypted_content.is_none());
} }
#[test] #[test]
fn redacted_thinking_emits_reasoning_item_with_data() { fn redacted_thinking_stop_carries_reasoning_with_data() {
let scheme = AnthropicScheme::new(); let scheme = AnthropicScheme::new();
let mut state = AnthropicState::default(); let mut state = AnthropicState::default();
@ -596,16 +602,18 @@ mod tests {
&mut state, &mut state,
) )
.unwrap(); .unwrap();
let Event::ReasoningItem(reasoning) = &stop_evs[1] else { assert_eq!(stop_evs.len(), 1);
panic!("expected ReasoningItem"); let Event::BlockStop(stop) = &stop_evs[0] else {
panic!("expected BlockStop");
}; };
assert!(reasoning.text.is_empty()); let reasoning = stop.reasoning.as_ref().expect("reasoning metadata");
assert_eq!(reasoning.text.as_deref(), Some(""));
assert!(reasoning.signature.is_none()); assert!(reasoning.signature.is_none());
assert_eq!(reasoning.encrypted_content.as_deref(), Some("opaque-blob")); assert_eq!(reasoning.encrypted_content.as_deref(), Some("opaque-blob"));
} }
#[test] #[test]
fn text_block_does_not_emit_reasoning_item() { fn text_block_stop_has_no_reasoning_metadata() {
let scheme = AnthropicScheme::new(); let scheme = AnthropicScheme::new();
let mut state = AnthropicState::default(); let mut state = AnthropicState::default();
@ -631,7 +639,10 @@ mod tests {
) )
.unwrap(); .unwrap();
assert_eq!(stop_evs.len(), 1); assert_eq!(stop_evs.len(), 1);
assert!(matches!(stop_evs[0], Event::BlockStop(_))); let Event::BlockStop(stop) = &stop_evs[0] else {
panic!("expected BlockStop");
};
assert!(stop.reasoning.is_none());
} }
#[test] #[test]

View File

@ -9,7 +9,7 @@ use crate::llm_client::{
ClientError, ClientError,
auth::AuthRequirement, auth::AuthRequirement,
capability::ModelCapability, capability::ModelCapability,
event::{BlockType, Event, ReasoningItemEvent}, event::{BlockType, Event, ReasoningBlockData},
scheme::Scheme, scheme::Scheme,
types::Request, types::Request,
}; };
@ -23,7 +23,7 @@ use super::AnthropicScheme;
/// `BlockStop` に書き戻す。 /// `BlockStop` に書き戻す。
/// 2. `thinking` ブロック中の `thinking_delta` テキストと `signature_delta` /// 2. `thinking` ブロック中の `thinking_delta` テキストと `signature_delta`
/// 署名、および `redacted_thinking` ブロックの `data` を蓄積し、 /// 署名、および `redacted_thinking` ブロックの `data` を蓄積し、
/// `content_block_stop` で `Event::ReasoningItem` を発火する /// `content_block_stop` の Thinking block metadata として返す
/// round-trip 永続化のため)。 /// round-trip 永続化のため)。
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct AnthropicState { pub struct AnthropicState {
@ -40,10 +40,10 @@ pub(crate) struct PendingThinking {
} }
impl PendingThinking { impl PendingThinking {
pub(crate) fn into_event(self) -> ReasoningItemEvent { pub(crate) fn into_reasoning(self) -> ReasoningBlockData {
ReasoningItemEvent { ReasoningBlockData {
id: None, id: None,
text: self.text, text: Some(self.text),
summary: Vec::new(), summary: Vec::new(),
encrypted_content: self.redacted_data, encrypted_content: self.redacted_data,
signature: self.signature, signature: self.signature,

View File

@ -205,6 +205,7 @@ impl GeminiScheme {
index: candidate_index, index: candidate_index,
block_type: BlockType::Text, block_type: BlockType::Text,
stop_reason, stop_reason,
reasoning: None,
})); }));
} }
} }

View File

@ -14,7 +14,7 @@ use crate::llm_client::{
ClientError, ClientError,
event::{ event::{
BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent, BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent,
Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent, Event, ReasoningBlockData, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent,
}, },
}; };
@ -25,8 +25,8 @@ pub struct OpenAIResponsesState {
next_index: usize, next_index: usize,
/// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で /// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で
/// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で /// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で
/// 蓄積、`output_item.done`(Reasoning) で `Event::ReasoningItem` を /// 蓄積、`output_item.done`(Reasoning) で既存 reasoning_text block または
/// 発火してエントリを除去する。 /// metadata-only Thinking block に reasoning persistence material を載せる。
pending_reasoning: HashMap<usize, PendingReasoning>, pending_reasoning: HashMap<usize, PendingReasoning>,
} }
@ -38,6 +38,24 @@ struct PendingReasoning {
text: String, text: String,
/// `reasoning_summary_text.delta` を summary_index 順に蓄積。 /// `reasoning_summary_text.delta` を summary_index 順に蓄積。
summary: Vec<String>, summary: Vec<String>,
/// `response.content_part.done` が先に到着した reasoning/thinking block。
///
/// `response.output_item.done` まで待たないと encrypted_content や最終
/// summary が揃わないため、live-visible な余分な synthetic Thinking block
/// を作らず、既存 block の stop に persistence metadata を載せる。
deferred_thinking_stops: Vec<SlotInfo>,
}
impl PendingReasoning {
fn into_reasoning_data(self, encrypted_content: Option<String>) -> ReasoningBlockData {
ReasoningBlockData {
id: self.id,
text: Some(self.text),
summary: self.summary.into_iter().filter(|s| !s.is_empty()).collect(),
encrypted_content,
signature: None,
}
}
} }
impl OpenAIResponsesState { impl OpenAIResponsesState {
@ -73,6 +91,31 @@ impl OpenAIResponsesState {
} }
entry.summary[summary_index].push_str(text); entry.summary[summary_index].push_str(text);
} }
fn defer_reasoning_stop(&mut self, output_index: usize, info: SlotInfo) {
self.ensure_reasoning(output_index)
.deferred_thinking_stops
.push(info);
}
fn take_active_reasoning_slots(&mut self, output_index: usize) -> Vec<SlotInfo> {
let mut keys: Vec<_> = self
.slots
.iter()
.filter_map(|(key, info)| match key {
SlotKey::ContentPart { output, content }
if *output == output_index && info.block_type == BlockType::Thinking =>
{
Some((*content, *key))
}
_ => None,
})
.collect();
keys.sort_by_key(|(content, _)| *content);
keys.into_iter()
.filter_map(|(_, key)| self.slots.remove(&key))
.collect()
}
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -380,9 +423,10 @@ pub(crate) fn parse_sse(
"response.output_item.done" => { "response.output_item.done" => {
let ev: OutputItemDone = from_json(data)?; let ev: OutputItemDone = from_json(data)?;
// Reasoning wrapper の done で蓄積分を ReasoningItem として発火。 // Reasoning wrapper の done で蓄積分を既存 reasoning_text block の
// これは `slots` の OutputItem slot とは独立している // stop に載せる。content_part.done が先に来た場合は stop を defer
// (FunctionCall は slots、Reasoning は pending_reasoning)。 // しておき、ここで encrypted_content / summary と一緒に完了させる。
// reasoning_text が無い metadata-only item だけ synthetic block を作る。
if let OutputItem::Reasoning { if let OutputItem::Reasoning {
id, id,
encrypted_content, encrypted_content,
@ -396,23 +440,50 @@ pub(crate) fn parse_sse(
if pending.id.is_none() { if pending.id.is_none() {
pending.id = id; pending.id = id;
} }
return Ok(vec![Event::ReasoningItem(ReasoningItemEvent {
id: pending.id, let mut stop_blocks = std::mem::take(&mut pending.deferred_thinking_stops);
text: pending.text, stop_blocks.extend(state.take_active_reasoning_slots(ev.output_index));
summary: pending let reasoning = pending.into_reasoning_data(encrypted_content);
.summary
.into_iter() if stop_blocks.is_empty() {
.filter(|s| !s.is_empty()) let info =
.collect(), state.allocate(SlotKey::OutputItem(ev.output_index), BlockType::Thinking);
encrypted_content, state.slots.remove(&SlotKey::OutputItem(ev.output_index));
signature: None, return Ok(vec![
})]); Event::BlockStart(BlockStart {
index: info.flat_index,
block_type: BlockType::Thinking,
metadata: BlockMetadata::Thinking,
}),
Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: BlockType::Thinking,
stop_reason: None,
reasoning: Some(reasoning),
}),
]);
}
let last = stop_blocks.len() - 1;
return Ok(stop_blocks
.into_iter()
.enumerate()
.map(|(idx, info)| {
Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: info.block_type,
stop_reason: None,
reasoning: (idx == last).then(|| reasoning.clone()),
})
})
.collect());
} }
if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) { if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) {
Ok(vec![Event::BlockStop(BlockStop { Ok(vec![Event::BlockStop(BlockStop {
index: info.flat_index, index: info.flat_index,
block_type: info.block_type, block_type: info.block_type,
stop_reason: None, stop_reason: None,
reasoning: None,
})]) })])
} else { } else {
Ok(Vec::new()) Ok(Vec::new())
@ -446,11 +517,19 @@ pub(crate) fn parse_sse(
output: ev.output_index, output: ev.output_index,
content: ev.content_index, content: ev.content_index,
}) { }) {
Ok(vec![Event::BlockStop(BlockStop { if matches!(ev.part, ContentPart::ReasoningText { .. })
index: info.flat_index, || info.block_type == BlockType::Thinking
block_type: info.block_type, {
stop_reason: None, state.defer_reasoning_stop(ev.output_index, info);
})]) Ok(Vec::new())
} else {
Ok(vec![Event::BlockStop(BlockStop {
index: info.flat_index,
block_type: info.block_type,
stop_reason: None,
reasoning: None,
})])
}
} else { } else {
Ok(Vec::new()) Ok(Vec::new())
} }
@ -531,6 +610,7 @@ pub(crate) fn parse_sse(
index: info.flat_index, index: info.flat_index,
block_type: info.block_type, block_type: info.block_type,
stop_reason: None, stop_reason: None,
reasoning: None,
})]) })])
} else { } else {
Ok(Vec::new()) Ok(Vec::new())
@ -1116,9 +1196,9 @@ mod tests {
} }
#[test] #[test]
fn reasoning_output_item_emits_reasoning_item_with_text_summary_encrypted() { fn reasoning_output_item_completes_metadata_thinking_block_with_text_summary_encrypted() {
// 完成済み reasoning wrapper が text + summary[] + encrypted_content を持って // 完成済み reasoning wrapper が text + summary[] + encrypted_content を持って
// ReasoningItem として届くこと。 // Thinking BlockStop metadata として届くこと。
let mut state = OpenAIResponsesState::default(); let mut state = OpenAIResponsesState::default();
// wrapper added (id だけ持つ) // wrapper added (id だけ持つ)
@ -1128,11 +1208,15 @@ mod tests {
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#, r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#,
); );
// 内側の reasoning_text 用 content_part // 内側の reasoning_text 用 content_part
with( let start = with(
&mut state, &mut state,
"response.content_part.added", "response.content_part.added",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#, r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#,
); );
let start_index = match start.as_slice() {
[Event::BlockStart(start)] => start.index,
other => panic!("expected one BlockStart, got {other:?}"),
};
with( with(
&mut state, &mut state,
"response.reasoning_text.delta", "response.reasoning_text.delta",
@ -1143,11 +1227,12 @@ mod tests {
"response.reasoning_text.delta", "response.reasoning_text.delta",
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"world"}"#, r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"world"}"#,
); );
with( let part_done = with(
&mut state, &mut state,
"response.content_part.done", "response.content_part.done",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"hello world"}}"#, r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"hello world"}}"#,
); );
assert!(part_done.is_empty());
// summary 1 件 // summary 1 件
with( with(
&mut state, &mut state,
@ -1172,11 +1257,13 @@ mod tests {
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#, r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#,
); );
assert_eq!(evs.len(), 1); assert_eq!(evs.len(), 1);
let Event::ReasoningItem(reasoning) = &evs[0] else { let Event::BlockStop(stop) = &evs[0] else {
panic!("expected ReasoningItem, got {:?}", evs[0]); panic!("expected BlockStop, got {:?}", evs[0]);
}; };
assert_eq!(stop.index, start_index);
let reasoning = stop.reasoning.as_ref().expect("reasoning metadata");
assert_eq!(reasoning.id.as_deref(), Some("r1")); assert_eq!(reasoning.id.as_deref(), Some("r1"));
assert_eq!(reasoning.text, "hello world"); assert_eq!(reasoning.text.as_deref(), Some("hello world"));
assert_eq!(reasoning.summary, vec!["sum-A".to_string()]); assert_eq!(reasoning.summary, vec!["sum-A".to_string()]);
assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC-XYZ")); assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC-XYZ"));
assert!(reasoning.signature.is_none()); assert!(reasoning.signature.is_none());
@ -1184,10 +1271,94 @@ mod tests {
assert!(state.pending_reasoning.is_empty()); assert!(state.pending_reasoning.is_empty());
} }
#[test]
fn reasoning_text_done_then_output_done_emits_single_existing_block_stop() {
let mut state = OpenAIResponsesState::default();
with(
&mut state,
"response.output_item.added",
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#,
);
let mut lifecycle = Vec::new();
lifecycle.extend(with(
&mut state,
"response.content_part.added",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#,
));
lifecycle.extend(with(
&mut state,
"response.reasoning_text.delta",
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"think"}"#,
));
lifecycle.extend(with(
&mut state,
"response.content_part.done",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"think"}}"#,
));
lifecycle.extend(with(
&mut state,
"response.output_item.done",
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC"}}"#,
));
let starts: Vec<_> = lifecycle
.iter()
.filter_map(|event| match event {
Event::BlockStart(start) => Some(start.index),
_ => None,
})
.collect();
let stops: Vec<_> = lifecycle
.iter()
.filter_map(|event| match event {
Event::BlockStop(stop) => Some(stop),
_ => None,
})
.collect();
assert_eq!(starts.len(), 1, "no synthetic second Thinking start");
assert_eq!(stops.len(), 1, "no duplicate empty Thinking stop");
assert_eq!(stops[0].index, starts[0]);
let reasoning = stops[0].reasoning.as_ref().expect("reasoning metadata");
assert_eq!(reasoning.text.as_deref(), Some("think"));
assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC"));
struct StopRecorder(std::sync::Arc<std::sync::Mutex<Vec<(usize, String, bool)>>>);
impl crate::handler::Handler<crate::handler::ThinkingBlockKind> for StopRecorder {
type Scope = String;
fn on_event(
&mut self,
scope: &mut Self::Scope,
event: &crate::handler::ThinkingBlockEvent,
) {
match event {
crate::handler::ThinkingBlockEvent::Start(_) => scope.clear(),
crate::handler::ThinkingBlockEvent::Delta(delta) => scope.push_str(delta),
crate::handler::ThinkingBlockEvent::Stop(stop) => self
.0
.lock()
.unwrap()
.push((stop.index, scope.clone(), stop.reasoning.is_some())),
}
}
}
let stops = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let mut timeline = crate::timeline::Timeline::new();
timeline.on_thinking_block(StopRecorder(stops.clone()));
for event in &lifecycle {
timeline.dispatch(event);
}
let stops = stops.lock().unwrap().clone();
assert_eq!(stops, vec![(starts[0], "think".to_string(), true)]);
}
#[test] #[test]
fn reasoning_wrapper_without_inner_content_emits_empty_text() { fn reasoning_wrapper_without_inner_content_emits_empty_text() {
// encrypted_content だけ届くreasoning_text 無し)ケースでも // encrypted_content だけ届くreasoning_text 無し)ケースでも
// ReasoningItem は発火する。 // reasoning metadata は届く
let mut state = OpenAIResponsesState::default(); let mut state = OpenAIResponsesState::default();
with( with(
&mut state, &mut state,
@ -1199,10 +1370,13 @@ mod tests {
"response.output_item.done", "response.output_item.done",
r#"{"output_index":2,"item":{"type":"reasoning","id":"r9","encrypted_content":"BLOB"}}"#, r#"{"output_index":2,"item":{"type":"reasoning","id":"r9","encrypted_content":"BLOB"}}"#,
); );
let Event::ReasoningItem(r) = &evs[0] else { assert_eq!(evs.len(), 2);
panic!() assert!(matches!(evs[0], Event::BlockStart(_)));
let Event::BlockStop(stop) = &evs[1] else {
panic!("expected BlockStop")
}; };
assert!(r.text.is_empty()); let r = stop.reasoning.as_ref().expect("reasoning metadata");
assert_eq!(r.text.as_deref(), Some(""));
assert!(r.summary.is_empty()); assert!(r.summary.is_empty());
assert_eq!(r.encrypted_content.as_deref(), Some("BLOB")); assert_eq!(r.encrypted_content.as_deref(), Some("BLOB"));
} }

View File

@ -7,18 +7,19 @@
//! - [`Timeline`] - イベントストリームの管理とディスパッチ //! - [`Timeline`] - イベントストリームの管理とディスパッチ
//! - [`Handler`] - イベントを処理するトレイト //! - [`Handler`] - イベントを処理するトレイト
//! - [`TextBlockCollector`] - テキストブロックを収集するHandler //! - [`TextBlockCollector`] - テキストブロックを収集するHandler
//! - [`ThinkingBlockCollector`] - reasoning material を Thinking block から収集するHandler
//! - [`ToolCallCollector`] - ツール呼び出しを収集するHandler //! - [`ToolCallCollector`] - ツール呼び出しを収集するHandler
pub mod event; pub mod event;
mod reasoning_item_collector;
mod text_block_collector; mod text_block_collector;
mod thinking_block_collector;
mod timeline; mod timeline;
mod tool_call_collector; mod tool_call_collector;
// 公開API // 公開API
pub use event::*; pub use event::*;
pub use reasoning_item_collector::ReasoningItemCollector;
pub use text_block_collector::TextBlockCollector; pub use text_block_collector::TextBlockCollector;
pub use thinking_block_collector::ThinkingBlockCollector;
pub use timeline::Timeline; pub use timeline::Timeline;
pub use tool_call_collector::ToolCallCollector; pub use tool_call_collector::ToolCallCollector;
@ -30,7 +31,6 @@ pub use crate::handler::{
Handler, Handler,
Kind, Kind,
PingKind, PingKind,
ReasoningItemKind,
StatusKind, StatusKind,
// Block Events // Block Events
TextBlockEvent, TextBlockEvent,

View File

@ -1,77 +0,0 @@
//! `ReasoningItemCollector` - 完成済み reasoning item を収集する Handler
//!
//! Timeline の `ReasoningItemKind` Handler として登録し、scheme 側が
//! `Event::ReasoningItem` を発火するたびに 1 件ずつバッファに溜める。
//! Worker はターン終了時に `take_collected()` でドレインして
//! `Item::Reasoning` として `worker.history` に append する。
use std::sync::{Arc, Mutex};
use crate::handler::{Handler, ReasoningItemKind};
use crate::llm_client::event::ReasoningItemEvent;
/// 収集された reasoning item の連列。
#[derive(Clone, Default)]
pub struct ReasoningItemCollector {
collected: Arc<Mutex<Vec<ReasoningItemEvent>>>,
}
impl ReasoningItemCollector {
pub fn new() -> Self {
Self::default()
}
/// 収集済み item を取り出してクリア
pub fn take_collected(&self) -> Vec<ReasoningItemEvent> {
let mut guard = self.collected.lock().unwrap();
std::mem::take(&mut *guard)
}
/// 収集をクリア
pub fn clear(&self) {
self.collected.lock().unwrap().clear();
}
}
impl Handler<ReasoningItemKind> for ReasoningItemCollector {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &ReasoningItemEvent) {
self.collected.lock().unwrap().push(event.clone());
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm_client::event::Event;
use crate::timeline::Timeline;
#[test]
fn collects_in_order() {
let collector = ReasoningItemCollector::new();
let mut timeline = Timeline::new();
timeline.on_reasoning_item(collector.clone());
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
id: Some("r1".into()),
text: "first".into(),
signature: Some("sig1".into()),
..Default::default()
}));
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
id: Some("r2".into()),
text: "second".into(),
..Default::default()
}));
let items = collector.take_collected();
assert_eq!(items.len(), 2);
assert_eq!(items[0].text, "first");
assert_eq!(items[0].signature.as_deref(), Some("sig1"));
assert_eq!(items[1].text, "second");
// take は drain なので 2 度目は空
assert!(collector.take_collected().is_empty());
}
}

View File

@ -0,0 +1,138 @@
//! `ThinkingBlockCollector` - reasoning material from Thinking block lifecycle.
//!
//! Scheme implementations emit Thinking BlockStart/Delta/Stop events for live
//! streaming. A Thinking block stop with `ReasoningBlockData` is also the single
//! authoritative persistence signal for `Item::Reasoning` round-trip material.
use std::sync::{Arc, Mutex};
use crate::handler::{Handler, ThinkingBlockEvent, ThinkingBlockKind};
use crate::llm_client::event::ReasoningBlockData;
/// Reasoning material collected from completed Thinking blocks.
#[derive(Clone, Default)]
pub struct ThinkingBlockCollector {
collected: Arc<Mutex<Vec<ReasoningBlockData>>>,
}
impl ThinkingBlockCollector {
pub fn new() -> Self {
Self::default()
}
/// 収集済み item を取り出してクリア
pub fn take_collected(&self) -> Vec<ReasoningBlockData> {
let mut guard = self.collected.lock().unwrap();
std::mem::take(&mut *guard)
}
/// 収集をクリア
pub fn clear(&self) {
self.collected.lock().unwrap().clear();
}
}
impl Handler<ThinkingBlockKind> for ThinkingBlockCollector {
type Scope = String;
fn on_event(&mut self, scope: &mut Self::Scope, event: &ThinkingBlockEvent) {
match event {
ThinkingBlockEvent::Start(_) => scope.clear(),
ThinkingBlockEvent::Delta(text) => scope.push_str(text),
ThinkingBlockEvent::Stop(stop) => {
if let Some(mut reasoning) = stop.reasoning.clone() {
if reasoning.text.is_none() {
reasoning.text = Some(scope.clone());
}
self.collected.lock().unwrap().push(reasoning);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm_client::event::{
BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, Event, ReasoningBlockData,
};
use crate::timeline::Timeline;
#[test]
fn collects_in_order_from_thinking_block_stops() {
let collector = ThinkingBlockCollector::new();
let mut timeline = Timeline::new();
timeline.on_thinking_block(collector.clone());
timeline.dispatch(&Event::BlockStart(BlockStart {
index: 0,
block_type: BlockType::Thinking,
metadata: BlockMetadata::Thinking,
}));
timeline.dispatch(&Event::BlockDelta(crate::llm_client::event::BlockDelta {
index: 0,
delta: DeltaContent::Thinking("first".into()),
}));
timeline.dispatch(&Event::BlockStop(BlockStop {
index: 0,
block_type: BlockType::Thinking,
stop_reason: None,
reasoning: Some(ReasoningBlockData {
id: Some("r1".into()),
signature: Some("sig1".into()),
..Default::default()
}),
}));
timeline.dispatch(&Event::BlockStart(BlockStart {
index: 1,
block_type: BlockType::Thinking,
metadata: BlockMetadata::Thinking,
}));
timeline.dispatch(&Event::BlockStop(BlockStop {
index: 1,
block_type: BlockType::Thinking,
stop_reason: None,
reasoning: Some(ReasoningBlockData {
id: Some("r2".into()),
text: Some("second".into()),
..Default::default()
}),
}));
let items = collector.take_collected();
assert_eq!(items.len(), 2);
assert_eq!(items[0].text.as_deref(), Some("first"));
assert_eq!(items[0].signature.as_deref(), Some("sig1"));
assert_eq!(items[1].text.as_deref(), Some("second"));
// take は drain なので 2 度目は空
assert!(collector.take_collected().is_empty());
}
#[test]
fn ignores_streaming_only_thinking_blocks() {
let collector = ThinkingBlockCollector::new();
let mut timeline = Timeline::new();
timeline.on_thinking_block(collector.clone());
timeline.dispatch(&Event::BlockStart(BlockStart {
index: 0,
block_type: BlockType::Thinking,
metadata: BlockMetadata::Thinking,
}));
timeline.dispatch(&Event::BlockDelta(crate::llm_client::event::BlockDelta {
index: 0,
delta: DeltaContent::Thinking("live only".into()),
}));
timeline.dispatch(&Event::BlockStop(BlockStop {
index: 0,
block_type: BlockType::Thinking,
stop_reason: None,
reasoning: None,
}));
assert!(collector.take_collected().is_empty());
}
}

View File

@ -3,7 +3,7 @@
//! LLMからのイベントストリームを受信し、登録されたHandlerにディスパッチします。 //! LLMからのイベントストリームを受信し、登録されたHandlerにディスパッチします。
//! 通常はWorker経由で使用しますが、直接使用することも可能です。 //! 通常はWorker経由で使用しますが、直接使用することも可能です。
use std::marker::PhantomData; use std::{collections::HashMap, marker::PhantomData};
use super::event::*; use super::event::*;
use crate::handler::*; use crate::handler::*;
@ -189,7 +189,7 @@ where
H: Handler<ThinkingBlockKind>, H: Handler<ThinkingBlockKind>,
{ {
handler: H, handler: H,
scope: Option<H::Scope>, scopes: HashMap<usize, H::Scope>,
} }
impl<H> ThinkingBlockHandlerWrapper<H> impl<H> ThinkingBlockHandlerWrapper<H>
@ -199,7 +199,7 @@ where
fn new(handler: H) -> Self { fn new(handler: H) -> Self {
Self { Self {
handler, handler,
scope: None, scopes: HashMap::new(),
} }
} }
} }
@ -210,44 +210,43 @@ where
H::Scope: Send + Sync, H::Scope: Send + Sync,
{ {
fn dispatch_start(&mut self, start: &BlockStart) { fn dispatch_start(&mut self, start: &BlockStart) {
if let Some(scope) = &mut self.scope { let scope = self.scopes.entry(start.index).or_default();
self.handler.on_event( self.handler.on_event(
scope, scope,
&ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }), &ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }),
); );
}
} }
fn dispatch_delta(&mut self, delta: &BlockDelta) { fn dispatch_delta(&mut self, delta: &BlockDelta) {
if let Some(scope) = &mut self.scope { if let DeltaContent::Thinking(text) = &delta.delta {
if let DeltaContent::Thinking(text) = &delta.delta { let scope = self.scopes.entry(delta.index).or_default();
self.handler self.handler
.on_event(scope, &ThinkingBlockEvent::Delta(text.clone())); .on_event(scope, &ThinkingBlockEvent::Delta(text.clone()));
}
} }
} }
fn dispatch_stop(&mut self, stop: &BlockStop) { fn dispatch_stop(&mut self, stop: &BlockStop) {
if let Some(scope) = &mut self.scope { if let Some(mut scope) = self.scopes.remove(&stop.index) {
self.handler.on_event( self.handler.on_event(
scope, &mut scope,
&ThinkingBlockEvent::Stop(ThinkingBlockStop { index: stop.index }), &ThinkingBlockEvent::Stop(ThinkingBlockStop {
index: stop.index,
reasoning: stop.reasoning.clone(),
}),
); );
} }
} }
fn dispatch_abort(&mut self, _abort: &BlockAbort) {} fn dispatch_abort(&mut self, _abort: &BlockAbort) {
self.scopes.clear();
fn start_scope(&mut self) {
self.scope = Some(H::Scope::default());
} }
fn end_scope(&mut self) { fn start_scope(&mut self) {}
self.scope = None;
} fn end_scope(&mut self) {}
fn has_scope(&self) -> bool { fn has_scope(&self) -> bool {
self.scope.is_some() !self.scopes.is_empty()
} }
} }
@ -375,8 +374,6 @@ pub struct Timeline {
ping_handlers: Vec<Box<dyn ErasedHandler<PingKind>>>, ping_handlers: Vec<Box<dyn ErasedHandler<PingKind>>>,
status_handlers: Vec<Box<dyn ErasedHandler<StatusKind>>>, status_handlers: Vec<Box<dyn ErasedHandler<StatusKind>>>,
error_handlers: Vec<Box<dyn ErasedHandler<ErrorKind>>>, error_handlers: Vec<Box<dyn ErasedHandler<ErrorKind>>>,
reasoning_item_handlers: Vec<Box<dyn ErasedHandler<ReasoningItemKind>>>,
// Block系ハンドラーBlockTypeごとにグループ化 // Block系ハンドラーBlockTypeごとにグループ化
text_block_handlers: Vec<Box<dyn ErasedBlockHandler>>, text_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
thinking_block_handlers: Vec<Box<dyn ErasedBlockHandler>>, thinking_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
@ -405,7 +402,6 @@ impl Timeline {
ping_handlers: Vec::new(), ping_handlers: Vec::new(),
status_handlers: Vec::new(), status_handlers: Vec::new(),
error_handlers: Vec::new(), error_handlers: Vec::new(),
reasoning_item_handlers: Vec::new(),
text_block_handlers: Vec::new(), text_block_handlers: Vec::new(),
thinking_block_handlers: Vec::new(), thinking_block_handlers: Vec::new(),
tool_use_block_handlers: Vec::new(), tool_use_block_handlers: Vec::new(),
@ -467,18 +463,6 @@ impl Timeline {
self self
} }
/// `ReasoningItemKind` 用 Handler を登録
pub fn on_reasoning_item<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<ReasoningItemKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
let mut wrapper = HandlerWrapper::new(handler);
wrapper.start_scope();
self.reasoning_item_handlers.push(Box::new(wrapper));
self
}
/// TextBlockKind用のHandlerを登録 /// TextBlockKind用のHandlerを登録
pub fn on_text_block<H>(&mut self, handler: H) -> &mut Self pub fn on_text_block<H>(&mut self, handler: H) -> &mut Self
where where
@ -532,9 +516,6 @@ impl Timeline {
Event::BlockDelta(d) => self.handle_block_delta(d), Event::BlockDelta(d) => self.handle_block_delta(d),
Event::BlockStop(s) => self.handle_block_stop(s), Event::BlockStop(s) => self.handle_block_stop(s),
Event::BlockAbort(a) => self.handle_block_abort(a), Event::BlockAbort(a) => self.handle_block_abort(a),
// 完成済み reasoning item: 即時ディスパッチ
Event::ReasoningItem(r) => self.dispatch_reasoning_item(r),
} }
} }
@ -577,12 +558,6 @@ impl Timeline {
} }
} }
fn dispatch_reasoning_item(&mut self, event: &ReasoningItemEvent) {
for handler in &mut self.reasoning_item_handlers {
handler.dispatch(event);
}
}
fn handle_block_start(&mut self, start: &BlockStart) { fn handle_block_start(&mut self, start: &BlockStart) {
self.current_block = Some(start.block_type); self.current_block = Some(start.block_type);

View File

@ -24,7 +24,7 @@ use crate::{
}, },
state::{Locked, Mutable, WorkerState}, state::{Locked, Mutable, WorkerState},
timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
timeline::{ReasoningItemCollector, TextBlockCollector, Timeline, ToolCallCollector}, timeline::{TextBlockCollector, ThinkingBlockCollector, Timeline, ToolCallCollector},
tool::{ tool::{
ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputLimits, ToolResult, ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputLimits, ToolResult,
truncate_content, truncate_content,
@ -163,9 +163,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
text_block_collector: TextBlockCollector, text_block_collector: TextBlockCollector,
/// Tool call collector (Timeline handler) /// Tool call collector (Timeline handler)
tool_call_collector: ToolCallCollector, tool_call_collector: ToolCallCollector,
/// Reasoning item collector (Timeline handler)。完成済み reasoning /// Thinking block collector (Timeline handler)。metadata 付きで完了した
/// item を 1 ターン分バッファし、history に append する。 /// Thinking block を 1 ターン分バッファし、history に append する。
reasoning_item_collector: ReasoningItemCollector, thinking_block_collector: ThinkingBlockCollector,
/// Tool server handle /// Tool server handle
tool_server: ToolServerHandle, tool_server: ToolServerHandle,
/// Interceptor for control-flow decisions /// Interceptor for control-flow decisions
@ -771,14 +771,14 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
/// 置くことを要求するためでもある。 /// 置くことを要求するためでもある。
fn build_assistant_items( fn build_assistant_items(
&self, &self,
reasoning_items: &[crate::llm_client::event::ReasoningItemEvent], reasoning_items: &[crate::llm_client::event::ReasoningBlockData],
text_blocks: &[String], text_blocks: &[String],
tool_calls: &[ToolCall], tool_calls: &[ToolCall],
) -> Vec<Item> { ) -> Vec<Item> {
let mut items = Vec::new(); let mut items = Vec::new();
for r in reasoning_items { for r in reasoning_items {
let mut item = Item::reasoning(r.text.clone()); let mut item = Item::reasoning(r.text.clone().unwrap_or_default());
if let Some(id) = &r.id { if let Some(id) = &r.id {
item = item.with_id(id); item = item.with_id(id);
} }
@ -1238,7 +1238,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
self.timeline.abort_current_block(); self.timeline.abort_current_block();
self.timeline.flush_usage(); self.timeline.flush_usage();
let reasoning_items = self.reasoning_item_collector.take_collected(); let reasoning_items = self.thinking_block_collector.take_collected();
let text_blocks = self.text_block_collector.take_collected(); let text_blocks = self.text_block_collector.take_collected();
// Do not recover tool calls from an interrupted stream. A completed // Do not recover tool calls from an interrupted stream. A completed
// tool_use is executable only when the provider finishes the stream. // tool_use is executable only when the provider finishes the stream.
@ -1270,7 +1270,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// `append_history_items` so observers (e.g. the // `append_history_items` so observers (e.g. the
// Pod-side per-item session-log committer) see each item // Pod-side per-item session-log committer) see each item
// as it lands. // as it lands.
let reasoning_items = self.reasoning_item_collector.take_collected(); let reasoning_items = self.thinking_block_collector.take_collected();
let text_blocks = self.text_block_collector.take_collected(); let text_blocks = self.text_block_collector.take_collected();
let tool_calls = self.tool_call_collector.take_collected(); let tool_calls = self.tool_call_collector.take_collected();
let assistant_items = let assistant_items =
@ -1595,14 +1595,14 @@ impl<C: LlmClient> Worker<C, Mutable> {
pub fn new(client: C) -> Self { pub fn new(client: C) -> Self {
let text_block_collector = TextBlockCollector::new(); let text_block_collector = TextBlockCollector::new();
let tool_call_collector = ToolCallCollector::new(); let tool_call_collector = ToolCallCollector::new();
let reasoning_item_collector = ReasoningItemCollector::new(); let thinking_block_collector = ThinkingBlockCollector::new();
let mut timeline = Timeline::new(); let mut timeline = Timeline::new();
let (cancel_tx, cancel_rx) = mpsc::channel(1); let (cancel_tx, cancel_rx) = mpsc::channel(1);
// Register collectors with Timeline // Register collectors with Timeline
timeline.on_text_block(text_block_collector.clone()); timeline.on_text_block(text_block_collector.clone());
timeline.on_tool_use_block(tool_call_collector.clone()); timeline.on_tool_use_block(tool_call_collector.clone());
timeline.on_reasoning_item(reasoning_item_collector.clone()); timeline.on_thinking_block(thinking_block_collector.clone());
Self { Self {
client, client,
@ -1610,7 +1610,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
timeline, timeline,
text_block_collector, text_block_collector,
tool_call_collector, tool_call_collector,
reasoning_item_collector, thinking_block_collector,
tool_server: ToolServer::new().handle(), tool_server: ToolServer::new().handle(),
interceptor: Box::new(DefaultInterceptor), interceptor: Box::new(DefaultInterceptor),
system_prompt: None, system_prompt: None,
@ -1874,7 +1874,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
timeline: self.timeline, timeline: self.timeline,
text_block_collector: self.text_block_collector, text_block_collector: self.text_block_collector,
tool_call_collector: self.tool_call_collector, tool_call_collector: self.tool_call_collector,
reasoning_item_collector: self.reasoning_item_collector, thinking_block_collector: self.thinking_block_collector,
tool_server: self.tool_server, tool_server: self.tool_server,
interceptor: self.interceptor, interceptor: self.interceptor,
system_prompt: self.system_prompt, system_prompt: self.system_prompt,
@ -1966,7 +1966,7 @@ impl<C: LlmClient> Worker<C, Locked> {
timeline: self.timeline, timeline: self.timeline,
text_block_collector: self.text_block_collector, text_block_collector: self.text_block_collector,
tool_call_collector: self.tool_call_collector, tool_call_collector: self.tool_call_collector,
reasoning_item_collector: self.reasoning_item_collector, thinking_block_collector: self.thinking_block_collector,
tool_server: self.tool_server, tool_server: self.tool_server,
interceptor: self.interceptor, interceptor: self.interceptor,
system_prompt: self.system_prompt, system_prompt: self.system_prompt,

View File

@ -16,27 +16,53 @@ mod common;
use common::MockLlmClient; use common::MockLlmClient;
use llm_worker::Item; use llm_worker::Item;
use llm_worker::Worker; use llm_worker::Worker;
use llm_worker::llm_client::event::{Event, ReasoningItemEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{
BlockMetadata, BlockStart, BlockStop, BlockType, Event, ReasoningBlockData, ResponseStatus,
StatusEvent,
};
fn reasoning_block(text: impl Into<String>, data: ReasoningBlockData) -> Vec<Event> {
vec![
Event::BlockStart(BlockStart {
index: 100,
block_type: BlockType::Thinking,
metadata: BlockMetadata::Thinking,
}),
Event::BlockDelta(llm_worker::llm_client::event::BlockDelta {
index: 100,
delta: llm_worker::llm_client::event::DeltaContent::Thinking(text.into()),
}),
Event::BlockStop(BlockStop {
index: 100,
block_type: BlockType::Thinking,
stop_reason: None,
reasoning: Some(data),
}),
]
}
/// Anthropic 風: thinking ブロック → text → 終了 のシーケンス。 /// Anthropic 風: thinking ブロック → text → 終了 のシーケンス。
/// Worker history に Reasoning(signature 付き) → assistant_message が並ぶ。 /// Worker history に Reasoning(signature 付き) → assistant_message が並ぶ。
#[tokio::test] #[tokio::test]
async fn anthropic_thinking_round_trips_signature_into_history() { async fn anthropic_thinking_round_trips_signature_into_history() {
let events = vec![ let mut events = reasoning_block(
Event::ReasoningItem(ReasoningItemEvent { "let me think...",
ReasoningBlockData {
id: None, id: None,
text: "let me think...".into(), text: None,
summary: Vec::new(), summary: Vec::new(),
encrypted_content: None, encrypted_content: None,
signature: Some("SIG-OPUS".into()), signature: Some("SIG-OPUS".into()),
}), },
);
events.extend([
Event::text_block_start(0), Event::text_block_start(0),
Event::text_delta(0, "Here's the answer"), Event::text_delta(0, "Here's the answer"),
Event::text_block_stop(0, None), Event::text_block_stop(0, None),
Event::Status(StatusEvent { Event::Status(StatusEvent {
status: ResponseStatus::Completed, status: ResponseStatus::Completed,
}), }),
]; ]);
let client = MockLlmClient::new(events); let client = MockLlmClient::new(events);
let worker = Worker::new(client); let worker = Worker::new(client);
let out = worker.run("question?").await.expect("run ok"); let out = worker.run("question?").await.expect("run ok");
@ -63,21 +89,24 @@ async fn anthropic_thinking_round_trips_signature_into_history() {
/// `Item::Reasoning` のフィールドに展開されること。 /// `Item::Reasoning` のフィールドに展開されること。
#[tokio::test] #[tokio::test]
async fn openai_reasoning_round_trips_encrypted_and_summary() { async fn openai_reasoning_round_trips_encrypted_and_summary() {
let events = vec![ let mut events = reasoning_block(
Event::ReasoningItem(ReasoningItemEvent { "",
ReasoningBlockData {
id: Some("r1".into()), id: Some("r1".into()),
text: "inner reasoning".into(), text: Some("inner reasoning".into()),
summary: vec!["sum-A".into(), "sum-B".into()], summary: vec!["sum-A".into(), "sum-B".into()],
encrypted_content: Some("ENC-OPAQUE".into()), encrypted_content: Some("ENC-OPAQUE".into()),
signature: None, signature: None,
}), },
);
events.extend([
Event::text_block_start(0), Event::text_block_start(0),
Event::text_delta(0, "answer"), Event::text_delta(0, "answer"),
Event::text_block_stop(0, None), Event::text_block_stop(0, None),
Event::Status(StatusEvent { Event::Status(StatusEvent {
status: ResponseStatus::Completed, status: ResponseStatus::Completed,
}), }),
]; ]);
let client = MockLlmClient::new(events); let client = MockLlmClient::new(events);
let worker = Worker::new(client); let worker = Worker::new(client);
let out = worker.run("q").await.expect("run ok"); let out = worker.run("q").await.expect("run ok");
@ -107,21 +136,23 @@ async fn openai_reasoning_round_trips_encrypted_and_summary() {
/// が thinking を assistant メッセージの先頭に要求するため)。 /// が thinking を assistant メッセージの先頭に要求するため)。
#[tokio::test] #[tokio::test]
async fn reasoning_precedes_text_in_assistant_burst() { async fn reasoning_precedes_text_in_assistant_burst() {
let events = vec![ let mut events = vec![
// text/tool_call とは独立に、ReasoningItem が中盤で発火しても、 // text/tool_call とは独立に、reasoning block が中盤で完了しても、
// history append 時には assistant items の先頭に置かれる。 // history append 時には assistant items の先頭に置かれる。
Event::text_block_start(0), Event::text_block_start(0),
Event::text_delta(0, "intermediate"), Event::text_delta(0, "intermediate"),
Event::text_block_stop(0, None), Event::text_block_stop(0, None),
Event::ReasoningItem(ReasoningItemEvent { ];
text: "after text".into(), events.extend(reasoning_block(
"after text",
ReasoningBlockData {
signature: Some("SIG".into()), signature: Some("SIG".into()),
..Default::default() ..Default::default()
}), },
Event::Status(StatusEvent { ));
status: ResponseStatus::Completed, events.push(Event::Status(StatusEvent {
}), status: ResponseStatus::Completed,
]; }));
let client = MockLlmClient::new(events); let client = MockLlmClient::new(events);
let worker = Worker::new(client); let worker = Worker::new(client);
let out = worker.run("q").await.expect("run ok"); let out = worker.run("q").await.expect("run ok");

View File

@ -0,0 +1,41 @@
# Implementation report: reasoning block lifecycle
## Investigation
Initial implementation unified reasoning persistence through `BlockStop.reasoning`, but OpenAI Responses text-bearing reasoning items still had two Thinking lifecycles:
1. `response.reasoning_text.delta` streamed through the real reasoning content-part block.
2. `response.content_part.done` stopped that block with no persistence metadata.
3. `response.output_item.done` emitted a second synthetic metadata-only Thinking `BlockStart`/`BlockStop` pair.
That preserved persistence but changed live callback semantics: UI/trace consumers that listen to Thinking block stop callbacks could observe an extra empty Thinking stop after the real streamed reasoning block.
## Fix summary
OpenAI Responses now defers the stop for reasoning `content_part.done` when the part is a Thinking/reasoning-text content block. At `response.output_item.done`, the provider finalizes the deferred existing block with `ReasoningBlockData` instead of creating a second synthetic live-visible block.
Thinking block handler scopes are also keyed by block index, so a deferred reasoning-text stop still uses its original streamed buffer even if another Thinking block (for example a reasoning summary block) starts and stops before `output_item.done`.
Metadata-only reasoning items with no reasoning content-part still emit a synthetic metadata-bearing Thinking block so encrypted/id-only reasoning can be persisted and round-tripped.
The fix preserves:
- live `reasoning_text.delta` Thinking deltas;
- OpenAI Responses `id`, `summary`, and `encrypted_content` persistence;
- a single Thinking lifecycle for text-bearing reasoning items;
- metadata-only reasoning coverage.
## Validation
Passed:
- `cargo test -p llm-worker openai_responses::events::tests::reasoning --lib`
- `cargo test -p llm-worker --lib`
- `cargo check --workspace --all-targets`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Residual risk
The provider delays the stop event for OpenAI Responses reasoning text blocks until `response.output_item.done` so final encrypted/summary metadata can be attached to the same block. This avoids duplicate live stops but means the block stop is slightly later than the raw `content_part.done` SSE for reasoning text. This is intentional for the unified persistence model and covered by focused provider tests for the reviewed sequence.