llm-workerのAPI改善

This commit is contained in:
Keisuke Hirata 2026-04-11 14:11:40 +09:00
parent 3883fab29d
commit 89481c2c82
11 changed files with 582 additions and 828 deletions

View File

@ -16,8 +16,8 @@ LLM との対話を管理する低レベル基盤クレート。会話履歴、
- `tool` — ツール定義・実行(`Tool` トレイト、`ToolDefinition`, `ToolOutput`, サイズ判定による Inline/Stored 切替) - `tool` — ツール定義・実行(`Tool` トレイト、`ToolDefinition`, `ToolOutput`, サイズ判定による Inline/Stored 切替)
- `tool_server` — ツール登録・ルックアップ(`ToolServer`, `ToolServerHandle` - `tool_server` — ツール登録・ルックアップ(`ToolServer`, `ToolServerHandle`
- `hook` — 実行フローへの介入ポイント(`Hook` トレイト、`PreToolCall`, `PostToolCall`, `OnTurnEnd` など) - `hook` — 実行フローへの介入ポイント(`Hook` トレイト、`PreToolCall`, `PostToolCall`, `OnTurnEnd` など)
- `subscriber` — リアルタイムイベント購読(`WorkerSubscriber` トレイト - クロージャベースイベント購読(`Worker::on_text_block()`, `on_tool_use_block()`, `on_usage()`
- `timeline` — イベントストリームのディスパッチ(`Handler` トレイト、各ブロックコレクター) - `timeline` — イベントストリームのディスパッチ(`Handler` トレイト、各ブロックコレクター)。パワーユーザー向けに `timeline_mut()` も提供
- `event` — ストリーミングイベント型(`Event`, `BlockStart`, `BlockDelta` など) - `event` — ストリーミングイベント型(`Event`, `BlockStart`, `BlockDelta` など)
- `state` — 型状態パターンによるキャッシュ保護(`Mutable` / `CacheLocked` - `state` — 型状態パターンによるキャッシュ保護(`Mutable` / `CacheLocked`
cratesの整理Add READMEsRE to all crates@@ cratesの整理Add READMEsRE to all crates@@

View File

@ -33,7 +33,7 @@ llm-workerは3層構成でLLMとのインタラクションを管理する。
| `tool` / `tool_server` | ツール定義・登録・実行 | R3 | | `tool` / `tool_server` | ツール定義・登録・実行 | R3 |
| `timeline` | イベントストリーム処理、Handler dispatch | — | | `timeline` | イベントストリーム処理、Handler dispatch | — |
| `handler` | Handler/Kind trait、ブロック別ハンドラ | — | | `handler` | Handler/Kind trait、ブロック別ハンドラ | — |
| `subscriber` | WorkerSubscriber trait、UI向けイベント配信 | — | | `callback` | クロージャベースイベント購読(`on_text_block`, `on_usage` 等) | — |
| `llm_client` | LLMプロバイダへのHTTPリクエスト/ストリーミング | — | | `llm_client` | LLMプロバイダへのHTTPリクエスト/ストリーミング | — |
| `llm_client/scheme` | プロバイダ固有ワイヤーフォーマット変換 | — | | `llm_client/scheme` | プロバイダ固有ワイヤーフォーマット変換 | — |
| `llm_client/providers` | Anthropic, OpenAI, Gemini, Ollama実装 | — | | `llm_client/providers` | Anthropic, OpenAI, Gemini, Ollama実装 | — |

View File

@ -0,0 +1,216 @@
//! Closure-based event callback API
//!
//! Provides a closure-based alternative to implementing `Handler<K>` directly.
//! Register callbacks on `Worker` via `on_text_block()`, `on_tool_use_block()`,
//! `on_usage()`, etc.
use std::marker::PhantomData;
use crate::handler::{
Handler, Kind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent, ToolUseBlockKind,
ToolUseBlockStart,
};
use crate::hook::ToolCall;
// =============================================================================
// TextBlock Closure Handler
// =============================================================================
/// Callback scope for a text block.
///
/// Passed to the setup closure registered with `Worker::on_text_block()`.
/// Register per-block callbacks via `on_delta()` and `on_stop()`.
///
/// # Examples
///
/// ```ignore
/// worker.on_text_block(|block| {
/// block.on_delta(|text| print!("{}", text));
/// block.on_stop(|full_text| println!("\n--- {} chars ---", full_text.len()));
/// });
/// ```
pub struct TextBlockScope {
pub(crate) on_delta: Option<Box<dyn FnMut(&str) + Send + Sync>>,
pub(crate) on_stop: Option<Box<dyn FnMut(&str) + Send + Sync>>,
}
impl TextBlockScope {
fn new() -> Self {
Self {
on_delta: None,
on_stop: None,
}
}
/// Register a callback for each text delta (streaming fragment).
pub fn on_delta(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) {
self.on_delta = Some(Box::new(f));
}
/// Register a callback invoked when the block completes.
///
/// Receives the full accumulated text of the block.
pub fn on_stop(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) {
self.on_stop = Some(Box::new(f));
}
}
/// Per-block state created by Timeline's scope lifecycle.
#[derive(Default)]
pub(crate) struct TextBlockClosureState {
on_delta: Option<Box<dyn FnMut(&str) + Send + Sync>>,
on_stop: Option<Box<dyn FnMut(&str) + Send + Sync>>,
buffer: String,
}
/// Closure-based `Handler<TextBlockKind>` adapter.
pub(crate) struct ClosureTextBlockHandler {
pub(crate) setup: Box<dyn FnMut(&mut TextBlockScope) + Send + Sync>,
}
impl Handler<TextBlockKind> for ClosureTextBlockHandler {
type Scope = TextBlockClosureState;
fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) {
match event {
TextBlockEvent::Start(_) => {
scope.buffer.clear();
let mut builder = TextBlockScope::new();
(self.setup)(&mut builder);
scope.on_delta = builder.on_delta;
scope.on_stop = builder.on_stop;
}
TextBlockEvent::Delta(text) => {
scope.buffer.push_str(text);
if let Some(f) = &mut scope.on_delta {
f(text);
}
}
TextBlockEvent::Stop(_) => {
if let Some(f) = &mut scope.on_stop {
f(&scope.buffer);
}
}
}
}
}
// =============================================================================
// ToolUseBlock Closure Handler
// =============================================================================
/// Callback scope for a tool use block.
///
/// Passed to the setup closure registered with `Worker::on_tool_use_block()`.
/// The setup closure also receives `&ToolUseBlockStart` with `id` and `name`.
///
/// # Examples
///
/// ```ignore
/// worker.on_tool_use_block(|start, block| {
/// println!("Tool: {} ({})", start.name, start.id);
/// block.on_delta(|json| { /* streaming JSON fragment */ });
/// block.on_stop(|call| println!("Done: {}", call.name));
/// });
/// ```
pub struct ToolUseBlockScope {
pub(crate) on_delta: Option<Box<dyn FnMut(&str) + Send + Sync>>,
pub(crate) on_stop: Option<Box<dyn FnMut(&ToolCall) + Send + Sync>>,
}
impl ToolUseBlockScope {
fn new() -> Self {
Self {
on_delta: None,
on_stop: None,
}
}
/// Register a callback for each JSON input delta (streaming fragment).
pub fn on_delta(&mut self, f: impl FnMut(&str) + Send + Sync + 'static) {
self.on_delta = Some(Box::new(f));
}
/// Register a callback invoked when the block completes.
///
/// Receives the fully assembled `ToolCall` with parsed JSON input.
pub fn on_stop(&mut self, f: impl FnMut(&ToolCall) + Send + Sync + 'static) {
self.on_stop = Some(Box::new(f));
}
}
/// Per-block state for tool use closure handler.
#[derive(Default)]
pub(crate) struct ToolUseBlockClosureState {
on_delta: Option<Box<dyn FnMut(&str) + Send + Sync>>,
on_stop: Option<Box<dyn FnMut(&ToolCall) + Send + Sync>>,
id: String,
name: String,
input_json: String,
}
/// Closure-based `Handler<ToolUseBlockKind>` adapter.
pub(crate) struct ClosureToolUseBlockHandler {
pub(crate) setup: Box<dyn FnMut(&ToolUseBlockStart, &mut ToolUseBlockScope) + Send + Sync>,
}
impl Handler<ToolUseBlockKind> for ClosureToolUseBlockHandler {
type Scope = ToolUseBlockClosureState;
fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) {
match event {
ToolUseBlockEvent::Start(start) => {
scope.id = start.id.clone();
scope.name = start.name.clone();
scope.input_json.clear();
let mut builder = ToolUseBlockScope::new();
(self.setup)(start, &mut builder);
scope.on_delta = builder.on_delta;
scope.on_stop = builder.on_stop;
}
ToolUseBlockEvent::InputJsonDelta(json) => {
scope.input_json.push_str(json);
if let Some(f) = &mut scope.on_delta {
f(json);
}
}
ToolUseBlockEvent::Stop(_) => {
let input: serde_json::Value =
serde_json::from_str(&scope.input_json).unwrap_or_default();
let tool_call = ToolCall {
id: std::mem::take(&mut scope.id),
name: std::mem::take(&mut scope.name),
input,
};
if let Some(f) = &mut scope.on_stop {
f(&tool_call);
}
}
}
}
}
// =============================================================================
// Generic Meta Event Closure Handler
// =============================================================================
/// Closure-based `Handler<K>` adapter for meta events (Usage, Status, Error).
pub(crate) struct ClosureMetaHandler<F, K>
where
K: Kind,
{
pub(crate) callback: F,
pub(crate) _kind: PhantomData<K>,
}
impl<F, K> Handler<K> for ClosureMetaHandler<F, K>
where
F: FnMut(&K::Event) + Send + Sync,
K: Kind,
{
type Scope = ();
fn on_event(&mut self, _scope: &mut (), event: &K::Event) {
(self.callback)(event);
}
}

View File

@ -7,7 +7,7 @@
//! - [`Worker`] - Central component for managing LLM interactions //! - [`Worker`] - Central component for managing LLM interactions
//! - [`tool::Tool`] - Tools that can be invoked by the LLM //! - [`tool::Tool`] - Tools that can be invoked by the LLM
//! - [`hook::Hook`] - Hooks for intercepting turn progression //! - [`hook::Hook`] - Hooks for intercepting turn progression
//! - [`subscriber::WorkerSubscriber`] - Subscribing to streaming events //! - Closure-based event callbacks via `Worker::on_text_block()`, `on_tool_use_block()`, etc.
//! //!
//! # Quick Start //! # Quick Start
//! //!
@ -39,14 +39,16 @@ mod handler;
mod message; mod message;
mod worker; mod worker;
pub(crate) mod callback;
pub mod event; pub mod event;
pub mod hook; pub mod hook;
pub mod llm_client; pub mod llm_client;
pub mod state; pub mod state;
pub mod subscriber;
pub mod timeline; pub mod timeline;
pub mod tool; pub mod tool;
pub mod tool_server; pub mod tool_server;
pub use callback::{TextBlockScope, ToolUseBlockScope};
pub use handler::ToolUseBlockStart;
pub use message::{ContentPart, Item, Message, Role}; pub use message::{ContentPart, Item, Message, Role};
pub use worker::{ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult}; pub use worker::{ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult};

View File

@ -1,371 +0,0 @@
//! Event Subscription
//!
//! Trait for receiving streaming events from LLM in real-time.
//! Used for stream display to UI and progress display.
use std::sync::{Arc, Mutex};
use crate::{
handler::{
ErrorKind, Handler, StatusKind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent,
ToolUseBlockKind, UsageKind,
},
hook::ToolCall,
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
};
// =============================================================================
// WorkerSubscriber Trait
// =============================================================================
/// Trait for subscribing to streaming events from LLM
///
/// When registered with Worker, you can receive events from text generation
/// and tool calls in real-time. Ideal for stream display to UI.
///
/// # Available Events
///
/// - **Block events**: Text, tool use (with scope)
/// - **Meta events**: Usage, status, error
/// - **Completion events**: Text complete, tool call complete
/// - **Turn control**: Turn start, turn end
///
/// # Examples
///
/// ```ignore
/// use llm_worker::subscriber::WorkerSubscriber;
/// use llm_worker::timeline::TextBlockEvent;
///
/// struct StreamPrinter;
///
/// impl WorkerSubscriber for StreamPrinter {
/// type TextBlockScope = ();
/// type ToolUseBlockScope = ();
///
/// fn on_text_block(&mut self, _: &mut (), event: &TextBlockEvent) {
/// if let TextBlockEvent::Delta(text) = event {
/// print!("{}", text); // Real-time output
/// }
/// }
///
/// fn on_text_complete(&mut self, text: &str) {
/// println!("\n--- Complete: {} chars ---", text.len());
/// }
/// }
///
/// // Register with Worker
/// worker.subscribe(StreamPrinter);
/// ```
pub trait WorkerSubscriber: Send {
// =========================================================================
// Scope Types (for block events)
// =========================================================================
/// Scope type for text block processing
///
/// Generated with Default::default() at block start,
/// destroyed at block end.
type TextBlockScope: Default + Send + Sync;
/// Scope type for tool use block processing
type ToolUseBlockScope: Default + Send + Sync;
// =========================================================================
// Block Events (with scope management)
// =========================================================================
/// Text block event
///
/// Has Start/Delta/Stop lifecycle.
/// Scope is generated at block start and destroyed at end.
#[allow(unused_variables)]
fn on_text_block(&mut self, scope: &mut Self::TextBlockScope, event: &TextBlockEvent) {}
/// Tool use block event
///
/// Has Start/InputJsonDelta/Stop lifecycle.
#[allow(unused_variables)]
fn on_tool_use_block(
&mut self,
scope: &mut Self::ToolUseBlockScope,
event: &ToolUseBlockEvent,
) {
}
// =========================================================================
// Single Events (no scope needed)
// =========================================================================
/// Usage event
#[allow(unused_variables)]
fn on_usage(&mut self, event: &UsageEvent) {}
/// Status event
#[allow(unused_variables)]
fn on_status(&mut self, event: &StatusEvent) {}
/// Error event
#[allow(unused_variables)]
fn on_error(&mut self, event: &ErrorEvent) {}
// =========================================================================
// Accumulated Events (added in Worker layer)
// =========================================================================
/// Text complete event
///
/// When a text block completes, the entire accumulated text is passed.
/// Convenient for receiving the final result after block processing.
#[allow(unused_variables)]
fn on_text_complete(&mut self, text: &str) {}
/// Tool call complete event
///
/// When a tool use block completes, the complete ToolCall is passed.
#[allow(unused_variables)]
fn on_tool_call_complete(&mut self, call: &ToolCall) {}
// =========================================================================
// Turn Control
// =========================================================================
/// On turn start
///
/// `turn` is a 0-based turn number.
#[allow(unused_variables)]
fn on_turn_start(&mut self, turn: usize) {}
/// On turn end
#[allow(unused_variables)]
fn on_turn_end(&mut self, turn: usize) {}
}
// =============================================================================
// SubscriberAdapter - Bridge WorkerSubscriber to Timeline handlers
// =============================================================================
// =============================================================================
// TextBlock Handler Adapter
// =============================================================================
/// Subscriber adapter for TextBlockKind
pub(crate) struct TextBlockSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> TextBlockSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for TextBlockSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
/// Wrapper for TextBlock scope
pub struct TextBlockScopeWrapper<S: WorkerSubscriber> {
inner: S::TextBlockScope,
buffer: String, // Buffer for on_text_complete
}
impl<S: WorkerSubscriber> Default for TextBlockScopeWrapper<S> {
fn default() -> Self {
Self {
inner: S::TextBlockScope::default(),
buffer: String::new(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<TextBlockKind> for TextBlockSubscriberAdapter<S> {
type Scope = TextBlockScopeWrapper<S>;
fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) {
// Accumulate deltas into buffer
if let TextBlockEvent::Delta(text) = event {
scope.buffer.push_str(text);
}
// Call Subscriber's TextBlock event handler
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_text_block(&mut scope.inner, event);
// Also call on_text_complete on Stop
if matches!(event, TextBlockEvent::Stop(_)) {
subscriber.on_text_complete(&scope.buffer);
}
}
}
}
// =============================================================================
// ToolUseBlock Handler Adapter
// =============================================================================
/// Subscriber adapter for ToolUseBlockKind
pub(crate) struct ToolUseBlockSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> ToolUseBlockSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for ToolUseBlockSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
/// Wrapper for ToolUseBlock scope
pub struct ToolUseBlockScopeWrapper<S: WorkerSubscriber> {
inner: S::ToolUseBlockScope,
id: String,
name: String,
input_json: String, // JSON accumulation
}
impl<S: WorkerSubscriber> Default for ToolUseBlockScopeWrapper<S> {
fn default() -> Self {
Self {
inner: S::ToolUseBlockScope::default(),
id: String::new(),
name: String::new(),
input_json: String::new(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<ToolUseBlockKind> for ToolUseBlockSubscriberAdapter<S> {
type Scope = ToolUseBlockScopeWrapper<S>;
fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) {
// Save metadata on Start
if let ToolUseBlockEvent::Start(start) = event {
scope.id = start.id.clone();
scope.name = start.name.clone();
}
// Accumulate InputJsonDelta into buffer
if let ToolUseBlockEvent::InputJsonDelta(json) = event {
scope.input_json.push_str(json);
}
// Call Subscriber's ToolUseBlock event handler
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_tool_use_block(&mut scope.inner, event);
// Also call on_tool_call_complete on Stop
if matches!(event, ToolUseBlockEvent::Stop(_)) {
let input: serde_json::Value =
serde_json::from_str(&scope.input_json).unwrap_or_default();
let tool_call = ToolCall {
id: scope.id.clone(),
name: scope.name.clone(),
input,
};
subscriber.on_tool_call_complete(&tool_call);
}
}
}
}
// =============================================================================
// Meta Event Handler Adapters
// =============================================================================
/// Subscriber adapter for UsageKind
pub(crate) struct UsageSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> UsageSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for UsageSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<UsageKind> for UsageSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &UsageEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_usage(event);
}
}
}
/// Subscriber adapter for StatusKind
pub(crate) struct StatusSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> StatusSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for StatusSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<StatusKind> for StatusSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &StatusEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_status(event);
}
}
}
/// Subscriber adapter for ErrorKind
pub(crate) struct ErrorSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> ErrorSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for ErrorSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<ErrorKind> for ErrorSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &ErrorEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_error(event);
}
}
}

View File

@ -17,7 +17,7 @@ mod tool_call_collector;
// 公開API // 公開API
pub use event::*; pub use event::*;
pub use text_block_collector::TextBlockCollector; pub use text_block_collector::TextBlockCollector;
pub use timeline::{ErasedHandler, HandlerWrapper, Timeline}; pub use timeline::Timeline;
pub use tool_call_collector::ToolCallCollector; pub use tool_call_collector::ToolCallCollector;
// 型定義からのre-export // 型定義からのre-export

View File

@ -1,6 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::{Arc, Mutex}; use std::sync::Arc;
use futures::StreamExt; use futures::StreamExt;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -18,11 +18,13 @@ use crate::{
}, },
llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition}, llm_client::{ClientError, ConfigWarning, LlmClient, Request, RequestConfig, ToolDefinition},
state::{CacheLocked, Mutable, WorkerState}, state::{CacheLocked, Mutable, WorkerState},
subscriber::{ callback::{
ErrorSubscriberAdapter, StatusSubscriberAdapter, TextBlockSubscriberAdapter, ClosureMetaHandler, ClosureTextBlockHandler, ClosureToolUseBlockHandler, TextBlockScope,
ToolUseBlockSubscriberAdapter, UsageSubscriberAdapter, WorkerSubscriber, ToolUseBlockScope,
}, },
handler::{ErrorKind, StatusKind, ToolUseBlockStart, UsageKind},
timeline::{TextBlockCollector, Timeline, ToolCallCollector}, timeline::{TextBlockCollector, Timeline, ToolCallCollector},
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
tool::{ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputProcessor}, tool::{ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputProcessor},
tool_server::{ToolServer, ToolServerError, ToolServerHandle}, tool_server::{ToolServer, ToolServerError, ToolServerHandle},
}; };
@ -94,34 +96,6 @@ enum ToolExecutionResult {
Paused, Paused,
} }
// =============================================================================
// Turn Control Callback Storage
// =============================================================================
/// Callback for notifying turn events (type-erased)
trait TurnNotifier: Send + Sync {
fn on_turn_start(&self, turn: usize);
fn on_turn_end(&self, turn: usize);
}
struct SubscriberTurnNotifier<S: WorkerSubscriber + 'static> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber + 'static> TurnNotifier for SubscriberTurnNotifier<S> {
fn on_turn_start(&self, turn: usize) {
if let Ok(mut s) = self.subscriber.lock() {
s.on_turn_start(turn);
}
}
fn on_turn_end(&self, turn: usize) {
if let Ok(mut s) = self.subscriber.lock() {
s.on_turn_end(turn);
}
}
}
// ============================================================================= // =============================================================================
// Worker // Worker
// ============================================================================= // =============================================================================
@ -183,8 +157,10 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
turn_count: usize, turn_count: usize,
/// Maximum number of turns (None = unlimited) /// Maximum number of turns (None = unlimited)
max_turns: Option<u32>, max_turns: Option<u32>,
/// Turn notification callbacks /// Turn-start callbacks
turn_notifiers: Vec<Box<dyn TurnNotifier>>, turn_start_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
/// Turn-end callbacks
turn_end_cbs: Vec<Box<dyn Fn(usize) + Send + Sync>>,
/// Request configuration (max_tokens, temperature, etc.) /// Request configuration (max_tokens, temperature, etc.)
request_config: RequestConfig, request_config: RequestConfig,
/// Whether the previous run was interrupted /// Whether the previous run was interrupted
@ -256,59 +232,102 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
} }
} }
/// Register an event subscriber /// Register a text block observer with scoped callbacks.
/// ///
/// Registered subscribers receive streaming events from the LLM /// The setup closure is called once per text block. Inside it, register
/// in real-time. Useful for streaming display to UI. /// `on_delta` and/or `on_stop` callbacks on the provided scope.
///
/// # Available Events
///
/// - **Block events**: `on_text_block`, `on_tool_use_block`
/// - **Meta events**: `on_usage`, `on_status`, `on_error`
/// - **Completion events**: `on_text_complete`, `on_tool_call_complete`
/// - **Turn control**: `on_turn_start`, `on_turn_end`
/// ///
/// # Examples /// # Examples
/// ///
/// ```ignore /// ```ignore
/// use llm_worker::{Worker, WorkerSubscriber, TextBlockEvent}; /// worker.on_text_block(|block| {
/// /// block.on_delta(|text| print!("{}", text));
/// struct MyPrinter; /// block.on_stop(|full_text| println!("\n--- {} chars ---", full_text.len()));
/// impl WorkerSubscriber for MyPrinter { /// });
/// type TextBlockScope = ();
/// type ToolUseBlockScope = ();
///
/// fn on_text_block(&mut self, _: &mut (), event: &TextBlockEvent) {
/// if let TextBlockEvent::Delta(text) = event {
/// print!("{}", text);
/// }
/// }
/// }
///
/// worker.subscribe(MyPrinter);
/// ``` /// ```
pub fn subscribe<Sub: WorkerSubscriber + 'static>(&mut self, subscriber: Sub) { pub fn on_text_block(
let subscriber = Arc::new(Mutex::new(subscriber)); &mut self,
setup: impl FnMut(&mut TextBlockScope) + Send + Sync + 'static,
) {
self.timeline
.on_text_block(ClosureTextBlockHandler {
setup: Box::new(setup),
});
}
// Register TextBlock handler /// Register a tool use block observer with scoped callbacks.
///
/// The setup closure receives `&ToolUseBlockStart` (containing `id` and `name`)
/// and a scope for registering `on_delta` and `on_stop` callbacks.
///
/// `on_stop` receives a fully assembled `&ToolCall` with parsed JSON input.
///
/// # Examples
///
/// ```ignore
/// worker.on_tool_use_block(|start, block| {
/// println!("Tool: {} ({})", start.name, start.id);
/// block.on_delta(|json| { /* streaming JSON fragment */ });
/// block.on_stop(|call| println!("Done: {}", call.name));
/// });
/// ```
pub fn on_tool_use_block(
&mut self,
setup: impl FnMut(&ToolUseBlockStart, &mut ToolUseBlockScope) + Send + Sync + 'static,
) {
self.timeline self.timeline
.on_text_block(TextBlockSubscriberAdapter::new(subscriber.clone())); .on_tool_use_block(ClosureToolUseBlockHandler {
setup: Box::new(setup),
});
}
// Register ToolUseBlock handler /// Register a usage event callback.
self.timeline pub fn on_usage(
.on_tool_use_block(ToolUseBlockSubscriberAdapter::new(subscriber.clone())); &mut self,
callback: impl FnMut(&UsageEvent) + Send + Sync + 'static,
) {
self.timeline.on_usage(ClosureMetaHandler {
callback,
_kind: PhantomData::<UsageKind>,
});
}
// Register meta handlers /// Register a status event callback.
self.timeline pub fn on_status(
.on_usage(UsageSubscriberAdapter::new(subscriber.clone())); &mut self,
self.timeline callback: impl FnMut(&StatusEvent) + Send + Sync + 'static,
.on_status(StatusSubscriberAdapter::new(subscriber.clone())); ) {
self.timeline self.timeline.on_status(ClosureMetaHandler {
.on_error(ErrorSubscriberAdapter::new(subscriber.clone())); callback,
_kind: PhantomData::<StatusKind>,
});
}
// Register turn control callback /// Register an error event callback.
self.turn_notifiers pub fn on_error(
.push(Box::new(SubscriberTurnNotifier { subscriber })); &mut self,
callback: impl FnMut(&ErrorEvent) + Send + Sync + 'static,
) {
self.timeline.on_error(ClosureMetaHandler {
callback,
_kind: PhantomData::<ErrorKind>,
});
}
/// Register a turn-start callback (receives 0-based turn number).
pub fn on_turn_start(
&mut self,
callback: impl Fn(usize) + Send + Sync + 'static,
) {
self.turn_start_cbs.push(Box::new(callback));
}
/// Register a turn-end callback (receives 0-based turn number).
pub fn on_turn_end(
&mut self,
callback: impl Fn(usize) + Send + Sync + 'static,
) {
self.turn_end_cbs.push(Box::new(callback));
} }
/// Get a shared tool server handle. /// Get a shared tool server handle.
@ -940,8 +959,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// Notify turn start // Notify turn start
let current_turn = self.turn_count; let current_turn = self.turn_count;
debug!(turn = current_turn, "Turn start"); debug!(turn = current_turn, "Turn start");
for notifier in &self.turn_notifiers { for cb in &self.turn_start_cbs {
notifier.on_turn_start(current_turn); cb(current_turn);
} }
// Hook: pre_llm_request // Hook: pre_llm_request
@ -952,8 +971,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
match control { match control {
PreLlmRequestResult::Cancel(reason) => { PreLlmRequestResult::Cancel(reason) => {
info!(reason = %reason, "Aborted by hook"); info!(reason = %reason, "Aborted by hook");
for notifier in &self.turn_notifiers { for cb in &self.turn_end_cbs {
notifier.on_turn_end(current_turn); cb(current_turn);
} }
self.last_run_interrupted = true; self.last_run_interrupted = true;
return Err(WorkerError::Aborted(reason)); return Err(WorkerError::Aborted(reason));
@ -1047,8 +1066,8 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
debug!(event_count = event_count, "Stream completed"); debug!(event_count = event_count, "Stream completed");
// Notify turn end // Notify turn end
for notifier in &self.turn_notifiers { for cb in &self.turn_end_cbs {
notifier.on_turn_end(current_turn); cb(current_turn);
} }
self.turn_count += 1; self.turn_count += 1;
@ -1151,7 +1170,8 @@ impl<C: LlmClient> Worker<C, Mutable> {
locked_prefix_len: 0, locked_prefix_len: 0,
turn_count: 0, turn_count: 0,
max_turns: None, max_turns: None,
turn_notifiers: Vec::new(), turn_start_cbs: Vec::new(),
turn_end_cbs: Vec::new(),
request_config: RequestConfig::default(), request_config: RequestConfig::default(),
last_run_interrupted: false, last_run_interrupted: false,
output_processor: None, output_processor: None,
@ -1386,7 +1406,8 @@ impl<C: LlmClient> Worker<C, Mutable> {
locked_prefix_len, locked_prefix_len,
turn_count: self.turn_count, turn_count: self.turn_count,
max_turns: self.max_turns, max_turns: self.max_turns,
turn_notifiers: self.turn_notifiers, turn_start_cbs: self.turn_start_cbs,
turn_end_cbs: self.turn_end_cbs,
request_config: self.request_config, request_config: self.request_config,
last_run_interrupted: self.last_run_interrupted, last_run_interrupted: self.last_run_interrupted,
output_processor: self.output_processor, output_processor: self.output_processor,
@ -1424,7 +1445,8 @@ impl<C: LlmClient> Worker<C, CacheLocked> {
locked_prefix_len: 0, locked_prefix_len: 0,
turn_count: self.turn_count, turn_count: self.turn_count,
max_turns: self.max_turns, max_turns: self.max_turns,
turn_notifiers: self.turn_notifiers, turn_start_cbs: self.turn_start_cbs,
turn_end_cbs: self.turn_end_cbs,
request_config: self.request_config, request_config: self.request_config,
last_run_interrupted: self.last_run_interrupted, last_run_interrupted: self.last_run_interrupted,
output_processor: self.output_processor, output_processor: self.output_processor,

View File

@ -0,0 +1,178 @@
//! Closure callback API tests
//!
//! Tests for the closure-based event subscription API on Worker.
mod common;
use std::sync::{Arc, Mutex};
use common::MockLlmClient;
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent as ClientStatusEvent};
// =============================================================================
// Tests
// =============================================================================
/// Verify that on_text_block correctly receives delta and stop events
#[tokio::test]
async fn test_callback_text_block_events() {
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Hello, "),
Event::text_delta(0, "World!"),
Event::text_block_stop(0, None),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
let text_deltas = Arc::new(Mutex::new(Vec::new()));
let text_completes = Arc::new(Mutex::new(Vec::new()));
let deltas = text_deltas.clone();
let completes = text_completes.clone();
worker.on_text_block(move |block| {
let d = deltas.clone();
block.on_delta(move |text| {
d.lock().unwrap().push(text.to_owned());
});
let c = completes.clone();
block.on_stop(move |text| {
c.lock().unwrap().push(text.to_owned());
});
});
let result = worker.run("Greet me").await;
assert!(result.is_ok(), "Worker should complete: {:?}", result);
let deltas = text_deltas.lock().unwrap();
assert_eq!(deltas.len(), 2);
assert_eq!(deltas[0], "Hello, ");
assert_eq!(deltas[1], "World!");
let completes = text_completes.lock().unwrap();
assert_eq!(completes.len(), 1);
assert_eq!(completes[0], "Hello, World!");
}
/// Verify that on_tool_use_block correctly receives start info and stop with ToolCall
#[tokio::test]
async fn test_callback_tool_call_complete() {
let events = vec![
Event::tool_use_start(0, "call_123", "get_weather"),
Event::tool_input_delta(0, r#"{"city":"#),
Event::tool_input_delta(0, r#""Tokyo"}"#),
Event::tool_use_stop(0),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
let tool_starts = Arc::new(Mutex::new(Vec::<(String, String)>::new()));
let tool_completes = Arc::new(Mutex::new(Vec::new()));
let starts = tool_starts.clone();
let completes = tool_completes.clone();
worker.on_tool_use_block(move |start, block| {
starts
.lock()
.unwrap()
.push((start.id.clone(), start.name.clone()));
let c = completes.clone();
block.on_stop(move |call| {
c.lock().unwrap().push(call.clone());
});
});
let _ = worker.run("Weather please").await;
let starts = tool_starts.lock().unwrap();
assert_eq!(starts.len(), 1);
assert_eq!(starts[0].0, "call_123");
assert_eq!(starts[0].1, "get_weather");
let completes = tool_completes.lock().unwrap();
assert_eq!(completes.len(), 1);
assert_eq!(completes[0].name, "get_weather");
assert_eq!(completes[0].id, "call_123");
assert_eq!(completes[0].input["city"], "Tokyo");
}
/// Verify that on_turn_start and on_turn_end callbacks are called
#[tokio::test]
async fn test_callback_turn_events() {
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Done!"),
Event::text_block_stop(0, None),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
let turn_starts = Arc::new(Mutex::new(Vec::new()));
let turn_ends = Arc::new(Mutex::new(Vec::new()));
let starts = turn_starts.clone();
worker.on_turn_start(move |turn| {
starts.lock().unwrap().push(turn);
});
let ends = turn_ends.clone();
worker.on_turn_end(move |turn| {
ends.lock().unwrap().push(turn);
});
let result = worker.run("Do something").await;
assert!(result.is_ok());
let starts = turn_starts.lock().unwrap();
let ends = turn_ends.lock().unwrap();
assert_eq!(starts.len(), 1);
assert_eq!(starts[0], 0);
assert_eq!(ends.len(), 1);
assert_eq!(ends[0], 0);
}
/// Verify that on_usage callback receives usage events
#[tokio::test]
async fn test_callback_usage_events() {
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Hello"),
Event::text_block_stop(0, None),
Event::usage(100, 50),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
let usage_events = Arc::new(Mutex::new(Vec::new()));
let usages = usage_events.clone();
worker.on_usage(move |event| {
usages.lock().unwrap().push(event.clone());
});
let _ = worker.run("Hello").await;
let usages = usage_events.lock().unwrap();
assert_eq!(usages.len(), 1);
assert_eq!(usages[0].input_tokens, Some(100));
assert_eq!(usages[0].output_tokens, Some(50));
}

View File

@ -1,234 +0,0 @@
//! WorkerSubscriber tests
//!
//! Tests for subscribing to events using WorkerSubscriber
mod common;
use std::sync::{Arc, Mutex};
use common::MockLlmClient;
use llm_worker::Worker;
use llm_worker::hook::ToolCall;
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent as ClientStatusEvent};
use llm_worker::subscriber::WorkerSubscriber;
use llm_worker::timeline::event::{ErrorEvent, StatusEvent, UsageEvent};
use llm_worker::timeline::{TextBlockEvent, ToolUseBlockEvent};
// =============================================================================
// Test Subscriber
// =============================================================================
/// Simple Subscriber implementation for testing
struct TestSubscriber {
// Recording buffers
text_deltas: Arc<Mutex<Vec<String>>>,
text_completes: Arc<Mutex<Vec<String>>>,
tool_call_completes: Arc<Mutex<Vec<ToolCall>>>,
usage_events: Arc<Mutex<Vec<UsageEvent>>>,
status_events: Arc<Mutex<Vec<StatusEvent>>>,
turn_starts: Arc<Mutex<Vec<usize>>>,
turn_ends: Arc<Mutex<Vec<usize>>>,
}
impl TestSubscriber {
fn new() -> Self {
Self {
text_deltas: Arc::new(Mutex::new(Vec::new())),
text_completes: Arc::new(Mutex::new(Vec::new())),
tool_call_completes: Arc::new(Mutex::new(Vec::new())),
usage_events: Arc::new(Mutex::new(Vec::new())),
status_events: Arc::new(Mutex::new(Vec::new())),
turn_starts: Arc::new(Mutex::new(Vec::new())),
turn_ends: Arc::new(Mutex::new(Vec::new())),
}
}
}
impl WorkerSubscriber for TestSubscriber {
type TextBlockScope = String;
type ToolUseBlockScope = ();
fn on_text_block(&mut self, buffer: &mut String, event: &TextBlockEvent) {
if let TextBlockEvent::Delta(text) = event {
buffer.push_str(text);
self.text_deltas.lock().unwrap().push(text.clone());
}
}
fn on_text_complete(&mut self, text: &str) {
self.text_completes.lock().unwrap().push(text.to_string());
}
fn on_tool_use_block(&mut self, _scope: &mut (), _event: &ToolUseBlockEvent) {
// Process as needed
}
fn on_tool_call_complete(&mut self, call: &ToolCall) {
self.tool_call_completes.lock().unwrap().push(call.clone());
}
fn on_usage(&mut self, event: &UsageEvent) {
self.usage_events.lock().unwrap().push(event.clone());
}
fn on_status(&mut self, event: &StatusEvent) {
self.status_events.lock().unwrap().push(event.clone());
}
fn on_error(&mut self, _event: &ErrorEvent) {
// Process as needed
}
fn on_turn_start(&mut self, turn: usize) {
self.turn_starts.lock().unwrap().push(turn);
}
fn on_turn_end(&mut self, turn: usize) {
self.turn_ends.lock().unwrap().push(turn);
}
}
// =============================================================================
// Tests
// =============================================================================
/// Verify that WorkerSubscriber correctly receives text block events
#[tokio::test]
async fn test_subscriber_text_block_events() {
// Event sequence containing text response
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Hello, "),
Event::text_delta(0, "World!"),
Event::text_block_stop(0, None),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Register Subscriber
let subscriber = TestSubscriber::new();
let text_deltas = subscriber.text_deltas.clone();
let text_completes = subscriber.text_completes.clone();
worker.subscribe(subscriber);
// Execute
let result = worker.run("Greet me").await;
assert!(result.is_ok(), "Worker should complete: {:?}", result);
// Verify deltas were collected
let deltas = text_deltas.lock().unwrap();
assert_eq!(deltas.len(), 2);
assert_eq!(deltas[0], "Hello, ");
assert_eq!(deltas[1], "World!");
// Verify complete text was collected
let completes = text_completes.lock().unwrap();
assert_eq!(completes.len(), 1);
assert_eq!(completes[0], "Hello, World!");
}
/// Verify that WorkerSubscriber correctly receives tool call complete events
#[tokio::test]
async fn test_subscriber_tool_call_complete() {
// Event sequence containing tool call
let events = vec![
Event::tool_use_start(0, "call_123", "get_weather"),
Event::tool_input_delta(0, r#"{"city":"#),
Event::tool_input_delta(0, r#""Tokyo"}"#),
Event::tool_use_stop(0),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Register Subscriber
let subscriber = TestSubscriber::new();
let tool_call_completes = subscriber.tool_call_completes.clone();
worker.subscribe(subscriber);
// Execute
let _ = worker.run("Weather please").await;
// Verify tool call complete was collected
let completes = tool_call_completes.lock().unwrap();
assert_eq!(completes.len(), 1);
assert_eq!(completes[0].name, "get_weather");
assert_eq!(completes[0].id, "call_123");
assert_eq!(completes[0].input["city"], "Tokyo");
}
/// Verify that WorkerSubscriber correctly receives turn events
#[tokio::test]
async fn test_subscriber_turn_events() {
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Done!"),
Event::text_block_stop(0, None),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Register Subscriber
let subscriber = TestSubscriber::new();
let turn_starts = subscriber.turn_starts.clone();
let turn_ends = subscriber.turn_ends.clone();
worker.subscribe(subscriber);
// Execute
let result = worker.run("Do something").await;
assert!(result.is_ok());
// Verify turn events were collected
let starts = turn_starts.lock().unwrap();
let ends = turn_ends.lock().unwrap();
assert_eq!(starts.len(), 1);
assert_eq!(starts[0], 0); // First turn
assert_eq!(ends.len(), 1);
assert_eq!(ends[0], 0);
}
/// Verify that WorkerSubscriber correctly receives Usage events
#[tokio::test]
async fn test_subscriber_usage_events() {
let events = vec![
Event::text_block_start(0),
Event::text_delta(0, "Hello"),
Event::text_block_stop(0, None),
Event::usage(100, 50),
Event::Status(ClientStatusEvent {
status: ResponseStatus::Completed,
}),
];
let client = MockLlmClient::new(events);
let mut worker = Worker::new(client);
// Register Subscriber
let subscriber = TestSubscriber::new();
let usage_events = subscriber.usage_events.clone();
worker.subscribe(subscriber);
// Execute
let _ = worker.run("Hello").await;
// Verify Usage events were collected
let usages = usage_events.lock().unwrap();
assert_eq!(usages.len(), 1);
assert_eq!(usages[0].input_tokens, Some(100));
assert_eq!(usages[0].output_tokens, Some(50));
}

View File

@ -1,11 +1,7 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use llm_worker::hook::ToolCall;
use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::client::LlmClient;
use llm_worker::subscriber::WorkerSubscriber;
use llm_worker::timeline::event::{ErrorEvent, UsageEvent};
use llm_worker::timeline::{TextBlockEvent, ToolUseBlockEvent};
use llm_worker::WorkerError; use llm_worker::WorkerError;
use llm_worker_persistence::Store; use llm_worker_persistence::Store;
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
@ -87,11 +83,79 @@ impl PodController {
// Keep the server alive by moving it into the controller task // Keep the server alive by moving it into the controller task
// (it will be dropped when the task ends) // (it will be dropped when the task ends)
// Register the event bridge subscriber on the worker // Register event bridge callbacks on the worker
let bridge = EventBridgeSubscriber { {
event_tx: event_tx.clone(), let worker = &mut pod.session_mut().worker;
};
pod.session_mut().worker.subscribe(bridge); let tx = event_tx.clone();
worker.on_turn_start(move |turn| {
let _ = tx.send(Event::TurnStart { turn });
});
let tx = event_tx.clone();
worker.on_turn_end(move |turn| {
let _ = tx.send(Event::TurnEnd {
turn,
result: TurnResult::Finished,
});
});
let tx = event_tx.clone();
worker.on_text_block(move |block| {
let tx_d = tx.clone();
block.on_delta(move |text| {
let _ = tx_d.send(Event::TextDelta {
text: text.to_owned(),
});
});
let tx_s = tx.clone();
block.on_stop(move |text| {
let _ = tx_s.send(Event::TextDone {
text: text.to_owned(),
});
});
});
let tx = event_tx.clone();
worker.on_tool_use_block(move |start, block| {
let _ = tx.send(Event::ToolCallStart {
id: start.id.clone(),
name: start.name.clone(),
});
let id_for_delta = start.id.clone();
let tx_d = tx.clone();
block.on_delta(move |json| {
let _ = tx_d.send(Event::ToolCallArgsDelta {
id: id_for_delta.clone(),
json: json.to_owned(),
});
});
let tx_s = tx.clone();
block.on_stop(move |call| {
let _ = tx_s.send(Event::ToolCallDone {
id: call.id.clone(),
name: call.name.clone(),
arguments: call.input.to_string(),
});
});
});
let tx = event_tx.clone();
worker.on_usage(move |event| {
let _ = tx.send(Event::Usage {
input_tokens: event.input_tokens,
output_tokens: event.output_tokens,
});
});
let tx = event_tx.clone();
worker.on_error(move |event| {
let _ = tx.send(Event::Error {
code: ErrorCode::ProviderError,
message: event.message.clone(),
});
});
}
// Clone cancel sender before moving pod // Clone cancel sender before moving pod
let cancel_tx = pod.session_mut().worker.cancel_sender(); let cancel_tx = pod.session_mut().worker.cancel_sender();
@ -252,83 +316,3 @@ fn worker_error_code(e: &PodError) -> ErrorCode {
} }
} }
// ---------------------------------------------------------------------------
// EventBridgeSubscriber — bridges Worker events to broadcast channel
// ---------------------------------------------------------------------------
struct EventBridgeSubscriber {
event_tx: broadcast::Sender<Event>,
}
impl WorkerSubscriber for EventBridgeSubscriber {
type TextBlockScope = ();
type ToolUseBlockScope = ();
fn on_turn_start(&mut self, turn: usize) {
let _ = self.event_tx.send(Event::TurnStart { turn });
}
fn on_turn_end(&mut self, turn: usize) {
let _ = self.event_tx.send(Event::TurnEnd {
turn,
result: TurnResult::Finished,
});
}
fn on_text_block(&mut self, _scope: &mut (), event: &TextBlockEvent) {
match event {
TextBlockEvent::Delta(text) => {
let _ = self.event_tx.send(Event::TextDelta {
text: text.clone(),
});
}
TextBlockEvent::Start(_) | TextBlockEvent::Stop(_) => {}
}
}
fn on_text_complete(&mut self, text: &str) {
let _ = self.event_tx.send(Event::TextDone {
text: text.to_owned(),
});
}
fn on_tool_use_block(&mut self, _scope: &mut (), event: &ToolUseBlockEvent) {
match event {
ToolUseBlockEvent::Start(start) => {
let _ = self.event_tx.send(Event::ToolCallStart {
id: start.id.clone(),
name: start.name.clone(),
});
}
ToolUseBlockEvent::InputJsonDelta(json) => {
let _ = self.event_tx.send(Event::ToolCallArgsDelta {
id: String::new(),
json: json.clone(),
});
}
ToolUseBlockEvent::Stop(_) => {}
}
}
fn on_tool_call_complete(&mut self, call: &ToolCall) {
let _ = self.event_tx.send(Event::ToolCallDone {
id: call.id.clone(),
name: call.name.clone(),
arguments: call.input.to_string(),
});
}
fn on_usage(&mut self, event: &UsageEvent) {
let _ = self.event_tx.send(Event::Usage {
input_tokens: event.input_tokens,
output_tokens: event.output_tokens,
});
}
fn on_error(&mut self, event: &ErrorEvent) {
let _ = self.event_tx.send(Event::Error {
code: ErrorCode::ProviderError,
message: event.message.clone(),
});
}
}

View File

@ -1,43 +0,0 @@
# Subscriber API: クロージャベースのスコープ表現
## 背景
Block系イベントは時間的に排他TextBlock中にToolUseBlockは来ないで、
Meta系イベントUsage等はいつでも流れ得る。
現行の `Handler<K>` + `Scope: Default` はこの保証を実現しているが、
ユーザーから見ると Kind/Scope/型消去のボイラープレートが重く、
`Timeline``WorkerSubscriber` の2層が「どちらを使えばいいか」分かりにくい。
## 方針
クロージャでスコープの寿命を表現し、ブロックの時間的排他性を Rust の借用で自然に保証する。
```rust
// Block系: クロージャ引数 = スコープの寿命保証
worker.on_text_block(|block| {
block.on_delta(|text| print!("{}", text));
block.on_stop(|reason| println!("\n---"));
});
worker.on_tool_use_block(|block| {
block.on_delta(|json| { /* ... */ });
block.on_stop(|call| { /* ... */ });
});
// Meta系: スコープ不要(いつでも来る)
worker.on_usage(|usage| { /* ... */ });
```
## 設計ポイント
- ブロックのスコープ = クロージャの借用寿命。ユーザーは `Kind` / `Scope: Default` を知らなくていい
- Block系の排他性が「ブロックが始まったらクロージャが呼ばれ、終わったら抜ける」という直感に一致する
- Meta系はフラットなコールバック。スコープ管理不要
- 内部的には現行の `Handler<K>` + `Timeline` ディスパッチ機構を維持し、クロージャを Handler に変換するアダプタを挟む
- `WorkerSubscriber` トレイト + 5種の SubscriberAdapter ボイラープレートが不要になる
## 現行からの変更
- `Timeline``pub mod` → 内部実装に格下げWorker の実装詳細に閉じ込める)
- `WorkerSubscriber` トレイト → 廃止。クロージャ登録 API に置き換え
- `Handler<K>` トレイトは内部で維持(クロージャからの変換先として)