diff --git a/crates/llm-worker/src/handler.rs b/crates/llm-worker/src/handler.rs index adb7ec4e..b6928800 100644 --- a/crates/llm-worker/src/handler.rs +++ b/crates/llm-worker/src/handler.rs @@ -91,16 +91,6 @@ impl Kind for ErrorKind { type Event = ErrorEvent; } -/// Reasoning item Kind - 完成済み reasoning item の永続化用 -/// -/// 1 reasoning item につき 1 度だけ発火する。Worker は -/// `ReasoningItemCollector` 経由で受け取り、ターン終了時に -/// `Item::Reasoning` として history に append する。 -pub struct ReasoningItemKind; -impl Kind for ReasoningItemKind { - type Event = ReasoningItemEvent; -} - // ============================================================================= // Block Kind Definitions // ============================================================================= @@ -152,6 +142,7 @@ pub struct ThinkingBlockStart { #[derive(Debug, Clone, PartialEq)] pub struct ThinkingBlockStop { pub index: usize, + pub reasoning: Option, } /// ToolUseBlock Kind - for tool use blocks diff --git a/crates/llm-worker/src/llm_client/event.rs b/crates/llm-worker/src/llm_client/event.rs index d38b1734..d31c035d 100644 --- a/crates/llm-worker/src/llm_client/event.rs +++ b/crates/llm-worker/src/llm_client/event.rs @@ -17,14 +17,12 @@ use serde::{Deserialize, Serialize}; /// /// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`, `UnhandledSse` /// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort` -/// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み -/// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と -/// は別経路で発火する) /// /// # ブロックのライフサイクル /// -/// テキストやツール呼び出しは、`BlockStart` → `BlockDelta`(複数) → `BlockStop` -/// の順序でイベントが発生します。 +/// テキスト、thinking、ツール呼び出しは、`BlockStart` → `BlockDelta`(複数) → `BlockStop` +/// の順序でイベントが発生します。thinking の round-trip metadata は +/// `BlockStop.reasoning` に載ります。 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum Event { /// ハートビート @@ -48,18 +46,6 @@ pub enum Event { BlockStop(BlockStop), /// ブロック中断 BlockAbort(BlockAbort), - - /// Reasoning item の完成。scheme が「次の request に送り返すための - /// reasoning material が揃った」点で 1 度だけ発火する。 - /// - /// - Anthropic: 1 つの `thinking` content_block 完了ごと - /// - OpenAI Responses: 1 つの reasoning output_item 完了ごと - /// - /// 上位層(Worker / ReasoningItemCollector)はこれを `Item::Reasoning` - /// として `worker.history` に append する。streaming 表示用の - /// `BlockStart(Thinking)` / `BlockDelta(Thinking)` / `BlockStop(Thinking)` - /// は依然として並行発火する(live display と round-trip persist の責務分離)。 - ReasoningItem(ReasoningItemEvent), } // ============================================================================= @@ -218,6 +204,12 @@ pub struct BlockStop { pub block_type: BlockType, /// 停止理由 pub stop_reason: Option, + /// Thinking block の停止時に確定した reasoning round-trip metadata。 + /// + /// `None` の Thinking block は live streaming / trace 用で、history に + /// `Item::Reasoning` として永続化しない。`Some` の場合は block lifecycle + /// が永続化の authoritative source になる。 + pub reasoning: Option, } impl BlockStop { @@ -243,22 +235,17 @@ impl BlockAbort { } } -// ============================================================================= -// Reasoning Item Event -// ============================================================================= - -/// 完成済み reasoning item。scheme が round-trip に必要なすべての -/// material(text, summary, encrypted_content, signature, id)を揃えて -/// 1 度だけ発火する。 +/// Thinking block stop で確定した reasoning material。 /// -/// `Item::Reasoning` のフィールドを 1:1 に持つ。 +/// `Item::Reasoning` の round-trip に必要な provider material を保持する。 +/// `text` は deltas から収集した本文を上書きするために使う(metadata-only +/// reasoning block や provider completion event で全文が届くケース)。 #[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] -pub struct ReasoningItemEvent { +pub struct ReasoningBlockData { /// scheme 側で観測した item id(OpenAI Responses の `id`)。 pub id: Option, - /// reasoning 本体テキスト。Anthropic は `thinking` 累積、OpenAI は - /// `reasoning_text` 累積。redacted_thinking では空。 - pub text: String, + /// reasoning 本体テキスト。`None` の場合は block delta 収集結果を使う。 + pub text: Option, /// summary (OpenAI Responses の `summary_text[]`)。他 scheme は空。 pub summary: Vec, /// 暗号化された opaque blob(Anthropic `redacted_thinking.data` / @@ -309,6 +296,7 @@ impl Event { index, block_type: BlockType::Text, stop_reason, + reasoning: None, }) } @@ -338,6 +326,7 @@ impl Event { index, block_type: BlockType::ToolUse, stop_reason: Some(StopReason::ToolUse), + reasoning: None, }) } diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs index 6a8ca7e5..bc5482fd 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs @@ -216,6 +216,7 @@ impl AnthropicScheme { index: event.index, block_type: BlockType::Text, // Timeline層で上書きされる stop_reason: None, + reasoning: None, }))) } AnthropicEventType::MessageDelta => { @@ -286,9 +287,9 @@ impl AnthropicScheme { /// `parse_event` の単発 Event に加えて、以下を行う: /// - `content_block_stop` の `block_type` を直前の Start 値で書き戻す /// - `thinking` / `redacted_thinking` ブロックの本体・signature・data を - /// `state.pending_thinking` に蓄積し、`content_block_stop` で - /// `Event::ReasoningItem` を追加発火する - /// - `signature_delta` を蓄積(Stream channel には流さず、reasoning event + /// `state.pending_thinking` に蓄積し、`content_block_stop` の Thinking + /// BlockStop metadata に載せる + /// - `signature_delta` を蓄積(Stream channel には流さず、reasoning metadata /// にだけ反映する) pub(crate) fn parse_with_state( &self, @@ -374,16 +375,21 @@ impl AnthropicScheme { AnthropicEventType::ContentBlockStop => { let raw: ContentBlockStopEvent = serde_json::from_str(data)?; let block_type = state.current_block_type.take().unwrap_or(BlockType::Text); + let reasoning = if matches!(block_type, BlockType::Thinking) { + state + .pending_thinking + .take() + .map(PendingThinking::into_reasoning) + } else { + state.pending_thinking.take(); + None + }; emitted.push(Event::BlockStop(BlockStop { index: raw.index, block_type, stop_reason: None, + reasoning, })); - if matches!(block_type, BlockType::Thinking) { - if let Some(pending) = state.pending_thinking.take() { - emitted.push(Event::ReasoningItem(pending.into_event())); - } - } } // 残りは state を必要としない。既存 parse_event に委譲。 _ => { @@ -524,8 +530,8 @@ mod tests { } #[test] - fn thinking_block_emits_reasoning_item_with_signature() { - // thinking ブロックが完了したら ReasoningItem に text+signature が乗ること + fn thinking_block_stop_carries_reasoning_with_signature() { + // thinking ブロックが完了したら reasoning metadata に text+signature が乗ること let scheme = AnthropicScheme::new(); let mut state = AnthropicState::default(); @@ -567,18 +573,18 @@ mod tests { &mut state, ) .unwrap(); - // BlockStop と ReasoningItem の 2 件が並ぶ - assert!(matches!(stop_evs[0], Event::BlockStop(_))); - let Event::ReasoningItem(reasoning) = &stop_evs[1] else { - panic!("expected ReasoningItem, got {:?}", stop_evs[1]); + assert_eq!(stop_evs.len(), 1); + let Event::BlockStop(stop) = &stop_evs[0] else { + panic!("expected BlockStop, got {:?}", stop_evs[0]); }; - assert_eq!(reasoning.text, "hello world"); + let reasoning = stop.reasoning.as_ref().expect("reasoning metadata"); + assert_eq!(reasoning.text.as_deref(), Some("hello world")); assert_eq!(reasoning.signature.as_deref(), Some("SIG-XYZ")); assert!(reasoning.encrypted_content.is_none()); } #[test] - fn redacted_thinking_emits_reasoning_item_with_data() { + fn redacted_thinking_stop_carries_reasoning_with_data() { let scheme = AnthropicScheme::new(); let mut state = AnthropicState::default(); @@ -596,16 +602,18 @@ mod tests { &mut state, ) .unwrap(); - let Event::ReasoningItem(reasoning) = &stop_evs[1] else { - panic!("expected ReasoningItem"); + assert_eq!(stop_evs.len(), 1); + let Event::BlockStop(stop) = &stop_evs[0] else { + panic!("expected BlockStop"); }; - assert!(reasoning.text.is_empty()); + let reasoning = stop.reasoning.as_ref().expect("reasoning metadata"); + assert_eq!(reasoning.text.as_deref(), Some("")); assert!(reasoning.signature.is_none()); assert_eq!(reasoning.encrypted_content.as_deref(), Some("opaque-blob")); } #[test] - fn text_block_does_not_emit_reasoning_item() { + fn text_block_stop_has_no_reasoning_metadata() { let scheme = AnthropicScheme::new(); let mut state = AnthropicState::default(); @@ -631,7 +639,10 @@ mod tests { ) .unwrap(); assert_eq!(stop_evs.len(), 1); - assert!(matches!(stop_evs[0], Event::BlockStop(_))); + let Event::BlockStop(stop) = &stop_evs[0] else { + panic!("expected BlockStop"); + }; + assert!(stop.reasoning.is_none()); } #[test] diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs index 787f80b1..ce15baa9 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs @@ -9,7 +9,7 @@ use crate::llm_client::{ ClientError, auth::AuthRequirement, capability::ModelCapability, - event::{BlockType, Event, ReasoningItemEvent}, + event::{BlockType, Event, ReasoningBlockData}, scheme::Scheme, types::Request, }; @@ -23,7 +23,7 @@ use super::AnthropicScheme; /// `BlockStop` に書き戻す。 /// 2. `thinking` ブロック中の `thinking_delta` テキストと `signature_delta` /// 署名、および `redacted_thinking` ブロックの `data` を蓄積し、 -/// `content_block_stop` で `Event::ReasoningItem` を発火する +/// `content_block_stop` の Thinking block metadata として返す /// (round-trip 永続化のため)。 #[derive(Debug, Default)] pub struct AnthropicState { @@ -40,10 +40,10 @@ pub(crate) struct PendingThinking { } impl PendingThinking { - pub(crate) fn into_event(self) -> ReasoningItemEvent { - ReasoningItemEvent { + pub(crate) fn into_reasoning(self) -> ReasoningBlockData { + ReasoningBlockData { id: None, - text: self.text, + text: Some(self.text), summary: Vec::new(), encrypted_content: self.redacted_data, signature: self.signature, diff --git a/crates/llm-worker/src/llm_client/scheme/gemini/events.rs b/crates/llm-worker/src/llm_client/scheme/gemini/events.rs index d2fb0449..06af36a4 100644 --- a/crates/llm-worker/src/llm_client/scheme/gemini/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/gemini/events.rs @@ -205,6 +205,7 @@ impl GeminiScheme { index: candidate_index, block_type: BlockType::Text, stop_reason, + reasoning: None, })); } } diff --git a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs index 24ded585..322e27e9 100644 --- a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs @@ -14,7 +14,7 @@ use crate::llm_client::{ ClientError, event::{ BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent, - Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent, + Event, ReasoningBlockData, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent, }, }; @@ -25,8 +25,8 @@ pub struct OpenAIResponsesState { next_index: usize, /// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で /// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で - /// 蓄積、`output_item.done`(Reasoning) で `Event::ReasoningItem` を - /// 発火してエントリを除去する。 + /// 蓄積、`output_item.done`(Reasoning) で metadata-only Thinking block を + /// 完了させて reasoning persistence material を渡す。 pending_reasoning: HashMap, } @@ -380,8 +380,8 @@ pub(crate) fn parse_sse( "response.output_item.done" => { let ev: OutputItemDone = from_json(data)?; - // Reasoning wrapper の done で蓄積分を ReasoningItem として発火。 - // これは `slots` の OutputItem slot とは独立している + // Reasoning wrapper の done で蓄積分を metadata-only Thinking block + // stop に載せる。これは `slots` の OutputItem slot とは独立している // (FunctionCall は slots、Reasoning は pending_reasoning)。 if let OutputItem::Reasoning { id, @@ -396,23 +396,39 @@ pub(crate) fn parse_sse( if pending.id.is_none() { pending.id = id; } - return Ok(vec![Event::ReasoningItem(ReasoningItemEvent { - id: pending.id, - text: pending.text, - summary: pending - .summary - .into_iter() - .filter(|s| !s.is_empty()) - .collect(), - encrypted_content, - signature: None, - })]); + let info = + state.allocate(SlotKey::OutputItem(ev.output_index), BlockType::Thinking); + state.slots.remove(&SlotKey::OutputItem(ev.output_index)); + return Ok(vec![ + Event::BlockStart(BlockStart { + index: info.flat_index, + block_type: BlockType::Thinking, + metadata: BlockMetadata::Thinking, + }), + Event::BlockStop(BlockStop { + index: info.flat_index, + block_type: BlockType::Thinking, + stop_reason: None, + reasoning: Some(ReasoningBlockData { + id: pending.id, + text: Some(pending.text), + summary: pending + .summary + .into_iter() + .filter(|s| !s.is_empty()) + .collect(), + encrypted_content, + signature: None, + }), + }), + ]); } if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) { Ok(vec![Event::BlockStop(BlockStop { index: info.flat_index, block_type: info.block_type, stop_reason: None, + reasoning: None, })]) } else { Ok(Vec::new()) @@ -450,6 +466,7 @@ pub(crate) fn parse_sse( index: info.flat_index, block_type: info.block_type, stop_reason: None, + reasoning: None, })]) } else { Ok(Vec::new()) @@ -531,6 +548,7 @@ pub(crate) fn parse_sse( index: info.flat_index, block_type: info.block_type, stop_reason: None, + reasoning: None, })]) } else { Ok(Vec::new()) @@ -1116,9 +1134,9 @@ mod tests { } #[test] - fn reasoning_output_item_emits_reasoning_item_with_text_summary_encrypted() { + fn reasoning_output_item_completes_metadata_thinking_block_with_text_summary_encrypted() { // 完成済み reasoning wrapper が text + summary[] + encrypted_content を持って - // ReasoningItem として届くこと。 + // Thinking BlockStop metadata として届くこと。 let mut state = OpenAIResponsesState::default(); // wrapper added (id だけ持つ) @@ -1171,12 +1189,14 @@ mod tests { "response.output_item.done", r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#, ); - assert_eq!(evs.len(), 1); - let Event::ReasoningItem(reasoning) = &evs[0] else { - panic!("expected ReasoningItem, got {:?}", evs[0]); + assert_eq!(evs.len(), 2); + assert!(matches!(evs[0], Event::BlockStart(_))); + let Event::BlockStop(stop) = &evs[1] else { + panic!("expected BlockStop, got {:?}", evs[1]); }; + let reasoning = stop.reasoning.as_ref().expect("reasoning metadata"); assert_eq!(reasoning.id.as_deref(), Some("r1")); - assert_eq!(reasoning.text, "hello world"); + assert_eq!(reasoning.text.as_deref(), Some("hello world")); assert_eq!(reasoning.summary, vec!["sum-A".to_string()]); assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC-XYZ")); assert!(reasoning.signature.is_none()); @@ -1187,7 +1207,7 @@ mod tests { #[test] fn reasoning_wrapper_without_inner_content_emits_empty_text() { // encrypted_content だけ届く(reasoning_text 無し)ケースでも - // ReasoningItem は発火する。 + // reasoning metadata は届く。 let mut state = OpenAIResponsesState::default(); with( &mut state, @@ -1199,10 +1219,13 @@ mod tests { "response.output_item.done", r#"{"output_index":2,"item":{"type":"reasoning","id":"r9","encrypted_content":"BLOB"}}"#, ); - let Event::ReasoningItem(r) = &evs[0] else { - panic!() + assert_eq!(evs.len(), 2); + assert!(matches!(evs[0], Event::BlockStart(_))); + let Event::BlockStop(stop) = &evs[1] else { + panic!("expected BlockStop") }; - assert!(r.text.is_empty()); + let r = stop.reasoning.as_ref().expect("reasoning metadata"); + assert_eq!(r.text.as_deref(), Some("")); assert!(r.summary.is_empty()); assert_eq!(r.encrypted_content.as_deref(), Some("BLOB")); } diff --git a/crates/llm-worker/src/timeline/mod.rs b/crates/llm-worker/src/timeline/mod.rs index 02c1d8ad..a1bd24cb 100644 --- a/crates/llm-worker/src/timeline/mod.rs +++ b/crates/llm-worker/src/timeline/mod.rs @@ -7,18 +7,19 @@ //! - [`Timeline`] - イベントストリームの管理とディスパッチ //! - [`Handler`] - イベントを処理するトレイト //! - [`TextBlockCollector`] - テキストブロックを収集するHandler +//! - [`ThinkingBlockCollector`] - reasoning material を Thinking block から収集するHandler //! - [`ToolCallCollector`] - ツール呼び出しを収集するHandler pub mod event; -mod reasoning_item_collector; mod text_block_collector; +mod thinking_block_collector; mod timeline; mod tool_call_collector; // 公開API pub use event::*; -pub use reasoning_item_collector::ReasoningItemCollector; pub use text_block_collector::TextBlockCollector; +pub use thinking_block_collector::ThinkingBlockCollector; pub use timeline::Timeline; pub use tool_call_collector::ToolCallCollector; @@ -30,7 +31,6 @@ pub use crate::handler::{ Handler, Kind, PingKind, - ReasoningItemKind, StatusKind, // Block Events TextBlockEvent, diff --git a/crates/llm-worker/src/timeline/reasoning_item_collector.rs b/crates/llm-worker/src/timeline/reasoning_item_collector.rs deleted file mode 100644 index 93ab433d..00000000 --- a/crates/llm-worker/src/timeline/reasoning_item_collector.rs +++ /dev/null @@ -1,77 +0,0 @@ -//! `ReasoningItemCollector` - 完成済み reasoning item を収集する Handler -//! -//! Timeline の `ReasoningItemKind` Handler として登録し、scheme 側が -//! `Event::ReasoningItem` を発火するたびに 1 件ずつバッファに溜める。 -//! Worker はターン終了時に `take_collected()` でドレインして -//! `Item::Reasoning` として `worker.history` に append する。 - -use std::sync::{Arc, Mutex}; - -use crate::handler::{Handler, ReasoningItemKind}; -use crate::llm_client::event::ReasoningItemEvent; - -/// 収集された reasoning item の連列。 -#[derive(Clone, Default)] -pub struct ReasoningItemCollector { - collected: Arc>>, -} - -impl ReasoningItemCollector { - pub fn new() -> Self { - Self::default() - } - - /// 収集済み item を取り出してクリア - pub fn take_collected(&self) -> Vec { - let mut guard = self.collected.lock().unwrap(); - std::mem::take(&mut *guard) - } - - /// 収集をクリア - pub fn clear(&self) { - self.collected.lock().unwrap().clear(); - } -} - -impl Handler for ReasoningItemCollector { - type Scope = (); - - fn on_event(&mut self, _scope: &mut Self::Scope, event: &ReasoningItemEvent) { - self.collected.lock().unwrap().push(event.clone()); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::llm_client::event::Event; - use crate::timeline::Timeline; - - #[test] - fn collects_in_order() { - let collector = ReasoningItemCollector::new(); - let mut timeline = Timeline::new(); - timeline.on_reasoning_item(collector.clone()); - - timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent { - id: Some("r1".into()), - text: "first".into(), - signature: Some("sig1".into()), - ..Default::default() - })); - timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent { - id: Some("r2".into()), - text: "second".into(), - ..Default::default() - })); - - let items = collector.take_collected(); - assert_eq!(items.len(), 2); - assert_eq!(items[0].text, "first"); - assert_eq!(items[0].signature.as_deref(), Some("sig1")); - assert_eq!(items[1].text, "second"); - - // take は drain なので 2 度目は空 - assert!(collector.take_collected().is_empty()); - } -} diff --git a/crates/llm-worker/src/timeline/thinking_block_collector.rs b/crates/llm-worker/src/timeline/thinking_block_collector.rs new file mode 100644 index 00000000..0caa776b --- /dev/null +++ b/crates/llm-worker/src/timeline/thinking_block_collector.rs @@ -0,0 +1,138 @@ +//! `ThinkingBlockCollector` - reasoning material from Thinking block lifecycle. +//! +//! Scheme implementations emit Thinking BlockStart/Delta/Stop events for live +//! streaming. A Thinking block stop with `ReasoningBlockData` is also the single +//! authoritative persistence signal for `Item::Reasoning` round-trip material. + +use std::sync::{Arc, Mutex}; + +use crate::handler::{Handler, ThinkingBlockEvent, ThinkingBlockKind}; +use crate::llm_client::event::ReasoningBlockData; + +/// Reasoning material collected from completed Thinking blocks. +#[derive(Clone, Default)] +pub struct ThinkingBlockCollector { + collected: Arc>>, +} + +impl ThinkingBlockCollector { + pub fn new() -> Self { + Self::default() + } + + /// 収集済み item を取り出してクリア + pub fn take_collected(&self) -> Vec { + let mut guard = self.collected.lock().unwrap(); + std::mem::take(&mut *guard) + } + + /// 収集をクリア + pub fn clear(&self) { + self.collected.lock().unwrap().clear(); + } +} + +impl Handler for ThinkingBlockCollector { + type Scope = String; + + fn on_event(&mut self, scope: &mut Self::Scope, event: &ThinkingBlockEvent) { + match event { + ThinkingBlockEvent::Start(_) => scope.clear(), + ThinkingBlockEvent::Delta(text) => scope.push_str(text), + ThinkingBlockEvent::Stop(stop) => { + if let Some(mut reasoning) = stop.reasoning.clone() { + if reasoning.text.is_none() { + reasoning.text = Some(scope.clone()); + } + self.collected.lock().unwrap().push(reasoning); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::llm_client::event::{ + BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, Event, ReasoningBlockData, + }; + use crate::timeline::Timeline; + + #[test] + fn collects_in_order_from_thinking_block_stops() { + let collector = ThinkingBlockCollector::new(); + let mut timeline = Timeline::new(); + timeline.on_thinking_block(collector.clone()); + + timeline.dispatch(&Event::BlockStart(BlockStart { + index: 0, + block_type: BlockType::Thinking, + metadata: BlockMetadata::Thinking, + })); + timeline.dispatch(&Event::BlockDelta(crate::llm_client::event::BlockDelta { + index: 0, + delta: DeltaContent::Thinking("first".into()), + })); + timeline.dispatch(&Event::BlockStop(BlockStop { + index: 0, + block_type: BlockType::Thinking, + stop_reason: None, + reasoning: Some(ReasoningBlockData { + id: Some("r1".into()), + signature: Some("sig1".into()), + ..Default::default() + }), + })); + + timeline.dispatch(&Event::BlockStart(BlockStart { + index: 1, + block_type: BlockType::Thinking, + metadata: BlockMetadata::Thinking, + })); + timeline.dispatch(&Event::BlockStop(BlockStop { + index: 1, + block_type: BlockType::Thinking, + stop_reason: None, + reasoning: Some(ReasoningBlockData { + id: Some("r2".into()), + text: Some("second".into()), + ..Default::default() + }), + })); + + let items = collector.take_collected(); + assert_eq!(items.len(), 2); + assert_eq!(items[0].text.as_deref(), Some("first")); + assert_eq!(items[0].signature.as_deref(), Some("sig1")); + assert_eq!(items[1].text.as_deref(), Some("second")); + + // take は drain なので 2 度目は空 + assert!(collector.take_collected().is_empty()); + } + + #[test] + fn ignores_streaming_only_thinking_blocks() { + let collector = ThinkingBlockCollector::new(); + let mut timeline = Timeline::new(); + timeline.on_thinking_block(collector.clone()); + + timeline.dispatch(&Event::BlockStart(BlockStart { + index: 0, + block_type: BlockType::Thinking, + metadata: BlockMetadata::Thinking, + })); + timeline.dispatch(&Event::BlockDelta(crate::llm_client::event::BlockDelta { + index: 0, + delta: DeltaContent::Thinking("live only".into()), + })); + timeline.dispatch(&Event::BlockStop(BlockStop { + index: 0, + block_type: BlockType::Thinking, + stop_reason: None, + reasoning: None, + })); + + assert!(collector.take_collected().is_empty()); + } +} diff --git a/crates/llm-worker/src/timeline/timeline.rs b/crates/llm-worker/src/timeline/timeline.rs index 91011918..61b2cb06 100644 --- a/crates/llm-worker/src/timeline/timeline.rs +++ b/crates/llm-worker/src/timeline/timeline.rs @@ -231,7 +231,10 @@ where if let Some(scope) = &mut self.scope { self.handler.on_event( scope, - &ThinkingBlockEvent::Stop(ThinkingBlockStop { index: stop.index }), + &ThinkingBlockEvent::Stop(ThinkingBlockStop { + index: stop.index, + reasoning: stop.reasoning.clone(), + }), ); } } @@ -375,8 +378,6 @@ pub struct Timeline { ping_handlers: Vec>>, status_handlers: Vec>>, error_handlers: Vec>>, - reasoning_item_handlers: Vec>>, - // Block系ハンドラー(BlockTypeごとにグループ化) text_block_handlers: Vec>, thinking_block_handlers: Vec>, @@ -405,7 +406,6 @@ impl Timeline { ping_handlers: Vec::new(), status_handlers: Vec::new(), error_handlers: Vec::new(), - reasoning_item_handlers: Vec::new(), text_block_handlers: Vec::new(), thinking_block_handlers: Vec::new(), tool_use_block_handlers: Vec::new(), @@ -467,18 +467,6 @@ impl Timeline { self } - /// `ReasoningItemKind` 用 Handler を登録 - pub fn on_reasoning_item(&mut self, handler: H) -> &mut Self - where - H: Handler + Send + Sync + 'static, - H::Scope: Send + Sync, - { - let mut wrapper = HandlerWrapper::new(handler); - wrapper.start_scope(); - self.reasoning_item_handlers.push(Box::new(wrapper)); - self - } - /// TextBlockKind用のHandlerを登録 pub fn on_text_block(&mut self, handler: H) -> &mut Self where @@ -532,9 +520,6 @@ impl Timeline { Event::BlockDelta(d) => self.handle_block_delta(d), Event::BlockStop(s) => self.handle_block_stop(s), Event::BlockAbort(a) => self.handle_block_abort(a), - - // 完成済み reasoning item: 即時ディスパッチ - Event::ReasoningItem(r) => self.dispatch_reasoning_item(r), } } @@ -577,12 +562,6 @@ impl Timeline { } } - fn dispatch_reasoning_item(&mut self, event: &ReasoningItemEvent) { - for handler in &mut self.reasoning_item_handlers { - handler.dispatch(event); - } - } - fn handle_block_start(&mut self, start: &BlockStart) { self.current_block = Some(start.block_type); diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 2bcb5f77..d3e95fed 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -24,7 +24,7 @@ use crate::{ }, state::{Locked, Mutable, WorkerState}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, - timeline::{ReasoningItemCollector, TextBlockCollector, Timeline, ToolCallCollector}, + timeline::{TextBlockCollector, ThinkingBlockCollector, Timeline, ToolCallCollector}, tool::{ ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputLimits, ToolResult, truncate_content, @@ -163,9 +163,9 @@ pub struct Worker { text_block_collector: TextBlockCollector, /// Tool call collector (Timeline handler) tool_call_collector: ToolCallCollector, - /// Reasoning item collector (Timeline handler)。完成済み reasoning - /// item を 1 ターン分バッファし、history に append する。 - reasoning_item_collector: ReasoningItemCollector, + /// Thinking block collector (Timeline handler)。metadata 付きで完了した + /// Thinking block を 1 ターン分バッファし、history に append する。 + thinking_block_collector: ThinkingBlockCollector, /// Tool server handle tool_server: ToolServerHandle, /// Interceptor for control-flow decisions @@ -771,14 +771,14 @@ impl Worker { /// 置くことを要求するためでもある。 fn build_assistant_items( &self, - reasoning_items: &[crate::llm_client::event::ReasoningItemEvent], + reasoning_items: &[crate::llm_client::event::ReasoningBlockData], text_blocks: &[String], tool_calls: &[ToolCall], ) -> Vec { let mut items = Vec::new(); for r in reasoning_items { - let mut item = Item::reasoning(r.text.clone()); + let mut item = Item::reasoning(r.text.clone().unwrap_or_default()); if let Some(id) = &r.id { item = item.with_id(id); } @@ -1238,7 +1238,7 @@ impl Worker { self.timeline.abort_current_block(); self.timeline.flush_usage(); - let reasoning_items = self.reasoning_item_collector.take_collected(); + let reasoning_items = self.thinking_block_collector.take_collected(); let text_blocks = self.text_block_collector.take_collected(); // Do not recover tool calls from an interrupted stream. A completed // tool_use is executable only when the provider finishes the stream. @@ -1270,7 +1270,7 @@ impl Worker { // `append_history_items` so observers (e.g. the // Pod-side per-item session-log committer) see each item // as it lands. - let reasoning_items = self.reasoning_item_collector.take_collected(); + let reasoning_items = self.thinking_block_collector.take_collected(); let text_blocks = self.text_block_collector.take_collected(); let tool_calls = self.tool_call_collector.take_collected(); let assistant_items = @@ -1595,14 +1595,14 @@ impl Worker { pub fn new(client: C) -> Self { let text_block_collector = TextBlockCollector::new(); let tool_call_collector = ToolCallCollector::new(); - let reasoning_item_collector = ReasoningItemCollector::new(); + let thinking_block_collector = ThinkingBlockCollector::new(); let mut timeline = Timeline::new(); let (cancel_tx, cancel_rx) = mpsc::channel(1); // Register collectors with Timeline timeline.on_text_block(text_block_collector.clone()); timeline.on_tool_use_block(tool_call_collector.clone()); - timeline.on_reasoning_item(reasoning_item_collector.clone()); + timeline.on_thinking_block(thinking_block_collector.clone()); Self { client, @@ -1610,7 +1610,7 @@ impl Worker { timeline, text_block_collector, tool_call_collector, - reasoning_item_collector, + thinking_block_collector, tool_server: ToolServer::new().handle(), interceptor: Box::new(DefaultInterceptor), system_prompt: None, @@ -1874,7 +1874,7 @@ impl Worker { timeline: self.timeline, text_block_collector: self.text_block_collector, tool_call_collector: self.tool_call_collector, - reasoning_item_collector: self.reasoning_item_collector, + thinking_block_collector: self.thinking_block_collector, tool_server: self.tool_server, interceptor: self.interceptor, system_prompt: self.system_prompt, @@ -1966,7 +1966,7 @@ impl Worker { timeline: self.timeline, text_block_collector: self.text_block_collector, tool_call_collector: self.tool_call_collector, - reasoning_item_collector: self.reasoning_item_collector, + thinking_block_collector: self.thinking_block_collector, tool_server: self.tool_server, interceptor: self.interceptor, system_prompt: self.system_prompt, diff --git a/crates/llm-worker/tests/reasoning_round_trip_test.rs b/crates/llm-worker/tests/reasoning_round_trip_test.rs index 800a59ae..309e036c 100644 --- a/crates/llm-worker/tests/reasoning_round_trip_test.rs +++ b/crates/llm-worker/tests/reasoning_round_trip_test.rs @@ -16,27 +16,53 @@ mod common; use common::MockLlmClient; use llm_worker::Item; use llm_worker::Worker; -use llm_worker::llm_client::event::{Event, ReasoningItemEvent, ResponseStatus, StatusEvent}; +use llm_worker::llm_client::event::{ + BlockMetadata, BlockStart, BlockStop, BlockType, Event, ReasoningBlockData, ResponseStatus, + StatusEvent, +}; + +fn reasoning_block(text: impl Into, data: ReasoningBlockData) -> Vec { + vec![ + Event::BlockStart(BlockStart { + index: 100, + block_type: BlockType::Thinking, + metadata: BlockMetadata::Thinking, + }), + Event::BlockDelta(llm_worker::llm_client::event::BlockDelta { + index: 100, + delta: llm_worker::llm_client::event::DeltaContent::Thinking(text.into()), + }), + Event::BlockStop(BlockStop { + index: 100, + block_type: BlockType::Thinking, + stop_reason: None, + reasoning: Some(data), + }), + ] +} /// Anthropic 風: thinking ブロック → text → 終了 のシーケンス。 /// Worker history に Reasoning(signature 付き) → assistant_message が並ぶ。 #[tokio::test] async fn anthropic_thinking_round_trips_signature_into_history() { - let events = vec![ - Event::ReasoningItem(ReasoningItemEvent { + let mut events = reasoning_block( + "let me think...", + ReasoningBlockData { id: None, - text: "let me think...".into(), + text: None, summary: Vec::new(), encrypted_content: None, signature: Some("SIG-OPUS".into()), - }), + }, + ); + events.extend([ Event::text_block_start(0), Event::text_delta(0, "Here's the answer"), Event::text_block_stop(0, None), Event::Status(StatusEvent { status: ResponseStatus::Completed, }), - ]; + ]); let client = MockLlmClient::new(events); let worker = Worker::new(client); let out = worker.run("question?").await.expect("run ok"); @@ -63,21 +89,24 @@ async fn anthropic_thinking_round_trips_signature_into_history() { /// `Item::Reasoning` のフィールドに展開されること。 #[tokio::test] async fn openai_reasoning_round_trips_encrypted_and_summary() { - let events = vec![ - Event::ReasoningItem(ReasoningItemEvent { + let mut events = reasoning_block( + "", + ReasoningBlockData { id: Some("r1".into()), - text: "inner reasoning".into(), + text: Some("inner reasoning".into()), summary: vec!["sum-A".into(), "sum-B".into()], encrypted_content: Some("ENC-OPAQUE".into()), signature: None, - }), + }, + ); + events.extend([ Event::text_block_start(0), Event::text_delta(0, "answer"), Event::text_block_stop(0, None), Event::Status(StatusEvent { status: ResponseStatus::Completed, }), - ]; + ]); let client = MockLlmClient::new(events); let worker = Worker::new(client); let out = worker.run("q").await.expect("run ok"); @@ -107,21 +136,23 @@ async fn openai_reasoning_round_trips_encrypted_and_summary() { /// が thinking を assistant メッセージの先頭に要求するため)。 #[tokio::test] async fn reasoning_precedes_text_in_assistant_burst() { - let events = vec![ - // text/tool_call とは独立に、ReasoningItem が中盤で発火しても、 + let mut events = vec![ + // text/tool_call とは独立に、reasoning block が中盤で完了しても、 // history append 時には assistant items の先頭に置かれる。 Event::text_block_start(0), Event::text_delta(0, "intermediate"), Event::text_block_stop(0, None), - Event::ReasoningItem(ReasoningItemEvent { - text: "after text".into(), + ]; + events.extend(reasoning_block( + "after text", + ReasoningBlockData { signature: Some("SIG".into()), ..Default::default() - }), - Event::Status(StatusEvent { - status: ResponseStatus::Completed, - }), - ]; + }, + )); + events.push(Event::Status(StatusEvent { + status: ResponseStatus::Completed, + })); let client = MockLlmClient::new(events); let worker = Worker::new(client); let out = worker.run("q").await.expect("run ok");