feat: Reasoningのコンテキスト管理の対応

This commit is contained in:
Keisuke Hirata 2026-05-04 21:31:44 +09:00
parent e31cbcb150
commit 594671edc3
13 changed files with 1009 additions and 35 deletions

View File

@ -91,6 +91,16 @@ impl Kind for ErrorKind {
type Event = ErrorEvent;
}
/// Reasoning item kind — used to persist completed reasoning items.
///
/// Fires exactly once per reasoning item. The worker receives the event
/// through `ReasoningItemCollector` and, at the end of the turn, appends it
/// to the history as `Item::Reasoning`.
pub struct ReasoningItemKind;
impl Kind for ReasoningItemKind {
type Event = ReasoningItemEvent;
}
// =============================================================================
// Block Kind Definitions
// =============================================================================

View File

@ -17,6 +17,9 @@ use serde::{Deserialize, Serialize};
///
/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`
/// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort`
/// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み
/// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と
/// は別経路で発火する)
///
/// # ブロックのライフサイクル
///
@ -41,6 +44,18 @@ pub enum Event {
BlockStop(BlockStop),
/// ブロック中断
BlockAbort(BlockAbort),
/// A completed reasoning item. Fires exactly once, at the point where the
/// scheme has gathered all the reasoning material that has to be sent back
/// with the next request.
///
/// - Anthropic: once per completed `thinking` content_block
/// - OpenAI Responses: once per completed reasoning output_item
///
/// The upper layer (Worker / ReasoningItemCollector) appends this to
/// `worker.history` as `Item::Reasoning`. The streaming-display events
/// `BlockStart(Thinking)` / `BlockDelta(Thinking)` / `BlockStop(Thinking)`
/// still fire in parallel (live display and round-trip persistence are
/// separate responsibilities).
ReasoningItem(ReasoningItemEvent),
}
// =============================================================================
@ -212,6 +227,31 @@ impl BlockAbort {
}
}
// =============================================================================
// Reasoning Item Event
// =============================================================================
/// A completed reasoning item. The scheme fires it exactly once, after it has
/// gathered all the material needed for a round trip (text, summary,
/// encrypted_content, signature, id).
///
/// Intended to map one-to-one onto the fields of `Item::Reasoning`.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct ReasoningItemEvent {
/// Item id observed by the scheme (the `id` of an OpenAI Responses item).
pub id: Option<String>,
/// Reasoning body text. Anthropic: accumulated `thinking`; OpenAI:
/// accumulated `reasoning_text`. Empty for redacted_thinking.
pub text: String,
/// Summary (`summary_text[]` in OpenAI Responses). Empty for other schemes.
pub summary: Vec<String>,
/// Encrypted opaque blob (Anthropic `redacted_thinking.data` /
/// OpenAI Responses `encrypted_content`).
pub encrypted_content: Option<String>,
/// Anthropic extended-thinking signature. Required for the round trip.
pub signature: Option<String>,
}
/// 停止理由
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StopReason {

View File

@ -12,6 +12,7 @@ use crate::llm_client::{
use serde::Deserialize;
use super::AnthropicScheme;
use super::scheme_impl::{AnthropicState, PendingThinking};
/// Anthropic SSEイベントタイプ
#[derive(Debug, Clone, PartialEq, Eq)]
@ -75,7 +76,21 @@ pub(crate) enum ContentBlock {
#[serde(rename = "text")]
Text { text: String },
#[serde(rename = "thinking")]
Thinking { thinking: String },
Thinking {
#[serde(default)]
thinking: String,
/// 非ストリーミングレスポンス由来の初期 signature通常はストリームでは
/// 空 → `signature_delta` で埋まる)。
#[serde(default)]
signature: Option<String>,
},
#[serde(rename = "redacted_thinking")]
RedactedThinking {
/// 暗号化された opaque blob。signature ではなく、まるごと
/// `redacted_thinking.data` として送り返す必要がある。
#[serde(default)]
data: String,
},
#[serde(rename = "tool_use")]
ToolUse {
id: String,
@ -228,7 +243,9 @@ impl AnthropicScheme {
fn convert_block_start(&self, event: &ContentBlockStartEvent) -> Event {
let (block_type, metadata) = match &event.content_block {
ContentBlock::Text { .. } => (BlockType::Text, BlockMetadata::Text),
ContentBlock::Thinking { .. } => (BlockType::Thinking, BlockMetadata::Thinking),
ContentBlock::Thinking { .. } | ContentBlock::RedactedThinking { .. } => {
(BlockType::Thinking, BlockMetadata::Thinking)
}
ContentBlock::ToolUse { id, name, .. } => (
BlockType::ToolUse,
BlockMetadata::ToolUse {
@ -264,6 +281,123 @@ impl AnthropicScheme {
}))
}
/// Stateful, higher-level parse that threads `state` through the stream.
///
/// On top of the single-event conversion done by `parse_event`, this:
/// - fills in the `block_type` of a `content_block_stop` from the value
///   recorded at the preceding `content_block_start` (falling back to
///   `BlockType::Text` when nothing was recorded),
/// - accumulates the body, signature and data of `thinking` /
///   `redacted_thinking` blocks in `state.pending_thinking`, and emits an
///   additional `Event::ReasoningItem` right after the `BlockStop`,
/// - accumulates `signature_delta` without forwarding it to the stream; it is
///   reflected only in the reasoning event.
///
/// Returns an empty list for event types that `AnthropicEventType::parse`
/// does not recognise. JSON decoding failures are propagated as errors.
pub(crate) fn parse_with_state(
    &self,
    event_type: &str,
    data: &str,
    state: &mut AnthropicState,
) -> Result<Vec<Event>, ClientError> {
    let Some(kind) = AnthropicEventType::parse(event_type) else {
        return Ok(Vec::new());
    };

    // Every arm yields the list of events produced by this one SSE event.
    let events: Vec<Event> = match kind {
        AnthropicEventType::ContentBlockStart => {
            let start: ContentBlockStartEvent = serde_json::from_str(data)?;
            let (block_type, buffer) = match &start.content_block {
                ContentBlock::Text { .. } => (BlockType::Text, None),
                ContentBlock::ToolUse { .. } => (BlockType::ToolUse, None),
                ContentBlock::Thinking {
                    thinking,
                    signature,
                } => (
                    BlockType::Thinking,
                    Some(PendingThinking {
                        text: thinking.clone(),
                        signature: signature.clone(),
                        redacted_data: None,
                    }),
                ),
                ContentBlock::RedactedThinking { data: blob } => (
                    BlockType::Thinking,
                    Some(PendingThinking {
                        text: String::new(),
                        signature: None,
                        redacted_data: Some(blob.clone()),
                    }),
                ),
            };
            state.current_block_type = Some(block_type);
            // Only reasoning blocks install a fresh buffer; any other block
            // leaves whatever is currently stored untouched.
            if buffer.is_some() {
                state.pending_thinking = buffer;
            }
            vec![self.convert_block_start(&start)]
        }
        AnthropicEventType::ContentBlockDelta => {
            let delta_event: ContentBlockDeltaEvent = serde_json::from_str(data)?;
            let content = match &delta_event.delta {
                DeltaBlock::ThinkingDelta { thinking } => {
                    if let Some(buffer) = state.pending_thinking.as_mut() {
                        buffer.text.push_str(thinking);
                    }
                    Some(DeltaContent::Thinking(thinking.clone()))
                }
                DeltaBlock::SignatureDelta { signature } => {
                    // Kept out of the display stream; stored for the
                    // reasoning event only. Usually arrives once, but
                    // multiple fragments are concatenated.
                    if let Some(buffer) = state.pending_thinking.as_mut() {
                        buffer
                            .signature
                            .get_or_insert_with(String::new)
                            .push_str(signature);
                    }
                    None
                }
                DeltaBlock::TextDelta { text } => Some(DeltaContent::Text(text.clone())),
                DeltaBlock::InputJsonDelta { partial_json } => {
                    Some(DeltaContent::InputJson(partial_json.clone()))
                }
            };
            content
                .map(|delta| {
                    Event::BlockDelta(BlockDelta {
                        index: delta_event.index,
                        delta,
                    })
                })
                .into_iter()
                .collect()
        }
        AnthropicEventType::ContentBlockStop => {
            let stop: ContentBlockStopEvent = serde_json::from_str(data)?;
            let block_type = state
                .current_block_type
                .take()
                .unwrap_or(BlockType::Text);
            let mut out = vec![Event::BlockStop(BlockStop {
                index: stop.index,
                block_type,
                stop_reason: None,
            })];
            if matches!(block_type, BlockType::Thinking) {
                out.extend(
                    state
                        .pending_thinking
                        .take()
                        .map(|finished| Event::ReasoningItem(finished.into_event())),
                );
            }
            out
        }
        // The remaining event types need no state; defer to `parse_event`.
        _ => self.parse_event(event_type, data)?.into_iter().collect(),
    };

    Ok(events)
}
fn convert_usage(&self, usage: &UsageData) -> UsageEvent {
// Anthropic の `input_tokens` は **キャッシュ外** の入力トークンのみで、
// プロンプト全長は input_tokens + cache_read + cache_creation。
@ -391,6 +525,117 @@ mod tests {
}
}
#[test]
fn thinking_block_emits_reasoning_item_with_signature() {
// Once a thinking block completes, the ReasoningItem must carry the
// accumulated text together with the signature.
let scheme = AnthropicScheme::new();
let mut state = AnthropicState::default();
let evs = scheme
.parse_with_state(
"content_block_start",
r#"{"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}"#,
&mut state,
)
.unwrap();
assert!(matches!(evs[0], Event::BlockStart(_)));
scheme
.parse_with_state(
"content_block_delta",
r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"hello "}}"#,
&mut state,
)
.unwrap();
scheme
.parse_with_state(
"content_block_delta",
r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"world"}}"#,
&mut state,
)
.unwrap();
scheme
.parse_with_state(
"content_block_delta",
r#"{"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"SIG-XYZ"}}"#,
&mut state,
)
.unwrap();
let stop_evs = scheme
.parse_with_state(
"content_block_stop",
r#"{"type":"content_block_stop","index":0}"#,
&mut state,
)
.unwrap();
// Two events, in order: BlockStop, then ReasoningItem.
assert!(matches!(stop_evs[0], Event::BlockStop(_)));
let Event::ReasoningItem(reasoning) = &stop_evs[1] else {
panic!("expected ReasoningItem, got {:?}", stop_evs[1]);
};
assert_eq!(reasoning.text, "hello world");
assert_eq!(reasoning.signature.as_deref(), Some("SIG-XYZ"));
assert!(reasoning.encrypted_content.is_none());
}
#[test]
fn redacted_thinking_emits_reasoning_item_with_data() {
// A redacted_thinking block must surface its `data` blob as
// `encrypted_content`, with empty text and no signature.
let scheme = AnthropicScheme::new();
let mut state = AnthropicState::default();
scheme
.parse_with_state(
"content_block_start",
r#"{"type":"content_block_start","index":0,"content_block":{"type":"redacted_thinking","data":"opaque-blob"}}"#,
&mut state,
)
.unwrap();
let stop_evs = scheme
.parse_with_state(
"content_block_stop",
r#"{"type":"content_block_stop","index":0}"#,
&mut state,
)
.unwrap();
// Index 1 is the ReasoningItem that follows the BlockStop.
let Event::ReasoningItem(reasoning) = &stop_evs[1] else {
panic!("expected ReasoningItem");
};
assert!(reasoning.text.is_empty());
assert!(reasoning.signature.is_none());
assert_eq!(reasoning.encrypted_content.as_deref(), Some("opaque-blob"));
}
#[test]
fn text_block_does_not_emit_reasoning_item() {
// A plain text block must end with a lone BlockStop and no ReasoningItem.
let scheme = AnthropicScheme::new();
let mut state = AnthropicState::default();
scheme
.parse_with_state(
"content_block_start",
r#"{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}"#,
&mut state,
)
.unwrap();
scheme
.parse_with_state(
"content_block_delta",
r#"{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"hi"}}"#,
&mut state,
)
.unwrap();
let stop_evs = scheme
.parse_with_state(
"content_block_stop",
r#"{"type":"content_block_stop","index":0}"#,
&mut state,
)
.unwrap();
assert_eq!(stop_evs.len(), 1);
assert!(matches!(stop_evs[0], Event::BlockStop(_)));
}
#[test]
fn test_parse_ping() {
let scheme = AnthropicScheme::new();

View File

@ -77,6 +77,21 @@ pub(crate) enum AnthropicContentPart {
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<CacheControl>,
},
#[serde(rename = "thinking")]
Thinking {
thinking: String,
signature: String,
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<CacheControl>,
},
#[serde(rename = "redacted_thinking")]
RedactedThinking {
/// 暗号化済み reasoning blob。`Item::Reasoning::encrypted_content`
/// から渡る。
data: String,
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<CacheControl>,
},
#[serde(rename = "tool_use")]
ToolUse {
id: String,
@ -102,6 +117,21 @@ impl AnthropicContentPart {
}
}
/// Builds a `thinking` content part from the reasoning text and its
/// signature, with no `cache_control` set.
fn thinking(thinking: String, signature: String) -> Self {
Self::Thinking {
thinking,
signature,
cache_control: None,
}
}
/// Builds a `redacted_thinking` content part wrapping the opaque `data`
/// blob, with no `cache_control` set.
fn redacted_thinking(data: String) -> Self {
Self::RedactedThinking {
data,
cache_control: None,
}
}
fn tool_use(id: String, name: String, input: serde_json::Value) -> Self {
Self::ToolUse {
id,
@ -122,6 +152,8 @@ impl AnthropicContentPart {
fn set_cache_control(&mut self, cc: CacheControl) {
match self {
Self::Text { cache_control, .. }
| Self::Thinking { cache_control, .. }
| Self::RedactedThinking { cache_control, .. }
| Self::ToolUse { cache_control, .. }
| Self::ToolResult { cache_control, .. } => {
*cache_control = Some(cc);
@ -305,11 +337,33 @@ impl AnthropicScheme {
.push((i, AnthropicContentPart::tool_result(call_id.clone(), text)));
}
Item::Reasoning { text, .. } => {
Item::Reasoning {
text,
encrypted_content,
signature,
..
} => {
flush_pending(&mut messages, &mut pending_user, "user", &mut locations);
// Reasoning is treated as assistant text in Anthropic
// (actual thinking blocks are handled differently in streaming).
pending_assistant.push((i, AnthropicContentPart::text(text.clone())));
// Anthropic はアシスタントターン中の `thinking` /
// `redacted_thinking` ブロックを必ず assistant role の
// content_part として送り返す必要がある。
//
// - signature あり: `thinking` content_part を投影
// - signature 無し + encrypted_content あり:
// `redacted_thinking` content_part を投影
// - どちらも無い: 他 schemeOpenAI 等)から流入した
// 素の reasoning text。Anthropic に投げる意味も
// round-trip の根拠も無いので drop。
if let Some(sig) = signature.clone() {
pending_assistant.push((
i,
AnthropicContentPart::thinking(text.clone(), sig),
));
} else if let Some(data) = encrypted_content.clone() {
pending_assistant
.push((i, AnthropicContentPart::redacted_thinking(data)));
}
// どちらも None なら何も pend せず、本 item は無視。
}
}
}
@ -542,6 +596,8 @@ mod tests {
fn part_cache_control(part: &AnthropicContentPart) -> Option<CacheControl> {
match part {
AnthropicContentPart::Text { cache_control, .. }
| AnthropicContentPart::Thinking { cache_control, .. }
| AnthropicContentPart::RedactedThinking { cache_control, .. }
| AnthropicContentPart::ToolUse { cache_control, .. }
| AnthropicContentPart::ToolResult { cache_control, .. } => *cache_control,
}
@ -737,6 +793,81 @@ mod tests {
assert!(breakpoint_positions(&req).is_empty());
}
/// Test helper: gathers, in message order, every `Thinking` /
/// `RedactedThinking` part found in the assistant-role messages of `req`.
/// Messages whose content is not `AnthropicContent::Parts` are skipped.
fn collect_assistant_thinking_parts(req: &AnthropicRequest) -> Vec<&AnthropicContentPart> {
    req.messages
        .iter()
        .filter(|msg| msg.role == "assistant")
        .filter_map(|msg| {
            if let AnthropicContent::Parts(parts) = &msg.content {
                Some(parts)
            } else {
                None
            }
        })
        .flatten()
        .filter(|part| {
            matches!(
                part,
                AnthropicContentPart::Thinking { .. }
                    | AnthropicContentPart::RedactedThinking { .. }
            )
        })
        .collect()
}
#[test]
fn reasoning_with_signature_projects_thinking_part() {
// When Item::Reasoning carries a signature, it is sent as a `thinking`
// content_part under the assistant role.
let scheme = AnthropicScheme::new();
let request = Request::new()
.user("hi")
.item(Item::reasoning("step-by-step").with_signature("SIG-A"))
.item(Item::assistant_message("done"));
let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit());
let thinking_parts = collect_assistant_thinking_parts(&req);
assert_eq!(thinking_parts.len(), 1);
match thinking_parts[0] {
AnthropicContentPart::Thinking {
thinking, signature, ..
} => {
assert_eq!(thinking, "step-by-step");
assert_eq!(signature, "SIG-A");
}
other => panic!("expected Thinking part, got {other:?}"),
}
}
#[test]
fn reasoning_with_only_encrypted_content_projects_redacted_thinking() {
// With no signature but an encrypted blob, the item is sent as a
// `redacted_thinking` content_part carrying that blob.
let scheme = AnthropicScheme::new();
let request = Request::new()
.user("hi")
.item(Item::reasoning("").with_encrypted_content("opaque"))
.item(Item::assistant_message("done"));
let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit());
let parts = collect_assistant_thinking_parts(&req);
assert_eq!(parts.len(), 1);
match parts[0] {
AnthropicContentPart::RedactedThinking { data, .. } => {
assert_eq!(data, "opaque");
}
other => panic!("expected RedactedThinking, got {other:?}"),
}
}
#[test]
fn reasoning_without_signature_or_encrypted_is_dropped() {
// Plain reasoning that came in from another scheme is not sent to Anthropic.
let scheme = AnthropicScheme::new();
let request = Request::new()
.user("hi")
.item(Item::reasoning("plain text"))
.item(Item::assistant_message("done"));
let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit());
// No thinking part at all must be present.
assert!(collect_assistant_thinking_parts(&req).is_empty());
}
#[test]
fn tool_definitions_carry_no_cache_control() {
// Tool JSON schema must serialise unchanged — no sneak-in of

View File

@ -9,7 +9,7 @@ use crate::llm_client::{
ClientError,
auth::AuthRequirement,
capability::ModelCapability,
event::{BlockStop, BlockType, Event},
event::{BlockType, Event, ReasoningItemEvent},
scheme::Scheme,
types::Request,
};
@ -18,12 +18,37 @@ use super::AnthropicScheme;
/// Anthropic の SSE パースで必要な状態。
///
/// `content_block_stop` イベントは `block_type` を持たない仕様なので、
/// 直前の `content_block_start` で観測した `block_type` を保持して
/// `BlockStop` に書き戻す。
/// 1. `content_block_stop` イベントは `block_type` を持たない仕様なので、
/// 直前の `content_block_start` で観測した `block_type` を保持して
/// `BlockStop` に書き戻す。
/// 2. `thinking` ブロック中の `thinking_delta` テキストと `signature_delta`
/// 署名、および `redacted_thinking` ブロックの `data` を蓄積し、
/// `content_block_stop` で `Event::ReasoningItem` を発火する
/// round-trip 永続化のため)。
#[derive(Debug, Default)]
pub struct AnthropicState {
current_block_type: Option<BlockType>,
pub(crate) current_block_type: Option<BlockType>,
pub(crate) pending_thinking: Option<PendingThinking>,
}
/// Accumulation buffer for a single `thinking` or `redacted_thinking`
/// content_block.
#[derive(Debug, Default)]
pub(crate) struct PendingThinking {
// Accumulated thinking text.
pub(crate) text: String,
// Signature for the thinking block, if one has been observed.
pub(crate) signature: Option<String>,
// Opaque `data` blob of a `redacted_thinking` block, if any.
pub(crate) redacted_data: Option<String>,
}
impl PendingThinking {
pub(crate) fn into_event(self) -> ReasoningItemEvent {
ReasoningItemEvent {
id: None,
text: self.text,
summary: Vec::new(),
encrypted_content: self.redacted_data,
signature: self.signature,
}
}
}
impl Scheme for AnthropicScheme {
@ -73,24 +98,7 @@ impl Scheme for AnthropicScheme {
data: &str,
state: &mut Self::State,
) -> Result<Vec<Event>, ClientError> {
let Some(mut event) = self.parse_event(event_type, data)? else {
return Ok(Vec::new());
};
match &event {
Event::BlockStart(start) => {
state.current_block_type = Some(start.block_type);
}
Event::BlockStop(stop) => {
if let Some(block_type) = state.current_block_type.take() {
event = Event::BlockStop(BlockStop {
block_type,
..stop.clone()
});
}
}
_ => {}
}
Ok(vec![event])
self.parse_with_state(event_type, data, state)
}
fn default_capability(&self) -> ModelCapability {

View File

@ -13,7 +13,7 @@ use crate::llm_client::{
ClientError,
event::{
BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent,
Event, ResponseStatus, StatusEvent, UsageEvent,
Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UsageEvent,
},
};
@ -22,6 +22,21 @@ use crate::llm_client::{
pub struct OpenAIResponsesState {
slots: HashMap<SlotKey, SlotInfo>,
next_index: usize,
/// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で
/// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で
/// 蓄積、`output_item.done`(Reasoning) で `Event::ReasoningItem` を
/// 発火してエントリを除去する。
pending_reasoning: HashMap<usize, PendingReasoning>,
}
/// Accumulation buffer for a single reasoning output_item.
#[derive(Debug, Default)]
struct PendingReasoning {
// Item id of the reasoning output_item, when one was reported.
id: Option<String>,
/// Accumulated `reasoning_text.delta` text. When there are several
/// content parts they are concatenated in arrival order.
text: String,
/// `reasoning_summary_text.delta` text, accumulated per `summary_index`.
summary: Vec<String>,
}
impl OpenAIResponsesState {
@ -45,6 +60,18 @@ impl OpenAIResponsesState {
(self.allocate(key, block_type), true)
}
}
fn ensure_reasoning(&mut self, output_index: usize) -> &mut PendingReasoning {
self.pending_reasoning.entry(output_index).or_default()
}
/// Appends `text` to summary slot `summary_index` of the reasoning item at
/// `output_index`, growing the summary list with empty strings as needed.
fn extend_reasoning_summary(&mut self, output_index: usize, summary_index: usize, text: &str) {
    let summary = &mut self.ensure_reasoning(output_index).summary;
    let required_len = summary_index + 1;
    if summary.len() < required_len {
        summary.resize_with(required_len, String::new);
    }
    summary[summary_index].push_str(text);
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -89,8 +116,12 @@ enum OutputItem {
id: Option<String>,
},
Reasoning {
#[allow(dead_code)]
#[serde(default)]
id: Option<String>,
/// `output_item.done` で初めて埋まる。`include=["reasoning.encrypted_content"]`
/// 指定時に opaque blob が乗る。
#[serde(default)]
encrypted_content: Option<String>,
},
FunctionCall {
#[allow(dead_code)]
@ -319,12 +350,49 @@ pub(crate) fn parse_sse(
metadata: BlockMetadata::ToolUse { id: call_id, name },
})])
}
OutputItem::Reasoning { id, .. } => {
// wrapper を確保。中身の content_part / summary_part は
// 別 SlotKey で扱われ続けるStreaming 表示は維持)。
let entry = state.ensure_reasoning(ev.output_index);
if id.is_some() {
entry.id = id;
}
Ok(Vec::new())
}
_ => Ok(Vec::new()),
}
}
"response.output_item.done" => {
let ev: OutputItemDone = from_json(data)?;
// Reasoning wrapper の done で蓄積分を ReasoningItem として発火。
// これは `slots` の OutputItem slot とは独立している
// (FunctionCall は slots、Reasoning は pending_reasoning)。
if let OutputItem::Reasoning {
id,
encrypted_content,
..
} = ev.item
{
let mut pending = state
.pending_reasoning
.remove(&ev.output_index)
.unwrap_or_default();
if pending.id.is_none() {
pending.id = id;
}
return Ok(vec![Event::ReasoningItem(ReasoningItemEvent {
id: pending.id,
text: pending.text,
summary: pending
.summary
.into_iter()
.filter(|s| !s.is_empty())
.collect(),
encrypted_content,
signature: None,
})]);
}
if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) {
Ok(vec![Event::BlockStop(BlockStop {
index: info.flat_index,
@ -389,6 +457,8 @@ pub(crate) fn parse_sse(
"response.reasoning_text.delta" => {
let ev: ReasoningTextDelta = from_json(data)?;
// round-trip 用に蓄積
state.ensure_reasoning(ev.output_index).text.push_str(&ev.delta);
Ok(ensure_and_delta(
state,
SlotKey::ContentPart {
@ -419,6 +489,8 @@ pub(crate) fn parse_sse(
"response.reasoning_summary_text.delta" => {
let ev: ReasoningSummaryTextDelta = from_json(data)?;
// round-trip 用に蓄積
state.extend_reasoning_summary(ev.output_index, ev.summary_index, &ev.delta);
Ok(ensure_and_delta(
state,
SlotKey::Summary {
@ -797,6 +869,98 @@ mod tests {
));
}
#[test]
fn reasoning_output_item_emits_reasoning_item_with_text_summary_encrypted() {
// A completed reasoning wrapper must arrive as a ReasoningItem carrying
// text + summary[] + encrypted_content.
let mut state = OpenAIResponsesState::default();
// wrapper added (carries only the id)
with(
&mut state,
"response.output_item.added",
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#,
);
// inner content_part for reasoning_text
with(
&mut state,
"response.content_part.added",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#,
);
with(
&mut state,
"response.reasoning_text.delta",
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"hello "}"#,
);
with(
&mut state,
"response.reasoning_text.delta",
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"world"}"#,
);
with(
&mut state,
"response.content_part.done",
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"hello world"}}"#,
);
// one summary entry
with(
&mut state,
"response.reasoning_summary_part.added",
r#"{"output_index":0,"summary_index":0,"item_id":"r1","part":{"type":"summary_text","text":""}}"#,
);
with(
&mut state,
"response.reasoning_summary_text.delta",
r#"{"output_index":0,"summary_index":0,"item_id":"r1","delta":"sum-A"}"#,
);
with(
&mut state,
"response.reasoning_summary_part.done",
r#"{"output_index":0,"summary_index":0,"item_id":"r1"}"#,
);
// wrapper done (carries encrypted_content)
let evs = with(
&mut state,
"response.output_item.done",
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#,
);
assert_eq!(evs.len(), 1);
let Event::ReasoningItem(reasoning) = &evs[0] else {
panic!("expected ReasoningItem, got {:?}", evs[0]);
};
assert_eq!(reasoning.id.as_deref(), Some("r1"));
assert_eq!(reasoning.text, "hello world");
assert_eq!(reasoning.summary, vec!["sum-A".to_string()]);
assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC-XYZ"));
assert!(reasoning.signature.is_none());
// pending_reasoning must have been drained
assert!(state.pending_reasoning.is_empty());
}
#[test]
fn reasoning_wrapper_without_inner_content_emits_empty_text() {
// Even when only encrypted_content arrives (no reasoning_text), a
// ReasoningItem is still emitted.
let mut state = OpenAIResponsesState::default();
with(
&mut state,
"response.output_item.added",
r#"{"output_index":2,"item":{"type":"reasoning","id":"r9"}}"#,
);
let evs = with(
&mut state,
"response.output_item.done",
r#"{"output_index":2,"item":{"type":"reasoning","id":"r9","encrypted_content":"BLOB"}}"#,
);
let Event::ReasoningItem(r) = &evs[0] else {
panic!()
};
assert!(r.text.is_empty());
assert!(r.summary.is_empty());
assert_eq!(r.encrypted_content.as_deref(), Some("BLOB"));
}
#[test]
fn unknown_event_is_ignored() {
let (events, _) = run("response.in_progress", "{}");

View File

@ -94,8 +94,15 @@ pub enum Item {
summary: Vec<String>,
/// サーバから返された暗号化済み reasoning blob。ZDR / `store=false`
/// 運用で stateless に再送するときそのまま添える必要がある。
/// Anthropic の `redacted_thinking.data` もここに格納する。
#[serde(default, skip_serializing_if = "Option::is_none")]
encrypted_content: Option<String>,
/// Anthropic extended thinking の `signature`。新世代 Claude
/// (Opus 4.5+/Sonnet 4.6+) では同一論理ターン内の `thinking`
/// ブロックを送り返す際に必須。改ざん検知に使われる。他 scheme
/// では `None`。
#[serde(default, skip_serializing_if = "Option::is_none")]
signature: Option<String>,
/// Item status
#[serde(skip_serializing_if = "Option::is_none")]
status: Option<ItemStatus>,
@ -224,6 +231,7 @@ impl Item {
text: text.into(),
summary: Vec::new(),
encrypted_content: None,
signature: None,
status: None,
}
}
@ -247,6 +255,14 @@ impl Item {
self
}
/// Attaches an Anthropic `signature` to a `Reasoning` item.
///
/// Any other variant is returned unchanged.
pub fn with_signature(mut self, sig: impl Into<String>) -> Self {
    match &mut self {
        Self::Reasoning { signature, .. } => *signature = Some(sig.into()),
        _ => {}
    }
    self
}
// ========================================================================
// Builder methods
// ========================================================================

View File

@ -10,12 +10,14 @@
//! - [`ToolCallCollector`] - ツール呼び出しを収集するHandler
pub mod event;
mod reasoning_item_collector;
mod text_block_collector;
mod timeline;
mod tool_call_collector;
// 公開API
pub use event::*;
pub use reasoning_item_collector::ReasoningItemCollector;
pub use text_block_collector::TextBlockCollector;
pub use timeline::Timeline;
pub use tool_call_collector::ToolCallCollector;
@ -28,6 +30,7 @@ pub use crate::handler::{
Handler,
Kind,
PingKind,
ReasoningItemKind,
StatusKind,
// Block Events
TextBlockEvent,

View File

@ -0,0 +1,77 @@
//! `ReasoningItemCollector` - 完成済み reasoning item を収集する Handler
//!
//! Timeline の `ReasoningItemKind` Handler として登録し、scheme 側が
//! `Event::ReasoningItem` を発火するたびに 1 件ずつバッファに溜める。
//! Worker はターン終了時に `take_collected()` でドレインして
//! `Item::Reasoning` として `worker.history` に append する。
use std::sync::{Arc, Mutex};
use crate::handler::{Handler, ReasoningItemKind};
use crate::llm_client::event::ReasoningItemEvent;
/// Handler that buffers completed reasoning items in arrival order.
///
/// Clones share the same underlying buffer (it lives behind an `Arc<Mutex<..>>`),
/// so a clone registered as a handler fills the buffer that the original drains.
#[derive(Clone, Default)]
pub struct ReasoningItemCollector {
collected: Arc<Mutex<Vec<ReasoningItemEvent>>>,
}
impl ReasoningItemCollector {
    /// Creates a collector with an empty buffer.
    pub fn new() -> Self {
        Self {
            collected: Arc::default(),
        }
    }

    /// Removes and returns every item collected so far, leaving the buffer empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn take_collected(&self) -> Vec<ReasoningItemEvent> {
        std::mem::take(&mut *self.collected.lock().unwrap())
    }

    /// Discards every item collected so far.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn clear(&self) {
        let mut buffer = self.collected.lock().unwrap();
        buffer.truncate(0);
    }
}
impl Handler<ReasoningItemKind> for ReasoningItemCollector {
    type Scope = ();

    /// Stores a copy of every reasoning item event at the end of the shared buffer.
    fn on_event(&mut self, _scope: &mut Self::Scope, event: &ReasoningItemEvent) {
        let snapshot = event.clone();
        let mut buffer = self.collected.lock().unwrap();
        buffer.push(snapshot);
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm_client::event::Event;
use crate::timeline::Timeline;
// Events dispatched through a Timeline must be collected in dispatch order,
// and draining must leave the collector empty.
#[test]
fn collects_in_order() {
let collector = ReasoningItemCollector::new();
let mut timeline = Timeline::new();
timeline.on_reasoning_item(collector.clone());
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
id: Some("r1".into()),
text: "first".into(),
signature: Some("sig1".into()),
..Default::default()
}));
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
id: Some("r2".into()),
text: "second".into(),
..Default::default()
}));
let items = collector.take_collected();
assert_eq!(items.len(), 2);
assert_eq!(items[0].text, "first");
assert_eq!(items[0].signature.as_deref(), Some("sig1"));
assert_eq!(items[1].text, "second");
// take drains the buffer, so a second call returns nothing
assert!(collector.take_collected().is_empty());
}
}

View File

@ -381,6 +381,7 @@ pub struct Timeline {
ping_handlers: Vec<Box<dyn ErasedHandler<PingKind>>>,
status_handlers: Vec<Box<dyn ErasedHandler<StatusKind>>>,
error_handlers: Vec<Box<dyn ErasedHandler<ErrorKind>>>,
reasoning_item_handlers: Vec<Box<dyn ErasedHandler<ReasoningItemKind>>>,
// Block系ハンドラーBlockTypeごとにグループ化
text_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
@ -410,6 +411,7 @@ impl Timeline {
ping_handlers: Vec::new(),
status_handlers: Vec::new(),
error_handlers: Vec::new(),
reasoning_item_handlers: Vec::new(),
text_block_handlers: Vec::new(),
thinking_block_handlers: Vec::new(),
tool_use_block_handlers: Vec::new(),
@ -471,6 +473,18 @@ impl Timeline {
self
}
/// `ReasoningItemKind` 用 Handler を登録
pub fn on_reasoning_item<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<ReasoningItemKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
let mut wrapper = HandlerWrapper::new(handler);
wrapper.start_scope();
self.reasoning_item_handlers.push(Box::new(wrapper));
self
}
/// TextBlockKind用のHandlerを登録
pub fn on_text_block<H>(&mut self, handler: H) -> &mut Self
where
@ -522,6 +536,9 @@ impl Timeline {
Event::BlockDelta(d) => self.handle_block_delta(d),
Event::BlockStop(s) => self.handle_block_stop(s),
Event::BlockAbort(a) => self.handle_block_abort(a),
// 完成済み reasoning item: 即時ディスパッチ
Event::ReasoningItem(r) => self.dispatch_reasoning_item(r),
}
}
@ -564,6 +581,12 @@ impl Timeline {
}
}
/// Forwards a completed reasoning item to every registered reasoning-item
/// handler, in registration order.
fn dispatch_reasoning_item(&mut self, event: &ReasoningItemEvent) {
    self.reasoning_item_handlers
        .iter_mut()
        .for_each(|registered| registered.dispatch(event));
}
fn handle_block_start(&mut self, start: &BlockStart) {
self.current_block = Some(start.block_type);

View File

@ -22,7 +22,7 @@ use crate::{
},
state::{Locked, Mutable, WorkerState},
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
timeline::{TextBlockCollector, Timeline, ToolCallCollector},
timeline::{ReasoningItemCollector, TextBlockCollector, Timeline, ToolCallCollector},
tool::{
ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputLimits, ToolResult,
truncate_content,
@ -140,6 +140,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
text_block_collector: TextBlockCollector,
/// Tool call collector (Timeline handler)
tool_call_collector: ToolCallCollector,
/// Reasoning item collector (Timeline handler)。完成済み reasoning
/// item を 1 ターン分バッファし、history に append する。
reasoning_item_collector: ReasoningItemCollector,
/// Tool server handle
tool_server: ToolServerHandle,
/// Interceptor for control-flow decisions
@ -587,10 +590,37 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
self.tool_server.tool_definitions_sorted()
}
/// Build assistant response items from text blocks and tool calls
fn build_assistant_items(&self, text_blocks: &[String], tool_calls: &[ToolCall]) -> Vec<Item> {
/// Build assistant response items from reasoning items, text blocks, and tool calls.
///
/// Reasoning items come first (Anthropic / OpenAI Responses 双方ともに
/// アシスタント応答内で reasoning は先頭に並ぶ仕様)。これは Anthropic
/// が新世代モデルで thinking ブロックを assistant メッセージの先頭に
/// 置くことを要求するためでもある。
fn build_assistant_items(
&self,
reasoning_items: &[crate::llm_client::event::ReasoningItemEvent],
text_blocks: &[String],
tool_calls: &[ToolCall],
) -> Vec<Item> {
let mut items = Vec::new();
for r in reasoning_items {
let mut item = Item::reasoning(r.text.clone());
if let Some(id) = &r.id {
item = item.with_id(id);
}
if !r.summary.is_empty() {
item = item.with_reasoning_summary(r.summary.clone());
}
if let Some(enc) = &r.encrypted_content {
item = item.with_encrypted_content(enc);
}
if let Some(sig) = &r.signature {
item = item.with_signature(sig);
}
items.push(item);
}
// Add text as assistant message if present
let text = text_blocks.join("");
if !text.is_empty() {
@ -973,9 +1003,11 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
self.turn_count += 1;
// Collect and commit assistant items
let reasoning_items = self.reasoning_item_collector.take_collected();
let text_blocks = self.text_block_collector.take_collected();
let tool_calls = self.tool_call_collector.take_collected();
let assistant_items = self.build_assistant_items(&text_blocks, &tool_calls);
let assistant_items =
self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls);
self.history.extend(assistant_items);
if tool_calls.is_empty() {
@ -1118,18 +1150,21 @@ impl<C: LlmClient> Worker<C, Mutable> {
pub fn new(client: C) -> Self {
let text_block_collector = TextBlockCollector::new();
let tool_call_collector = ToolCallCollector::new();
let reasoning_item_collector = ReasoningItemCollector::new();
let mut timeline = Timeline::new();
let (cancel_tx, cancel_rx) = mpsc::channel(1);
// Register collectors with Timeline
timeline.on_text_block(text_block_collector.clone());
timeline.on_tool_use_block(tool_call_collector.clone());
timeline.on_reasoning_item(reasoning_item_collector.clone());
Self {
client,
timeline,
text_block_collector,
tool_call_collector,
reasoning_item_collector,
tool_server: ToolServer::new().handle(),
interceptor: Box::new(DefaultInterceptor),
system_prompt: None,
@ -1388,6 +1423,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
timeline: self.timeline,
text_block_collector: self.text_block_collector,
tool_call_collector: self.tool_call_collector,
reasoning_item_collector: self.reasoning_item_collector,
tool_server: self.tool_server,
interceptor: self.interceptor,
system_prompt: self.system_prompt,
@ -1470,6 +1506,7 @@ impl<C: LlmClient> Worker<C, Locked> {
timeline: self.timeline,
text_block_collector: self.text_block_collector,
tool_call_collector: self.tool_call_collector,
reasoning_item_collector: self.reasoning_item_collector,
tool_server: self.tool_server,
interceptor: self.interceptor,
system_prompt: self.system_prompt,

View File

@ -0,0 +1,212 @@
//! Reasoning history round-trip 統合テスト
//!
//! Worker のストリーム → history append → 次リクエスト送出までの
//! ライフサイクルで `Item::Reasoning` が脱落せず保持されることを確認する。
//!
//! 検証点:
//! - Anthropic 由来の thinking + signature が `Item::Reasoning::signature` として
//! history に残る
//! - OpenAI Responses 由来の reasoning text + summary + encrypted_content が
//! `Item::Reasoning` の各フィールドに展開される
//! - 直前の reasoning は次の outgoing request の `request.items` の先頭付近に
//! 含まれる (assistant メッセージの先頭、Anthropic 仕様)
mod common;
use common::MockLlmClient;
use llm_worker::Item;
use llm_worker::Worker;
use llm_worker::llm_client::event::{
Event, ReasoningItemEvent, ResponseStatus, StatusEvent,
};
/// Anthropic-style stream: a completed reasoning item carrying a signature,
/// followed by a text block and a `Completed` status.
///
/// After the run, the worker history is expected to hold the user message,
/// the reasoning item (signature intact), and the assistant text, in that
/// order.
#[tokio::test]
async fn anthropic_thinking_round_trips_signature_into_history() {
    let reasoning = ReasoningItemEvent {
        id: None,
        text: "let me think...".into(),
        summary: Vec::new(),
        encrypted_content: None,
        signature: Some("SIG-OPUS".into()),
    };
    let completed = StatusEvent {
        status: ResponseStatus::Completed,
    };
    let client = MockLlmClient::new(vec![
        Event::ReasoningItem(reasoning),
        Event::text_block_start(0),
        Event::text_delta(0, "Here's the answer"),
        Event::text_block_stop(0, None),
        Event::Status(completed),
    ]);

    let worker = Worker::new(client);
    let out = worker.run("question?").await.expect("run ok");
    let worker = out.worker;
    let history = worker.history();

    // Expected layout: user / reasoning / assistant_message.
    assert_eq!(history.len(), 3, "history: {history:?}");
    assert!(matches!(history[0], Item::Message { .. }));

    let other = &history[1];
    if let Item::Reasoning {
        text, signature, ..
    } = other
    {
        assert_eq!(text, "let me think...");
        assert_eq!(signature.as_deref(), Some("SIG-OPUS"));
    } else {
        panic!("expected Reasoning, got {other:?}");
    }

    assert_eq!(history[2].as_text(), Some("Here's the answer"));
}
/// OpenAI-Responses-style stream: a reasoning item carrying an id, summary
/// parts and `encrypted_content` must be expanded field-by-field into the
/// `Item::Reasoning` stored in the worker history.
#[tokio::test]
async fn openai_reasoning_round_trips_encrypted_and_summary() {
    let events = vec![
        Event::ReasoningItem(ReasoningItemEvent {
            id: Some("r1".into()),
            text: "inner reasoning".into(),
            summary: vec!["sum-A".into(), "sum-B".into()],
            encrypted_content: Some("ENC-OPAQUE".into()),
            signature: None,
        }),
        Event::text_block_start(0),
        Event::text_delta(0, "answer"),
        Event::text_block_stop(0, None),
        Event::Status(StatusEvent {
            status: ResponseStatus::Completed,
        }),
    ];
    let client = MockLlmClient::new(events);
    let worker = Worker::new(client);
    let out = worker.run("q").await.expect("run ok");
    let worker = out.worker;
    let history = worker.history();

    // Expected layout: user / reasoning / assistant_message. Checking the
    // length first turns a missing item into a readable failure (with the
    // whole history dumped) instead of an index-out-of-bounds panic below.
    assert_eq!(history.len(), 3, "history: {history:?}");

    match &history[1] {
        Item::Reasoning {
            text,
            summary,
            encrypted_content,
            signature,
            id,
            ..
        } => {
            assert_eq!(text, "inner reasoning");
            assert_eq!(summary, &vec!["sum-A".to_string(), "sum-B".to_string()]);
            assert_eq!(encrypted_content.as_deref(), Some("ENC-OPAQUE"));
            assert!(signature.is_none());
            assert_eq!(id.as_deref(), Some("r1"));
        }
        other => panic!("expected Reasoning, got {other:?}"),
    }

    // The assistant text must still follow the reasoning item.
    assert_eq!(history[2].as_text(), Some("answer"));
}
/// Within one assistant turn, the reasoning item must be placed before the
/// text in history, even when the `ReasoningItem` event arrives after the
/// text block in the stream.
#[tokio::test]
async fn reasoning_precedes_text_in_assistant_burst() {
    let events = vec![
        // The ReasoningItem fires after the text block here; on history
        // append it is still expected at the front of the assistant items.
        Event::text_block_start(0),
        Event::text_delta(0, "intermediate"),
        Event::text_block_stop(0, None),
        Event::ReasoningItem(ReasoningItemEvent {
            text: "after text".into(),
            signature: Some("SIG".into()),
            ..Default::default()
        }),
        Event::Status(StatusEvent {
            status: ResponseStatus::Completed,
        }),
    ];
    let client = MockLlmClient::new(events);
    let worker = Worker::new(client);
    let out = worker.run("q").await.expect("run ok");
    let worker = out.worker;
    let history = worker.history();

    // Expected layout: user / reasoning (first assistant item) /
    // assistant_message. Checking the length first reports a short or long
    // history with its full contents rather than panicking on the indexing
    // below.
    assert_eq!(history.len(), 3, "history: {history:?}");
    assert!(matches!(history[1], Item::Reasoning { .. }));
    assert_eq!(history[2].as_text(), Some("intermediate"));
}
/// Resume scenario: an `Item::Reasoning` with a signature is injected into the
/// worker via `set_history`, and the next outgoing request must still carry
/// it, text and signature unchanged, in `Request::items`.
#[tokio::test]
async fn injected_reasoning_survives_into_outgoing_request() {
    use async_trait::async_trait;
    use futures::Stream;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    use llm_worker::llm_client::{ClientError, LlmClient, Request};

    /// Client that stores the request it receives in a shared slot and
    /// answers with a stream containing a single `Completed` status event.
    #[derive(Clone)]
    struct CapturingClient {
        captured: Arc<Mutex<Option<Request>>>,
    }

    #[async_trait]
    impl LlmClient for CapturingClient {
        fn clone_boxed(&self) -> Box<dyn LlmClient> {
            Box::new(self.clone())
        }

        async fn stream(
            &self,
            request: Request,
        ) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError>
        {
            *self.captured.lock().unwrap() = Some(request);
            let completed = Event::Status(StatusEvent {
                status: ResponseStatus::Completed,
            });
            let stream = futures::stream::iter(vec![Ok(completed)]);
            Ok(Box::pin(stream))
        }
    }

    let captured = Arc::new(Mutex::new(None));
    let mut worker = Worker::new(CapturingClient {
        captured: captured.clone(),
    });

    // Resume: feed a pre-existing history into the worker.
    let prior_history = vec![
        Item::user_message("prior question"),
        Item::reasoning("prior thinking").with_signature("SIG-PRIOR"),
        Item::assistant_message("prior answer"),
    ];
    worker.set_history(prior_history);

    let _ = worker.run("follow up").await.expect("run ok");

    let req = captured
        .lock()
        .unwrap()
        .take()
        .expect("client should have received a request");

    // Every reasoning item in the outgoing request must be the injected one,
    // and at least one must be present.
    let reasoning_fields = req.items.iter().filter_map(|item| match item {
        Item::Reasoning {
            text, signature, ..
        } => Some((text, signature)),
        _ => None,
    });
    let mut seen = 0usize;
    for (text, signature) in reasoning_fields {
        assert_eq!(text, "prior thinking");
        assert_eq!(signature.as_deref(), Some("SIG-PRIOR"));
        seen += 1;
    }
    assert!(
        seen > 0,
        "Reasoning item must survive into outgoing request items: {req:?}",
        req = req.items,
    );
}

View File

@ -39,6 +39,10 @@ pub enum LoggedItem {
summary: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
encrypted_content: Option<String>,
/// Anthropic extended thinking signature。新世代 Claude で round-trip
/// 必須。OpenAI Responses など他 scheme では `None`。
#[serde(default, skip_serializing_if = "Option::is_none")]
signature: Option<String>,
},
}
@ -92,11 +96,13 @@ impl From<&Item> for LoggedItem {
text,
summary,
encrypted_content,
signature,
..
} => Self::Reasoning {
text: text.clone(),
summary: summary.clone(),
encrypted_content: encrypted_content.clone(),
signature: signature.clone(),
},
}
}
@ -142,11 +148,13 @@ impl From<LoggedItem> for Item {
text,
summary,
encrypted_content,
signature,
} => Item::Reasoning {
id: None,
text,
summary,
encrypted_content,
signature,
status: None,
},
}