feat: Introduce WorkerSubscriber system with adapters

This commit is contained in:
Keisuke Hirata 2026-01-07 00:30:58 +09:00
parent 9547d40538
commit 89b12d277a
6 changed files with 693 additions and 6 deletions

View File

@ -12,10 +12,12 @@ mod event;
mod handler; mod handler;
mod hook; mod hook;
mod message; mod message;
mod subscriber;
mod tool; mod tool;
pub use event::*; pub use event::*;
pub use handler::*; pub use handler::*;
pub use hook::*; pub use hook::*;
pub use message::*; pub use message::*;
pub use subscriber::*;
pub use tool::*; pub use tool::*;

View File

@ -0,0 +1,126 @@
//! WorkerSubscriber - Worker層のイベント購読トレイト
//!
//! Timeline層のHandler機構の薄いラッパーとして設計され、
//! UIへのストリーミング表示やリアルタイムフィードバックを可能にする。
use crate::{
ErrorEvent, StatusEvent, TextBlockEvent, ToolCall, ToolUseBlockEvent, UsageEvent,
};
// =============================================================================
// WorkerSubscriber Trait
// =============================================================================
/// Worker層の統合Subscriberトレイト
///
/// Timeline層のHandler機構をラップし、以下のイベントを一括で購読できる:
/// - ブロックイベント(スコープ管理あり): Text, ToolUse
/// - 単発イベント: Usage, Status, Error
/// - 累積イベントWorker層で追加: TextComplete, ToolCallComplete
/// - ターン制御: TurnStart, TurnEnd
///
/// # 使用例
///
/// ```ignore
/// struct MyUI {
/// chat_view: ChatView,
/// }
///
/// impl WorkerSubscriber for MyUI {
/// type TextBlockScope = String;
/// type ToolUseBlockScope = ToolComponent;
///
/// fn on_text_block(&mut self, buffer: &mut String, event: &TextBlockEvent) {
/// match event {
/// TextBlockEvent::Delta(text) => {
/// buffer.push_str(text);
/// self.chat_view.update(buffer);
/// }
/// _ => {}
/// }
/// }
///
/// fn on_text_complete(&mut self, text: &str) {
/// self.chat_view.add_to_history(text);
/// }
/// }
/// ```
pub trait WorkerSubscriber: Send {
// =========================================================================
// スコープ型(ブロックイベント用)
// =========================================================================
/// テキストブロック処理用のスコープ型
///
/// ブロック開始時にDefault::default()で生成され、
/// ブロック終了時に破棄される。
type TextBlockScope: Default + Send;
/// ツール使用ブロック処理用のスコープ型
type ToolUseBlockScope: Default + Send;
// =========================================================================
// ブロックイベント(スコープ管理あり)
// =========================================================================
/// テキストブロックイベント
///
/// Start/Delta/Stopのライフサイクルを持つ。
/// scopeはブロック開始時に生成され、終了時に破棄される。
#[allow(unused_variables)]
fn on_text_block(&mut self, scope: &mut Self::TextBlockScope, event: &TextBlockEvent) {}
/// ツール使用ブロックイベント
///
/// Start/InputJsonDelta/Stopのライフサイクルを持つ。
#[allow(unused_variables)]
fn on_tool_use_block(&mut self, scope: &mut Self::ToolUseBlockScope, event: &ToolUseBlockEvent) {
}
// =========================================================================
// 単発イベント(スコープ不要)
// =========================================================================
/// 使用量イベント
#[allow(unused_variables)]
fn on_usage(&mut self, event: &UsageEvent) {}
/// ステータスイベント
#[allow(unused_variables)]
fn on_status(&mut self, event: &StatusEvent) {}
/// エラーイベント
#[allow(unused_variables)]
fn on_error(&mut self, event: &ErrorEvent) {}
// =========================================================================
// 累積イベントWorker層で追加
// =========================================================================
/// テキスト完了イベント
///
/// テキストブロックが完了した時点で、累積されたテキスト全体が渡される。
/// ブロック処理後の最終結果を受け取るのに便利。
#[allow(unused_variables)]
fn on_text_complete(&mut self, text: &str) {}
/// ツール呼び出し完了イベント
///
/// ツール使用ブロックが完了した時点で、完全なToolCallが渡される。
#[allow(unused_variables)]
fn on_tool_call_complete(&mut self, call: &ToolCall) {}
// =========================================================================
// ターン制御
// =========================================================================
/// ターン開始時
///
/// `turn`は0から始まるターン番号。
#[allow(unused_variables)]
fn on_turn_start(&mut self, turn: usize) {}
/// ターン終了時
#[allow(unused_variables)]
fn on_turn_end(&mut self, turn: usize) {}
}

View File

@ -7,6 +7,7 @@
//! - 型消去されたHandler実装 //! - 型消去されたHandler実装
pub mod llm_client; pub mod llm_client;
mod subscriber_adapter;
mod text_block_collector; mod text_block_collector;
mod timeline; mod timeline;
mod tool_call_collector; mod tool_call_collector;

View File

@ -0,0 +1,240 @@
//! WorkerSubscriber統合
//!
//! WorkerSubscriberをTimeline層のHandlerとしてブリッジする実装
use std::sync::{Arc, Mutex};
use worker_types::{
ErrorEvent, ErrorKind, Handler, StatusEvent, StatusKind, TextBlockEvent, TextBlockKind,
ToolCall, ToolUseBlockEvent, ToolUseBlockKind, UsageEvent, UsageKind, WorkerSubscriber,
};
// =============================================================================
// SubscriberAdapter - WorkerSubscriberをTimelineハンドラにブリッジ
// =============================================================================
// =============================================================================
// TextBlock Handler Adapter
// =============================================================================
/// TextBlockKind用のSubscriberアダプター
pub(crate) struct TextBlockSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> TextBlockSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for TextBlockSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
/// TextBlockのスコープをラップ
pub struct TextBlockScopeWrapper<S: WorkerSubscriber> {
inner: S::TextBlockScope,
buffer: String, // on_text_complete用のバッファ
}
impl<S: WorkerSubscriber> Default for TextBlockScopeWrapper<S> {
fn default() -> Self {
Self {
inner: S::TextBlockScope::default(),
buffer: String::new(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<TextBlockKind> for TextBlockSubscriberAdapter<S> {
type Scope = TextBlockScopeWrapper<S>;
fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) {
// Deltaの場合はバッファに蓄積
if let TextBlockEvent::Delta(text) = event {
scope.buffer.push_str(text);
}
// SubscriberのTextBlockイベントハンドラを呼び出し
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_text_block(&mut scope.inner, event);
// Stopの場合はon_text_completeも呼び出し
if matches!(event, TextBlockEvent::Stop(_)) {
subscriber.on_text_complete(&scope.buffer);
}
}
}
}
// =============================================================================
// ToolUseBlock Handler Adapter
// =============================================================================
/// ToolUseBlockKind用のSubscriberアダプター
pub(crate) struct ToolUseBlockSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> ToolUseBlockSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for ToolUseBlockSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
/// ToolUseBlockのスコープをラップ
pub struct ToolUseBlockScopeWrapper<S: WorkerSubscriber> {
inner: S::ToolUseBlockScope,
id: String,
name: String,
input_json: String, // JSON蓄積用
}
impl<S: WorkerSubscriber> Default for ToolUseBlockScopeWrapper<S> {
fn default() -> Self {
Self {
inner: S::ToolUseBlockScope::default(),
id: String::new(),
name: String::new(),
input_json: String::new(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<ToolUseBlockKind> for ToolUseBlockSubscriberAdapter<S> {
type Scope = ToolUseBlockScopeWrapper<S>;
fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) {
// Start時にメタデータを保存
if let ToolUseBlockEvent::Start(start) = event {
scope.id = start.id.clone();
scope.name = start.name.clone();
}
// InputJsonDeltaの場合はバッファに蓄積
if let ToolUseBlockEvent::InputJsonDelta(json) = event {
scope.input_json.push_str(json);
}
// SubscriberのToolUseBlockイベントハンドラを呼び出し
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_tool_use_block(&mut scope.inner, event);
// Stopの場合はon_tool_call_completeも呼び出し
if matches!(event, ToolUseBlockEvent::Stop(_)) {
let input: serde_json::Value =
serde_json::from_str(&scope.input_json).unwrap_or_default();
let tool_call = ToolCall {
id: scope.id.clone(),
name: scope.name.clone(),
input,
};
subscriber.on_tool_call_complete(&tool_call);
}
}
}
}
// =============================================================================
// Meta Event Handler Adapters
// =============================================================================
/// UsageKind用のSubscriberアダプター
pub(crate) struct UsageSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> UsageSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for UsageSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<UsageKind> for UsageSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &UsageEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_usage(event);
}
}
}
/// StatusKind用のSubscriberアダプター
pub(crate) struct StatusSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> StatusSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for StatusSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<StatusKind> for StatusSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &StatusEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_status(event);
}
}
}
/// ErrorKind用のSubscriberアダプター
pub(crate) struct ErrorSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> ErrorSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for ErrorSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<ErrorKind> for ErrorSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &ErrorEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_error(event);
}
}
}

View File

@ -1,19 +1,19 @@
//! Worker - ターン制御を行う高レベルコンポーネント
//!
//! LlmClientとTimelineを内包し、Tool/Hookを用いて自律的なインタラクションを実現する。
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use futures::StreamExt; use futures::StreamExt;
use crate::llm_client::{ClientError, LlmClient, Request, ToolDefinition}; use crate::llm_client::{ClientError, LlmClient, Request, ToolDefinition};
use crate::subscriber_adapter::{
ErrorSubscriberAdapter, StatusSubscriberAdapter, TextBlockSubscriberAdapter,
ToolUseBlockSubscriberAdapter, UsageSubscriberAdapter,
};
use crate::text_block_collector::TextBlockCollector; use crate::text_block_collector::TextBlockCollector;
use crate::tool_call_collector::ToolCallCollector; use crate::tool_call_collector::ToolCallCollector;
use crate::Timeline; use crate::Timeline;
use worker_types::{ use worker_types::{
ContentPart, ControlFlow, HookError, Message, MessageContent, Tool, ToolCall, ToolError, ContentPart, ControlFlow, HookError, Message, MessageContent, Tool, ToolCall, ToolError,
ToolResult, TurnResult, WorkerHook, ToolResult, TurnResult, WorkerHook, WorkerSubscriber,
}; };
// ============================================================================= // =============================================================================
@ -48,6 +48,34 @@ pub struct WorkerConfig {
_private: (), _private: (),
} }
// =============================================================================
// ターン制御用コールバック保持
// =============================================================================
/// ターンイベントを通知するためのコールバック (型消去)
trait TurnNotifier: Send {
fn on_turn_start(&self, turn: usize);
fn on_turn_end(&self, turn: usize);
}
struct SubscriberTurnNotifier<S: WorkerSubscriber + 'static> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber + 'static> TurnNotifier for SubscriberTurnNotifier<S> {
fn on_turn_start(&self, turn: usize) {
if let Ok(mut s) = self.subscriber.lock() {
s.on_turn_start(turn);
}
}
fn on_turn_end(&self, turn: usize) {
if let Ok(mut s) = self.subscriber.lock() {
s.on_turn_end(turn);
}
}
}
// ============================================================================= // =============================================================================
// Worker // Worker
// ============================================================================= // =============================================================================
@ -74,6 +102,10 @@ pub struct Worker<C: LlmClient> {
hooks: Vec<Box<dyn WorkerHook>>, hooks: Vec<Box<dyn WorkerHook>>,
/// システムプロンプト /// システムプロンプト
system_prompt: Option<String>, system_prompt: Option<String>,
/// ターンカウント
turn_count: usize,
/// ターン通知用のコールバック
turn_notifiers: Vec<Box<dyn TurnNotifier>>,
} }
impl<C: LlmClient> Worker<C> { impl<C: LlmClient> Worker<C> {
@ -95,9 +127,42 @@ impl<C: LlmClient> Worker<C> {
tools: HashMap::new(), tools: HashMap::new(),
hooks: Vec::new(), hooks: Vec::new(),
system_prompt: None, system_prompt: None,
turn_count: 0,
turn_notifiers: Vec::new(),
} }
} }
/// WorkerSubscriberを登録
///
/// Subscriberは以下のイベントを受け取ることができる:
/// - ブロックイベント: on_text_block, on_tool_use_block
/// - 単発イベント: on_usage, on_status, on_error
/// - 累積イベント: on_text_complete, on_tool_call_complete
/// - ターン制御: on_turn_start, on_turn_end
pub fn subscribe<S: WorkerSubscriber + 'static>(&mut self, subscriber: S) {
let subscriber = Arc::new(Mutex::new(subscriber));
// TextBlock用ハンドラを登録
self.timeline
.on_text_block(TextBlockSubscriberAdapter::new(subscriber.clone()));
// ToolUseBlock用ハンドラを登録
self.timeline
.on_tool_use_block(ToolUseBlockSubscriberAdapter::new(subscriber.clone()));
// Meta系ハンドラを登録
self.timeline
.on_usage(UsageSubscriberAdapter::new(subscriber.clone()));
self.timeline
.on_status(StatusSubscriberAdapter::new(subscriber.clone()));
self.timeline
.on_error(ErrorSubscriberAdapter::new(subscriber.clone()));
// ターン制御用コールバックを登録
self.turn_notifiers
.push(Box::new(SubscriberTurnNotifier { subscriber }));
}
/// システムプロンプトを設定 /// システムプロンプトを設定
pub fn system_prompt(mut self, prompt: impl Into<String>) -> Self { pub fn system_prompt(mut self, prompt: impl Into<String>) -> Self {
self.system_prompt = Some(prompt.into()); self.system_prompt = Some(prompt.into());
@ -159,9 +224,19 @@ impl<C: LlmClient> Worker<C> {
let tool_definitions = self.build_tool_definitions(); let tool_definitions = self.build_tool_definitions();
loop { loop {
// ターン開始を通知
let current_turn = self.turn_count;
for notifier in &self.turn_notifiers {
notifier.on_turn_start(current_turn);
}
// Hook: on_message_send // Hook: on_message_send
let control = self.run_on_message_send_hooks(&mut context).await?; let control = self.run_on_message_send_hooks(&mut context).await?;
if let ControlFlow::Abort(reason) = control { if let ControlFlow::Abort(reason) = control {
// ターン終了を通知(異常終了)
for notifier in &self.turn_notifiers {
notifier.on_turn_end(current_turn);
}
return Err(WorkerError::Aborted(reason)); return Err(WorkerError::Aborted(reason));
} }
@ -175,6 +250,12 @@ impl<C: LlmClient> Worker<C> {
self.timeline.dispatch(&event); self.timeline.dispatch(&event);
} }
// ターン終了を通知
for notifier in &self.turn_notifiers {
notifier.on_turn_end(current_turn);
}
self.turn_count += 1;
// 収集結果を取得 // 収集結果を取得
let text_blocks = self.text_block_collector.take_collected(); let text_blocks = self.text_block_collector.take_collected();
let tool_calls = self.tool_call_collector.take_collected(); let tool_calls = self.tool_call_collector.take_collected();

View File

@ -0,0 +1,237 @@
//! WorkerSubscriberのテスト
//!
//! WorkerSubscriberを使ってイベントを購読するテスト
mod common;
use std::sync::{Arc, Mutex};
use common::MockLlmClient;
use worker::{Worker, WorkerSubscriber};
use worker_types::{
ErrorEvent, Event, Message, ResponseStatus, StatusEvent, TextBlockEvent, ToolCall,
ToolUseBlockEvent, UsageEvent,
};
// =============================================================================
// Test Subscriber
// =============================================================================
/// テスト用のシンプルなSubscriber実装
struct TestSubscriber {
// 記録用のバッファ
text_deltas: Arc<Mutex<Vec<String>>>,
text_completes: Arc<Mutex<Vec<String>>>,
tool_call_completes: Arc<Mutex<Vec<ToolCall>>>,
usage_events: Arc<Mutex<Vec<UsageEvent>>>,
status_events: Arc<Mutex<Vec<StatusEvent>>>,
turn_starts: Arc<Mutex<Vec<usize>>>,
turn_ends: Arc<Mutex<Vec<usize>>>,
}
impl TestSubscriber {
fn new() -> Self {
Self {
text_deltas: Arc::new(Mutex::new(Vec::new())),
text_completes: Arc::new(Mutex::new(Vec::new())),
tool_call_completes: Arc::new(Mutex::new(Vec::new())),
usage_events: Arc::new(Mutex::new(Vec::new())),
status_events: Arc::new(Mutex::new(Vec::new())),
turn_starts: Arc::new(Mutex::new(Vec::new())),
turn_ends: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl WorkerSubscriber for TestSubscriber {
type TextBlockScope = String;
type ToolUseBlockScope = ();
fn on_text_block(&mut self, buffer: &mut String, event: &TextBlockEvent) {
if let TextBlockEvent::Delta(text) = event {
buffer.push_str(text);
self.text_deltas.lock().unwrap().push(text.clone());
}
}
fn on_text_complete(&mut self, text: &str) {
self.text_completes.lock().unwrap().push(text.to_string());
}
fn on_tool_use_block(&mut self, _scope: &mut (), _event: &ToolUseBlockEvent) {
// 必要に応じて処理
}
fn on_tool_call_complete(&mut self, call: &ToolCall) {
self.tool_call_completes.lock().unwrap().push(call.clone());
}
fn on_usage(&mut self, event: &UsageEvent) {
self.usage_events.lock().unwrap().push(event.clone());
}
fn on_status(&mut self, event: &StatusEvent) {
self.status_events.lock().unwrap().push(event.clone());
}
fn on_error(&mut self, _event: &ErrorEvent) {
// 必要に応じて処理
}
fn on_turn_start(&mut self, turn: usize) {
self.turn_starts.lock().unwrap().push(turn);
}
fn on_turn_end(&mut self, turn: usize) {
self.turn_ends.lock().unwrap().push(turn);
}
}
// =============================================================================
// Tests
// =============================================================================
/// WorkerSubscriberがテキストブロックイベントを正しく受け取ることを確認
#[tokio::test]
async fn test_subscriber_text_block_events() {
// テキストレスポンスを含むイベントシーケンス
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Hello, "),
Event::text_delta(0, "World!"),
Event::text_block_stop(0, None),
Event::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Subscriberを登録
let subscriber = TestSubscriber::new();
let text_deltas = subscriber.text_deltas.clone();
let text_completes = subscriber.text_completes.clone();
worker.subscribe(subscriber);
// 実行
let messages = vec![Message::user("Greet me")];
let result = worker.run(messages).await;
assert!(result.is_ok(), "Worker should complete: {:?}", result);
// デルタが収集されていることを確認
let deltas = text_deltas.lock().unwrap();
assert_eq!(deltas.len(), 2);
assert_eq!(deltas[0], "Hello, ");
assert_eq!(deltas[1], "World!");
// 完了テキストが収集されていることを確認
let completes = text_completes.lock().unwrap();
assert_eq!(completes.len(), 1);
assert_eq!(completes[0], "Hello, World!");
}
/// WorkerSubscriberがツール呼び出し完了イベントを正しく受け取ることを確認
#[tokio::test]
async fn test_subscriber_tool_call_complete() {
// ツール呼び出しを含むイベントシーケンス
let events = vec![
Event::tool_use_start(0, "call_123", "get_weather"),
Event::tool_input_delta(0, r#"{"city":"#),
Event::tool_input_delta(0, r#""Tokyo"}"#),
Event::tool_use_stop(0),
Event::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Subscriberを登録
let subscriber = TestSubscriber::new();
let tool_call_completes = subscriber.tool_call_completes.clone();
worker.subscribe(subscriber);
// 実行
let messages = vec![Message::user("Weather please")];
let _ = worker.run(messages).await;
// ツール呼び出し完了が収集されていることを確認
let completes = tool_call_completes.lock().unwrap();
assert_eq!(completes.len(), 1);
assert_eq!(completes[0].name, "get_weather");
assert_eq!(completes[0].id, "call_123");
assert_eq!(completes[0].input["city"], "Tokyo");
}
/// WorkerSubscriberがターンイベントを正しく受け取ることを確認
#[tokio::test]
async fn test_subscriber_turn_events() {
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Done!"),
Event::text_block_stop(0, None),
Event::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Subscriberを登録
let subscriber = TestSubscriber::new();
let turn_starts = subscriber.turn_starts.clone();
let turn_ends = subscriber.turn_ends.clone();
worker.subscribe(subscriber);
// 実行
let messages = vec![Message::user("Do something")];
let result = worker.run(messages).await;
assert!(result.is_ok());
// ターンイベントが収集されていることを確認
let starts = turn_starts.lock().unwrap();
let ends = turn_ends.lock().unwrap();
assert_eq!(starts.len(), 1);
assert_eq!(starts[0], 0); // 最初のターン
assert_eq!(ends.len(), 1);
assert_eq!(ends[0], 0);
}
/// WorkerSubscriberがUsageイベントを正しく受け取ることを確認
#[tokio::test]
async fn test_subscriber_usage_events() {
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Hello"),
Event::text_block_stop(0, None),
Event::usage(100, 50),
Event::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Subscriberを登録
let subscriber = TestSubscriber::new();
let usage_events = subscriber.usage_events.clone();
worker.subscribe(subscriber);
// 実行
let messages = vec![Message::user("Hello")];
let _ = worker.run(messages).await;
// Usageイベントが収集されていることを確認
let usages = usage_events.lock().unwrap();
assert_eq!(usages.len(), 1);
assert_eq!(usages[0].input_tokens, Some(100));
assert_eq!(usages[0].output_tokens, Some(50));
}