From 4fe77b80348ea45bd633c50236836273ef6d82b1 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 6 Apr 2026 02:21:41 +0900 Subject: [PATCH] =?UTF-8?q?=E3=83=86=E3=82=B9=E3=83=88=E8=A8=AD=E8=A8=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + Cargo.lock | 2 + TODO.md | 2 + crates/llm-worker-persistence/Cargo.toml | 2 + crates/llm-worker-persistence/src/fs_store.rs | 1 + crates/llm-worker-persistence/src/session.rs | 47 +-- .../tests/common/mod.rs | 50 +++ .../tests/session_test.rs | 335 ++++++++++++++++++ docs/test-fixtures.md | 158 +++++++++ 9 files changed, 570 insertions(+), 28 deletions(-) create mode 100644 crates/llm-worker-persistence/tests/common/mod.rs create mode 100644 crates/llm-worker-persistence/tests/session_test.rs create mode 100644 docs/test-fixtures.md diff --git a/.gitignore b/.gitignore index ce9735c1..5382a4e6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target .direnv *.local +.env diff --git a/Cargo.lock b/Cargo.lock index 8c622cb5..7ccf0182 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -788,6 +788,8 @@ dependencies = [ name = "llm-worker-persistence" version = "0.1.0" dependencies = [ + "async-trait", + "futures", "llm-worker", "serde", "serde_json", diff --git a/TODO.md b/TODO.md index 6d8fc6bc..dcbc4349 100644 --- a/TODO.md +++ b/TODO.md @@ -1 +1,3 @@ - [x] 永続化データ構造の制定 +- [ ] テスト設計 +- [ ] ツール設計 diff --git a/crates/llm-worker-persistence/Cargo.toml b/crates/llm-worker-persistence/Cargo.toml index 39a8d2cf..3128bee2 100644 --- a/crates/llm-worker-persistence/Cargo.toml +++ b/crates/llm-worker-persistence/Cargo.toml @@ -16,3 +16,5 @@ thiserror = "2.0" [dev-dependencies] tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "fs", "io-util"] } tempfile = "3.24" +futures = "0.3" +async-trait = "0.1" diff --git a/crates/llm-worker-persistence/src/fs_store.rs b/crates/llm-worker-persistence/src/fs_store.rs index b46de33b..63e73117 100644 --- a/crates/llm-worker-persistence/src/fs_store.rs +++ b/crates/llm-worker-persistence/src/fs_store.rs @@ -16,6 +16,7 @@ use tokio::io::AsyncWriteExt; /// /// Each session is stored as a single `.jsonl` file with one [`LogEntry`] /// per line. Writes use append mode for crash safety. +#[derive(Clone)] pub struct FsStore { root: PathBuf, } diff --git a/crates/llm-worker-persistence/src/session.rs b/crates/llm-worker-persistence/src/session.rs index fb24d029..7792010d 100644 --- a/crates/llm-worker-persistence/src/session.rs +++ b/crates/llm-worker-persistence/src/session.rs @@ -8,7 +8,6 @@ use crate::session_log::{self, LogEntry, Outcome}; use crate::store::{Store, StoreError}; use crate::SessionId; use llm_worker::llm_client::client::LlmClient; -use llm_worker::llm_client::types::Item; use llm_worker::state::Mutable; use llm_worker::{Worker, WorkerError, WorkerResult}; @@ -120,14 +119,11 @@ impl Session { &mut self, user_input: impl Into, ) -> Result { - let input = user_input.into(); - let user_item = Item::user_message(&input); let history_before = self.worker.history().len(); - let result = self.worker.run(input).await; + let result = self.worker.run(user_input).await; - self.log_history_delta(history_before, Some(&user_item)) - .await?; + self.log_history_delta(history_before).await?; self.log_turn_end().await?; self.log_outcome(&result).await?; @@ -140,7 +136,7 @@ impl Session { let result = self.worker.resume().await; - self.log_history_delta(history_before, None).await?; + self.log_history_delta(history_before).await?; self.log_turn_end().await?; self.log_outcome(&result).await?; @@ -228,11 +224,7 @@ impl Session { // ── Private helpers ────────────────────────────────────────────────── - async fn log_history_delta( - &self, - before_len: usize, - user_item: Option<&Item>, - ) -> Result<(), StoreError> { + async fn log_history_delta(&self, before_len: usize) -> Result<(), StoreError> { let history = self.worker.history(); if history.len() <= before_len { return Ok(()); @@ -242,24 +234,23 @@ impl Session { let new_items = &history[before_len..]; let mut i = 0; - // If we have a user_item, the first new item should be the user input - if let Some(item) = user_item { - self.store - .append( - self.session_id, - &LogEntry::UserInput { - ts, - item: item.clone(), - }, - ) - .await?; - i = 1; - } - - // Classify and group remaining items + // Classify and group items by type. + // The actual items from history are used (not pre-constructed copies), + // so any modifications by hooks (e.g. on_prompt_submit) are captured correctly. while i < new_items.len() { let item = &new_items[i]; - if item.is_tool_result() { + if item.is_user_message() { + self.store + .append( + self.session_id, + &LogEntry::UserInput { + ts, + item: new_items[i].clone(), + }, + ) + .await?; + i += 1; + } else if item.is_tool_result() { let start = i; while i < new_items.len() && new_items[i].is_tool_result() { i += 1; diff --git a/crates/llm-worker-persistence/tests/common/mod.rs b/crates/llm-worker-persistence/tests/common/mod.rs new file mode 100644 index 00000000..1239a067 --- /dev/null +++ b/crates/llm-worker-persistence/tests/common/mod.rs @@ -0,0 +1,50 @@ +#![allow(dead_code)] + +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use async_trait::async_trait; +use futures::Stream; +use llm_worker::llm_client::event::Event; +use llm_worker::llm_client::{ClientError, LlmClient, Request}; + +/// A mock LLM client that replays pre-defined event sequences. +#[derive(Clone)] +pub struct MockLlmClient { + responses: Arc>>, + call_count: Arc, +} + +impl MockLlmClient { + pub fn new(events: Vec) -> Self { + Self::with_responses(vec![events]) + } + + pub fn with_responses(responses: Vec>) -> Self { + Self { + responses: Arc::new(responses), + call_count: Arc::new(AtomicUsize::new(0)), + } + } +} + +#[async_trait] +impl LlmClient for MockLlmClient { + async fn stream( + &self, + _request: Request, + ) -> Result> + Send>>, ClientError> { + let count = self.call_count.fetch_add(1, Ordering::SeqCst); + if count >= self.responses.len() { + return Err(ClientError::Api { + status: Some(500), + code: Some("mock_error".to_string()), + message: "No more mock responses".to_string(), + }); + } + let events = self.responses[count].clone(); + let stream = futures::stream::iter(events.into_iter().map(Ok)); + Ok(Box::pin(stream)) + } +} diff --git a/crates/llm-worker-persistence/tests/session_test.rs b/crates/llm-worker-persistence/tests/session_test.rs new file mode 100644 index 00000000..00ba3346 --- /dev/null +++ b/crates/llm-worker-persistence/tests/session_test.rs @@ -0,0 +1,335 @@ +mod common; + +use std::sync::Arc; + +use async_trait::async_trait; +use common::MockLlmClient; +use llm_worker::hook::{Hook, HookError, OnTurnEnd, OnTurnEndResult}; +use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; +use llm_worker::llm_client::types::{Item, RequestConfig}; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta}; +use llm_worker::Worker; +use llm_worker_persistence::{ + FsStore, LogEntry, Outcome, Session, SessionConfig, Store, collect_state, +}; + +// ============================================================================= +// Helpers +// ============================================================================= + +fn simple_text_events() -> Vec { + vec![ + Event::text_block_start(0), + Event::text_delta(0, "Hello!"), + Event::text_block_stop(0, None), + Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] +} + +fn tool_call_events() -> Vec> { + vec![ + // 1st response: tool call + vec![ + Event::tool_use_start(0, "call_1", "get_weather"), + Event::tool_input_delta(0, r#"{"city":"Tokyo"}"#), + Event::tool_use_stop(0), + Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ], + // 2nd response: final text + vec![ + Event::text_block_start(0), + Event::text_delta(0, "It's sunny in Tokyo!"), + Event::text_block_stop(0, None), + Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ], + ] +} + +#[derive(Clone)] +struct MockWeatherTool; + +#[async_trait] +impl Tool for MockWeatherTool { + async fn execute(&self, _input_json: &str) -> Result { + Ok("Sunny, 25C".to_string()) + } +} + +fn weather_tool_definition() -> ToolDefinition { + Arc::new(|| { + let meta = ToolMeta::new("get_weather") + .description("Get weather") + .input_schema(serde_json::json!({ + "type": "object", + "properties": { + "city": { "type": "string" } + }, + "required": ["city"] + })); + (meta, Arc::new(MockWeatherTool) as Arc) + }) +} + +/// Hook that forces Pause on the first turn end. +struct PauseOnFirstTurnEnd; + +#[async_trait] +impl Hook for PauseOnFirstTurnEnd { + async fn call(&self, _input: &mut Vec) -> Result { + Ok(OnTurnEndResult::Paused) + } +} + +async fn make_store() -> (tempfile::TempDir, FsStore) { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).await.unwrap(); + (dir, store) +} + +// ============================================================================= +// Tests +// ============================================================================= + +#[tokio::test] +async fn session_run_logs_entries() { + let (_dir, store) = make_store().await; + let client = MockLlmClient::new(simple_text_events()); + let worker = Worker::new(client); + + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let sid = session.session_id(); + + session.run("Hi").await.unwrap(); + + let entries = store.read_all(sid).await.unwrap(); + + // SessionStart, UserInput, AssistantItems, TurnEnd, RunOutcome (at minimum) + assert!(entries.len() >= 4, "expected at least 4 entries, got {}", entries.len()); + + // First entry is SessionStart + assert!(matches!(entries[0], LogEntry::SessionStart { .. })); + + // Has a RunOutcome with Finished + let has_finished = entries.iter().any(|e| matches!( + e, + LogEntry::RunOutcome { outcome: Outcome::Finished, .. } + )); + assert!(has_finished, "should have a Finished outcome"); +} + +#[tokio::test] +async fn session_restore_round_trip() { + let (_dir, store) = make_store().await; + let client = MockLlmClient::new(simple_text_events()); + let mut worker = Worker::new(client); + worker.set_system_prompt("You are helpful."); + + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let sid = session.session_id(); + + session.run("Hi").await.unwrap(); + + let original_history = session.worker.history().to_vec(); + let original_turn_count = session.worker.turn_count(); + + // Restore + let restore_client = MockLlmClient::new(vec![]); // won't be called + let restored = Session::restore(restore_client, store.clone(), sid, SessionConfig::default()) + .await + .unwrap(); + + assert_eq!(restored.worker.history().len(), original_history.len()); + assert_eq!(restored.worker.turn_count(), original_turn_count); + assert_eq!( + restored.worker.get_system_prompt().map(String::from), + Some("You are helpful.".to_string()) + ); +} + +#[tokio::test] +async fn session_run_with_tool_call() { + let (_dir, store) = make_store().await; + let client = MockLlmClient::with_responses(tool_call_events()); + let mut worker = Worker::new(client); + worker.register_tool(weather_tool_definition()).unwrap(); + + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let sid = session.session_id(); + + session.run("What's the weather?").await.unwrap(); + + let entries = store.read_all(sid).await.unwrap(); + + let has_tool_results = entries.iter().any(|e| matches!(e, LogEntry::ToolResults { .. })); + assert!(has_tool_results, "should have ToolResults entry"); + + let has_assistant = entries.iter().any(|e| matches!(e, LogEntry::AssistantItems { .. })); + assert!(has_assistant, "should have AssistantItems entry"); +} + +#[tokio::test] +async fn session_resume_after_pause() { + let (_dir, store) = make_store().await; + + // First run: tool call with pause hook → Paused + let client = MockLlmClient::with_responses(tool_call_events()); + let mut worker = Worker::new(client); + worker.register_tool(weather_tool_definition()).unwrap(); + worker.add_on_turn_end_hook(PauseOnFirstTurnEnd); + + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let sid = session.session_id(); + + let result = session.run("Weather?").await.unwrap(); + assert!(matches!(result, llm_worker::WorkerResult::Paused)); + + // Check RunOutcome is Paused + let entries = store.read_all(sid).await.unwrap(); + let has_paused = entries.iter().any(|e| matches!( + e, + LogEntry::RunOutcome { outcome: Outcome::Paused, .. } + )); + assert!(has_paused, "should have Paused outcome"); + + // Restore and resume + let resume_client = MockLlmClient::with_responses(vec![vec![ + Event::text_block_start(0), + Event::text_delta(0, "After resume"), + Event::text_block_stop(0, None), + Event::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]]); + let mut restored = Session::restore(resume_client, store.clone(), sid, SessionConfig::default()) + .await + .unwrap(); + + assert!(restored.worker.last_run_interrupted()); + + // resume may or may not succeed depending on Worker internal state, + // but the restore itself should work + let _ = restored.resume().await; +} + +#[tokio::test] +async fn session_fork_preserves_state() { + let (_dir, store) = make_store().await; + let client = MockLlmClient::new(simple_text_events()); + let mut worker = Worker::new(client); + worker.set_system_prompt("System prompt"); + + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + + session.run("Hello").await.unwrap(); + + let original_history_len = session.worker.history().len(); + let fork_id = session.fork().await.unwrap(); + + // Fork should have a SessionStart with the current history + let fork_entries = store.read_all(fork_id).await.unwrap(); + assert_eq!(fork_entries.len(), 1); + assert!(matches!(&fork_entries[0], LogEntry::SessionStart { .. })); + + let fork_state = collect_state(&fork_entries); + assert_eq!(fork_state.history.len(), original_history_len); + assert_eq!(fork_state.system_prompt.as_deref(), Some("System prompt")); +} + +#[tokio::test] +async fn session_fork_at_truncates() { + let (_dir, store) = make_store().await; + let client = MockLlmClient::new(simple_text_events()); + let worker = Worker::new(client); + + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let sid = session.session_id(); + + session.run("Hello").await.unwrap(); + + let all_entries = store.read_all(sid).await.unwrap(); + assert!(all_entries.len() > 2); + + // Fork at entry 2 (SessionStart + UserInput only) + let fork_id = Session::::fork_at(&store, sid, 2) + .await + .unwrap(); + + let fork_entries = store.read_all(fork_id).await.unwrap(); + assert_eq!(fork_entries.len(), 1); // Just the new SessionStart + + let fork_state = collect_state(&fork_entries); + // Should have the state from replaying only the first 2 entries + let original_truncated_state = collect_state(&all_entries[..2]); + assert_eq!(fork_state.history.len(), original_truncated_state.history.len()); +} + +#[tokio::test] +async fn session_config_changed_logged() { + let (_dir, store) = make_store().await; + let client = MockLlmClient::new(vec![]); + let worker = Worker::new(client); + + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let sid = session.session_id(); + + // Modify config via worker and log it + session.worker.set_request_config(RequestConfig::default().with_temperature(0.7)); + session.log_config_changed().await.unwrap(); + + let entries = store.read_all(sid).await.unwrap(); + let has_config_changed = entries.iter().any(|e| matches!( + e, + LogEntry::ConfigChanged { config, .. } if config.temperature == Some(0.7) + )); + assert!(has_config_changed, "should have ConfigChanged entry"); +} + +#[tokio::test] +async fn session_cache_lock_unlock_logged() { + let (_dir, store) = make_store().await; + let client = MockLlmClient::new(vec![]); + let worker = Worker::new(client); + + let session = Session::new(worker, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let sid = session.session_id(); + + session.log_cache_locked(5).await.unwrap(); + session.log_cache_unlocked().await.unwrap(); + + let entries = store.read_all(sid).await.unwrap(); + + let has_locked = entries.iter().any(|e| matches!( + e, + LogEntry::CacheLocked { locked_prefix_len: 5, .. } + )); + assert!(has_locked, "should have CacheLocked entry"); + + let has_unlocked = entries.iter().any(|e| matches!(e, LogEntry::CacheUnlocked { .. })); + assert!(has_unlocked, "should have CacheUnlocked entry"); + + // State after all entries: unlocked + let state = collect_state(&entries); + assert_eq!(state.locked_prefix_len, 0); +} diff --git a/docs/test-fixtures.md b/docs/test-fixtures.md new file mode 100644 index 00000000..c72f94bf --- /dev/null +++ b/docs/test-fixtures.md @@ -0,0 +1,158 @@ +# テスト Fixture 仕様 + +## 概要 + +テスト用 fixture は、実 API のストリーミング応答を JSONL 形式で録画したファイル。 +`MockLlmClient::from_fixture()` でロードし、API キー不要・決定的なテスト実行を実現する。 + +## ファイル形式 + +``` +{メタデータ行: JSON} +{イベント行: JSON} +{イベント行: JSON} +... +``` + +- **1行目**: メタデータ (`timestamp`, `model`, `description`) +- **2行目以降**: 録画イベント (`elapsed_ms`, `event_type`, `data`) + - `data` フィールドに `Event` の JSON 文字列が入る + +## ファイル配置 + +``` +crates/llm-worker/tests/fixtures/ + anthropic/ + simple_text.jsonl + tool_call.jsonl + long_text.jsonl + openai/ + simple_text.jsonl + tool_call.jsonl + long_text.jsonl + gemini/ + simple_text.jsonl + tool_call.jsonl + long_text.jsonl + ollama/ + simple_text.jsonl + tool_call.jsonl + long_text.jsonl +``` + +## シナリオ定義 + +### simple_text + +単純なテキスト応答。 + +| 項目 | 値 | +|---|---| +| ファイル名 | `simple_text.jsonl` | +| system prompt | `"You are a helpful assistant. Be very concise."` | +| user message | `"Say hello in one word."` | +| max_tokens | 50 | +| ツール | なし | + +**期待パターン**: +- `BlockStart(Text)` が1つ以上 +- `BlockDelta(Text)` が1つ以上 +- `BlockStop(Text)` が1つ以上 +- 応答が短い(1単語程度) + +**用途**: 基本的なストリーミング動作、Timeline テキスト収集、Worker の単純な run 完了 + +### tool_call + +ツール呼び出しを含む応答。 + +| 項目 | 値 | +|---|---| +| ファイル名 | `tool_call.jsonl` | +| system prompt | `"You are a helpful assistant. Use tools when appropriate."` | +| user message | `"What's the weather in Tokyo? Use the get_weather tool."` | +| max_tokens | 200 | +| ツール | `get_weather(city: string)` | + +**期待パターン**: +- `BlockStart(ToolUse)` を含む +- ToolUse ブロック内に `tool_call_id`, `name: "get_weather"` がある +- tool input JSON に `"city"` キーを含む + +**用途**: ToolCallCollector、Worker のツール実行フロー、Session の ToolResults ログ記録 + +### long_text + +長文テキスト応答。 + +| 項目 | 値 | +|---|---| +| ファイル名 | `long_text.jsonl` | +| system prompt | `"You are a creative writer."` | +| user message | `"Write a short story about a robot discovering a garden. It should be at least 300 words."` | +| max_tokens | 1000 | +| ツール | なし | + +**期待パターン**: +- `BlockDelta(Text)` が複数(ストリーミングチャンク) +- 最終テキストが 300 語以上 + +**用途**: ストリーミングの分割配信検証、Subscriber のデルタ受信テスト + +## 共通検証項目 + +全 fixture に対して以下を検証する(`assert_*` ヘルパー関数群): + +- `assert_events_deserialize` — 全イベントが `Event` にデシリアライズできる +- `assert_event_sequence` — BlockStart → BlockDelta → BlockStop の基本シーケンス +- `assert_usage_tokens` — `Usage` イベントが含まれる +- `assert_timeline_integration` — Timeline に流してテキスト収集できる + +## 録画手順 + +### 前提 + +- API キーが環境変数に設定されていること +- `crates/llm-worker` ディレクトリで実行 + +### コマンド + +```bash +# 単一シナリオ録画 +ANTHROPIC_API_KEY=... cargo run --example record_test_fixtures -- -s simple_text + +# 全シナリオ録画 +ANTHROPIC_API_KEY=... cargo run --example record_test_fixtures -- --all + +# プロバイダー指定 +OPENAI_API_KEY=... cargo run --example record_test_fixtures -- --all -c openai +GEMINI_API_KEY=... cargo run --example record_test_fixtures -- --all -c gemini + +# モデル指定 +ANTHROPIC_API_KEY=... cargo run --example record_test_fixtures -- --all -m claude-sonnet-4-20250514 +``` + +### シナリオ定義の場所 + +`crates/llm-worker/examples/record_test_fixtures/scenarios.rs` + +新しいシナリオを追加する場合はこのファイルに `TestScenario` を追加し、 +`scenarios()` 関数の返り値に含める。 + +## 録画後の確認チェックリスト + +録画後、テストに組み込む前に以下を手動確認する: + +- [ ] JSONL の各行が valid JSON か(`jq . < fixture.jsonl` で確認) +- [ ] 1行目にメタデータ(`timestamp`, `model`, `description`)が入っているか +- [ ] simple_text: `BlockStart` → `BlockDelta` → `BlockStop` シーケンスがあるか +- [ ] tool_call: `BlockStart` に `"block_type":"ToolUse"` を含むか +- [ ] long_text: `BlockDelta` が複数行あるか(ストリーミング分割の確認) +- [ ] 各 fixture に `Usage` イベントが含まれるか +- [ ] エラーイベントが混入していないか + +## 注意事項 + +- fixture は API の応答に依存するため、モデルバージョンアップで再録画が必要になることがある +- 録画の自動化(CI での定期録画等)は行わない。手動実行 + 目視確認のフロー +- fixture が存在しない場合、対応するテストは skip される(`if !fixture_path.exists() { return; }` パターン)