session-log-segments実装
This commit is contained in:
parent
1ab6dbcee3
commit
010edf2c94
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -2922,6 +2922,7 @@ dependencies = [
|
|||
"futures",
|
||||
"hex",
|
||||
"llm-worker",
|
||||
"protocol",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.11.0",
|
||||
|
|
|
|||
|
|
@ -276,6 +276,7 @@ impl PodController {
|
|||
manifest_toml.clone(),
|
||||
greeting,
|
||||
));
|
||||
shared_state.set_user_segments(pod.user_segments().to_vec());
|
||||
runtime_dir.write_manifest(&manifest_toml).await?;
|
||||
runtime_dir.write_status(&shared_state).await?;
|
||||
runtime_dir.write_history(&shared_state).await?;
|
||||
|
|
@ -323,7 +324,11 @@ impl PodController {
|
|||
// Broadcast the accepted user message so every
|
||||
// subscriber (including the submitter) can
|
||||
// render the turn header + user line from a
|
||||
// single source of truth.
|
||||
// single source of truth. Mirror the segments
|
||||
// into shared_state so subsequent History fetches
|
||||
// can re-attach them to the corresponding worker
|
||||
// user_message item.
|
||||
shared_state.push_user_segments(input.clone());
|
||||
let _ = event_tx.send(Event::UserMessage {
|
||||
segments: input.clone(),
|
||||
});
|
||||
|
|
|
|||
|
|
@ -91,9 +91,40 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
|||
match method {
|
||||
Ok(Some(Method::GetHistory)) => {
|
||||
let items = handle.shared_state.history();
|
||||
let segments_per_user = handle.shared_state.user_segments();
|
||||
// Embed `segments` on user-message JSON values so
|
||||
// the TUI can re-render typed atoms on restore.
|
||||
// Alignment: segments are recorded only for
|
||||
// submissions made during the live session, never
|
||||
// for seed history loaded via `SessionStart.history`
|
||||
// (post-compaction). The seed user_messages always
|
||||
// come first in worker history, so the last
|
||||
// `segments_per_user.len()` user_messages are the
|
||||
// ones that map 1:1 to the segments list.
|
||||
let total_user_msgs =
|
||||
items.iter().filter(|i| i.is_user_message()).count();
|
||||
let skip = total_user_msgs.saturating_sub(segments_per_user.len());
|
||||
let mut user_idx = 0usize;
|
||||
let values = items
|
||||
.iter()
|
||||
.map(|item| serde_json::to_value(item).expect("Item is Serialize"))
|
||||
.map(|item| {
|
||||
let mut value =
|
||||
serde_json::to_value(item).expect("Item is Serialize");
|
||||
if item.is_user_message() {
|
||||
if user_idx >= skip {
|
||||
let seg_idx = user_idx - skip;
|
||||
if let Some(obj) = value.as_object_mut() {
|
||||
let segs = serde_json::to_value(
|
||||
&segments_per_user[seg_idx],
|
||||
)
|
||||
.expect("Segment is Serialize");
|
||||
obj.insert("segments".into(), segs);
|
||||
}
|
||||
}
|
||||
user_idx += 1;
|
||||
}
|
||||
value
|
||||
})
|
||||
.collect();
|
||||
let greeting = handle.shared_state.greeting.clone();
|
||||
if writer
|
||||
|
|
|
|||
|
|
@ -135,6 +135,13 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// Restored from `RestoredState.extensions` on `restore`, updated
|
||||
/// after each successful extract via `save_extension`.
|
||||
extract_pointer: Mutex<Option<memory::ExtractPointerPayload>>,
|
||||
/// Typed user submissions in submit order. K-th entry corresponds to
|
||||
/// the K-th `Item::user_message` in `worker.history()` (modulo seed
|
||||
/// history loaded via `SessionStart.history`, whose original segments
|
||||
/// are not preserved). Populated from log on `restore_from_manifest`,
|
||||
/// appended after `save_user_input` on each `run`. Mirrored to
|
||||
/// `PodSharedState` by the controller for `Event::History` use.
|
||||
user_segments: Vec<Vec<Segment>>,
|
||||
}
|
||||
|
||||
impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||
|
|
@ -184,6 +191,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
Ok(pod)
|
||||
|
|
@ -280,6 +288,15 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// and reset by [`compact`](Self::compact) (the new compacted
|
||||
/// session has a fresh log with no `LogEntry::Extension` entries).
|
||||
/// Cheap clone via `Option<Clone>`.
|
||||
/// Snapshot of the typed user segments tracked alongside worker
|
||||
/// history. The K-th entry corresponds to the K-th `Item::user_message`
|
||||
/// derived from `LogEntry::UserInput` entries (post-compaction); seed
|
||||
/// history loaded via `SessionStart.history` does not contribute,
|
||||
/// which is acceptable because the original segments are unrecoverable.
|
||||
pub fn user_segments(&self) -> &[Vec<Segment>] {
|
||||
&self.user_segments
|
||||
}
|
||||
|
||||
pub fn extract_pointer(&self) -> Option<memory::ExtractPointerPayload> {
|
||||
self.extract_pointer
|
||||
.lock()
|
||||
|
|
@ -585,6 +602,18 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
self.ensure_system_prompt_materialized()?;
|
||||
self.ensure_session_head().await?;
|
||||
|
||||
// Persist the user input as typed segments before the worker
|
||||
// pushes its flattened copy into history. save_delta deliberately
|
||||
// skips the resulting `is_user_message()` item to avoid double-write.
|
||||
session_store::save_user_input(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
input.clone(),
|
||||
)
|
||||
.await?;
|
||||
self.user_segments.push(input.clone());
|
||||
|
||||
let flattened = self.flatten_segments(&input);
|
||||
|
||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||
|
|
@ -599,16 +628,15 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
}
|
||||
|
||||
/// Flatten a typed segment list into the single string the Worker
|
||||
/// receives as the user message. Inlines text and paste content;
|
||||
/// substitutes `[unresolved <kind>: <key>]` placeholders for
|
||||
/// segments that have no resolver, and emits a user-facing alert so
|
||||
/// neither the LLM nor the human is blind to the dropped intent.
|
||||
/// receives as the user message, and emit user-facing alerts for
|
||||
/// segments that fall through to placeholder (file/knowledge/workflow
|
||||
/// refs without a resolver, or unknown variants from a newer client).
|
||||
/// The text reconstruction itself comes from `Segment::flatten_to_text`,
|
||||
/// shared with replay paths that should not re-alert.
|
||||
fn flatten_segments(&self, segments: &[Segment]) -> String {
|
||||
let mut out = String::new();
|
||||
for seg in segments {
|
||||
match seg {
|
||||
Segment::Text { content } => out.push_str(content),
|
||||
Segment::Paste { content, .. } => out.push_str(content),
|
||||
Segment::Text { .. } | Segment::Paste { .. } => {}
|
||||
Segment::FileRef { path } => {
|
||||
self.alert(
|
||||
AlertLevel::Warn,
|
||||
|
|
@ -618,7 +646,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
(resolver not yet implemented); passed to LLM as placeholder"
|
||||
),
|
||||
);
|
||||
out.push_str(&format!("[unresolved file ref: {path}]"));
|
||||
}
|
||||
Segment::KnowledgeRef { slug } => {
|
||||
self.alert(
|
||||
|
|
@ -629,7 +656,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
(resolver not yet implemented); passed to LLM as placeholder"
|
||||
),
|
||||
);
|
||||
out.push_str(&format!("[unresolved knowledge ref: {slug}]"));
|
||||
}
|
||||
Segment::WorkflowInvoke { slug } => {
|
||||
self.alert(
|
||||
|
|
@ -640,7 +666,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
(resolver not yet implemented); passed to LLM as placeholder"
|
||||
),
|
||||
);
|
||||
out.push_str(&format!("[unresolved workflow invoke: {slug}]"));
|
||||
}
|
||||
Segment::Unknown => {
|
||||
self.alert(
|
||||
|
|
@ -650,11 +675,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
passed to LLM as placeholder"
|
||||
.into(),
|
||||
);
|
||||
out.push_str("[unknown input segment]");
|
||||
}
|
||||
}
|
||||
}
|
||||
out
|
||||
Segment::flatten_to_text(segments)
|
||||
}
|
||||
|
||||
/// Run a turn triggered by `Method::Notify` while the Pod is idle.
|
||||
|
|
@ -1539,6 +1563,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
Ok(pod)
|
||||
|
|
@ -1591,6 +1616,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
Ok(pod)
|
||||
|
|
@ -1698,6 +1724,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(extract_pointer),
|
||||
user_segments: state.user_segments,
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
Ok(pod)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::sync::RwLock;
|
||||
|
||||
use llm_worker::llm_client::types::Item;
|
||||
use protocol::Segment;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session_store::SessionId;
|
||||
|
||||
|
|
@ -15,6 +16,12 @@ pub struct PodSharedState {
|
|||
pub greeting: protocol::Greeting,
|
||||
pub status: RwLock<PodStatus>,
|
||||
pub history: RwLock<Vec<Item>>,
|
||||
/// Typed user submissions in submit order. The K-th entry corresponds
|
||||
/// to the K-th `Item::user_message` in `history` (modulo seed history
|
||||
/// loaded from a pre-compaction `SessionStart.history`, whose original
|
||||
/// segments are not preserved). Surfaced via `Event::History` so
|
||||
/// clients can re-render typed atoms on session restore.
|
||||
pub user_segments: RwLock<Vec<Vec<Segment>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
|
|
@ -39,6 +46,26 @@ impl PodSharedState {
|
|||
greeting,
|
||||
status: RwLock::new(PodStatus::Idle),
|
||||
history: RwLock::new(Vec::new()),
|
||||
user_segments: RwLock::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn user_segments(&self) -> Vec<Vec<Segment>> {
|
||||
self.user_segments
|
||||
.read()
|
||||
.map(|s| s.clone())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn set_user_segments(&self, segments: Vec<Vec<Segment>>) {
|
||||
if let Ok(mut s) = self.user_segments.write() {
|
||||
*s = segments;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_user_segments(&self, segments: Vec<Segment>) {
|
||||
if let Ok(mut s) = self.user_segments.write() {
|
||||
s.push(segments);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -133,6 +133,37 @@ impl Segment {
|
|||
pub fn text(s: impl Into<String>) -> Self {
|
||||
Self::Text { content: s.into() }
|
||||
}
|
||||
|
||||
/// Flatten a segment slice into the single string the LLM receives
|
||||
/// as a user message. Pure — no I/O, no alerts. Callers that need
|
||||
/// to surface user-visible alerts for unresolved refs should do so
|
||||
/// alongside this call (Pod does so at submit time).
|
||||
///
|
||||
/// Unresolved variants (`FileRef` / `KnowledgeRef` / `WorkflowInvoke`)
|
||||
/// and `Unknown` map to `[unresolved <kind>: <key>]` placeholders so
|
||||
/// the LLM sees an explicit token rather than silent omission.
|
||||
pub fn flatten_to_text(segments: &[Segment]) -> String {
|
||||
let mut out = String::new();
|
||||
for seg in segments {
|
||||
match seg {
|
||||
Segment::Text { content } => out.push_str(content),
|
||||
Segment::Paste { content, .. } => out.push_str(content),
|
||||
Segment::FileRef { path } => {
|
||||
out.push_str(&format!("[unresolved file ref: {path}]"));
|
||||
}
|
||||
Segment::KnowledgeRef { slug } => {
|
||||
out.push_str(&format!("[unresolved knowledge ref: {slug}]"));
|
||||
}
|
||||
Segment::WorkflowInvoke { slug } => {
|
||||
out.push_str(&format!("[unresolved workflow invoke: {slug}]"));
|
||||
}
|
||||
Segment::Unknown => {
|
||||
out.push_str("[unknown input segment]");
|
||||
}
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
impl Method {
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ uuid = { version = "1", features = ["v7", "serde"] }
|
|||
thiserror = "2.0"
|
||||
sha2 = "0.11.0"
|
||||
hex = "0.4.3"
|
||||
protocol = { version = "0.1.0", path = "../protocol" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "fs", "io-util"] }
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to
|
|||
pub use session::{
|
||||
SessionStartState, create_compacted_session, create_session, create_session_with_id,
|
||||
ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension,
|
||||
save_run_completed, save_run_errored, save_turn_end, save_usage,
|
||||
save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input,
|
||||
};
|
||||
pub use llm_worker::UsageRecord;
|
||||
pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use crate::store::{Store, StoreError};
|
|||
use llm_worker::WorkerResult;
|
||||
use llm_worker::llm_client::RequestConfig;
|
||||
use llm_worker::llm_client::types::Item;
|
||||
use protocol::Segment;
|
||||
|
||||
/// State snapshot for creating a SessionStart entry.
|
||||
pub struct SessionStartState<'a> {
|
||||
|
|
@ -138,10 +139,37 @@ pub async fn ensure_head_or_fork(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Log a `UserInput` entry from the original typed `Vec<Segment>`.
|
||||
///
|
||||
/// Submit-time entry. Pod calls this at the head of a `Run` turn before
|
||||
/// the worker pushes its flattened user message into history; replay
|
||||
/// derives the worker `Item::user_message` from these segments via
|
||||
/// [`Segment::flatten_to_text`].
|
||||
pub async fn save_user_input(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
segments: Vec<Segment>,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::UserInput {
|
||||
ts: session_log::now_millis(),
|
||||
segments,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Log the history delta — new items added since the previous snapshot.
|
||||
///
|
||||
/// Classifies items into UserInput, AssistantItems, ToolResults, and
|
||||
/// HookInjectedItems entries automatically.
|
||||
/// Classifies items into AssistantItems, ToolResults, and HookInjectedItems
|
||||
/// entries automatically. User messages are skipped because they are
|
||||
/// persisted upfront via [`save_user_input`] at submit time; the worker
|
||||
/// pushes a flattened copy into its history that arrives here in
|
||||
/// `new_items` and would otherwise produce a duplicate `UserInput` entry.
|
||||
pub async fn save_delta(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
|
|
@ -158,16 +186,7 @@ pub async fn save_delta(
|
|||
while i < new_items.len() {
|
||||
let item = &new_items[i];
|
||||
if item.is_user_message() {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::UserInput {
|
||||
ts,
|
||||
item: new_items[i].clone(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
// Already persisted by save_user_input at submit time.
|
||||
i += 1;
|
||||
} else if item.is_tool_result() {
|
||||
let start = i;
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@
|
|||
|
||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||
use llm_worker::{UsageRecord, WorkerResult};
|
||||
use protocol::Segment;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
|
|
@ -111,8 +112,12 @@ pub enum LogEntry {
|
|||
compacted_from: Option<SessionOrigin>,
|
||||
},
|
||||
|
||||
/// User input pushed to history (worker.rs:229).
|
||||
UserInput { ts: u64, item: Item },
|
||||
/// User input accepted at submit time. Carries the original typed
|
||||
/// `Vec<Segment>` so clients can re-render typed atoms (paste chips,
|
||||
/// file/knowledge refs, workflow invocations) on session restore.
|
||||
/// Replay flattens these into a `Item::user_message` for the worker
|
||||
/// history; the worker layer never sees segments directly.
|
||||
UserInput { ts: u64, segments: Vec<Segment> },
|
||||
|
||||
/// Assistant response items added to history (worker.rs:1040-1041).
|
||||
AssistantItems { ts: u64, items: Vec<LoggedItem> },
|
||||
|
|
@ -209,6 +214,13 @@ pub struct RestoredState {
|
|||
/// `LogEntry::Extension` を replay 順に積んだもの。`(domain, payload)`。
|
||||
/// session-store は domain を不透明扱いし、各ドメインが自前で fold する。
|
||||
pub extensions: Vec<(String, serde_json::Value)>,
|
||||
/// User submissions in original typed form, in submit order.
|
||||
/// One entry per `LogEntry::UserInput`; the K-th entry corresponds to
|
||||
/// the K-th `Item::user_message` derived during replay (modulo
|
||||
/// pre-compaction history seeded via `SessionStart.history`, whose
|
||||
/// original segments are not preserved). Used by clients to re-render
|
||||
/// typed atoms (paste chips, refs) on session restore.
|
||||
pub user_segments: Vec<Vec<Segment>>,
|
||||
}
|
||||
|
||||
/// Replay a sequence of hashed entries to reconstruct worker state.
|
||||
|
|
@ -222,6 +234,7 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
|
|||
head_hash: None,
|
||||
usage_history: Vec::new(),
|
||||
extensions: Vec::new(),
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
|
||||
for hashed in entries {
|
||||
|
|
@ -238,8 +251,10 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
|
|||
state.config = config.clone();
|
||||
state.history = history.iter().cloned().map(Item::from).collect();
|
||||
}
|
||||
LogEntry::UserInput { item, .. } => {
|
||||
state.history.push(item.clone());
|
||||
LogEntry::UserInput { segments, .. } => {
|
||||
let text = Segment::flatten_to_text(segments);
|
||||
state.history.push(Item::user_message(text));
|
||||
state.user_segments.push(segments.clone());
|
||||
}
|
||||
LogEntry::AssistantItems { items, .. } => {
|
||||
state
|
||||
|
|
@ -365,7 +380,7 @@ mod tests {
|
|||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
item: Item::user_message("Hello"),
|
||||
segments: vec![Segment::text("Hello")],
|
||||
},
|
||||
LogEntry::AssistantItems {
|
||||
ts: 3000,
|
||||
|
|
@ -400,7 +415,7 @@ mod tests {
|
|||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
item: Item::user_message("Check weather"),
|
||||
segments: vec![Segment::text("Check weather")],
|
||||
},
|
||||
LogEntry::AssistantItems {
|
||||
ts: 3000,
|
||||
|
|
@ -460,7 +475,7 @@ mod tests {
|
|||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
item: Item::user_message("Hello"),
|
||||
segments: vec![Segment::text("Hello")],
|
||||
},
|
||||
];
|
||||
let chain_a = build_chain(&raw);
|
||||
|
|
@ -473,11 +488,11 @@ mod tests {
|
|||
fn different_content_produces_different_hash() {
|
||||
let entry_a = LogEntry::UserInput {
|
||||
ts: 1000,
|
||||
item: Item::user_message("Hello"),
|
||||
segments: vec![Segment::text("Hello")],
|
||||
};
|
||||
let entry_b = LogEntry::UserInput {
|
||||
ts: 1000,
|
||||
item: Item::user_message("World"),
|
||||
segments: vec![Segment::text("World")],
|
||||
};
|
||||
let hash_a = compute_hash(None, &entry_a);
|
||||
let hash_b = compute_hash(None, &entry_b);
|
||||
|
|
@ -497,7 +512,7 @@ mod tests {
|
|||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
item: Item::user_message("hi"),
|
||||
segments: vec![Segment::text("hi")],
|
||||
},
|
||||
LogEntry::LlmUsage {
|
||||
ts: 2100,
|
||||
|
|
@ -545,7 +560,7 @@ mod tests {
|
|||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
item: Item::user_message("hi"),
|
||||
segments: vec![Segment::text("hi")],
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
|
|
@ -658,4 +673,80 @@ mod tests {
|
|||
let parsed = EntryHash::from_hex(&hex).unwrap();
|
||||
assert_eq!(hash, parsed);
|
||||
}
|
||||
|
||||
/// Mixed segments survive a JSON round-trip through `LogEntry::UserInput`,
|
||||
/// and `collect_state` derives `Item::user_message` from the flattened
|
||||
/// text while preserving the original segments separately. This covers
|
||||
/// the segments → flatten → Item replay path from the ticket.
|
||||
#[test]
|
||||
fn replay_user_input_segments_round_trip() {
|
||||
let segments = vec![
|
||||
Segment::Text {
|
||||
content: "see ".into(),
|
||||
},
|
||||
Segment::Paste {
|
||||
id: 1,
|
||||
chars: 12,
|
||||
lines: 2,
|
||||
content: "line1\nline2".into(),
|
||||
},
|
||||
Segment::FileRef {
|
||||
path: "src/main.rs".into(),
|
||||
},
|
||||
];
|
||||
let entry = LogEntry::UserInput {
|
||||
ts: 4242,
|
||||
segments: segments.clone(),
|
||||
};
|
||||
// Hash + JSON round-trip preserves the variant byte-for-byte.
|
||||
let json = serde_json::to_string(&entry).unwrap();
|
||||
let parsed: LogEntry = serde_json::from_str(&json).unwrap();
|
||||
let entries = build_chain(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
parsed,
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
// Worker history gets a flattened user_message item.
|
||||
assert_eq!(state.history.len(), 1);
|
||||
match &state.history[0] {
|
||||
Item::Message { role, content, .. } => {
|
||||
assert!(matches!(role, llm_worker::Role::User));
|
||||
assert_eq!(content.len(), 1);
|
||||
match &content[0] {
|
||||
llm_worker::ContentPart::Text { text } => {
|
||||
assert_eq!(
|
||||
text,
|
||||
"see line1\nline2[unresolved file ref: src/main.rs]"
|
||||
);
|
||||
}
|
||||
other => panic!("unexpected content: {other:?}"),
|
||||
}
|
||||
}
|
||||
other => panic!("unexpected variant: {other:?}"),
|
||||
}
|
||||
// Segments survive verbatim for client-side restore.
|
||||
assert_eq!(state.user_segments.len(), 1);
|
||||
assert_eq!(state.user_segments[0].len(), 3);
|
||||
match &state.user_segments[0][1] {
|
||||
Segment::Paste {
|
||||
id,
|
||||
chars,
|
||||
lines,
|
||||
content,
|
||||
} => {
|
||||
assert_eq!(*id, 1);
|
||||
assert_eq!(*chars, 12);
|
||||
assert_eq!(*lines, 2);
|
||||
assert_eq!(content, "line1\nline2");
|
||||
}
|
||||
other => panic!("expected Paste, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ async fn round_trip_write_and_read() {
|
|||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
item: Item::user_message("Hello"),
|
||||
segments: vec![protocol::Segment::text("Hello")],
|
||||
},
|
||||
LogEntry::AssistantItems {
|
||||
ts: 3000,
|
||||
|
|
@ -210,7 +210,7 @@ async fn read_head_hash_returns_last_entry_hash() {
|
|||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
item: Item::user_message("Hello"),
|
||||
segments: vec![protocol::Segment::text("Hello")],
|
||||
},
|
||||
]);
|
||||
|
||||
|
|
|
|||
|
|
@ -99,6 +99,18 @@ async fn run_and_persist(
|
|||
head_hash: &mut Option<EntryHash>,
|
||||
input: &str,
|
||||
) -> (Worker<MockLlmClient>, llm_worker::WorkerResult) {
|
||||
// Mirror Pod's run-entry contract: log the user input as segments
|
||||
// before the worker pushes its flattened user_message; save_delta
|
||||
// skips the resulting user_message item to avoid double-write.
|
||||
session_store::save_user_input(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
vec![protocol::Segment::text(input)],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let history_before = worker.history().len();
|
||||
|
||||
let mut locked = worker.lock();
|
||||
|
|
@ -458,7 +470,7 @@ async fn session_auto_forks_on_conflict() {
|
|||
// Simulate another Pod writing to the same session behind our back
|
||||
let extra_entry = LogEntry::UserInput {
|
||||
ts: 9999,
|
||||
item: Item::user_message("Interloper"),
|
||||
segments: vec![protocol::Segment::text("Interloper")],
|
||||
};
|
||||
let current_head = store.read_head_hash(original_sid).await.unwrap();
|
||||
let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry);
|
||||
|
|
@ -492,12 +504,8 @@ async fn session_auto_forks_on_conflict() {
|
|||
|
||||
// Original session should still have the interloper entry
|
||||
let original_entries = store.read_all(original_sid).await.unwrap();
|
||||
let has_interloper = original_entries.iter().any(|e| {
|
||||
if let LogEntry::UserInput { item, .. } = &e.entry {
|
||||
item.is_user_message()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
let has_interloper = original_entries
|
||||
.iter()
|
||||
.any(|e| matches!(&e.entry, LogEntry::UserInput { .. }));
|
||||
assert!(has_interloper);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -452,10 +452,24 @@ impl App {
|
|||
self.blocks.push(Block::TurnHeader {
|
||||
turn: self.turn_index,
|
||||
});
|
||||
if !text.is_empty() {
|
||||
self.blocks.push(Block::UserMessage {
|
||||
segments: vec![Segment::text(text)],
|
||||
// Pod attaches the original `Vec<Segment>` to
|
||||
// user messages from live submissions, so we
|
||||
// can rebuild typed atoms (paste chips, refs)
|
||||
// here. Seed history loaded post-compaction
|
||||
// has no `segments` field — fall back to a
|
||||
// single text segment.
|
||||
let segments = item
|
||||
.get("segments")
|
||||
.and_then(|v| serde_json::from_value::<Vec<Segment>>(v.clone()).ok())
|
||||
.unwrap_or_else(|| {
|
||||
if text.is_empty() {
|
||||
Vec::new()
|
||||
} else {
|
||||
vec![Segment::text(text.clone())]
|
||||
}
|
||||
});
|
||||
if !segments.is_empty() {
|
||||
self.blocks.push(Block::UserMessage { segments });
|
||||
}
|
||||
}
|
||||
"assistant" if !text.is_empty() => {
|
||||
|
|
|
|||
|
|
@ -21,8 +21,7 @@ use ratatui::widgets::Paragraph;
|
|||
use ratatui::{Frame, TerminalOptions, Viewport};
|
||||
use pod_registry::lookup_session;
|
||||
use session_store::{
|
||||
ContentPart, FsStore, HashedEntry, Item, LogEntry, LoggedContentPart, LoggedItem, SessionId,
|
||||
Store,
|
||||
FsStore, HashedEntry, LogEntry, LoggedContentPart, LoggedItem, SessionId, Store,
|
||||
};
|
||||
|
||||
const MAX_ROWS: usize = 10;
|
||||
|
|
@ -176,8 +175,9 @@ async fn build_preview(store: &FsStore, id: SessionId) -> String {
|
|||
fn last_message_preview(entries: &[HashedEntry]) -> Option<String> {
|
||||
for hashed in entries.iter().rev() {
|
||||
match &hashed.entry {
|
||||
LogEntry::UserInput { item, .. } => {
|
||||
if let Some(text) = first_text(item) {
|
||||
LogEntry::UserInput { segments, .. } => {
|
||||
let text = protocol::Segment::flatten_to_text(segments);
|
||||
if !text.is_empty() {
|
||||
return Some(format!("user: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
|
|
@ -192,16 +192,6 @@ fn last_message_preview(entries: &[HashedEntry]) -> Option<String> {
|
|||
None
|
||||
}
|
||||
|
||||
fn first_text(item: &Item) -> Option<String> {
|
||||
match item {
|
||||
Item::Message { content, .. } => content.iter().find_map(|p| match p {
|
||||
ContentPart::Text { text } => Some(text.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn first_text_logged(item: &LoggedItem) -> Option<String> {
|
||||
match item {
|
||||
LoggedItem::Message { content, .. } => content.iter().find_map(|p| match p {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user