Compare commits
No commits in common. "e95c35b76da7e9334d2eef1eece37857754a9a6f" and "46765404bf5a3bce86b07182a723bcdee78488d8" have entirely different histories.
e95c35b76d
...
46765404bf
8
.insomnia/.gitignore
vendored
8
.insomnia/.gitignore
vendored
|
|
@ -1 +1,7 @@
|
||||||
/memory/
|
# Generated / session-derived memory state
|
||||||
|
/memory/_staging/
|
||||||
|
/memory/summary.md
|
||||||
|
/memory/decisions/
|
||||||
|
/memory/requests/
|
||||||
|
|
||||||
|
# Project-authored workflows and knowledge are intentionally tracked.
|
||||||
|
|
|
||||||
6
.insomnia/memory/_usage/events.jsonl
Normal file
6
.insomnia/memory/_usage/events.jsonl
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
{"id":"019e1729-5daf-7580-b765-095156c4009a","occurred_at":"2026-05-11T13:10:47.471344537Z","session_id":"019e1706-d800-7021-8042-e40debe644cd","event":"use","source":"WorkflowInvoke","records":[{"kind":"workflow","slug":"auto-maintain","file_bytes":6831,"file_tokens_estimate":1708}]}
|
||||||
|
{"id":"019e1ac0-b9c0-7a31-a04c-12f6d74129b0","occurred_at":"2026-05-12T05:54:58.624201015Z","session_id":"019e1706-d800-7021-8042-e40debe644cd","event":"use","source":"WorkflowInvoke","records":[{"kind":"workflow","slug":"auto-maintain","file_bytes":6831,"file_tokens_estimate":1708}]}
|
||||||
|
{"id":"019e1ac0-d0d0-7151-ae38-4cd148263cda","occurred_at":"2026-05-12T05:55:04.528959274Z","session_id":"019e1706-d800-7021-8042-e40debe644cd","event":"use","source":"WorkflowInvoke","records":[{"kind":"workflow","slug":"worktree-workflow","file_bytes":4188,"file_tokens_estimate":1047}]}
|
||||||
|
{"id":"019e1b66-b238-7342-a75f-3b4258de8c92","occurred_at":"2026-05-12T08:56:15.672925627Z","session_id":"019e1b63-ae47-7123-89b6-3a49d73ae200","event":"use","source":"WorkflowInvoke","records":[{"kind":"workflow","slug":"auto-maintain","file_bytes":6783,"file_tokens_estimate":1696}]}
|
||||||
|
{"id":"019e1b66-b239-71f0-90d7-968f4c0d2ee0","occurred_at":"2026-05-12T08:56:15.673018070Z","session_id":"019e1b63-ae47-7123-89b6-3a49d73ae200","event":"use","source":"WorkflowInvoke","records":[{"kind":"workflow","slug":"worktree-workflow","file_bytes":4188,"file_tokens_estimate":1047}]}
|
||||||
|
{"id":"019e1d46-abf0-7e82-85d6-652eb8535494","occurred_at":"2026-05-12T17:40:31.344574876Z","session_id":"019e1d42-3b60-7c02-9815-f90c89466a46","event":"use","source":"WorkflowInvoke","records":[{"kind":"workflow","slug":"worktree-workflow","file_bytes":4188,"file_tokens_estimate":1047}]}
|
||||||
|
|
@ -21,10 +21,6 @@ use uuid::Uuid;
|
||||||
|
|
||||||
use crate::workspace::WorkspaceLayout;
|
use crate::workspace::WorkspaceLayout;
|
||||||
|
|
||||||
fn is_zero_usize(value: &usize) -> bool {
|
|
||||||
*value == 0
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum AuditWorker {
|
pub enum AuditWorker {
|
||||||
|
|
@ -145,8 +141,6 @@ pub struct ExtractAudit {
|
||||||
pub struct ConsolidationAudit {
|
pub struct ConsolidationAudit {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub staging_count: usize,
|
pub staging_count: usize,
|
||||||
#[serde(default, skip_serializing_if = "is_zero_usize")]
|
|
||||||
pub invalid_staging_count: usize,
|
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub staging_bytes: u64,
|
pub staging_bytes: u64,
|
||||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,5 @@ pub use input::{
|
||||||
render_tidy_hints,
|
render_tidy_hints,
|
||||||
};
|
};
|
||||||
pub use lock::{LockError, LockRecord, StagingLock};
|
pub use lock::{LockError, LockRecord, StagingLock};
|
||||||
pub use staging::{
|
pub use staging::{StagingEntry, list_staging_entries};
|
||||||
StagingEntriesSnapshot, StagingEntry, list_staging_entries, list_staging_entries_snapshot,
|
|
||||||
};
|
|
||||||
pub use tidy::{TidyHints, collect_tidy_hints};
|
pub use tidy::{TidyHints, collect_tidy_hints};
|
||||||
|
|
|
||||||
|
|
@ -26,58 +26,34 @@ pub struct StagingEntry {
|
||||||
pub bytes: u64,
|
pub bytes: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// staging directory の検査結果。`entries` は current schema として読めた
|
|
||||||
/// staging のみで、`invalid_count` は `.json` だが staging として採用できなかった
|
|
||||||
/// ファイル数。
|
|
||||||
#[derive(Debug, Clone, Default)]
|
|
||||||
pub struct StagingEntriesSnapshot {
|
|
||||||
pub entries: Vec<StagingEntry>,
|
|
||||||
pub invalid_count: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// `<staging_dir>/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`]
|
/// `<staging_dir>/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`]
|
||||||
/// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや
|
/// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや
|
||||||
/// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが
|
/// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが
|
||||||
/// consolidation 全体を止めないように)。
|
/// consolidation 全体を止めないように)。
|
||||||
pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
||||||
list_staging_entries_snapshot(layout).entries
|
|
||||||
}
|
|
||||||
|
|
||||||
/// `<staging_dir>/*.json` を読んで valid staging と invalid staging 件数を返す。
|
|
||||||
/// invalid は自動 migration / 削除 / archive せず、観測可能にするための件数だけを
|
|
||||||
/// 呼び出し側へ渡す。
|
|
||||||
pub fn list_staging_entries_snapshot(layout: &WorkspaceLayout) -> StagingEntriesSnapshot {
|
|
||||||
let dir = layout.staging_dir();
|
let dir = layout.staging_dir();
|
||||||
let entries = match std::fs::read_dir(&dir) {
|
let entries = match std::fs::read_dir(&dir) {
|
||||||
Ok(it) => it,
|
Ok(it) => it,
|
||||||
Err(_) => return StagingEntriesSnapshot::default(),
|
Err(_) => return Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut out: Vec<StagingEntry> = Vec::new();
|
let mut out: Vec<StagingEntry> = Vec::new();
|
||||||
let mut invalid_count = 0;
|
|
||||||
for entry in entries.flatten() {
|
for entry in entries.flatten() {
|
||||||
let path = entry.path();
|
let path = entry.path();
|
||||||
if !path.is_file() {
|
if !path.is_file() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
let stem = match path.file_stem().and_then(|s| s.to_str()) {
|
||||||
|
Some(s) => s,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
|
let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
|
||||||
if ext != "json" {
|
if ext != "json" {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let stem = match path.file_stem().and_then(|s| s.to_str()) {
|
|
||||||
Some(s) => s,
|
|
||||||
None => {
|
|
||||||
invalid_count += 1;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let id = match Uuid::parse_str(stem) {
|
let id = match Uuid::parse_str(stem) {
|
||||||
Ok(u) => u,
|
Ok(u) => u,
|
||||||
Err(e) => {
|
Err(_) => continue,
|
||||||
invalid_count += 1;
|
|
||||||
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry id");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
let bytes = match std::fs::metadata(&path) {
|
let bytes = match std::fs::metadata(&path) {
|
||||||
Ok(m) => m.len(),
|
Ok(m) => m.len(),
|
||||||
|
|
@ -86,7 +62,6 @@ pub fn list_staging_entries_snapshot(layout: &WorkspaceLayout) -> StagingEntries
|
||||||
let raw = match std::fs::read_to_string(&path) {
|
let raw = match std::fs::read_to_string(&path) {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
invalid_count += 1;
|
|
||||||
tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry");
|
tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -94,7 +69,6 @@ pub fn list_staging_entries_snapshot(layout: &WorkspaceLayout) -> StagingEntries
|
||||||
let record = match serde_json::from_str::<StagingRecord>(&raw) {
|
let record = match serde_json::from_str::<StagingRecord>(&raw) {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
invalid_count += 1;
|
|
||||||
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry");
|
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -107,10 +81,7 @@ pub fn list_staging_entries_snapshot(layout: &WorkspaceLayout) -> StagingEntries
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
out.sort_by_key(|e| e.id);
|
out.sort_by_key(|e| e.id);
|
||||||
StagingEntriesSnapshot {
|
out
|
||||||
entries: out,
|
|
||||||
invalid_count,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
@ -145,26 +116,17 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn skips_lock_file_and_counts_invalid_json() {
|
fn skips_lock_file_and_garbage() {
|
||||||
let tmp = tempfile::TempDir::new().unwrap();
|
let tmp = tempfile::TempDir::new().unwrap();
|
||||||
let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
|
let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
|
||||||
let (_id, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap();
|
let (_id, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap();
|
||||||
|
|
||||||
// Drop a non-UUID json file, an unparsable UUID-named json file, and
|
// Drop a non-UUID json file and a bare lock file alongside.
|
||||||
// a bare lock file alongside. Lock files are not `.json`; invalid
|
|
||||||
// `.json` files are surfaced separately instead of being mistaken for
|
|
||||||
// an empty staging directory.
|
|
||||||
std::fs::write(layout.staging_dir().join("not-a-uuid.json"), "{}").unwrap();
|
std::fs::write(layout.staging_dir().join("not-a-uuid.json"), "{}").unwrap();
|
||||||
let bad_id = Uuid::now_v7();
|
|
||||||
std::fs::write(layout.staging_dir().join(format!("{bad_id}.json")), "{").unwrap();
|
|
||||||
std::fs::write(layout.staging_dir().join(".consolidation.lock"), "{}").unwrap();
|
std::fs::write(layout.staging_dir().join(".consolidation.lock"), "{}").unwrap();
|
||||||
|
|
||||||
let entries = list_staging_entries(&layout);
|
let entries = list_staging_entries(&layout);
|
||||||
assert_eq!(entries.len(), 1);
|
assert_eq!(entries.len(), 1);
|
||||||
|
|
||||||
let snapshot = list_staging_entries_snapshot(&layout);
|
|
||||||
assert_eq!(snapshot.entries.len(), 1);
|
|
||||||
assert_eq!(snapshot.invalid_count, 2);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
|
|
@ -3098,26 +3098,16 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
);
|
);
|
||||||
let event_tx = self.event_tx.as_ref();
|
let event_tx = self.event_tx.as_ref();
|
||||||
|
|
||||||
let staging_snapshot = consolidate::list_staging_entries_snapshot(&layout);
|
let entries = consolidate::list_staging_entries(&layout);
|
||||||
let invalid_staging_count = staging_snapshot.invalid_count;
|
|
||||||
let entries = staging_snapshot.entries;
|
|
||||||
if entries.is_empty() {
|
if entries.is_empty() {
|
||||||
let reason = if invalid_staging_count == 0 {
|
|
||||||
"no_staging_entries".to_string()
|
|
||||||
} else {
|
|
||||||
format!("no_valid_staging_entries invalid={invalid_staging_count}")
|
|
||||||
};
|
|
||||||
audit.emit(
|
audit.emit(
|
||||||
&layout,
|
&layout,
|
||||||
event_tx,
|
event_tx,
|
||||||
memory::audit::WorkerLifecycleStatus::Skipped,
|
memory::audit::WorkerLifecycleStatus::Skipped,
|
||||||
reason,
|
"no_staging_entries",
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
Some(memory::audit::ConsolidationAudit {
|
Some(memory::audit::ConsolidationAudit::default()),
|
||||||
invalid_staging_count,
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
);
|
);
|
||||||
return Ok(ConsolidateDecision::Skipped);
|
return Ok(ConsolidateDecision::Skipped);
|
||||||
}
|
}
|
||||||
|
|
@ -3127,7 +3117,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
|
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
|
||||||
let base_consolidation = memory::audit::ConsolidationAudit {
|
let base_consolidation = memory::audit::ConsolidationAudit {
|
||||||
staging_count: total_files,
|
staging_count: total_files,
|
||||||
invalid_staging_count,
|
|
||||||
staging_bytes: total_bytes,
|
staging_bytes: total_bytes,
|
||||||
consumed_staging_ids: consumed_ids.iter().map(ToString::to_string).collect(),
|
consumed_staging_ids: consumed_ids.iter().map(ToString::to_string).collect(),
|
||||||
operations: memory::audit::OperationCounts::default(),
|
operations: memory::audit::OperationCounts::default(),
|
||||||
|
|
@ -3415,38 +3404,17 @@ impl WorkerAuditBase {
|
||||||
consolidation,
|
consolidation,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
if should_emit_memory_worker_event(self.worker, status, &reason) {
|
emit_memory_worker_event(
|
||||||
emit_memory_worker_event(
|
event_tx,
|
||||||
event_tx,
|
self.run_id,
|
||||||
self.run_id,
|
self.worker,
|
||||||
self.worker,
|
status,
|
||||||
status,
|
self.trigger,
|
||||||
self.trigger,
|
&reason,
|
||||||
&reason,
|
);
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn should_emit_memory_worker_event(
|
|
||||||
worker: memory::audit::AuditWorker,
|
|
||||||
status: memory::audit::WorkerLifecycleStatus,
|
|
||||||
reason: &str,
|
|
||||||
) -> bool {
|
|
||||||
if worker == memory::audit::AuditWorker::MemoryConsolidation
|
|
||||||
&& status == memory::audit::WorkerLifecycleStatus::Skipped
|
|
||||||
{
|
|
||||||
return !is_idle_consolidation_skip_reason(reason);
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_idle_consolidation_skip_reason(reason: &str) -> bool {
|
|
||||||
reason == "no_staging_entries"
|
|
||||||
|| reason == "consolidation_threshold_disabled"
|
|
||||||
|| reason.starts_with("threshold_not_reached")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn memory_language(cfg: &manifest::MemoryConfig) -> &str {
|
fn memory_language(cfg: &manifest::MemoryConfig) -> &str {
|
||||||
cfg.language
|
cfg.language
|
||||||
.as_deref()
|
.as_deref()
|
||||||
|
|
@ -4279,45 +4247,6 @@ fn current_pwd() -> Result<PathBuf, PodError> {
|
||||||
.map_err(|source| PodError::InvalidPwd { pwd: cwd, source })
|
.map_err(|source| PodError::InvalidPwd { pwd: cwd, source })
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod memory_worker_event_tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn suppresses_idle_consolidation_skip_worker_events() {
|
|
||||||
assert!(!should_emit_memory_worker_event(
|
|
||||||
memory::audit::AuditWorker::MemoryConsolidation,
|
|
||||||
memory::audit::WorkerLifecycleStatus::Skipped,
|
|
||||||
"no_staging_entries",
|
|
||||||
));
|
|
||||||
assert!(!should_emit_memory_worker_event(
|
|
||||||
memory::audit::AuditWorker::MemoryConsolidation,
|
|
||||||
memory::audit::WorkerLifecycleStatus::Skipped,
|
|
||||||
"threshold_not_reached files=1 bytes=64 min_files=2 min_bytes=1048576",
|
|
||||||
));
|
|
||||||
assert!(!should_emit_memory_worker_event(
|
|
||||||
memory::audit::AuditWorker::MemoryConsolidation,
|
|
||||||
memory::audit::WorkerLifecycleStatus::Skipped,
|
|
||||||
"consolidation_threshold_disabled",
|
|
||||||
));
|
|
||||||
assert!(should_emit_memory_worker_event(
|
|
||||||
memory::audit::AuditWorker::MemoryConsolidation,
|
|
||||||
memory::audit::WorkerLifecycleStatus::Skipped,
|
|
||||||
"no_valid_staging_entries invalid=1",
|
|
||||||
));
|
|
||||||
assert!(should_emit_memory_worker_event(
|
|
||||||
memory::audit::AuditWorker::MemoryConsolidation,
|
|
||||||
memory::audit::WorkerLifecycleStatus::Completed,
|
|
||||||
"completed",
|
|
||||||
));
|
|
||||||
assert!(should_emit_memory_worker_event(
|
|
||||||
memory::audit::AuditWorker::MemoryExtract,
|
|
||||||
memory::audit::WorkerLifecycleStatus::Skipped,
|
|
||||||
"threshold_not_reached files=1",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod build_summary_prompt_tests {
|
mod build_summary_prompt_tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ use async_trait::async_trait;
|
||||||
use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
||||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||||
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
||||||
use protocol::{ErrorCode, Event, InvokeKind, Method};
|
use protocol::{ErrorCode, Event, Method};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use session_store::LogEntry;
|
use session_store::LogEntry;
|
||||||
use tokio::net::UnixStream;
|
use tokio::net::UnixStream;
|
||||||
|
|
@ -365,8 +365,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Failure modes distinguished by `SendToPod`.
|
/// Failure modes distinguished by `SendToPod`.
|
||||||
#[derive(Debug)]
|
enum SendRunError {
|
||||||
pub(crate) enum SendRunError {
|
|
||||||
/// Target Pod responded with `Error { AlreadyRunning }` — the
|
/// Target Pod responded with `Error { AlreadyRunning }` — the
|
||||||
/// caller can retry once the current turn ends.
|
/// caller can retry once the current turn ends.
|
||||||
AlreadyRunning,
|
AlreadyRunning,
|
||||||
|
|
@ -375,12 +374,10 @@ pub(crate) enum SendRunError {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write `Method::Run` to the target and read back events until we see
|
/// Write `Method::Run` to the target and read back events until we see
|
||||||
/// evidence that the controller accepted the run (`UserMessage`,
|
/// either `TurnStart` (accepted) or `Error { AlreadyRunning }`
|
||||||
/// `TurnStart`, or a user-send `InvokeStart`) or rejected it with
|
/// (rejected). Any replayed alerts that precede the response are
|
||||||
/// `Error { AlreadyRunning }`. Any connect-time Snapshot or replayed alerts
|
/// skipped. Times out per-read so a stuck Pod doesn't hang the tool.
|
||||||
/// that precede the response are skipped. Times out per-read so a stuck Pod
|
async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> {
|
||||||
/// doesn't hang the tool.
|
|
||||||
pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> {
|
|
||||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||||
.await
|
.await
|
||||||
.map_err(|_| SendRunError::Io("connect timed out".into()))?
|
.map_err(|_| SendRunError::Io("connect timed out".into()))?
|
||||||
|
|
@ -407,19 +404,10 @@ pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result
|
||||||
code: ErrorCode::AlreadyRunning,
|
code: ErrorCode::AlreadyRunning,
|
||||||
..
|
..
|
||||||
}) => return Err(SendRunError::AlreadyRunning),
|
}) => return Err(SendRunError::AlreadyRunning),
|
||||||
Some(Event::Error { code, message }) => {
|
Some(Event::TurnStart { .. }) => return Ok(()),
|
||||||
return Err(SendRunError::Io(format!(
|
// Alerts and other pre-turn events are replayed to new
|
||||||
"pod returned {code:?}: {message}"
|
// subscribers; keep reading until the controller's response
|
||||||
)));
|
// to our `Run` shows up.
|
||||||
}
|
|
||||||
Some(Event::InvokeStart {
|
|
||||||
kind: InvokeKind::UserSend,
|
|
||||||
})
|
|
||||||
| Some(Event::UserMessage { .. })
|
|
||||||
| Some(Event::TurnStart { .. }) => return Ok(()),
|
|
||||||
// Alerts, Snapshot, and other pre-turn events can precede the
|
|
||||||
// controller's response; keep reading until the Run is accepted
|
|
||||||
// or rejected.
|
|
||||||
Some(_) => continue,
|
Some(_) => continue,
|
||||||
None => return Err(SendRunError::Io("connection closed before response".into())),
|
None => return Err(SendRunError::Io("connection closed before response".into())),
|
||||||
}
|
}
|
||||||
|
|
@ -567,78 +555,6 @@ mod tests {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn serve_initial_events_then_run_ack(
|
|
||||||
listener: UnixListener,
|
|
||||||
initial_events: Vec<Event>,
|
|
||||||
ack: Event,
|
|
||||||
) -> JoinHandle<Option<Method>> {
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let (stream, _) = listener.accept().await.ok()?;
|
|
||||||
let (r, w) = stream.into_split();
|
|
||||||
let mut reader = JsonLineReader::new(r);
|
|
||||||
let mut writer = JsonLineWriter::new(w);
|
|
||||||
for event in initial_events {
|
|
||||||
writer.write(&event).await.ok()?;
|
|
||||||
}
|
|
||||||
let method = reader.next::<Method>().await.ok().flatten()?;
|
|
||||||
writer.write(&ack).await.ok()?;
|
|
||||||
Some(method)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn send_run_and_confirm_keeps_connection_open_until_user_message_ack() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let socket = tmp.path().join("pod.sock");
|
|
||||||
let listener = UnixListener::bind(&socket).unwrap();
|
|
||||||
let received = serve_initial_events_then_run_ack(
|
|
||||||
listener,
|
|
||||||
vec![
|
|
||||||
Event::Alert(Alert {
|
|
||||||
level: AlertLevel::Warn,
|
|
||||||
source: AlertSource::Pod,
|
|
||||||
message: "replayed alert".into(),
|
|
||||||
timestamp_ms: 0,
|
|
||||||
}),
|
|
||||||
snapshot(Vec::new()),
|
|
||||||
],
|
|
||||||
Event::UserMessage {
|
|
||||||
segments: vec![protocol::Segment::text("hello")],
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
send_run_and_confirm(&socket, "hello".into()).await.unwrap();
|
|
||||||
|
|
||||||
let method = received.await.unwrap().expect("expected method");
|
|
||||||
match method {
|
|
||||||
Method::Run { input } => {
|
|
||||||
assert_eq!(protocol::Segment::flatten_to_text(&input), "hello");
|
|
||||||
}
|
|
||||||
other => panic!("expected Run, got {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn send_run_and_confirm_reports_already_running() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let socket = tmp.path().join("pod.sock");
|
|
||||||
let listener = UnixListener::bind(&socket).unwrap();
|
|
||||||
let received = serve_initial_events_then_run_ack(
|
|
||||||
listener,
|
|
||||||
vec![snapshot(Vec::new())],
|
|
||||||
Event::Error {
|
|
||||||
code: ErrorCode::AlreadyRunning,
|
|
||||||
message: "busy".into(),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let err = send_run_and_confirm(&socket, "hello".into())
|
|
||||||
.await
|
|
||||||
.expect_err("expected AlreadyRunning");
|
|
||||||
assert!(matches!(err, SendRunError::AlreadyRunning));
|
|
||||||
assert!(matches!(received.await.unwrap(), Some(Method::Run { .. })));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn connect_and_send_drains_initial_alert_and_snapshot_before_method() {
|
async fn connect_and_send_drains_initial_alert_and_snapshot_before_method() {
|
||||||
let tmp = TempDir::new().unwrap();
|
let tmp = TempDir::new().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,8 @@ use manifest::{
|
||||||
ModelManifest, Permission, PodManifestConfig, PodMetaConfig, ScopeConfig, ScopeRule,
|
ModelManifest, Permission, PodManifestConfig, PodMetaConfig, ScopeConfig, ScopeRule,
|
||||||
SharedScope, WorkerManifestConfig,
|
SharedScope, WorkerManifestConfig,
|
||||||
};
|
};
|
||||||
|
use protocol::Method;
|
||||||
|
use protocol::stream::JsonLineWriter;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use session_store::PodScopeSnapshot;
|
use session_store::PodScopeSnapshot;
|
||||||
use tokio::net::UnixStream;
|
use tokio::net::UnixStream;
|
||||||
|
|
@ -26,7 +28,6 @@ use tokio::time::sleep;
|
||||||
use crate::ipc::event;
|
use crate::ipc::event;
|
||||||
use crate::runtime::dir::SpawnedPodRecord;
|
use crate::runtime::dir::SpawnedPodRecord;
|
||||||
use crate::runtime::pod_registry::{self, LockFileGuard, ScopeLockError};
|
use crate::runtime::pod_registry::{self, LockFileGuard, ScopeLockError};
|
||||||
use crate::spawn::comm_tools::{SendRunError, send_run_and_confirm};
|
|
||||||
use crate::spawn::registry::SpawnedPodRegistry;
|
use crate::spawn::registry::SpawnedPodRegistry;
|
||||||
use protocol::PodEvent;
|
use protocol::PodEvent;
|
||||||
|
|
||||||
|
|
@ -257,6 +258,8 @@ impl Tool for SpawnPodTool {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
send_run(&predicted_socket, &input.task).await?;
|
||||||
|
|
||||||
let record = SpawnedPodRecord {
|
let record = SpawnedPodRecord {
|
||||||
pod_name: input.name.clone(),
|
pod_name: input.name.clone(),
|
||||||
socket_path: predicted_socket.clone(),
|
socket_path: predicted_socket.clone(),
|
||||||
|
|
@ -281,10 +284,6 @@ impl Tool for SpawnPodTool {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
send_run_and_confirm(&predicted_socket, input.task.clone())
|
|
||||||
.await
|
|
||||||
.map_err(|err| spawn_delivery_error(&input.name, err))?;
|
|
||||||
|
|
||||||
Ok(ToolOutput {
|
Ok(ToolOutput {
|
||||||
summary: format!(
|
summary: format!(
|
||||||
"spawned pod `{}` listening on {}",
|
"spawned pod `{}` listening on {}",
|
||||||
|
|
@ -459,15 +458,23 @@ async fn wait_for_socket(path: &Path, timeout: Duration) -> Result<(), ToolError
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn spawn_delivery_error(pod_name: &str, err: SendRunError) -> ToolError {
|
async fn send_run(socket: &Path, task: &str) -> Result<(), ToolError> {
|
||||||
match err {
|
let stream = UnixStream::connect(socket)
|
||||||
SendRunError::AlreadyRunning => ToolError::ExecutionFailed(format!(
|
.await
|
||||||
"spawned pod `{pod_name}` rejected its initial task as already running; the pod remains registered and can be inspected or stopped"
|
.map_err(|e| ToolError::ExecutionFailed(format!("connect {}: {e}", socket.display())))?;
|
||||||
)),
|
let (_reader, writer) = stream.into_split();
|
||||||
SendRunError::Io(msg) => ToolError::ExecutionFailed(format!(
|
let mut w = JsonLineWriter::new(writer);
|
||||||
"spawned pod `{pod_name}` did not confirm initial task delivery: {msg}; the pod remains registered and can be inspected or stopped"
|
w.write(&Method::Run {
|
||||||
)),
|
input: vec![protocol::Segment::text(task)],
|
||||||
}
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| ToolError::ExecutionFailed(format!("send Method::Run: {e}")))?;
|
||||||
|
// Drop the writer to close the socket's write half. The flush
|
||||||
|
// inside `JsonLineWriter::write` has already pushed the bytes
|
||||||
|
// across, so the child will see a complete method line followed by
|
||||||
|
// EOF.
|
||||||
|
drop(w);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pod_registry_err_to_tool(e: ScopeLockError) -> ToolError {
|
fn pod_registry_err_to_tool(e: ScopeLockError) -> ToolError {
|
||||||
|
|
|
||||||
|
|
@ -27,9 +27,8 @@ use memory::WorkspaceLayout;
|
||||||
use memory::extract::{ExtractedPayload, write_staging};
|
use memory::extract::{ExtractedPayload, write_staging};
|
||||||
use memory::schema::SourceRef;
|
use memory::schema::SourceRef;
|
||||||
use session_store::FsStore;
|
use session_store::FsStore;
|
||||||
use tokio::sync::broadcast;
|
|
||||||
|
|
||||||
use pod::{Event, Pod};
|
use pod::Pod;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct MockClient {
|
struct MockClient {
|
||||||
|
|
@ -184,32 +183,6 @@ fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec<uuid::Uuid> {
|
||||||
ids
|
ids
|
||||||
}
|
}
|
||||||
|
|
||||||
fn attach_event_receiver(pod: &mut Pod<MockClient, FsStore>) -> broadcast::Receiver<Event> {
|
|
||||||
let (tx, rx) = broadcast::channel(16);
|
|
||||||
pod.attach_event_tx(tx);
|
|
||||||
rx
|
|
||||||
}
|
|
||||||
|
|
||||||
fn collect_memory_worker_reasons(rx: &mut broadcast::Receiver<Event>) -> Vec<String> {
|
|
||||||
let mut reasons = Vec::new();
|
|
||||||
loop {
|
|
||||||
match rx.try_recv() {
|
|
||||||
Ok(Event::MemoryWorker(event)) => reasons.push(event.reason),
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(broadcast::error::TryRecvError::Empty) => break,
|
|
||||||
Err(err) => panic!("unexpected broadcast receive error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
reasons
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_audit_jsonl(layout: &WorkspaceLayout) -> Vec<serde_json::Value> {
|
|
||||||
let text = std::fs::read_to_string(layout.audit_current_log_path()).unwrap();
|
|
||||||
text.lines()
|
|
||||||
.map(|line| serde_json::from_str::<serde_json::Value>(line).unwrap())
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn no_memory_section_is_a_noop() {
|
async fn no_memory_section_is_a_noop() {
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
|
@ -269,79 +242,6 @@ async fn empty_staging_skips() {
|
||||||
// No mock calls expected.
|
// No mock calls expected.
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn empty_staging_skip_is_audit_only() {
|
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
|
||||||
let client = MockClient::new(vec![]);
|
|
||||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
||||||
let mut rx = attach_event_receiver(&mut pod);
|
|
||||||
|
|
||||||
pod.try_post_run_consolidate().await.unwrap();
|
|
||||||
|
|
||||||
assert!(collect_memory_worker_reasons(&mut rx).is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn invalid_only_staging_is_distinct_from_no_staging() {
|
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
|
||||||
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
||||||
std::fs::create_dir_all(layout.staging_dir()).unwrap();
|
|
||||||
let invalid_id = uuid::Uuid::now_v7();
|
|
||||||
let invalid_path = layout.staging_dir().join(format!("{invalid_id}.json"));
|
|
||||||
std::fs::write(&invalid_path, "{").unwrap();
|
|
||||||
|
|
||||||
let client = MockClient::new(vec![]);
|
|
||||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
||||||
let mut rx = attach_event_receiver(&mut pod);
|
|
||||||
|
|
||||||
pod.try_post_run_consolidate().await.unwrap();
|
|
||||||
|
|
||||||
assert!(invalid_path.exists(), "invalid staging is not auto-deleted");
|
|
||||||
let reasons = collect_memory_worker_reasons(&mut rx);
|
|
||||||
assert_eq!(reasons, vec!["no_valid_staging_entries invalid=1"]);
|
|
||||||
|
|
||||||
let audit = read_audit_jsonl(&layout);
|
|
||||||
let last = audit.last().unwrap();
|
|
||||||
assert_eq!(last["reason"], "no_valid_staging_entries invalid=1");
|
|
||||||
assert_eq!(last["consolidation"]["staging_count"], 0);
|
|
||||||
assert_eq!(last["consolidation"]["invalid_staging_count"], 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn below_threshold_skip_is_audit_only() {
|
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
|
||||||
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
||||||
write_n_staging(&layout, 1); // threshold is 2
|
|
||||||
|
|
||||||
let client = MockClient::new(vec![]);
|
|
||||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
||||||
let mut rx = attach_event_receiver(&mut pod);
|
|
||||||
|
|
||||||
pod.try_post_run_consolidate().await.unwrap();
|
|
||||||
|
|
||||||
assert!(collect_memory_worker_reasons(&mut rx).is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn completed_event_survives_terminal_empty_drain_skip() {
|
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
|
||||||
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
||||||
write_n_staging(&layout, 2); // threshold is 2 — fires.
|
|
||||||
|
|
||||||
let client = MockClient::new(vec![done("ok")]);
|
|
||||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
||||||
let mut rx = attach_event_receiver(&mut pod);
|
|
||||||
|
|
||||||
pod.try_post_run_consolidate().await.unwrap();
|
|
||||||
|
|
||||||
let reasons = collect_memory_worker_reasons(&mut rx);
|
|
||||||
assert_eq!(reasons.len(), 2);
|
|
||||||
assert!(reasons[0].starts_with("staging_threshold_reached files=2 bytes="));
|
|
||||||
assert_eq!(reasons[1], "completed_no_record_changes");
|
|
||||||
let audit = read_audit_jsonl(&layout);
|
|
||||||
assert_eq!(audit.last().unwrap()["reason"], "no_staging_entries");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn below_threshold_skips_and_does_not_take_lock() {
|
async fn below_threshold_skips_and_does_not_take_lock() {
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -16,8 +16,8 @@ use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord};
|
||||||
use pod::runtime::pod_registry::{self, LockFileGuard};
|
use pod::runtime::pod_registry::{self, LockFileGuard};
|
||||||
use pod::spawn::registry::SpawnedPodRegistry;
|
use pod::spawn::registry::SpawnedPodRegistry;
|
||||||
use pod::spawn::tool::spawn_pod_tool;
|
use pod::spawn::tool::spawn_pod_tool;
|
||||||
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
use protocol::Method;
|
||||||
use protocol::{Event, Method};
|
use protocol::stream::JsonLineReader;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
|
|
@ -97,22 +97,16 @@ async fn bind_mock_pod_socket(runtime_base: &Path, pod_name: &str) -> (PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Launch a tokio task that accepts connections until one carries a
|
/// Launch a tokio task that accepts connections until one carries a
|
||||||
/// `Method` line, then acknowledges it and returns it. `wait_for_socket`
|
/// `Method` line, then returns it. `wait_for_socket` inside the tool
|
||||||
/// inside the tool makes a probe connection that carries no data, so the
|
/// makes a probe connection that carries no data, so the task must
|
||||||
/// task must tolerate an empty connection and keep listening.
|
/// tolerate an empty connection and keep listening.
|
||||||
fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle<Option<Method>> {
|
fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle<Option<Method>> {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
let (stream, _) = listener.accept().await.ok()?;
|
let (stream, _) = listener.accept().await.ok()?;
|
||||||
let (reader, writer) = stream.into_split();
|
let (reader, _writer) = stream.into_split();
|
||||||
let mut r = JsonLineReader::new(reader);
|
let mut r = JsonLineReader::new(reader);
|
||||||
let mut w = JsonLineWriter::new(writer);
|
|
||||||
if let Ok(Some(method)) = r.next::<Method>().await {
|
if let Ok(Some(method)) = r.next::<Method>().await {
|
||||||
w.write(&Event::UserMessage {
|
|
||||||
segments: vec![protocol::Segment::text("accepted")],
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.ok()?;
|
|
||||||
return Some(method);
|
return Some(method);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,58 +2,42 @@
|
||||||
|
|
||||||
## 背景
|
## 背景
|
||||||
|
|
||||||
`memory-audit-log` 実装後、TUI actionbar に `memory consolidation skipped: no_staging_entries` が表示されるようになった。これは正常な idle/no-op でも頻繁に発生するため、actionbar 上では「何かが skip され続けている」ように見える。
|
`memory-audit-log` 実装後、TUI actionbar に `memory consolidation skipped: no_staging_entries` が表示されるようになった。これは一見すると「staging が無い」状態に見えるが、実際の workspace には `.insomnia/memory/_staging/*.json` が残っている場合がある。
|
||||||
|
|
||||||
また、workspace に `.insomnia/memory/_staging/*.json` が残っているのに、現行 schema として読めないため valid staging entry が 0 件になり、`no_staging_entries` と区別できない場合がある。今回観測された主因は旧 schema の staging file で、`source.session_id` を持ち、現行 schema が要求する `source.segment_id` を持たないものだった。
|
現状の原因は二つある。
|
||||||
|
|
||||||
旧 schema staging の後方互換・migration は求めない。この workspace に残っている旧 staging は必要に応じて手動で削除 / 退避する。実装としては、壊れた staging / 現行 schema として読めない staging がある場合に、`no_staging_entries` と誤認しない観測性だけを持たせる。
|
1. `run_consolidate_once` が成功後に backlog drain のため loop し、直後の空確認で `skipped: no_staging_entries` を emit する。これにより、直前の `completed_record_changes` 表示が actionbar 上で上書きされる。
|
||||||
|
2. 旧 schema の staging file は `source.session_id` を持ち、現行 schema が要求する `source.segment_id` を持たないため parse で skip される。結果として、ファイルは存在するのに valid entry が 0 件となり、`no_staging_entries` と記録される。
|
||||||
|
|
||||||
|
この状態は機能停止ではないが、観測面として misleading であり、memory 機構の稼働状況を人間が誤解しやすい。
|
||||||
|
|
||||||
## 方針
|
## 方針
|
||||||
|
|
||||||
Audit log には worker の skip / no-op を残してよいが、actionbar には人間が見る価値のある memory worker 状態だけを出す。
|
Audit log には worker の skip / no-op を残しつつ、actionbar には人間が見る価値のある状態だけを出す。特に successful consolidation の直後に発生する drain 終端確認の `no_staging_entries` は actionbar に出さない。
|
||||||
|
|
||||||
特に以下は actionbar に出さない。
|
また、staging directory にファイルがあるが current schema として読めない場合は、`no_staging_entries` ではなく invalid staging が存在することを audit log から分かるようにする。
|
||||||
|
|
||||||
- `no_staging_entries`
|
|
||||||
- successful consolidation の直後、backlog drain 終端確認で出る空確認 skip
|
|
||||||
- post-run / periodic check で staging が本当に空だっただけの skip
|
|
||||||
- 通常運用で頻出する no-op / idle skip
|
|
||||||
|
|
||||||
一方で、以下は actionbar に出してよい。
|
|
||||||
|
|
||||||
- consolidation の started / completed
|
|
||||||
- record changes を伴う completed
|
|
||||||
- failed / error
|
|
||||||
- invalid staging が存在するなど、人間が確認すべき状態
|
|
||||||
|
|
||||||
staging directory に file があるが current schema として読めない場合は、audit log 上で invalid staging の存在と件数が分かるようにする。actionbar に出す場合も、`no_staging_entries` ではなく invalid staging と分かる文言にする。
|
|
||||||
|
|
||||||
## 要件
|
## 要件
|
||||||
|
|
||||||
- consolidation が `completed` した直後の drain 確認で発生する idle skip が、直前の actionbar 表示を上書きしない。
|
- consolidation が `completed` した直後の drain 確認で発生する idle skip が、直前の actionbar 表示を上書きしない。
|
||||||
- 例: `completed_record_changes` の直後に `no_staging_entries` を actionbar へ出さない。
|
- 例: `completed_record_changes` の直後に `no_staging_entries` を actionbar へ出さない。
|
||||||
- audit log へ残すかどうかは実装判断でよいが、UI event としては抑制する。
|
- audit log へ残すかどうかは実装判断でよいが、UI event としては抑制する。
|
||||||
- 通常の post-run check / periodic check で staging が本当に空の場合も、actionbar に `no_staging_entries` を出さない。
|
- 通常の post-run check で staging が本当に空の場合も、actionbar に `no_staging_entries` を毎回出さない。
|
||||||
- idle/no-op 状態は audit log 側で確認できればよい。
|
- idle/no-op 状態は audit log 側で確認できればよい。
|
||||||
- `threshold_not_reached` は通常運用の no-op として扱い、actionbar へ常時表示しない。
|
|
||||||
- audit log 側では `no_staging_entries` と区別して記録する。
|
|
||||||
- `list_staging_entries` あるいはその呼び出し側で、invalid / parse-failed staging file の存在を区別できるようにする。
|
- `list_staging_entries` あるいはその呼び出し側で、invalid / parse-failed staging file の存在を区別できるようにする。
|
||||||
- 少なくとも invalid count が audit reason または structured field から分かる。
|
- 少なくとも invalid count が audit reason または structured field から分かる。
|
||||||
- 例: `no_valid_staging_entries invalid=6`。
|
- 例: `no_valid_staging_entries invalid=6`。
|
||||||
- `threshold_not_reached` と `no_valid_staging_entries` / `invalid_staging_entries` は区別される。
|
- `threshold_not_reached` と `no_valid_staging_entries` / `invalid_staging_entries` は区別される。
|
||||||
- old schema staging file の自動 migration / 自動削除 / 自動 archive はしない。
|
- 既存の old schema staging を自動 migration / delete しない。
|
||||||
- 後方互換は不要。
|
- `source.session_id` と `source.segment_id` は意味が違う可能性があるため、この ticket では観測性改善に留める。
|
||||||
- 既存 workspace の旧 staging は手動整理でよい。
|
- 必要なら後続で archive/drop/migration 方針を決める。
|
||||||
- 実装側では、現行 schema として読めない staging を invalid として観測できればよい。
|
|
||||||
|
|
||||||
## 完了条件
|
## 完了条件
|
||||||
|
|
||||||
- successful consolidation の直後に actionbar が `no_staging_entries` で上書きされない。
|
- successful consolidation の直後に actionbar が `no_staging_entries` で上書きされない。
|
||||||
- staging が本当に空の periodic/post-run check は actionbar にノイズを出さない。
|
|
||||||
- `threshold_not_reached` が actionbar を継続的に上書きしない。
|
|
||||||
- staging directory に parse 不能な `.json` がある場合、audit log が `no_staging_entries` ではなく invalid staging の存在を示す。
|
- staging directory に parse 不能な `.json` がある場合、audit log が `no_staging_entries` ではなく invalid staging の存在を示す。
|
||||||
- invalid staging を actionbar に出す場合、`no_staging_entries` ではなく invalid staging と分かる表示になる。
|
- staging が本当に空の periodic/post-run check は actionbar にノイズを出さない。
|
||||||
- consolidation skip / invalid staging / actionbar 抑制の挙動を確認する test がある。
|
- consolidation skip / invalid staging の挙動を確認する test がある。
|
||||||
- `cargo fmt --check` と関連 crate の test が通る。
|
- `cargo fmt --check` と関連 crate の test が通る。
|
||||||
|
|
||||||
## 範囲外
|
## 範囲外
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user