fix: unify reasoning block persistence
This commit is contained in:
parent
8135ff9006
commit
42066f1e00
|
|
@ -91,16 +91,6 @@ impl Kind for ErrorKind {
|
|||
type Event = ErrorEvent;
|
||||
}
|
||||
|
||||
/// Reasoning item Kind - 完成済み reasoning item の永続化用
|
||||
///
|
||||
/// 1 reasoning item につき 1 度だけ発火する。Worker は
|
||||
/// `ReasoningItemCollector` 経由で受け取り、ターン終了時に
|
||||
/// `Item::Reasoning` として history に append する。
|
||||
pub struct ReasoningItemKind;
|
||||
impl Kind for ReasoningItemKind {
|
||||
type Event = ReasoningItemEvent;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Block Kind Definitions
|
||||
// =============================================================================
|
||||
|
|
@ -152,6 +142,7 @@ pub struct ThinkingBlockStart {
|
|||
#[derive(Debug, Clone, PartialEq)]
|
||||
/// Stop payload handed to Thinking block handlers when a Thinking block ends.
pub struct ThinkingBlockStop {
|
||||
/// Index of the block that stopped, copied from the originating `BlockStop`.
pub index: usize,
|
||||
/// Reasoning round-trip metadata copied from `BlockStop.reasoning`.
/// `None` marks a block that is only streamed live and is not persisted
/// as `Item::Reasoning`.
pub reasoning: Option<ReasoningBlockData>,
|
||||
}
|
||||
|
||||
/// ToolUseBlock Kind - for tool use blocks
|
||||
|
|
|
|||
|
|
@ -17,14 +17,12 @@ use serde::{Deserialize, Serialize};
|
|||
///
|
||||
/// - **メタイベント**: `Ping`, `Usage`, `Status`, `Error`, `UnhandledSse`
|
||||
/// - **ブロックイベント**: `BlockStart`, `BlockDelta`, `BlockStop`, `BlockAbort`
|
||||
/// - **永続化イベント**: `ReasoningItem` (history に commit すべき完成済み
|
||||
/// reasoning item。streaming 表示用の Thinking BlockStart/Delta/Stop と
|
||||
/// は別経路で発火する)
|
||||
///
|
||||
/// # ブロックのライフサイクル
|
||||
///
|
||||
/// テキストやツール呼び出しは、`BlockStart` → `BlockDelta`(複数) → `BlockStop`
|
||||
/// の順序でイベントが発生します。
|
||||
/// テキスト、thinking、ツール呼び出しは、`BlockStart` → `BlockDelta`(複数) → `BlockStop`
|
||||
/// の順序でイベントが発生します。thinking の round-trip metadata は
|
||||
/// `BlockStop.reasoning` に載ります。
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub enum Event {
|
||||
/// ハートビート
|
||||
|
|
@ -48,18 +46,6 @@ pub enum Event {
|
|||
BlockStop(BlockStop),
|
||||
/// ブロック中断
|
||||
BlockAbort(BlockAbort),
|
||||
|
||||
/// Reasoning item の完成。scheme が「次の request に送り返すための
|
||||
/// reasoning material が揃った」点で 1 度だけ発火する。
|
||||
///
|
||||
/// - Anthropic: 1 つの `thinking` content_block 完了ごと
|
||||
/// - OpenAI Responses: 1 つの reasoning output_item 完了ごと
|
||||
///
|
||||
/// 上位層(Worker / ReasoningItemCollector)はこれを `Item::Reasoning`
|
||||
/// として `worker.history` に append する。streaming 表示用の
|
||||
/// `BlockStart(Thinking)` / `BlockDelta(Thinking)` / `BlockStop(Thinking)`
|
||||
/// は依然として並行発火する(live display と round-trip persist の責務分離)。
|
||||
ReasoningItem(ReasoningItemEvent),
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
|
|
@ -218,6 +204,12 @@ pub struct BlockStop {
|
|||
pub block_type: BlockType,
|
||||
/// 停止理由
|
||||
pub stop_reason: Option<StopReason>,
|
||||
/// Thinking block の停止時に確定した reasoning round-trip metadata。
|
||||
///
|
||||
/// `None` の Thinking block は live streaming / trace 用で、history に
|
||||
/// `Item::Reasoning` として永続化しない。`Some` の場合は block lifecycle
|
||||
/// が永続化の authoritative source になる。
|
||||
pub reasoning: Option<ReasoningBlockData>,
|
||||
}
|
||||
|
||||
impl BlockStop {
|
||||
|
|
@ -243,22 +235,17 @@ impl BlockAbort {
|
|||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Reasoning Item Event
|
||||
// =============================================================================
|
||||
|
||||
/// 完成済み reasoning item。scheme が round-trip に必要なすべての
|
||||
/// material(text, summary, encrypted_content, signature, id)を揃えて
|
||||
/// 1 度だけ発火する。
|
||||
/// Thinking block stop で確定した reasoning material。
|
||||
///
|
||||
/// `Item::Reasoning` のフィールドを 1:1 に持つ。
|
||||
/// `Item::Reasoning` の round-trip に必要な provider material を保持する。
|
||||
/// `text` は deltas から収集した本文を上書きするために使う(metadata-only
|
||||
/// reasoning block や provider completion event で全文が届くケース)。
|
||||
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
|
||||
pub struct ReasoningItemEvent {
|
||||
pub struct ReasoningBlockData {
|
||||
/// scheme 側で観測した item id(OpenAI Responses の `id`)。
|
||||
pub id: Option<String>,
|
||||
/// reasoning 本体テキスト。Anthropic は `thinking` 累積、OpenAI は
|
||||
/// `reasoning_text` 累積。redacted_thinking では空。
|
||||
pub text: String,
|
||||
/// reasoning 本体テキスト。`None` の場合は block delta 収集結果を使う。
|
||||
pub text: Option<String>,
|
||||
/// summary (OpenAI Responses の `summary_text[]`)。他 scheme は空。
|
||||
pub summary: Vec<String>,
|
||||
/// 暗号化された opaque blob(Anthropic `redacted_thinking.data` /
|
||||
|
|
@ -309,6 +296,7 @@ impl Event {
|
|||
index,
|
||||
block_type: BlockType::Text,
|
||||
stop_reason,
|
||||
reasoning: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -338,6 +326,7 @@ impl Event {
|
|||
index,
|
||||
block_type: BlockType::ToolUse,
|
||||
stop_reason: Some(StopReason::ToolUse),
|
||||
reasoning: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -216,6 +216,7 @@ impl AnthropicScheme {
|
|||
index: event.index,
|
||||
block_type: BlockType::Text, // Timeline層で上書きされる
|
||||
stop_reason: None,
|
||||
reasoning: None,
|
||||
})))
|
||||
}
|
||||
AnthropicEventType::MessageDelta => {
|
||||
|
|
@ -286,9 +287,9 @@ impl AnthropicScheme {
|
|||
/// `parse_event` の単発 Event に加えて、以下を行う:
|
||||
/// - `content_block_stop` の `block_type` を直前の Start 値で書き戻す
|
||||
/// - `thinking` / `redacted_thinking` ブロックの本体・signature・data を
|
||||
/// `state.pending_thinking` に蓄積し、`content_block_stop` で
|
||||
/// `Event::ReasoningItem` を追加発火する
|
||||
/// - `signature_delta` を蓄積(Stream channel には流さず、reasoning event
|
||||
/// `state.pending_thinking` に蓄積し、`content_block_stop` の Thinking
|
||||
/// BlockStop metadata に載せる
|
||||
/// - `signature_delta` を蓄積(Stream channel には流さず、reasoning metadata
|
||||
/// にだけ反映する)
|
||||
pub(crate) fn parse_with_state(
|
||||
&self,
|
||||
|
|
@ -374,16 +375,21 @@ impl AnthropicScheme {
|
|||
AnthropicEventType::ContentBlockStop => {
|
||||
let raw: ContentBlockStopEvent = serde_json::from_str(data)?;
|
||||
let block_type = state.current_block_type.take().unwrap_or(BlockType::Text);
|
||||
let reasoning = if matches!(block_type, BlockType::Thinking) {
|
||||
state
|
||||
.pending_thinking
|
||||
.take()
|
||||
.map(PendingThinking::into_reasoning)
|
||||
} else {
|
||||
state.pending_thinking.take();
|
||||
None
|
||||
};
|
||||
emitted.push(Event::BlockStop(BlockStop {
|
||||
index: raw.index,
|
||||
block_type,
|
||||
stop_reason: None,
|
||||
reasoning,
|
||||
}));
|
||||
if matches!(block_type, BlockType::Thinking) {
|
||||
if let Some(pending) = state.pending_thinking.take() {
|
||||
emitted.push(Event::ReasoningItem(pending.into_event()));
|
||||
}
|
||||
}
|
||||
}
|
||||
// 残りは state を必要としない。既存 parse_event に委譲。
|
||||
_ => {
|
||||
|
|
@ -524,8 +530,8 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn thinking_block_emits_reasoning_item_with_signature() {
|
||||
// thinking ブロックが完了したら ReasoningItem に text+signature が乗ること
|
||||
fn thinking_block_stop_carries_reasoning_with_signature() {
|
||||
// thinking ブロックが完了したら reasoning metadata に text+signature が乗ること
|
||||
let scheme = AnthropicScheme::new();
|
||||
let mut state = AnthropicState::default();
|
||||
|
||||
|
|
@ -567,18 +573,18 @@ mod tests {
|
|||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
// BlockStop と ReasoningItem の 2 件が並ぶ
|
||||
assert!(matches!(stop_evs[0], Event::BlockStop(_)));
|
||||
let Event::ReasoningItem(reasoning) = &stop_evs[1] else {
|
||||
panic!("expected ReasoningItem, got {:?}", stop_evs[1]);
|
||||
assert_eq!(stop_evs.len(), 1);
|
||||
let Event::BlockStop(stop) = &stop_evs[0] else {
|
||||
panic!("expected BlockStop, got {:?}", stop_evs[0]);
|
||||
};
|
||||
assert_eq!(reasoning.text, "hello world");
|
||||
let reasoning = stop.reasoning.as_ref().expect("reasoning metadata");
|
||||
assert_eq!(reasoning.text.as_deref(), Some("hello world"));
|
||||
assert_eq!(reasoning.signature.as_deref(), Some("SIG-XYZ"));
|
||||
assert!(reasoning.encrypted_content.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn redacted_thinking_emits_reasoning_item_with_data() {
|
||||
fn redacted_thinking_stop_carries_reasoning_with_data() {
|
||||
let scheme = AnthropicScheme::new();
|
||||
let mut state = AnthropicState::default();
|
||||
|
||||
|
|
@ -596,16 +602,18 @@ mod tests {
|
|||
&mut state,
|
||||
)
|
||||
.unwrap();
|
||||
let Event::ReasoningItem(reasoning) = &stop_evs[1] else {
|
||||
panic!("expected ReasoningItem");
|
||||
assert_eq!(stop_evs.len(), 1);
|
||||
let Event::BlockStop(stop) = &stop_evs[0] else {
|
||||
panic!("expected BlockStop");
|
||||
};
|
||||
assert!(reasoning.text.is_empty());
|
||||
let reasoning = stop.reasoning.as_ref().expect("reasoning metadata");
|
||||
assert_eq!(reasoning.text.as_deref(), Some(""));
|
||||
assert!(reasoning.signature.is_none());
|
||||
assert_eq!(reasoning.encrypted_content.as_deref(), Some("opaque-blob"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn text_block_does_not_emit_reasoning_item() {
|
||||
fn text_block_stop_has_no_reasoning_metadata() {
|
||||
let scheme = AnthropicScheme::new();
|
||||
let mut state = AnthropicState::default();
|
||||
|
||||
|
|
@ -631,7 +639,10 @@ mod tests {
|
|||
)
|
||||
.unwrap();
|
||||
assert_eq!(stop_evs.len(), 1);
|
||||
assert!(matches!(stop_evs[0], Event::BlockStop(_)));
|
||||
let Event::BlockStop(stop) = &stop_evs[0] else {
|
||||
panic!("expected BlockStop");
|
||||
};
|
||||
assert!(stop.reasoning.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use crate::llm_client::{
|
|||
ClientError,
|
||||
auth::AuthRequirement,
|
||||
capability::ModelCapability,
|
||||
event::{BlockType, Event, ReasoningItemEvent},
|
||||
event::{BlockType, Event, ReasoningBlockData},
|
||||
scheme::Scheme,
|
||||
types::Request,
|
||||
};
|
||||
|
|
@ -23,7 +23,7 @@ use super::AnthropicScheme;
|
|||
/// `BlockStop` に書き戻す。
|
||||
/// 2. `thinking` ブロック中の `thinking_delta` テキストと `signature_delta`
|
||||
/// 署名、および `redacted_thinking` ブロックの `data` を蓄積し、
|
||||
/// `content_block_stop` で `Event::ReasoningItem` を発火する
|
||||
/// `content_block_stop` の Thinking block metadata として返す
|
||||
/// (round-trip 永続化のため)。
|
||||
#[derive(Debug, Default)]
|
||||
pub struct AnthropicState {
|
||||
|
|
@ -40,10 +40,10 @@ pub(crate) struct PendingThinking {
|
|||
}
|
||||
|
||||
impl PendingThinking {
|
||||
pub(crate) fn into_event(self) -> ReasoningItemEvent {
|
||||
ReasoningItemEvent {
|
||||
pub(crate) fn into_reasoning(self) -> ReasoningBlockData {
|
||||
ReasoningBlockData {
|
||||
id: None,
|
||||
text: self.text,
|
||||
text: Some(self.text),
|
||||
summary: Vec::new(),
|
||||
encrypted_content: self.redacted_data,
|
||||
signature: self.signature,
|
||||
|
|
|
|||
|
|
@ -205,6 +205,7 @@ impl GeminiScheme {
|
|||
index: candidate_index,
|
||||
block_type: BlockType::Text,
|
||||
stop_reason,
|
||||
reasoning: None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ use crate::llm_client::{
|
|||
ClientError,
|
||||
event::{
|
||||
BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, ErrorEvent,
|
||||
Event, ReasoningItemEvent, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent,
|
||||
Event, ReasoningBlockData, ResponseStatus, StatusEvent, UnhandledSseEvent, UsageEvent,
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -25,8 +25,8 @@ pub struct OpenAIResponsesState {
|
|||
next_index: usize,
|
||||
/// 蓄積中の reasoning output_item。`output_item.added`(Reasoning) で
|
||||
/// 確保し、`reasoning_text.delta` / `reasoning_summary_text.delta` で
|
||||
/// 蓄積、`output_item.done`(Reasoning) で `Event::ReasoningItem` を
|
||||
/// 発火してエントリを除去する。
|
||||
/// 蓄積、`output_item.done`(Reasoning) で metadata-only Thinking block を
|
||||
/// 完了させて reasoning persistence material を渡す。
|
||||
pending_reasoning: HashMap<usize, PendingReasoning>,
|
||||
}
|
||||
|
||||
|
|
@ -380,8 +380,8 @@ pub(crate) fn parse_sse(
|
|||
|
||||
"response.output_item.done" => {
|
||||
let ev: OutputItemDone = from_json(data)?;
|
||||
// Reasoning wrapper の done で蓄積分を ReasoningItem として発火。
|
||||
// これは `slots` の OutputItem slot とは独立している
|
||||
// Reasoning wrapper の done で蓄積分を metadata-only Thinking block
|
||||
// stop に載せる。これは `slots` の OutputItem slot とは独立している
|
||||
// (FunctionCall は slots、Reasoning は pending_reasoning)。
|
||||
if let OutputItem::Reasoning {
|
||||
id,
|
||||
|
|
@ -396,23 +396,39 @@ pub(crate) fn parse_sse(
|
|||
if pending.id.is_none() {
|
||||
pending.id = id;
|
||||
}
|
||||
return Ok(vec![Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: pending.id,
|
||||
text: pending.text,
|
||||
summary: pending
|
||||
.summary
|
||||
.into_iter()
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect(),
|
||||
encrypted_content,
|
||||
signature: None,
|
||||
})]);
|
||||
let info =
|
||||
state.allocate(SlotKey::OutputItem(ev.output_index), BlockType::Thinking);
|
||||
state.slots.remove(&SlotKey::OutputItem(ev.output_index));
|
||||
return Ok(vec![
|
||||
Event::BlockStart(BlockStart {
|
||||
index: info.flat_index,
|
||||
block_type: BlockType::Thinking,
|
||||
metadata: BlockMetadata::Thinking,
|
||||
}),
|
||||
Event::BlockStop(BlockStop {
|
||||
index: info.flat_index,
|
||||
block_type: BlockType::Thinking,
|
||||
stop_reason: None,
|
||||
reasoning: Some(ReasoningBlockData {
|
||||
id: pending.id,
|
||||
text: Some(pending.text),
|
||||
summary: pending
|
||||
.summary
|
||||
.into_iter()
|
||||
.filter(|s| !s.is_empty())
|
||||
.collect(),
|
||||
encrypted_content,
|
||||
signature: None,
|
||||
}),
|
||||
}),
|
||||
]);
|
||||
}
|
||||
if let Some(info) = state.slots.remove(&SlotKey::OutputItem(ev.output_index)) {
|
||||
Ok(vec![Event::BlockStop(BlockStop {
|
||||
index: info.flat_index,
|
||||
block_type: info.block_type,
|
||||
stop_reason: None,
|
||||
reasoning: None,
|
||||
})])
|
||||
} else {
|
||||
Ok(Vec::new())
|
||||
|
|
@ -450,6 +466,7 @@ pub(crate) fn parse_sse(
|
|||
index: info.flat_index,
|
||||
block_type: info.block_type,
|
||||
stop_reason: None,
|
||||
reasoning: None,
|
||||
})])
|
||||
} else {
|
||||
Ok(Vec::new())
|
||||
|
|
@ -531,6 +548,7 @@ pub(crate) fn parse_sse(
|
|||
index: info.flat_index,
|
||||
block_type: info.block_type,
|
||||
stop_reason: None,
|
||||
reasoning: None,
|
||||
})])
|
||||
} else {
|
||||
Ok(Vec::new())
|
||||
|
|
@ -1116,9 +1134,9 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn reasoning_output_item_emits_reasoning_item_with_text_summary_encrypted() {
|
||||
fn reasoning_output_item_completes_metadata_thinking_block_with_text_summary_encrypted() {
|
||||
// 完成済み reasoning wrapper が text + summary[] + encrypted_content を持って
|
||||
// ReasoningItem として届くこと。
|
||||
// Thinking BlockStop metadata として届くこと。
|
||||
let mut state = OpenAIResponsesState::default();
|
||||
|
||||
// wrapper added (id だけ持つ)
|
||||
|
|
@ -1171,12 +1189,14 @@ mod tests {
|
|||
"response.output_item.done",
|
||||
r#"{"output_index":0,"item":{"type":"reasoning","id":"r1","encrypted_content":"ENC-XYZ"}}"#,
|
||||
);
|
||||
assert_eq!(evs.len(), 1);
|
||||
let Event::ReasoningItem(reasoning) = &evs[0] else {
|
||||
panic!("expected ReasoningItem, got {:?}", evs[0]);
|
||||
assert_eq!(evs.len(), 2);
|
||||
assert!(matches!(evs[0], Event::BlockStart(_)));
|
||||
let Event::BlockStop(stop) = &evs[1] else {
|
||||
panic!("expected BlockStop, got {:?}", evs[1]);
|
||||
};
|
||||
let reasoning = stop.reasoning.as_ref().expect("reasoning metadata");
|
||||
assert_eq!(reasoning.id.as_deref(), Some("r1"));
|
||||
assert_eq!(reasoning.text, "hello world");
|
||||
assert_eq!(reasoning.text.as_deref(), Some("hello world"));
|
||||
assert_eq!(reasoning.summary, vec!["sum-A".to_string()]);
|
||||
assert_eq!(reasoning.encrypted_content.as_deref(), Some("ENC-XYZ"));
|
||||
assert!(reasoning.signature.is_none());
|
||||
|
|
@ -1187,7 +1207,7 @@ mod tests {
|
|||
#[test]
|
||||
fn reasoning_wrapper_without_inner_content_emits_empty_text() {
|
||||
// encrypted_content だけ届く(reasoning_text 無し)ケースでも
|
||||
// ReasoningItem は発火する。
|
||||
// reasoning metadata は届く。
|
||||
let mut state = OpenAIResponsesState::default();
|
||||
with(
|
||||
&mut state,
|
||||
|
|
@ -1199,10 +1219,13 @@ mod tests {
|
|||
"response.output_item.done",
|
||||
r#"{"output_index":2,"item":{"type":"reasoning","id":"r9","encrypted_content":"BLOB"}}"#,
|
||||
);
|
||||
let Event::ReasoningItem(r) = &evs[0] else {
|
||||
panic!()
|
||||
assert_eq!(evs.len(), 2);
|
||||
assert!(matches!(evs[0], Event::BlockStart(_)));
|
||||
let Event::BlockStop(stop) = &evs[1] else {
|
||||
panic!("expected BlockStop")
|
||||
};
|
||||
assert!(r.text.is_empty());
|
||||
let r = stop.reasoning.as_ref().expect("reasoning metadata");
|
||||
assert_eq!(r.text.as_deref(), Some(""));
|
||||
assert!(r.summary.is_empty());
|
||||
assert_eq!(r.encrypted_content.as_deref(), Some("BLOB"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,18 +7,19 @@
|
|||
//! - [`Timeline`] - イベントストリームの管理とディスパッチ
|
||||
//! - [`Handler`] - イベントを処理するトレイト
|
||||
//! - [`TextBlockCollector`] - テキストブロックを収集するHandler
|
||||
//! - [`ThinkingBlockCollector`] - reasoning material を Thinking block から収集するHandler
|
||||
//! - [`ToolCallCollector`] - ツール呼び出しを収集するHandler
|
||||
|
||||
pub mod event;
|
||||
mod reasoning_item_collector;
|
||||
mod text_block_collector;
|
||||
mod thinking_block_collector;
|
||||
mod timeline;
|
||||
mod tool_call_collector;
|
||||
|
||||
// 公開API
|
||||
pub use event::*;
|
||||
pub use reasoning_item_collector::ReasoningItemCollector;
|
||||
pub use text_block_collector::TextBlockCollector;
|
||||
pub use thinking_block_collector::ThinkingBlockCollector;
|
||||
pub use timeline::Timeline;
|
||||
pub use tool_call_collector::ToolCallCollector;
|
||||
|
||||
|
|
@ -30,7 +31,6 @@ pub use crate::handler::{
|
|||
Handler,
|
||||
Kind,
|
||||
PingKind,
|
||||
ReasoningItemKind,
|
||||
StatusKind,
|
||||
// Block Events
|
||||
TextBlockEvent,
|
||||
|
|
|
|||
|
|
@ -1,77 +0,0 @@
|
|||
//! `ReasoningItemCollector` - 完成済み reasoning item を収集する Handler
|
||||
//!
|
||||
//! Timeline の `ReasoningItemKind` Handler として登録し、scheme 側が
|
||||
//! `Event::ReasoningItem` を発火するたびに 1 件ずつバッファに溜める。
|
||||
//! Worker はターン終了時に `take_collected()` でドレインして
|
||||
//! `Item::Reasoning` として `worker.history` に append する。
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::handler::{Handler, ReasoningItemKind};
|
||||
use crate::llm_client::event::ReasoningItemEvent;
|
||||
|
||||
/// 収集された reasoning item の連列。
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ReasoningItemCollector {
|
||||
collected: Arc<Mutex<Vec<ReasoningItemEvent>>>,
|
||||
}
|
||||
|
||||
impl ReasoningItemCollector {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// 収集済み item を取り出してクリア
|
||||
pub fn take_collected(&self) -> Vec<ReasoningItemEvent> {
|
||||
let mut guard = self.collected.lock().unwrap();
|
||||
std::mem::take(&mut *guard)
|
||||
}
|
||||
|
||||
/// 収集をクリア
|
||||
pub fn clear(&self) {
|
||||
self.collected.lock().unwrap().clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<ReasoningItemKind> for ReasoningItemCollector {
|
||||
type Scope = ();
|
||||
|
||||
fn on_event(&mut self, _scope: &mut Self::Scope, event: &ReasoningItemEvent) {
|
||||
self.collected.lock().unwrap().push(event.clone());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::llm_client::event::Event;
|
||||
use crate::timeline::Timeline;
|
||||
|
||||
#[test]
|
||||
fn collects_in_order() {
|
||||
let collector = ReasoningItemCollector::new();
|
||||
let mut timeline = Timeline::new();
|
||||
timeline.on_reasoning_item(collector.clone());
|
||||
|
||||
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: Some("r1".into()),
|
||||
text: "first".into(),
|
||||
signature: Some("sig1".into()),
|
||||
..Default::default()
|
||||
}));
|
||||
timeline.dispatch(&Event::ReasoningItem(ReasoningItemEvent {
|
||||
id: Some("r2".into()),
|
||||
text: "second".into(),
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
let items = collector.take_collected();
|
||||
assert_eq!(items.len(), 2);
|
||||
assert_eq!(items[0].text, "first");
|
||||
assert_eq!(items[0].signature.as_deref(), Some("sig1"));
|
||||
assert_eq!(items[1].text, "second");
|
||||
|
||||
// take は drain なので 2 度目は空
|
||||
assert!(collector.take_collected().is_empty());
|
||||
}
|
||||
}
|
||||
138
crates/llm-worker/src/timeline/thinking_block_collector.rs
Normal file
138
crates/llm-worker/src/timeline/thinking_block_collector.rs
Normal file
|
|
@ -0,0 +1,138 @@
|
|||
//! `ThinkingBlockCollector` - reasoning material from Thinking block lifecycle.
|
||||
//!
|
||||
//! Scheme implementations emit Thinking BlockStart/Delta/Stop events for live
|
||||
//! streaming. A Thinking block stop with `ReasoningBlockData` is also the single
|
||||
//! authoritative persistence signal for `Item::Reasoning` round-trip material.
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::handler::{Handler, ThinkingBlockEvent, ThinkingBlockKind};
|
||||
use crate::llm_client::event::ReasoningBlockData;
|
||||
|
||||
/// Reasoning material collected from completed Thinking blocks.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ThinkingBlockCollector {
|
||||
collected: Arc<Mutex<Vec<ReasoningBlockData>>>,
|
||||
}
|
||||
|
||||
impl ThinkingBlockCollector {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// 収集済み item を取り出してクリア
|
||||
pub fn take_collected(&self) -> Vec<ReasoningBlockData> {
|
||||
let mut guard = self.collected.lock().unwrap();
|
||||
std::mem::take(&mut *guard)
|
||||
}
|
||||
|
||||
/// 収集をクリア
|
||||
pub fn clear(&self) {
|
||||
self.collected.lock().unwrap().clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<ThinkingBlockKind> for ThinkingBlockCollector {
|
||||
type Scope = String;
|
||||
|
||||
fn on_event(&mut self, scope: &mut Self::Scope, event: &ThinkingBlockEvent) {
|
||||
match event {
|
||||
ThinkingBlockEvent::Start(_) => scope.clear(),
|
||||
ThinkingBlockEvent::Delta(text) => scope.push_str(text),
|
||||
ThinkingBlockEvent::Stop(stop) => {
|
||||
if let Some(mut reasoning) = stop.reasoning.clone() {
|
||||
if reasoning.text.is_none() {
|
||||
reasoning.text = Some(scope.clone());
|
||||
}
|
||||
self.collected.lock().unwrap().push(reasoning);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm_client::event::{
        BlockDelta, BlockMetadata, BlockStart, BlockStop, BlockType, DeltaContent, Event,
        ReasoningBlockData,
    };
    use crate::timeline::Timeline;

    /// Returns a collector together with a timeline it has been registered on.
    fn collector_on_timeline() -> (ThinkingBlockCollector, Timeline) {
        let collector = ThinkingBlockCollector::new();
        let mut timeline = Timeline::new();
        timeline.on_thinking_block(collector.clone());
        (collector, timeline)
    }

    /// Start event of a Thinking block at `index`.
    fn thinking_start(index: usize) -> Event {
        Event::BlockStart(BlockStart {
            index,
            block_type: BlockType::Thinking,
            metadata: BlockMetadata::Thinking,
        })
    }

    /// Thinking delta carrying `text` for the block at index 0.
    fn thinking_delta_at_zero(text: &str) -> Event {
        Event::BlockDelta(BlockDelta {
            index: 0,
            delta: DeltaContent::Thinking(text.into()),
        })
    }

    /// Stop event of a Thinking block at `index` with optional reasoning metadata.
    fn thinking_stop(index: usize, reasoning: Option<ReasoningBlockData>) -> Event {
        Event::BlockStop(BlockStop {
            index,
            block_type: BlockType::Thinking,
            stop_reason: None,
            reasoning,
        })
    }

    #[test]
    fn collects_in_order_from_thinking_block_stops() {
        let (collector, mut timeline) = collector_on_timeline();

        // First block: no text in the metadata, so the delta text is used.
        let first_metadata = ReasoningBlockData {
            id: Some("r1".into()),
            signature: Some("sig1".into()),
            ..Default::default()
        };
        timeline.dispatch(&thinking_start(0));
        timeline.dispatch(&thinking_delta_at_zero("first"));
        timeline.dispatch(&thinking_stop(0, Some(first_metadata)));

        // Second block: no deltas, the text comes with the metadata.
        let second_metadata = ReasoningBlockData {
            id: Some("r2".into()),
            text: Some("second".into()),
            ..Default::default()
        };
        timeline.dispatch(&thinking_start(1));
        timeline.dispatch(&thinking_stop(1, Some(second_metadata)));

        let items = collector.take_collected();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].text.as_deref(), Some("first"));
        assert_eq!(items[0].signature.as_deref(), Some("sig1"));
        assert_eq!(items[1].text.as_deref(), Some("second"));

        // Taking drains the buffer, so a second take yields nothing.
        assert!(collector.take_collected().is_empty());
    }

    #[test]
    fn ignores_streaming_only_thinking_blocks() {
        let (collector, mut timeline) = collector_on_timeline();

        timeline.dispatch(&thinking_start(0));
        timeline.dispatch(&thinking_delta_at_zero("live only"));
        timeline.dispatch(&thinking_stop(0, None));

        assert!(collector.take_collected().is_empty());
    }
}
|
||||
|
|
@ -231,7 +231,10 @@ where
|
|||
if let Some(scope) = &mut self.scope {
|
||||
self.handler.on_event(
|
||||
scope,
|
||||
&ThinkingBlockEvent::Stop(ThinkingBlockStop { index: stop.index }),
|
||||
&ThinkingBlockEvent::Stop(ThinkingBlockStop {
|
||||
index: stop.index,
|
||||
reasoning: stop.reasoning.clone(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -375,8 +378,6 @@ pub struct Timeline {
|
|||
ping_handlers: Vec<Box<dyn ErasedHandler<PingKind>>>,
|
||||
status_handlers: Vec<Box<dyn ErasedHandler<StatusKind>>>,
|
||||
error_handlers: Vec<Box<dyn ErasedHandler<ErrorKind>>>,
|
||||
reasoning_item_handlers: Vec<Box<dyn ErasedHandler<ReasoningItemKind>>>,
|
||||
|
||||
// Block系ハンドラー(BlockTypeごとにグループ化)
|
||||
text_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
|
||||
thinking_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
|
||||
|
|
@ -405,7 +406,6 @@ impl Timeline {
|
|||
ping_handlers: Vec::new(),
|
||||
status_handlers: Vec::new(),
|
||||
error_handlers: Vec::new(),
|
||||
reasoning_item_handlers: Vec::new(),
|
||||
text_block_handlers: Vec::new(),
|
||||
thinking_block_handlers: Vec::new(),
|
||||
tool_use_block_handlers: Vec::new(),
|
||||
|
|
@ -467,18 +467,6 @@ impl Timeline {
|
|||
self
|
||||
}
|
||||
|
||||
/// `ReasoningItemKind` 用 Handler を登録
|
||||
pub fn on_reasoning_item<H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
H: Handler<ReasoningItemKind> + Send + Sync + 'static,
|
||||
H::Scope: Send + Sync,
|
||||
{
|
||||
let mut wrapper = HandlerWrapper::new(handler);
|
||||
wrapper.start_scope();
|
||||
self.reasoning_item_handlers.push(Box::new(wrapper));
|
||||
self
|
||||
}
|
||||
|
||||
/// TextBlockKind用のHandlerを登録
|
||||
pub fn on_text_block<H>(&mut self, handler: H) -> &mut Self
|
||||
where
|
||||
|
|
@ -532,9 +520,6 @@ impl Timeline {
|
|||
Event::BlockDelta(d) => self.handle_block_delta(d),
|
||||
Event::BlockStop(s) => self.handle_block_stop(s),
|
||||
Event::BlockAbort(a) => self.handle_block_abort(a),
|
||||
|
||||
// 完成済み reasoning item: 即時ディスパッチ
|
||||
Event::ReasoningItem(r) => self.dispatch_reasoning_item(r),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -577,12 +562,6 @@ impl Timeline {
|
|||
}
|
||||
}
|
||||
|
||||
fn dispatch_reasoning_item(&mut self, event: &ReasoningItemEvent) {
|
||||
for handler in &mut self.reasoning_item_handlers {
|
||||
handler.dispatch(event);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_block_start(&mut self, start: &BlockStart) {
|
||||
self.current_block = Some(start.block_type);
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ use crate::{
|
|||
},
|
||||
state::{Locked, Mutable, WorkerState},
|
||||
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
|
||||
timeline::{ReasoningItemCollector, TextBlockCollector, Timeline, ToolCallCollector},
|
||||
timeline::{TextBlockCollector, ThinkingBlockCollector, Timeline, ToolCallCollector},
|
||||
tool::{
|
||||
ToolCall, ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputLimits, ToolResult,
|
||||
truncate_content,
|
||||
|
|
@ -163,9 +163,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
|||
text_block_collector: TextBlockCollector,
|
||||
/// Tool call collector (Timeline handler)
|
||||
tool_call_collector: ToolCallCollector,
|
||||
/// Reasoning item collector (Timeline handler)。完成済み reasoning
|
||||
/// item を 1 ターン分バッファし、history に append する。
|
||||
reasoning_item_collector: ReasoningItemCollector,
|
||||
/// Thinking block collector (Timeline handler)。metadata 付きで完了した
|
||||
/// Thinking block を 1 ターン分バッファし、history に append する。
|
||||
thinking_block_collector: ThinkingBlockCollector,
|
||||
/// Tool server handle
|
||||
tool_server: ToolServerHandle,
|
||||
/// Interceptor for control-flow decisions
|
||||
|
|
@ -771,14 +771,14 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
/// 置くことを要求するためでもある。
|
||||
fn build_assistant_items(
|
||||
&self,
|
||||
reasoning_items: &[crate::llm_client::event::ReasoningItemEvent],
|
||||
reasoning_items: &[crate::llm_client::event::ReasoningBlockData],
|
||||
text_blocks: &[String],
|
||||
tool_calls: &[ToolCall],
|
||||
) -> Vec<Item> {
|
||||
let mut items = Vec::new();
|
||||
|
||||
for r in reasoning_items {
|
||||
let mut item = Item::reasoning(r.text.clone());
|
||||
let mut item = Item::reasoning(r.text.clone().unwrap_or_default());
|
||||
if let Some(id) = &r.id {
|
||||
item = item.with_id(id);
|
||||
}
|
||||
|
|
@ -1238,7 +1238,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
|
||||
self.timeline.abort_current_block();
|
||||
self.timeline.flush_usage();
|
||||
let reasoning_items = self.reasoning_item_collector.take_collected();
|
||||
let reasoning_items = self.thinking_block_collector.take_collected();
|
||||
let text_blocks = self.text_block_collector.take_collected();
|
||||
// Do not recover tool calls from an interrupted stream. A completed
|
||||
// tool_use is executable only when the provider finishes the stream.
|
||||
|
|
@ -1270,7 +1270,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
// `append_history_items` so observers (e.g. the
|
||||
// Pod-side per-item session-log committer) see each item
|
||||
// as it lands.
|
||||
let reasoning_items = self.reasoning_item_collector.take_collected();
|
||||
let reasoning_items = self.thinking_block_collector.take_collected();
|
||||
let text_blocks = self.text_block_collector.take_collected();
|
||||
let tool_calls = self.tool_call_collector.take_collected();
|
||||
let assistant_items =
|
||||
|
|
@ -1595,14 +1595,14 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
pub fn new(client: C) -> Self {
|
||||
let text_block_collector = TextBlockCollector::new();
|
||||
let tool_call_collector = ToolCallCollector::new();
|
||||
let reasoning_item_collector = ReasoningItemCollector::new();
|
||||
let thinking_block_collector = ThinkingBlockCollector::new();
|
||||
let mut timeline = Timeline::new();
|
||||
let (cancel_tx, cancel_rx) = mpsc::channel(1);
|
||||
|
||||
// Register collectors with Timeline
|
||||
timeline.on_text_block(text_block_collector.clone());
|
||||
timeline.on_tool_use_block(tool_call_collector.clone());
|
||||
timeline.on_reasoning_item(reasoning_item_collector.clone());
|
||||
timeline.on_thinking_block(thinking_block_collector.clone());
|
||||
|
||||
Self {
|
||||
client,
|
||||
|
|
@ -1610,7 +1610,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
timeline,
|
||||
text_block_collector,
|
||||
tool_call_collector,
|
||||
reasoning_item_collector,
|
||||
thinking_block_collector,
|
||||
tool_server: ToolServer::new().handle(),
|
||||
interceptor: Box::new(DefaultInterceptor),
|
||||
system_prompt: None,
|
||||
|
|
@ -1874,7 +1874,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
timeline: self.timeline,
|
||||
text_block_collector: self.text_block_collector,
|
||||
tool_call_collector: self.tool_call_collector,
|
||||
reasoning_item_collector: self.reasoning_item_collector,
|
||||
thinking_block_collector: self.thinking_block_collector,
|
||||
tool_server: self.tool_server,
|
||||
interceptor: self.interceptor,
|
||||
system_prompt: self.system_prompt,
|
||||
|
|
@ -1966,7 +1966,7 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
timeline: self.timeline,
|
||||
text_block_collector: self.text_block_collector,
|
||||
tool_call_collector: self.tool_call_collector,
|
||||
reasoning_item_collector: self.reasoning_item_collector,
|
||||
thinking_block_collector: self.thinking_block_collector,
|
||||
tool_server: self.tool_server,
|
||||
interceptor: self.interceptor,
|
||||
system_prompt: self.system_prompt,
|
||||
|
|
|
|||
|
|
@ -16,27 +16,53 @@ mod common;
|
|||
use common::MockLlmClient;
|
||||
use llm_worker::Item;
|
||||
use llm_worker::Worker;
|
||||
use llm_worker::llm_client::event::{Event, ReasoningItemEvent, ResponseStatus, StatusEvent};
|
||||
use llm_worker::llm_client::event::{
|
||||
BlockMetadata, BlockStart, BlockStop, BlockType, Event, ReasoningBlockData, ResponseStatus,
|
||||
StatusEvent,
|
||||
};
|
||||
|
||||
fn reasoning_block(text: impl Into<String>, data: ReasoningBlockData) -> Vec<Event> {
|
||||
vec![
|
||||
Event::BlockStart(BlockStart {
|
||||
index: 100,
|
||||
block_type: BlockType::Thinking,
|
||||
metadata: BlockMetadata::Thinking,
|
||||
}),
|
||||
Event::BlockDelta(llm_worker::llm_client::event::BlockDelta {
|
||||
index: 100,
|
||||
delta: llm_worker::llm_client::event::DeltaContent::Thinking(text.into()),
|
||||
}),
|
||||
Event::BlockStop(BlockStop {
|
||||
index: 100,
|
||||
block_type: BlockType::Thinking,
|
||||
stop_reason: None,
|
||||
reasoning: Some(data),
|
||||
}),
|
||||
]
|
||||
}
|
||||
|
||||
/// Anthropic 風: thinking ブロック → text → 終了 のシーケンス。
|
||||
/// Worker history に Reasoning(signature 付き) → assistant_message が並ぶ。
|
||||
#[tokio::test]
|
||||
async fn anthropic_thinking_round_trips_signature_into_history() {
|
||||
let events = vec![
|
||||
Event::ReasoningItem(ReasoningItemEvent {
|
||||
let mut events = reasoning_block(
|
||||
"let me think...",
|
||||
ReasoningBlockData {
|
||||
id: None,
|
||||
text: "let me think...".into(),
|
||||
text: None,
|
||||
summary: Vec::new(),
|
||||
encrypted_content: None,
|
||||
signature: Some("SIG-OPUS".into()),
|
||||
}),
|
||||
},
|
||||
);
|
||||
events.extend([
|
||||
Event::text_block_start(0),
|
||||
Event::text_delta(0, "Here's the answer"),
|
||||
Event::text_block_stop(0, None),
|
||||
Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
];
|
||||
]);
|
||||
let client = MockLlmClient::new(events);
|
||||
let worker = Worker::new(client);
|
||||
let out = worker.run("question?").await.expect("run ok");
|
||||
|
|
@ -63,21 +89,24 @@ async fn anthropic_thinking_round_trips_signature_into_history() {
|
|||
/// `Item::Reasoning` のフィールドに展開されること。
|
||||
#[tokio::test]
|
||||
async fn openai_reasoning_round_trips_encrypted_and_summary() {
|
||||
let events = vec![
|
||||
Event::ReasoningItem(ReasoningItemEvent {
|
||||
let mut events = reasoning_block(
|
||||
"",
|
||||
ReasoningBlockData {
|
||||
id: Some("r1".into()),
|
||||
text: "inner reasoning".into(),
|
||||
text: Some("inner reasoning".into()),
|
||||
summary: vec!["sum-A".into(), "sum-B".into()],
|
||||
encrypted_content: Some("ENC-OPAQUE".into()),
|
||||
signature: None,
|
||||
}),
|
||||
},
|
||||
);
|
||||
events.extend([
|
||||
Event::text_block_start(0),
|
||||
Event::text_delta(0, "answer"),
|
||||
Event::text_block_stop(0, None),
|
||||
Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
];
|
||||
]);
|
||||
let client = MockLlmClient::new(events);
|
||||
let worker = Worker::new(client);
|
||||
let out = worker.run("q").await.expect("run ok");
|
||||
|
|
@ -107,21 +136,23 @@ async fn openai_reasoning_round_trips_encrypted_and_summary() {
|
|||
/// が thinking を assistant メッセージの先頭に要求するため)。
|
||||
#[tokio::test]
|
||||
async fn reasoning_precedes_text_in_assistant_burst() {
|
||||
let events = vec![
|
||||
// text/tool_call とは独立に、ReasoningItem が中盤で発火しても、
|
||||
let mut events = vec![
|
||||
// text/tool_call とは独立に、reasoning block が中盤で完了しても、
|
||||
// history append 時には assistant items の先頭に置かれる。
|
||||
Event::text_block_start(0),
|
||||
Event::text_delta(0, "intermediate"),
|
||||
Event::text_block_stop(0, None),
|
||||
Event::ReasoningItem(ReasoningItemEvent {
|
||||
text: "after text".into(),
|
||||
];
|
||||
events.extend(reasoning_block(
|
||||
"after text",
|
||||
ReasoningBlockData {
|
||||
signature: Some("SIG".into()),
|
||||
..Default::default()
|
||||
}),
|
||||
Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
];
|
||||
},
|
||||
));
|
||||
events.push(Event::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}));
|
||||
let client = MockLlmClient::new(events);
|
||||
let worker = Worker::new(client);
|
||||
let out = worker.run("q").await.expect("run ok");
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user