diff --git a/TODO.md b/TODO.md index dcbc4349..3ebb47d9 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,5 @@ - [x] 永続化データ構造の制定 - [ ] テスト設計 +- [x] ツール出力の遅延読み込み設計 (ToolOutput / BlobStore / auto_summarize) - [ ] ツール設計 +- [ ] inspect ツール実装 diff --git a/crates/llm-worker-persistence/Cargo.toml b/crates/llm-worker-persistence/Cargo.toml index 3128bee2..f2b5ee13 100644 --- a/crates/llm-worker-persistence/Cargo.toml +++ b/crates/llm-worker-persistence/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] llm-worker = { path = "../llm-worker" } +async-trait = "0.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.49", features = ["fs", "io-util"] } diff --git a/crates/llm-worker-persistence/src/blob_output_processor.rs b/crates/llm-worker-persistence/src/blob_output_processor.rs new file mode 100644 index 00000000..c2415579 --- /dev/null +++ b/crates/llm-worker-persistence/src/blob_output_processor.rs @@ -0,0 +1,47 @@ +//! [`ToolOutputProcessor`] implementation backed by a [`BlobStore`]. +//! +//! Converts large tool output strings into [`ToolOutput::Stored`] and +//! persists the content via a [`BlobStore`], returning a summary with +//! a blob reference for conversation history. + +use crate::blob_store::BlobStore; +use async_trait::async_trait; +use llm_worker::tool::{ToolError, ToolOutput, ToolOutputProcessor}; +use std::sync::Arc; + +/// A [`ToolOutputProcessor`] that stores large outputs in a [`BlobStore`]. +/// +/// Small outputs (≤ `INLINE_THRESHOLD` bytes) pass through unchanged. +/// Large outputs are stored as blobs, and a summary with a `[blob:]` +/// reference replaces the original content in conversation history. +pub struct BlobOutputProcessor { + blob_store: Arc, +} + +impl BlobOutputProcessor { + /// Create a new processor backed by the given blob store. 
+ pub fn new(blob_store: Arc) -> Self { + Self { blob_store } + } +} + +#[async_trait] +impl ToolOutputProcessor for BlobOutputProcessor { + async fn process(&self, output: String) -> Result { + let tool_output = ToolOutput::from(output); + + match tool_output { + ToolOutput::Inline(s) => Ok(s), + ToolOutput::Stored { summary, content } => { + let blob_id = self + .blob_store + .store(&content) + .await + .map_err(|e| ToolError::Internal(format!("blob store error: {e}")))?; + + // Prepend blob reference to the summary + Ok(format!("[blob:{blob_id}] {summary}")) + } + } + } +} diff --git a/crates/llm-worker-persistence/src/blob_store.rs b/crates/llm-worker-persistence/src/blob_store.rs new file mode 100644 index 00000000..48b26f7c --- /dev/null +++ b/crates/llm-worker-persistence/src/blob_store.rs @@ -0,0 +1,54 @@ +//! Blob storage abstraction for large tool outputs. +//! +//! [`BlobStore`] provides async storage and retrieval of [`Content`] blobs, +//! keeping them separate from session logs. Session logs reference blobs +//! by [`BlobId`] in tool result summaries. + +use llm_worker::tool::Content; +use std::future::Future; + +/// Unique blob identifier. UUID v7 (time-ordered). +pub type BlobId = uuid::Uuid; + +/// Generate a new blob ID. +pub fn new_blob_id() -> BlobId { + uuid::Uuid::now_v7() +} + +/// Errors from the blob store. +#[derive(Debug, thiserror::Error)] +pub enum BlobStoreError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("blob not found: {0}")] + NotFound(BlobId), +} + +/// Async blob storage backend. +/// +/// Stores and retrieves [`Content`] blobs independently of session logs. +/// All methods take `&self` — implementations should use interior mutability +/// when needed. +pub trait BlobStore: Send + Sync { + /// Store content and return its assigned ID. 
+ fn store( + &self, + content: &Content, + ) -> impl Future> + Send; + + /// Load content by ID. + fn load( + &self, + id: BlobId, + ) -> impl Future> + Send; + + /// Check if a blob exists. + fn exists( + &self, + id: BlobId, + ) -> impl Future> + Send; +} diff --git a/crates/llm-worker-persistence/src/fs_blob_store.rs b/crates/llm-worker-persistence/src/fs_blob_store.rs new file mode 100644 index 00000000..6d15c93b --- /dev/null +++ b/crates/llm-worker-persistence/src/fs_blob_store.rs @@ -0,0 +1,83 @@ +//! Filesystem-backed blob store. +//! +//! Layout: +//! - Text blobs: `{root}/{blob_id}.txt` +//! - Structured blobs: `{root}/{blob_id}.json` + +use crate::blob_store::{new_blob_id, BlobId, BlobStore, BlobStoreError}; +use llm_worker::tool::Content; +use std::path::PathBuf; +use tokio::fs; + +/// Filesystem-backed blob store. +/// +/// Each blob is stored as a single file. Text content uses `.txt`, +/// structured (JSON) content uses `.json`. +#[derive(Clone)] +pub struct FsBlobStore { + root: PathBuf, +} + +impl FsBlobStore { + /// Create a new `FsBlobStore` rooted at the given directory. + /// Creates the directory if it does not exist. + pub async fn new(root: impl Into) -> Result { + let root = root.into(); + fs::create_dir_all(&root).await?; + Ok(Self { root }) + } + + fn text_path(&self, id: BlobId) -> PathBuf { + self.root.join(format!("{id}.txt")) + } + + fn json_path(&self, id: BlobId) -> PathBuf { + self.root.join(format!("{id}.json")) + } + + /// Resolve the actual path for a blob, checking both extensions. 
+    ///
+    /// Probes the `.txt` path first, then the `.json` path, using tokio's
+    /// async `try_exists` so the executor thread is not blocked on filesystem
+    /// metadata calls.
+    ///
+    /// Returns `Ok(Some((path, is_text)))` for the first file found and
+    /// `Ok(None)` when neither exists. A probe that fails (for example on a
+    /// permission error) is returned as an I/O error instead of being
+    /// reported as "missing".
+    async fn resolve_path(
+        &self,
+        id: BlobId,
+    ) -> Result<Option<(PathBuf, bool)>, BlobStoreError> {
+        let txt = self.text_path(id);
+        if fs::try_exists(&txt).await? {
+            return Ok(Some((txt, true)));
+        }
+        let json = self.json_path(id);
+        if fs::try_exists(&json).await? {
+            return Ok(Some((json, false)));
+        }
+        Ok(None)
+    }
+}
+
+impl BlobStore for FsBlobStore {
+    /// Write `content` to a new file named after a freshly generated blob ID.
+    ///
+    /// Text is written verbatim to `{id}.txt`; structured values are
+    /// pretty-printed as JSON and written to `{id}.json`.
+    async fn store(&self, content: &Content) -> Result<BlobId, BlobStoreError> {
+        let id = new_blob_id();
+        match content {
+            Content::Text(text) => {
+                fs::write(self.text_path(id), text.as_bytes()).await?;
+            }
+            Content::Structured(value) => {
+                let json = serde_json::to_string_pretty(value)?;
+                fs::write(self.json_path(id), json.as_bytes()).await?;
+            }
+        }
+        Ok(id)
+    }
+
+    /// Read a blob back, choosing the [`Content`] variant from the file
+    /// extension found on disk.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`BlobStoreError::NotFound`] when neither file exists, an I/O
+    /// error when probing or reading fails, and a serialization error when a
+    /// `.json` blob does not parse.
+    async fn load(&self, id: BlobId) -> Result<Content, BlobStoreError> {
+        let (path, is_text) = self
+            .resolve_path(id)
+            .await?
+            .ok_or(BlobStoreError::NotFound(id))?;
+        let raw = fs::read_to_string(&path).await?;
+        if is_text {
+            Ok(Content::Text(raw))
+        } else {
+            Ok(Content::Structured(serde_json::from_str(&raw)?))
+        }
+    }
+
+    /// Report whether a `.txt` or `.json` file exists for `id`.
+    async fn exists(&self, id: BlobId) -> Result<bool, BlobStoreError> {
+        Ok(self.resolve_path(id).await?.is_some())
+    }
+}
diff --git a/crates/llm-worker-persistence/src/lib.rs b/crates/llm-worker-persistence/src/lib.rs
index 41f727fa..b5a2e67a 100644
--- a/crates/llm-worker-persistence/src/lib.rs
+++ b/crates/llm-worker-persistence/src/lib.rs
@@ -20,13 +20,19 @@
 //! session.run("Hello!").await?;
 //!
``` +pub mod blob_output_processor; +pub mod blob_store; pub mod event_trace; +pub mod fs_blob_store; pub mod fs_store; pub mod session; pub mod session_log; pub mod store; +pub use blob_output_processor::BlobOutputProcessor; +pub use blob_store::{BlobId, BlobStore, BlobStoreError}; pub use event_trace::TraceEntry; +pub use fs_blob_store::FsBlobStore; pub use fs_store::FsStore; pub use session::{Session, SessionConfig, SessionError}; pub use session_log::{LogEntry, Outcome, RestoredState, collect_state}; diff --git a/crates/llm-worker/docs/tool-output-design.md b/crates/llm-worker/docs/tool-output-design.md new file mode 100644 index 00000000..0473842e --- /dev/null +++ b/crates/llm-worker/docs/tool-output-design.md @@ -0,0 +1,132 @@ +# ツール出力の遅延読み込み設計 + +## 課題 + +ツール実行結果(ファイル内容、検索結果等)は サイズが予測不能 で、 +全量を `Item::ToolResult { output: String }` として LLM コンテキストに +載せると、トークン消費が爆発する。 + +## 方針 + +- ツール出力に **Inline / Stored** の区別を導入する +- Stored な出力は **BlobStore** に保存し、履歴には要約のみ載せる +- LLM が詳細を見たい場合は **inspect ツール** で部分取得する + +## データ型 + +### ToolOutput(llm-worker 側) + +```rust +pub enum ToolOutput { + /// 小さな結果: そのまま history に載る + Inline(String), + /// 大きな結果: summary だけ history に載り、全体は BlobStore に保存される + Stored { + summary: String, + content: Content, + }, +} + +pub enum Content { + Text(String), + Structured(serde_json::Value), +} +``` + +- `Tool::execute()` の戻り値は `Result` のまま据え置き +- `From for ToolOutput` で閾値ベースの自動昇格を行う +- ツール実装者が明示的に `ToolOutput` を返したい場合は別トレイトメソッドを用意 + +### BlobStore(llm-worker-persistence 側) + +```rust +pub type BlobId = uuid::Uuid; // UUID v7 + +pub trait BlobStore: Send + Sync { + fn store(&self, content: &Content) -> impl Future> + Send; + fn load(&self, id: BlobId) -> impl Future> + Send; + fn exists(&self, id: BlobId) -> impl Future> + Send; +} +``` + +### FsBlobStore レイアウト + +``` +blobs/ +├── {blob_id}.txt # Content::Text +└── {blob_id}.json # Content::Structured +``` + +セッションとは独立したフラットなストア。セッションとの紐付けは +ログ側の参照(summary 内の `[blob:]`)で行う。 + +## 自動サマリ + 
+`From` による自動昇格時のサマリ生成ルール: + +| 項目 | 値 | +|---|---| +| Inline 閾値 | 800 bytes | +| サマリ上限 | 400 bytes | +| 先頭行数 | 5 行 | +| 末尾行数 | 3 行 | + +### Text のサマリ形式 + +``` +[blob:] text | {N} lines +── head ── +{先頭5行} +── tail ── +{末尾3行} +``` + +### Structured (JSON Array) のサマリ形式 + +``` +[blob:] json_array | {N} entries +── schema ── +{最初の要素のキー: 型} +── head ── +{先頭2要素} +``` + +### Structured (JSON Object) のサマリ形式 + +``` +[blob:] json_object | {N} keys +── keys ── +{キー一覧と各値の型/サイズ} +``` + +## Worker への統合 + +``` +Tool::execute() → Result + │ + ▼ From for ToolOutput + ToolOutput::Inline(s) ← len ≤ 800 + ToolOutput::Stored { .. } ← len > 800 + │ + ▼ Worker が BlobStore に保存 + Item::ToolResult { output: summary } ← history に載る + │ + ▼ LLM が詳細を見たい場合 + inspect(blob_id, selector?) → 部分取得 +``` + +Worker はオプショナルに `BlobStore` を保持する。 +BlobStore が未設定の場合は従来通り全量 Inline として扱う。 + +## inspect ツール + +Worker に BlobStore が設定されている場合、自動的に登録される組み込みツール。 + +``` +inspect(blob_id, selector?) +``` + +- selector 省略: メタ情報 + 先頭部分 +- `lines:20-50`: 行範囲(Text 用) +- `slice:3..8`: インデックス範囲(Array 用) +- `key:results`: キー指定(Object 用) diff --git a/crates/llm-worker/src/tool.rs b/crates/llm-worker/src/tool.rs index eeaeeca4..ec1c5d44 100644 --- a/crates/llm-worker/src/tool.rs +++ b/crates/llm-worker/src/tool.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use serde_json::Value; use thiserror::Error; @@ -23,6 +24,203 @@ pub enum ToolError { Internal(String), } +// ============================================================================= +// ToolOutput - Tool execution result with size-aware storage +// ============================================================================= + +/// Tool output size threshold in bytes. +/// Results larger than this are automatically promoted to `Stored`. +pub const INLINE_THRESHOLD: usize = 800; + +/// Maximum size of auto-generated summaries in bytes. 
+pub const SUMMARY_MAX_BYTES: usize = 400;
+
+/// Number of lines to include from the head of text content in summaries.
+pub const SUMMARY_HEAD_LINES: usize = 5;
+
+/// Number of lines to include from the tail of text content in summaries.
+pub const SUMMARY_TAIL_LINES: usize = 3;
+
+/// Tool execution result.
+///
+/// Small results are kept inline in conversation history.
+/// Large results are stored externally via `BlobStore`, with only
+/// a summary placed in the history. The LLM can retrieve details
+/// using the built-in `inspect` tool.
+#[derive(Debug, Clone)]
+pub enum ToolOutput {
+    /// Small result: placed directly into history as-is.
+    Inline(String),
+    /// Large result: summary goes into history, full content is stored externally.
+    Stored {
+        /// Concise summary shown to the LLM in conversation context.
+        summary: String,
+        /// Full content to be persisted in a BlobStore.
+        content: Content,
+    },
+}
+
+impl ToolOutput {
+    /// Get the string that should be placed into conversation history.
+    ///
+    /// This is the full text for `Inline` and only the summary for `Stored`.
+    pub fn history_text(&self) -> &str {
+        match self {
+            Self::Inline(text) => text,
+            Self::Stored { summary, .. } => summary,
+        }
+    }
+
+    /// Whether this output requires external storage.
+    pub fn is_stored(&self) -> bool {
+        matches!(self, Self::Stored { .. })
+    }
+}
+
+/// Content to be stored in a BlobStore.
+///
+/// Serialized by serde in adjacently tagged form: the variant name goes
+/// under `"type"` and the payload under `"data"`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", content = "data")]
+pub enum Content {
+    /// Plain text (file contents, search results, logs, etc.)
+    Text(String),
+    /// Structured JSON data (API responses, query results, etc.)
+    Structured(Value),
+}
+
+/// Size-based promotion of a raw tool result string.
+///
+/// Strings of at most `INLINE_THRESHOLD` bytes stay `Inline`. Longer
+/// strings become `Stored`, carrying an auto-generated text summary and
+/// the original string as `Content::Text`.
+impl From<String> for ToolOutput {
+    fn from(s: String) -> Self {
+        // `len()` is a byte count, matching the byte-based threshold.
+        if s.len() <= INLINE_THRESHOLD {
+            return Self::Inline(s);
+        }
+        // Summarize before moving the string into the stored content.
+        let summary = auto_summarize_text(&s);
+        Self::Stored {
+            summary,
+            content: Content::Text(s),
+        }
+    }
+}
+
+/// Generate a summary for any [`Content`] variant.
+///
+/// The blob ID prefix (`[blob:<id>]`) is NOT included here — it is
+/// prepended by the Worker after the content is stored and an ID is assigned.
+pub fn auto_summarize(content: &Content) -> String {
+    match content {
+        Content::Text(text) => auto_summarize_text(text),
+        Content::Structured(value) => auto_summarize_structured(value),
+    }
+}
+
+/// Generate a summary for plain text content.
+///
+/// The summary is a `text | {N} lines` header, the first
+/// `SUMMARY_HEAD_LINES` lines and, when the text has more than
+/// head + tail lines, the last `SUMMARY_TAIL_LINES` lines. The result is
+/// then capped by [`truncate_summary`].
+fn auto_summarize_text(text: &str) -> String {
+    let lines: Vec<&str> = text.lines().collect();
+    let total = lines.len();
+
+    let mut summary = format!("text | {total} lines\n");
+
+    // Head
+    summary.push_str("── head ──\n");
+    for line in lines.iter().take(SUMMARY_HEAD_LINES) {
+        summary.push_str(line);
+        summary.push('\n');
+    }
+
+    // Tail (only if there's content beyond head, so head and tail never overlap)
+    if total > SUMMARY_HEAD_LINES + SUMMARY_TAIL_LINES {
+        summary.push_str("── tail ──\n");
+        let tail_start = total.saturating_sub(SUMMARY_TAIL_LINES);
+        for line in &lines[tail_start..] {
+            summary.push_str(line);
+            summary.push('\n');
+        }
+    }
+
+    truncate_summary(&mut summary);
+    summary
+}
+
+/// Cap `summary` at `SUMMARY_MAX_BYTES` and mark the cut with `"…\n"`.
+///
+/// Summaries at or below the limit are left untouched. The marker is
+/// appended after the cut, so a truncated summary can exceed the limit by
+/// the length of the marker.
+fn truncate_summary(summary: &mut String) {
+    if summary.len() <= SUMMARY_MAX_BYTES {
+        return;
+    }
+    // `String::truncate` panics if the new length is not on a char boundary.
+    // Summaries contain multi-byte characters (the `──` rulers, arbitrary
+    // tool output), so back up to the nearest boundary at or below the limit.
+    // Index 0 is always a boundary, so the loop terminates.
+    let mut cut = SUMMARY_MAX_BYTES;
+    while !summary.is_char_boundary(cut) {
+        cut -= 1;
+    }
+    summary.truncate(cut);
+    summary.push_str("…\n");
+}
+
+/// Generate a summary for structured JSON content.
+///
+/// - Arrays: entry count, the shape of the first element, and the first two
+///   entries serialized as compact JSON.
+/// - Objects: key count and one `key: type` line per key.
+/// - Anything else: the value serialized as JSON.
+///
+/// The result is then capped by [`truncate_summary`].
+fn auto_summarize_structured(value: &Value) -> String {
+    let mut summary = match value {
+        Value::Array(arr) => {
+            let mut s = format!("json_array | {} entries\n", arr.len());
+            // Show schema from first element
+            if let Some(first) = arr.first() {
+                s.push_str("── schema ──\n");
+                s.push_str(&describe_value_shape(first));
+                s.push('\n');
+            }
+            // Show first 2 entries; an entry that fails to serialize is skipped
+            s.push_str("── head ──\n");
+            for item in arr.iter().take(2) {
+                if let Ok(json) = serde_json::to_string(item) {
+                    s.push_str(&json);
+                    s.push('\n');
+                }
+            }
+            s
+        }
+        Value::Object(map) => {
+            let mut s = format!("json_object | {} keys\n", map.len());
+            s.push_str("── keys ──\n");
+            for (key, val) in map.iter() {
+                s.push_str(&format!("{key}: {}\n", value_type_label(val)));
+            }
+            s
+        }
+        _ => {
+            // Scalar or other — just show the JSON
+            format!(
+                "json | {}\n",
+                serde_json::to_string(value).unwrap_or_default()
+            )
+        }
+    };
+
+    truncate_summary(&mut summary);
+    summary
+}
+
+/// Describe the shape of a JSON value (for schema preview).
+///
+/// Objects are rendered as `{ key: type, ... }`; every other value is
+/// rendered as its type label alone.
+fn describe_value_shape(value: &Value) -> String {
+    match value {
+        Value::Object(map) => {
+            let fields: Vec<String> = map
+                .iter()
+                .map(|(k, v)| format!("{k}: {}", value_type_label(v)))
+                .collect();
+            format!("{{ {} }}", fields.join(", "))
+        }
+        _ => value_type_label(value),
+    }
+}
+
+/// Human-readable type label for a JSON value.
+fn value_type_label(value: &Value) -> String { + match value { + Value::Null => "null".to_string(), + Value::Bool(_) => "bool".to_string(), + Value::Number(_) => "number".to_string(), + Value::String(s) => { + if s.len() > 50 { + format!("string({})", s.len()) + } else { + "string".to_string() + } + } + Value::Array(arr) => format!("array({})", arr.len()), + Value::Object(map) => format!("object({})", map.len()), + } +} + // ============================================================================= // ToolMeta - Immutable Meta Information // ============================================================================= @@ -152,3 +350,23 @@ pub trait Tool: Send + Sync { /// Result string from execution. This content is returned to LLM. async fn execute(&self, input_json: &str) -> Result; } + +// ============================================================================= +// ToolOutputProcessor - Output storage abstraction +// ============================================================================= + +/// Processes tool output before it enters conversation history. +/// +/// When a tool produces a large result, the processor can store the +/// full content externally and return a summary string for the history. +/// +/// If no processor is set on Worker, all tool outputs are used as-is (inline). +#[async_trait] +pub trait ToolOutputProcessor: Send + Sync { + /// Process a tool's raw output string. + /// + /// Returns the string that should be placed into conversation history. + /// For small outputs, this may be the original string unchanged. + /// For large outputs, this should be a summary with a blob reference. 
+ async fn process(&self, output: String) -> Result; +} diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index c7acb91a..95db2b9e 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -23,7 +23,7 @@ use crate::{ ToolUseBlockSubscriberAdapter, UsageSubscriberAdapter, WorkerSubscriber, }, timeline::{TextBlockCollector, Timeline, ToolCallCollector}, - tool::{ToolDefinition as WorkerToolDefinition, ToolError}, + tool::{ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputProcessor}, tool_server::{ToolServer, ToolServerError, ToolServerHandle}, }; @@ -185,6 +185,8 @@ pub struct Worker { request_config: RequestConfig, /// Whether the previous run was interrupted last_run_interrupted: bool, + /// Optional processor for large tool outputs (stores externally, returns summary) + output_processor: Option>, /// Cancel notification channel (for interrupting execution) cancel_tx: mpsc::Sender<()>, cancel_rx: mpsc::Receiver<()>, @@ -840,6 +842,20 @@ impl Worker { } }; + // Phase 2.5: Apply output processor (store large results externally) + if let Some(ref processor) = self.output_processor { + for tool_result in &mut results { + if !tool_result.is_error { + match processor.process(tool_result.content.clone()).await { + Ok(processed) => tool_result.content = processed, + Err(e) => { + warn!(error = %e, "Output processor failed, keeping original content"); + } + } + } + } + } + // Phase 3: Apply post_tool_call hooks for tool_result in &mut results { // Get saved information @@ -1124,6 +1140,7 @@ impl Worker { turn_notifiers: Vec::new(), request_config: RequestConfig::default(), last_run_interrupted: false, + output_processor: None, cancel_tx, cancel_rx, _state: PhantomData, @@ -1318,6 +1335,14 @@ impl Worker { self.last_run_interrupted = interrupted; } + /// Set a tool output processor for handling large tool results. 
+ /// + /// When set, tool execution results are passed through this processor + /// before being placed into conversation history. + pub fn set_output_processor(&mut self, processor: Arc) { + self.output_processor = Some(processor); + } + /// Apply configuration (reserved for future extensions) #[allow(dead_code)] pub fn config(self, _config: WorkerConfig) -> Self { @@ -1344,6 +1369,7 @@ impl Worker { turn_notifiers: self.turn_notifiers, request_config: self.request_config, last_run_interrupted: self.last_run_interrupted, + output_processor: self.output_processor, cancel_tx: self.cancel_tx, cancel_rx: self.cancel_rx, _state: PhantomData, @@ -1380,6 +1406,7 @@ impl Worker { turn_notifiers: self.turn_notifiers, request_config: self.request_config, last_run_interrupted: self.last_run_interrupted, + output_processor: self.output_processor, cancel_tx: self.cancel_tx, cancel_rx: self.cancel_rx, _state: PhantomData,