8.2 KiB
8.2 KiB
Timeline層設計メモ
目的
- OpenAI / Anthropic / Gemini のストリーミングイベントを単一の抽象レイヤーに正規化し、LLMクライアントの状態遷移を制御する。
- イベント単位処理(Meta系など)とブロック単位処理(テキスト/Thinking/ToolCall)を同一のパイプラインで扱えるようにする。
要件
イベントストリームに対して直接的にループ処理をしようとすると発生する煩雑な状態管理を避ける。
イベントをloop+matchで処理をするような手段を取ると、テキストのデルタの更新先や、完了タイミングなどの状態管理が必要になる。 また、コンテンツブロックに対する単純なイテレータではping/usageなどの単発イベントを同期的に処理することができない。
- Meta系イベントの即時処理 (ブロック内部イベントと順序が前後しないようにする)
- ブロック開始/差分/終了でスコープを保持する
- 型安全なハンドラー
- blockでキャッチするdeltaについて、 Text/Input JSON/Thinking等、 ブロックに即したイベントの型が必要。
- エラーの適切な制御
TODO:toolのjsonをキャッチする際、定義したToolが期待するjsonschemaに合致しているかのバリデーションはTimeline層の責務か?
Memo
- Blockの定義
- ブロックを処理するHandlerで保持するコンテキストについて、LLMで用いられるコンテキストと混同を避けるために「スコープ」と呼称する。
- ブロックは常に一つである前提。複数のブロックが同時に存在することは無いため。
イベントモデル
前提:llm_client層は各プロバイダのストリーミングレスポンスを正規化し、フラットなEvent列挙として出力する。
pub enum Event {
// Meta events (not tied to a block)
Ping(PingEvent),
Usage(UsageEvent),
Status(StatusEvent),
Error(ErrorEvent),
// Block lifecycle events
BlockStart(BlockStart),
BlockDelta(BlockDelta),
BlockStop(BlockStop),
BlockAbort(BlockAbort),
}
-> Timelineがブロック構造化を担う
設計方針
- 目的: アプリケーションとの連携を楽にする
状態共有/非同期: Handler間の状態共有はアプリケーション側の責務、アプリケーション側で非同期化可能な設計
複数のHandlerに対し、登録順にディスパッチする。Handlerは後続Handlerへのイベントを改変しない。
TimelineとKind/Handler
TimelineはEventストリームを受け取り、登録されたHandler<K: Kind>にディスパッチする。
Kind
Kindはイベント型のみを定義する。スコープはHandler側で定義するため、同じKindに対して異なるスコープを持つHandlerを登録できる。
pub trait Kind {
type Event;
}
Handler
HandlerはKindに対する処理を定義し、自身のスコープ型も決定する。
pub trait Handler<K: Kind> {
type Scope: Default;
fn on_event(&mut self, scope: &mut Self::Scope, event: &K::Event);
}
Kindによって受け取るイベント型が決定されるHandler::ScopeによってHandler固有のスコープ型が決定される- Meta系とBlock系を統一的に扱える
Meta系Kind
スコープ不要の単発イベント:
pub struct UsageKind;
impl Kind for UsageKind {
type Event = UsageEvent;
}
pub struct PingKind;
impl Kind for PingKind {
type Event = PingEvent;
}
// 使用例
struct UsageAccumulator { total_tokens: u64 }
impl Handler<UsageKind> for UsageAccumulator {
type Scope = (); // スコープ不要
fn on_event(&mut self, _scope: &mut (), usage: &UsageEvent) {
self.total_tokens += usage.total_tokens.unwrap_or(0);
}
}
Block系Kind
ライフサイクル(Start/Delta/Stop)を持つ。スコープはHandler側で定義:
pub struct TextBlockKind;
impl Kind for TextBlockKind {
type Event = TextBlockEvent;
}
pub enum TextBlockEvent<'a> {
Start(&'a BlockStart),
Delta(&'a str),
Stop(&'a BlockStop),
}
// 使用例1: デルタを即時出力(スコープ不要)
struct PrintHandler;
impl Handler<TextBlockKind> for PrintHandler {
type Scope = ();
fn on_event(&mut self, _scope: &mut (), event: &TextBlockEvent) {
if let TextBlockEvent::Delta(s) = event {
print!("{}", s);
}
}
}
// 使用例2: テキストを蓄積して収集(Stringをスコープとして利用)
struct TextCollector { results: Vec<String> }
impl Handler<TextBlockKind> for TextCollector {
type Scope = String; // bufferとして使用
fn on_event(&mut self, buffer: &mut String, event: &TextBlockEvent) {
match event {
TextBlockEvent::Start(_) => {}
TextBlockEvent::Delta(s) => buffer.push_str(s),
TextBlockEvent::Stop(_) => {
self.results.push(std::mem::take(buffer));
}
}
}
}
Timelineの責務
Eventストリームを受信- Block系イベント(BlockStart/Delta/Stop)をBlockKindごとのライフサイクルイベントに変換
- 各Handlerごとのスコープの生成・管理(BlockStart時に生成、BlockStop/Abort時に破棄)
- 登録されたHandlerへの登録順ディスパッチ
Handlerの型消去
各Handlerは独自のScope型を持つため、Timelineで保持するには型消去が必要:
// 型消去されたHandler trait
trait ErasedHandler<K: Kind> {
fn dispatch(&mut self, event: &K::Event);
fn start_scope(&mut self); // Scope生成
fn end_scope(&mut self); // Scope破棄
}
// Handler<K>からErasedHandler<K>へのラッパー
struct HandlerWrapper<H, K>
where
H: Handler<K>,
K: Kind,
{
handler: H,
scope: Option<H::Scope>, // Block中のみSome
_kind: PhantomData<K>,
}
impl<H, K> ErasedHandler<K> for HandlerWrapper<H, K>
where
H: Handler<K>,
K: Kind,
{
fn dispatch(&mut self, event: &K::Event) {
if let Some(scope) = &mut self.scope {
self.handler.on_event(scope, event);
}
}
fn start_scope(&mut self) {
self.scope = Some(H::Scope::default());
}
fn end_scope(&mut self) {
self.scope = None;
}
}
Timelineのディスパッチ
impl Timeline {
pub fn dispatch(&mut self, event: &Event) {
match event {
// Meta系: 即時ディスパッチ(登録順)
Event::Usage(u) => self.dispatch_to::<UsageKind>(u),
Event::Ping(p) => self.dispatch_to::<PingKind>(p),
// Block系: スコープ管理しながらディスパッチ
Event::BlockStart(s) => self.handle_block_start(s),
Event::BlockDelta(d) => self.handle_block_delta(d),
Event::BlockStop(s) => self.handle_block_stop(s),
Event::BlockAbort(a) => self.handle_block_abort(a),
_ => {}
}
}
fn handle_block_start(&mut self, start: &BlockStart) {
// 該当Kind の全Handlerに対してスコープ生成
for handler in self.handlers_for_kind(start.kind()) {
handler.start_scope();
handler.dispatch(&start.into());
}
}
fn handle_block_stop(&mut self, stop: &BlockStop) {
for handler in self.handlers_for_kind(stop.kind()) {
handler.dispatch(&stop.into());
handler.end_scope();
}
}
}
期待効果
- 統一インターフェース: Meta系もBlock系も
Handler<K: Kind>で統一 - 型安全:
Kindによってイベント型がコンパイル時に決定、Handler::ScopeによってHandler固有のスコープ型が決定 - スコープの柔軟性: 同一Kindに対して異なるScopeを持つ複数Handlerを登録可能
- 責務分離: llm_client層はフラットなEvent出力、Timeline層がブロック構造化
- スコープ管理の自動化: Handlerは自前でスコープ保持を意識せずに済む