From 30023349b9ba659869a1fffc42f06483ad530976 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 4 May 2026 21:31:44 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Reasoning=E3=81=AE=E3=82=B3=E3=83=B3?= =?UTF-8?q?=E3=83=86=E3=82=AD=E3=82=B9=E3=83=88=E7=AE=A1=E7=90=86=E3=81=AE?= =?UTF-8?q?=E5=AF=BE=E5=BF=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/llm-worker/src/handler.rs | 10 + crates/llm-worker/src/llm_client/event.rs | 40 +++ .../src/llm_client/scheme/anthropic/events.rs | 249 +++++++++++++++++- .../llm_client/scheme/anthropic/request.rs | 139 +++++++++- .../scheme/anthropic/scheme_impl.rs | 54 ++-- .../scheme/openai_responses/events.rs | 168 +++++++++++- crates/llm-worker/src/llm_client/types.rs | 16 ++ crates/llm-worker/src/timeline/mod.rs | 3 + .../src/timeline/reasoning_item_collector.rs | 77 ++++++ crates/llm-worker/src/timeline/timeline.rs | 23 ++ crates/llm-worker/src/worker.rs | 45 +++- .../tests/reasoning_round_trip_test.rs | 212 +++++++++++++++ crates/session-store/src/logged_item.rs | 8 + 13 files changed, 1009 insertions(+), 35 deletions(-) create mode 100644 crates/llm-worker/src/timeline/reasoning_item_collector.rs create mode 100644 crates/llm-worker/tests/reasoning_round_trip_test.rs diff --git a/crates/llm-worker/src/handler.rs b/crates/llm-worker/src/handler.rs index 225c514d..adb7ec4e 100644 --- a/crates/llm-worker/src/handler.rs +++ b/crates/llm-worker/src/handler.rs @@ -91,6 +91,16 @@ impl Kind for ErrorKind { type Event = ErrorEvent; } +/// Reasoning item Kind - 完成済み reasoning item の永続化用 +/// +/// 1 reasoning item につき 1 度だけ発火する。Worker は +/// `ReasoningItemCollector` 経由で受け取り、ターン終了時に +/// `Item::Reasoning` として history に append する。 +pub struct ReasoningItemKind; +impl Kind for ReasoningItemKind { + type Event = ReasoningItemEvent; +} + // ============================================================================= // Block Kind Definitions // ============================================================================= diff --git a/crates/llm-worker/src/llm_client/event.rs b/crates/llm-worker/src/llm_client/event.rs index a9be9d45..3e478803 100644 --- a/crates/llm-worker/src/llm_client/event.rs +++ b/crates/llm-worker/src/llm_client/event.rs @@ -17,6 +17,9 @@ use serde::{Deserialize, Serialize}; /// /// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error` /// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort` +/// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み +/// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と +/// は別経路で発火する) /// /// # ブロックのライフサイクル /// @@ -41,6 +44,18 @@ pub enum Event { BlockStop(BlockStop), /// ブロック中断 BlockAbort(BlockAbort), + + /// Reasoning item の完成。scheme が「次の request に送り返すための + /// reasoning material が揃った」点で 1 度だけ発火する。 + /// + /// - Anthropic: 1 つの `thinking` content_block 完了ごと + /// - OpenAI Responses: 1 つの reasoning output_item 完了ごと + /// + /// 上位層(Worker / ReasoningItemCollector)はこれを `Item::Reasoning` + /// として `worker.history` に append する。streaming 表示用の + /// `BlockStart(Thinking)` / `BlockDelta(Thinking)` / `BlockStop(Thinking)` + /// は依然として並行発火する(live display と round-trip persist の責務分離)。 + ReasoningItem(ReasoningItemEvent), } // ============================================================================= @@ -212,6 +227,31 @@ impl BlockAbort { } } +// ============================================================================= +// Reasoning Item Event +// ============================================================================= + +/// 完成済み reasoning item。scheme が round-trip に必要なすべての +/// material(text, summary, encrypted_content, signature, id)を揃えて +/// 1 度だけ発火する。 +/// +/// `Item::Reasoning` のフィールドを 1:1 に持つ。 +#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] +pub struct ReasoningItemEvent { + /// scheme 側で観測した item id(OpenAI Responses の `id`)。 + pub id: Option, + /// reasoning 本体テキスト。Anthropic は `thinking` 累積、OpenAI は + /// `reasoning_text` 累積。redacted_thinking では空。 + pub text: String, + /// summary (OpenAI Responses の `summary_text[]`)。他 scheme は空。 + pub summary: Vec, + /// 暗号化された opaque blob(Anthropic `redacted_thinking.data` / + /// OpenAI Responses `encrypted_content`)。 + pub encrypted_content: Option, + /// Anthropic extended thinking signature。round-trip 必須。 + pub signature: Option, +} + /// 停止理由 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum StopReason { diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs index 6542a5c4..29374e25 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs @@ -12,6 +12,7 @@ use crate::llm_client::{ use serde::Deserialize; use super::AnthropicScheme; +use super::scheme_impl::{AnthropicState, PendingThinking}; /// Anthropic SSEイベントタイプ #[derive(Debug, Clone, PartialEq, Eq)] @@ -75,7 +76,21 @@ pub(crate) enum ContentBlock { #[serde(rename = "text")] Text { text: String }, #[serde(rename = "thinking")] - Thinking { thinking: String }, + Thinking { + #[serde(default)] + thinking: String, + /// 非ストリーミングレスポンス由来の初期 signature(通常はストリームでは + /// 空 → `signature_delta` で埋まる)。 + #[serde(default)] + signature: Option, + }, + #[serde(rename = "redacted_thinking")] + RedactedThinking { + /// 暗号化された opaque blob。signature ではなく、まるごと + /// `redacted_thinking.data` として送り返す必要がある。 + #[serde(default)] + data: String, + }, #[serde(rename = "tool_use")] ToolUse { id: String, @@ -228,7 +243,9 @@ impl AnthropicScheme { fn convert_block_start(&self, event: &ContentBlockStartEvent) -> Event { let (block_type, metadata) = match &event.content_block { ContentBlock::Text { .. } => (BlockType::Text, BlockMetadata::Text), - ContentBlock::Thinking { .. } => (BlockType::Thinking, BlockMetadata::Thinking), + ContentBlock::Thinking { .. } | ContentBlock::RedactedThinking { .. } => { + (BlockType::Thinking, BlockMetadata::Thinking) + } ContentBlock::ToolUse { id, name, .. } => ( BlockType::ToolUse, BlockMetadata::ToolUse { @@ -264,6 +281,123 @@ impl AnthropicScheme { })) } + /// state を持ち回す上位パース。 + /// + /// `parse_event` の単発 Event に加えて、以下を行う: + /// - `content_block_stop` の `block_type` を直前の Start 値で書き戻す + /// - `thinking` / `redacted_thinking` ブロックの本体・signature・data を + /// `state.pending_thinking` に蓄積し、`content_block_stop` で + /// `Event::ReasoningItem` を追加発火する + /// - `signature_delta` を蓄積(Stream channel には流さず、reasoning event + /// にだけ反映する) + pub(crate) fn parse_with_state( + &self, + event_type: &str, + data: &str, + state: &mut AnthropicState, + ) -> Result, ClientError> { + let Some(parsed_event_type) = AnthropicEventType::parse(event_type) else { + return Ok(Vec::new()); + }; + + // signature_delta はストリーム表示には流さず、state にだけ蓄積。 + // それ以外は parse_event で標準 Event 化する。 + let mut emitted: Vec = Vec::new(); + + match parsed_event_type { + AnthropicEventType::ContentBlockStart => { + let raw: ContentBlockStartEvent = serde_json::from_str(data)?; + state.current_block_type = Some(match &raw.content_block { + ContentBlock::Text { .. } => BlockType::Text, + ContentBlock::Thinking { .. } | ContentBlock::RedactedThinking { .. } => { + BlockType::Thinking + } + ContentBlock::ToolUse { .. } => BlockType::ToolUse, + }); + match &raw.content_block { + ContentBlock::Thinking { + thinking, signature, + } => { + state.pending_thinking = Some(PendingThinking { + text: thinking.clone(), + signature: signature.clone(), + redacted_data: None, + }); + } + ContentBlock::RedactedThinking { data: blob } => { + state.pending_thinking = Some(PendingThinking { + text: String::new(), + signature: None, + redacted_data: Some(blob.clone()), + }); + } + _ => {} + } + emitted.push(self.convert_block_start(&raw)); + } + AnthropicEventType::ContentBlockDelta => { + let raw: ContentBlockDeltaEvent = serde_json::from_str(data)?; + match &raw.delta { + DeltaBlock::ThinkingDelta { thinking } => { + if let Some(pending) = state.pending_thinking.as_mut() { + pending.text.push_str(thinking); + } + emitted.push(Event::BlockDelta(BlockDelta { + index: raw.index, + delta: DeltaContent::Thinking(thinking.clone()), + })); + } + DeltaBlock::SignatureDelta { signature } => { + if let Some(pending) = state.pending_thinking.as_mut() { + // 通常 1 回しか来ないが、複数 fragment 来ても連結しておく + match &mut pending.signature { + Some(acc) => acc.push_str(signature), + None => pending.signature = Some(signature.clone()), + } + } + } + DeltaBlock::TextDelta { text } => { + emitted.push(Event::BlockDelta(BlockDelta { + index: raw.index, + delta: DeltaContent::Text(text.clone()), + })); + } + DeltaBlock::InputJsonDelta { partial_json } => { + emitted.push(Event::BlockDelta(BlockDelta { + index: raw.index, + delta: DeltaContent::InputJson(partial_json.clone()), + })); + } + } + } + AnthropicEventType::ContentBlockStop => { + let raw: ContentBlockStopEvent = serde_json::from_str(data)?; + let block_type = state + .current_block_type + .take() + .unwrap_or(BlockType::Text); + emitted.push(Event::BlockStop(BlockStop { + index: raw.index, + block_type, + stop_reason: None, + })); + if matches!(block_type, BlockType::Thinking) { + if let Some(pending) = state.pending_thinking.take() { + emitted.push(Event::ReasoningItem(pending.into_event())); + } + } + } + // 残りは state を必要としない。既存 parse_event に委譲。 + _ => { + if let Some(event) = self.parse_event(event_type, data)? { + emitted.push(event); + } + } + } + + Ok(emitted) + } + fn convert_usage(&self, usage: &UsageData) -> UsageEvent { // Anthropic の `input_tokens` は **キャッシュ外** の入力トークンのみで、 // プロンプト全長は input_tokens + cache_read + cache_creation。 @@ -391,6 +525,117 @@ mod tests { } } + #[test] + fn thinking_block_emits_reasoning_item_with_signature() { + // thinking ブロックが完了したら ReasoningItem に text+signature が乗ること + let scheme = AnthropicScheme::new(); + let mut state = AnthropicState::default(); + + let evs = scheme + .parse_with_state( + "content_block_start", + r#"{"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}"#, + &mut state, + ) + .unwrap(); + assert!(matches!(evs[0], Event::BlockStart(_))); + + scheme + .parse_with_state( + "content_block_delta", + r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"hello "}}"#, + &mut state, + ) + .unwrap(); + scheme + .parse_with_state( + "content_block_delta", + r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"world"}}"#, + &mut state, + ) + .unwrap(); + scheme + .parse_with_state( + "content_block_delta", + r#"{"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"SIG-XYZ"}}"#, + &mut state, + ) + .unwrap(); + + let stop_evs = scheme + .parse_with_state( + "content_block_stop", + r#"{"type":"content_block_stop","index":0}"#, + &mut state, + ) + .unwrap(); + // BlockStop と ReasoningItem の 2 件が並ぶ + assert!(matches!(stop_evs[0], Event::BlockStop(_))); + let Event::ReasoningItem(reasoning) = &stop_evs[1] else { + panic!("expected ReasoningItem, got {:?}", stop_evs[1]); + }; + assert_eq!(reasoning.text, "hello world"); + assert_eq!(reasoning.signature.as_deref(), Some("SIG-XYZ")); + assert!(reasoning.encrypted_content.is_none()); + } + + #[test] + fn redacted_thinking_emits_reasoning_item_with_data() { + let scheme = AnthropicScheme::new(); + let mut state = AnthropicState::default(); + + scheme + .parse_with_state( + "content_block_start", + r#"{"type":"content_block_start","index":0,"content_block":{"type":"redacted_thinking","data":"opaque-blob"}}"#, + &mut state, + ) + .unwrap(); + let stop_evs = scheme + .parse_with_state( + "content_block_stop", + r#"{"type":"content_block_stop","index":0}"#, + &mut state, + ) + .unwrap(); + let Event::ReasoningItem(reasoning) = &stop_evs[1] else { + panic!("expected ReasoningItem"); + }; + assert!(reasoning.text.is_empty()); + assert!(reasoning.signature.is_none()); + assert_eq!(reasoning.encrypted_content.as_deref(), Some("opaque-blob")); + } + + #[test] + fn text_block_does_not_emit_reasoning_item() { + let scheme = AnthropicScheme::new(); + let mut state = AnthropicState::default(); + + scheme + .parse_with_state( + "content_block_start", + r#"{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}"#, + &mut state, + ) + .unwrap(); + scheme + .parse_with_state( + "content_block_delta", + r#"{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"hi"}}"#, + &mut state, + ) + .unwrap(); + let stop_evs = scheme + .parse_with_state( + "content_block_stop", + r#"{"type":"content_block_stop","index":0}"#, + &mut state, + ) + .unwrap(); + assert_eq!(stop_evs.len(), 1); + assert!(matches!(stop_evs[0], Event::BlockStop(_))); + } + #[test] fn test_parse_ping() { let scheme = AnthropicScheme::new(); diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs index 12cce691..39b485ac 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs @@ -77,6 +77,21 @@ pub(crate) enum AnthropicContentPart { #[serde(skip_serializing_if = "Option::is_none")] cache_control: Option, }, + #[serde(rename = "thinking")] + Thinking { + thinking: String, + signature: String, + #[serde(skip_serializing_if = "Option::is_none")] + cache_control: Option, + }, + #[serde(rename = "redacted_thinking")] + RedactedThinking { + /// 暗号化済み reasoning blob。`Item::Reasoning::encrypted_content` + /// から渡る。 + data: String, + #[serde(skip_serializing_if = "Option::is_none")] + cache_control: Option, + }, #[serde(rename = "tool_use")] ToolUse { id: String, @@ -102,6 +117,21 @@ impl AnthropicContentPart { } } + fn thinking(thinking: String, signature: String) -> Self { + Self::Thinking { + thinking, + signature, + cache_control: None, + } + } + + fn redacted_thinking(data: String) -> Self { + Self::RedactedThinking { + data, + cache_control: None, + } + } + fn tool_use(id: String, name: String, input: serde_json::Value) -> Self { Self::ToolUse { id, @@ -122,6 +152,8 @@ impl AnthropicContentPart { fn set_cache_control(&mut self, cc: CacheControl) { match self { Self::Text { cache_control, .. } + | Self::Thinking { cache_control, .. } + | Self::RedactedThinking { cache_control, .. } | Self::ToolUse { cache_control, .. } | Self::ToolResult { cache_control, .. } => { *cache_control = Some(cc); @@ -305,11 +337,33 @@ impl AnthropicScheme { .push((i, AnthropicContentPart::tool_result(call_id.clone(), text))); } - Item::Reasoning { text, .. } => { + Item::Reasoning { + text, + encrypted_content, + signature, + .. + } => { flush_pending(&mut messages, &mut pending_user, "user", &mut locations); - // Reasoning is treated as assistant text in Anthropic - // (actual thinking blocks are handled differently in streaming). - pending_assistant.push((i, AnthropicContentPart::text(text.clone()))); + // Anthropic はアシスタントターン中の `thinking` / + // `redacted_thinking` ブロックを必ず assistant role の + // content_part として送り返す必要がある。 + // + // - signature あり: `thinking` content_part を投影 + // - signature 無し + encrypted_content あり: + // `redacted_thinking` content_part を投影 + // - どちらも無い: 他 scheme(OpenAI 等)から流入した + // 素の reasoning text。Anthropic に投げる意味も + // round-trip の根拠も無いので drop。 + if let Some(sig) = signature.clone() { + pending_assistant.push(( + i, + AnthropicContentPart::thinking(text.clone(), sig), + )); + } else if let Some(data) = encrypted_content.clone() { + pending_assistant + .push((i, AnthropicContentPart::redacted_thinking(data))); + } + // どちらも None なら何も pend せず、本 item は無視。 } } } @@ -542,6 +596,8 @@ mod tests { fn part_cache_control(part: &AnthropicContentPart) -> Option { match part { AnthropicContentPart::Text { cache_control, .. } + | AnthropicContentPart::Thinking { cache_control, .. } + | AnthropicContentPart::RedactedThinking { cache_control, .. } | AnthropicContentPart::ToolUse { cache_control, .. } | AnthropicContentPart::ToolResult { cache_control, .. } => *cache_control, } @@ -737,6 +793,81 @@ mod tests { assert!(breakpoint_positions(&req).is_empty()); } + fn collect_assistant_thinking_parts(req: &AnthropicRequest) -> Vec<&AnthropicContentPart> { + let mut out = Vec::new(); + for msg in &req.messages { + if msg.role != "assistant" { + continue; + } + if let AnthropicContent::Parts(parts) = &msg.content { + for part in parts { + if matches!( + part, + AnthropicContentPart::Thinking { .. } + | AnthropicContentPart::RedactedThinking { .. } + ) { + out.push(part); + } + } + } + } + out + } + + #[test] + fn reasoning_with_signature_projects_thinking_part() { + // Item::Reasoning に signature があれば assistant role の + // `thinking` content_part として送る。 + let scheme = AnthropicScheme::new(); + let request = Request::new() + .user("hi") + .item(Item::reasoning("step-by-step").with_signature("SIG-A")) + .item(Item::assistant_message("done")); + let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit()); + let thinking_parts = collect_assistant_thinking_parts(&req); + assert_eq!(thinking_parts.len(), 1); + match thinking_parts[0] { + AnthropicContentPart::Thinking { + thinking, signature, .. + } => { + assert_eq!(thinking, "step-by-step"); + assert_eq!(signature, "SIG-A"); + } + other => panic!("expected Thinking part, got {other:?}"), + } + } + + #[test] + fn reasoning_with_only_encrypted_content_projects_redacted_thinking() { + let scheme = AnthropicScheme::new(); + let request = Request::new() + .user("hi") + .item(Item::reasoning("").with_encrypted_content("opaque")) + .item(Item::assistant_message("done")); + let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit()); + let parts = collect_assistant_thinking_parts(&req); + assert_eq!(parts.len(), 1); + match parts[0] { + AnthropicContentPart::RedactedThinking { data, .. } => { + assert_eq!(data, "opaque"); + } + other => panic!("expected RedactedThinking, got {other:?}"), + } + } + + #[test] + fn reasoning_without_signature_or_encrypted_is_dropped() { + // 他 scheme から流入した素の reasoning は Anthropic に投げない。 + let scheme = AnthropicScheme::new(); + let request = Request::new() + .user("hi") + .item(Item::reasoning("plain text")) + .item(Item::assistant_message("done")); + let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit()); + // thinking part は 1 つも乗らない + assert!(collect_assistant_thinking_parts(&req).is_empty()); + } + #[test] fn tool_definitions_carry_no_cache_control() { // Tool JSON schema must serialise unchanged — no sneak-in of diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs index fff8a73a..787f80b1 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/scheme_impl.rs @@ -9,7 +9,7 @@ use crate::llm_client::{ ClientError, auth::AuthRequirement, capability::ModelCapability, - event::{BlockStop, BlockType, Event}, + event::{BlockType, Event, ReasoningItemEvent}, scheme::Scheme, types::Request, }; @@ -18,12 +18,37 @@ use super::AnthropicScheme; /// Anthropic の SSE パースで必要な状態。 /// -/// `content_block_stop` イベントは `block_type` を持たない仕様なので、 -/// 直前の `content_block_start` で観測した `block_type` を保持して -/// `BlockStop` に書き戻す。 +/// 1. `content_block_stop` イベントは `block_type` を持たない仕様なので、 +/// 直前の `content_block_start` で観測した `block_type` を保持して +/// `BlockStop` に書き戻す。 +/// 2. `thinking` ブロック中の `thinking_delta` テキストと `signature_delta` +/// 署名、および `redacted_thinking` ブロックの `data` を蓄積し、 +/// `content_block_stop` で `Event::ReasoningItem` を発火する +/// (round-trip 永続化のため)。 #[derive(Debug, Default)] pub struct AnthropicState { - current_block_type: Option, + pub(crate) current_block_type: Option, + pub(crate) pending_thinking: Option, +} + +/// 1 つの `thinking` または `redacted_thinking` content_block の蓄積バッファ。 +#[derive(Debug, Default)] +pub(crate) struct PendingThinking { + pub(crate) text: String, + pub(crate) signature: Option, + pub(crate) redacted_data: Option, +} + +impl PendingThinking { + pub(crate) fn into_event(self) -> ReasoningItemEvent { + ReasoningItemEvent { + id: None, + text: self.text, + summary: Vec::new(), + encrypted_content: self.redacted_data, + signature: self.signature, + } + } } impl Scheme for AnthropicScheme { @@ -73,24 +98,7 @@ impl Scheme for AnthropicScheme { data: &str, state: &mut Self::State, ) -> Result, ClientError> { - let Some(mut event) = self.parse_event(event_type, data)? else { - return Ok(Vec::new()); - }; - match &event { - Event::BlockStart(start) => { - state.current_block_type = Some(start.block_type); - } - Event::BlockStop(stop) => { - if let Some(block_type) = state.current_block_type.take() { - event = Event::BlockStop(BlockStop { - block_type, - ..stop.clone() - }); - } - } - _ => {} - } - Ok(vec![event]) + self.parse_with_state(event_type, data, state) } fn default_capability(&self) -> ModelCapability { diff --git a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs index c1984ba9..14da96c4 100644 --- a/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/openai_responses/events.rs @@ -13,7 +13,7 @@ use crate::llm_client::{ ClientError, event::{ BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent, - Event, ResponseStatus, StatusEvent, UsageEvent, + Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UsageEvent, }, }; @@ -22,6 +22,21 @@ use crate::llm_client::{ pub struct OpenAIResponsesState { slots: HashMap, next_index: usize, + /// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で + /// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で + /// 蓄積、`output_item.done`(Reasoning) で `Event::ReasoningItem` を + /// 発火してエントリを除去する。 + pending_reasoning: HashMap, +} + +/// 1 つの reasoning output_item の蓄積バッファ。 +#[derive(Debug, Default)] +struct PendingReasoning { + id: Option, + /// `reasoning_text.delta` の累積。複数 content_part あれば順に concat。 + text: String, + /// `reasoning_summary_text.delta` を summary_index 順に蓄積。 + summary: Vec, } impl OpenAIResponsesState { @@ -45,6 +60,18 @@ impl OpenAIResponsesState { (self.allocate(key, block_type), true) } } + + fn ensure_reasoning(&mut self, output_index: usize) -> &mut PendingReasoning { + self.pending_reasoning.entry(output_index).or_default() + } + + fn extend_reasoning_summary(&mut self, output_index: usize, summary_index: usize, text: &str) { + let entry = self.ensure_reasoning(output_index); + if entry.summary.len() <= summary_index { + entry.summary.resize(summary_index + 1, String::new()); + } + entry.summary[summary_index].push_str(text); + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -89,8 +116,12 @@ enum OutputItem { id: Option, }, Reasoning { - #[allow(dead_code)] + #[serde(default)] id: Option, + /// `output_item.done` で初めて埋まる。`include=["reasoning.encrypted_content"]` + /// 指定時に opaque blob が乗る。 + #[serde(default)] + encrypted_content: Option, }, FunctionCall { #[allow(dead_code)] @@ -319,12 +350,49 @@ pub(crate) fn parse_sse( metadata: BlockMetadata::ToolUse { id: call_id, name }, })]) } + OutputItem::Reasoning { id, .. } => { + // wrapper を確保。中身の content_part / summary_part は + // 別 SlotKey で扱われ続ける(Streaming 表示は維持)。 + let entry = state.ensure_reasoning(ev.output_index); + if id.is_some() { + entry.id = id; + } + Ok(Vec::new()) + } _ => Ok(Vec::new()), } } "response.output_item.done" => { let ev: OutputItemDone = from_json(data)?; + // Reasoning wrapper の done で蓄積分を ReasoningItem として発火。 + // これは `slots` の OutputItem slot とは独立している + // (FunctionCall は slots、Reasoning は pending_reasoning)。 + if let OutputItem::Reasoning { + id, + encrypted_content, + .. + } = ev.item + { + let mut pending = state + .pending_reasoning + .remove(&ev.output_index) + .unwrap_or_default(); + if pending.id.is_none() { + pending.id = id; + } + return Ok(vec![Event::ReasoningItem(ReasoningItemEvent { + id: pending.id, + text: pending.text, + summary: pending + .summary + .into_iter() + .filter(|s| !s.is_empty()) + .collect(), + encrypted_content, + signature: None, + })]); + } if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) { Ok(vec![Event::BlockStop(BlockStop { index: info.flat_index, @@ -389,6 +457,8 @@ pub(crate) fn parse_sse( "response.reasoning_text.delta" => { let ev: ReasoningTextDelta = from_json(data)?; + // round-trip 用に蓄積 + state.ensure_reasoning(ev.output_index).text.push_str(&ev.delta); Ok(ensure_and_delta( state, SlotKey::ContentPart { @@ -419,6 +489,8 @@ pub(crate) fn parse_sse( "response.reasoning_summary_text.delta" => { let ev: ReasoningSummaryTextDelta = from_json(data)?; + // round-trip 用に蓄積 + state.extend_reasoning_summary(ev.output_index, ev.summary_index, &ev.delta); Ok(ensure_and_delta( state, SlotKey::Summary { @@ -797,6 +869,98 @@ mod tests { )); } + #[test] + fn reasoning_output_item_emits_reasoning_item_with_text_summary_encrypted() { + // 完成済み reasoning wrapper が text + summary[] + encrypted_content を持って + // ReasoningItem として届くこと。 + let mut state = OpenAIResponsesState::default(); + + // wrapper added (id だけ持つ) + with( + &mut state, + "response.output_item.added", + r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#, + ); + // 内側の reasoning_text 用 content_part + with( + &mut state, + "response.content_part.added", + r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#, + ); + with( + &mut state, + "response.reasoning_text.delta", + r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"hello "}"#, + ); + with( + &mut state, + "response.reasoning_text.delta", + r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"world"}"#, + ); + with( + &mut state, + "response.content_part.done", + r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"hello world"}}"#, + ); + // summary 1 件 + with( + &mut state, + "response.reasoning_summary_part.added", + r#"{"output_index":0,"summary_index":0,"item_id":"r1","part":{"type":"summary_text","text":""}}"#, + ); + with( + &mut state, + "response.reasoning_summary_text.delta", + r#"{"output_index":0,"summary_index":0,"item_id":"r1","delta":"sum-A"}"#, + ); + with( + &mut state, + "response.reasoning_summary_part.done", + r#"{"output_index":0,"summary_index":0,"item_id":"r1"}"#, + ); + + // wrapper done (encrypted_content が乗る) + let evs = with( + &mut state, + "response.output_item.done", + r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#, + ); + assert_eq!(evs.len(), 1); + let Event::ReasoningItem(reasoning) = &evs[0] else { + panic!("expected ReasoningItem, got {:?}", evs[0]); + }; + assert_eq!(reasoning.id.as_deref(), Some("r1")); + assert_eq!(reasoning.text, "hello world"); + assert_eq!(reasoning.summary, vec!["sum-A".to_string()]); + assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC-XYZ")); + assert!(reasoning.signature.is_none()); + // pending_reasoning は drain されていること + assert!(state.pending_reasoning.is_empty()); + } + + #[test] + fn reasoning_wrapper_without_inner_content_emits_empty_text() { + // encrypted_content だけ届く(reasoning_text 無し)ケースでも + // ReasoningItem は発火する。 + let mut state = OpenAIResponsesState::default(); + with( + &mut state, + "response.output_item.added", + r#"{"output_index":2,"item":{"type":"reasoning","id":"r9"}}"#, + ); + let evs = with( + &mut state, + "response.output_item.done", + r#"{"output_index":2,"item":{"type":"reasoning","id":"r9","encrypted_content":"BLOB"}}"#, + ); + let Event::ReasoningItem(r) = &evs[0] else { + panic!() + }; + assert!(r.text.is_empty()); + assert!(r.summary.is_empty()); + assert_eq!(r.encrypted_content.as_deref(), Some("BLOB")); + } + #[test] fn unknown_event_is_ignored() { let (events, _) = run("response.in_progress", "{}"); diff --git a/crates/llm-worker/src/llm_client/types.rs b/crates/llm-worker/src/llm_client/types.rs index f4d9a094..93afafc1 100644 --- a/crates/llm-worker/src/llm_client/types.rs +++ b/crates/llm-worker/src/llm_client/types.rs @@ -94,8 +94,15 @@ pub enum Item { summary: Vec, /// サーバから返された暗号化済み reasoning blob。ZDR / `store=false` /// 運用で stateless に再送するときそのまま添える必要がある。 + /// Anthropic の `redacted_thinking.data` もここに格納する。 #[serde(default, skip_serializing_if = "Option::is_none")] encrypted_content: Option, + /// Anthropic extended thinking の `signature`。新世代 Claude + /// (Opus 4.5+/Sonnet 4.6+) では同一論理ターン内の `thinking` + /// ブロックを送り返す際に必須。改ざん検知に使われる。他 scheme + /// では `None`。 + #[serde(default, skip_serializing_if = "Option::is_none")] + signature: Option, /// Item status #[serde(skip_serializing_if = "Option::is_none")] status: Option, @@ -224,6 +231,7 @@ impl Item { text: text.into(), summary: Vec::new(), encrypted_content: None, + signature: None, status: None, } } @@ -247,6 +255,14 @@ impl Item { self } + /// Set Anthropic `signature` on a `Reasoning` item. No-op on other variants. + pub fn with_signature(mut self, sig: impl Into) -> Self { + if let Self::Reasoning { signature, .. } = &mut self { + *signature = Some(sig.into()); + } + self + } + // ======================================================================== // Builder methods // ======================================================================== diff --git a/crates/llm-worker/src/timeline/mod.rs b/crates/llm-worker/src/timeline/mod.rs index 9ee896e8..02c1d8ad 100644 --- a/crates/llm-worker/src/timeline/mod.rs +++ b/crates/llm-worker/src/timeline/mod.rs @@ -10,12 +10,14 @@ //! - [`ToolCallCollector`] - ツール呼び出しを収集するHandler pub mod event; +mod reasoning_item_collector; mod text_block_collector; mod timeline; mod tool_call_collector; // 公開API pub use event::*; +pub use reasoning_item_collector::ReasoningItemCollector; pub use text_block_collector::TextBlockCollector; pub use timeline::Timeline; pub use tool_call_collector::ToolCallCollector; @@ -28,6 +30,7 @@ pub use crate::handler::{ Handler, Kind, PingKind, + ReasoningItemKind, StatusKind, // Block Events TextBlockEvent, diff --git a/crates/llm-worker/src/timeline/reasoning_item_collector.rs b/crates/llm-worker/src/timeline/reasoning_item_collector.rs new file mode 100644 index 00000000..93ab433d --- /dev/null +++ b/crates/llm-worker/src/timeline/reasoning_item_collector.rs @@ -0,0 +1,77 @@ +//! `ReasoningItemCollector` - 完成済み reasoning item を収集する Handler +//! +//! Timeline の `ReasoningItemKind` Handler として登録し、scheme 側が +//! `Event::ReasoningItem` を発火するたびに 1 件ずつバッファに溜める。 +//! Worker はターン終了時に `take_collected()` でドレインして +//! `Item::Reasoning` として `worker.history` に append する。 + +use std::sync::{Arc, Mutex}; + +use crate::handler::{Handler, ReasoningItemKind}; +use crate::llm_client::event::ReasoningItemEvent; + +/// 収集された reasoning item の連列。 +#[derive(Clone, Default)] +pub struct ReasoningItemCollector { + collected: Arc>>, +} + +impl ReasoningItemCollector { + pub fn new() -> Self { + Self::default() + } + + /// 収集済み item を取り出してクリア + pub fn take_collected(&self) -> Vec { + let mut guard = self.collected.lock().unwrap(); + std::mem::take(&mut *guard) + } + + /// 収集をクリア + pub fn clear(&self) { + self.collected.lock().unwrap().clear(); + } +} + +impl Handler for ReasoningItemCollector { + type Scope = (); + + fn on_event(&mut self, _scope: &mut Self::Scope, event: &ReasoningItemEvent) { + self.collected.lock().unwrap().push(event.clone()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::llm_client::event::Event; + use crate::timeline::Timeline; + + #[test] + fn collects_in_order() { + let collector = ReasoningItemCollector::new(); + let mut timeline = Timeline::new(); + timeline.on_reasoning_item(collector.clone()); + + timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent { + id: Some("r1".into()), + text: "first".into(), + signature: Some("sig1".into()), + ..Default::default() + })); + timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent { + id: Some("r2".into()), + text: "second".into(), + ..Default::default() + })); + + let items = collector.take_collected(); + assert_eq!(items.len(), 2); + assert_eq!(items[0].text, "first"); + assert_eq!(items[0].signature.as_deref(), Some("sig1")); + assert_eq!(items[1].text, "second"); + + // take は drain なので 2 度目は空 + assert!(collector.take_collected().is_empty()); + } +} diff --git a/crates/llm-worker/src/timeline/timeline.rs b/crates/llm-worker/src/timeline/timeline.rs index 7ac581ab..4739f8c9 100644 --- a/crates/llm-worker/src/timeline/timeline.rs +++ b/crates/llm-worker/src/timeline/timeline.rs @@ -381,6 +381,7 @@ pub struct Timeline { ping_handlers: Vec>>, status_handlers: Vec>>, error_handlers: Vec>>, + reasoning_item_handlers: Vec>>, // Block系ハンドラー(BlockTypeごとにグループ化) text_block_handlers: Vec>, @@ -410,6 +411,7 @@ impl Timeline { ping_handlers: Vec::new(), status_handlers: Vec::new(), error_handlers: Vec::new(), + reasoning_item_handlers: Vec::new(), text_block_handlers: Vec::new(), thinking_block_handlers: Vec::new(), tool_use_block_handlers: Vec::new(), @@ -471,6 +473,18 @@ impl Timeline { self } + /// `ReasoningItemKind` 用 Handler を登録 + pub fn on_reasoning_item(&mut self, handler: H) -> &mut Self + where + H: Handler + Send + Sync + 'static, + H::Scope: Send + Sync, + { + let mut wrapper = HandlerWrapper::new(handler); + wrapper.start_scope(); + self.reasoning_item_handlers.push(Box::new(wrapper)); + self + } + /// TextBlockKind用のHandlerを登録 pub fn on_text_block(&mut self, handler: H) -> &mut Self where @@ -522,6 +536,9 @@ impl Timeline { Event::BlockDelta(d) => self.handle_block_delta(d), Event::BlockStop(s) => self.handle_block_stop(s), Event::BlockAbort(a) => self.handle_block_abort(a), + + // 完成済み reasoning item: 即時ディスパッチ + Event::ReasoningItem(r) => self.dispatch_reasoning_item(r), } } @@ -564,6 +581,12 @@ impl Timeline { } } + fn dispatch_reasoning_item(&mut self, event: &ReasoningItemEvent) { + for handler in &mut self.reasoning_item_handlers { + handler.dispatch(event); + } + } + fn handle_block_start(&mut self, start: &BlockStart) { self.current_block = Some(start.block_type); diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 48154efc..6431b392 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -22,7 +22,7 @@ use crate::{ }, state::{Locked, Mutable, WorkerState}, timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, - timeline::{TextBlockCollector, Timeline, ToolCallCollector}, + timeline::{ReasoningItemCollector, TextBlockCollector, Timeline, ToolCallCollector}, tool::{ ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputLimits, ToolResult, truncate_content, @@ -140,6 +140,9 @@ pub struct Worker { text_block_collector: TextBlockCollector, /// Tool call collector (Timeline handler) tool_call_collector: ToolCallCollector, + /// Reasoning item collector (Timeline handler)。完成済み reasoning + /// item を 1 ターン分バッファし、history に append する。 + reasoning_item_collector: ReasoningItemCollector, /// Tool server handle tool_server: ToolServerHandle, /// Interceptor for control-flow decisions @@ -587,10 +590,37 @@ impl Worker { self.tool_server.tool_definitions_sorted() } - /// Build assistant response items from text blocks and tool calls - fn build_assistant_items(&self, text_blocks: &[String], tool_calls: &[ToolCall]) -> Vec { + /// Build assistant response items from reasoning items, text blocks, and tool calls. + /// + /// Reasoning items come first (Anthropic / OpenAI Responses 双方ともに + /// アシスタント応答内で reasoning は先頭に並ぶ仕様)。これは Anthropic + /// が新世代モデルで thinking ブロックを assistant メッセージの先頭に + /// 置くことを要求するためでもある。 + fn build_assistant_items( + &self, + reasoning_items: &[crate::llm_client::event::ReasoningItemEvent], + text_blocks: &[String], + tool_calls: &[ToolCall], + ) -> Vec { let mut items = Vec::new(); + for r in reasoning_items { + let mut item = Item::reasoning(r.text.clone()); + if let Some(id) = &r.id { + item = item.with_id(id); + } + if !r.summary.is_empty() { + item = item.with_reasoning_summary(r.summary.clone()); + } + if let Some(enc) = &r.encrypted_content { + item = item.with_encrypted_content(enc); + } + if let Some(sig) = &r.signature { + item = item.with_signature(sig); + } + items.push(item); + } + // Add text as assistant message if present let text = text_blocks.join(""); if !text.is_empty() { @@ -973,9 +1003,11 @@ impl Worker { self.turn_count += 1; // Collect and commit assistant items + let reasoning_items = self.reasoning_item_collector.take_collected(); let text_blocks = self.text_block_collector.take_collected(); let tool_calls = self.tool_call_collector.take_collected(); - let assistant_items = self.build_assistant_items(&text_blocks, &tool_calls); + let assistant_items = + self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls); self.history.extend(assistant_items); if tool_calls.is_empty() { @@ -1118,18 +1150,21 @@ impl Worker { pub fn new(client: C) -> Self { let text_block_collector = TextBlockCollector::new(); let tool_call_collector = ToolCallCollector::new(); + let reasoning_item_collector = ReasoningItemCollector::new(); let mut timeline = Timeline::new(); let (cancel_tx, cancel_rx) = mpsc::channel(1); // Register collectors with Timeline timeline.on_text_block(text_block_collector.clone()); timeline.on_tool_use_block(tool_call_collector.clone()); + timeline.on_reasoning_item(reasoning_item_collector.clone()); Self { client, timeline, text_block_collector, tool_call_collector, + reasoning_item_collector, tool_server: ToolServer::new().handle(), interceptor: Box::new(DefaultInterceptor), system_prompt: None, @@ -1388,6 +1423,7 @@ impl Worker { timeline: self.timeline, text_block_collector: self.text_block_collector, tool_call_collector: self.tool_call_collector, + reasoning_item_collector: self.reasoning_item_collector, tool_server: self.tool_server, interceptor: self.interceptor, system_prompt: self.system_prompt, @@ -1470,6 +1506,7 @@ impl Worker { timeline: self.timeline, text_block_collector: self.text_block_collector, tool_call_collector: self.tool_call_collector, + reasoning_item_collector: self.reasoning_item_collector, tool_server: self.tool_server, interceptor: self.interceptor, system_prompt: self.system_prompt, diff --git a/crates/llm-worker/tests/reasoning_round_trip_test.rs b/crates/llm-worker/tests/reasoning_round_trip_test.rs new file mode 100644 index 00000000..b8b4f14b --- /dev/null +++ b/crates/llm-worker/tests/reasoning_round_trip_test.rs @@ -0,0 +1,212 @@ +//! Reasoning history round-trip 統合テスト +//! +//! Worker のストリーム → history append → 次リクエスト送出までの +//! ライフサイクルで `Item::Reasoning` が脱落せず保持されることを確認する。 +//! +//! 検証点: +//! - Anthropic 由来の thinking + signature が `Item::Reasoning::signature` として +//! history に残る +//! - OpenAI Responses 由来の reasoning text + summary + encrypted_content が +//! `Item::Reasoning` の各フィールドに展開される +//! - 直前の reasoning は次の outgoing request の `request.items` の先頭付近に +//! 含まれる(assistant メッセージの先頭、Anthropic 仕様) + +mod common; + +use common::MockLlmClient; +use llm_worker::Item; +use llm_worker::Worker; +use llm_worker::llm_client::event::{ + Event, ReasoningItemEvent, ResponseStatus, StatusEvent, +}; + +/// Anthropic 風: thinking ブロック → text → 終了 のシーケンス。 +/// Worker history に Reasoning(signature 付き) → assistant_message が並ぶ。 +#[tokio::test] +async fn anthropic_thinking_round_trips_signature_into_history() { + let events = vec![ + Event::ReasoningItem(ReasoningItemEvent { + id: None, + text: "let me think...".into(), + summary: Vec::new(), + encrypted_content: None, + signature: Some("SIG-OPUS".into()), + }), + Event::text_block_start(0), + Event::text_delta(0, "Here's the answer"), + Event::text_block_stop(0, None), + Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]; + let client = MockLlmClient::new(events); + let worker = Worker::new(client); + let out = worker.run("question?").await.expect("run ok"); + let worker = out.worker; + + let history = worker.history(); + // user / reasoning / assistant_message + assert_eq!(history.len(), 3, "history: {history:?}"); + + assert!(matches!(history[0], Item::Message { .. })); + match &history[1] { + Item::Reasoning { + text, signature, .. + } => { + assert_eq!(text, "let me think..."); + assert_eq!(signature.as_deref(), Some("SIG-OPUS")); + } + other => panic!("expected Reasoning, got {other:?}"), + } + assert_eq!(history[2].as_text(), Some("Here's the answer")); +} + +/// OpenAI Responses 風: encrypted_content + summary を持った reasoning が +/// `Item::Reasoning` のフィールドに展開されること。 +#[tokio::test] +async fn openai_reasoning_round_trips_encrypted_and_summary() { + let events = vec![ + Event::ReasoningItem(ReasoningItemEvent { + id: Some("r1".into()), + text: "inner reasoning".into(), + summary: vec!["sum-A".into(), "sum-B".into()], + encrypted_content: Some("ENC-OPAQUE".into()), + signature: None, + }), + Event::text_block_start(0), + Event::text_delta(0, "answer"), + Event::text_block_stop(0, None), + Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]; + let client = MockLlmClient::new(events); + let worker = Worker::new(client); + let out = worker.run("q").await.expect("run ok"); + let worker = out.worker; + + let history = worker.history(); + match &history[1] { + Item::Reasoning { + text, + summary, + encrypted_content, + signature, + id, + .. + } => { + assert_eq!(text, "inner reasoning"); + assert_eq!(summary, &vec!["sum-A".to_string(), "sum-B".to_string()]); + assert_eq!(encrypted_content.as_deref(), Some("ENC-OPAQUE")); + assert!(signature.is_none()); + assert_eq!(id.as_deref(), Some("r1")); + } + other => panic!("expected Reasoning, got {other:?}"), + } +} + +/// Reasoning は assistant ターン内で text/tool_call より先に並ぶこと(Anthropic +/// が thinking を assistant メッセージの先頭に要求するため)。 +#[tokio::test] +async fn reasoning_precedes_text_in_assistant_burst() { + let events = vec![ + // text/tool_call とは独立に、ReasoningItem が中盤で発火しても、 + // history append 時には assistant items の先頭に置かれる。 + Event::text_block_start(0), + Event::text_delta(0, "intermediate"), + Event::text_block_stop(0, None), + Event::ReasoningItem(ReasoningItemEvent { + text: "after text".into(), + signature: Some("SIG".into()), + ..Default::default() + }), + Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]; + let client = MockLlmClient::new(events); + let worker = Worker::new(client); + let out = worker.run("q").await.expect("run ok"); + let worker = out.worker; + + let history = worker.history(); + // user / reasoning(先頭) / assistant_message + assert!(matches!(history[1], Item::Reasoning { .. })); + assert_eq!(history[2].as_text(), Some("intermediate")); +} + +/// resume シナリオ: history.json 由来の Item::Reasoning(signature) を Worker に +/// 注入して run しても、次の outgoing request の `Request::items` にそのまま +/// 載って LLM へ渡る(worker は items を改変しない契約)。 +#[tokio::test] +async fn injected_reasoning_survives_into_outgoing_request() { + use async_trait::async_trait; + use futures::Stream; + use std::pin::Pin; + use std::sync::{Arc, Mutex}; + + use llm_worker::llm_client::{ClientError, LlmClient, Request}; + + /// Request を 1 度だけキャプチャして空ストリームを返す client。 + #[derive(Clone)] + struct CapturingClient { + captured: Arc>>, + } + + #[async_trait] + impl LlmClient for CapturingClient { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + async fn stream( + &self, + request: Request, + ) -> Result> + Send>>, ClientError> + { + *self.captured.lock().unwrap() = Some(request); + let stream = futures::stream::iter(vec![Ok(Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }))]); + Ok(Box::pin(stream)) + } + } + + let captured = Arc::new(Mutex::new(None)); + let client = CapturingClient { + captured: captured.clone(), + }; + + let mut worker = Worker::new(client); + // resume: 既存 history を流し込む + worker.set_history(vec![ + Item::user_message("prior question"), + Item::reasoning("prior thinking").with_signature("SIG-PRIOR"), + Item::assistant_message("prior answer"), + ]); + + let _ = worker.run("follow up").await.expect("run ok"); + + let req = captured + .lock() + .unwrap() + .take() + .expect("client should have received a request"); + // Reasoning item が outgoing items に保持されていること + let mut found = false; + for item in &req.items { + if let Item::Reasoning { + text, signature, .. + } = item + { + assert_eq!(text, "prior thinking"); + assert_eq!(signature.as_deref(), Some("SIG-PRIOR")); + found = true; + } + } + assert!( + found, + "Reasoning item must survive into outgoing request items: {req:?}", + req = req.items, + ); +} diff --git a/crates/session-store/src/logged_item.rs b/crates/session-store/src/logged_item.rs index a1bc2bf6..b390a1a9 100644 --- a/crates/session-store/src/logged_item.rs +++ b/crates/session-store/src/logged_item.rs @@ -39,6 +39,10 @@ pub enum LoggedItem { summary: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] encrypted_content: Option, + /// Anthropic extended thinking signature。新世代 Claude で round-trip + /// 必須。OpenAI Responses など他 scheme では `None`。 + #[serde(default, skip_serializing_if = "Option::is_none")] + signature: Option, }, } @@ -92,11 +96,13 @@ impl From<&Item> for LoggedItem { text, summary, encrypted_content, + signature, .. } => Self::Reasoning { text: text.clone(), summary: summary.clone(), encrypted_content: encrypted_content.clone(), + signature: signature.clone(), }, } } @@ -142,11 +148,13 @@ impl From for Item { text, summary, encrypted_content, + signature, } => Item::Reasoning { id: None, text, summary, encrypted_content, + signature, status: None, }, }