516 lines
17 KiB
Rust
516 lines
17 KiB
Rust
//! Free functions for segment persistence operations.
//!
//! These functions record and restore segment state without owning a Worker.
//! The caller (typically Pod) holds the Worker directly and calls these
//! functions after state-mutating operations.
|
||
|
||
use crate::logged_item::{LoggedItem, to_logged};
|
||
use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin};
|
||
use crate::store::{Store, StoreError};
|
||
use crate::system_item::SystemItem;
|
||
use crate::{SegmentId, SessionId};
|
||
use llm_worker::WorkerResult;
|
||
use llm_worker::llm_client::RequestConfig;
|
||
use llm_worker::llm_client::types::Item;
|
||
use protocol::Segment;
|
||
|
||
/// State snapshot for creating a SegmentStart entry.
|
||
pub struct SegmentStartState<'a> {
|
||
pub system_prompt: Option<&'a str>,
|
||
pub config: &'a RequestConfig,
|
||
pub history: &'a [Item],
|
||
}
|
||
|
||
/// Create a new session + initial segment, writing the initial
|
||
/// `SegmentStart` entry. Returns the freshly minted `(session_id, segment_id)`.
|
||
pub fn create_segment(
|
||
store: &impl Store,
|
||
state: SegmentStartState<'_>,
|
||
) -> Result<(SessionId, SegmentId), StoreError> {
|
||
let session_id = crate::new_session_id();
|
||
let segment_id = crate::new_segment_id();
|
||
create_segment_with_ids(store, session_id, segment_id, state)?;
|
||
Ok((session_id, segment_id))
|
||
}
|
||
|
||
/// Write a fresh `SegmentStart` entry using pre-generated IDs.
|
||
///
|
||
/// Used by callers that need to reserve `(session_id, segment_id)`
|
||
/// synchronously but defer the initial log append (e.g. Pod, which
|
||
/// resolves a templated system prompt only at first turn).
|
||
pub fn create_segment_with_ids(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
state: SegmentStartState<'_>,
|
||
) -> Result<(), StoreError> {
|
||
let entry = LogEntry::SegmentStart {
|
||
ts: segment_log::now_millis(),
|
||
session_id,
|
||
system_prompt: state.system_prompt.map(String::from),
|
||
config: state.config.clone(),
|
||
history: to_logged(state.history),
|
||
forked_from: None,
|
||
compacted_from: None,
|
||
};
|
||
store.append(session_id, segment_id, &entry)
|
||
}
|
||
|
||
/// Create a compacted segment from an existing one. Inherits the source's
|
||
/// `session_id` so the compacted lineage stays within the same Session.
|
||
///
|
||
/// Records `compacted_from` provenance linking back to the source segment
|
||
/// at the turn boundary captured by `source_turn_count` (the most recent
|
||
/// completed turn in the source).
|
||
pub fn create_compacted_segment(
|
||
store: &impl Store,
|
||
state: SegmentStartState<'_>,
|
||
source_session_id: SessionId,
|
||
source_segment_id: SegmentId,
|
||
source_turn_count: usize,
|
||
) -> Result<SegmentId, StoreError> {
|
||
let segment_id = crate::new_segment_id();
|
||
let entry = LogEntry::SegmentStart {
|
||
ts: segment_log::now_millis(),
|
||
session_id: source_session_id,
|
||
system_prompt: state.system_prompt.map(String::from),
|
||
config: state.config.clone(),
|
||
history: to_logged(state.history),
|
||
forked_from: None,
|
||
compacted_from: Some(SegmentOrigin {
|
||
segment_id: source_segment_id,
|
||
at_turn_index: source_turn_count,
|
||
}),
|
||
};
|
||
store.append(source_session_id, segment_id, &entry)?;
|
||
Ok(segment_id)
|
||
}
|
||
|
||
/// Restore segment state from a stored log.
|
||
///
|
||
/// Returns the reconstructed state. The caller is responsible for
|
||
/// applying it to a Worker.
|
||
pub fn restore(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
) -> Result<crate::segment_log::RestoredState, StoreError> {
|
||
let entries = store.read_all(session_id, segment_id)?;
|
||
Ok(segment_log::collect_state(&entries))
|
||
}
|
||
|
||
/// Restore segment state when only the segment ID is known. Uses
|
||
/// [`Store::lookup_session_of`] to resolve the parent Session.
|
||
///
|
||
/// Shim for legacy entry points (`pod-cli --session <UUID>` etc.) that
|
||
/// receive a Segment ID without a Session ID.
|
||
pub fn restore_by_segment(
|
||
store: &impl Store,
|
||
segment_id: SegmentId,
|
||
) -> Result<crate::segment_log::RestoredState, StoreError> {
|
||
let session_id = store
|
||
.lookup_session_of(segment_id)?
|
||
.ok_or(StoreError::NotFound(segment_id))?;
|
||
restore(store, session_id, segment_id)
|
||
}
|
||
|
||
/// Live auto-fork on concurrent-writer detection.
|
||
///
|
||
/// Checks whether the store's on-disk entry count still matches the
|
||
/// writer's own append tally. If they match, the writer still owns the
|
||
/// segment tail and nothing happens. If the store has grown behind the
|
||
/// writer's back, another process appended to the same segment — so we
|
||
/// branch into a fresh segment within the same Session.
|
||
///
|
||
/// # Marker form
|
||
///
|
||
/// Detection is by **tail entry-count comparison**, not by writing a
|
||
/// terminal marker into the source segment. The source segment is left
|
||
/// completely immutable — identical to the past-fork ([`fork_at`])
|
||
/// invariant. The fork relationship is instead recorded forward on the
|
||
/// *new* segment's `SegmentStart.forked_from`, so the lineage is still
|
||
/// reconstructable from the logs alone (read each segment's
|
||
/// `SegmentStart`; follow `forked_from` / `compacted_from` backward).
|
||
/// Listing a parent's children is a cheap `list_segments(session_id)`
|
||
/// scan filtered on `forked_from.segment_id`.
|
||
///
|
||
/// `at_turn_index` is the writer's current `turn_count`: the fork seeds
|
||
/// the new segment with the writer's in-memory history (which reflects
|
||
/// state up to that turn), so that is the divergence point relative to
|
||
/// the now-diverged source segment.
|
||
///
|
||
/// Updates `segment_id` and `entries_written` in place when a fork occurs.
|
||
pub fn ensure_head_or_fork(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: &mut SegmentId,
|
||
entries_written: &mut usize,
|
||
at_turn_index: usize,
|
||
state: SegmentStartState<'_>,
|
||
) -> Result<(), StoreError> {
|
||
let store_count = store.read_entry_count(session_id, *segment_id)?;
|
||
if store_count == *entries_written {
|
||
return Ok(());
|
||
}
|
||
let source_segment_id = *segment_id;
|
||
let fork_id = crate::new_segment_id();
|
||
let entry = LogEntry::SegmentStart {
|
||
ts: segment_log::now_millis(),
|
||
session_id,
|
||
system_prompt: state.system_prompt.map(String::from),
|
||
config: state.config.clone(),
|
||
history: to_logged(state.history),
|
||
forked_from: Some(SegmentOrigin {
|
||
segment_id: source_segment_id,
|
||
at_turn_index,
|
||
}),
|
||
compacted_from: None,
|
||
};
|
||
store.create_segment(session_id, fork_id, &[entry])?;
|
||
*segment_id = fork_id;
|
||
*entries_written = 1;
|
||
Ok(())
|
||
}
|
||
|
||
/// Log a `UserInput` entry from the original typed `Vec<Segment>`.
|
||
///
|
||
/// Submit-time entry. Pod calls this at the head of a `Run` turn before
|
||
/// the worker pushes its flattened user message into history; replay
|
||
/// derives the worker `Item::user_message` from these segments via
|
||
/// [`Segment::flatten_to_text`].
|
||
pub fn save_user_input(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
segments: Vec<Segment>,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::UserInput {
|
||
ts: segment_log::now_millis(),
|
||
segments,
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Log the history delta — new items added since the previous snapshot.
|
||
///
|
||
/// Classifies items into AssistantItem / ToolResult entries automatically
|
||
/// (one entry per item). User messages are skipped
|
||
/// because they are persisted upfront via [`save_user_input`] at submit
|
||
/// time; the worker pushes a flattened copy into its history that
|
||
/// arrives here in `new_items` and would otherwise produce a duplicate
|
||
/// `UserInput` entry.
|
||
pub fn save_delta(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
new_items: &[Item],
|
||
) -> Result<(), StoreError> {
|
||
if new_items.is_empty() {
|
||
return Ok(());
|
||
}
|
||
|
||
let ts = segment_log::now_millis();
|
||
for item in new_items {
|
||
if item.is_user_message() {
|
||
// Already persisted by save_user_input at submit time.
|
||
continue;
|
||
}
|
||
let entry = classify_history_item(item, ts);
|
||
append_entry(store, session_id, segment_id, entry)?;
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
/// Map one history item to its singular `LogEntry` form. Used by the
|
||
/// fallback `save_delta` path and the controller's worker-callback
|
||
/// classifier so write classification lives in one place.
|
||
pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry {
|
||
if item.is_tool_result() {
|
||
LogEntry::ToolResult {
|
||
ts,
|
||
item: LoggedItem::from(item),
|
||
}
|
||
} else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() {
|
||
LogEntry::AssistantItem {
|
||
ts,
|
||
item: LoggedItem::from(item),
|
||
}
|
||
} else {
|
||
// Defensive: anything else (future Item kinds) routes through
|
||
// AssistantItem rather than getting silently dropped.
|
||
LogEntry::AssistantItem {
|
||
ts,
|
||
item: LoggedItem::from(item),
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Append a single typed system item as `LogEntry::SystemItem`. Helper
|
||
/// for the Pod-side interceptor commit path; mirrors the per-item
|
||
/// commit shape used for assistant / tool result entries.
|
||
pub fn append_system_item(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
item: SystemItem,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::SystemItem {
|
||
ts: segment_log::now_millis(),
|
||
item,
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Log a TurnEnd entry.
|
||
pub fn save_turn_end(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
turn_count: usize,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::TurnEnd {
|
||
ts: segment_log::now_millis(),
|
||
turn_count,
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`.
|
||
pub fn save_run_completed(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
result: WorkerResult,
|
||
interrupted: bool,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::RunCompleted {
|
||
ts: segment_log::now_millis(),
|
||
interrupted,
|
||
result,
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Log a `RunErrored` entry — `run()` / `resume()` returned `Err(WorkerError)`.
|
||
///
|
||
/// `WorkerError` is not `Serialize`, so the caller passes a lossy
|
||
/// `to_string()` rendering as `message`.
|
||
pub fn save_run_errored(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
message: String,
|
||
interrupted: bool,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::RunErrored {
|
||
ts: segment_log::now_millis(),
|
||
interrupted,
|
||
message,
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Log an `LlmUsage` entry — 1 LLM リクエスト分の Usage スナップショット。
|
||
///
|
||
/// `history_len` は送信時の `history.len()`。`input_total_tokens` は
|
||
/// その prefix をプロバイダが実測した占有量(プロンプト全長)で、
|
||
/// プロバイダ別の正規化(Anthropic では `input + cache_read + cache_creation`)を
|
||
/// 済ませた値を渡す。
|
||
pub fn save_usage(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
history_len: usize,
|
||
input_total_tokens: u64,
|
||
cache_read_tokens: u64,
|
||
cache_write_tokens: u64,
|
||
output_tokens: u64,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::LlmUsage {
|
||
ts: segment_log::now_millis(),
|
||
history_len,
|
||
input_total_tokens,
|
||
cache_read_tokens,
|
||
cache_write_tokens,
|
||
output_tokens,
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Log an `Extension` entry — domain-tagged opaque payload.
|
||
///
|
||
/// session-store treats `payload` as an unstructured `serde_json::Value`.
|
||
/// Each domain is responsible for serializing into and folding out of it.
|
||
/// Use `RestoredState.extensions` to read entries back at restore time.
|
||
pub fn save_extension(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
domain: impl Into<String>,
|
||
payload: serde_json::Value,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::Extension {
|
||
ts: segment_log::now_millis(),
|
||
domain: domain.into(),
|
||
payload,
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Log the Pod's latest runtime scope snapshot.
|
||
pub fn save_pod_scope(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
snapshot: &PodScopeSnapshot,
|
||
) -> Result<(), StoreError> {
|
||
let payload = serde_json::to_value(snapshot)?;
|
||
save_extension(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
segment_log::POD_SCOPE_EXTENSION_DOMAIN,
|
||
payload,
|
||
)
|
||
}
|
||
|
||
/// Log a `ConfigChanged` entry.
|
||
pub fn save_config_changed(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
config: &RequestConfig,
|
||
) -> Result<(), StoreError> {
|
||
append_entry(
|
||
store,
|
||
session_id,
|
||
segment_id,
|
||
LogEntry::ConfigChanged {
|
||
ts: segment_log::now_millis(),
|
||
config: config.clone(),
|
||
},
|
||
)
|
||
}
|
||
|
||
/// Fork the current state into a brand-new Session (no parent lineage).
|
||
///
|
||
/// Use this for "start a fresh conversation from this state" — the
|
||
/// returned segment does not share `session_id` with any prior segment.
|
||
/// In-Session forks (live auto-fork / past-turn fork) go through
|
||
/// [`fork_at`] or [`ensure_head_or_fork`] instead.
|
||
pub fn fork(
|
||
store: &impl Store,
|
||
state: SegmentStartState<'_>,
|
||
) -> Result<(SessionId, SegmentId), StoreError> {
|
||
let session_id = crate::new_session_id();
|
||
let fork_id = crate::new_segment_id();
|
||
let entry = LogEntry::SegmentStart {
|
||
ts: segment_log::now_millis(),
|
||
session_id,
|
||
system_prompt: state.system_prompt.map(String::from),
|
||
config: state.config.clone(),
|
||
history: to_logged(state.history),
|
||
forked_from: None,
|
||
compacted_from: None,
|
||
};
|
||
store.create_segment(session_id, fork_id, &[entry])?;
|
||
Ok((session_id, fork_id))
|
||
}
|
||
|
||
/// Fork from a turn boundary in a stored segment log, keeping the new
|
||
/// segment in the same Session as `source_id`.
|
||
///
|
||
/// `at_turn_index` is the `turn_count` of the most recent completed
|
||
/// `TurnEnd` in the source segment that the fork should branch from.
|
||
/// Replay collects state up to and including that `TurnEnd`; entries
|
||
/// after it are not carried into the new segment.
|
||
///
|
||
/// # Invariant: the source segment is never mutated
|
||
///
|
||
/// Past-fork only reads the source and seeds a brand-new segment. It
|
||
/// writes no marker back into the source — exactly like live auto-fork
|
||
/// ([`ensure_head_or_fork`]). This keeps nested past-forks simple: a
|
||
/// fork of a fork just reads its own source and branches again, with no
|
||
/// marker-position bookkeeping to reconcile across the chain.
|
||
pub fn fork_at(
|
||
store: &impl Store,
|
||
source_session_id: SessionId,
|
||
source_id: SegmentId,
|
||
at_turn_index: usize,
|
||
) -> Result<SegmentId, StoreError> {
|
||
let entries = store.read_all(source_session_id, source_id)?;
|
||
let cut = if at_turn_index == 0 {
|
||
// Branch directly after the SegmentStart (or whatever opens the
|
||
// segment), before any turn completes.
|
||
entries
|
||
.iter()
|
||
.position(|e| !matches!(e, LogEntry::SegmentStart { .. }))
|
||
.unwrap_or(entries.len())
|
||
} else {
|
||
entries
|
||
.iter()
|
||
.position(|e| matches!(e, LogEntry::TurnEnd { turn_count, .. } if *turn_count == at_turn_index))
|
||
.map(|i| i + 1)
|
||
.unwrap_or(entries.len())
|
||
};
|
||
let state = segment_log::collect_state(&entries[..cut]);
|
||
|
||
let fork_id = crate::new_segment_id();
|
||
let entry = LogEntry::SegmentStart {
|
||
ts: segment_log::now_millis(),
|
||
session_id: source_session_id,
|
||
system_prompt: state.system_prompt,
|
||
config: state.config,
|
||
history: to_logged(&state.history),
|
||
forked_from: Some(SegmentOrigin {
|
||
segment_id: source_id,
|
||
at_turn_index,
|
||
}),
|
||
compacted_from: None,
|
||
};
|
||
store.create_segment(source_session_id, fork_id, &[entry])?;
|
||
Ok(fork_id)
|
||
}
|
||
|
||
/// Append a single `LogEntry`.
|
||
///
|
||
/// Lower-level dual of the `save_*` convenience wrappers in this module.
|
||
/// Use when the caller already builds the typed entry itself (e.g. when
|
||
/// it needs the same value for an in-memory mirror + broadcast).
|
||
pub fn append_entry(
|
||
store: &impl Store,
|
||
session_id: SessionId,
|
||
segment_id: SegmentId,
|
||
entry: LogEntry,
|
||
) -> Result<(), StoreError> {
|
||
store.append(session_id, segment_id, &entry)
|
||
}
|