Merge: live-fork-marker

This commit is contained in:
Keisuke Hirata 2026-05-20 06:45:49 +09:00
commit 5830bb9c85
7 changed files with 208 additions and 52 deletions

View File

@ -14,4 +14,5 @@ Ticket を切るほどではないが、次に近所を触るときに合わせ
- `crates/tui/src/app.rs:478-485` — bad workflow slug を含む `Method::Run` 送信時、`Event::UserMessage` の早期 broadcast で `turn_index += 1` されターンヘッダだけ残る ("ghost turn header")。次に TUI のターンヘッダ / エラー表示周りを触るときに整理。→ [tickets/pod-input-validate-internalize.md] の review 由来。
- `crates/pod/src/controller.rs:944` — `worker_error_code` の `PodError::WorkflowResolve(_) => InvalidRequest` が post-commit な resolve エラー (`KnowledgeNotFound` 等) にも適用される。意味論的には妥当方向だが、resolve 系のエラー粒度を分けたくなったタイミングで再評価。
- `crates/pod/tests/controller_test.rs` — `double_run_returns_error` がたまに失敗する flakiness を観測。`pod-interrupt-prep-internalize` 以前から存在する別件。次に controller_test の Run 連投系のタイミングを触るときに併せて原因を切り分け。
- `crates/session-store/src/fs_store.rs:117-122` — `FsStore::read_entry_count` が `fs::read_to_string` で全文ロードしてから行数カウントするため O(n)。`ensure_head_or_fork` は run-start でしか呼ばれず現状は許容範囲だが、長期セッションが普通になった時点で `\n` バイト数の cheap count か末尾 seek に置き換える。→ [tickets/entry-hash-abolish.review.md] follow-up。
- `crates/session-store/src/fs_store.rs:117-122` — `FsStore::read_entry_count` が `fs::read_to_string` で全文ロードしてから行数カウントするため O(n)。`ensure_head_or_fork` は run-start でしか呼ばれず現状は許容範囲だが、長期セッションが普通になった時点で `\n` バイト数の cheap count か末尾 seek に置き換える。
- `crates/session-store/src/segment.rs:121` `ensure_head_or_fork` (free fn, test 専用・本番 caller ゼロ) と `crates/pod/src/pod.rs` `Pod::ensure_segment_head` (本番 inline) に live auto-fork の検知 + forked_from 記録が二重実装されている。entry-hash-abolish 以前からの重複で、両方独立にテスト済みだが drift 必至。session-store 側を本番から呼ぶ形に寄せるか free fn を畳むかは要設計判断。Pod state / fork 周辺を次に触るときに統合を検討。

View File

@ -7,8 +7,6 @@
- Pod: 空応答ターン (Submit 後 AI 応答ゼロで Pause/Cancel) を自動巻き戻し → [tickets/pod-empty-turn-rollback.md](tickets/pod-empty-turn-rollback.md)
- Pod: 任意ターンからの Fork複数ターン巻き戻しを汎用化 → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
- Pod: Inbound PodEvent ハンドリングの重複を統合 → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
- 永続化層整理 (Storage)
- live auto-fork の marker 形式確定 → [tickets/live-fork-marker.md](tickets/live-fork-marker.md)
- Pod 単位永続化
- Pod state backend と FsStore 実装 → [tickets/pod-state-backend.md](tickets/pod-state-backend.md)
- Pod lifecycle 各点での write 配線 → [tickets/pod-state-write-points.md](tickets/pod-state-write-points.md)

View File

@ -1672,9 +1672,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
return Ok(());
}
// Auto-fork within the same Session: mint a fresh Segment and
// switch to it. The new SegmentStart entry replaces the mirror
// and is broadcast through the sink so existing subscribers
// reset their view.
// switch to it. The source segment is left immutable (no terminal
// marker is written back); the fork relationship is recorded
// forward on the new segment's `forked_from`, with `at_turn_index`
// = the writer's current turn (its in-memory history reflects
// state up to that turn). The new SegmentStart replaces the mirror
// and is broadcast through the sink so existing subscribers reset
// their view.
let fork_segment_id = session_store::new_segment_id();
let entry = LogEntry::SegmentStart {
ts: segment_log::now_millis(),
@ -1682,7 +1686,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
system_prompt: w.get_system_prompt().map(String::from),
config: w.request_config().clone(),
history: to_logged(w.history()),
forked_from: None,
forked_from: Some(session_store::SegmentOrigin {
segment_id: loc.segment_id,
at_turn_index: w.turn_count(),
}),
compacted_from: None,
};
self.store

View File

@ -17,7 +17,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use protocol::Event;
use session_store::FsStore;
use session_store::{FsStore, LogEntry, Store};
use tokio::sync::broadcast;
use pod::Pod;
@ -213,6 +213,89 @@ fn system_texts_in_sink_session_start(
Vec::new()
}
/// End-to-end check of live auto-fork through the Pod.
///
/// After one run, an entry is appended to the Pod's current segment
/// directly through the store, bypassing the Pod. The following run is
/// expected to land in a different segment of the **same Session**, with
/// the new segment's `SegmentStart.forked_from` naming the original
/// segment. The original segment must contain nothing beyond the foreign
/// append, i.e. no terminal marker is written back into it.
#[tokio::test]
async fn concurrent_writer_drift_auto_forks_with_forked_from() {
    // This manifest configures no compaction, so every run consumes
    // exactly one mock response and `ensure_segment_head` remains the
    // only thing that can trigger a fork.
    const NO_COMPACT_MANIFEST_TOML: &str = r#"
[pod]
name = "test-pod"
pwd = "./"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[[scope.allow]]
target = "./"
permission = "write"
"#;

    let scripted_responses = vec![single_text_events("first"), single_text_events("second")];
    let client = MockClient::new(scripted_responses);
    let mut pod = make_pod_with_manifest(NO_COMPACT_MANIFEST_TOML, client).await;
    pod.run_text("first").await.unwrap();

    // Snapshot where the Pod is writing before anything interferes.
    let store = pod.store().clone();
    let session_id = pod.session_id();
    let source_segment_id = pod.segment_id();
    let source_len_before = store.read_all(session_id, source_segment_id).unwrap().len();

    // Play the part of a foreign writer: this append goes straight to the
    // store, so the on-disk entry count moves ahead of the Pod's own
    // `entries_written` tally.
    let foreign_entry = LogEntry::UserInput {
        ts: 9999,
        segments: vec![protocol::Segment::text("interloper")],
    };
    store
        .append(session_id, source_segment_id, &foreign_entry)
        .unwrap();

    // The next run goes through ensure_segment_head and notices the drift.
    pod.run_text("second").await.unwrap();

    // Same Session, different segment.
    let new_segment_id = pod.segment_id();
    assert_ne!(new_segment_id, source_segment_id);
    assert_eq!(pod.session_id(), session_id, "auto-fork stays in-Session");

    // The head of the new segment carries the lineage back to the source.
    let new_entries = store.read_all(session_id, new_segment_id).unwrap();
    let other = &new_entries[0];
    if let LogEntry::SegmentStart {
        session_id: seg_session,
        forked_from: Some(origin),
        ..
    } = other
    {
        assert_eq!(*seg_session, session_id);
        assert_eq!(origin.segment_id, source_segment_id);
    } else {
        panic!("expected SegmentStart with forked_from, got {other:?}");
    }

    // The source grew by exactly the foreign entry, and that entry is
    // still its tail: the auto-fork appended nothing to it.
    let source_after = store.read_all(session_id, source_segment_id).unwrap();
    assert_eq!(source_after.len(), source_len_before + 1);
    assert!(matches!(
        source_after.last(),
        Some(LogEntry::UserInput { .. })
    ));
}
#[tokio::test]
async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
let client = MockClient::new(vec![

View File

@ -114,8 +114,30 @@ pub fn restore_by_segment(
restore(store, session_id, segment_id)
}
/// Check if the store's entry count still matches the writer's tally.
/// If not, auto-fork into a new segment within the same Session.
/// Live auto-fork on concurrent-writer detection.
///
/// Checks whether the store's on-disk entry count still matches the
/// writer's own append tally. If they match, the writer still owns the
/// segment tail and nothing happens. If the store has grown behind the
/// writer's back, another process appended to the same segment — so we
/// branch into a fresh segment within the same Session.
///
/// # Marker form
///
/// Detection is by **tail entry-count comparison**, not by writing a
/// terminal marker into the source segment. The source segment is left
/// completely immutable — identical to the past-fork ([`fork_at`])
/// invariant. The fork relationship is instead recorded forward on the
/// *new* segment's `SegmentStart.forked_from`, so the lineage is still
/// reconstructable from the logs alone (read each segment's
/// `SegmentStart`; follow `forked_from` / `compacted_from` backward).
/// Listing a parent's children is a cheap `list_segments(session_id)`
/// scan filtered on `forked_from.segment_id`.
///
/// `at_turn_index` is the writer's current `turn_count`: the fork seeds
/// the new segment with the writer's in-memory history (which reflects
/// state up to that turn), so that is the divergence point relative to
/// the now-diverged source segment.
///
/// Updates `segment_id` and `entries_written` in place when a fork occurs.
pub fn ensure_head_or_fork(
@ -123,12 +145,14 @@ pub fn ensure_head_or_fork(
session_id: SessionId,
segment_id: &mut SegmentId,
entries_written: &mut usize,
at_turn_index: usize,
state: SegmentStartState<'_>,
) -> Result<(), StoreError> {
let store_count = store.read_entry_count(session_id, *segment_id)?;
if store_count == *entries_written {
return Ok(());
}
let source_segment_id = *segment_id;
let fork_id = crate::new_segment_id();
let entry = LogEntry::SegmentStart {
ts: segment_log::now_millis(),
@ -136,7 +160,10 @@ pub fn ensure_head_or_fork(
system_prompt: state.system_prompt.map(String::from),
config: state.config.clone(),
history: to_logged(state.history),
forked_from: None,
forked_from: Some(SegmentOrigin {
segment_id: source_segment_id,
at_turn_index,
}),
compacted_from: None,
};
store.create_segment(session_id, fork_id, &[entry])?;
@ -425,6 +452,14 @@ pub fn fork(
/// `TurnEnd` in the source segment that the fork should branch from.
/// Replay collects state up to and including that `TurnEnd`; entries
/// after it are not carried into the new segment.
///
/// # Invariant: the source segment is never mutated
///
/// Past-fork only reads the source and seeds a brand-new segment. It
/// writes no marker back into the source — exactly like live auto-fork
/// ([`ensure_head_or_fork`]). This keeps nested past-forks simple: a
/// fork of a fork just reads its own source and branches again, with no
/// marker-position bookkeeping to reconcile across the chain.
pub fn fork_at(
store: &impl Store,
source_session_id: SessionId,

View File

@ -459,6 +459,7 @@ async fn session_auto_forks_on_conflict() {
sid,
&mut segment_id,
&mut entries_written,
/* at_turn_index */ 0,
SegmentStartState {
system_prompt: worker_a.get_system_prompt(),
config: worker_a.request_config(),
@ -476,10 +477,81 @@ async fn session_auto_forks_on_conflict() {
let fork_state = collect_state(&fork_entries);
assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session");
// Original segment should still have the interloper entry
// The new segment records its lineage forward via forked_from; the
// source segment is left immutable (no terminal marker written back).
match &fork_entries[0] {
LogEntry::SegmentStart {
forked_from: Some(origin),
..
} => {
assert_eq!(origin.segment_id, original_segid);
assert_eq!(origin.at_turn_index, 0);
}
other => panic!("expected SegmentStart with forked_from, got {other:?}"),
}
// Original segment should still have the interloper entry and NO
// terminal fork marker — it is byte-for-byte unchanged.
let original_entries = store.read_all(sid, original_segid).unwrap();
assert_eq!(
original_entries.len(),
2,
"source segment holds only SegmentStart + interloper UserInput"
);
let has_interloper = original_entries
.iter()
.any(|e| matches!(e, LogEntry::UserInput { .. }));
assert!(has_interloper);
}
/// Chained past-forks must compose without touching any ancestor.
///
/// Forks the root segment at its completed turn, then forks that fork at
/// turn 0. Afterwards all three segments must report the same Session,
/// the root and the first fork must keep their entry counts, and the
/// second fork's `forked_from` must name the first fork rather than the
/// root.
#[tokio::test]
async fn nested_past_fork_leaves_ancestors_immutable() {
    let (_dir, store) = make_store();
    let worker = Worker::new(MockLlmClient::new(simple_text_events()));

    // Seed the root segment from the fresh worker's state.
    let seed_state = SegmentStartState {
        system_prompt: worker.get_system_prompt(),
        config: worker.request_config(),
        history: worker.history(),
    };
    let (sid, root_segid) = session_store::create_segment(&store, seed_state).unwrap();

    let (worker, _) = run_and_persist(worker, &store, sid, root_segid, "Hello").await;
    let root_len_before = store.read_all(sid, root_segid).unwrap().len();

    // Level 1: past-fork the root at the turn that just completed.
    let completed_turn = worker.turn_count();
    let fork1 = session_store::fork_at(&store, sid, root_segid, completed_turn).unwrap();

    // Level 2: fork the fork (turn 0 = right after its SegmentStart seed).
    let fork2 = session_store::fork_at(&store, sid, fork1, 0).unwrap();

    // Each fork produced a new segment id.
    assert_ne!(fork1, root_segid);
    assert_ne!(fork2, fork1);

    // Every segment in the chain reports the same Session.
    for seg in [root_segid, fork1, fork2] {
        let entries = store.read_all(sid, seg).unwrap();
        assert_eq!(collect_state(&entries).session_id, Some(sid));
    }

    // Forking descendants added nothing to the root or to fork1.
    assert_eq!(store.read_all(sid, root_segid).unwrap().len(), root_len_before);
    let fork1_entries = store.read_all(sid, fork1).unwrap();
    assert_eq!(fork1_entries.len(), 1, "fork1 is just its SegmentStart seed");

    // fork2 names its direct parent (fork1) as its origin, not the root.
    let fork2_entries = store.read_all(sid, fork2).unwrap();
    let other = &fork2_entries[0];
    if let LogEntry::SegmentStart {
        forked_from: Some(origin),
        ..
    } = other
    {
        assert_eq!(origin.segment_id, fork1);
    } else {
        panic!("expected SegmentStart with forked_from, got {other:?}");
    }
}

View File

@ -1,40 +0,0 @@
# session-store: live auto-fork の marker 形式確定
## 背景
`entry-hash-abolish` で `ensure_head_or_fork` は末尾 seq 比較ベースに置換されたが、最小実装で繋いだだけで marker 形式の本決定は保留している。
live auto-fork(concurrent writer 検知)と過去 fork(UI から turn 選択)は性質が違う:
- **live auto-fork**: 元 Segment の末尾に terminal marker (例: `Forked { to: SegmentId }`) を append する CoW semantics。以降の writer は marker を見て新 Segment に自動移動。
- **過去 fork**: 元 Segment は無変更で、replay して新 Segment を生やすだけ。
両者を同じ marker で扱うと、過去 fork から更に過去で fork した場合に元 Segment への marker 位置解釈が複雑化して破綻する。**過去 fork は元 Segment に触れない方針を固定**した上で、live auto-fork 側の marker 形式を確定する。
## 要件
- live auto-fork 検知の形式を以下のどちらかに確定して実装:
- (a) **terminal marker entry**: 元 Segment 末尾に `Forked { to: SegmentId }` 等の LogEntry を 1 行 append する
- (b) **末尾 seq 比較**: 元 Segment に書き込みは行わず、writer の保持する末尾 seq と store の末尾 seq の差分のみで検知する
- 選択基準:
- (a) は読み手側が fork チェーンを log だけから辿れる利点。書き手競合時に 1 write 増えるコスト。
- (b) は元 Segment が完全 immutable で、過去 fork との semantics 統一が綺麗。fork 関係を引くには別の metadata index が要る。
- 過去 fork 側は引き続き元 Segment を touch しないことを invariant として明文化。
- `pod.rs` の `ensure_head_or_fork` を確定後の形式に合わせて書き直す。
## 完了条件
- live auto-fork の marker 形式が決まり、実装に反映されている。
- 過去 fork からの nested 過去 fork が正しく動く(test で確認)。
- 並行 writer による live auto-fork が正しく検知され、新 Segment に分岐する(test で確認)。
## 範囲外
- 過去 fork の物理コピー方式(既に `fork_at` は `SessionStart` seed 1 回書き込みの方針で固定)。
- Fork tree の可視化 UI。
## 関連
- `tickets/entry-hash-abolish.md` (前提)
- `tickets/session-grouping-introduce.md` (前提、Session 単位の lineage と整合)
- `tickets/pod-session-fork.md`