yoi/crates/llm-worker/src/timeline/timeline.rs
2026-04-04 03:30:49 +09:00

638 lines
18 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Timeline層
//!
//! LLMからのイベントストリームを受信し、登録されたHandlerにディスパッチします。
//! 通常はWorker経由で使用しますが、直接使用することも可能です。
use std::marker::PhantomData;
use super::event::*;
use crate::handler::*;
// =============================================================================
// Type-erased Handler
// =============================================================================
/// 型消去された`Handler` trait
///
/// 各Handlerは独自のScope型を持つため、Timelineで保持するには型消去が必要です。
/// 通常は直接使用せず、`Timeline::on_text_block()`などのメソッド経由で
/// 自動的にラップされます。
pub trait ErasedHandler<K: Kind>: Send + Sync {
/// イベントをディスパッチ
fn dispatch(&mut self, event: &K::Event);
/// スコープを開始Block開始時
fn start_scope(&mut self);
/// スコープを終了Block終了時
fn end_scope(&mut self);
}
/// `Handler<K>`を`ErasedHandler<K>`として扱うためのラッパー
pub struct HandlerWrapper<H, K>
where
H: Handler<K>,
K: Kind,
{
handler: H,
scope: Option<H::Scope>,
// fn() -> K は常にSend+Syncなので、Kの制約に関係なくSendを満たせる
_kind: PhantomData<fn() -> K>,
}
impl<H, K> HandlerWrapper<H, K>
where
H: Handler<K>,
K: Kind,
{
pub fn new(handler: H) -> Self {
Self {
handler,
scope: None,
_kind: PhantomData,
}
}
}
impl<H, K> ErasedHandler<K> for HandlerWrapper<H, K>
where
H: Handler<K> + Send + Sync,
K: Kind,
H::Scope: Send + Sync,
{
fn dispatch(&mut self, event: &K::Event) {
if let Some(scope) = &mut self.scope {
self.handler.on_event(scope, event);
}
}
fn start_scope(&mut self) {
self.scope = Some(H::Scope::default());
}
fn end_scope(&mut self) {
self.scope = None;
}
}
// =============================================================================
// Block Handler Registry
// =============================================================================
/// ブロックハンドラーの型消去trait
trait ErasedBlockHandler: Send + Sync {
fn dispatch_start(&mut self, start: &BlockStart);
fn dispatch_delta(&mut self, delta: &BlockDelta);
fn dispatch_stop(&mut self, stop: &BlockStop);
fn dispatch_abort(&mut self, abort: &BlockAbort);
fn start_scope(&mut self);
fn end_scope(&mut self);
/// スコープがアクティブかどうか
fn has_scope(&self) -> bool;
}
/// TextBlockKind用のラッパー
struct TextBlockHandlerWrapper<H>
where
H: Handler<TextBlockKind>,
{
handler: H,
scope: Option<H::Scope>,
}
impl<H> TextBlockHandlerWrapper<H>
where
H: Handler<TextBlockKind>,
{
fn new(handler: H) -> Self {
Self {
handler,
scope: None,
}
}
}
impl<H> ErasedBlockHandler for TextBlockHandlerWrapper<H>
where
H: Handler<TextBlockKind> + Send + Sync,
H::Scope: Send + Sync,
{
fn dispatch_start(&mut self, start: &BlockStart) {
if let Some(scope) = &mut self.scope {
self.handler.on_event(
scope,
&TextBlockEvent::Start(TextBlockStart { index: start.index }),
);
}
}
fn dispatch_delta(&mut self, delta: &BlockDelta) {
if let Some(scope) = &mut self.scope {
if let DeltaContent::Text(text) = &delta.delta {
self.handler
.on_event(scope, &TextBlockEvent::Delta(text.clone()));
}
}
}
fn dispatch_stop(&mut self, stop: &BlockStop) {
if let Some(scope) = &mut self.scope {
self.handler.on_event(
scope,
&TextBlockEvent::Stop(TextBlockStop {
index: stop.index,
stop_reason: stop.stop_reason.clone(),
}),
);
}
}
fn dispatch_abort(&mut self, _abort: &BlockAbort) {
// TextBlockはabortを特別扱いしないスコープ終了のみ
}
fn start_scope(&mut self) {
self.scope = Some(H::Scope::default());
}
fn end_scope(&mut self) {
self.scope = None;
}
fn has_scope(&self) -> bool {
self.scope.is_some()
}
}
/// ThinkingBlockKind用のラッパー
struct ThinkingBlockHandlerWrapper<H>
where
H: Handler<ThinkingBlockKind>,
{
handler: H,
scope: Option<H::Scope>,
}
impl<H> ThinkingBlockHandlerWrapper<H>
where
H: Handler<ThinkingBlockKind>,
{
fn new(handler: H) -> Self {
Self {
handler,
scope: None,
}
}
}
impl<H> ErasedBlockHandler for ThinkingBlockHandlerWrapper<H>
where
H: Handler<ThinkingBlockKind> + Send + Sync,
H::Scope: Send + Sync,
{
fn dispatch_start(&mut self, start: &BlockStart) {
if let Some(scope) = &mut self.scope {
self.handler.on_event(
scope,
&ThinkingBlockEvent::Start(ThinkingBlockStart { index: start.index }),
);
}
}
fn dispatch_delta(&mut self, delta: &BlockDelta) {
if let Some(scope) = &mut self.scope {
if let DeltaContent::Thinking(text) = &delta.delta {
self.handler
.on_event(scope, &ThinkingBlockEvent::Delta(text.clone()));
}
}
}
fn dispatch_stop(&mut self, stop: &BlockStop) {
if let Some(scope) = &mut self.scope {
self.handler.on_event(
scope,
&ThinkingBlockEvent::Stop(ThinkingBlockStop { index: stop.index }),
);
}
}
fn dispatch_abort(&mut self, _abort: &BlockAbort) {}
fn start_scope(&mut self) {
self.scope = Some(H::Scope::default());
}
fn end_scope(&mut self) {
self.scope = None;
}
fn has_scope(&self) -> bool {
self.scope.is_some()
}
}
/// ToolUseBlockKind用のラッパー
struct ToolUseBlockHandlerWrapper<H>
where
H: Handler<ToolUseBlockKind>,
{
handler: H,
scope: Option<H::Scope>,
current_tool: Option<(String, String)>, // (id, name)
}
impl<H> ToolUseBlockHandlerWrapper<H>
where
H: Handler<ToolUseBlockKind>,
{
fn new(handler: H) -> Self {
Self {
handler,
scope: None,
current_tool: None,
}
}
}
impl<H> ErasedBlockHandler for ToolUseBlockHandlerWrapper<H>
where
H: Handler<ToolUseBlockKind> + Send + Sync,
H::Scope: Send + Sync,
{
fn dispatch_start(&mut self, start: &BlockStart) {
if let Some(scope) = &mut self.scope {
if let BlockMetadata::ToolUse { id, name } = &start.metadata {
self.current_tool = Some((id.clone(), name.clone()));
self.handler.on_event(
scope,
&ToolUseBlockEvent::Start(ToolUseBlockStart {
index: start.index,
id: id.clone(),
name: name.clone(),
}),
);
}
}
}
fn dispatch_delta(&mut self, delta: &BlockDelta) {
if let Some(scope) = &mut self.scope {
if let DeltaContent::InputJson(json) = &delta.delta {
self.handler
.on_event(scope, &ToolUseBlockEvent::InputJsonDelta(json.clone()));
}
}
}
fn dispatch_stop(&mut self, stop: &BlockStop) {
if let Some(scope) = &mut self.scope {
if let Some((id, name)) = self.current_tool.take() {
self.handler.on_event(
scope,
&ToolUseBlockEvent::Stop(ToolUseBlockStop {
index: stop.index,
id,
name,
}),
);
}
}
}
fn dispatch_abort(&mut self, _abort: &BlockAbort) {
self.current_tool = None;
}
fn start_scope(&mut self) {
self.scope = Some(H::Scope::default());
}
fn end_scope(&mut self) {
self.scope = None;
self.current_tool = None;
}
fn has_scope(&self) -> bool {
self.scope.is_some()
}
}
// =============================================================================
// Timeline
// =============================================================================
/// イベントストリームの管理とハンドラへのディスパッチ
///
/// LLMからのイベントを受信し、登録されたハンドラに振り分けます。
/// ブロック系イベントはスコープ管理付きで処理されます。
///
/// # Examples
///
/// ```ignore
/// use llm_worker::{Timeline, Handler, TextBlockKind, TextBlockEvent};
///
/// struct MyHandler;
/// impl Handler<TextBlockKind> for MyHandler {
/// type Scope = String;
/// fn on_event(&mut self, buffer: &mut String, event: &TextBlockEvent) {
/// if let TextBlockEvent::Delta(text) = event {
/// buffer.push_str(text);
/// }
/// }
/// }
///
/// let mut timeline = Timeline::new();
/// timeline.on_text_block(MyHandler);
/// ```
///
/// # サポートするイベント種別
///
/// - **メタ系**: Usage, Ping, Status, Error
/// - **ブロック系**: TextBlock, ThinkingBlock, ToolUseBlock
pub struct Timeline {
// Meta系ハンドラー
usage_handlers: Vec<Box<dyn ErasedHandler<UsageKind>>>,
ping_handlers: Vec<Box<dyn ErasedHandler<PingKind>>>,
status_handlers: Vec<Box<dyn ErasedHandler<StatusKind>>>,
error_handlers: Vec<Box<dyn ErasedHandler<ErrorKind>>>,
// Block系ハンドラーBlockTypeごとにグループ化
text_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
thinking_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
tool_use_block_handlers: Vec<Box<dyn ErasedBlockHandler>>,
// 現在アクティブなブロック
current_block: Option<BlockType>,
}
impl Default for Timeline {
fn default() -> Self {
Self::new()
}
}
impl Timeline {
pub fn new() -> Self {
Self {
usage_handlers: Vec::new(),
ping_handlers: Vec::new(),
status_handlers: Vec::new(),
error_handlers: Vec::new(),
text_block_handlers: Vec::new(),
thinking_block_handlers: Vec::new(),
tool_use_block_handlers: Vec::new(),
current_block: None,
}
}
// =========================================================================
// Handler Registration
// =========================================================================
/// UsageKind用のHandlerを登録
pub fn on_usage<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<UsageKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
// Meta系はデフォルトでスコープを開始しておく
let mut wrapper = HandlerWrapper::new(handler);
wrapper.start_scope();
self.usage_handlers.push(Box::new(wrapper));
self
}
/// PingKind用のHandlerを登録
pub fn on_ping<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<PingKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
let mut wrapper = HandlerWrapper::new(handler);
wrapper.start_scope();
self.ping_handlers.push(Box::new(wrapper));
self
}
/// StatusKind用のHandlerを登録
pub fn on_status<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<StatusKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
let mut wrapper = HandlerWrapper::new(handler);
wrapper.start_scope();
self.status_handlers.push(Box::new(wrapper));
self
}
/// ErrorKind用のHandlerを登録
pub fn on_error<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<ErrorKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
let mut wrapper = HandlerWrapper::new(handler);
wrapper.start_scope();
self.error_handlers.push(Box::new(wrapper));
self
}
/// TextBlockKind用のHandlerを登録
pub fn on_text_block<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<TextBlockKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
self.text_block_handlers
.push(Box::new(TextBlockHandlerWrapper::new(handler)));
self
}
/// ThinkingBlockKind用のHandlerを登録
pub fn on_thinking_block<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<ThinkingBlockKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
self.thinking_block_handlers
.push(Box::new(ThinkingBlockHandlerWrapper::new(handler)));
self
}
/// ToolUseBlockKind用のHandlerを登録
pub fn on_tool_use_block<H>(&mut self, handler: H) -> &mut Self
where
H: Handler<ToolUseBlockKind> + Send + Sync + 'static,
H::Scope: Send + Sync,
{
self.tool_use_block_handlers
.push(Box::new(ToolUseBlockHandlerWrapper::new(handler)));
self
}
// =========================================================================
// Event Dispatch
// =========================================================================
/// メインのディスパッチエントリポイント
pub fn dispatch(&mut self, event: &Event) {
match event {
// Meta系: 即時ディスパッチ(登録順)
Event::Usage(u) => self.dispatch_usage(u),
Event::Ping(p) => self.dispatch_ping(p),
Event::Status(s) => self.dispatch_status(s),
Event::Error(e) => self.dispatch_error(e),
// Block系: スコープ管理しながらディスパッチ
Event::BlockStart(s) => self.handle_block_start(s),
Event::BlockDelta(d) => self.handle_block_delta(d),
Event::BlockStop(s) => self.handle_block_stop(s),
Event::BlockAbort(a) => self.handle_block_abort(a),
}
}
fn dispatch_usage(&mut self, event: &UsageEvent) {
for handler in &mut self.usage_handlers {
handler.dispatch(event);
}
}
fn dispatch_ping(&mut self, event: &PingEvent) {
for handler in &mut self.ping_handlers {
handler.dispatch(event);
}
}
fn dispatch_status(&mut self, event: &StatusEvent) {
for handler in &mut self.status_handlers {
handler.dispatch(event);
}
}
fn dispatch_error(&mut self, event: &ErrorEvent) {
for handler in &mut self.error_handlers {
handler.dispatch(event);
}
}
fn handle_block_start(&mut self, start: &BlockStart) {
self.current_block = Some(start.block_type);
let handlers = self.get_block_handlers_mut(start.block_type);
for handler in handlers {
handler.start_scope();
handler.dispatch_start(start);
}
}
fn handle_block_delta(&mut self, delta: &BlockDelta) {
let block_type = delta.delta.block_type();
// OpenAIなどのプロバイダはBlockStartを送らない場合があるため、
// Deltaが来たときにスコープがなければ暗黙的に開始する
if self.current_block.is_none() {
self.current_block = Some(block_type);
}
let handlers = self.get_block_handlers_mut(block_type);
for handler in handlers {
// スコープがなければ暗黙的に開始
if !handler.has_scope() {
handler.start_scope();
}
handler.dispatch_delta(delta);
}
}
fn handle_block_stop(&mut self, stop: &BlockStop) {
let handlers = self.get_block_handlers_mut(stop.block_type);
for handler in handlers {
handler.dispatch_stop(stop);
handler.end_scope();
}
self.current_block = None;
}
fn handle_block_abort(&mut self, abort: &BlockAbort) {
let handlers = self.get_block_handlers_mut(abort.block_type);
for handler in handlers {
handler.dispatch_abort(abort);
handler.end_scope();
}
self.current_block = None;
}
fn get_block_handlers_mut(
&mut self,
block_type: BlockType,
) -> &mut Vec<Box<dyn ErasedBlockHandler>> {
match block_type {
BlockType::Text => &mut self.text_block_handlers,
BlockType::Thinking => &mut self.thinking_block_handlers,
BlockType::ToolUse => &mut self.tool_use_block_handlers,
BlockType::ToolResult => &mut self.text_block_handlers, // ToolResultはTextとして扱う
}
}
/// 現在アクティブなブロックタイプを取得
pub fn current_block(&self) -> Option<BlockType> {
self.current_block
}
/// 現在アクティブなブロックを中断する
///
/// キャンセルやエラー時に呼び出し、進行中のブロックに対して
/// BlockAbortイベントを発火してスコープをクリーンアップする。
pub fn abort_current_block(&mut self) {
if let Some(block_type) = self.current_block {
let abort = crate::timeline::event::BlockAbort {
index: 0, // インデックスは不明なので0
block_type,
reason: "Cancelled".to_string(),
};
self.handle_block_abort(&abort);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
#[test]
fn test_timeline_creation() {
let timeline = Timeline::new();
assert!(timeline.current_block().is_none());
}
#[test]
fn test_meta_event_dispatch() {
// シンプルなテスト用構造体
struct TestUsageHandler {
calls: Arc<Mutex<Vec<UsageEvent>>>,
}
impl Handler<UsageKind> for TestUsageHandler {
type Scope = ();
fn on_event(&mut self, _scope: &mut (), event: &UsageEvent) {
self.calls.lock().unwrap().push(event.clone());
}
}
let calls = Arc::new(Mutex::new(Vec::new()));
let handler = TestUsageHandler {
calls: calls.clone(),
};
let mut timeline = Timeline::new();
timeline.on_usage(handler);
timeline.dispatch(&Event::usage(100, 50));
let recorded = calls.lock().unwrap();
assert_eq!(recorded.len(), 1);
assert_eq!(recorded[0].input_tokens, Some(100));
}
}