feat: Reasoningのコンテキスト管理の対応
This commit is contained in:
parent
dd3903efde
commit
9fd61e8068
|
|
@ -91,6 +91,16 @@ impl Kind for ErrorKind {
|
|||
type Event = ErrorEvent;
|
||||
}
|
||||
|
||||
/// Reasoning item Kind - 完成済み reasoning item の永続化用
|
||||
///
|
||||
/// 1 reasoning item につき 1 度だけ発火する。Worker は
|
||||
/// `ReasoningItemCollector` 経由で受け取り、ターン終了時に
|
||||
/// `Item::Reasoning` として history に append する。
|
||||
pub struct ReasoningItemKind;
|
||||
impl Kind for ReasoningItemKind {
|
||||
type Event = ReasoningItemEvent;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Block Kind Definitions
|
||||
// =============================================================================
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@ use serde::{Deserialize, Serialize};
|
|||
///
|
||||
/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`
|
||||
/// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort`
|
||||
/// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み
|
||||
/// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と
|
||||
/// は別経路で発火する)
|
||||
///
|
||||
/// # ブロックのライフサイクル
|
||||
///
|
||||
|
|
@ -41,6 +44,18 @@ pub enum Event {
|
|||
BlockStop(BlockStop),
|
||||
/// ブロック中断
|
||||
BlockAbort(BlockAbort),
|
||||
|
||||
/// Reasoning item の完成。scheme が「次の request に送り返すための
|
||||
/// reasoning material が揃った」点で 1 度だけ発火する。
|
||||
///
|
||||
/// - Anthropic: 1 つの `thinking` content_block 完了ごと
|
||||
/// - OpenAI Responses: 1 つの reasoning output_item 完了ごと
|
||||
///
|
||||
/// 上位層(Worker / ReasoningItemCollector)はこれを `Item::Reasoning`
|
||||
/// として `worker.history` に append する。streaming 表示用の
|
||||
/// `BlockStart(Thinking)` / `BlockDelta(Thinking)` / `BlockStop(Thinking)`
|
||||
/// は依然として並行発火する(live display と round-trip persist の責務分離)。
|
||||
ReasoningItem(ReasoningItemEvent),
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
|
@ -212,6 +227,31 @@ impl BlockAbort {
|
|||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Reasoning Item Event
|
||||
// =============================================================================
|
||||
|
||||
/// 完成済み reasoning item。scheme が round-trip に必要なすべての
|
||||
/// material(text, summary, encrypted_content, signature, id)を揃えて
|
||||
/// 1 度だけ発火する。
|
||||
///
|
||||
/// `Item::Reasoning` のフィールドを 1:1 に持つ。
|
||||
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
|
||||
pub struct ReasoningItemEvent {
|
||||
/// scheme 側で観測した item id(OpenAI Responses の `id`)。
|
||||
pub id: Option<String>,
|
||||
/// reasoning 本体テキスト。Anthropic は `thinking` 累積、OpenAI は
|
||||
/// `reasoning_text` 累積。redacted_thinking では空。
|
||||
pub text: String,
|
||||
/// summary (OpenAI Responses の `summary_text[]`)。他 scheme は空。
|
||||
pub summary: Vec<String>,
|
||||
/// 暗号化された opaque blob(Anthropic `redacted_thinking.data` /
|
||||
/// OpenAI Responses `encrypted_content`)。
|
||||
pub encrypted_content: Option<String>,
|
||||
/// Anthropic extended thinking signature。round-trip 必須。
|
||||
pub signature: Option<String>,
|
||||
}
|
||||
|
||||
/// 停止理由
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub enum StopReason {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ use crate::llm_client::{
|
|||
use serde::Deserialize;
|
||||
|
||||
use super::AnthropicScheme;
|
||||
use super::scheme_impl::{AnthropicState, PendingThinking};
|
||||
|
||||
/// Anthropic SSEイベントタイプ
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
|
|
@ -75,7 +76,21 @@ pub(crate) enum ContentBlock {
|
|||
#[serde(rename = "text")]
|
||||
Text { text: String },
|
||||
#[serde(rename = "thinking")]
|
||||
Thinking { thinking: String },
|
||||
Thinking {
|
||||
#[serde(default)]
|
||||
thinking: String,
|
||||
/// 非ストリーミングレスポンス由来の初期 signature(通常はストリームでは
|
||||
/// 空 → `signature_delta` で埋まる)。
|
||||
#[serde(default)]
|
||||
signature: Option<String>,
|
||||
},
|
||||
#[serde(rename = "redacted_thinking")]
|
||||
RedactedThinking {
|
||||
/// 暗号化された opaque blob。signature ではなく、まるごと
|
||||
/// `redacted_thinking.data` として送り返す必要がある。
|
||||
#[serde(default)]
|
||||
data: String,
|
||||
},
|
||||
#[serde(rename = "tool_use")]
|
||||
ToolUse {
|
||||
id: String,
|
||||
|
|
@ -228,7 +243,9 @@ impl AnthropicScheme {
|
|||
fn convert_block_start(&self, event: &ContentBlockStartEvent) -> Event {
|
||||
let (block_type, metadata) = match &event.content_block {
|
||||
ContentBlock::Text { .. } => (BlockType::Text, BlockMetadata::Text),
|
||||
ContentBlock::Thinking { .. } => (BlockType::Thinking, BlockMetadata::Thinking),
|
||||
ContentBlock::Thinking { .. } | ContentBlock::RedactedThinking { .. } => {
|
||||
(BlockType::Thinking, BlockMetadata::Thinking)
|
||||
}
|
||||
ContentBlock::ToolUse { id, name, .. } => (
|
||||
BlockType::ToolUse,
|
||||
BlockMetadata::ToolUse {
|
||||
|
|
@ -264,6 +281,123 @@ impl AnthropicScheme {
|
|||
}))
|
||||
}
|
||||
|
||||
/// state を持ち回す上位パース。
|
||||
///
|
||||
/// `parse_event` の単発 Event に加えて、以下を行う:
|
||||
/// - `content_block_stop` の `block_type` を直前の Start 値で書き戻す
|
||||
/// - `thinking` / `redacted_thinking` ブロックの本体・signature・data を
|
||||
/// `state.pending_thinking` に蓄積し、`content_block_stop` で
|
||||
/// `Event::ReasoningItem` を追加発火する
|
||||
/// - `signature_delta` を蓄積(Stream channel には流さず、reasoning event
|
||||
/// にだけ反映する)
|
||||
pub(crate) fn parse_with_state(
|
||||
&self,
|
||||
event_type: &str,
|
||||
data: &str,
|
||||
state: &mut AnthropicState,
|
||||
) -> Result<Vec<Event>, ClientError> {
|
||||
let Some(parsed_event_type) = AnthropicEventType::parse(event_type) else {
|
||||
return Ok(Vec::new());
|
||||
};
|
||||
|
||||
// signature_delta はストリーム表示には流さず、state にだけ蓄積。
|
||||
// それ以外は parse_event で標準 Event 化する。
|
||||
let mut emitted: Vec<Event> = Vec::new();
|
||||
|
||||
match parsed_event_type {
|
||||
AnthropicEventType::ContentBlockStart => {
|
||||
let raw: ContentBlockStartEvent = serde_json::from_str(data)?;
|
||||
state.current_block_type = Some(match &raw.content_block {
|
||||
ContentBlock::Text { .. } => BlockType::Text,
|
||||
ContentBlock::Thinking { .. } | ContentBlock::RedactedThinking { .. } => {
|
||||
BlockType::Thinking
|
||||
}
|
||||
ContentBlock::ToolUse { .. } => BlockType::ToolUse,
|
||||
});
|
||||
match &raw.content_block {
|
||||
ContentBlock::Thinking {
|
||||
thinking, signature,
|
||||
} => {
|
||||
state.pending_thinking = Some(PendingThinking {
|
||||
text: thinking.clone(),
|
||||
signature: signature.clone(),
|
||||
redacted_data: None,
|
||||
});
|
||||
}
|
||||
ContentBlock::RedactedThinking { data: blob } => {
|
||||
state.pending_thinking = Some(PendingThinking {
|
||||
text: String::new(),
|
||||
signature: None,
|
||||
redacted_data: Some(blob.clone()),
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
emitted.push(self.convert_block_start(&raw));
|
||||
}
|
||||
AnthropicEventType::ContentBlockDelta => {
|
||||
let raw: ContentBlockDeltaEvent = serde_json::from_str(data)?;
|
||||
match &raw.delta {
|
||||
DeltaBlock::ThinkingDelta { thinking } => {
|
||||
if let Some(pending) = state.pending_thinking.as_mut() {
|
||||
pending.text.push_str(thinking);
|
||||
}
|
||||
emitted.push(Event::BlockDelta(BlockDelta {
|
||||
index: raw.index,
|
||||
delta: DeltaContent::Thinking(thinking.clone()),
|
||||
}));
|
||||
}
|
||||
DeltaBlock::SignatureDelta { signature } => {
|
||||
if let Some(pending) = state.pending_thinking.as_mut() {
|
||||
// 通常 1 回しか来ないが、複数 fragment 来ても連結しておく
|
||||
match &mut pending.signature {
|
||||
Some(acc) => acc.push_str(signature),
|
||||
None => pending.signature = Some(signature.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
DeltaBlock::TextDelta { text } => {
|
||||
emitted.push(Event::BlockDelta(BlockDelta {
|
||||
index: raw.index,
|
||||
delta: DeltaContent::Text(text.clone()),
|
||||
}));
|
||||
}
|
||||
DeltaBlock::InputJsonDelta { partial_json } => {
|
||||
emitted.push(Event::BlockDelta(BlockDelta {
|
||||
index: raw.index,
|
||||
delta: DeltaContent::InputJson(partial_json.clone()),
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
AnthropicEventType::ContentBlockStop => {
|
||||
let raw: ContentBlockStopEvent = serde_json::from_str(data)?;
|
||||
let block_type = state
|
||||
.current_block_type
|
||||
.take()
|
||||
.unwrap_or(BlockType::Text);
|
||||
emitted.push(Event::BlockStop(BlockStop {
|
||||
index: raw.index,
|
||||
block_type,
|
||||
stop_reason: None,
|
||||
}));
|
||||
if matches!(block_type, BlockType::Thinking) {
|
||||
if let Some(pending) = state.pending_thinking.take() {
|
||||
emitted.push(Event::ReasoningItem(pending.into_event()));
|
||||
}
|
||||
}
|
||||
}
|
||||
// 残りは state を必要としない。既存 parse_event に委譲。
|
||||
_ => {
|
||||
if let Some(event) = self.parse_event(event_type, data)? {
|
||||
emitted.push(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(emitted)
|
||||
}
|
||||
|
||||
fn convert_usage(&self, usage: &UsageData) -> UsageEvent {
|
||||
// Anthropic の `input_tokens` は **キャッシュ外** の入力トークンのみで、
|
||||
// プロンプト全長は input_tokens + cache_read + cache_creation。
|
||||
|
|
@ -391,6 +525,117 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thinking_block_emits_reasoning_item_with_signature() {
|
||||
// thinking ブロックが完了したら ReasoningItem に text+signature が乗ること
|
||||
let scheme = AnthropicScheme::new();
|
||||
let mut state = AnthropicState::default();
|
||||
|
||||
let evs = scheme
|
||||
.parse_with_state(
|
||||
"content_block_start",
|
||||
r#"{"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(evs[0], Event::BlockStart(_)));
|
||||
|
||||
scheme
|
||||
.parse_with_state(
|
||||
"content_block_delta",
|
||||
r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"hello "}}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
scheme
|
||||
.parse_with_state(
|
||||
"content_block_delta",
|
||||
r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"world"}}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
scheme
|
||||
.parse_with_state(
|
||||
"content_block_delta",
|
||||
r#"{"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"SIG-XYZ"}}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let stop_evs = scheme
|
||||
.parse_with_state(
|
||||
"content_block_stop",
|
||||
r#"{"type":"content_block_stop","index":0}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
// BlockStop と ReasoningItem の 2 件が並ぶ
|
||||
assert!(matches!(stop_evs[0], Event::BlockStop(_)));
|
||||
let Event::ReasoningItem(reasoning) = &stop_evs[1] else {
|
||||
panic!("expected ReasoningItem, got {:?}", stop_evs[1]);
|
||||
};
|
||||
assert_eq!(reasoning.text, "hello world");
|
||||
assert_eq!(reasoning.signature.as_deref(), Some("SIG-XYZ"));
|
||||
assert!(reasoning.encrypted_content.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn redacted_thinking_emits_reasoning_item_with_data() {
|
||||
let scheme = AnthropicScheme::new();
|
||||
let mut state = AnthropicState::default();
|
||||
|
||||
scheme
|
||||
.parse_with_state(
|
||||
"content_block_start",
|
||||
r#"{"type":"content_block_start","index":0,"content_block":{"type":"redacted_thinking","data":"opaque-blob"}}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
let stop_evs = scheme
|
||||
.parse_with_state(
|
||||
"content_block_stop",
|
||||
r#"{"type":"content_block_stop","index":0}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
let Event::ReasoningItem(reasoning) = &stop_evs[1] else {
|
||||
panic!("expected ReasoningItem");
|
||||
};
|
||||
assert!(reasoning.text.is_empty());
|
||||
assert!(reasoning.signature.is_none());
|
||||
assert_eq!(reasoning.encrypted_content.as_deref(), Some("opaque-blob"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn text_block_does_not_emit_reasoning_item() {
|
||||
let scheme = AnthropicScheme::new();
|
||||
let mut state = AnthropicState::default();
|
||||
|
||||
scheme
|
||||
.parse_with_state(
|
||||
"content_block_start",
|
||||
r#"{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
scheme
|
||||
.parse_with_state(
|
||||
"content_block_delta",
|
||||
r#"{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"hi"}}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
let stop_evs = scheme
|
||||
.parse_with_state(
|
||||
"content_block_stop",
|
||||
r#"{"type":"content_block_stop","index":0}"#,
|
||||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(stop_evs.len(), 1);
|
||||
assert!(matches!(stop_evs[0], Event::BlockStop(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_ping() {
|
||||
let scheme = AnthropicScheme::new();
|
||||
|
|
|
|||
|
|
@ -77,6 +77,21 @@ pub(crate) enum AnthropicContentPart {
|
|||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
cache_control: Option<CacheControl>,
|
||||
},
|
||||
#[serde(rename = "thinking")]
|
||||
Thinking {
|
||||
thinking: String,
|
||||
signature: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
cache_control: Option<CacheControl>,
|
||||
},
|
||||
#[serde(rename = "redacted_thinking")]
|
||||
RedactedThinking {
|
||||
/// 暗号化済み reasoning blob。`Item::Reasoning::encrypted_content`
|
||||
/// から渡る。
|
||||
data: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
cache_control: Option<CacheControl>,
|
||||
},
|
||||
#[serde(rename = "tool_use")]
|
||||
ToolUse {
|
||||
id: String,
|
||||
|
|
@ -102,6 +117,21 @@ impl AnthropicContentPart {
|
|||
}
|
||||
}
|
||||
|
||||
fn thinking(thinking: String, signature: String) -> Self {
|
||||
Self::Thinking {
|
||||
thinking,
|
||||
signature,
|
||||
cache_control: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn redacted_thinking(data: String) -> Self {
|
||||
Self::RedactedThinking {
|
||||
data,
|
||||
cache_control: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn tool_use(id: String, name: String, input: serde_json::Value) -> Self {
|
||||
Self::ToolUse {
|
||||
id,
|
||||
|
|
@ -122,6 +152,8 @@ impl AnthropicContentPart {
|
|||
fn set_cache_control(&mut self, cc: CacheControl) {
|
||||
match self {
|
||||
Self::Text { cache_control, .. }
|
||||
| Self::Thinking { cache_control, .. }
|
||||
| Self::RedactedThinking { cache_control, .. }
|
||||
| Self::ToolUse { cache_control, .. }
|
||||
| Self::ToolResult { cache_control, .. } => {
|
||||
*cache_control = Some(cc);
|
||||
|
|
@ -305,11 +337,33 @@ impl AnthropicScheme {
|
|||
.push((i, AnthropicContentPart::tool_result(call_id.clone(), text)));
|
||||
}
|
||||
|
||||
Item::Reasoning { text, .. } => {
|
||||
Item::Reasoning {
|
||||
text,
|
||||
encrypted_content,
|
||||
signature,
|
||||
..
|
||||
} => {
|
||||
flush_pending(&mut messages, &mut pending_user, "user", &mut locations);
|
||||
// Reasoning is treated as assistant text in Anthropic
|
||||
// (actual thinking blocks are handled differently in streaming).
|
||||
pending_assistant.push((i, AnthropicContentPart::text(text.clone())));
|
||||
// Anthropic はアシスタントターン中の `thinking` /
|
||||
// `redacted_thinking` ブロックを必ず assistant role の
|
||||
// content_part として送り返す必要がある。
|
||||
//
|
||||
// - signature あり: `thinking` content_part を投影
|
||||
// - signature 無し + encrypted_content あり:
|
||||
// `redacted_thinking` content_part を投影
|
||||
// - どちらも無い: 他 scheme(OpenAI 等)から流入した
|
||||
// 素の reasoning text。Anthropic に投げる意味も
|
||||
// round-trip の根拠も無いので drop。
|
||||
if let Some(sig) = signature.clone() {
|
||||
pending_assistant.push((
|
||||
i,
|
||||
AnthropicContentPart::thinking(text.clone(), sig),
|
||||
));
|
||||
} else if let Some(data) = encrypted_content.clone() {
|
||||
pending_assistant
|
||||
.push((i, AnthropicContentPart::redacted_thinking(data)));
|
||||
}
|
||||
// どちらも None なら何も pend せず、本 item は無視。
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -542,6 +596,8 @@ mod tests {
|
|||
fn part_cache_control(part: &AnthropicContentPart) -> Option<CacheControl> {
|
||||
match part {
|
||||
AnthropicContentPart::Text { cache_control, .. }
|
||||
| AnthropicContentPart::Thinking { cache_control, .. }
|
||||
| AnthropicContentPart::RedactedThinking { cache_control, .. }
|
||||
| AnthropicContentPart::ToolUse { cache_control, .. }
|
||||
| AnthropicContentPart::ToolResult { cache_control, .. } => *cache_control,
|
||||
}
|
||||
|
|
@ -737,6 +793,81 @@ mod tests {
|
|||
assert!(breakpoint_positions(&req).is_empty());
|
||||
}
|
||||
|
||||
fn collect_assistant_thinking_parts(req: &AnthropicRequest) -> Vec<&AnthropicContentPart> {
|
||||
let mut out = Vec::new();
|
||||
for msg in &req.messages {
|
||||
if msg.role != "assistant" {
|
||||
continue;
|
||||
}
|
||||
if let AnthropicContent::Parts(parts) = &msg.content {
|
||||
for part in parts {
|
||||
if matches!(
|
||||
part,
|
||||
AnthropicContentPart::Thinking { .. }
|
||||
| AnthropicContentPart::RedactedThinking { .. }
|
||||
) {
|
||||
out.push(part);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reasoning_with_signature_projects_thinking_part() {
|
||||
// Item::Reasoning に signature があれば assistant role の
|
||||
// `thinking` content_part として送る。
|
||||
let scheme = AnthropicScheme::new();
|
||||
let request = Request::new()
|
||||
.user("hi")
|
||||
.item(Item::reasoning("step-by-step").with_signature("SIG-A"))
|
||||
.item(Item::assistant_message("done"));
|
||||
let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit());
|
||||
let thinking_parts = collect_assistant_thinking_parts(&req);
|
||||
assert_eq!(thinking_parts.len(), 1);
|
||||
match thinking_parts[0] {
|
||||
AnthropicContentPart::Thinking {
|
||||
thinking, signature, ..
|
||||
} => {
|
||||
assert_eq!(thinking, "step-by-step");
|
||||
assert_eq!(signature, "SIG-A");
|
||||
}
|
||||
other => panic!("expected Thinking part, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reasoning_with_only_encrypted_content_projects_redacted_thinking() {
|
||||
let scheme = AnthropicScheme::new();
|
||||
let request = Request::new()
|
||||
.user("hi")
|
||||
.item(Item::reasoning("").with_encrypted_content("opaque"))
|
||||
.item(Item::assistant_message("done"));
|
||||
let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit());
|
||||
let parts = collect_assistant_thinking_parts(&req);
|
||||
assert_eq!(parts.len(), 1);
|
||||
match parts[0] {
|
||||
AnthropicContentPart::RedactedThinking { data, .. } => {
|
||||
assert_eq!(data, "opaque");
|
||||
}
|
||||
other => panic!("expected RedactedThinking, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reasoning_without_signature_or_encrypted_is_dropped() {
|
||||
// 他 scheme から流入した素の reasoning は Anthropic に投げない。
|
||||
let scheme = AnthropicScheme::new();
|
||||
let request = Request::new()
|
||||
.user("hi")
|
||||
.item(Item::reasoning("plain text"))
|
||||
.item(Item::assistant_message("done"));
|
||||
let req = scheme.build_request("claude-sonnet-4-20250514", &request, &cap_explicit());
|
||||
// thinking part は 1 つも乗らない
|
||||
assert!(collect_assistant_thinking_parts(&req).is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tool_definitions_carry_no_cache_control() {
|
||||
// Tool JSON schema must serialise unchanged — no sneak-in of
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use crate::llm_client::{
|
|||
ClientError,
|
||||
auth::AuthRequirement,
|
||||
capability::ModelCapability,
|
||||
event::{BlockStop, BlockType, Event},
|
||||
event::{BlockType, Event, ReasoningItemEvent},
|
||||
scheme::Scheme,
|
||||
types::Request,
|
||||
};
|
||||
|
|
@ -18,12 +18,37 @@ use super::AnthropicScheme;
|
|||
|
||||
/// Anthropic の SSE パースで必要な状態。
|
||||
///
|
||||
/// `content_block_stop` イベントは `block_type` を持たない仕様なので、
|
||||
/// 直前の `content_block_start` で観測した `block_type` を保持して
|
||||
/// `BlockStop` に書き戻す。
|
||||
/// 1. `content_block_stop` イベントは `block_type` を持たない仕様なので、
|
||||
/// 直前の `content_block_start` で観測した `block_type` を保持して
|
||||
/// `BlockStop` に書き戻す。
|
||||
/// 2. `thinking` ブロック中の `thinking_delta` テキストと `signature_delta`
|
||||
/// 署名、および `redacted_thinking` ブロックの `data` を蓄積し、
|
||||
/// `content_block_stop` で `Event::ReasoningItem` を発火する
|
||||
/// (round-trip 永続化のため)。
|
||||
#[derive(Debug, Default)]
|
||||
pub struct AnthropicState {
|
||||
current_block_type: Option<BlockType>,
|
||||
pub(crate) current_block_type: Option<BlockType>,
|
||||
pub(crate) pending_thinking: Option<PendingThinking>,
|
||||
}
|
||||
|
||||
/// 1 つの `thinking` または `redacted_thinking` content_block の蓄積バッファ。
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct PendingThinking {
|
||||
pub(crate) text: String,
|
||||
pub(crate) signature: Option<String>,
|
||||
pub(crate) redacted_data: Option<String>,
|
||||
}
|
||||
|
||||
impl PendingThinking {
|
||||
pub(crate) fn into_event(self) -> ReasoningItemEvent {
|
||||
ReasoningItemEvent {
|
||||
id: None,
|
||||
text: self.text,
|
||||
summary: Vec::new(),
|
||||
encrypted_content: self.redacted_data,
|
||||
signature: self.signature,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Scheme for AnthropicScheme {
|
||||
|
|
@ -73,24 +98,7 @@ impl Scheme for AnthropicScheme {
|
|||
data: &str,
|
||||
state: &mut Self::State,
|
||||
) -> Result<Vec<Event>, ClientError> {
|
||||
let Some(mut event) = self.parse_event(event_type, data)? else {
|
||||
return Ok(Vec::new());
|
||||
};
|
||||
match &event {
|
||||
Event::BlockStart(start) => {
|
||||
state.current_block_type = Some(start.block_type);
|
||||
}
|
||||
Event::BlockStop(stop) => {
|
||||
if let Some(block_type) = state.current_block_type.take() {
|
||||
event = Event::BlockStop(BlockStop {
|
||||
block_type,
|
||||
..stop.clone()
|
||||
});
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
Ok(vec![event])
|
||||
self.parse_with_state(event_type, data, state)
|
||||
}
|
||||
|
||||
fn default_capability(&self) -> ModelCapability {
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ use crate::llm_client::{
|
|||
ClientError,
|
||||
event::{
|
||||
BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent,
|
||||
Event, ResponseStatus, StatusEvent, UsageEvent,
|
||||
Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UsageEvent,
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -22,6 +22,21 @@ use crate::llm_client::{
|
|||
pub struct OpenAIResponsesState {
|
||||
slots: HashMap<SlotKey, SlotInfo>,
|
||||
next_index: usize,
|
||||
/// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で
|
||||
/// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で
|
||||
/// 蓄積、`output_item.done`(Reasoning) で `Event::ReasoningItem` を
|
||||
/// 発火してエントリを除去する。
|
||||
pending_reasoning: HashMap<usize, PendingReasoning>,
|
||||
}
|
||||
|
||||
/// 1 つの reasoning output_item の蓄積バッファ。
|
||||
#[derive(Debug, Default)]
|
||||
struct PendingReasoning {
|
||||
id: Option<String>,
|
||||
/// `reasoning_text.delta` の累積。複数 content_part あれば順に concat。
|
||||
text: String,
|
||||
/// `reasoning_summary_text.delta` を summary_index 順に蓄積。
|
||||
summary: Vec<String>,
|
||||
}
|
||||
|
||||
impl OpenAIResponsesState {
|
||||
|
|
@ -45,6 +60,18 @@ impl OpenAIResponsesState {
|
|||
(self.allocate(key, block_type), true)
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_reasoning(&mut self, output_index: usize) -> &mut PendingReasoning {
|
||||
self.pending_reasoning.entry(output_index).or_default()
|
||||
}
|
||||
|
||||
fn extend_reasoning_summary(&mut self, output_index: usize, summary_index: usize, text: &str) {
|
||||
let entry = self.ensure_reasoning(output_index);
|
||||
if entry.summary.len() <= summary_index {
|
||||
entry.summary.resize(summary_index + 1, String::new());
|
||||
}
|
||||
entry.summary[summary_index].push_str(text);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
|
|
@ -89,8 +116,12 @@ enum OutputItem {
|
|||
id: Option<String>,
|
||||
},
|
||||
Reasoning {
|
||||
#[allow(dead_code)]
|
||||
#[serde(default)]
|
||||
id: Option<String>,
|
||||
/// `output_item.done` で初めて埋まる。`include=["reasoning.encrypted_content"]`
|
||||
/// 指定時に opaque blob が乗る。
|
||||
#[serde(default)]
|
||||
encrypted_content: Option<String>,
|
||||
},
|
||||
FunctionCall {
|
||||
#[allow(dead_code)]
|
||||
|
|
@ -319,12 +350,49 @@ pub(crate) fn parse_sse(
|
|||
metadata: BlockMetadata::ToolUse { id: call_id, name },
|
||||
})])
|
||||
}
|
||||
OutputItem::Reasoning { id, .. } => {
|
||||
// wrapper を確保。中身の content_part / summary_part は
|
||||
// 別 SlotKey で扱われ続ける(Streaming 表示は維持)。
|
||||
let entry = state.ensure_reasoning(ev.output_index);
|
||||
if id.is_some() {
|
||||
entry.id = id;
|
||||
}
|
||||
Ok(Vec::new())
|
||||
}
|
||||
_ => Ok(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
"response.output_item.done" => {
|
||||
let ev: OutputItemDone = from_json(data)?;
|
||||
// Reasoning wrapper の done で蓄積分を ReasoningItem として発火。
|
||||
// これは `slots` の OutputItem slot とは独立している
|
||||
// (FunctionCall は slots、Reasoning は pending_reasoning)。
|
||||
if let OutputItem::Reasoning {
|
||||
id,
|
||||
encrypted_content,
|
||||
..
|
||||
} = ev.item
|
||||
{
|
||||
let mut pending = state
|
||||
.pending_reasoning
|
||||
.remove(&ev.output_index)
|
||||
.unwrap_or_default();
|
||||
if pending.id.is_none() {
|
||||
pending.id = id;
|
||||
}
|
||||
return Ok(vec![Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: pending.id,
|
||||
text: pending.text,
|
||||
summary: pending
|
||||
.summary
|
||||
.into_iter()
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect(),
|
||||
encrypted_content,
|
||||
signature: None,
|
||||
})]);
|
||||
}
|
||||
if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) {
|
||||
Ok(vec![Event::BlockStop(BlockStop {
|
||||
index: info.flat_index,
|
||||
|
|
@ -389,6 +457,8 @@ pub(crate) fn parse_sse(
|
|||
|
||||
"response.reasoning_text.delta" => {
|
||||
let ev: ReasoningTextDelta = from_json(data)?;
|
||||
// round-trip 用に蓄積
|
||||
state.ensure_reasoning(ev.output_index).text.push_str(&ev.delta);
|
||||
Ok(ensure_and_delta(
|
||||
state,
|
||||
SlotKey::ContentPart {
|
||||
|
|
@ -419,6 +489,8 @@ pub(crate) fn parse_sse(
|
|||
|
||||
"response.reasoning_summary_text.delta" => {
|
||||
let ev: ReasoningSummaryTextDelta = from_json(data)?;
|
||||
// round-trip 用に蓄積
|
||||
state.extend_reasoning_summary(ev.output_index, ev.summary_index, &ev.delta);
|
||||
Ok(ensure_and_delta(
|
||||
state,
|
||||
SlotKey::Summary {
|
||||
|
|
@ -797,6 +869,98 @@ mod tests {
|
|||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reasoning_output_item_emits_reasoning_item_with_text_summary_encrypted() {
|
||||
// 完成済み reasoning wrapper が text + summary[] + encrypted_content を持って
|
||||
// ReasoningItem として届くこと。
|
||||
let mut state = OpenAIResponsesState::default();
|
||||
|
||||
// wrapper added (id だけ持つ)
|
||||
with(
|
||||
&mut state,
|
||||
"response.output_item.added",
|
||||
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1"}}"#,
|
||||
);
|
||||
// 内側の reasoning_text 用 content_part
|
||||
with(
|
||||
&mut state,
|
||||
"response.content_part.added",
|
||||
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":""}}"#,
|
||||
);
|
||||
with(
|
||||
&mut state,
|
||||
"response.reasoning_text.delta",
|
||||
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"hello "}"#,
|
||||
);
|
||||
with(
|
||||
&mut state,
|
||||
"response.reasoning_text.delta",
|
||||
r#"{"output_index":0,"content_index":0,"item_id":"r1","delta":"world"}"#,
|
||||
);
|
||||
with(
|
||||
&mut state,
|
||||
"response.content_part.done",
|
||||
r#"{"output_index":0,"content_index":0,"item_id":"r1","part":{"type":"reasoning_text","text":"hello world"}}"#,
|
||||
);
|
||||
// summary 1 件
|
||||
with(
|
||||
&mut state,
|
||||
"response.reasoning_summary_part.added",
|
||||
r#"{"output_index":0,"summary_index":0,"item_id":"r1","part":{"type":"summary_text","text":""}}"#,
|
||||
);
|
||||
with(
|
||||
&mut state,
|
||||
"response.reasoning_summary_text.delta",
|
||||
r#"{"output_index":0,"summary_index":0,"item_id":"r1","delta":"sum-A"}"#,
|
||||
);
|
||||
with(
|
||||
&mut state,
|
||||
"response.reasoning_summary_part.done",
|
||||
r#"{"output_index":0,"summary_index":0,"item_id":"r1"}"#,
|
||||
);
|
||||
|
||||
// wrapper done (encrypted_content が乗る)
|
||||
let evs = with(
|
||||
&mut state,
|
||||
"response.output_item.done",
|
||||
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#,
|
||||
);
|
||||
assert_eq!(evs.len(), 1);
|
||||
let Event::ReasoningItem(reasoning) = &evs[0] else {
|
||||
panic!("expected ReasoningItem, got {:?}", evs[0]);
|
||||
};
|
||||
assert_eq!(reasoning.id.as_deref(), Some("r1"));
|
||||
assert_eq!(reasoning.text, "hello world");
|
||||
assert_eq!(reasoning.summary, vec!["sum-A".to_string()]);
|
||||
assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC-XYZ"));
|
||||
assert!(reasoning.signature.is_none());
|
||||
// pending_reasoning は drain されていること
|
||||
assert!(state.pending_reasoning.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reasoning_wrapper_without_inner_content_emits_empty_text() {
|
||||
// encrypted_content だけ届く(reasoning_text 無し)ケースでも
|
||||
// ReasoningItem は発火する。
|
||||
let mut state = OpenAIResponsesState::default();
|
||||
with(
|
||||
&mut state,
|
||||
"response.output_item.added",
|
||||
r#"{"output_index":2,"item":{"type":"reasoning","id":"r9"}}"#,
|
||||
);
|
||||
let evs = with(
|
||||
&mut state,
|
||||
"response.output_item.done",
|
||||
r#"{"output_index":2,"item":{"type":"reasoning","id":"r9","encrypted_content":"BLOB"}}"#,
|
||||
);
|
||||
let Event::ReasoningItem(r) = &evs[0] else {
|
||||
panic!()
|
||||
};
|
||||
assert!(r.text.is_empty());
|
||||
assert!(r.summary.is_empty());
|
||||
assert_eq!(r.encrypted_content.as_deref(), Some("BLOB"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_event_is_ignored() {
|
||||
let (events, _) = run("response.in_progress", "{}");
|
||||
|
|
|
|||
|
|
@ -94,8 +94,15 @@ pub enum Item {
|
|||
summary: Vec<String>,
|
||||
/// サーバから返された暗号化済み reasoning blob。ZDR / `store=false`
|
||||
/// 運用で stateless に再送するときそのまま添える必要がある。
|
||||
/// Anthropic の `redacted_thinking.data` もここに格納する。
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
encrypted_content: Option<String>,
|
||||
/// Anthropic extended thinking の `signature`。新世代 Claude
|
||||
/// (Opus 4.5+/Sonnet 4.6+) では同一論理ターン内の `thinking`
|
||||
/// ブロックを送り返す際に必須。改ざん検知に使われる。他 scheme
|
||||
/// では `None`。
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
signature: Option<String>,
|
||||
/// Item status
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
status: Option<ItemStatus>,
|
||||
|
|
@ -224,6 +231,7 @@ impl Item {
|
|||
text: text.into(),
|
||||
summary: Vec::new(),
|
||||
encrypted_content: None,
|
||||
signature: None,
|
||||
status: None,
|
||||
}
|
||||
}
|
||||
|
|
@ -247,6 +255,14 @@ impl Item {
|
|||
self
|
||||
}
|
||||
|
||||
/// Set Anthropic `signature` on a `Reasoning` item. No-op on other variants.
|
||||
pub fn with_signature(mut self, sig: impl Into<String>) -> Self {
|
||||
if let Self::Reasoning { signature, .. } = &mut self {
|
||||
*signature = Some(sig.into());
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
// ========================================================================
|
||||
// Builder methods
|
||||
// ========================================================================
|
||||
|
|
|
|||
|
|
@ -10,12 +10,14 @@
|
|||
//! - [`ToolCallCollector`] - ツール呼び出しを収集するHandler
|
||||
|
||||
pub mod event;
|
||||
mod reasoning_item_collector;
|
||||
mod text_block_collector;
|
||||
mod timeline;
|
||||
mod tool_call_collector;
|
||||
|
||||
// 公開API
|
||||
pub use event::*;
|
||||
pub use reasoning_item_collector::ReasoningItemCollector;
|
||||
pub use text_block_collector::TextBlockCollector;
|
||||
pub use timeline::Timeline;
|
||||
pub use tool_call_collector::ToolCallCollector;
|
||||
|
|
@ -28,6 +30,7 @@ pub use crate::handler::{
|
|||
Handler,
|
||||
Kind,
|
||||
PingKind,
|
||||
ReasoningItemKind,
|
||||
StatusKind,
|
||||
// Block Events
|
||||
TextBlockEvent,
|
||||
|
|
|
|||
77
crates/llm-worker/src/timeline/reasoning_item_collector.rs
Normal file
77
crates/llm-worker/src/timeline/reasoning_item_collector.rs
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
//! `ReasoningItemCollector` - 完成済み reasoning item を収集する Handler
|
||||
//!
|
||||
//! Timeline の `ReasoningItemKind` Handler として登録し、scheme 側が
|
||||
//! `Event::ReasoningItem` を発火するたびに 1 件ずつバッファに溜める。
|
||||
//! Worker はターン終了時に `take_collected()` でドレインして
|
||||
//! `Item::Reasoning` として `worker.history` に append する。
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::handler::{Handler, ReasoningItemKind};
|
||||
use crate::llm_client::event::ReasoningItemEvent;
|
||||
|
||||
/// 収集された reasoning item の連列。
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ReasoningItemCollector {
|
||||
collected: Arc<Mutex<Vec<ReasoningItemEvent>>>,
|
||||
}
|
||||
|
||||
impl ReasoningItemCollector {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// 収集済み item を取り出してクリア
|
||||
pub fn take_collected(&self) -> Vec<ReasoningItemEvent> {
|
||||
let mut guard = self.collected.lock().unwrap();
|
||||
std::mem::take(&mut *guard)
|
||||
}
|
||||
|
||||
/// 収集をクリア
|
||||
pub fn clear(&self) {
|
||||
self.collected.lock().unwrap().clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<ReasoningItemKind> for ReasoningItemCollector {
|
||||
type Scope = ();
|
||||
|
||||
fn on_event(&mut self, _scope: &mut Self::Scope, event: &ReasoningItemEvent) {
|
||||
self.collected.lock().unwrap().push(event.clone());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::llm_client::event::Event;
|
||||
use crate::timeline::Timeline;
|
||||
|
||||
#[test]
|
||||
fn collects_in_order() {
|
||||
let collector = ReasoningItemCollector::new();
|
||||
let mut timeline = Timeline::new();
|
||||
timeline.on_reasoning_item(collector.clone());
|
||||
|
||||
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: Some("r1".into()),
|
||||
text: "first".into(),
|
||||
signature: Some("sig1".into()),
|
||||
..Default::default()
|
||||
}));
|
||||
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: Some("r2".into()),
|
||||
text: "second".into(),
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
let items = collector.take_collected();
|
||||
assert_eq!(items.len(), 2);
|
||||
assert_eq!(items[0].text, "first");
|
||||
assert_eq!(items[0].signature.as_deref(), Some("sig1"));
|
||||
assert_eq!(items[1].text, "second");
|
||||
|
||||
// take は drain なので 2 度目は空
|
||||
assert!(collector.take_collected().is_empty());
|
||||
}
|
||||
}
|
||||
|
|
@ -381,6 +381,7 @@ pub struct Timeline {
|
|||
ping_handlers: Vec<Box<dyn ErasedHandler<PingKind>>>,
|
||||
status_handlers: Vec<Box<dyn ErasedHandler<StatusKind>>>,
|
||||
error_handlers: Vec<Box<dyn ErasedHandler<ErrorKind>>>,
|
||||
reasoning_item_handlers: Vec<Box<dyn ErasedHandler<ReasoningItemKind>>>,
|
||||
|
||||
// Block系ハンドラー(BlockTypeごとにグループ化)
|
||||
text_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
|
||||
|
|
@ -410,6 +411,7 @@ impl Timeline {
|
|||
ping_handlers: Vec::new(),
|
||||
status_handlers: Vec::new(),
|
||||
error_handlers: Vec::new(),
|
||||
reasoning_item_handlers: Vec::new(),
|
||||
text_block_handlers: Vec::new(),
|
||||
thinking_block_handlers: Vec::new(),
|
||||
tool_use_block_handlers: Vec::new(),
|
||||
|
|
@ -471,6 +473,18 @@ impl Timeline {
|
|||
self
|
||||
}
|
||||
|
||||
/// `ReasoningItemKind` 用 Handler を登録
|
||||
pub fn on_reasoning_item<H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
H: Handler<ReasoningItemKind> + Send + Sync + 'static,
|
||||
H::Scope: Send + Sync,
|
||||
{
|
||||
let mut wrapper = HandlerWrapper::new(handler);
|
||||
wrapper.start_scope();
|
||||
self.reasoning_item_handlers.push(Box::new(wrapper));
|
||||
self
|
||||
}
|
||||
|
||||
/// TextBlockKind用のHandlerを登録
|
||||
pub fn on_text_block<H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
|
|
@ -522,6 +536,9 @@ impl Timeline {
|
|||
Event::BlockDelta(d) => self.handle_block_delta(d),
|
||||
Event::BlockStop(s) => self.handle_block_stop(s),
|
||||
Event::BlockAbort(a) => self.handle_block_abort(a),
|
||||
|
||||
// 完成済み reasoning item: 即時ディスパッチ
|
||||
Event::ReasoningItem(r) => self.dispatch_reasoning_item(r),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -564,6 +581,12 @@ impl Timeline {
|
|||
}
|
||||
}
|
||||
|
||||
fn dispatch_reasoning_item(&mut self, event: &ReasoningItemEvent) {
|
||||
for handler in &mut self.reasoning_item_handlers {
|
||||
handler.dispatch(event);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_block_start(&mut self, start: &BlockStart) {
|
||||
self.current_block = Some(start.block_type);
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ use crate::{
|
|||
},
|
||||
state::{Locked, Mutable, WorkerState},
|
||||
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
|
||||
timeline::{TextBlockCollector, Timeline, ToolCallCollector},
|
||||
timeline::{ReasoningItemCollector, TextBlockCollector, Timeline, ToolCallCollector},
|
||||
tool::{
|
||||
ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputLimits, ToolResult,
|
||||
truncate_content,
|
||||
|
|
@ -140,6 +140,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
|||
text_block_collector: TextBlockCollector,
|
||||
/// Tool call collector (Timeline handler)
|
||||
tool_call_collector: ToolCallCollector,
|
||||
/// Reasoning item collector (Timeline handler)。完成済み reasoning
|
||||
/// item を 1 ターン分バッファし、history に append する。
|
||||
reasoning_item_collector: ReasoningItemCollector,
|
||||
/// Tool server handle
|
||||
tool_server: ToolServerHandle,
|
||||
/// Interceptor for control-flow decisions
|
||||
|
|
@ -587,10 +590,37 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
self.tool_server.tool_definitions_sorted()
|
||||
}
|
||||
|
||||
/// Build assistant response items from text blocks and tool calls
|
||||
fn build_assistant_items(&self, text_blocks: &[String], tool_calls: &[ToolCall]) -> Vec<Item> {
|
||||
/// Build assistant response items from reasoning items, text blocks, and tool calls.
|
||||
///
|
||||
/// Reasoning items come first (Anthropic / OpenAI Responses 双方ともに
|
||||
/// アシスタント応答内で reasoning は先頭に並ぶ仕様)。これは Anthropic
|
||||
/// が新世代モデルで thinking ブロックを assistant メッセージの先頭に
|
||||
/// 置くことを要求するためでもある。
|
||||
fn build_assistant_items(
|
||||
&self,
|
||||
reasoning_items: &[crate::llm_client::event::ReasoningItemEvent],
|
||||
text_blocks: &[String],
|
||||
tool_calls: &[ToolCall],
|
||||
) -> Vec<Item> {
|
||||
let mut items = Vec::new();
|
||||
|
||||
for r in reasoning_items {
|
||||
let mut item = Item::reasoning(r.text.clone());
|
||||
if let Some(id) = &r.id {
|
||||
item = item.with_id(id);
|
||||
}
|
||||
if !r.summary.is_empty() {
|
||||
item = item.with_reasoning_summary(r.summary.clone());
|
||||
}
|
||||
if let Some(enc) = &r.encrypted_content {
|
||||
item = item.with_encrypted_content(enc);
|
||||
}
|
||||
if let Some(sig) = &r.signature {
|
||||
item = item.with_signature(sig);
|
||||
}
|
||||
items.push(item);
|
||||
}
|
||||
|
||||
// Add text as assistant message if present
|
||||
let text = text_blocks.join("");
|
||||
if !text.is_empty() {
|
||||
|
|
@ -973,9 +1003,11 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
self.turn_count += 1;
|
||||
|
||||
// Collect and commit assistant items
|
||||
let reasoning_items = self.reasoning_item_collector.take_collected();
|
||||
let text_blocks = self.text_block_collector.take_collected();
|
||||
let tool_calls = self.tool_call_collector.take_collected();
|
||||
let assistant_items = self.build_assistant_items(&text_blocks, &tool_calls);
|
||||
let assistant_items =
|
||||
self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls);
|
||||
self.history.extend(assistant_items);
|
||||
|
||||
if tool_calls.is_empty() {
|
||||
|
|
@ -1118,18 +1150,21 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
pub fn new(client: C) -> Self {
|
||||
let text_block_collector = TextBlockCollector::new();
|
||||
let tool_call_collector = ToolCallCollector::new();
|
||||
let reasoning_item_collector = ReasoningItemCollector::new();
|
||||
let mut timeline = Timeline::new();
|
||||
let (cancel_tx, cancel_rx) = mpsc::channel(1);
|
||||
|
||||
// Register collectors with Timeline
|
||||
timeline.on_text_block(text_block_collector.clone());
|
||||
timeline.on_tool_use_block(tool_call_collector.clone());
|
||||
timeline.on_reasoning_item(reasoning_item_collector.clone());
|
||||
|
||||
Self {
|
||||
client,
|
||||
timeline,
|
||||
text_block_collector,
|
||||
tool_call_collector,
|
||||
reasoning_item_collector,
|
||||
tool_server: ToolServer::new().handle(),
|
||||
interceptor: Box::new(DefaultInterceptor),
|
||||
system_prompt: None,
|
||||
|
|
@ -1388,6 +1423,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
timeline: self.timeline,
|
||||
text_block_collector: self.text_block_collector,
|
||||
tool_call_collector: self.tool_call_collector,
|
||||
reasoning_item_collector: self.reasoning_item_collector,
|
||||
tool_server: self.tool_server,
|
||||
interceptor: self.interceptor,
|
||||
system_prompt: self.system_prompt,
|
||||
|
|
@ -1470,6 +1506,7 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
timeline: self.timeline,
|
||||
text_block_collector: self.text_block_collector,
|
||||
tool_call_collector: self.tool_call_collector,
|
||||
reasoning_item_collector: self.reasoning_item_collector,
|
||||
tool_server: self.tool_server,
|
||||
interceptor: self.interceptor,
|
||||
system_prompt: self.system_prompt,
|
||||
|
|
|
|||
212
crates/llm-worker/tests/reasoning_round_trip_test.rs
Normal file
212
crates/llm-worker/tests/reasoning_round_trip_test.rs
Normal file
|
|
@ -0,0 +1,212 @@
|
|||
//! Reasoning history round-trip 統合テスト
|
||||
//!
|
||||
//! Worker のストリーム → history append → 次リクエスト送出までの
|
||||
//! ライフサイクルで `Item::Reasoning` が脱落せず保持されることを確認する。
|
||||
//!
|
||||
//! 検証点:
|
||||
//! - Anthropic 由来の thinking + signature が `Item::Reasoning::signature` として
|
||||
//! history に残る
|
||||
//! - OpenAI Responses 由来の reasoning text + summary + encrypted_content が
|
||||
//! `Item::Reasoning` の各フィールドに展開される
|
||||
//! - 直前の reasoning は次の outgoing request の `request.items` の先頭付近に
|
||||
//! 含まれる(assistant メッセージの先頭、Anthropic 仕様)
|
||||
|
||||
mod common;
|
||||
|
||||
use common::MockLlmClient;
|
||||
use llm_worker::Item;
|
||||
use llm_worker::Worker;
|
||||
use llm_worker::llm_client::event::{
|
||||
Event, ReasoningItemEvent, ResponseStatus, StatusEvent,
|
||||
};
|
||||
|
||||
/// Anthropic 風: thinking ブロック → text → 終了 のシーケンス。
|
||||
/// Worker history に Reasoning(signature 付き) → assistant_message が並ぶ。
|
||||
#[tokio::test]
|
||||
async fn anthropic_thinking_round_trips_signature_into_history() {
|
||||
let events = vec![
|
||||
Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: None,
|
||||
text: "let me think...".into(),
|
||||
summary: Vec::new(),
|
||||
encrypted_content: None,
|
||||
signature: Some("SIG-OPUS".into()),
|
||||
}),
|
||||
Event::text_block_start(0),
|
||||
Event::text_delta(0, "Here's the answer"),
|
||||
Event::text_block_stop(0, None),
|
||||
Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
];
|
||||
let client = MockLlmClient::new(events);
|
||||
let worker = Worker::new(client);
|
||||
let out = worker.run("question?").await.expect("run ok");
|
||||
let worker = out.worker;
|
||||
|
||||
let history = worker.history();
|
||||
// user / reasoning / assistant_message
|
||||
assert_eq!(history.len(), 3, "history: {history:?}");
|
||||
|
||||
assert!(matches!(history[0], Item::Message { .. }));
|
||||
match &history[1] {
|
||||
Item::Reasoning {
|
||||
text, signature, ..
|
||||
} => {
|
||||
assert_eq!(text, "let me think...");
|
||||
assert_eq!(signature.as_deref(), Some("SIG-OPUS"));
|
||||
}
|
||||
other => panic!("expected Reasoning, got {other:?}"),
|
||||
}
|
||||
assert_eq!(history[2].as_text(), Some("Here's the answer"));
|
||||
}
|
||||
|
||||
/// OpenAI Responses 風: encrypted_content + summary を持った reasoning が
|
||||
/// `Item::Reasoning` のフィールドに展開されること。
|
||||
#[tokio::test]
|
||||
async fn openai_reasoning_round_trips_encrypted_and_summary() {
|
||||
let events = vec![
|
||||
Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: Some("r1".into()),
|
||||
text: "inner reasoning".into(),
|
||||
summary: vec!["sum-A".into(), "sum-B".into()],
|
||||
encrypted_content: Some("ENC-OPAQUE".into()),
|
||||
signature: None,
|
||||
}),
|
||||
Event::text_block_start(0),
|
||||
Event::text_delta(0, "answer"),
|
||||
Event::text_block_stop(0, None),
|
||||
Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
];
|
||||
let client = MockLlmClient::new(events);
|
||||
let worker = Worker::new(client);
|
||||
let out = worker.run("q").await.expect("run ok");
|
||||
let worker = out.worker;
|
||||
|
||||
let history = worker.history();
|
||||
match &history[1] {
|
||||
Item::Reasoning {
|
||||
text,
|
||||
summary,
|
||||
encrypted_content,
|
||||
signature,
|
||||
id,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(text, "inner reasoning");
|
||||
assert_eq!(summary, &vec!["sum-A".to_string(), "sum-B".to_string()]);
|
||||
assert_eq!(encrypted_content.as_deref(), Some("ENC-OPAQUE"));
|
||||
assert!(signature.is_none());
|
||||
assert_eq!(id.as_deref(), Some("r1"));
|
||||
}
|
||||
other => panic!("expected Reasoning, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Reasoning は assistant ターン内で text/tool_call より先に並ぶこと(Anthropic
|
||||
/// が thinking を assistant メッセージの先頭に要求するため)。
|
||||
#[tokio::test]
|
||||
async fn reasoning_precedes_text_in_assistant_burst() {
|
||||
let events = vec![
|
||||
// text/tool_call とは独立に、ReasoningItem が中盤で発火しても、
|
||||
// history append 時には assistant items の先頭に置かれる。
|
||||
Event::text_block_start(0),
|
||||
Event::text_delta(0, "intermediate"),
|
||||
Event::text_block_stop(0, None),
|
||||
Event::ReasoningItem(ReasoningItemEvent {
|
||||
text: "after text".into(),
|
||||
signature: Some("SIG".into()),
|
||||
..Default::default()
|
||||
}),
|
||||
Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
];
|
||||
let client = MockLlmClient::new(events);
|
||||
let worker = Worker::new(client);
|
||||
let out = worker.run("q").await.expect("run ok");
|
||||
let worker = out.worker;
|
||||
|
||||
let history = worker.history();
|
||||
// user / reasoning(先頭) / assistant_message
|
||||
assert!(matches!(history[1], Item::Reasoning { .. }));
|
||||
assert_eq!(history[2].as_text(), Some("intermediate"));
|
||||
}
|
||||
|
||||
/// resume シナリオ: history.json 由来の Item::Reasoning(signature) を Worker に
|
||||
/// 注入して run しても、次の outgoing request の `Request::items` にそのまま
|
||||
/// 載って LLM へ渡る(worker は items を改変しない契約)。
|
||||
#[tokio::test]
|
||||
async fn injected_reasoning_survives_into_outgoing_request() {
|
||||
use async_trait::async_trait;
|
||||
use futures::Stream;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
|
||||
/// Request を 1 度だけキャプチャして空ストリームを返す client。
|
||||
#[derive(Clone)]
|
||||
struct CapturingClient {
|
||||
captured: Arc<Mutex<Option<Request>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LlmClient for CapturingClient {
|
||||
fn clone_boxed(&self) -> Box<dyn LlmClient> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
async fn stream(
|
||||
&self,
|
||||
request: Request,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Event, ClientError>> + Send>>, ClientError>
|
||||
{
|
||||
*self.captured.lock().unwrap() = Some(request);
|
||||
let stream = futures::stream::iter(vec![Ok(Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}))]);
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
}
|
||||
|
||||
let captured = Arc::new(Mutex::new(None));
|
||||
let client = CapturingClient {
|
||||
captured: captured.clone(),
|
||||
};
|
||||
|
||||
let mut worker = Worker::new(client);
|
||||
// resume: 既存 history を流し込む
|
||||
worker.set_history(vec![
|
||||
Item::user_message("prior question"),
|
||||
Item::reasoning("prior thinking").with_signature("SIG-PRIOR"),
|
||||
Item::assistant_message("prior answer"),
|
||||
]);
|
||||
|
||||
let _ = worker.run("follow up").await.expect("run ok");
|
||||
|
||||
let req = captured
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.expect("client should have received a request");
|
||||
// Reasoning item が outgoing items に保持されていること
|
||||
let mut found = false;
|
||||
for item in &req.items {
|
||||
if let Item::Reasoning {
|
||||
text, signature, ..
|
||||
} = item
|
||||
{
|
||||
assert_eq!(text, "prior thinking");
|
||||
assert_eq!(signature.as_deref(), Some("SIG-PRIOR"));
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
assert!(
|
||||
found,
|
||||
"Reasoning item must survive into outgoing request items: {req:?}",
|
||||
req = req.items,
|
||||
);
|
||||
}
|
||||
|
|
@ -39,6 +39,10 @@ pub enum LoggedItem {
|
|||
summary: Vec<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
encrypted_content: Option<String>,
|
||||
/// Anthropic extended thinking signature。新世代 Claude で round-trip
|
||||
/// 必須。OpenAI Responses など他 scheme では `None`。
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
signature: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -92,11 +96,13 @@ impl From<&Item> for LoggedItem {
|
|||
text,
|
||||
summary,
|
||||
encrypted_content,
|
||||
signature,
|
||||
..
|
||||
} => Self::Reasoning {
|
||||
text: text.clone(),
|
||||
summary: summary.clone(),
|
||||
encrypted_content: encrypted_content.clone(),
|
||||
signature: signature.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -142,11 +148,13 @@ impl From<LoggedItem> for Item {
|
|||
text,
|
||||
summary,
|
||||
encrypted_content,
|
||||
signature,
|
||||
} => Item::Reasoning {
|
||||
id: None,
|
||||
text,
|
||||
summary,
|
||||
encrypted_content,
|
||||
signature,
|
||||
status: None,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user