Tool Resultのファイル分離・事後参照可能にする実装

This commit is contained in:
Keisuke Hirata 2026-04-06 04:28:40 +09:00
parent 4fe77b8034
commit ac5d352f31
9 changed files with 571 additions and 1 deletions

View File

@ -1,3 +1,5 @@
- [x] 永続化データ構造の制定
- [ ] テスト設計
- [x] ツール出力の遅延読み込み設計 (ToolOutput / BlobStore / auto_summarize)
- [ ] ツール設計
- [ ] inspect ツール実装

View File

@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
llm-worker = { path = "../llm-worker" }
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.49", features = ["fs", "io-util"] }

View File

@ -0,0 +1,47 @@
//! [`ToolOutputProcessor`] implementation backed by a [`BlobStore`].
//!
//! Converts large tool output strings into [`ToolOutput::Stored`] and
//! persists the content via a [`BlobStore`], returning a summary with
//! a blob reference for conversation history.
use crate::blob_store::BlobStore;
use async_trait::async_trait;
use llm_worker::tool::{ToolError, ToolOutput, ToolOutputProcessor};
use std::sync::Arc;
/// A [`ToolOutputProcessor`] that stores large outputs in a [`BlobStore`].
///
/// Small outputs (≤ `INLINE_THRESHOLD` bytes) pass through unchanged.
/// Large outputs are stored as blobs, and a summary with a `[blob:<id>]`
/// reference replaces the original content in conversation history.
pub struct BlobOutputProcessor<B: BlobStore> {
blob_store: Arc<B>,
}
impl<B: BlobStore> BlobOutputProcessor<B> {
/// Create a new processor backed by the given blob store.
pub fn new(blob_store: Arc<B>) -> Self {
Self { blob_store }
}
}
#[async_trait]
impl<B: BlobStore + 'static> ToolOutputProcessor for BlobOutputProcessor<B> {
async fn process(&self, output: String) -> Result<String, ToolError> {
let tool_output = ToolOutput::from(output);
match tool_output {
ToolOutput::Inline(s) => Ok(s),
ToolOutput::Stored { summary, content } => {
let blob_id = self
.blob_store
.store(&content)
.await
.map_err(|e| ToolError::Internal(format!("blob store error: {e}")))?;
// Prepend blob reference to the summary
Ok(format!("[blob:{blob_id}] {summary}"))
}
}
}
}

View File

@ -0,0 +1,54 @@
//! Blob storage abstraction for large tool outputs.
//!
//! [`BlobStore`] provides async storage and retrieval of [`Content`] blobs,
//! keeping them separate from session logs. Session logs reference blobs
//! by [`BlobId`] in tool result summaries.
use llm_worker::tool::Content;
use std::future::Future;
/// Unique blob identifier. UUID v7 (time-ordered).
pub type BlobId = uuid::Uuid;
/// Generate a new blob ID.
pub fn new_blob_id() -> BlobId {
uuid::Uuid::now_v7()
}
/// Errors from the blob store.
#[derive(Debug, thiserror::Error)]
pub enum BlobStoreError {
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("serialization error: {0}")]
Serde(#[from] serde_json::Error),
#[error("blob not found: {0}")]
NotFound(BlobId),
}
/// Async blob storage backend.
///
/// Stores and retrieves [`Content`] blobs independently of session logs.
/// All methods take `&self` — implementations should use interior mutability
/// when needed.
pub trait BlobStore: Send + Sync {
/// Store content and return its assigned ID.
fn store(
&self,
content: &Content,
) -> impl Future<Output = Result<BlobId, BlobStoreError>> + Send;
/// Load content by ID.
fn load(
&self,
id: BlobId,
) -> impl Future<Output = Result<Content, BlobStoreError>> + Send;
/// Check if a blob exists.
fn exists(
&self,
id: BlobId,
) -> impl Future<Output = Result<bool, BlobStoreError>> + Send;
}

View File

@ -0,0 +1,83 @@
//! Filesystem-backed blob store.
//!
//! Layout:
//! - Text blobs: `{root}/{blob_id}.txt`
//! - Structured blobs: `{root}/{blob_id}.json`
use crate::blob_store::{new_blob_id, BlobId, BlobStore, BlobStoreError};
use llm_worker::tool::Content;
use std::path::PathBuf;
use tokio::fs;
/// Filesystem-backed blob store.
///
/// Each blob is stored as a single file. Text content uses `.txt`,
/// structured (JSON) content uses `.json`.
#[derive(Clone)]
pub struct FsBlobStore {
root: PathBuf,
}
impl FsBlobStore {
/// Create a new `FsBlobStore` rooted at the given directory.
/// Creates the directory if it does not exist.
pub async fn new(root: impl Into<PathBuf>) -> Result<Self, BlobStoreError> {
let root = root.into();
fs::create_dir_all(&root).await?;
Ok(Self { root })
}
fn text_path(&self, id: BlobId) -> PathBuf {
self.root.join(format!("{id}.txt"))
}
fn json_path(&self, id: BlobId) -> PathBuf {
self.root.join(format!("{id}.json"))
}
/// Resolve the actual path for a blob, checking both extensions.
fn resolve_path(&self, id: BlobId) -> Option<(PathBuf, bool)> {
let txt = self.text_path(id);
if txt.exists() {
return Some((txt, true));
}
let json = self.json_path(id);
if json.exists() {
return Some((json, false));
}
None
}
}
impl BlobStore for FsBlobStore {
async fn store(&self, content: &Content) -> Result<BlobId, BlobStoreError> {
let id = new_blob_id();
match content {
Content::Text(text) => {
fs::write(self.text_path(id), text.as_bytes()).await?;
}
Content::Structured(value) => {
let json = serde_json::to_string_pretty(value)?;
fs::write(self.json_path(id), json.as_bytes()).await?;
}
}
Ok(id)
}
async fn load(&self, id: BlobId) -> Result<Content, BlobStoreError> {
let (path, is_text) = self
.resolve_path(id)
.ok_or(BlobStoreError::NotFound(id))?;
let bytes = fs::read_to_string(&path).await?;
if is_text {
Ok(Content::Text(bytes))
} else {
let value = serde_json::from_str(&bytes)?;
Ok(Content::Structured(value))
}
}
async fn exists(&self, id: BlobId) -> Result<bool, BlobStoreError> {
Ok(self.resolve_path(id).is_some())
}
}

View File

@ -20,13 +20,19 @@
//! session.run("Hello!").await?;
//! ```
pub mod blob_output_processor;
pub mod blob_store;
pub mod event_trace;
pub mod fs_blob_store;
pub mod fs_store;
pub mod session;
pub mod session_log;
pub mod store;
pub use blob_output_processor::BlobOutputProcessor;
pub use blob_store::{BlobId, BlobStore, BlobStoreError};
pub use event_trace::TraceEntry;
pub use fs_blob_store::FsBlobStore;
pub use fs_store::FsStore;
pub use session::{Session, SessionConfig, SessionError};
pub use session_log::{LogEntry, Outcome, RestoredState, collect_state};

View File

@ -0,0 +1,132 @@
# ツール出力の遅延読み込み設計
## 課題
ツール実行結果(ファイル内容、検索結果等)は サイズが予測不能 で、
全量を `Item::ToolResult { output: String }` として LLM コンテキストに
載せると、トークン消費が爆発する。
## 方針
- ツール出力に **Inline / Stored** の区別を導入する
- Stored な出力は **BlobStore** に保存し、履歴には要約のみ載せる
- LLM が詳細を見たい場合は **inspect ツール** で部分取得する
## データ型
### ToolOutputllm-worker 側)
```rust
pub enum ToolOutput {
/// 小さな結果: そのまま history に載る
Inline(String),
/// 大きな結果: summary だけ history に載り、全体は BlobStore に保存される
Stored {
summary: String,
content: Content,
},
}
pub enum Content {
Text(String),
Structured(serde_json::Value),
}
```
- `Tool::execute()` の戻り値は `Result<String, ToolError>` のまま据え置き
- `From<String> for ToolOutput` で閾値ベースの自動昇格を行う
- ツール実装者が明示的に `ToolOutput` を返したい場合は別トレイトメソッドを用意
### BlobStorellm-worker-persistence 側)
```rust
pub type BlobId = uuid::Uuid; // UUID v7
pub trait BlobStore: Send + Sync {
fn store(&self, content: &Content) -> impl Future<Output = Result<BlobId, BlobStoreError>> + Send;
fn load(&self, id: BlobId) -> impl Future<Output = Result<Content, BlobStoreError>> + Send;
fn exists(&self, id: BlobId) -> impl Future<Output = Result<bool, BlobStoreError>> + Send;
}
```
### FsBlobStore レイアウト
```
blobs/
├── {blob_id}.txt # Content::Text
└── {blob_id}.json # Content::Structured
```
セッションとは独立したフラットなストア。セッションとの紐付けは
ログ側の参照summary 内の `[blob:<id>]`)で行う。
## 自動サマリ
`From<String>` による自動昇格時のサマリ生成ルール:
| 項目 | 値 |
|---|---|
| Inline 閾値 | 800 bytes |
| サマリ上限 | 400 bytes |
| 先頭行数 | 5 行 |
| 末尾行数 | 3 行 |
### Text のサマリ形式
```
[blob:<id>] text | {N} lines
── head ──
{先頭5行}
── tail ──
{末尾3行}
```
### Structured (JSON Array) のサマリ形式
```
[blob:<id>] json_array | {N} entries
── schema ──
{最初の要素のキー: 型}
── head ──
{先頭2要素}
```
### Structured (JSON Object) のサマリ形式
```
[blob:<id>] json_object | {N} keys
── keys ──
{キー一覧と各値の型/サイズ}
```
## Worker への統合
```
Tool::execute() → Result<String, ToolError>
▼ From<String> for ToolOutput
ToolOutput::Inline(s) ← len ≤ 800
ToolOutput::Stored { .. } ← len > 800
▼ Worker が BlobStore に保存
Item::ToolResult { output: summary } ← history に載る
▼ LLM が詳細を見たい場合
inspect(blob_id, selector?) → 部分取得
```
Worker はオプショナルに `BlobStore` を保持する。
BlobStore が未設定の場合は従来通り全量 Inline として扱う。
## inspect ツール
Worker に BlobStore が設定されている場合、自動的に登録される組み込みツール。
```
inspect(blob_id, selector?)
```
- selector 省略: メタ情報 + 先頭部分
- `lines:20-50`: 行範囲Text 用)
- `slice:3..8`: インデックス範囲Array 用)
- `key:results`: キー指定Object 用)

View File

@ -6,6 +6,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
@ -23,6 +24,203 @@ pub enum ToolError {
Internal(String),
}
// =============================================================================
// ToolOutput - Tool execution result with size-aware storage
// =============================================================================
/// Tool output size threshold in bytes.
/// Results larger than this are automatically promoted to `Stored`.
pub const INLINE_THRESHOLD: usize = 800;
/// Maximum size of auto-generated summaries in bytes.
pub const SUMMARY_MAX_BYTES: usize = 400;
/// Number of lines to include from the head of text content in summaries.
pub const SUMMARY_HEAD_LINES: usize = 5;
/// Number of lines to include from the tail of text content in summaries.
pub const SUMMARY_TAIL_LINES: usize = 3;
/// Tool execution result.
///
/// Small results are kept inline in conversation history.
/// Large results are stored externally via `BlobStore`, with only
/// a summary placed in the history. The LLM can retrieve details
/// using the built-in `inspect` tool.
#[derive(Debug, Clone)]
pub enum ToolOutput {
/// Small result: placed directly into history as-is.
Inline(String),
/// Large result: summary goes into history, full content is stored externally.
Stored {
/// Concise summary shown to the LLM in conversation context.
summary: String,
/// Full content to be persisted in a BlobStore.
content: Content,
},
}
impl ToolOutput {
/// Get the string that should be placed into conversation history.
pub fn history_text(&self) -> &str {
match self {
ToolOutput::Inline(s) => s,
ToolOutput::Stored { summary, .. } => summary,
}
}
/// Whether this output requires external storage.
pub fn is_stored(&self) -> bool {
matches!(self, ToolOutput::Stored { .. })
}
}
/// Content to be stored in a BlobStore.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum Content {
/// Plain text (file contents, search results, logs, etc.)
Text(String),
/// Structured JSON data (API responses, query results, etc.)
Structured(Value),
}
impl From<String> for ToolOutput {
fn from(s: String) -> Self {
if s.len() <= INLINE_THRESHOLD {
ToolOutput::Inline(s)
} else {
let summary = auto_summarize_text(&s);
ToolOutput::Stored {
summary,
content: Content::Text(s),
}
}
}
}
/// Generate a summary for any [`Content`] variant.
///
/// The blob ID prefix (`[blob:<id>]`) is NOT included here — it is
/// prepended by the Worker after the content is stored and an ID is assigned.
pub fn auto_summarize(content: &Content) -> String {
match content {
Content::Text(text) => auto_summarize_text(text),
Content::Structured(value) => auto_summarize_structured(value),
}
}
/// Generate a summary for plain text content.
fn auto_summarize_text(text: &str) -> String {
let lines: Vec<&str> = text.lines().collect();
let total = lines.len();
let mut summary = format!("text | {total} lines\n");
// Head
summary.push_str("── head ──\n");
for line in lines.iter().take(SUMMARY_HEAD_LINES) {
summary.push_str(line);
summary.push('\n');
}
// Tail (only if there's content beyond head)
if total > SUMMARY_HEAD_LINES + SUMMARY_TAIL_LINES {
summary.push_str("── tail ──\n");
let tail_start = total.saturating_sub(SUMMARY_TAIL_LINES);
for line in &lines[tail_start..] {
summary.push_str(line);
summary.push('\n');
}
}
// Truncate if summary itself is too large
if summary.len() > SUMMARY_MAX_BYTES {
summary.truncate(SUMMARY_MAX_BYTES);
summary.push_str("\n");
}
summary
}
/// Generate a summary for structured JSON content.
fn auto_summarize_structured(value: &Value) -> String {
let mut summary = match value {
Value::Array(arr) => {
let mut s = format!("json_array | {} entries\n", arr.len());
// Show schema from first element
if let Some(first) = arr.first() {
s.push_str("── schema ──\n");
s.push_str(&describe_value_shape(first));
s.push('\n');
}
// Show first 2 entries
s.push_str("── head ──\n");
for item in arr.iter().take(2) {
if let Ok(json) = serde_json::to_string(item) {
s.push_str(&json);
s.push('\n');
}
}
s
}
Value::Object(map) => {
let mut s = format!("json_object | {} keys\n", map.len());
s.push_str("── keys ──\n");
for (key, val) in map.iter() {
s.push_str(&format!("{key}: {}\n", value_type_label(val)));
}
s
}
_ => {
// Scalar or other — just show the JSON
format!(
"json | {}\n",
serde_json::to_string(value).unwrap_or_default()
)
}
};
if summary.len() > SUMMARY_MAX_BYTES {
summary.truncate(SUMMARY_MAX_BYTES);
summary.push_str("\n");
}
summary
}
/// Describe the shape of a JSON value (for schema preview).
fn describe_value_shape(value: &Value) -> String {
match value {
Value::Object(map) => {
let fields: Vec<String> = map
.iter()
.map(|(k, v)| format!("{k}: {}", value_type_label(v)))
.collect();
format!("{{ {} }}", fields.join(", "))
}
_ => value_type_label(value),
}
}
/// Human-readable type label for a JSON value.
fn value_type_label(value: &Value) -> String {
match value {
Value::Null => "null".to_string(),
Value::Bool(_) => "bool".to_string(),
Value::Number(_) => "number".to_string(),
Value::String(s) => {
if s.len() > 50 {
format!("string({})", s.len())
} else {
"string".to_string()
}
}
Value::Array(arr) => format!("array({})", arr.len()),
Value::Object(map) => format!("object({})", map.len()),
}
}
// =============================================================================
// ToolMeta - Immutable Meta Information
// =============================================================================
@ -152,3 +350,23 @@ pub trait Tool: Send + Sync {
/// Result string from execution. This content is returned to LLM.
async fn execute(&self, input_json: &str) -> Result<String, ToolError>;
}
// =============================================================================
// ToolOutputProcessor - Output storage abstraction
// =============================================================================
/// Processes tool output before it enters conversation history.
///
/// When a tool produces a large result, the processor can store the
/// full content externally and return a summary string for the history.
///
/// If no processor is set on Worker, all tool outputs are used as-is (inline).
#[async_trait]
pub trait ToolOutputProcessor: Send + Sync {
/// Process a tool's raw output string.
///
/// Returns the string that should be placed into conversation history.
/// For small outputs, this may be the original string unchanged.
/// For large outputs, this should be a summary with a blob reference.
async fn process(&self, output: String) -> Result<String, ToolError>;
}

View File

@ -23,7 +23,7 @@ use crate::{
ToolUseBlockSubscriberAdapter, UsageSubscriberAdapter, WorkerSubscriber,
},
timeline::{TextBlockCollector, Timeline, ToolCallCollector},
tool::{ToolDefinition as WorkerToolDefinition, ToolError},
tool::{ToolDefinition as WorkerToolDefinition, ToolError, ToolOutputProcessor},
tool_server::{ToolServer, ToolServerError, ToolServerHandle},
};
@ -185,6 +185,8 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
request_config: RequestConfig,
/// Whether the previous run was interrupted
last_run_interrupted: bool,
/// Optional processor for large tool outputs (stores externally, returns summary)
output_processor: Option<Arc<dyn ToolOutputProcessor>>,
/// Cancel notification channel (for interrupting execution)
cancel_tx: mpsc::Sender<()>,
cancel_rx: mpsc::Receiver<()>,
@ -840,6 +842,20 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
};
// Phase 2.5: Apply output processor (store large results externally)
if let Some(ref processor) = self.output_processor {
for tool_result in &mut results {
if !tool_result.is_error {
match processor.process(tool_result.content.clone()).await {
Ok(processed) => tool_result.content = processed,
Err(e) => {
warn!(error = %e, "Output processor failed, keeping original content");
}
}
}
}
}
// Phase 3: Apply post_tool_call hooks
for tool_result in &mut results {
// Get saved information
@ -1124,6 +1140,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
turn_notifiers: Vec::new(),
request_config: RequestConfig::default(),
last_run_interrupted: false,
output_processor: None,
cancel_tx,
cancel_rx,
_state: PhantomData,
@ -1318,6 +1335,14 @@ impl<C: LlmClient> Worker<C, Mutable> {
self.last_run_interrupted = interrupted;
}
/// Set a tool output processor for handling large tool results.
///
/// When set, tool execution results are passed through this processor
/// before being placed into conversation history.
pub fn set_output_processor(&mut self, processor: Arc<dyn ToolOutputProcessor>) {
self.output_processor = Some(processor);
}
/// Apply configuration (reserved for future extensions)
#[allow(dead_code)]
pub fn config(self, _config: WorkerConfig) -> Self {
@ -1344,6 +1369,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
turn_notifiers: self.turn_notifiers,
request_config: self.request_config,
last_run_interrupted: self.last_run_interrupted,
output_processor: self.output_processor,
cancel_tx: self.cancel_tx,
cancel_rx: self.cancel_rx,
_state: PhantomData,
@ -1380,6 +1406,7 @@ impl<C: LlmClient> Worker<C, CacheLocked> {
turn_notifiers: self.turn_notifiers,
request_config: self.request_config,
last_run_interrupted: self.last_run_interrupted,
output_processor: self.output_processor,
cancel_tx: self.cancel_tx,
cancel_rx: self.cancel_rx,
_state: PhantomData,