From 077efee13bb57a35e482520cba62c9288c1305bf Mon Sep 17 00:00:00 2001 From: Hare Date: Wed, 20 May 2026 06:42:09 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20live=20auto-fork=20=E3=81=AE=20marker?= =?UTF-8?q?=20=E5=BD=A2=E5=BC=8F=E3=82=92=E7=A2=BA=E5=AE=9A=EF=BC=88seq=20?= =?UTF-8?q?=E6=AF=94=E8=BC=83=20+=20forked=5Ffrom=20=E8=A8=98=E9=8C=B2?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 方針: 末尾 entry-count 比較で検知し、元 Segment は immutable のまま (terminal marker を書き戻さない)。fork lineage は新 Segment の SegmentStart.forked_from に前向きに記録するため、log だけから辿れる。 過去 fork と対称で、nested fork も marker 位置の調停が不要。 - session-store ensure_head_or_fork に at_turn_index 引数を追加し 新 Segment へ forked_from を記録 - pod ensure_segment_head の auto-fork も同様に forked_from を記録 (at_turn_index = writer の現 turn_count) - fork_at の doc に「元 Segment を mutate しない」invariant を明記 - test: nested past-fork が祖先を不変に保つ / Pod 並行 writer drift で auto-fork し forked_from を記録 / 元 Segment に marker が書かれない --- crates/pod/src/pod.rs | 15 +++- crates/pod/tests/compact_events_test.rs | 85 +++++++++++++++++++++- crates/session-store/src/segment.rs | 41 ++++++++++- crates/session-store/tests/session_test.rs | 74 ++++++++++++++++++- 4 files changed, 206 insertions(+), 9 deletions(-) diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index def8d557..c88e2276 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1672,9 +1672,13 @@ impl Pod { return Ok(()); } // Auto-fork within the same Session: mint a fresh Segment and - // switch to it. The new SegmentStart entry replaces the mirror - // and is broadcast through the sink so existing subscribers - // reset their view. + // switch to it. The source segment is left immutable (no terminal + // marker is written back); the fork relationship is recorded + // forward on the new segment's `forked_from`, with `at_turn_index` + // = the writer's current turn (its in-memory history reflects + // state up to that turn). The new SegmentStart replaces the mirror + // and is broadcast through the sink so existing subscribers reset + // their view. let fork_segment_id = session_store::new_segment_id(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), @@ -1682,7 +1686,10 @@ impl Pod { system_prompt: w.get_system_prompt().map(String::from), config: w.request_config().clone(), history: to_logged(w.history()), - forked_from: None, + forked_from: Some(session_store::SegmentOrigin { + segment_id: loc.segment_id, + at_turn_index: w.turn_count(), + }), compacted_from: None, }; self.store diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 3c6d13de..1376b85c 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use protocol::Event; -use session_store::FsStore; +use session_store::{FsStore, LogEntry, Store}; use tokio::sync::broadcast; use pod::Pod; @@ -213,6 +213,89 @@ fn system_texts_in_sink_session_start( Vec::new() } +/// Live auto-fork: when another writer extends the segment behind the +/// Pod's back, the next run's `ensure_segment_head` detects the +/// entry-count drift and branches into a fresh segment **within the same +/// Session**. The source segment is left immutable (no terminal marker +/// written back); the new segment records its parentage forward via +/// `SegmentStart.forked_from`. +#[tokio::test] +async fn concurrent_writer_drift_auto_forks_with_forked_from() { + // No compaction: keep run → run deterministic so each run consumes + // exactly one mock response and ensure_segment_head is the only fork + // trigger. + const NO_COMPACT_MANIFEST_TOML: &str = r#" +[pod] +name = "test-pod" +pwd = "./" + +[model] +scheme = "anthropic" +model_id = "test-model" + +[worker] +max_tokens = 100 + +[[scope.allow]] +target = "./" +permission = "write" +"#; + let client = MockClient::new(vec![single_text_events("first"), single_text_events("second")]); + let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await; + + pod.run_text("first").await.unwrap(); + + let store = pod.store().clone(); + let session_id = pod.session_id(); + let source_segment_id = pod.segment_id(); + let source_len_before = store.read_all(session_id, source_segment_id).unwrap().len(); + + // Simulate a foreign writer appending to the same segment. This bumps + // the on-disk entry count past the Pod's own append tally without + // updating the Pod's `entries_written`. + store + .append( + session_id, + source_segment_id, + &LogEntry::UserInput { + ts: 9999, + segments: vec![protocol::Segment::text("interloper")], + }, + ) + .unwrap(); + + // Next run triggers ensure_segment_head, which sees the drift. + pod.run_text("second").await.unwrap(); + + // The Pod moved to a new segment in the same Session. + let new_segment_id = pod.segment_id(); + assert_ne!(new_segment_id, source_segment_id); + assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session"); + + // New segment records forked_from pointing at the source. + let new_entries = store.read_all(session_id, new_segment_id).unwrap(); + match &new_entries[0] { + LogEntry::SegmentStart { + session_id: seg_session, + forked_from: Some(origin), + .. + } => { + assert_eq!(*seg_session, session_id); + assert_eq!(origin.segment_id, source_segment_id); + } + other => panic!("expected SegmentStart with forked_from, got {other:?}"), + } + + // Source segment is unchanged except for the foreign append — the + // auto-fork wrote no terminal marker back into it. + let source_after = store.read_all(session_id, source_segment_id).unwrap(); + assert_eq!(source_after.len(), source_len_before + 1); + assert!(matches!( + source_after.last(), + Some(LogEntry::UserInput { .. }) + )); +} + #[tokio::test] async fn compact_emits_session_start_carrying_summary_and_task_snapshot() { let client = MockClient::new(vec![ diff --git a/crates/session-store/src/segment.rs b/crates/session-store/src/segment.rs index 00b62f65..503a5675 100644 --- a/crates/session-store/src/segment.rs +++ b/crates/session-store/src/segment.rs @@ -114,8 +114,30 @@ pub fn restore_by_segment( restore(store, session_id, segment_id) } -/// Check if the store's entry count still matches the writer's tally. -/// If not, auto-fork into a new segment within the same Session. +/// Live auto-fork on concurrent-writer detection. +/// +/// Checks whether the store's on-disk entry count still matches the +/// writer's own append tally. If they match, the writer still owns the +/// segment tail and nothing happens. If the store has grown behind the +/// writer's back, another process appended to the same segment — so we +/// branch into a fresh segment within the same Session. +/// +/// # Marker form +/// +/// Detection is by **tail entry-count comparison**, not by writing a +/// terminal marker into the source segment. The source segment is left +/// completely immutable — identical to the past-fork ([`fork_at`]) +/// invariant. The fork relationship is instead recorded forward on the +/// *new* segment's `SegmentStart.forked_from`, so the lineage is still +/// reconstructable from the logs alone (read each segment's +/// `SegmentStart`; follow `forked_from` / `compacted_from` backward). +/// Listing a parent's children is a cheap `list_segments(session_id)` +/// scan filtered on `forked_from.segment_id`. +/// +/// `at_turn_index` is the writer's current `turn_count`: the fork seeds +/// the new segment with the writer's in-memory history (which reflects +/// state up to that turn), so that is the divergence point relative to +/// the now-diverged source segment. /// /// Updates `segment_id` and `entries_written` in place when a fork occurs. pub fn ensure_head_or_fork( @@ -123,12 +145,14 @@ pub fn ensure_head_or_fork( session_id: SessionId, segment_id: &mut SegmentId, entries_written: &mut usize, + at_turn_index: usize, state: SegmentStartState<'_>, ) -> Result<(), StoreError> { let store_count = store.read_entry_count(session_id, *segment_id)?; if store_count == *entries_written { return Ok(()); } + let source_segment_id = *segment_id; let fork_id = crate::new_segment_id(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), @@ -136,7 +160,10 @@ pub fn ensure_head_or_fork( system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), - forked_from: None, + forked_from: Some(SegmentOrigin { + segment_id: source_segment_id, + at_turn_index, + }), compacted_from: None, }; store.create_segment(session_id, fork_id, &[entry])?; @@ -425,6 +452,14 @@ pub fn fork( /// `TurnEnd` in the source segment that the fork should branch from. /// Replay collects state up to and including that `TurnEnd`; entries /// after it are not carried into the new segment. +/// +/// # Invariant: the source segment is never mutated +/// +/// Past-fork only reads the source and seeds a brand-new segment. It +/// writes no marker back into the source — exactly like live auto-fork +/// ([`ensure_head_or_fork`]). This keeps nested past-forks simple: a +/// fork of a fork just reads its own source and branches again, with no +/// marker-position bookkeeping to reconcile across the chain. pub fn fork_at( store: &impl Store, source_session_id: SessionId, diff --git a/crates/session-store/tests/session_test.rs b/crates/session-store/tests/session_test.rs index e05ab309..a18e8ac8 100644 --- a/crates/session-store/tests/session_test.rs +++ b/crates/session-store/tests/session_test.rs @@ -459,6 +459,7 @@ async fn session_auto_forks_on_conflict() { sid, &mut segment_id, &mut entries_written, + /* at_turn_index */ 0, SegmentStartState { system_prompt: worker_a.get_system_prompt(), config: worker_a.request_config(), @@ -476,10 +477,81 @@ async fn session_auto_forks_on_conflict() { let fork_state = collect_state(&fork_entries); assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session"); - // Original segment should still have the interloper entry + // The new segment records its lineage forward via forked_from; the + // source segment is left immutable (no terminal marker written back). + match &fork_entries[0] { + LogEntry::SegmentStart { + forked_from: Some(origin), + .. + } => { + assert_eq!(origin.segment_id, original_segid); + assert_eq!(origin.at_turn_index, 0); + } + other => panic!("expected SegmentStart with forked_from, got {other:?}"), + } + + // Original segment should still have the interloper entry and NO + // terminal fork marker — it is byte-for-byte unchanged. let original_entries = store.read_all(sid, original_segid).unwrap(); + assert_eq!( + original_entries.len(), + 2, + "source segment holds only SegmentStart + interloper UserInput" + ); let has_interloper = original_entries .iter() .any(|e| matches!(e, LogEntry::UserInput { .. })); assert!(has_interloper); } + +/// Nested past-fork: forking a segment that is itself a fork must not +/// require touching any ancestor. Each `fork_at` only reads its direct +/// source and seeds a new segment, so a chain of forks composes cleanly. +#[tokio::test] +async fn nested_past_fork_leaves_ancestors_immutable() { + let (_dir, store) = make_store(); + let client = MockLlmClient::new(simple_text_events()); + let worker = Worker::new(client); + + let (sid, root_segid) = session_store::create_segment( + &store, + SegmentStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .unwrap(); + + let (worker, _) = run_and_persist(worker, &store, sid, root_segid, "Hello").await; + let root_before = store.read_all(sid, root_segid).unwrap(); + + // First past-fork at the completed turn. + let fork1 = session_store::fork_at(&store, sid, root_segid, worker.turn_count()).unwrap(); + // Fork the fork (turn 0 = right after its SegmentStart seed). + let fork2 = session_store::fork_at(&store, sid, fork1, 0).unwrap(); + + // All three are distinct, all in the same Session. + assert_ne!(fork1, root_segid); + assert_ne!(fork2, fork1); + for seg in [root_segid, fork1, fork2] { + assert_eq!( + collect_state(&store.read_all(sid, seg).unwrap()).session_id, + Some(sid) + ); + } + + // The root and fork1 are untouched by forking their descendants. + assert_eq!(store.read_all(sid, root_segid).unwrap().len(), root_before.len()); + let fork1_entries = store.read_all(sid, fork1).unwrap(); + assert_eq!(fork1_entries.len(), 1, "fork1 is just its SegmentStart seed"); + + // fork2's lineage points at fork1, not the root. + match &store.read_all(sid, fork2).unwrap()[0] { + LogEntry::SegmentStart { + forked_from: Some(origin), + .. + } => assert_eq!(origin.segment_id, fork1), + other => panic!("expected SegmentStart with forked_from, got {other:?}"), + } +}