セッション関連の責務の分離

This commit is contained in:
Keisuke Hirata 2026-04-28 15:43:34 +09:00
parent e49fb3f1a0
commit 6fe19b84ce
16 changed files with 449 additions and 436 deletions

View File

@ -1,2 +1,2 @@
[memory]
extract_threshold = 1000
extract_threshold = 4000

View File

@ -48,12 +48,15 @@ pub mod llm_client;
pub mod prune;
pub mod state;
pub mod timeline;
pub mod token_counter;
pub mod tool;
pub mod tool_server;
pub mod usage_record;
pub use callback::{TextBlockScope, ToolUseBlockScope};
pub use handler::ToolUseBlockStart;
pub use interceptor::Interceptor;
pub use message::{ContentPart, Item, Message, Role};
pub use tool::{ToolCall, ToolOutputLimits, ToolResult};
pub use usage_record::UsageRecord;
pub use worker::{RunOutput, ToolRegistryError, Worker, WorkerConfig, WorkerError, WorkerResult};

View File

@ -0,0 +1,222 @@
//! Usage 履歴ベースのトークン会計(汎用部分)。
//!
//! `UsageRecord` の列(プロバイダ実測値)と現在の history から、
//! 任意の history index 時点のプロンプト全長トークン数を pure に計算する。
//!
//! # 方針
//!
//! - ローカルトークナイザは持たない。実測値があればそれを採用し、
//! measurement 間はバイト数で按分、最新 measurement より先は最終 rate で外挿する
//! - 推定の出どころは [`EstimateSource`] で呼び出し側に明示する。
//! 課金判断には使えないが、compact / prune / memory phase 1 trigger 等の
//! 閾値判定には十分な精度
//! - `records` は `history_len` 昇順を仮定する(呼び出し側がそのように積む)
use crate::{Item, UsageRecord};
/// 推定の出どころ。
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EstimateSource {
/// measurement の境界にちょうど一致(実測値そのもの)
Measured,
/// 連続する 2 つの measurement の間をバイト按分で計算
Interpolated,
/// 最後の measurement より新しい区間を最終 rate で外挿
Extrapolated,
/// measurement が 1 件も無く、バイト数のみのフォールバック
NoData,
}
/// トークン数の推定値。
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TokenEstimate {
pub tokens: u64,
pub source: EstimateSource,
}
/// `items[..i]` までの累積バイト数(`prefix[i]`)を返す。長さは `items.len()+1`。
pub fn prefix_bytes(items: &[Item]) -> Vec<u64> {
let mut prefix = Vec::with_capacity(items.len() + 1);
let mut acc: u64 = 0;
prefix.push(0);
for item in items {
acc = acc.saturating_add(item_bytes(item));
prefix.push(acc);
}
prefix
}
/// 1 Item の大きさ。JSON シリアライズ長を使う粗い近似。
/// トークン数との絶対変換ではなく区間の按分にしか使わないので、
/// プロバイダごとの overhead は比率でキャンセルされる。
pub fn item_bytes(item: &Item) -> u64 {
serde_json::to_string(item)
.map(|s| s.len() as u64)
.unwrap_or(0)
}
/// `history[..index]` までのトークン数を推定する。
///
/// `prefix` は [`prefix_bytes`] で得た `history.len() + 1` 長の累積バイト列。
/// 呼び出し側が 1 度だけ計算して使い回すことで、線形探索や複数回の推定が
/// O(n) シリアライズで済む(内部で毎回再計算すると O(n²) になる)。
pub fn tokens_at(
history: &[Item],
records: &[UsageRecord],
index: usize,
prefix: &[u64],
) -> TokenEstimate {
debug_assert!(index <= history.len());
debug_assert_eq!(prefix.len(), history.len() + 1);
if index == 0 {
return TokenEstimate {
tokens: 0,
source: EstimateSource::Measured,
};
}
if records.is_empty() {
return TokenEstimate {
tokens: prefix[index] / 4,
source: EstimateSource::NoData,
};
}
// exact matchrev 走査で一番新しい record を採用)
if let Some(r) = records.iter().rev().find(|r| r.history_len == index) {
return TokenEstimate {
tokens: r.input_total_tokens,
source: EstimateSource::Measured,
};
}
let lower = records.iter().rev().find(|r| r.history_len < index);
let upper = records.iter().find(|r| r.history_len > index);
let cap = history.len();
match (lower, upper) {
(Some(lo), Some(up)) => {
let lo_bytes = prefix[lo.history_len.min(cap)];
let up_bytes = prefix[up.history_len.min(cap)];
let at_bytes = prefix[index];
let span_bytes = up_bytes.saturating_sub(lo_bytes);
let span_tokens = up.input_total_tokens.saturating_sub(lo.input_total_tokens);
if span_bytes == 0 || span_tokens == 0 {
return TokenEstimate {
tokens: lo.input_total_tokens,
source: EstimateSource::Interpolated,
};
}
let delta_bytes = at_bytes.saturating_sub(lo_bytes);
let delta_tokens =
(delta_bytes as u128 * span_tokens as u128 / span_bytes as u128) as u64;
TokenEstimate {
tokens: lo.input_total_tokens + delta_tokens,
source: EstimateSource::Interpolated,
}
}
(Some(lo), None) => {
let lo_bytes = prefix[lo.history_len.min(cap)];
let at_bytes = prefix[index];
if lo_bytes == 0 || lo.input_total_tokens == 0 {
return TokenEstimate {
tokens: lo.input_total_tokens,
source: EstimateSource::Extrapolated,
};
}
let delta_bytes = at_bytes.saturating_sub(lo_bytes);
let delta_tokens =
(delta_bytes as u128 * lo.input_total_tokens as u128 / lo_bytes as u128) as u64;
TokenEstimate {
tokens: lo.input_total_tokens + delta_tokens,
source: EstimateSource::Extrapolated,
}
}
(None, Some(up)) => {
let up_bytes = prefix[up.history_len.min(cap)];
let at_bytes = prefix[index];
if up_bytes == 0 {
return TokenEstimate {
tokens: 0,
source: EstimateSource::Interpolated,
};
}
let t = (at_bytes as u128 * up.input_total_tokens as u128 / up_bytes as u128) as u64;
TokenEstimate {
tokens: t,
source: EstimateSource::Interpolated,
}
}
(None, None) => unreachable!("records non-empty but neither lower nor upper matched"),
}
}
/// 現在の history 全体の推定トークン数。
pub fn total_tokens(history: &[Item], records: &[UsageRecord]) -> TokenEstimate {
let prefix = prefix_bytes(history);
tokens_at(history, records, history.len(), &prefix)
}
/// 任意の history index 時点でのプロンプト全長推定。
/// `history_len == 0` で 0 を返す。delta 計算 (extract trigger 等) で
/// `total_tokens_at(now) - total_tokens_at(pointer)` の形で使う。
pub fn total_tokens_at(
history: &[Item],
records: &[UsageRecord],
history_len: usize,
) -> TokenEstimate {
let prefix = prefix_bytes(history);
tokens_at(history, records, history_len.min(history.len()), &prefix)
}
#[cfg(test)]
mod tests {
use super::*;
fn msg(text: &str) -> Item {
Item::user_message(text)
}
fn record(history_len: usize, tokens: u64) -> UsageRecord {
UsageRecord {
history_len,
input_total_tokens: tokens,
cache_read_tokens: 0,
cache_write_tokens: 0,
output_tokens: 0,
}
}
#[test]
fn total_no_data_falls_back_to_byte_estimate() {
let history = vec![msg("hello world")];
let est = total_tokens(&history, &[]);
assert_eq!(est.source, EstimateSource::NoData);
assert!(est.tokens > 0);
}
#[test]
fn total_measured_when_last_record_matches_history_len() {
let history = vec![msg("a"), msg("b"), msg("c")];
let records = vec![record(3, 120)];
let est = total_tokens(&history, &records);
assert_eq!(est.source, EstimateSource::Measured);
assert_eq!(est.tokens, 120);
}
#[test]
fn total_extrapolated_when_history_grew_past_last_measurement() {
let history = vec![msg("a"), msg("b"), msg("c"), msg("d")];
let records = vec![record(3, 100)];
let est = total_tokens(&history, &records);
assert_eq!(est.source, EstimateSource::Extrapolated);
assert!(est.tokens > 100);
}
#[test]
fn total_zero_history_is_zero() {
let est = total_tokens(&[], &[]);
assert_eq!(est.tokens, 0);
}
}

View File

@ -0,0 +1,22 @@
//! Per-LLM-request Usage measurement snapshot.
//!
//! 1 リクエストの送信時点での「ある history prefix 長で計測した占有量」を
//! 1 件分にまとめたもの。`UsageEvent` (provider stream イベント) を
//! 受けて呼び出し側 (typically Pod) が組み立て、永続化層
//! (session-store) に流したり、token accounting (`token_counter`) で
//! 履歴として参照したりする。
/// LLM リクエスト送信時点での占有量スナップショット。
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageRecord {
/// 送信時の history.len()
pub history_len: usize,
/// history[..history_len] の占有量(プロンプト全長、実測)
pub input_total_tokens: u64,
/// 上記のうちキャッシュから読み出された分
pub cache_read_tokens: u64,
/// 上記のうちこのリクエストでキャッシュに書かれた分
pub cache_write_tokens: u64,
/// このリクエストで生成された出力トークン数
pub output_tokens: u64,
}

View File

@ -66,7 +66,8 @@ pub struct WorkerConfig {
}
/// Worker execution result (status)
#[derive(Debug)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkerResult {
/// Completed (waiting for user input)
Finished,

View File

@ -1,47 +1,27 @@
//! Usage 履歴ベースのトークン会計
//! Compact / prune 専用のトークン会計補助
//!
//! `UsageRecord` の列(プロバイダ実測値)と現在の history から、
//! 「末尾 N トークン残すための split 位置」「prune 射影で節約される
//! トークン数」などを pure に計算する。
//! 汎用部分(`prefix_bytes`, `tokens_at`, `total_tokens`, `total_tokens_at`)は
//! [`llm_worker::token_counter`] にあり、`UsageRecord` の列と現在の history から
//! pure に推定する。本モジュールは compact / prune 固有のロジック
//! `split_for_retained`, `savings_for_prune`と、Pod 上の公開 API に
//! 限定する。
//!
//! # 方針
//!
//! - ローカルトークナイザは持たない。実測値があればそれを採用し、
//! measurement 間はバイト数で按分、最新 measurement より先は最終 rate で外挿する
//! - 推定の出どころは [`EstimateSource`] で呼び出し側に明示する。
//! 課金判断には使えないが、compact/prune の閾値判定には十分な精度
//! - `records` は `history_len` 昇順を仮定する(`collect_state` と
//! `UsageTracker` がそのように積む)
//!
//! 公開 API は本ファイル内の `impl Pod` で [`Pod`](crate::Pod) のメソッドとして
//! 生やしている。pure な補助関数はこのモジュール内に private に閉じる。
//! 課金判断には使えないが、compact / prune の閾値判定には十分な精度
use llm_worker::Item;
use llm_worker::llm_client::client::LlmClient;
use session_store::{Store, UsageRecord};
use llm_worker::token_counter::{item_bytes, prefix_bytes, tokens_at};
use llm_worker::{Item, UsageRecord};
use session_store::Store;
pub use llm_worker::token_counter::{EstimateSource, TokenEstimate};
use crate::Pod;
/// 推定の出どころ。
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EstimateSource {
/// measurement の境界にちょうど一致(実測値そのもの)
Measured,
/// 連続する 2 つの measurement の間をバイト按分で計算
Interpolated,
/// 最後の measurement より新しい区間を最終 rate で外挿
Extrapolated,
/// measurement が 1 件も無く、バイト数のみのフォールバック
NoData,
}
/// トークン数の推定値。
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TokenEstimate {
pub tokens: u64,
pub source: EstimateSource,
}
/// history を分割する位置。
///
/// `items[..index]` が捨てる/要約される側、`items[index..]` が残る側。
@ -51,141 +31,6 @@ pub struct SplitPoint {
pub source: EstimateSource,
}
/// `items[..i]` までの累積バイト数(`prefix[i]`)を返す。長さは `items.len()+1`。
fn prefix_bytes(items: &[Item]) -> Vec<u64> {
let mut prefix = Vec::with_capacity(items.len() + 1);
let mut acc: u64 = 0;
prefix.push(0);
for item in items {
acc = acc.saturating_add(item_bytes(item));
prefix.push(acc);
}
prefix
}
/// 1 Item の大きさ。JSON シリアライズ長を使う粗い近似。
/// トークン数との絶対変換ではなく区間の按分にしか使わないので、
/// プロバイダごとの overhead は比率でキャンセルされる。
fn item_bytes(item: &Item) -> u64 {
serde_json::to_string(item)
.map(|s| s.len() as u64)
.unwrap_or(0)
}
/// `history[..index]` までのトークン数を推定する。
///
/// `prefix` は [`prefix_bytes`] で得た `history.len() + 1` 長の累積バイト列。
/// 呼び出し側が 1 度だけ計算して使い回すことで、線形探索や複数回の推定が
/// O(n) シリアライズで済む(内部で毎回再計算すると O(n²) になる)。
fn tokens_at(
history: &[Item],
records: &[UsageRecord],
index: usize,
prefix: &[u64],
) -> TokenEstimate {
debug_assert!(index <= history.len());
debug_assert_eq!(prefix.len(), history.len() + 1);
if index == 0 {
return TokenEstimate {
tokens: 0,
source: EstimateSource::Measured,
};
}
if records.is_empty() {
return TokenEstimate {
tokens: prefix[index] / 4,
source: EstimateSource::NoData,
};
}
// exact matchrev 走査で一番新しい record を採用)
if let Some(r) = records.iter().rev().find(|r| r.history_len == index) {
return TokenEstimate {
tokens: r.input_total_tokens,
source: EstimateSource::Measured,
};
}
let lower = records.iter().rev().find(|r| r.history_len < index);
let upper = records.iter().find(|r| r.history_len > index);
let cap = history.len();
match (lower, upper) {
(Some(lo), Some(up)) => {
let lo_bytes = prefix[lo.history_len.min(cap)];
let up_bytes = prefix[up.history_len.min(cap)];
let at_bytes = prefix[index];
let span_bytes = up_bytes.saturating_sub(lo_bytes);
let span_tokens = up.input_total_tokens.saturating_sub(lo.input_total_tokens);
if span_bytes == 0 || span_tokens == 0 {
return TokenEstimate {
tokens: lo.input_total_tokens,
source: EstimateSource::Interpolated,
};
}
let delta_bytes = at_bytes.saturating_sub(lo_bytes);
let delta_tokens =
(delta_bytes as u128 * span_tokens as u128 / span_bytes as u128) as u64;
TokenEstimate {
tokens: lo.input_total_tokens + delta_tokens,
source: EstimateSource::Interpolated,
}
}
(Some(lo), None) => {
let lo_bytes = prefix[lo.history_len.min(cap)];
let at_bytes = prefix[index];
if lo_bytes == 0 || lo.input_total_tokens == 0 {
return TokenEstimate {
tokens: lo.input_total_tokens,
source: EstimateSource::Extrapolated,
};
}
let delta_bytes = at_bytes.saturating_sub(lo_bytes);
let delta_tokens =
(delta_bytes as u128 * lo.input_total_tokens as u128 / lo_bytes as u128) as u64;
TokenEstimate {
tokens: lo.input_total_tokens + delta_tokens,
source: EstimateSource::Extrapolated,
}
}
(None, Some(up)) => {
let up_bytes = prefix[up.history_len.min(cap)];
let at_bytes = prefix[index];
if up_bytes == 0 {
return TokenEstimate {
tokens: 0,
source: EstimateSource::Interpolated,
};
}
let t = (at_bytes as u128 * up.input_total_tokens as u128 / up_bytes as u128) as u64;
TokenEstimate {
tokens: t,
source: EstimateSource::Interpolated,
}
}
(None, None) => unreachable!("records non-empty but neither lower nor upper matched"),
}
}
pub(crate) fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate {
let prefix = prefix_bytes(history);
tokens_at(history, records, history.len(), &prefix)
}
/// 任意の history index 時点でのプロンプト全長推定。
/// `history_len == 0` で 0 を返す。delta 計算 (extract trigger 等) で
/// `total_tokens_at(now) - total_tokens_at(pointer)` の形で使う。
pub(crate) fn total_tokens_at_impl(
history: &[Item],
records: &[UsageRecord],
history_len: usize,
) -> TokenEstimate {
let prefix = prefix_bytes(history);
tokens_at(history, records, history_len.min(history.len()), &prefix)
}
fn split_for_retained_impl(history: &[Item], records: &[UsageRecord], retained: u64) -> SplitPoint {
let prefix = prefix_bytes(history);
let current = tokens_at(history, records, history.len(), &prefix);
@ -300,7 +145,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// 最後の measurement と、その後に追加された未測定分のバイト按分/外挿。
pub fn total_tokens(&self) -> TokenEstimate {
let usage = self.usage_history();
total_tokens_impl(self.history(), &usage)
llm_worker::token_counter::total_tokens(self.history(), &usage)
}
/// 任意の history index 時点でのプロンプト全長推定。
@ -311,7 +156,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// pointer 以降に増えたプロンプト長を測るのに使う。
pub fn total_tokens_at(&self, history_len: usize) -> TokenEstimate {
let usage = self.usage_history();
total_tokens_at_impl(self.history(), &usage, history_len)
llm_worker::token_counter::total_tokens_at(self.history(), &usage, history_len)
}
/// 末尾から `retained` トークン以上を残すための分割位置。
@ -341,38 +186,6 @@ mod tests {
}
}
#[test]
fn total_no_data_falls_back_to_byte_estimate() {
let history = vec![msg("hello world")];
let est = total_tokens_impl(&history, &[]);
assert_eq!(est.source, EstimateSource::NoData);
assert!(est.tokens > 0);
}
#[test]
fn total_measured_when_last_record_matches_history_len() {
let history = vec![msg("a"), msg("b"), msg("c")];
let records = vec![record(3, 120)];
let est = total_tokens_impl(&history, &records);
assert_eq!(est.source, EstimateSource::Measured);
assert_eq!(est.tokens, 120);
}
#[test]
fn total_extrapolated_when_history_grew_past_last_measurement() {
let history = vec![msg("a"), msg("b"), msg("c"), msg("d")];
let records = vec![record(3, 100)];
let est = total_tokens_impl(&history, &records);
assert_eq!(est.source, EstimateSource::Extrapolated);
assert!(est.tokens > 100);
}
#[test]
fn total_zero_history_is_zero() {
let est = total_tokens_impl(&[], &[]);
assert_eq!(est.tokens, 0);
}
#[test]
fn split_returns_zero_when_current_below_retained() {
let history = vec![msg("a"), msg("b")];

View File

@ -16,8 +16,8 @@
use std::sync::Mutex;
use llm_worker::UsageRecord;
use llm_worker::timeline::event::UsageEvent;
use session_store::UsageRecord;
/// Shared between the pre-request hook, the `on_usage` callback, and Pod.
pub(crate) struct UsageTracker {

View File

@ -16,12 +16,12 @@ use llm_worker::interceptor::{
Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo,
ToolResultInfo, TurnEndAction,
};
use llm_worker::UsageRecord;
use llm_worker::tool::ToolOutput;
use session_store::UsageRecord;
use tracing::info;
use crate::compact::state::CompactState;
use crate::compact::token_counter::total_tokens_impl;
use llm_worker::token_counter::total_tokens;
use crate::hook::{
AbortInfo, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary,
TurnEndInfo,
@ -82,7 +82,7 @@ impl PodInterceptor {
fn estimated_tokens(&self, context: &[Item]) -> Option<u64> {
let handle = self.usage_history.as_ref()?;
let records = handle.lock().expect("usage_history poisoned").clone();
Some(total_tokens_impl(context, &records).tokens)
Some(total_tokens(context, &records).tokens)
}
}

View File

@ -6,10 +6,8 @@ use llm_worker::Item;
use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable;
use llm_worker::{ToolOutputLimits, Worker, WorkerError, WorkerResult};
use session_store::{
EntryHash, Outcome, SessionId, SessionStartState, Store, StoreError, UsageRecord,
};
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use session_store::{EntryHash, SessionId, SessionStartState, Store, StoreError};
use tracing::{info, warn};
use manifest::{PodManifest, PodManifestConfig, ResolveError, Scope, ScopeError, WorkerManifest};
@ -963,23 +961,28 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}
let interrupted = self.worker.as_ref().unwrap().last_run_interrupted();
let outcome = match result {
Ok(WorkerResult::Finished) => Outcome::Finished,
Ok(WorkerResult::Paused) => Outcome::Paused,
Ok(WorkerResult::LimitReached) => Outcome::LimitReached,
Ok(WorkerResult::Yielded) => Outcome::Yielded,
Err(e) => Outcome::Error {
message: e.to_string(),
},
};
session_store::save_outcome(
&self.store,
self.session_id,
&mut self.head_hash,
outcome,
interrupted,
)
.await?;
match result {
Ok(r) => {
session_store::save_run_completed(
&self.store,
self.session_id,
&mut self.head_hash,
r.clone(),
interrupted,
)
.await?;
}
Err(e) => {
session_store::save_run_errored(
&self.store,
self.session_id,
&mut self.head_hash,
e.to_string(),
interrupted,
)
.await?;
}
}
Ok(())
}

View File

@ -36,12 +36,13 @@ pub use event_trace::TraceEntry;
pub use fs_store::FsStore;
pub use session::{
SessionStartState, create_compacted_session, create_session, create_session_with_id,
ensure_head_or_fork, fork, fork_at, restore, save_cache_locked, save_cache_unlocked,
save_config_changed, save_delta, save_extension, save_outcome, save_turn_end, save_usage,
ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension,
save_run_completed, save_run_errored, save_turn_end, save_usage,
};
pub use llm_worker::UsageRecord;
pub use session_log::{
EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, UsageRecord,
build_chain, collect_state, compute_hash,
EntryHash, HashedEntry, LogEntry, RestoredState, SessionOrigin, build_chain, collect_state,
compute_hash,
};
pub use store::{Store, StoreError};

View File

@ -5,8 +5,9 @@
//! functions after state-mutating operations.
use crate::SessionId;
use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome, SessionOrigin};
use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, SessionOrigin};
use crate::store::{Store, StoreError};
use llm_worker::WorkerResult;
use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::types::Item;
@ -237,22 +238,46 @@ pub async fn save_turn_end(
.await
}
/// Log a RunOutcome entry.
pub async fn save_outcome(
/// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`.
pub async fn save_run_completed(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
outcome: Outcome,
result: WorkerResult,
interrupted: bool,
) -> Result<(), StoreError> {
append_entry(
store,
session_id,
head_hash,
LogEntry::RunOutcome {
LogEntry::RunCompleted {
ts: session_log::now_millis(),
outcome,
interrupted,
result,
},
)
.await
}
/// Log a `RunErrored` entry — `run()` / `resume()` returned `Err(WorkerError)`.
///
/// `WorkerError` is not `Serialize`, so the caller passes a lossy
/// `to_string()` rendering as `message`.
pub async fn save_run_errored(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
message: String,
interrupted: bool,
) -> Result<(), StoreError> {
append_entry(
store,
session_id,
head_hash,
LogEntry::RunErrored {
ts: session_log::now_millis(),
interrupted,
message,
},
)
.await
@ -290,42 +315,6 @@ pub async fn save_usage(
.await
}
/// Log a `Locked` entry (KV cache locked).
pub async fn save_cache_locked(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
locked_prefix_len: usize,
) -> Result<(), StoreError> {
append_entry(
store,
session_id,
head_hash,
LogEntry::Locked {
ts: session_log::now_millis(),
locked_prefix_len,
},
)
.await
}
/// Log a `CacheUnlocked` entry.
pub async fn save_cache_unlocked(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
) -> Result<(), StoreError> {
append_entry(
store,
session_id,
head_hash,
LogEntry::CacheUnlocked {
ts: session_log::now_millis(),
},
)
.await
}
/// Log an `Extension` entry — domain-tagged opaque payload.
///
/// session-store treats `payload` as an unstructured `serde_json::Value`.

View File

@ -9,6 +9,7 @@
//! enables safe fork detection when multiple writers share a session.
use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker::{UsageRecord, WorkerResult};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
@ -88,8 +89,7 @@ pub struct HashedEntry {
/// - `SessionStart` — always the first entry; captures initial state
/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends
/// - `TurnEnd` — turn boundary marker
/// - `Locked` / `CacheUnlocked` — KV cache state transitions
/// - `RunOutcome` — marks end of a `run()` or `resume()` call
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
/// - `ConfigChanged` — `RequestConfig` mutation
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
@ -124,19 +124,21 @@ pub enum LogEntry {
/// Turn boundary. Records the turn count after increment.
TurnEnd { ts: u64, turn_count: usize },
/// KV cache locked. Records the history prefix length that is now immutable.
#[serde(alias = "cache_locked")]
Locked { ts: u64, locked_prefix_len: usize },
/// KV cache unlocked.
CacheUnlocked { ts: u64 },
/// Outcome of a `run()` or `resume()` call.
/// This is metadata for auditing; state collection does not branch on the outcome.
RunOutcome {
/// `run()` / `resume()` が `WorkerResult` で正常終了した。
/// Audit-only metadata: replay は `interrupted` のみ反映する。
RunCompleted {
ts: u64,
outcome: Outcome,
interrupted: bool,
result: WorkerResult,
},
/// `run()` / `resume()` が `WorkerError` で終了した。
/// `WorkerError` は `Serialize` 不可なので `message` のみ lossy 保持する。
/// Audit-only metadata: replay は `interrupted` のみ反映する。
RunErrored {
ts: u64,
interrupted: bool,
message: String,
},
/// `RequestConfig` changed.
@ -188,21 +190,6 @@ pub struct SessionOrigin {
pub at_hash: EntryHash,
}
/// Outcome of a run/resume call. Metadata for auditing only.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Outcome {
Finished,
Paused,
LimitReached,
/// Worker yielded control to the caller for external processing.
/// Distinct from `Paused`: caller handles internally and resumes.
Yielded,
Error {
message: String,
},
}
/// State collected from log entries.
#[derive(Debug, Clone)]
pub struct RestoredState {
@ -210,7 +197,6 @@ pub struct RestoredState {
pub config: RequestConfig,
pub history: Vec<Item>,
pub turn_count: usize,
pub locked_prefix_len: usize,
pub last_run_interrupted: bool,
/// Hash of the last entry in the chain (None if empty).
pub head_hash: Option<EntryHash>,
@ -223,23 +209,6 @@ pub struct RestoredState {
pub extensions: Vec<(String, serde_json::Value)>,
}
/// LLM リクエスト送信時点での占有量スナップショット。
///
/// `LogEntry::LlmUsage` の replay 時に `RestoredState.usage_history` に積まれる。
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageRecord {
/// 送信時の history.len()
pub history_len: usize,
/// history[..history_len] の占有量(プロンプト全長、実測)
pub input_total_tokens: u64,
/// 上記のうちキャッシュから読み出された分
pub cache_read_tokens: u64,
/// 上記のうちこのリクエストでキャッシュに書かれた分
pub cache_write_tokens: u64,
/// このリクエストで生成された出力トークン数
pub output_tokens: u64,
}
/// Replay a sequence of hashed entries to reconstruct worker state.
pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
let mut state = RestoredState {
@ -247,7 +216,6 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
config: RequestConfig::default(),
history: Vec::new(),
turn_count: 0,
locked_prefix_len: 0,
last_run_interrupted: false,
head_hash: None,
usage_history: Vec::new(),
@ -283,15 +251,10 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
LogEntry::TurnEnd { turn_count, .. } => {
state.turn_count = *turn_count;
}
LogEntry::Locked {
locked_prefix_len, ..
} => {
state.locked_prefix_len = *locked_prefix_len;
LogEntry::RunCompleted { interrupted, .. } => {
state.last_run_interrupted = *interrupted;
}
LogEntry::CacheUnlocked { .. } => {
state.locked_prefix_len = 0;
}
LogEntry::RunOutcome { interrupted, .. } => {
LogEntry::RunErrored { interrupted, .. } => {
state.last_run_interrupted = *interrupted;
}
LogEntry::ConfigChanged { config, .. } => {
@ -361,7 +324,6 @@ mod tests {
let state = collect_state(&[]);
assert!(state.history.is_empty());
assert_eq!(state.turn_count, 0);
assert_eq!(state.locked_prefix_len, 0);
assert!(state.head_hash.is_none());
}
@ -405,10 +367,10 @@ mod tests {
ts: 3100,
turn_count: 1,
},
LogEntry::RunOutcome {
LogEntry::RunCompleted {
ts: 3200,
outcome: Outcome::Finished,
interrupted: false,
result: WorkerResult::Finished,
},
]);
let state = collect_state(&entries);
@ -459,31 +421,6 @@ mod tests {
assert!(state.history[2].is_tool_result());
}
#[test]
fn replay_cache_lock_unlock() {
let entries = build_chain(&[
LogEntry::SessionStart {
ts: 1000,
system_prompt: None,
config: RequestConfig::default(),
history: vec![Item::user_message("a"), Item::assistant_message("b")],
forked_from: None,
compacted_from: None,
},
LogEntry::Locked {
ts: 2000,
locked_prefix_len: 2,
},
LogEntry::CacheUnlocked { ts: 3000 },
]);
let state = collect_state(&entries);
assert_eq!(state.locked_prefix_len, 0);
// Check locked state before unlock
let state_locked = collect_state(&entries[..2]);
assert_eq!(state_locked.locked_prefix_len, 2);
}
#[test]
fn replay_config_changed() {
let entries = build_chain(&[

View File

@ -1,6 +1,7 @@
use llm_worker::WorkerResult;
use llm_worker::llm_client::types::{Item, RequestConfig};
use session_store::{
FsStore, LogEntry, Outcome, Store, TraceEntry, build_chain, collect_state, new_session_id,
FsStore, LogEntry, Store, TraceEntry, build_chain, collect_state, new_session_id,
};
#[tokio::test]
@ -30,10 +31,10 @@ async fn round_trip_write_and_read() {
ts: 3100,
turn_count: 1,
},
LogEntry::RunOutcome {
LogEntry::RunCompleted {
ts: 3200,
outcome: Outcome::Finished,
interrupted: false,
result: WorkerResult::Finished,
},
];
let entries = build_chain(&raw);

View File

@ -9,9 +9,7 @@ use llm_worker::interceptor::{Interceptor, TurnEndAction};
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent};
use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use session_store::{
EntryHash, FsStore, LogEntry, Outcome, SessionStartState, Store, collect_state,
};
use session_store::{EntryHash, FsStore, LogEntry, SessionStartState, Store, collect_state};
// =============================================================================
// Helpers
@ -115,24 +113,30 @@ async fn run_and_persist(
.await
.unwrap();
let outcome = match &result {
Ok(llm_worker::WorkerResult::Finished) => Outcome::Finished,
Ok(llm_worker::WorkerResult::Paused) => Outcome::Paused,
Ok(llm_worker::WorkerResult::LimitReached) => Outcome::LimitReached,
Ok(llm_worker::WorkerResult::Yielded) => Outcome::Yielded,
Err(e) => Outcome::Error {
message: e.to_string(),
},
};
session_store::save_outcome(
store,
session_id,
head_hash,
outcome,
worker.last_run_interrupted(),
)
.await
.unwrap();
match &result {
Ok(r) => {
session_store::save_run_completed(
store,
session_id,
head_hash,
r.clone(),
worker.last_run_interrupted(),
)
.await
.unwrap();
}
Err(e) => {
session_store::save_run_errored(
store,
session_id,
head_hash,
e.to_string(),
worker.last_run_interrupted(),
)
.await
.unwrap();
}
}
let r = result.unwrap();
(worker, r)
@ -165,7 +169,7 @@ async fn session_run_logs_entries() {
let entries = store.read_all(sid).await.unwrap();
// SessionStart, UserInput, AssistantItems, TurnEnd, RunOutcome (at minimum)
// SessionStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum)
assert!(
entries.len() >= 4,
"expected at least 4 entries, got {}",
@ -175,12 +179,12 @@ async fn session_run_logs_entries() {
// First entry is SessionStart
assert!(matches!(&entries[0].entry, LogEntry::SessionStart { .. }));
// Has a RunOutcome with Finished
// Has a RunCompleted with Finished
let has_finished = entries.iter().any(|e| {
matches!(
&e.entry,
LogEntry::RunOutcome {
outcome: Outcome::Finished,
LogEntry::RunCompleted {
result: llm_worker::WorkerResult::Finished,
..
}
)
@ -292,13 +296,13 @@ async fn session_resume_after_pause() {
let (_worker, result) = run_and_persist(worker, &store, sid, &mut head_hash, "Weather?").await;
assert!(matches!(result, llm_worker::WorkerResult::Paused));
// Check RunOutcome is Paused
// Check RunCompleted is Paused
let entries = store.read_all(sid).await.unwrap();
let has_paused = entries.iter().any(|e| {
matches!(
&e.entry,
LogEntry::RunOutcome {
outcome: Outcome::Paused,
LogEntry::RunCompleted {
result: llm_worker::WorkerResult::Paused,
..
}
)
@ -430,54 +434,6 @@ async fn session_config_changed_logged() {
assert!(has_config_changed, "should have ConfigChanged entry");
}
#[tokio::test]
async fn session_cache_lock_unlock_logged() {
let (_dir, store) = make_store().await;
let client = MockLlmClient::new(vec![]);
let worker = Worker::new(client);
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let mut head_hash = Some(head_hash);
session_store::save_cache_locked(&store, sid, &mut head_hash, 5)
.await
.unwrap();
session_store::save_cache_unlocked(&store, sid, &mut head_hash)
.await
.unwrap();
let entries = store.read_all(sid).await.unwrap();
let has_locked = entries.iter().any(|e| {
matches!(
&e.entry,
LogEntry::Locked {
locked_prefix_len: 5,
..
}
)
});
assert!(has_locked, "should have Locked entry");
let has_unlocked = entries
.iter()
.any(|e| matches!(&e.entry, LogEntry::CacheUnlocked { .. }));
assert!(has_unlocked, "should have CacheUnlocked entry");
// State after all entries: unlocked
let state = collect_state(&entries);
assert_eq!(state.locked_prefix_len, 0);
}
#[tokio::test]
async fn session_auto_forks_on_conflict() {
let (_dir, store) = make_store().await;

View File

@ -101,3 +101,8 @@ session-store の `LogEntry` 周りで「llm-worker の概念を session-store
- `crates/pod/src/compact/token_counter.rs` (移動元)
- `crates/pod/src/pod.rs` (handle_worker_result の Outcome 構築箇所、Pod::total_tokens 経路)
- `docs/persistence.md` (元設計の意図: RunOutcome は audit-only)
## Review
- 状態: Approve
- レビュー詳細: [./session-store-llm-worker-type-ownership.review.md](./session-store-llm-worker-type-ownership.review.md)
- 日付: 2026-04-28

View File

@ -0,0 +1,60 @@
# Review: session-store / llm-worker 型責務の整理
## 前提・要件の確認
### 1. `UsageRecord` を llm-worker に移動
- 移動先: `crates/llm-worker/src/usage_record.rs:9``UsageRecord` を新規定義、`crates/llm-worker/src/lib.rs:54,61` で `pub mod` + `pub use` 済み
- session-store 側: `crates/session-store/src/lib.rs:42``pub use llm_worker::UsageRecord;` の互換 re-export
- session-store 内部: `crates/session-store/src/session_log.rs:12``use llm_worker::{UsageRecord, WorkerResult};` に切替済み(旧定義は削除)
- pod の参照経路更新: `crates/pod/src/pod.rs:9`, `crates/pod/src/compact/usage_tracker.rs:19`, `crates/pod/src/ipc/interceptor.rs:19` がいずれも `llm_worker::UsageRecord` 経由
- `LogEntry::LlmUsage` は inline fields のまま(`session_log.rs:160`)— 方針通り
- 充足
### 2. `token_counter` を llm-worker に移動
- 汎用部分: `crates/llm-worker/src/token_counter.rs``prefix_bytes`, `tokens_at`, `total_tokens`, `total_tokens_at`, `item_bytes` 移動済み。`EstimateSource` / `TokenEstimate` も llm-worker に集約
- compact 専用部分: `crates/pod/src/compact/token_counter.rs``SplitPoint`, `split_for_retained_impl`, `tool_result_content_bytes`, `savings_for_prune_impl` 残置
- Pod の薄ラッパー: `total_tokens` (`compact/token_counter.rs:146`), `total_tokens_at` (`:157`), `split_for_retained` (`:165`) はいずれも `llm_worker::token_counter::*` 呼び出しの薄ラッパーに
- 外部呼出経路: `pod/src/ipc/interceptor.rs:24,85``use llm_worker::token_counter::total_tokens;` を直接利用pod 経由を回避できる)
- pod の lib re-export (`pod/src/lib.rs:14`) は維持されており、外部 API の互換が崩れていない
- 充足
### 3. `Outcome` 廃止 + `RunCompleted` / `RunErrored` への分解
- `WorkerResult` の derive: `crates/llm-worker/src/worker.rs:69-70``#[derive(Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]` + `#[serde(rename_all = "snake_case")]` 追加済み
- `Outcome` enum は完全削除(`session_log.rs` 内に痕跡なし、grep 確認済み)
- `LogEntry::RunCompleted` / `LogEntry::RunErrored` 2 variants で表現(`session_log.rs:128-142`)。両方とも audit-only metadata のまま
- `collect_state` の対応 arm`session_log.rs:254-259`)はチケット指示通り `state.last_run_interrupted = *interrupted` のみ
- 関数分割: `save_run_completed` (`session.rs:242`) / `save_run_errored` (`session.rs:266`) の 2 関数。lib.rs の `pub use` (`lib.rs:40`) も追従
- pod 側 match: `pod/src/pod.rs:964-985``Ok(r) => save_run_completed / Err(e) => save_run_errored`
- テスト更新: `session_test.rs:118-138`, `fs_store_test.rs:34`, `session_log.rs` 内の `replay_full_turn` 等が `RunCompleted` 参照に更新
- 既存ログ互換: variant tag 破壊的変更(`run_outcome` → `run_completed` / `run_errored`)。チケットの判断「実運用ログがほぼ無い前提なら破壊的変更で OK」に沿う
- 充足
### 4. `Locked` / `CacheUnlocked` 関連 API 削除
- `LogEntry::Locked` / `LogEntry::CacheUnlocked` variants 削除、grep で痕跡なしsession-store 配下)
- `save_cache_locked` / `save_cache_unlocked` 関数削除、`lib.rs` の re-export からも削除
- `RestoredState.locked_prefix_len` field 削除、`collect_state` の match arm 削除
- 関連 unit test 削除(`replay_cache_lock_unlock`, `session_cache_lock_unlock_logged` 等)
- `replay_empty``assert_eq!(state.locked_prefix_len, 0)` も削除済み
- Worker 内部の `locked_prefix_len: usize` は session-store と無関係 — 残置で問題なし(チケット注記と一致)
- 充足
## アーキテクチャ・スコープ
- 依存方向: pod → llm-worker、pod → session-store、session-store → llm-worker を維持。Cargo.toml で確認、循環なし
- llm-worker の階層性: `usage_record` / `token_counter` の追加は LLM call の per-call measurement と pure な token accounting のみで、上位ドメインcompact 等)を持ち込んでおらず低レベル基盤の方針に沿う
- crate 名前付け: 新規ファイル追加のみで、新規 crate なし
- ScopedFs 等のスクリプティング計画への影響なし
- LLM provider policy への影響なし
- 変更範囲: チケット記載の 4 項目に正確に対応、範囲外の改変は見当たらない
- ビルド & テスト: `cargo check --workspace --all-targets` 新規 warning なし。session-store 8+7+13 件、llm-worker token_counter 4 件、pod 全テスト pass を再確認
## 指摘事項
### Non-blocking / Follow-up
- なし
### Nits
- `crates/pod/src/ipc/interceptor.rs:274` のテスト用ヘルパー doc comment が `total_tokens_impl` を参照したまま。実際の関数は `llm_worker::token_counter::total_tokens` にリネーム済みなので追従が望ましい
- `crates/pod/src/compact/token_counter.rs:34``split_for_retained_impl` は pod ローカルなヘルパー(公開 API は `Pod::split_for_retained`)なので `_impl` サフィックスは慣習的に許容範囲。ただし `total_tokens_impl``total_tokens` のリネーム方針と揃えるなら `split_for_retained_inner` 等に揃える余地あり(現状のままで実害なし)
## 判断
Approve — チケットに記載された 4 項目すべてが過不足なく実装され、既存テストはパス、依存方向と層責務も維持されている。残課題は doc comment 1 行のみで、ブロッキングではない。