refactor: remove legacy plural log entries
This commit is contained in:
parent
90d4c8f5ad
commit
5ba4be1c9b
1
TODO.md
1
TODO.md
|
|
@ -8,7 +8,6 @@
|
|||
- Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
|
||||
- Pod: Inbound PodEvent ハンドリングの重複を統合 → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
|
||||
- Pod: 過去 Pod の探索と restore ツール → [tickets/pod-discovery-restore-tools.md](tickets/pod-discovery-restore-tools.md)
|
||||
- Pod: ReadPodOutput を singular LogEntry 形式に追従 → [tickets/read-pod-output-singular-log-entry.md](tickets/read-pod-output-singular-log-entry.md)
|
||||
- llm-worker のエラー耐性
|
||||
- ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md)
|
||||
- llm-worker: history append を callback 経由の単一経路に閉じる → [tickets/worker-history-append-contract.md](tickets/worker-history-append-contract.md)
|
||||
|
|
|
|||
|
|
@ -429,39 +429,43 @@ fn extract_assistant_text(entries: &[serde_json::Value]) -> String {
|
|||
let mut out = String::new();
|
||||
for value in entries {
|
||||
// The wire payload is the JSON form of `session_store::LogEntry`.
|
||||
// Walk Assistant items inside each entry that can carry them:
|
||||
// post-compaction `SegmentStart.history` (seed) and per-LLM-call
|
||||
// `AssistantItems` deltas.
|
||||
// Walk current singular assistant items and the seeded history in
|
||||
// post-compaction `SegmentStart` entries.
|
||||
let Ok(entry) = serde_json::from_value::<LogEntry>(value.clone()) else {
|
||||
continue;
|
||||
};
|
||||
let logged_items = match entry {
|
||||
LogEntry::SegmentStart { history, .. } => history,
|
||||
LogEntry::AssistantItems { items, .. } => items,
|
||||
_ => continue,
|
||||
};
|
||||
for logged in logged_items {
|
||||
let item: Item = logged.into();
|
||||
if let Item::Message {
|
||||
role: Role::Assistant,
|
||||
content,
|
||||
..
|
||||
} = item
|
||||
{
|
||||
for part in content {
|
||||
if let ContentPart::Text { text } = part {
|
||||
if !out.is_empty() {
|
||||
out.push_str("\n\n");
|
||||
}
|
||||
out.push_str(&text);
|
||||
}
|
||||
match entry {
|
||||
LogEntry::SegmentStart { history, .. } => {
|
||||
for logged in history {
|
||||
push_assistant_text(&mut out, logged);
|
||||
}
|
||||
}
|
||||
LogEntry::AssistantItem { item, .. } => push_assistant_text(&mut out, item),
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
fn push_assistant_text(out: &mut String, logged: session_store::LoggedItem) {
|
||||
let item: Item = logged.into();
|
||||
if let Item::Message {
|
||||
role: Role::Assistant,
|
||||
content,
|
||||
..
|
||||
} = item
|
||||
{
|
||||
for part in content {
|
||||
if let ContentPart::Text { text } = part {
|
||||
if !out.is_empty() {
|
||||
out.push_str("\n\n");
|
||||
}
|
||||
out.push_str(&text);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn summarize_scope(record: &SpawnedPodRecord) -> String {
|
||||
if record.scope_delegated.is_empty() {
|
||||
return "(none)".into();
|
||||
|
|
|
|||
|
|
@ -35,14 +35,6 @@ fn history_from_sink(handle: &PodHandle) -> Vec<Item> {
|
|||
LogEntry::SystemItem { item, .. } => {
|
||||
items.push(item.to_history_item());
|
||||
}
|
||||
LogEntry::AssistantItems { items: i, .. }
|
||||
| LogEntry::ToolResults { items: i, .. }
|
||||
| LogEntry::HookInjectedItems { items: i, .. } => {
|
||||
items.extend(i.into_iter().map(Item::from));
|
||||
}
|
||||
LogEntry::SystemItems { items: si, .. } => {
|
||||
items.extend(si.iter().map(|s| s.to_history_item()));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -158,18 +158,18 @@ fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
|
|||
};
|
||||
let (_r, w) = stream.into_split();
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
// Wrap the assistant items in a single
|
||||
// `LogEntry::AssistantItems` entry — that's the only kind
|
||||
// that contributes assistant text via `extract_assistant_text`.
|
||||
let logged: Vec<session_store::LoggedItem> =
|
||||
items.iter().map(session_store::LoggedItem::from).collect();
|
||||
let entry = session_store::LogEntry::AssistantItems {
|
||||
ts: 0,
|
||||
items: logged,
|
||||
};
|
||||
let entry_value = serde_json::to_value(&entry).unwrap();
|
||||
let entries: Vec<serde_json::Value> = items
|
||||
.iter()
|
||||
.map(|item| {
|
||||
let entry = session_store::LogEntry::AssistantItem {
|
||||
ts: 0,
|
||||
item: session_store::LoggedItem::from(item),
|
||||
};
|
||||
serde_json::to_value(&entry).unwrap()
|
||||
})
|
||||
.collect();
|
||||
let event = Event::Snapshot {
|
||||
entries: vec![entry_value],
|
||||
entries,
|
||||
greeting: Greeting {
|
||||
pod_name: "child".into(),
|
||||
cwd: "/tmp".into(),
|
||||
|
|
|
|||
|
|
@ -226,9 +226,7 @@ pub enum Event {
|
|||
/// `[File: …]`.
|
||||
///
|
||||
/// One event per `LogEntry::SystemItem` commit. Disk-side and
|
||||
/// wire-side are 1:1 (singular variant); legacy `SystemItems`
|
||||
/// entries from older sessions are read-only and never emitted on
|
||||
/// this lane.
|
||||
/// wire-side are 1:1.
|
||||
SystemItem {
|
||||
item: serde_json::Value,
|
||||
},
|
||||
|
|
@ -363,9 +361,8 @@ pub enum Event {
|
|||
///
|
||||
/// Live updates after the snapshot arrive through the streaming
|
||||
/// events (`TextDelta` / `ToolCall*` / `ToolResult` / etc.) plus
|
||||
/// the two role-specific entry events
|
||||
/// (`SegmentRotated` / `HookInjectedItems`) — there is no generic
|
||||
/// "every committed entry" broadcast.
|
||||
/// role-specific entry events (`SegmentRotated` / `SystemItem`) —
|
||||
/// there is no generic "every committed entry" broadcast.
|
||||
Snapshot {
|
||||
entries: Vec<serde_json::Value>,
|
||||
greeting: Greeting,
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ Worker のセッション永続化を提供するクレート。追記専用の
|
|||
|
||||
### ログ
|
||||
|
||||
- `LogEntry` — セッションログのエントリ型(`SessionStart`, `UserInput`, `AssistantItems`, `TurnEnd` など)
|
||||
- `LogEntry` — セッションログのエントリ型(`SegmentStart`, `UserInput`, `AssistantItem`, `ToolResult`, `SystemItem`, `TurnEnd` など)
|
||||
- `RestoredState` — ログ再生で復元された状態
|
||||
- `collect_state()` — ログエントリ列から状態を復元する関数
|
||||
|
||||
|
|
|
|||
|
|
@ -197,8 +197,8 @@ pub fn save_user_input(
|
|||
|
||||
/// Log the history delta — new items added since the previous snapshot.
|
||||
///
|
||||
/// Classifies items into AssistantItem / ToolResult / HookInjectedItems
|
||||
/// entries automatically (one entry per item). User messages are skipped
|
||||
/// Classifies items into AssistantItem / ToolResult entries automatically
|
||||
/// (one entry per item). User messages are skipped
|
||||
/// because they are persisted upfront via [`save_user_input`] at submit
|
||||
/// time; the worker pushes a flattened copy into its history that
|
||||
/// arrives here in `new_items` and would otherwise produce a duplicate
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ use crate::system_item::SystemItem;
|
|||
/// Variants correspond to specific mutation points in `Worker`:
|
||||
/// - `SegmentStart` — always the first entry; captures initial state
|
||||
/// - `Invoke` — IDLE → active marker (start of a new self-driving cycle)
|
||||
/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends
|
||||
/// - `UserInput` / `AssistantItem` / `ToolResult` / `SystemItem` — history appends
|
||||
/// - `TurnEnd` — AgentTurn boundary marker; carries the post-increment
|
||||
/// `turn_count`. With retry unimplemented today this fires once per
|
||||
/// `run()`/`resume()` (current callers persist a single TurnEnd at
|
||||
|
|
@ -94,23 +94,6 @@ pub enum LogEntry {
|
|||
/// dispatch on `kind` for typed rendering.
|
||||
SystemItem { ts: u64, item: SystemItem },
|
||||
|
||||
/// Legacy plural form: kept **read-only** so old segment logs still
|
||||
/// open. New writes always use the singular `AssistantItem`. Items
|
||||
/// are flattened on replay.
|
||||
AssistantItems { ts: u64, items: Vec<LoggedItem> },
|
||||
|
||||
/// Legacy plural form: kept **read-only**. New writes use the
|
||||
/// singular `ToolResult`.
|
||||
ToolResults { ts: u64, items: Vec<LoggedItem> },
|
||||
|
||||
/// Legacy plural form: kept **read-only**. New writes use the
|
||||
/// singular `SystemItem`.
|
||||
SystemItems { ts: u64, items: Vec<SystemItem> },
|
||||
|
||||
/// Legacy pre-`SystemItem*` form. Deserialize-only. Items are
|
||||
/// flattened to `Item::system_message` on replay.
|
||||
HookInjectedItems { ts: u64, items: Vec<LoggedItem> },
|
||||
|
||||
/// Turn boundary. Records the turn count after increment.
|
||||
TurnEnd { ts: u64, turn_count: usize },
|
||||
|
||||
|
|
@ -279,20 +262,6 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
|
|||
LogEntry::SystemItem { item, .. } => {
|
||||
state.history.push(item.to_history_item());
|
||||
}
|
||||
LogEntry::AssistantItems { items, .. } => {
|
||||
state.history.extend(items.iter().cloned().map(Item::from));
|
||||
}
|
||||
LogEntry::ToolResults { items, .. } => {
|
||||
state.history.extend(items.iter().cloned().map(Item::from));
|
||||
}
|
||||
LogEntry::SystemItems { items, .. } => {
|
||||
state
|
||||
.history
|
||||
.extend(items.iter().map(|si| si.to_history_item()));
|
||||
}
|
||||
LogEntry::HookInjectedItems { items, .. } => {
|
||||
state.history.extend(items.iter().cloned().map(Item::from));
|
||||
}
|
||||
LogEntry::TurnEnd { turn_count, .. } => {
|
||||
state.turn_count = *turn_count;
|
||||
}
|
||||
|
|
@ -396,9 +365,9 @@ mod tests {
|
|||
ts: 2000,
|
||||
segments: vec![Segment::text("Hello")],
|
||||
},
|
||||
LogEntry::AssistantItems {
|
||||
LogEntry::AssistantItem {
|
||||
ts: 3000,
|
||||
items: vec![Item::assistant_message("Hi!").into()],
|
||||
item: Item::assistant_message("Hi!").into(),
|
||||
},
|
||||
LogEntry::TurnEnd {
|
||||
ts: 3100,
|
||||
|
|
@ -431,17 +400,17 @@ mod tests {
|
|||
ts: 2000,
|
||||
segments: vec![Segment::text("Check weather")],
|
||||
},
|
||||
LogEntry::AssistantItems {
|
||||
LogEntry::AssistantItem {
|
||||
ts: 3000,
|
||||
items: vec![Item::tool_call("call_1", "get_weather", r#"{"city":"Tokyo"}"#).into()],
|
||||
item: Item::tool_call("call_1", "get_weather", r#"{"city":"Tokyo"}"#).into(),
|
||||
},
|
||||
LogEntry::ToolResults {
|
||||
LogEntry::ToolResult {
|
||||
ts: 3500,
|
||||
items: vec![Item::tool_result("call_1", "Sunny, 25C").into()],
|
||||
item: Item::tool_result("call_1", "Sunny, 25C").into(),
|
||||
},
|
||||
LogEntry::AssistantItems {
|
||||
LogEntry::AssistantItem {
|
||||
ts: 4000,
|
||||
items: vec![Item::assistant_message("It's sunny in Tokyo!").into()],
|
||||
item: Item::assistant_message("It's sunny in Tokyo!").into(),
|
||||
},
|
||||
LogEntry::TurnEnd {
|
||||
ts: 4100,
|
||||
|
|
@ -497,9 +466,9 @@ mod tests {
|
|||
cache_write_tokens: 0,
|
||||
output_tokens: 10,
|
||||
},
|
||||
LogEntry::AssistantItems {
|
||||
LogEntry::AssistantItem {
|
||||
ts: 2200,
|
||||
items: vec![Item::assistant_message("yo").into()],
|
||||
item: Item::assistant_message("yo").into(),
|
||||
},
|
||||
LogEntry::LlmUsage {
|
||||
ts: 3100,
|
||||
|
|
|
|||
|
|
@ -172,7 +172,7 @@ async fn session_run_logs_entries() {
|
|||
|
||||
let entries = store.read_all(sid, segid).unwrap();
|
||||
|
||||
// SegmentStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum)
|
||||
// SegmentStart, UserInput, AssistantItem, TurnEnd, RunCompleted (at minimum)
|
||||
assert!(
|
||||
entries.len() >= 4,
|
||||
"expected at least 4 entries, got {}",
|
||||
|
|
|
|||
|
|
@ -982,22 +982,6 @@ impl App {
|
|||
let value = serde_json::to_value(&item).expect("SystemItem is Serialize");
|
||||
self.apply_system_item(&value);
|
||||
}
|
||||
session_store::LogEntry::AssistantItems { items, .. }
|
||||
| session_store::LogEntry::ToolResults { items, .. }
|
||||
| session_store::LogEntry::HookInjectedItems { items, .. } => {
|
||||
for logged in items {
|
||||
let item: llm_worker::Item = logged.into();
|
||||
let item_value = serde_json::to_value(&item).expect("Item is Serialize");
|
||||
self.push_history_item(&item_value);
|
||||
}
|
||||
}
|
||||
session_store::LogEntry::SystemItems { items, .. } => {
|
||||
for system_item in items {
|
||||
let value =
|
||||
serde_json::to_value(&system_item).expect("SystemItem is Serialize");
|
||||
self.apply_system_item(&value);
|
||||
}
|
||||
}
|
||||
// Non-history-bearing variants don't affect the block view.
|
||||
_ => {}
|
||||
}
|
||||
|
|
@ -1685,33 +1669,41 @@ mod completion_flow_tests {
|
|||
arguments: r#"{"subject":"live","description":""}"#.into(),
|
||||
});
|
||||
|
||||
let assistant_items_entry = serde_json::json!({
|
||||
"kind": "assistant_items",
|
||||
"ts": 1,
|
||||
"items": [
|
||||
{
|
||||
let assistant_item_entries = vec![
|
||||
serde_json::json!({
|
||||
"kind": "assistant_item",
|
||||
"ts": 1,
|
||||
"item": {
|
||||
"kind": "tool_call",
|
||||
"call_id": "c1",
|
||||
"name": "TaskCreate",
|
||||
"arguments": r#"{"subject":"a","description":"A"}"#,
|
||||
},
|
||||
{
|
||||
}),
|
||||
serde_json::json!({
|
||||
"kind": "assistant_item",
|
||||
"ts": 2,
|
||||
"item": {
|
||||
"kind": "tool_call",
|
||||
"call_id": "c2",
|
||||
"name": "TaskCreate",
|
||||
"arguments": r#"{"subject":"b","description":"B"}"#,
|
||||
},
|
||||
{
|
||||
}),
|
||||
serde_json::json!({
|
||||
"kind": "assistant_item",
|
||||
"ts": 3,
|
||||
"item": {
|
||||
"kind": "tool_call",
|
||||
"call_id": "u1",
|
||||
"name": "TaskUpdate",
|
||||
"arguments": r#"{"taskid":2,"status":"inprogress"}"#,
|
||||
},
|
||||
],
|
||||
});
|
||||
}),
|
||||
];
|
||||
app.handle_pod_event(Event::Snapshot {
|
||||
greeting: test_greeting(),
|
||||
entries: vec![assistant_items_entry],
|
||||
entries: assistant_item_entries,
|
||||
status: PodStatus::Running,
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -200,11 +200,6 @@ fn last_message_preview(entries: &[LogEntry]) -> Option<String> {
|
|||
return Some(format!("assistant: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
LogEntry::AssistantItems { items, .. } => {
|
||||
if let Some(text) = items.iter().find_map(first_text_logged) {
|
||||
return Some(format!("assistant: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,13 +99,13 @@ pub enum LogEntry {
|
|||
UserInput { ts: u64, item: Item },
|
||||
|
||||
// アシスタント応答(worker.rs:1040-1041 に対応)
|
||||
AssistantItems { ts: u64, items: Vec<Item> },
|
||||
AssistantItem { ts: u64, item: Item },
|
||||
|
||||
// ツール実行結果(worker.rs:897-900, 1072-1076 に対応)
|
||||
ToolResults { ts: u64, items: Vec<Item> },
|
||||
ToolResult { ts: u64, item: Item },
|
||||
|
||||
// Hook 注入 Items(worker.rs:1055 ContinueWithMessages に対応)
|
||||
HookInjectedItems { ts: u64, items: Vec<Item> },
|
||||
// typed system injection
|
||||
SystemItem { ts: u64, item: SystemItem },
|
||||
|
||||
// ターン境界
|
||||
TurnEnd { ts: u64, turn_count: usize },
|
||||
|
|
@ -126,7 +126,7 @@ pub enum LogEntry {
|
|||
pub enum Outcome { Finished, Paused, Error { message: String } }
|
||||
```
|
||||
|
||||
**Replay ロジック**: 全エントリ種別を走査し、`*Items` / `UserInput` → history に append、
|
||||
**Replay ロジック**: 全エントリ種別を走査し、`AssistantItem` / `ToolResult` / `SystemItem` / `UserInput` → history に append、
|
||||
`TurnEnd` → turn_count 更新、`CacheLocked` → locked_prefix_len 設定。
|
||||
|
||||
### TraceEntry(event_trace.rs)
|
||||
|
|
|
|||
|
|
@ -1,20 +1,15 @@
|
|||
# ReadPodOutput が新しい LogEntry 形式を読めず no new assistant text になる
|
||||
# ReadPodOutput が LogEntry schema の分岐に取り残され no new assistant text になる
|
||||
|
||||
## 観測
|
||||
|
||||
spawned Pod のレビュー出力について、operator が attach すると assistant 出力が見える一方、spawner 側の `ReadPodOutput` は `pod ... running; no new assistant text` を返した。
|
||||
|
||||
## 原因候補
|
||||
## 原因
|
||||
|
||||
`crates/pod/src/spawn/comm_tools.rs` の `extract_assistant_text` は、`Event::Snapshot` 内の `LogEntry` から assistant text を抽出する際に以下だけを対象にしている。
|
||||
`crates/pod/src/spawn/comm_tools.rs` の `extract_assistant_text` が、`Event::Snapshot` 内の `LogEntry` を独自に解釈していた。session log の標準形は `LogEntry::AssistantItem { item, .. }` だが、`ReadPodOutput` 側の抽出対象が古い複数 item 形式に寄ったままになっていたため、新しい assistant 出力が snapshot に存在しても取りこぼした。
|
||||
|
||||
- `LogEntry::SegmentStart { history, .. }`
|
||||
- legacy plural の `LogEntry::AssistantItems { items, .. }`
|
||||
|
||||
現在の session log は新規書き込みで singular の `LogEntry::AssistantItem { item, .. }` を使うため、新しい assistant 出力が snapshot に存在しても抽出対象にならない。その結果、`ReadPodOutput` は cursor を進めつつ text は空と判断し、以後の read でも見えなくなる。
|
||||
|
||||
これは entry hash の問題ではなく、session-store の新しい singular LogEntry 形式への tool 側追従漏れと見られる。
|
||||
これは entry hash の問題ではなく、`LogEntry` に対する派生操作が各所で独自実装され、後方互換 variant が残ったことで標準形への追従漏れを型で検出できなかった問題。
|
||||
|
||||
## 対応
|
||||
|
||||
`tickets/read-pod-output-singular-log-entry.md` を追加した。`extract_assistant_text` に `LogEntry::AssistantItem` 対応を追加し、singular entry の snapshot から text を返す test を追加する。
|
||||
`LogEntry` から古い複数 item 形式の後方互換 variant を削除し、`ReadPodOutput` / TUI / picker / tests を現在の singular entry だけに揃える。これにより schema 変更時に独自 match の取り残しが compile error として表面化する。
|
||||
|
|
|
|||
|
|
@ -1,32 +0,0 @@
|
|||
# ReadPodOutput: singular LogEntry 形式への追従
|
||||
|
||||
## 背景
|
||||
|
||||
`ReadPodOutput` は spawned Pod の socket に接続して `Event::Snapshot` を読み、差分 cursor 以降の assistant text を抽出する。現在の抽出処理は `SegmentStart.history` と legacy plural の `LogEntry::AssistantItems` だけを見ている。
|
||||
|
||||
一方、現在の session log は新規書き込みで singular の `LogEntry::AssistantItem` を使う。実際に attach では assistant 出力が見えているのに、spawner 側の `ReadPodOutput` が `no new assistant text` を返す事象が確認された。これは hash cursor の問題ではなく、`extract_assistant_text` が新しい LogEntry 形式を読めていないことが主因と見られる。
|
||||
|
||||
## 要件
|
||||
|
||||
- `crates/pod/src/spawn/comm_tools.rs` の `extract_assistant_text` を現在の `LogEntry` 形式に追従させる。
|
||||
- `LogEntry::AssistantItem { item, .. }` を assistant text 抽出対象に含める。
|
||||
- legacy `AssistantItems` と `SegmentStart.history` の扱いは維持する。
|
||||
- 必要なら `SystemItem` / tool result / reasoning item は対象外でよいが、assistant message item の取りこぼしはなくす。
|
||||
- `ReadPodOutput` の cursor は item index ベースのままでよいが、抽出対象を見落として cursor だけ進む退行を test で防ぐ。
|
||||
- attach/TUI に見える assistant output と `ReadPodOutput` の結果が乖離しないようにする。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- singular `LogEntry::AssistantItem` を含む snapshot に対して、`ReadPodOutput` が assistant text を返す unit/integration test を追加する。
|
||||
- legacy `LogEntry::AssistantItems` の既存 test が引き続き通る。
|
||||
- cursor の2回目 read は引き続き `no new assistant text` になる。
|
||||
- `cargo fmt --check`
|
||||
- `cargo check --workspace`
|
||||
- `cargo test -p pod`
|
||||
|
||||
## 範囲外
|
||||
|
||||
- cursor 永続化
|
||||
- ReadPodOutput の protocol/API 変更
|
||||
- 過去 Pod 探索 / restore tool の実装
|
||||
- assistant text 以外の structured event 表示
|
||||
Loading…
Reference in New Issue
Block a user