feat: live auto-fork の marker 形式を確定(seq 比較 + forked_from 記録)
方針: 末尾 entry-count 比較で検知し、元 Segment は immutable のまま (terminal marker を書き戻さない)。fork lineage は新 Segment の SegmentStart.forked_from に前向きに記録するため、log だけから辿れる。 過去 fork と対称で、nested fork も marker 位置の調停が不要。 - session-store ensure_head_or_fork に at_turn_index 引数を追加し 新 Segment へ forked_from を記録 - pod ensure_segment_head の auto-fork も同様に forked_from を記録 (at_turn_index = writer の現 turn_count) - fork_at の doc に「元 Segment を mutate しない」invariant を明記 - test: nested past-fork が祖先を不変に保つ / Pod 並行 writer drift で auto-fork し forked_from を記録 / 元 Segment に marker が書かれない
This commit is contained in:
parent
46b0e20685
commit
ac3ee5fcbe
|
|
@ -1672,9 +1672,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
// Auto-fork within the same Session: mint a fresh Segment and
|
// Auto-fork within the same Session: mint a fresh Segment and
|
||||||
// switch to it. The new SegmentStart entry replaces the mirror
|
// switch to it. The source segment is left immutable (no terminal
|
||||||
// and is broadcast through the sink so existing subscribers
|
// marker is written back); the fork relationship is recorded
|
||||||
// reset their view.
|
// forward on the new segment's `forked_from`, with `at_turn_index`
|
||||||
|
// = the writer's current turn (its in-memory history reflects
|
||||||
|
// state up to that turn). The new SegmentStart replaces the mirror
|
||||||
|
// and is broadcast through the sink so existing subscribers reset
|
||||||
|
// their view.
|
||||||
let fork_segment_id = session_store::new_segment_id();
|
let fork_segment_id = session_store::new_segment_id();
|
||||||
let entry = LogEntry::SegmentStart {
|
let entry = LogEntry::SegmentStart {
|
||||||
ts: segment_log::now_millis(),
|
ts: segment_log::now_millis(),
|
||||||
|
|
@ -1682,7 +1686,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
system_prompt: w.get_system_prompt().map(String::from),
|
system_prompt: w.get_system_prompt().map(String::from),
|
||||||
config: w.request_config().clone(),
|
config: w.request_config().clone(),
|
||||||
history: to_logged(w.history()),
|
history: to_logged(w.history()),
|
||||||
forked_from: None,
|
forked_from: Some(session_store::SegmentOrigin {
|
||||||
|
segment_id: loc.segment_id,
|
||||||
|
at_turn_index: w.turn_count(),
|
||||||
|
}),
|
||||||
compacted_from: None,
|
compacted_from: None,
|
||||||
};
|
};
|
||||||
self.store
|
self.store
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
|
||||||
use llm_worker::llm_client::types::Item;
|
use llm_worker::llm_client::types::Item;
|
||||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||||
use protocol::Event;
|
use protocol::Event;
|
||||||
use session_store::FsStore;
|
use session_store::{FsStore, LogEntry, Store};
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use pod::Pod;
|
use pod::Pod;
|
||||||
|
|
@ -213,6 +213,89 @@ fn system_texts_in_sink_session_start(
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Live auto-fork: when another writer extends the segment behind the
|
||||||
|
/// Pod's back, the next run's `ensure_segment_head` detects the
|
||||||
|
/// entry-count drift and branches into a fresh segment **within the same
|
||||||
|
/// Session**. The source segment is left immutable (no terminal marker
|
||||||
|
/// written back); the new segment records its parentage forward via
|
||||||
|
/// `SegmentStart.forked_from`.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn concurrent_writer_drift_auto_forks_with_forked_from() {
|
||||||
|
// No compaction: keep run → run deterministic so each run consumes
|
||||||
|
// exactly one mock response and ensure_segment_head is the only fork
|
||||||
|
// trigger.
|
||||||
|
const NO_COMPACT_MANIFEST_TOML: &str = r#"
|
||||||
|
[pod]
|
||||||
|
name = "test-pod"
|
||||||
|
pwd = "./"
|
||||||
|
|
||||||
|
[model]
|
||||||
|
scheme = "anthropic"
|
||||||
|
model_id = "test-model"
|
||||||
|
|
||||||
|
[worker]
|
||||||
|
max_tokens = 100
|
||||||
|
|
||||||
|
[[scope.allow]]
|
||||||
|
target = "./"
|
||||||
|
permission = "write"
|
||||||
|
"#;
|
||||||
|
let client = MockClient::new(vec![single_text_events("first"), single_text_events("second")]);
|
||||||
|
let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await;
|
||||||
|
|
||||||
|
pod.run_text("first").await.unwrap();
|
||||||
|
|
||||||
|
let store = pod.store().clone();
|
||||||
|
let session_id = pod.session_id();
|
||||||
|
let source_segment_id = pod.segment_id();
|
||||||
|
let source_len_before = store.read_all(session_id, source_segment_id).unwrap().len();
|
||||||
|
|
||||||
|
// Simulate a foreign writer appending to the same segment. This bumps
|
||||||
|
// the on-disk entry count past the Pod's own append tally without
|
||||||
|
// updating the Pod's `entries_written`.
|
||||||
|
store
|
||||||
|
.append(
|
||||||
|
session_id,
|
||||||
|
source_segment_id,
|
||||||
|
&LogEntry::UserInput {
|
||||||
|
ts: 9999,
|
||||||
|
segments: vec![protocol::Segment::text("interloper")],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Next run triggers ensure_segment_head, which sees the drift.
|
||||||
|
pod.run_text("second").await.unwrap();
|
||||||
|
|
||||||
|
// The Pod moved to a new segment in the same Session.
|
||||||
|
let new_segment_id = pod.segment_id();
|
||||||
|
assert_ne!(new_segment_id, source_segment_id);
|
||||||
|
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
|
||||||
|
|
||||||
|
// New segment records forked_from pointing at the source.
|
||||||
|
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
|
||||||
|
match &new_entries[0] {
|
||||||
|
LogEntry::SegmentStart {
|
||||||
|
session_id: seg_session,
|
||||||
|
forked_from: Some(origin),
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
assert_eq!(*seg_session, session_id);
|
||||||
|
assert_eq!(origin.segment_id, source_segment_id);
|
||||||
|
}
|
||||||
|
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Source segment is unchanged except for the foreign append — the
|
||||||
|
// auto-fork wrote no terminal marker back into it.
|
||||||
|
let source_after = store.read_all(session_id, source_segment_id).unwrap();
|
||||||
|
assert_eq!(source_after.len(), source_len_before + 1);
|
||||||
|
assert!(matches!(
|
||||||
|
source_after.last(),
|
||||||
|
Some(LogEntry::UserInput { .. })
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
|
async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
|
||||||
let client = MockClient::new(vec![
|
let client = MockClient::new(vec![
|
||||||
|
|
|
||||||
|
|
@ -114,8 +114,30 @@ pub fn restore_by_segment(
|
||||||
restore(store, session_id, segment_id)
|
restore(store, session_id, segment_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if the store's entry count still matches the writer's tally.
|
/// Live auto-fork on concurrent-writer detection.
|
||||||
/// If not, auto-fork into a new segment within the same Session.
|
///
|
||||||
|
/// Checks whether the store's on-disk entry count still matches the
|
||||||
|
/// writer's own append tally. If they match, the writer still owns the
|
||||||
|
/// segment tail and nothing happens. If the store has grown behind the
|
||||||
|
/// writer's back, another process appended to the same segment — so we
|
||||||
|
/// branch into a fresh segment within the same Session.
|
||||||
|
///
|
||||||
|
/// # Marker form
|
||||||
|
///
|
||||||
|
/// Detection is by **tail entry-count comparison**, not by writing a
|
||||||
|
/// terminal marker into the source segment. The source segment is left
|
||||||
|
/// completely immutable — identical to the past-fork ([`fork_at`])
|
||||||
|
/// invariant. The fork relationship is instead recorded forward on the
|
||||||
|
/// *new* segment's `SegmentStart.forked_from`, so the lineage is still
|
||||||
|
/// reconstructable from the logs alone (read each segment's
|
||||||
|
/// `SegmentStart`; follow `forked_from` / `compacted_from` backward).
|
||||||
|
/// Listing a parent's children is a cheap `list_segments(session_id)`
|
||||||
|
/// scan filtered on `forked_from.segment_id`.
|
||||||
|
///
|
||||||
|
/// `at_turn_index` is the writer's current `turn_count`: the fork seeds
|
||||||
|
/// the new segment with the writer's in-memory history (which reflects
|
||||||
|
/// state up to that turn), so that is the divergence point relative to
|
||||||
|
/// the now-diverged source segment.
|
||||||
///
|
///
|
||||||
/// Updates `segment_id` and `entries_written` in place when a fork occurs.
|
/// Updates `segment_id` and `entries_written` in place when a fork occurs.
|
||||||
pub fn ensure_head_or_fork(
|
pub fn ensure_head_or_fork(
|
||||||
|
|
@ -123,12 +145,14 @@ pub fn ensure_head_or_fork(
|
||||||
session_id: SessionId,
|
session_id: SessionId,
|
||||||
segment_id: &mut SegmentId,
|
segment_id: &mut SegmentId,
|
||||||
entries_written: &mut usize,
|
entries_written: &mut usize,
|
||||||
|
at_turn_index: usize,
|
||||||
state: SegmentStartState<'_>,
|
state: SegmentStartState<'_>,
|
||||||
) -> Result<(), StoreError> {
|
) -> Result<(), StoreError> {
|
||||||
let store_count = store.read_entry_count(session_id, *segment_id)?;
|
let store_count = store.read_entry_count(session_id, *segment_id)?;
|
||||||
if store_count == *entries_written {
|
if store_count == *entries_written {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
let source_segment_id = *segment_id;
|
||||||
let fork_id = crate::new_segment_id();
|
let fork_id = crate::new_segment_id();
|
||||||
let entry = LogEntry::SegmentStart {
|
let entry = LogEntry::SegmentStart {
|
||||||
ts: segment_log::now_millis(),
|
ts: segment_log::now_millis(),
|
||||||
|
|
@ -136,7 +160,10 @@ pub fn ensure_head_or_fork(
|
||||||
system_prompt: state.system_prompt.map(String::from),
|
system_prompt: state.system_prompt.map(String::from),
|
||||||
config: state.config.clone(),
|
config: state.config.clone(),
|
||||||
history: to_logged(state.history),
|
history: to_logged(state.history),
|
||||||
forked_from: None,
|
forked_from: Some(SegmentOrigin {
|
||||||
|
segment_id: source_segment_id,
|
||||||
|
at_turn_index,
|
||||||
|
}),
|
||||||
compacted_from: None,
|
compacted_from: None,
|
||||||
};
|
};
|
||||||
store.create_segment(session_id, fork_id, &[entry])?;
|
store.create_segment(session_id, fork_id, &[entry])?;
|
||||||
|
|
@ -425,6 +452,14 @@ pub fn fork(
|
||||||
/// `TurnEnd` in the source segment that the fork should branch from.
|
/// `TurnEnd` in the source segment that the fork should branch from.
|
||||||
/// Replay collects state up to and including that `TurnEnd`; entries
|
/// Replay collects state up to and including that `TurnEnd`; entries
|
||||||
/// after it are not carried into the new segment.
|
/// after it are not carried into the new segment.
|
||||||
|
///
|
||||||
|
/// # Invariant: the source segment is never mutated
|
||||||
|
///
|
||||||
|
/// Past-fork only reads the source and seeds a brand-new segment. It
|
||||||
|
/// writes no marker back into the source — exactly like live auto-fork
|
||||||
|
/// ([`ensure_head_or_fork`]). This keeps nested past-forks simple: a
|
||||||
|
/// fork of a fork just reads its own source and branches again, with no
|
||||||
|
/// marker-position bookkeeping to reconcile across the chain.
|
||||||
pub fn fork_at(
|
pub fn fork_at(
|
||||||
store: &impl Store,
|
store: &impl Store,
|
||||||
source_session_id: SessionId,
|
source_session_id: SessionId,
|
||||||
|
|
|
||||||
|
|
@ -459,6 +459,7 @@ async fn session_auto_forks_on_conflict() {
|
||||||
sid,
|
sid,
|
||||||
&mut segment_id,
|
&mut segment_id,
|
||||||
&mut entries_written,
|
&mut entries_written,
|
||||||
|
/* at_turn_index */ 0,
|
||||||
SegmentStartState {
|
SegmentStartState {
|
||||||
system_prompt: worker_a.get_system_prompt(),
|
system_prompt: worker_a.get_system_prompt(),
|
||||||
config: worker_a.request_config(),
|
config: worker_a.request_config(),
|
||||||
|
|
@ -476,10 +477,81 @@ async fn session_auto_forks_on_conflict() {
|
||||||
let fork_state = collect_state(&fork_entries);
|
let fork_state = collect_state(&fork_entries);
|
||||||
assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session");
|
assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session");
|
||||||
|
|
||||||
// Original segment should still have the interloper entry
|
// The new segment records its lineage forward via forked_from; the
|
||||||
|
// source segment is left immutable (no terminal marker written back).
|
||||||
|
match &fork_entries[0] {
|
||||||
|
LogEntry::SegmentStart {
|
||||||
|
forked_from: Some(origin),
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
assert_eq!(origin.segment_id, original_segid);
|
||||||
|
assert_eq!(origin.at_turn_index, 0);
|
||||||
|
}
|
||||||
|
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Original segment should still have the interloper entry and NO
|
||||||
|
// terminal fork marker — it is byte-for-byte unchanged.
|
||||||
let original_entries = store.read_all(sid, original_segid).unwrap();
|
let original_entries = store.read_all(sid, original_segid).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
original_entries.len(),
|
||||||
|
2,
|
||||||
|
"source segment holds only SegmentStart + interloper UserInput"
|
||||||
|
);
|
||||||
let has_interloper = original_entries
|
let has_interloper = original_entries
|
||||||
.iter()
|
.iter()
|
||||||
.any(|e| matches!(e, LogEntry::UserInput { .. }));
|
.any(|e| matches!(e, LogEntry::UserInput { .. }));
|
||||||
assert!(has_interloper);
|
assert!(has_interloper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Nested past-fork: forking a segment that is itself a fork must not
|
||||||
|
/// require touching any ancestor. Each `fork_at` only reads its direct
|
||||||
|
/// source and seeds a new segment, so a chain of forks composes cleanly.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn nested_past_fork_leaves_ancestors_immutable() {
|
||||||
|
let (_dir, store) = make_store();
|
||||||
|
let client = MockLlmClient::new(simple_text_events());
|
||||||
|
let worker = Worker::new(client);
|
||||||
|
|
||||||
|
let (sid, root_segid) = session_store::create_segment(
|
||||||
|
&store,
|
||||||
|
SegmentStartState {
|
||||||
|
system_prompt: worker.get_system_prompt(),
|
||||||
|
config: worker.request_config(),
|
||||||
|
history: worker.history(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let (worker, _) = run_and_persist(worker, &store, sid, root_segid, "Hello").await;
|
||||||
|
let root_before = store.read_all(sid, root_segid).unwrap();
|
||||||
|
|
||||||
|
// First past-fork at the completed turn.
|
||||||
|
let fork1 = session_store::fork_at(&store, sid, root_segid, worker.turn_count()).unwrap();
|
||||||
|
// Fork the fork (turn 0 = right after its SegmentStart seed).
|
||||||
|
let fork2 = session_store::fork_at(&store, sid, fork1, 0).unwrap();
|
||||||
|
|
||||||
|
// All three are distinct, all in the same Session.
|
||||||
|
assert_ne!(fork1, root_segid);
|
||||||
|
assert_ne!(fork2, fork1);
|
||||||
|
for seg in [root_segid, fork1, fork2] {
|
||||||
|
assert_eq!(
|
||||||
|
collect_state(&store.read_all(sid, seg).unwrap()).session_id,
|
||||||
|
Some(sid)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The root and fork1 are untouched by forking their descendants.
|
||||||
|
assert_eq!(store.read_all(sid, root_segid).unwrap().len(), root_before.len());
|
||||||
|
let fork1_entries = store.read_all(sid, fork1).unwrap();
|
||||||
|
assert_eq!(fork1_entries.len(), 1, "fork1 is just its SegmentStart seed");
|
||||||
|
|
||||||
|
// fork2's lineage points at fork1, not the root.
|
||||||
|
match &store.read_all(sid, fork2).unwrap()[0] {
|
||||||
|
LogEntry::SegmentStart {
|
||||||
|
forked_from: Some(origin),
|
||||||
|
..
|
||||||
|
} => assert_eq!(origin.segment_id, fork1),
|
||||||
|
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user