Merge: live-fork-marker
This commit is contained in:
commit
67f135fbc6
|
|
@ -14,4 +14,5 @@ Ticket を切るほどではないが、次に近所を触るときに合わせ
|
|||
- `crates/tui/src/app.rs:478-485` — bad workflow slug を含む `Method::Run` 送信時、`Event::UserMessage` の早期 broadcast で `turn_index += 1` されターンヘッダだけ残る ("ghost turn header")。次に TUI のターンヘッダ / エラー表示周りを触るときに整理。→ [tickets/pod-input-validate-internalize.md] の review 由来。
|
||||
- `crates/pod/src/controller.rs:944` — `worker_error_code` で `PodError::WorkflowResolve(_) => InvalidRequest` が post-commit な resolve エラー (`KnowledgeNotFound` 等) にも適用される。意味論的には妥当方向だが、resolve 系のエラー粒度を分けたくなったタイミングで再評価。
|
||||
- `crates/pod/tests/controller_test.rs` — `double_run_returns_error` がたまに失敗する flakiness を観測。`pod-interrupt-prep-internalize` 以前から存在する別件。次に controller_test の Run 連投系のタイミングを触るときに併せて原因を切り分け。
|
||||
- `crates/session-store/src/fs_store.rs:117-122` — `FsStore::read_entry_count` が `fs::read_to_string` で全文ロードしてから行数カウントするため O(n)。`ensure_head_or_fork` は run-start でしか呼ばれず現状は許容範囲だが、長期セッションが普通になった時点で `\n` バイト数の cheap count か末尾 seek に置き換える。→ [tickets/entry-hash-abolish.review.md] follow-up。
|
||||
- `crates/session-store/src/fs_store.rs:117-122` — `FsStore::read_entry_count` が `fs::read_to_string` で全文ロードしてから行数カウントするため O(n)。`ensure_head_or_fork` は run-start でしか呼ばれず現状は許容範囲だが、長期セッションが普通になった時点で `\n` バイト数の cheap count か末尾 seek に置き換える。
|
||||
- `crates/session-store/src/segment.rs:121` `ensure_head_or_fork` (free fn, test 専用・本番 caller ゼロ) と `crates/pod/src/pod.rs` `Pod::ensure_segment_head` (本番 inline) に live auto-fork の検知 + forked_from 記録が二重実装されている。entry-hash-abolish 以前からの重複で、両方独立にテスト済みだが drift 必至。session-store 側を本番から呼ぶ形に寄せるか free fn を畳むかは要設計判断。Pod state / fork 周辺を次に触るときに統合を検討。
|
||||
|
|
|
|||
2
TODO.md
2
TODO.md
|
|
@ -7,8 +7,6 @@
|
|||
- Pod: 空応答ターン (Submit 後 AI 応答ゼロで Pause/Cancel) を自動巻き戻し → [tickets/pod-empty-turn-rollback.md](tickets/pod-empty-turn-rollback.md)
|
||||
- Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
|
||||
- Pod: Inbound PodEvent ハンドリングの重複を統合 → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
|
||||
- 永続化層整理 (Storage)
|
||||
- live auto-fork の marker 形式確定 → [tickets/live-fork-marker.md](tickets/live-fork-marker.md)
|
||||
- Pod 単位永続化
|
||||
- Pod state backend と FsStore 実装 → [tickets/pod-state-backend.md](tickets/pod-state-backend.md)
|
||||
- Pod lifecycle 各点での write 配線 → [tickets/pod-state-write-points.md](tickets/pod-state-write-points.md)
|
||||
|
|
|
|||
|
|
@ -1672,9 +1672,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
return Ok(());
|
||||
}
|
||||
// Auto-fork within the same Session: mint a fresh Segment and
|
||||
// switch to it. The new SegmentStart entry replaces the mirror
|
||||
// and is broadcast through the sink so existing subscribers
|
||||
// reset their view.
|
||||
// switch to it. The source segment is left immutable (no terminal
|
||||
// marker is written back); the fork relationship is recorded
|
||||
// forward on the new segment's `forked_from`, with `at_turn_index`
|
||||
// = the writer's current turn (its in-memory history reflects
|
||||
// state up to that turn). The new SegmentStart replaces the mirror
|
||||
// and is broadcast through the sink so existing subscribers reset
|
||||
// their view.
|
||||
let fork_segment_id = session_store::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -1682,7 +1686,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
config: w.request_config().clone(),
|
||||
history: to_logged(w.history()),
|
||||
forked_from: None,
|
||||
forked_from: Some(session_store::SegmentOrigin {
|
||||
segment_id: loc.segment_id,
|
||||
at_turn_index: w.turn_count(),
|
||||
}),
|
||||
compacted_from: None,
|
||||
};
|
||||
self.store
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
|
|||
use llm_worker::llm_client::types::Item;
|
||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use protocol::Event;
|
||||
use session_store::FsStore;
|
||||
use session_store::{FsStore, LogEntry, Store};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use pod::Pod;
|
||||
|
|
@ -213,6 +213,89 @@ fn system_texts_in_sink_session_start(
|
|||
Vec::new()
|
||||
}
|
||||
|
||||
/// Live auto-fork: when another writer extends the segment behind the
|
||||
/// Pod's back, the next run's `ensure_segment_head` detects the
|
||||
/// entry-count drift and branches into a fresh segment **within the same
|
||||
/// Session**. The source segment is left immutable (no terminal marker
|
||||
/// written back); the new segment records its parentage forward via
|
||||
/// `SegmentStart.forked_from`.
|
||||
#[tokio::test]
|
||||
async fn concurrent_writer_drift_auto_forks_with_forked_from() {
|
||||
// No compaction: keep run → run deterministic so each run consumes
|
||||
// exactly one mock response and ensure_segment_head is the only fork
|
||||
// trigger.
|
||||
const NO_COMPACT_MANIFEST_TOML: &str = r#"
|
||||
[pod]
|
||||
name = "test-pod"
|
||||
pwd = "./"
|
||||
|
||||
[model]
|
||||
scheme = "anthropic"
|
||||
model_id = "test-model"
|
||||
|
||||
[worker]
|
||||
max_tokens = 100
|
||||
|
||||
[[scope.allow]]
|
||||
target = "./"
|
||||
permission = "write"
|
||||
"#;
|
||||
let client = MockClient::new(vec![single_text_events("first"), single_text_events("second")]);
|
||||
let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await;
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
|
||||
let store = pod.store().clone();
|
||||
let session_id = pod.session_id();
|
||||
let source_segment_id = pod.segment_id();
|
||||
let source_len_before = store.read_all(session_id, source_segment_id).unwrap().len();
|
||||
|
||||
// Simulate a foreign writer appending to the same segment. This bumps
|
||||
// the on-disk entry count past the Pod's own append tally without
|
||||
// updating the Pod's `entries_written`.
|
||||
store
|
||||
.append(
|
||||
session_id,
|
||||
source_segment_id,
|
||||
&LogEntry::UserInput {
|
||||
ts: 9999,
|
||||
segments: vec![protocol::Segment::text("interloper")],
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Next run triggers ensure_segment_head, which sees the drift.
|
||||
pod.run_text("second").await.unwrap();
|
||||
|
||||
// The Pod moved to a new segment in the same Session.
|
||||
let new_segment_id = pod.segment_id();
|
||||
assert_ne!(new_segment_id, source_segment_id);
|
||||
assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");
|
||||
|
||||
// New segment records forked_from pointing at the source.
|
||||
let new_entries = store.read_all(session_id, new_segment_id).unwrap();
|
||||
match &new_entries[0] {
|
||||
LogEntry::SegmentStart {
|
||||
session_id: seg_session,
|
||||
forked_from: Some(origin),
|
||||
..
|
||||
} => {
|
||||
assert_eq!(*seg_session, session_id);
|
||||
assert_eq!(origin.segment_id, source_segment_id);
|
||||
}
|
||||
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||
}
|
||||
|
||||
// Source segment is unchanged except for the foreign append — the
|
||||
// auto-fork wrote no terminal marker back into it.
|
||||
let source_after = store.read_all(session_id, source_segment_id).unwrap();
|
||||
assert_eq!(source_after.len(), source_len_before + 1);
|
||||
assert!(matches!(
|
||||
source_after.last(),
|
||||
Some(LogEntry::UserInput { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
|
||||
let client = MockClient::new(vec![
|
||||
|
|
|
|||
|
|
@ -114,8 +114,30 @@ pub fn restore_by_segment(
|
|||
restore(store, session_id, segment_id)
|
||||
}
|
||||
|
||||
/// Check if the store's entry count still matches the writer's tally.
|
||||
/// If not, auto-fork into a new segment within the same Session.
|
||||
/// Live auto-fork on concurrent-writer detection.
|
||||
///
|
||||
/// Checks whether the store's on-disk entry count still matches the
|
||||
/// writer's own append tally. If they match, the writer still owns the
|
||||
/// segment tail and nothing happens. If the store has grown behind the
|
||||
/// writer's back, another process appended to the same segment — so we
|
||||
/// branch into a fresh segment within the same Session.
|
||||
///
|
||||
/// # Marker form
|
||||
///
|
||||
/// Detection is by **tail entry-count comparison**, not by writing a
|
||||
/// terminal marker into the source segment. The source segment is left
|
||||
/// completely immutable — identical to the past-fork ([`fork_at`])
|
||||
/// invariant. The fork relationship is instead recorded forward on the
|
||||
/// *new* segment's `SegmentStart.forked_from`, so the lineage is still
|
||||
/// reconstructable from the logs alone (read each segment's
|
||||
/// `SegmentStart`; follow `forked_from` / `compacted_from` backward).
|
||||
/// Listing a parent's children is a cheap `list_segments(session_id)`
|
||||
/// scan filtered on `forked_from.segment_id`.
|
||||
///
|
||||
/// `at_turn_index` is the writer's current `turn_count`: the fork seeds
|
||||
/// the new segment with the writer's in-memory history (which reflects
|
||||
/// state up to that turn), so that is the divergence point relative to
|
||||
/// the now-diverged source segment.
|
||||
///
|
||||
/// Updates `segment_id` and `entries_written` in place when a fork occurs.
|
||||
pub fn ensure_head_or_fork(
|
||||
|
|
@ -123,12 +145,14 @@ pub fn ensure_head_or_fork(
|
|||
session_id: SessionId,
|
||||
segment_id: &mut SegmentId,
|
||||
entries_written: &mut usize,
|
||||
at_turn_index: usize,
|
||||
state: SegmentStartState<'_>,
|
||||
) -> Result<(), StoreError> {
|
||||
let store_count = store.read_entry_count(session_id, *segment_id)?;
|
||||
if store_count == *entries_written {
|
||||
return Ok(());
|
||||
}
|
||||
let source_segment_id = *segment_id;
|
||||
let fork_id = crate::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -136,7 +160,10 @@ pub fn ensure_head_or_fork(
|
|||
system_prompt: state.system_prompt.map(String::from),
|
||||
config: state.config.clone(),
|
||||
history: to_logged(state.history),
|
||||
forked_from: None,
|
||||
forked_from: Some(SegmentOrigin {
|
||||
segment_id: source_segment_id,
|
||||
at_turn_index,
|
||||
}),
|
||||
compacted_from: None,
|
||||
};
|
||||
store.create_segment(session_id, fork_id, &[entry])?;
|
||||
|
|
@ -425,6 +452,14 @@ pub fn fork(
|
|||
/// `TurnEnd` in the source segment that the fork should branch from.
|
||||
/// Replay collects state up to and including that `TurnEnd`; entries
|
||||
/// after it are not carried into the new segment.
|
||||
///
|
||||
/// # Invariant: the source segment is never mutated
|
||||
///
|
||||
/// Past-fork only reads the source and seeds a brand-new segment. It
|
||||
/// writes no marker back into the source — exactly like live auto-fork
|
||||
/// ([`ensure_head_or_fork`]). This keeps nested past-forks simple: a
|
||||
/// fork of a fork just reads its own source and branches again, with no
|
||||
/// marker-position bookkeeping to reconcile across the chain.
|
||||
pub fn fork_at(
|
||||
store: &impl Store,
|
||||
source_session_id: SessionId,
|
||||
|
|
|
|||
|
|
@ -459,6 +459,7 @@ async fn session_auto_forks_on_conflict() {
|
|||
sid,
|
||||
&mut segment_id,
|
||||
&mut entries_written,
|
||||
/* at_turn_index */ 0,
|
||||
SegmentStartState {
|
||||
system_prompt: worker_a.get_system_prompt(),
|
||||
config: worker_a.request_config(),
|
||||
|
|
@ -476,10 +477,81 @@ async fn session_auto_forks_on_conflict() {
|
|||
let fork_state = collect_state(&fork_entries);
|
||||
assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session");
|
||||
|
||||
// Original segment should still have the interloper entry
|
||||
// The new segment records its lineage forward via forked_from; the
|
||||
// source segment is left immutable (no terminal marker written back).
|
||||
match &fork_entries[0] {
|
||||
LogEntry::SegmentStart {
|
||||
forked_from: Some(origin),
|
||||
..
|
||||
} => {
|
||||
assert_eq!(origin.segment_id, original_segid);
|
||||
assert_eq!(origin.at_turn_index, 0);
|
||||
}
|
||||
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||
}
|
||||
|
||||
// Original segment should still have the interloper entry and NO
|
||||
// terminal fork marker — it is byte-for-byte unchanged.
|
||||
let original_entries = store.read_all(sid, original_segid).unwrap();
|
||||
assert_eq!(
|
||||
original_entries.len(),
|
||||
2,
|
||||
"source segment holds only SegmentStart + interloper UserInput"
|
||||
);
|
||||
let has_interloper = original_entries
|
||||
.iter()
|
||||
.any(|e| matches!(e, LogEntry::UserInput { .. }));
|
||||
assert!(has_interloper);
|
||||
}
|
||||
|
||||
/// Nested past-fork: forking a segment that is itself a fork must not
|
||||
/// require touching any ancestor. Each `fork_at` only reads its direct
|
||||
/// source and seeds a new segment, so a chain of forks composes cleanly.
|
||||
#[tokio::test]
|
||||
async fn nested_past_fork_leaves_ancestors_immutable() {
|
||||
let (_dir, store) = make_store();
|
||||
let client = MockLlmClient::new(simple_text_events());
|
||||
let worker = Worker::new(client);
|
||||
|
||||
let (sid, root_segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
config: worker.request_config(),
|
||||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, root_segid, "Hello").await;
|
||||
let root_before = store.read_all(sid, root_segid).unwrap();
|
||||
|
||||
// First past-fork at the completed turn.
|
||||
let fork1 = session_store::fork_at(&store, sid, root_segid, worker.turn_count()).unwrap();
|
||||
// Fork the fork (turn 0 = right after its SegmentStart seed).
|
||||
let fork2 = session_store::fork_at(&store, sid, fork1, 0).unwrap();
|
||||
|
||||
// All three are distinct, all in the same Session.
|
||||
assert_ne!(fork1, root_segid);
|
||||
assert_ne!(fork2, fork1);
|
||||
for seg in [root_segid, fork1, fork2] {
|
||||
assert_eq!(
|
||||
collect_state(&store.read_all(sid, seg).unwrap()).session_id,
|
||||
Some(sid)
|
||||
);
|
||||
}
|
||||
|
||||
// The root and fork1 are untouched by forking their descendants.
|
||||
assert_eq!(store.read_all(sid, root_segid).unwrap().len(), root_before.len());
|
||||
let fork1_entries = store.read_all(sid, fork1).unwrap();
|
||||
assert_eq!(fork1_entries.len(), 1, "fork1 is just its SegmentStart seed");
|
||||
|
||||
// fork2's lineage points at fork1, not the root.
|
||||
match &store.read_all(sid, fork2).unwrap()[0] {
|
||||
LogEntry::SegmentStart {
|
||||
forked_from: Some(origin),
|
||||
..
|
||||
} => assert_eq!(origin.segment_id, fork1),
|
||||
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,40 +0,0 @@
|
|||
# session-store: live auto-fork の marker 形式確定
|
||||
|
||||
## 背景
|
||||
|
||||
`entry-hash-abolish` で `ensure_head_or_fork` は末尾 seq 比較ベースに置換されたが、最小実装で繋いだだけで marker 形式の本決定は保留している。
|
||||
|
||||
live auto-fork(concurrent writer 検知)と過去 fork(UI から turn 選択)は性質が違う:
|
||||
|
||||
- **live auto-fork**: 元 Segment の末尾に terminal marker (例: `Forked { to: SegmentId }`) を append する CoW semantics。以降の writer は marker を見て新 Segment に自動移動。
|
||||
- **過去 fork**: 元 Segment は無変更で、replay して新 Segment を生やすだけ。
|
||||
|
||||
両者を同じ marker で扱うと、過去 fork から更に過去で fork した場合に元 Segment への marker 位置解釈が複雑化して破綻する。**過去 fork は元 Segment に触れない方針を固定**した上で、live auto-fork 側の marker 形式を確定する。
|
||||
|
||||
## 要件
|
||||
|
||||
- live auto-fork 検知の形式を以下のどちらかに確定して実装:
|
||||
- (a) **terminal marker entry**: 元 Segment 末尾に `Forked { to: SegmentId }` 等の LogEntry を 1 行 append する
|
||||
- (b) **末尾 seq 比較**: 元 Segment に書き込みは行わず、writer の保持する末尾 seq と store の末尾 seq の差分のみで検知する
|
||||
- 選択基準:
|
||||
- (a) は読み手側が fork チェーンを log だけから辿れる利点。書き手競合時に 1 write 増えるコスト。
|
||||
- (b) は元 Segment が完全 immutable で、過去 fork との semantics 統一が綺麗。fork 関係を引くには別の metadata index が要る。
|
||||
- 過去 fork 側は引き続き元 Segment を touch しないことを invariant として明文化。
|
||||
- `pod.rs` の `ensure_head_or_fork` を確定後の形式に合わせて書き直す。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- live auto-fork の marker 形式が決まり、実装に反映されている。
|
||||
- 過去 fork からの nested 過去 fork が正しく動く(test で確認)。
|
||||
- 並行 writer による live auto-fork が正しく検知され、新 Segment に分岐する(test で確認)。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- 過去 fork の物理コピー方式(既に `fork_at` で `SessionStart` seed 1 回書き込みの方針で固定)。
|
||||
- Fork tree の可視化 UI。
|
||||
|
||||
## 関連
|
||||
|
||||
- `tickets/entry-hash-abolish.md` (前提)
|
||||
- `tickets/session-grouping-introduce.md` (前提、Session 単位の lineage と整合)
|
||||
- `tickets/pod-session-fork.md`
|
||||
Loading…
Reference in New Issue
Block a user