feat: live auto-fork の marker 形式を確定(seq 比較 + forked_from 記録)
方針: 末尾 entry-count 比較で検知し、元 Segment は immutable のまま (terminal marker を書き戻さない)。fork lineage は新 Segment の SegmentStart.forked_from に前向きに記録するため、log だけから辿れる。 過去 fork と対称で、nested fork も marker 位置の調停が不要。 - session-store ensure_head_or_fork に at_turn_index 引数を追加し 新 Segment へ forked_from を記録 - pod ensure_segment_head の auto-fork も同様に forked_from を記録 (at_turn_index = writer の現 turn_count) - fork_at の doc に「元 Segment を mutate しない」invariant を明記 - test: nested past-fork が祖先を不変に保つ / Pod 並行 writer drift で auto-fork し forked_from を記録 / 元 Segment に marker が書かれない
This commit is contained in:
parent
c47a539278
commit
a9340a8817
|
|
@ -1672,9 +1672,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
return Ok(());
|
||||
}
|
||||
// Auto-fork within the same Session: mint a fresh Segment and
|
||||
// switch to it. The new SegmentStart entry replaces the mirror
|
||||
// and is broadcast through the sink so existing subscribers
|
||||
// reset their view.
|
||||
// switch to it. The source segment is left immutable (no terminal
|
||||
// marker is written back); the fork relationship is recorded
|
||||
// forward on the new segment's `forked_from`, with `at_turn_index`
|
||||
// = the writer's current turn (its in-memory history reflects
|
||||
// state up to that turn). The new SegmentStart replaces the mirror
|
||||
// and is broadcast through the sink so existing subscribers reset
|
||||
// their view.
|
||||
let fork_segment_id = session_store::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -1682,7 +1686,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
config: w.request_config().clone(),
|
||||
history: to_logged(w.history()),
|
||||
forked_from: None,
|
||||
forked_from: Some(session_store::SegmentOrigin {
|
||||
segment_id: loc.segment_id,
|
||||
at_turn_index: w.turn_count(),
|
||||
}),
|
||||
compacted_from: None,
|
||||
};
|
||||
self.store
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
|
|||
use llm_worker::llm_client::types::Item;
|
||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use protocol::Event;
|
||||
use session_store::FsStore;
|
||||
use session_store::{FsStore, LogEntry, Store};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use pod::Pod;
|
||||
|
|
@ -213,6 +213,89 @@ fn system_texts_in_sink_session_start(
|
|||
Vec::new()
|
||||
}
|
||||
|
||||
/// Live auto-fork: when another writer extends the segment behind the
|
||||
/// Pod's back, the next run's `ensure_segment_head` detects the
|
||||
/// entry-count drift and branches into a fresh segment **within the same
|
||||
/// Session**. The source segment is left immutable (no terminal marker
|
||||
/// written back); the new segment records its parentage forward via
|
||||
/// `SegmentStart.forked_from`.
|
||||
#[tokio::test]
|
||||
async fn concurrent_writer_drift_auto_forks_with_forked_from() {
|
||||
// No compaction: keep run → run deterministic so each run consumes
|
||||
// exactly one mock response and ensure_segment_head is the only fork
|
||||
// trigger.
|
||||
const NO_COMPACT_MANIFEST_TOML: &str = r#"
|
||||
[pod]
|
||||
name = "test-pod"
|
||||
pwd = "./"
|
||||
|
||||
[model]
|
||||
scheme = "anthropic"
|
||||
model_id = "test-model"
|
||||
|
||||
[worker]
|
||||
max_tokens = 100
|
||||
|
||||
[[scope.allow]]
|
||||
target = "./"
|
||||
permission = "write"
|
||||
"#;
|
||||
let client = MockClient::new(vec![single_text_events("first"), single_text_events("second")]);
|
||||
let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await;
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
|
||||
let store = pod.store().clone();
|
||||
let session_id = pod.session_id();
|
||||
let source_segment_id = pod.segment_id();
|
||||
let source_len_before = store.read_all(session_id, source_segment_id).unwrap().len();
|
||||
|
||||
// Simulate a foreign writer appending to the same segment. This bumps
|
||||
// the on-disk entry count past the Pod's own append tally without
|
||||
// updating the Pod's `entries_written`.
|
||||
store
|
||||
.append(
|
||||
session_id,
|
||||
source_segment_id,
|
||||
&LogEntry::UserInput {
|
||||
ts: 9999,
|
||||
segments: vec![protocol::Segment::text("interloper")],
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Next run triggers ensure_segment_head, which sees the drift.
|
||||
pod.run_text("second").await.unwrap();
|
||||
|
||||
// The Pod moved to a new segment in the same Session.
|
||||
let new_segment_id = pod.segment_id();
|
||||
assert_ne!(new_segment_id, source_segment_id);
|
||||
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
|
||||
|
||||
// New segment records forked_from pointing at the source.
|
||||
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
|
||||
match &new_entries[0] {
|
||||
LogEntry::SegmentStart {
|
||||
session_id: seg_session,
|
||||
forked_from: Some(origin),
|
||||
..
|
||||
} => {
|
||||
assert_eq!(*seg_session, session_id);
|
||||
assert_eq!(origin.segment_id, source_segment_id);
|
||||
}
|
||||
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||
}
|
||||
|
||||
// Source segment is unchanged except for the foreign append — the
|
||||
// auto-fork wrote no terminal marker back into it.
|
||||
let source_after = store.read_all(session_id, source_segment_id).unwrap();
|
||||
assert_eq!(source_after.len(), source_len_before + 1);
|
||||
assert!(matches!(
|
||||
source_after.last(),
|
||||
Some(LogEntry::UserInput { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
|
||||
let client = MockClient::new(vec![
|
||||
|
|
|
|||
|
|
@ -114,8 +114,30 @@ pub fn restore_by_segment(
|
|||
restore(store, session_id, segment_id)
|
||||
}
|
||||
|
||||
/// Check if the store's entry count still matches the writer's tally.
|
||||
/// If not, auto-fork into a new segment within the same Session.
|
||||
/// Live auto-fork on concurrent-writer detection.
|
||||
///
|
||||
/// Checks whether the store's on-disk entry count still matches the
|
||||
/// writer's own append tally. If they match, the writer still owns the
|
||||
/// segment tail and nothing happens. If the store has grown behind the
|
||||
/// writer's back, another process appended to the same segment — so we
|
||||
/// branch into a fresh segment within the same Session.
|
||||
///
|
||||
/// # Marker form
|
||||
///
|
||||
/// Detection is by **tail entry-count comparison**, not by writing a
|
||||
/// terminal marker into the source segment. The source segment is left
|
||||
/// completely immutable — identical to the past-fork ([`fork_at`])
|
||||
/// invariant. The fork relationship is instead recorded forward on the
|
||||
/// *new* segment's `SegmentStart.forked_from`, so the lineage is still
|
||||
/// reconstructable from the logs alone (read each segment's
|
||||
/// `SegmentStart`; follow `forked_from` / `compacted_from` backward).
|
||||
/// Listing a parent's children is a cheap `list_segments(session_id)`
|
||||
/// scan filtered on `forked_from.segment_id`.
|
||||
///
|
||||
/// `at_turn_index` is the writer's current `turn_count`: the fork seeds
|
||||
/// the new segment with the writer's in-memory history (which reflects
|
||||
/// state up to that turn), so that is the divergence point relative to
|
||||
/// the now-diverged source segment.
|
||||
///
|
||||
/// Updates `segment_id` and `entries_written` in place when a fork occurs.
|
||||
pub fn ensure_head_or_fork(
|
||||
|
|
@ -123,12 +145,14 @@ pub fn ensure_head_or_fork(
|
|||
session_id: SessionId,
|
||||
segment_id: &mut SegmentId,
|
||||
entries_written: &mut usize,
|
||||
at_turn_index: usize,
|
||||
state: SegmentStartState<'_>,
|
||||
) -> Result<(), StoreError> {
|
||||
let store_count = store.read_entry_count(session_id, *segment_id)?;
|
||||
if store_count == *entries_written {
|
||||
return Ok(());
|
||||
}
|
||||
let source_segment_id = *segment_id;
|
||||
let fork_id = crate::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -136,7 +160,10 @@ pub fn ensure_head_or_fork(
|
|||
system_prompt: state.system_prompt.map(String::from),
|
||||
config: state.config.clone(),
|
||||
history: to_logged(state.history),
|
||||
forked_from: None,
|
||||
forked_from: Some(SegmentOrigin {
|
||||
segment_id: source_segment_id,
|
||||
at_turn_index,
|
||||
}),
|
||||
compacted_from: None,
|
||||
};
|
||||
store.create_segment(session_id, fork_id, &[entry])?;
|
||||
|
|
@ -425,6 +452,14 @@ pub fn fork(
|
|||
/// `TurnEnd` in the source segment that the fork should branch from.
|
||||
/// Replay collects state up to and including that `TurnEnd`; entries
|
||||
/// after it are not carried into the new segment.
|
||||
///
|
||||
/// # Invariant: the source segment is never mutated
|
||||
///
|
||||
/// Past-fork only reads the source and seeds a brand-new segment. It
|
||||
/// writes no marker back into the source — exactly like live auto-fork
|
||||
/// ([`ensure_head_or_fork`]). This keeps nested past-forks simple: a
|
||||
/// fork of a fork just reads its own source and branches again, with no
|
||||
/// marker-position bookkeeping to reconcile across the chain.
|
||||
pub fn fork_at(
|
||||
store: &impl Store,
|
||||
source_session_id: SessionId,
|
||||
|
|
|
|||
|
|
@ -459,6 +459,7 @@ async fn session_auto_forks_on_conflict() {
|
|||
sid,
|
||||
&mut segment_id,
|
||||
&mut entries_written,
|
||||
/* at_turn_index */ 0,
|
||||
SegmentStartState {
|
||||
system_prompt: worker_a.get_system_prompt(),
|
||||
config: worker_a.request_config(),
|
||||
|
|
@ -476,10 +477,81 @@ async fn session_auto_forks_on_conflict() {
|
|||
let fork_state = collect_state(&fork_entries);
|
||||
assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session");
|
||||
|
||||
// Original segment should still have the interloper entry
|
||||
// The new segment records its lineage forward via forked_from; the
|
||||
// source segment is left immutable (no terminal marker written back).
|
||||
match &fork_entries[0] {
|
||||
LogEntry::SegmentStart {
|
||||
forked_from: Some(origin),
|
||||
..
|
||||
} => {
|
||||
assert_eq!(origin.segment_id, original_segid);
|
||||
assert_eq!(origin.at_turn_index, 0);
|
||||
}
|
||||
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||
}
|
||||
|
||||
// Original segment should still have the interloper entry and NO
|
||||
// terminal fork marker — it is byte-for-byte unchanged.
|
||||
let original_entries = store.read_all(sid, original_segid).unwrap();
|
||||
assert_eq!(
|
||||
original_entries.len(),
|
||||
2,
|
||||
"source segment holds only SegmentStart + interloper UserInput"
|
||||
);
|
||||
let has_interloper = original_entries
|
||||
.iter()
|
||||
.any(|e| matches!(e, LogEntry::UserInput { .. }));
|
||||
assert!(has_interloper);
|
||||
}
|
||||
|
||||
/// Nested past-fork: forking a segment that is itself a fork must not
|
||||
/// require touching any ancestor. Each `fork_at` only reads its direct
|
||||
/// source and seeds a new segment, so a chain of forks composes cleanly.
|
||||
#[tokio::test]
|
||||
async fn nested_past_fork_leaves_ancestors_immutable() {
|
||||
let (_dir, store) = make_store();
|
||||
let client = MockLlmClient::new(simple_text_events());
|
||||
let worker = Worker::new(client);
|
||||
|
||||
let (sid, root_segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
config: worker.request_config(),
|
||||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, root_segid, "Hello").await;
|
||||
let root_before = store.read_all(sid, root_segid).unwrap();
|
||||
|
||||
// First past-fork at the completed turn.
|
||||
let fork1 = session_store::fork_at(&store, sid, root_segid, worker.turn_count()).unwrap();
|
||||
// Fork the fork (turn 0 = right after its SegmentStart seed).
|
||||
let fork2 = session_store::fork_at(&store, sid, fork1, 0).unwrap();
|
||||
|
||||
// All three are distinct, all in the same Session.
|
||||
assert_ne!(fork1, root_segid);
|
||||
assert_ne!(fork2, fork1);
|
||||
for seg in [root_segid, fork1, fork2] {
|
||||
assert_eq!(
|
||||
collect_state(&store.read_all(sid, seg).unwrap()).session_id,
|
||||
Some(sid)
|
||||
);
|
||||
}
|
||||
|
||||
// The root and fork1 are untouched by forking their descendants.
|
||||
assert_eq!(store.read_all(sid, root_segid).unwrap().len(), root_before.len());
|
||||
let fork1_entries = store.read_all(sid, fork1).unwrap();
|
||||
assert_eq!(fork1_entries.len(), 1, "fork1 is just its SegmentStart seed");
|
||||
|
||||
// fork2's lineage points at fork1, not the root.
|
||||
match &store.read_all(sid, fork2).unwrap()[0] {
|
||||
LogEntry::SegmentStart {
|
||||
forked_from: Some(origin),
|
||||
..
|
||||
} => assert_eq!(origin.segment_id, fork1),
|
||||
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user