diff --git a/crates/llm-worker/README.md b/crates/llm-worker/README.md index 0396e55b..3077f261 100644 --- a/crates/llm-worker/README.md +++ b/crates/llm-worker/README.md @@ -16,8 +16,8 @@ LLM との対話を管理する低レベル基盤クレート。会話履歴、 - `tool` — ツール定義・実行(`Tool` トレイト、`ToolDefinition`, `ToolOutput`, サイズ判定による Inline/Stored 切替) - `tool_server` — ツール登録・ルックアップ(`ToolServer`, `ToolServerHandle`) - `hook` — 実行フローへの介入ポイント(`Hook` トレイト、`PreToolCall`, `PostToolCall`, `OnTurnEnd` など) -- `subscriber` — リアルタイムイベント購読(`WorkerSubscriber` トレイト) -- `timeline` — イベントストリームのディスパッチ(`Handler` トレイト、各ブロックコレクター) +- クロージャベースイベント購読(`Worker::on_text_block()`, `on_tool_use_block()`, `on_usage()` 等) +- `timeline` — イベントストリームのディスパッチ(`Handler` トレイト、各ブロックコレクター)。パワーユーザー向けに `timeline_mut()` も提供 - `event` — ストリーミングイベント型(`Event`, `BlockStart`, `BlockDelta` など) - `state` — 型状態パターンによるキャッシュ保護(`Mutable` / `CacheLocked`) cratesの整理Add READMEsRE to all crates@@ diff --git a/crates/llm-worker/docs/architecture.md b/crates/llm-worker/docs/architecture.md index 3114401a..920e917d 100644 --- a/crates/llm-worker/docs/architecture.md +++ b/crates/llm-worker/docs/architecture.md @@ -33,7 +33,7 @@ llm-workerは3層構成でLLMとのインタラクションを管理する。 | `tool` / `tool_server` | ツール定義・登録・実行 | R3 | | `timeline` | イベントストリーム処理、Handler dispatch | — | | `handler` | Handler/Kind trait、ブロック別ハンドラ | — | -| `subscriber` | WorkerSubscriber trait、UI向けイベント配信 | — | +| `callback` | クロージャベースイベント購読(`on_text_block`, `on_usage` 等) | — | | `llm_client` | LLMプロバイダへのHTTPリクエスト/ストリーミング | — | | `llm_client/scheme` | プロバイダ固有ワイヤーフォーマット変換 | — | | `llm_client/providers` | Anthropic, OpenAI, Gemini, Ollama実装 | — | diff --git a/crates/llm-worker/src/callback.rs b/crates/llm-worker/src/callback.rs new file mode 100644 index 00000000..a66de95f --- /dev/null +++ b/crates/llm-worker/src/callback.rs @@ -0,0 +1,216 @@ +//! Closure-based event callback API +//! +//! Provides a closure-based alternative to implementing `Handler` directly. +//! Register callbacks on `Worker` via `on_text_block()`, `on_tool_use_block()`, +//! `on_usage()`, etc. + +use std::marker::PhantomData; + +use crate::handler::{ + Handler, Kind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent, ToolUseBlockKind, + ToolUseBlockStart, +}; +use crate::hook::ToolCall; + +// ============================================================================= +// TextBlock Closure Handler +// ============================================================================= + +/// Callback scope for a text block. +/// +/// Passed to the setup closure registered with `Worker::on_text_block()`. +/// Register per-block callbacks via `on_delta()` and `on_stop()`. +/// +/// # Examples +/// +/// ```ignore +/// worker.on_text_block(|block| { +/// block.on_delta(|text| print!("{}", text)); +/// block.on_stop(|full_text| println!("\n--- {} chars ---", full_text.len())); +/// }); +/// ``` +pub struct TextBlockScope { + pub(crate) on_delta: Option>, + pub(crate) on_stop: Option>, +} + +impl TextBlockScope { + fn new() -> Self { + Self { + on_delta: None, + on_stop: None, + } + } + + /// Register a callback for each text delta (streaming fragment). + pub fn on_delta(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) { + self.on_delta = Some(Box::new(f)); + } + + /// Register a callback invoked when the block completes. + /// + /// Receives the full accumulated text of the block. + pub fn on_stop(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) { + self.on_stop = Some(Box::new(f)); + } +} + +/// Per-block state created by Timeline's scope lifecycle. +#[derive(Default)] +pub(crate) struct TextBlockClosureState { + on_delta: Option>, + on_stop: Option>, + buffer: String, +} + +/// Closure-based `Handler` adapter. +pub(crate) struct ClosureTextBlockHandler { + pub(crate) setup: Box, +} + +impl Handler for ClosureTextBlockHandler { + type Scope = TextBlockClosureState; + + fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) { + match event { + TextBlockEvent::Start(_) => { + scope.buffer.clear(); + let mut builder = TextBlockScope::new(); + (self.setup)(&mut builder); + scope.on_delta = builder.on_delta; + scope.on_stop = builder.on_stop; + } + TextBlockEvent::Delta(text) => { + scope.buffer.push_str(text); + if let Some(f) = &mut scope.on_delta { + f(text); + } + } + TextBlockEvent::Stop(_) => { + if let Some(f) = &mut scope.on_stop { + f(&scope.buffer); + } + } + } + } +} + +// ============================================================================= +// ToolUseBlock Closure Handler +// ============================================================================= + +/// Callback scope for a tool use block. +/// +/// Passed to the setup closure registered with `Worker::on_tool_use_block()`. +/// The setup closure also receives `&ToolUseBlockStart` with `id` and `name`. +/// +/// # Examples +/// +/// ```ignore +/// worker.on_tool_use_block(|start, block| { +/// println!("Tool: {} ({})", start.name, start.id); +/// block.on_delta(|json| { /* streaming JSON fragment */ }); +/// block.on_stop(|call| println!("Done: {}", call.name)); +/// }); +/// ``` +pub struct ToolUseBlockScope { + pub(crate) on_delta: Option>, + pub(crate) on_stop: Option>, +} + +impl ToolUseBlockScope { + fn new() -> Self { + Self { + on_delta: None, + on_stop: None, + } + } + + /// Register a callback for each JSON input delta (streaming fragment). + pub fn on_delta(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) { + self.on_delta = Some(Box::new(f)); + } + + /// Register a callback invoked when the block completes. + /// + /// Receives the fully assembled `ToolCall` with parsed JSON input. + pub fn on_stop(&mut self, f: impl FnMut(&ToolCall) + Send + Sync + 'static) { + self.on_stop = Some(Box::new(f)); + } +} + +/// Per-block state for tool use closure handler. +#[derive(Default)] +pub(crate) struct ToolUseBlockClosureState { + on_delta: Option>, + on_stop: Option>, + id: String, + name: String, + input_json: String, +} + +/// Closure-based `Handler` adapter. +pub(crate) struct ClosureToolUseBlockHandler { + pub(crate) setup: Box, +} + +impl Handler for ClosureToolUseBlockHandler { + type Scope = ToolUseBlockClosureState; + + fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) { + match event { + ToolUseBlockEvent::Start(start) => { + scope.id = start.id.clone(); + scope.name = start.name.clone(); + scope.input_json.clear(); + let mut builder = ToolUseBlockScope::new(); + (self.setup)(start, &mut builder); + scope.on_delta = builder.on_delta; + scope.on_stop = builder.on_stop; + } + ToolUseBlockEvent::InputJsonDelta(json) => { + scope.input_json.push_str(json); + if let Some(f) = &mut scope.on_delta { + f(json); + } + } + ToolUseBlockEvent::Stop(_) => { + let input: serde_json::Value = + serde_json::from_str(&scope.input_json).unwrap_or_default(); + let tool_call = ToolCall { + id: std::mem::take(&mut scope.id), + name: std::mem::take(&mut scope.name), + input, + }; + if let Some(f) = &mut scope.on_stop { + f(&tool_call); + } + } + } + } +} + +// ============================================================================= +// Generic Meta Event Closure Handler +// ============================================================================= + +/// Closure-based `Handler` adapter for meta events (Usage, Status, Error). +pub(crate) struct ClosureMetaHandler +where + K: Kind, +{ + pub(crate) callback: F, + pub(crate) _kind: PhantomData, +} + +impl Handler for ClosureMetaHandler +where + F: FnMut(&K::Event) + Send + Sync, + K: Kind, +{ + type Scope = (); + + fn on_event(&mut self, _scope: &mut (), event: &K::Event) { + (self.callback)(event); + } +} diff --git a/crates/llm-worker/src/lib.rs b/crates/llm-worker/src/lib.rs index a958bfe9..0f0fb052 100644 --- a/crates/llm-worker/src/lib.rs +++ b/crates/llm-worker/src/lib.rs @@ -7,7 +7,7 @@ //! - [`Worker`] - Central component for managing LLM interactions //! - [`tool::Tool`] - Tools that can be invoked by the LLM //! - [`hook::Hook`] - Hooks for intercepting turn progression -//! - [`subscriber::WorkerSubscriber`] - Subscribing to streaming events +//! - Closure-based event callbacks via `Worker::on_text_block()`, `on_tool_use_block()`, etc. //! //! # Quick Start //! @@ -39,14 +39,16 @@ mod handler; mod message; mod worker; +pub(crate) mod callback; pub mod event; pub mod hook; pub mod llm_client; pub mod state; -pub mod subscriber; pub mod timeline; pub mod tool; pub mod tool_server; +pub use callback::{TextBlockScope, ToolUseBlockScope}; +pub use handler::ToolUseBlockStart; pub use message::{ContentPart, Item, Message, Role}; pub use worker::{ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult}; diff --git a/crates/llm-worker/src/subscriber.rs b/crates/llm-worker/src/subscriber.rs deleted file mode 100644 index 3f384930..00000000 --- a/crates/llm-worker/src/subscriber.rs +++ /dev/null @@ -1,371 +0,0 @@ -//! Event Subscription -//! -//! Trait for receiving streaming events from LLM in real-time. -//! Used for stream display to UI and progress display. - -use std::sync::{Arc, Mutex}; - -use crate::{ - handler::{ - ErrorKind, Handler, StatusKind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent, - ToolUseBlockKind, UsageKind, - }, - hook::ToolCall, - timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, -}; - -// ============================================================================= -// WorkerSubscriber Trait -// ============================================================================= - -/// Trait for subscribing to streaming events from LLM -/// -/// When registered with Worker, you can receive events from text generation -/// and tool calls in real-time. Ideal for stream display to UI. -/// -/// # Available Events -/// -/// - **Block events**: Text, tool use (with scope) -/// - **Meta events**: Usage, status, error -/// - **Completion events**: Text complete, tool call complete -/// - **Turn control**: Turn start, turn end -/// -/// # Examples -/// -/// ```ignore -/// use llm_worker::subscriber::WorkerSubscriber; -/// use llm_worker::timeline::TextBlockEvent; -/// -/// struct StreamPrinter; -/// -/// impl WorkerSubscriber for StreamPrinter { -/// type TextBlockScope = (); -/// type ToolUseBlockScope = (); -/// -/// fn on_text_block(&mut self, _: &mut (), event: &TextBlockEvent) { -/// if let TextBlockEvent::Delta(text) = event { -/// print!("{}", text); // Real-time output -/// } -/// } -/// -/// fn on_text_complete(&mut self, text: &str) { -/// println!("\n--- Complete: {} chars ---", text.len()); -/// } -/// } -/// -/// // Register with Worker -/// worker.subscribe(StreamPrinter); -/// ``` -pub trait WorkerSubscriber: Send { - // ========================================================================= - // Scope Types (for block events) - // ========================================================================= - - /// Scope type for text block processing - /// - /// Generated with Default::default() at block start, - /// destroyed at block end. - type TextBlockScope: Default + Send + Sync; - - /// Scope type for tool use block processing - type ToolUseBlockScope: Default + Send + Sync; - - // ========================================================================= - // Block Events (with scope management) - // ========================================================================= - - /// Text block event - /// - /// Has Start/Delta/Stop lifecycle. - /// Scope is generated at block start and destroyed at end. - #[allow(unused_variables)] - fn on_text_block(&mut self, scope: &mut Self::TextBlockScope, event: &TextBlockEvent) {} - - /// Tool use block event - /// - /// Has Start/InputJsonDelta/Stop lifecycle. - #[allow(unused_variables)] - fn on_tool_use_block( - &mut self, - scope: &mut Self::ToolUseBlockScope, - event: &ToolUseBlockEvent, - ) { - } - - // ========================================================================= - // Single Events (no scope needed) - // ========================================================================= - - /// Usage event - #[allow(unused_variables)] - fn on_usage(&mut self, event: &UsageEvent) {} - - /// Status event - #[allow(unused_variables)] - fn on_status(&mut self, event: &StatusEvent) {} - - /// Error event - #[allow(unused_variables)] - fn on_error(&mut self, event: &ErrorEvent) {} - - // ========================================================================= - // Accumulated Events (added in Worker layer) - // ========================================================================= - - /// Text complete event - /// - /// When a text block completes, the entire accumulated text is passed. - /// Convenient for receiving the final result after block processing. - #[allow(unused_variables)] - fn on_text_complete(&mut self, text: &str) {} - - /// Tool call complete event - /// - /// When a tool use block completes, the complete ToolCall is passed. - #[allow(unused_variables)] - fn on_tool_call_complete(&mut self, call: &ToolCall) {} - - // ========================================================================= - // Turn Control - // ========================================================================= - - /// On turn start - /// - /// `turn` is a 0-based turn number. - #[allow(unused_variables)] - fn on_turn_start(&mut self, turn: usize) {} - - /// On turn end - #[allow(unused_variables)] - fn on_turn_end(&mut self, turn: usize) {} -} - -// ============================================================================= -// SubscriberAdapter - Bridge WorkerSubscriber to Timeline handlers -// ============================================================================= - -// ============================================================================= -// TextBlock Handler Adapter -// ============================================================================= - -/// Subscriber adapter for TextBlockKind -pub(crate) struct TextBlockSubscriberAdapter { - subscriber: Arc>, -} - -impl TextBlockSubscriberAdapter { - pub fn new(subscriber: Arc>) -> Self { - Self { subscriber } - } -} - -impl Clone for TextBlockSubscriberAdapter { - fn clone(&self) -> Self { - Self { - subscriber: self.subscriber.clone(), - } - } -} - -/// Wrapper for TextBlock scope -pub struct TextBlockScopeWrapper { - inner: S::TextBlockScope, - buffer: String, // Buffer for on_text_complete -} - -impl Default for TextBlockScopeWrapper { - fn default() -> Self { - Self { - inner: S::TextBlockScope::default(), - buffer: String::new(), - } - } -} - -impl Handler for TextBlockSubscriberAdapter { - type Scope = TextBlockScopeWrapper; - - fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) { - // Accumulate deltas into buffer - if let TextBlockEvent::Delta(text) = event { - scope.buffer.push_str(text); - } - - // Call Subscriber's TextBlock event handler - if let Ok(mut subscriber) = self.subscriber.lock() { - subscriber.on_text_block(&mut scope.inner, event); - - // Also call on_text_complete on Stop - if matches!(event, TextBlockEvent::Stop(_)) { - subscriber.on_text_complete(&scope.buffer); - } - } - } -} - -// ============================================================================= -// ToolUseBlock Handler Adapter -// ============================================================================= - -/// Subscriber adapter for ToolUseBlockKind -pub(crate) struct ToolUseBlockSubscriberAdapter { - subscriber: Arc>, -} - -impl ToolUseBlockSubscriberAdapter { - pub fn new(subscriber: Arc>) -> Self { - Self { subscriber } - } -} - -impl Clone for ToolUseBlockSubscriberAdapter { - fn clone(&self) -> Self { - Self { - subscriber: self.subscriber.clone(), - } - } -} - -/// Wrapper for ToolUseBlock scope -pub struct ToolUseBlockScopeWrapper { - inner: S::ToolUseBlockScope, - id: String, - name: String, - input_json: String, // JSON accumulation -} - -impl Default for ToolUseBlockScopeWrapper { - fn default() -> Self { - Self { - inner: S::ToolUseBlockScope::default(), - id: String::new(), - name: String::new(), - input_json: String::new(), - } - } -} - -impl Handler for ToolUseBlockSubscriberAdapter { - type Scope = ToolUseBlockScopeWrapper; - - fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) { - // Save metadata on Start - if let ToolUseBlockEvent::Start(start) = event { - scope.id = start.id.clone(); - scope.name = start.name.clone(); - } - - // Accumulate InputJsonDelta into buffer - if let ToolUseBlockEvent::InputJsonDelta(json) = event { - scope.input_json.push_str(json); - } - - // Call Subscriber's ToolUseBlock event handler - if let Ok(mut subscriber) = self.subscriber.lock() { - subscriber.on_tool_use_block(&mut scope.inner, event); - - // Also call on_tool_call_complete on Stop - if matches!(event, ToolUseBlockEvent::Stop(_)) { - let input: serde_json::Value = - serde_json::from_str(&scope.input_json).unwrap_or_default(); - let tool_call = ToolCall { - id: scope.id.clone(), - name: scope.name.clone(), - input, - }; - subscriber.on_tool_call_complete(&tool_call); - } - } - } -} - -// ============================================================================= -// Meta Event Handler Adapters -// ============================================================================= - -/// Subscriber adapter for UsageKind -pub(crate) struct UsageSubscriberAdapter { - subscriber: Arc>, -} - -impl UsageSubscriberAdapter { - pub fn new(subscriber: Arc>) -> Self { - Self { subscriber } - } -} - -impl Clone for UsageSubscriberAdapter { - fn clone(&self) -> Self { - Self { - subscriber: self.subscriber.clone(), - } - } -} - -impl Handler for UsageSubscriberAdapter { - type Scope = (); - - fn on_event(&mut self, _scope: &mut Self::Scope, event: &UsageEvent) { - if let Ok(mut subscriber) = self.subscriber.lock() { - subscriber.on_usage(event); - } - } -} - -/// Subscriber adapter for StatusKind -pub(crate) struct StatusSubscriberAdapter { - subscriber: Arc>, -} - -impl StatusSubscriberAdapter { - pub fn new(subscriber: Arc>) -> Self { - Self { subscriber } - } -} - -impl Clone for StatusSubscriberAdapter { - fn clone(&self) -> Self { - Self { - subscriber: self.subscriber.clone(), - } - } -} - -impl Handler for StatusSubscriberAdapter { - type Scope = (); - - fn on_event(&mut self, _scope: &mut Self::Scope, event: &StatusEvent) { - if let Ok(mut subscriber) = self.subscriber.lock() { - subscriber.on_status(event); - } - } -} - -/// Subscriber adapter for ErrorKind -pub(crate) struct ErrorSubscriberAdapter { - subscriber: Arc>, -} - -impl ErrorSubscriberAdapter { - pub fn new(subscriber: Arc>) -> Self { - Self { subscriber } - } -} - -impl Clone for ErrorSubscriberAdapter { - fn clone(&self) -> Self { - Self { - subscriber: self.subscriber.clone(), - } - } -} - -impl Handler for ErrorSubscriberAdapter { - type Scope = (); - - fn on_event(&mut self, _scope: &mut Self::Scope, event: &ErrorEvent) { - if let Ok(mut subscriber) = self.subscriber.lock() { - subscriber.on_error(event); - } - } -} diff --git a/crates/llm-worker/src/timeline/mod.rs b/crates/llm-worker/src/timeline/mod.rs index 1f944a2b..9ee896e8 100644 --- a/crates/llm-worker/src/timeline/mod.rs +++ b/crates/llm-worker/src/timeline/mod.rs @@ -17,7 +17,7 @@ mod tool_call_collector; // 公開API pub use event::*; pub use text_block_collector::TextBlockCollector; -pub use timeline::{ErasedHandler, HandlerWrapper, Timeline}; +pub use timeline::Timeline; pub use tool_call_collector::ToolCallCollector; // 型定義からのre-export diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 97ee83f7..c895e918 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::marker::PhantomData; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use futures::StreamExt; use tokio::sync::mpsc; @@ -18,11 +18,13 @@ use crate::{ }, llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition}, state::{CacheLocked, Mutable, WorkerState}, - subscriber::{ - ErrorSubscriberAdapter, StatusSubscriberAdapter, TextBlockSubscriberAdapter, - ToolUseBlockSubscriberAdapter, UsageSubscriberAdapter, WorkerSubscriber, + callback::{ + ClosureMetaHandler, ClosureTextBlockHandler, ClosureToolUseBlockHandler, TextBlockScope, + ToolUseBlockScope, }, + handler::{ErrorKind, StatusKind, ToolUseBlockStart, UsageKind}, timeline::{TextBlockCollector, Timeline, ToolCallCollector}, + timeline::event::{ErrorEvent, StatusEvent, UsageEvent}, tool::{ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputProcessor}, tool_server::{ToolServer, ToolServerError, ToolServerHandle}, }; @@ -94,34 +96,6 @@ enum ToolExecutionResult { Paused, } -// ============================================================================= -// Turn Control Callback Storage -// ============================================================================= - -/// Callback for notifying turn events (type-erased) -trait TurnNotifier: Send + Sync { - fn on_turn_start(&self, turn: usize); - fn on_turn_end(&self, turn: usize); -} - -struct SubscriberTurnNotifier { - subscriber: Arc>, -} - -impl TurnNotifier for SubscriberTurnNotifier { - fn on_turn_start(&self, turn: usize) { - if let Ok(mut s) = self.subscriber.lock() { - s.on_turn_start(turn); - } - } - - fn on_turn_end(&self, turn: usize) { - if let Ok(mut s) = self.subscriber.lock() { - s.on_turn_end(turn); - } - } -} - // ============================================================================= // Worker // ============================================================================= @@ -183,8 +157,10 @@ pub struct Worker { turn_count: usize, /// Maximum number of turns (None = unlimited) max_turns: Option, - /// Turn notification callbacks - turn_notifiers: Vec>, + /// Turn-start callbacks + turn_start_cbs: Vec>, + /// Turn-end callbacks + turn_end_cbs: Vec>, /// Request configuration (max_tokens, temperature, etc.) request_config: RequestConfig, /// Whether the previous run was interrupted @@ -256,59 +232,102 @@ impl Worker { } } - /// Register an event subscriber + /// Register a text block observer with scoped callbacks. /// - /// Registered subscribers receive streaming events from the LLM - /// in real-time. Useful for streaming display to UI. - /// - /// # Available Events - /// - /// - **Block events**: `on_text_block`, `on_tool_use_block` - /// - **Meta events**: `on_usage`, `on_status`, `on_error` - /// - **Completion events**: `on_text_complete`, `on_tool_call_complete` - /// - **Turn control**: `on_turn_start`, `on_turn_end` + /// The setup closure is called once per text block. Inside it, register + /// `on_delta` and/or `on_stop` callbacks on the provided scope. /// /// # Examples /// /// ```ignore - /// use llm_worker::{Worker, WorkerSubscriber, TextBlockEvent}; - /// - /// struct MyPrinter; - /// impl WorkerSubscriber for MyPrinter { - /// type TextBlockScope = (); - /// type ToolUseBlockScope = (); - /// - /// fn on_text_block(&mut self, _: &mut (), event: &TextBlockEvent) { - /// if let TextBlockEvent::Delta(text) = event { - /// print!("{}", text); - /// } - /// } - /// } - /// - /// worker.subscribe(MyPrinter); + /// worker.on_text_block(|block| { + /// block.on_delta(|text| print!("{}", text)); + /// block.on_stop(|full_text| println!("\n--- {} chars ---", full_text.len())); + /// }); /// ``` - pub fn subscribe(&mut self, subscriber: Sub) { - let subscriber = Arc::new(Mutex::new(subscriber)); + pub fn on_text_block( + &mut self, + setup: impl FnMut(&mut TextBlockScope) + Send + Sync + 'static, + ) { + self.timeline + .on_text_block(ClosureTextBlockHandler { + setup: Box::new(setup), + }); + } - // Register TextBlock handler + /// Register a tool use block observer with scoped callbacks. + /// + /// The setup closure receives `&ToolUseBlockStart` (containing `id` and `name`) + /// and a scope for registering `on_delta` and `on_stop` callbacks. + /// + /// `on_stop` receives a fully assembled `&ToolCall` with parsed JSON input. + /// + /// # Examples + /// + /// ```ignore + /// worker.on_tool_use_block(|start, block| { + /// println!("Tool: {} ({})", start.name, start.id); + /// block.on_delta(|json| { /* streaming JSON fragment */ }); + /// block.on_stop(|call| println!("Done: {}", call.name)); + /// }); + /// ``` + pub fn on_tool_use_block( + &mut self, + setup: impl FnMut(&ToolUseBlockStart, &mut ToolUseBlockScope) + Send + Sync + 'static, + ) { self.timeline - .on_text_block(TextBlockSubscriberAdapter::new(subscriber.clone())); + .on_tool_use_block(ClosureToolUseBlockHandler { + setup: Box::new(setup), + }); + } - // Register ToolUseBlock handler - self.timeline - .on_tool_use_block(ToolUseBlockSubscriberAdapter::new(subscriber.clone())); + /// Register a usage event callback. + pub fn on_usage( + &mut self, + callback: impl FnMut(&UsageEvent) + Send + Sync + 'static, + ) { + self.timeline.on_usage(ClosureMetaHandler { + callback, + _kind: PhantomData::, + }); + } - // Register meta handlers - self.timeline - .on_usage(UsageSubscriberAdapter::new(subscriber.clone())); - self.timeline - .on_status(StatusSubscriberAdapter::new(subscriber.clone())); - self.timeline - .on_error(ErrorSubscriberAdapter::new(subscriber.clone())); + /// Register a status event callback. + pub fn on_status( + &mut self, + callback: impl FnMut(&StatusEvent) + Send + Sync + 'static, + ) { + self.timeline.on_status(ClosureMetaHandler { + callback, + _kind: PhantomData::, + }); + } - // Register turn control callback - self.turn_notifiers - .push(Box::new(SubscriberTurnNotifier { subscriber })); + /// Register an error event callback. + pub fn on_error( + &mut self, + callback: impl FnMut(&ErrorEvent) + Send + Sync + 'static, + ) { + self.timeline.on_error(ClosureMetaHandler { + callback, + _kind: PhantomData::, + }); + } + + /// Register a turn-start callback (receives 0-based turn number). + pub fn on_turn_start( + &mut self, + callback: impl Fn(usize) + Send + Sync + 'static, + ) { + self.turn_start_cbs.push(Box::new(callback)); + } + + /// Register a turn-end callback (receives 0-based turn number). + pub fn on_turn_end( + &mut self, + callback: impl Fn(usize) + Send + Sync + 'static, + ) { + self.turn_end_cbs.push(Box::new(callback)); } /// Get a shared tool server handle. @@ -940,8 +959,8 @@ impl Worker { // Notify turn start let current_turn = self.turn_count; debug!(turn = current_turn, "Turn start"); - for notifier in &self.turn_notifiers { - notifier.on_turn_start(current_turn); + for cb in &self.turn_start_cbs { + cb(current_turn); } // Hook: pre_llm_request @@ -952,8 +971,8 @@ impl Worker { match control { PreLlmRequestResult::Cancel(reason) => { info!(reason = %reason, "Aborted by hook"); - for notifier in &self.turn_notifiers { - notifier.on_turn_end(current_turn); + for cb in &self.turn_end_cbs { + cb(current_turn); } self.last_run_interrupted = true; return Err(WorkerError::Aborted(reason)); @@ -1047,8 +1066,8 @@ impl Worker { debug!(event_count = event_count, "Stream completed"); // Notify turn end - for notifier in &self.turn_notifiers { - notifier.on_turn_end(current_turn); + for cb in &self.turn_end_cbs { + cb(current_turn); } self.turn_count += 1; @@ -1151,7 +1170,8 @@ impl Worker { locked_prefix_len: 0, turn_count: 0, max_turns: None, - turn_notifiers: Vec::new(), + turn_start_cbs: Vec::new(), + turn_end_cbs: Vec::new(), request_config: RequestConfig::default(), last_run_interrupted: false, output_processor: None, @@ -1386,7 +1406,8 @@ impl Worker { locked_prefix_len, turn_count: self.turn_count, max_turns: self.max_turns, - turn_notifiers: self.turn_notifiers, + turn_start_cbs: self.turn_start_cbs, + turn_end_cbs: self.turn_end_cbs, request_config: self.request_config, last_run_interrupted: self.last_run_interrupted, output_processor: self.output_processor, @@ -1424,7 +1445,8 @@ impl Worker { locked_prefix_len: 0, turn_count: self.turn_count, max_turns: self.max_turns, - turn_notifiers: self.turn_notifiers, + turn_start_cbs: self.turn_start_cbs, + turn_end_cbs: self.turn_end_cbs, request_config: self.request_config, last_run_interrupted: self.last_run_interrupted, output_processor: self.output_processor, diff --git a/crates/llm-worker/tests/callback_test.rs b/crates/llm-worker/tests/callback_test.rs new file mode 100644 index 00000000..c1c4b71a --- /dev/null +++ b/crates/llm-worker/tests/callback_test.rs @@ -0,0 +1,178 @@ +//! Closure callback API tests +//! +//! Tests for the closure-based event subscription API on Worker. + +mod common; + +use std::sync::{Arc, Mutex}; + +use common::MockLlmClient; +use llm_worker::Worker; +use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent as ClientStatusEvent}; + +// ============================================================================= +// Tests +// ============================================================================= + +/// Verify that on_text_block correctly receives delta and stop events +#[tokio::test] +async fn test_callback_text_block_events() { + let events = vec![ + Event::text_block_start(0), + Event::text_delta(0, "Hello, "), + Event::text_delta(0, "World!"), + Event::text_block_stop(0, None), + Event::Status(ClientStatusEvent { + status: ResponseStatus::Completed, + }), + ]; + + let client = MockLlmClient::new(events); + let mut worker = Worker::new(client); + + let text_deltas = Arc::new(Mutex::new(Vec::new())); + let text_completes = Arc::new(Mutex::new(Vec::new())); + + let deltas = text_deltas.clone(); + let completes = text_completes.clone(); + worker.on_text_block(move |block| { + let d = deltas.clone(); + block.on_delta(move |text| { + d.lock().unwrap().push(text.to_owned()); + }); + let c = completes.clone(); + block.on_stop(move |text| { + c.lock().unwrap().push(text.to_owned()); + }); + }); + + let result = worker.run("Greet me").await; + assert!(result.is_ok(), "Worker should complete: {:?}", result); + + let deltas = text_deltas.lock().unwrap(); + assert_eq!(deltas.len(), 2); + assert_eq!(deltas[0], "Hello, "); + assert_eq!(deltas[1], "World!"); + + let completes = text_completes.lock().unwrap(); + assert_eq!(completes.len(), 1); + assert_eq!(completes[0], "Hello, World!"); +} + +/// Verify that on_tool_use_block correctly receives start info and stop with ToolCall +#[tokio::test] +async fn test_callback_tool_call_complete() { + let events = vec![ + Event::tool_use_start(0, "call_123", "get_weather"), + Event::tool_input_delta(0, r#"{"city":"#), + Event::tool_input_delta(0, r#""Tokyo"}"#), + Event::tool_use_stop(0), + Event::Status(ClientStatusEvent { + status: ResponseStatus::Completed, + }), + ]; + + let client = MockLlmClient::new(events); + let mut worker = Worker::new(client); + + let tool_starts = Arc::new(Mutex::new(Vec::<(String, String)>::new())); + let tool_completes = Arc::new(Mutex::new(Vec::new())); + + let starts = tool_starts.clone(); + let completes = tool_completes.clone(); + worker.on_tool_use_block(move |start, block| { + starts + .lock() + .unwrap() + .push((start.id.clone(), start.name.clone())); + let c = completes.clone(); + block.on_stop(move |call| { + c.lock().unwrap().push(call.clone()); + }); + }); + + let _ = worker.run("Weather please").await; + + let starts = tool_starts.lock().unwrap(); + assert_eq!(starts.len(), 1); + assert_eq!(starts[0].0, "call_123"); + assert_eq!(starts[0].1, "get_weather"); + + let completes = tool_completes.lock().unwrap(); + assert_eq!(completes.len(), 1); + assert_eq!(completes[0].name, "get_weather"); + assert_eq!(completes[0].id, "call_123"); + assert_eq!(completes[0].input["city"], "Tokyo"); +} + +/// Verify that on_turn_start and on_turn_end callbacks are called +#[tokio::test] +async fn test_callback_turn_events() { + let events = vec![ + Event::text_block_start(0), + Event::text_delta(0, "Done!"), + Event::text_block_stop(0, None), + Event::Status(ClientStatusEvent { + status: ResponseStatus::Completed, + }), + ]; + + let client = MockLlmClient::new(events); + let mut worker = Worker::new(client); + + let turn_starts = Arc::new(Mutex::new(Vec::new())); + let turn_ends = Arc::new(Mutex::new(Vec::new())); + + let starts = turn_starts.clone(); + worker.on_turn_start(move |turn| { + starts.lock().unwrap().push(turn); + }); + + let ends = turn_ends.clone(); + worker.on_turn_end(move |turn| { + ends.lock().unwrap().push(turn); + }); + + let result = worker.run("Do something").await; + assert!(result.is_ok()); + + let starts = turn_starts.lock().unwrap(); + let ends = turn_ends.lock().unwrap(); + + assert_eq!(starts.len(), 1); + assert_eq!(starts[0], 0); + + assert_eq!(ends.len(), 1); + assert_eq!(ends[0], 0); +} + +/// Verify that on_usage callback receives usage events +#[tokio::test] +async fn test_callback_usage_events() { + let events = vec![ + Event::text_block_start(0), + Event::text_delta(0, "Hello"), + Event::text_block_stop(0, None), + Event::usage(100, 50), + Event::Status(ClientStatusEvent { + status: ResponseStatus::Completed, + }), + ]; + + let client = MockLlmClient::new(events); + let mut worker = Worker::new(client); + + let usage_events = Arc::new(Mutex::new(Vec::new())); + + let usages = usage_events.clone(); + worker.on_usage(move |event| { + usages.lock().unwrap().push(event.clone()); + }); + + let _ = worker.run("Hello").await; + + let usages = usage_events.lock().unwrap(); + assert_eq!(usages.len(), 1); + assert_eq!(usages[0].input_tokens, Some(100)); + assert_eq!(usages[0].output_tokens, Some(50)); +} diff --git a/crates/llm-worker/tests/subscriber_test.rs b/crates/llm-worker/tests/subscriber_test.rs deleted file mode 100644 index 552aa043..00000000 --- a/crates/llm-worker/tests/subscriber_test.rs +++ /dev/null @@ -1,234 +0,0 @@ -//! WorkerSubscriber tests -//! -//! Tests for subscribing to events using WorkerSubscriber - -mod common; - -use std::sync::{Arc, Mutex}; - -use common::MockLlmClient; -use llm_worker::Worker; -use llm_worker::hook::ToolCall; -use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent as ClientStatusEvent}; -use llm_worker::subscriber::WorkerSubscriber; -use llm_worker::timeline::event::{ErrorEvent, StatusEvent, UsageEvent}; -use llm_worker::timeline::{TextBlockEvent, ToolUseBlockEvent}; - -// ============================================================================= -// Test Subscriber -// ============================================================================= - -/// Simple Subscriber implementation for testing -struct TestSubscriber { - // Recording buffers - text_deltas: Arc>>, - text_completes: Arc>>, - tool_call_completes: Arc>>, - usage_events: Arc>>, - status_events: Arc>>, - turn_starts: Arc>>, - turn_ends: Arc>>, -} - -impl TestSubscriber { - fn new() -> Self { - Self { - text_deltas: Arc::new(Mutex::new(Vec::new())), - text_completes: Arc::new(Mutex::new(Vec::new())), - tool_call_completes: Arc::new(Mutex::new(Vec::new())), - usage_events: Arc::new(Mutex::new(Vec::new())), - status_events: Arc::new(Mutex::new(Vec::new())), - turn_starts: Arc::new(Mutex::new(Vec::new())), - turn_ends: Arc::new(Mutex::new(Vec::new())), - } - } -} - -impl WorkerSubscriber for TestSubscriber { - type TextBlockScope = String; - type ToolUseBlockScope = (); - - fn on_text_block(&mut self, buffer: &mut String, event: &TextBlockEvent) { - if let TextBlockEvent::Delta(text) = event { - buffer.push_str(text); - self.text_deltas.lock().unwrap().push(text.clone()); - } - } - - fn on_text_complete(&mut self, text: &str) { - self.text_completes.lock().unwrap().push(text.to_string()); - } - - fn on_tool_use_block(&mut self, _scope: &mut (), _event: &ToolUseBlockEvent) { - // Process as needed - } - - fn on_tool_call_complete(&mut self, call: &ToolCall) { - self.tool_call_completes.lock().unwrap().push(call.clone()); - } - - fn on_usage(&mut self, event: &UsageEvent) { - self.usage_events.lock().unwrap().push(event.clone()); - } - - fn on_status(&mut self, event: &StatusEvent) { - self.status_events.lock().unwrap().push(event.clone()); - } - - fn on_error(&mut self, _event: &ErrorEvent) { - // Process as needed - } - - fn on_turn_start(&mut self, turn: usize) { - self.turn_starts.lock().unwrap().push(turn); - } - - fn on_turn_end(&mut self, turn: usize) { - self.turn_ends.lock().unwrap().push(turn); - } -} - -// ============================================================================= -// Tests -// ============================================================================= - -/// Verify that WorkerSubscriber correctly receives text block events -#[tokio::test] -async fn test_subscriber_text_block_events() { - // Event sequence containing text response - let events = vec![ - Event::text_block_start(0), - Event::text_delta(0, "Hello, "), - Event::text_delta(0, "World!"), - Event::text_block_stop(0, None), - Event::Status(ClientStatusEvent { - status: ResponseStatus::Completed, - }), - ]; - - let client = MockLlmClient::new(events); - let mut worker = Worker::new(client); - - // Register Subscriber - let subscriber = TestSubscriber::new(); - let text_deltas = subscriber.text_deltas.clone(); - let text_completes = subscriber.text_completes.clone(); - worker.subscribe(subscriber); - - // Execute - let result = worker.run("Greet me").await; - - assert!(result.is_ok(), "Worker should complete: {:?}", result); - - // Verify deltas were collected - let deltas = text_deltas.lock().unwrap(); - assert_eq!(deltas.len(), 2); - assert_eq!(deltas[0], "Hello, "); - assert_eq!(deltas[1], "World!"); - - // Verify complete text was collected - let completes = text_completes.lock().unwrap(); - assert_eq!(completes.len(), 1); - assert_eq!(completes[0], "Hello, World!"); -} - -/// Verify that WorkerSubscriber correctly receives tool call complete events -#[tokio::test] -async fn test_subscriber_tool_call_complete() { - // Event sequence containing tool call - let events = vec![ - Event::tool_use_start(0, "call_123", "get_weather"), - Event::tool_input_delta(0, r#"{"city":"#), - Event::tool_input_delta(0, r#""Tokyo"}"#), - Event::tool_use_stop(0), - Event::Status(ClientStatusEvent { - status: ResponseStatus::Completed, - }), - ]; - - let client = MockLlmClient::new(events); - let mut worker = Worker::new(client); - - // Register Subscriber - let subscriber = TestSubscriber::new(); - let tool_call_completes = subscriber.tool_call_completes.clone(); - worker.subscribe(subscriber); - - // Execute - let _ = worker.run("Weather please").await; - - // Verify tool call complete was collected - let completes = tool_call_completes.lock().unwrap(); - assert_eq!(completes.len(), 1); - assert_eq!(completes[0].name, "get_weather"); - assert_eq!(completes[0].id, "call_123"); - assert_eq!(completes[0].input["city"], "Tokyo"); -} - -/// Verify that WorkerSubscriber correctly receives turn events -#[tokio::test] -async fn test_subscriber_turn_events() { - let events = vec![ - Event::text_block_start(0), - Event::text_delta(0, "Done!"), - Event::text_block_stop(0, None), - Event::Status(ClientStatusEvent { - status: ResponseStatus::Completed, - }), - ]; - - let client = MockLlmClient::new(events); - let mut worker = Worker::new(client); - - // Register Subscriber - let subscriber = TestSubscriber::new(); - let turn_starts = subscriber.turn_starts.clone(); - let turn_ends = subscriber.turn_ends.clone(); - worker.subscribe(subscriber); - - // Execute - let result = worker.run("Do something").await; - - assert!(result.is_ok()); - - // Verify turn events were collected - let starts = turn_starts.lock().unwrap(); - let ends = turn_ends.lock().unwrap(); - - assert_eq!(starts.len(), 1); - assert_eq!(starts[0], 0); // First turn - - assert_eq!(ends.len(), 1); - assert_eq!(ends[0], 0); -} - -/// Verify that WorkerSubscriber correctly receives Usage events -#[tokio::test] -async fn test_subscriber_usage_events() { - let events = vec![ - Event::text_block_start(0), - Event::text_delta(0, "Hello"), - Event::text_block_stop(0, None), - Event::usage(100, 50), - Event::Status(ClientStatusEvent { - status: ResponseStatus::Completed, - }), - ]; - - let client = MockLlmClient::new(events); - let mut worker = Worker::new(client); - - // Register Subscriber - let subscriber = TestSubscriber::new(); - let usage_events = subscriber.usage_events.clone(); - worker.subscribe(subscriber); - - // Execute - let _ = worker.run("Hello").await; - - // Verify Usage events were collected - let usages = usage_events.lock().unwrap(); - assert_eq!(usages.len(), 1); - assert_eq!(usages[0].input_tokens, Some(100)); - assert_eq!(usages[0].output_tokens, Some(50)); -} diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 666371b1..bd8478d8 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -1,11 +1,7 @@ use std::path::Path; use std::sync::Arc; -use llm_worker::hook::ToolCall; use llm_worker::llm_client::client::LlmClient; -use llm_worker::subscriber::WorkerSubscriber; -use llm_worker::timeline::event::{ErrorEvent, UsageEvent}; -use llm_worker::timeline::{TextBlockEvent, ToolUseBlockEvent}; use llm_worker::WorkerError; use llm_worker_persistence::Store; use tokio::sync::{broadcast, mpsc}; @@ -87,11 +83,79 @@ impl PodController { // Keep the server alive by moving it into the controller task // (it will be dropped when the task ends) - // Register the event bridge subscriber on the worker - let bridge = EventBridgeSubscriber { - event_tx: event_tx.clone(), - }; - pod.session_mut().worker.subscribe(bridge); + // Register event bridge callbacks on the worker + { + let worker = &mut pod.session_mut().worker; + + let tx = event_tx.clone(); + worker.on_turn_start(move |turn| { + let _ = tx.send(Event::TurnStart { turn }); + }); + + let tx = event_tx.clone(); + worker.on_turn_end(move |turn| { + let _ = tx.send(Event::TurnEnd { + turn, + result: TurnResult::Finished, + }); + }); + + let tx = event_tx.clone(); + worker.on_text_block(move |block| { + let tx_d = tx.clone(); + block.on_delta(move |text| { + let _ = tx_d.send(Event::TextDelta { + text: text.to_owned(), + }); + }); + let tx_s = tx.clone(); + block.on_stop(move |text| { + let _ = tx_s.send(Event::TextDone { + text: text.to_owned(), + }); + }); + }); + + let tx = event_tx.clone(); + worker.on_tool_use_block(move |start, block| { + let _ = tx.send(Event::ToolCallStart { + id: start.id.clone(), + name: start.name.clone(), + }); + let id_for_delta = start.id.clone(); + let tx_d = tx.clone(); + block.on_delta(move |json| { + let _ = tx_d.send(Event::ToolCallArgsDelta { + id: id_for_delta.clone(), + json: json.to_owned(), + }); + }); + let tx_s = tx.clone(); + block.on_stop(move |call| { + let _ = tx_s.send(Event::ToolCallDone { + id: call.id.clone(), + name: call.name.clone(), + arguments: call.input.to_string(), + }); + }); + }); + + let tx = event_tx.clone(); + worker.on_usage(move |event| { + let _ = tx.send(Event::Usage { + input_tokens: event.input_tokens, + output_tokens: event.output_tokens, + }); + }); + + let tx = event_tx.clone(); + worker.on_error(move |event| { + let _ = tx.send(Event::Error { + code: ErrorCode::ProviderError, + message: event.message.clone(), + }); + }); + } // Clone cancel sender before moving pod let cancel_tx = pod.session_mut().worker.cancel_sender(); @@ -252,83 +316,3 @@ fn worker_error_code(e: &PodError) -> ErrorCode { } } -// --------------------------------------------------------------------------- -// EventBridgeSubscriber — bridges Worker events to broadcast channel -// --------------------------------------------------------------------------- - -struct EventBridgeSubscriber { - event_tx: broadcast::Sender, -} - -impl WorkerSubscriber for EventBridgeSubscriber { - type TextBlockScope = (); - type ToolUseBlockScope = (); - - fn on_turn_start(&mut self, turn: usize) { - let _ = self.event_tx.send(Event::TurnStart { turn }); - } - - fn on_turn_end(&mut self, turn: usize) { - let _ = self.event_tx.send(Event::TurnEnd { - turn, - result: TurnResult::Finished, - }); - } - - fn on_text_block(&mut self, _scope: &mut (), event: &TextBlockEvent) { - match event { - TextBlockEvent::Delta(text) => { - let _ = self.event_tx.send(Event::TextDelta { - text: text.clone(), - }); - } - TextBlockEvent::Start(_) | TextBlockEvent::Stop(_) => {} - } - } - - fn on_text_complete(&mut self, text: &str) { - let _ = self.event_tx.send(Event::TextDone { - text: text.to_owned(), - }); - } - - fn on_tool_use_block(&mut self, _scope: &mut (), event: &ToolUseBlockEvent) { - match event { - ToolUseBlockEvent::Start(start) => { - let _ = self.event_tx.send(Event::ToolCallStart { - id: start.id.clone(), - name: start.name.clone(), - }); - } - ToolUseBlockEvent::InputJsonDelta(json) => { - let _ = self.event_tx.send(Event::ToolCallArgsDelta { - id: String::new(), - json: json.clone(), - }); - } - ToolUseBlockEvent::Stop(_) => {} - } - } - - fn on_tool_call_complete(&mut self, call: &ToolCall) { - let _ = self.event_tx.send(Event::ToolCallDone { - id: call.id.clone(), - name: call.name.clone(), - arguments: call.input.to_string(), - }); - } - - fn on_usage(&mut self, event: &UsageEvent) { - let _ = self.event_tx.send(Event::Usage { - input_tokens: event.input_tokens, - output_tokens: event.output_tokens, - }); - } - - fn on_error(&mut self, event: &ErrorEvent) { - let _ = self.event_tx.send(Event::Error { - code: ErrorCode::ProviderError, - message: event.message.clone(), - }); - } -} diff --git a/tickets/subscriber-closure-api.md b/tickets/subscriber-closure-api.md deleted file mode 100644 index 69a04f2b..00000000 --- a/tickets/subscriber-closure-api.md +++ /dev/null @@ -1,43 +0,0 @@ -# Subscriber API: クロージャベースのスコープ表現 - -## 背景 - -Block系イベントは時間的に排他(TextBlock中にToolUseBlockは来ない)で、 -Meta系イベント(Usage等)はいつでも流れ得る。 -現行の `Handler` + `Scope: Default` はこの保証を実現しているが、 -ユーザーから見ると Kind/Scope/型消去のボイラープレートが重く、 -`Timeline` と `WorkerSubscriber` の2層が「どちらを使えばいいか」分かりにくい。 - -## 方針 - -クロージャでスコープの寿命を表現し、ブロックの時間的排他性を Rust の借用で自然に保証する。 - -```rust -// Block系: クロージャ引数 = スコープの寿命保証 -worker.on_text_block(|block| { - block.on_delta(|text| print!("{}", text)); - block.on_stop(|reason| println!("\n---")); -}); - -worker.on_tool_use_block(|block| { - block.on_delta(|json| { /* ... */ }); - block.on_stop(|call| { /* ... */ }); -}); - -// Meta系: スコープ不要(いつでも来る) -worker.on_usage(|usage| { /* ... */ }); -``` - -## 設計ポイント - -- ブロックのスコープ = クロージャの借用寿命。ユーザーは `Kind` / `Scope: Default` を知らなくていい -- Block系の排他性が「ブロックが始まったらクロージャが呼ばれ、終わったら抜ける」という直感に一致する -- Meta系はフラットなコールバック。スコープ管理不要 -- 内部的には現行の `Handler` + `Timeline` ディスパッチ機構を維持し、クロージャを Handler に変換するアダプタを挟む -- `WorkerSubscriber` トレイト + 5種の SubscriberAdapter ボイラープレートが不要になる - -## 現行からの変更 - -- `Timeline` は `pub mod` → 内部実装に格下げ(Worker の実装詳細に閉じ込める) -- `WorkerSubscriber` トレイト → 廃止。クロージャ登録 API に置き換え -- `Handler` トレイトは内部で維持(クロージャからの変換先として)