llm_worker_rs/llm-worker/src/subscriber.rs

372 lines
12 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! イベント購読
//!
//! LLMからのストリーミングイベントをリアルタイムで受信するためのトレイト。
//! UIへのストリーム表示やプログレス表示に使用します。
use std::sync::{Arc, Mutex};
use crate::{
handler::{
ErrorKind, Handler, StatusKind, TextBlockEvent, TextBlockKind, ToolUseBlockEvent,
ToolUseBlockKind, UsageKind,
},
hook::ToolCall,
timeline::event::{ErrorEvent, StatusEvent, UsageEvent},
};
// =============================================================================
// WorkerSubscriber Trait
// =============================================================================
/// LLMからのストリーミングイベントを購読するトレイト
///
/// Workerに登録すると、テキスト生成やツール呼び出しのイベントを
/// リアルタイムで受信できます。UIへのストリーム表示に最適です。
///
/// # 受信できるイベント
///
/// - **ブロックイベント**: テキスト、ツール使用(スコープ付き)
/// - **メタイベント**: 使用量、ステータス、エラー
/// - **完了イベント**: テキスト完了、ツール呼び出し完了
/// - **ターン制御**: ターン開始、ターン終了
///
/// # Examples
///
/// ```ignore
/// use llm_worker::subscriber::WorkerSubscriber;
/// use llm_worker::timeline::TextBlockEvent;
///
/// struct StreamPrinter;
///
/// impl WorkerSubscriber for StreamPrinter {
/// type TextBlockScope = ();
/// type ToolUseBlockScope = ();
///
/// fn on_text_block(&mut self, _: &mut (), event: &TextBlockEvent) {
/// if let TextBlockEvent::Delta(text) = event {
/// print!("{}", text); // リアルタイム出力
/// }
/// }
///
/// fn on_text_complete(&mut self, text: &str) {
/// println!("\n--- Complete: {} chars ---", text.len());
/// }
/// }
///
/// // Workerに登録
/// worker.subscribe(StreamPrinter);
/// ```
pub trait WorkerSubscriber: Send {
// =========================================================================
// スコープ型(ブロックイベント用)
// =========================================================================
/// テキストブロック処理用のスコープ型
///
/// ブロック開始時にDefault::default()で生成され、
/// ブロック終了時に破棄される。
type TextBlockScope: Default + Send + Sync;
/// ツール使用ブロック処理用のスコープ型
type ToolUseBlockScope: Default + Send + Sync;
// =========================================================================
// ブロックイベント(スコープ管理あり)
// =========================================================================
/// テキストブロックイベント
///
/// Start/Delta/Stopのライフサイクルを持つ。
/// scopeはブロック開始時に生成され、終了時に破棄される。
#[allow(unused_variables)]
fn on_text_block(&mut self, scope: &mut Self::TextBlockScope, event: &TextBlockEvent) {}
/// ツール使用ブロックイベント
///
/// Start/InputJsonDelta/Stopのライフサイクルを持つ。
#[allow(unused_variables)]
fn on_tool_use_block(
&mut self,
scope: &mut Self::ToolUseBlockScope,
event: &ToolUseBlockEvent,
) {
}
// =========================================================================
// 単発イベント(スコープ不要)
// =========================================================================
/// 使用量イベント
#[allow(unused_variables)]
fn on_usage(&mut self, event: &UsageEvent) {}
/// ステータスイベント
#[allow(unused_variables)]
fn on_status(&mut self, event: &StatusEvent) {}
/// エラーイベント
#[allow(unused_variables)]
fn on_error(&mut self, event: &ErrorEvent) {}
// =========================================================================
// 累積イベントWorker層で追加
// =========================================================================
/// テキスト完了イベント
///
/// テキストブロックが完了した時点で、累積されたテキスト全体が渡される。
/// ブロック処理後の最終結果を受け取るのに便利。
#[allow(unused_variables)]
fn on_text_complete(&mut self, text: &str) {}
/// ツール呼び出し完了イベント
///
/// ツール使用ブロックが完了した時点で、完全なToolCallが渡される。
#[allow(unused_variables)]
fn on_tool_call_complete(&mut self, call: &ToolCall) {}
// =========================================================================
// ターン制御
// =========================================================================
/// ターン開始時
///
/// `turn`は0から始まるターン番号。
#[allow(unused_variables)]
fn on_turn_start(&mut self, turn: usize) {}
/// ターン終了時
#[allow(unused_variables)]
fn on_turn_end(&mut self, turn: usize) {}
}
// =============================================================================
// SubscriberAdapter - WorkerSubscriberをTimelineハンドラにブリッジ
// =============================================================================
// =============================================================================
// TextBlock Handler Adapter
// =============================================================================
/// TextBlockKind用のSubscriberアダプター
pub(crate) struct TextBlockSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> TextBlockSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for TextBlockSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
/// TextBlockのスコープをラップ
pub struct TextBlockScopeWrapper<S: WorkerSubscriber> {
inner: S::TextBlockScope,
buffer: String, // on_text_complete用のバッファ
}
impl<S: WorkerSubscriber> Default for TextBlockScopeWrapper<S> {
fn default() -> Self {
Self {
inner: S::TextBlockScope::default(),
buffer: String::new(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<TextBlockKind> for TextBlockSubscriberAdapter<S> {
type Scope = TextBlockScopeWrapper<S>;
fn on_event(&mut self, scope: &mut Self::Scope, event: &TextBlockEvent) {
// Deltaの場合はバッファに蓄積
if let TextBlockEvent::Delta(text) = event {
scope.buffer.push_str(text);
}
// SubscriberのTextBlockイベントハンドラを呼び出し
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_text_block(&mut scope.inner, event);
// Stopの場合はon_text_completeも呼び出し
if matches!(event, TextBlockEvent::Stop(_)) {
subscriber.on_text_complete(&scope.buffer);
}
}
}
}
// =============================================================================
// ToolUseBlock Handler Adapter
// =============================================================================
/// ToolUseBlockKind用のSubscriberアダプター
pub(crate) struct ToolUseBlockSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> ToolUseBlockSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for ToolUseBlockSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
/// ToolUseBlockのスコープをラップ
pub struct ToolUseBlockScopeWrapper<S: WorkerSubscriber> {
inner: S::ToolUseBlockScope,
id: String,
name: String,
input_json: String, // JSON蓄積用
}
impl<S: WorkerSubscriber> Default for ToolUseBlockScopeWrapper<S> {
fn default() -> Self {
Self {
inner: S::ToolUseBlockScope::default(),
id: String::new(),
name: String::new(),
input_json: String::new(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<ToolUseBlockKind> for ToolUseBlockSubscriberAdapter<S> {
type Scope = ToolUseBlockScopeWrapper<S>;
fn on_event(&mut self, scope: &mut Self::Scope, event: &ToolUseBlockEvent) {
// Start時にメタデータを保存
if let ToolUseBlockEvent::Start(start) = event {
scope.id = start.id.clone();
scope.name = start.name.clone();
}
// InputJsonDeltaの場合はバッファに蓄積
if let ToolUseBlockEvent::InputJsonDelta(json) = event {
scope.input_json.push_str(json);
}
// SubscriberのToolUseBlockイベントハンドラを呼び出し
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_tool_use_block(&mut scope.inner, event);
// Stopの場合はon_tool_call_completeも呼び出し
if matches!(event, ToolUseBlockEvent::Stop(_)) {
let input: serde_json::Value =
serde_json::from_str(&scope.input_json).unwrap_or_default();
let tool_call = ToolCall {
id: scope.id.clone(),
name: scope.name.clone(),
input,
};
subscriber.on_tool_call_complete(&tool_call);
}
}
}
}
// =============================================================================
// Meta Event Handler Adapters
// =============================================================================
/// UsageKind用のSubscriberアダプター
pub(crate) struct UsageSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> UsageSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for UsageSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<UsageKind> for UsageSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &UsageEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_usage(event);
}
}
}
/// StatusKind用のSubscriberアダプター
pub(crate) struct StatusSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> StatusSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for StatusSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<StatusKind> for StatusSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &StatusEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_status(event);
}
}
}
/// ErrorKind用のSubscriberアダプター
pub(crate) struct ErrorSubscriberAdapter<S: WorkerSubscriber> {
subscriber: Arc<Mutex<S>>,
}
impl<S: WorkerSubscriber> ErrorSubscriberAdapter<S> {
pub fn new(subscriber: Arc<Mutex<S>>) -> Self {
Self { subscriber }
}
}
impl<S: WorkerSubscriber> Clone for ErrorSubscriberAdapter<S> {
fn clone(&self) -> Self {
Self {
subscriber: self.subscriber.clone(),
}
}
}
impl<S: WorkerSubscriber + 'static> Handler<ErrorKind> for ErrorSubscriberAdapter<S> {
type Scope = ();
fn on_event(&mut self, _scope: &mut Self::Scope, event: &ErrorEvent) {
if let Ok(mut subscriber) = self.subscriber.lock() {
subscriber.on_error(event);
}
}
}