From 663ec91b458a9c3a92aea8b6c2a1c871de08614a Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 11:57:55 +0900 Subject: [PATCH] =?UTF-8?q?Anthropic=E3=81=AE=E3=82=AD=E3=83=A3=E3=83=83?= =?UTF-8?q?=E3=82=B7=E3=83=A5=E3=83=9D=E3=82=A4=E3=83=B3=E3=83=88=E3=82=92?= =?UTF-8?q?=E6=89=93=E3=81=A4=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO.md | 1 + .../llm_client/scheme/anthropic/request.rs | 500 ++++++++++++++---- crates/llm-worker/src/llm_client/types.rs | 7 + crates/llm-worker/src/worker.rs | 25 + crates/pod/src/pod.rs | 21 +- tickets/anthropic-prompt-cache.md | 117 ++++ tickets/anthropic-prompt-cache.review.md | 75 +++ 7 files changed, 646 insertions(+), 100 deletions(-) create mode 100644 tickets/anthropic-prompt-cache.md create mode 100644 tickets/anthropic-prompt-cache.review.md diff --git a/TODO.md b/TODO.md index 974d878a..58f38a26 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,5 @@ - [ ] テスト設計 → [tickets/test-design.md](tickets/test-design.md) +- [ ] Anthropic プロンプトキャッシュの有効化 → [tickets/anthropic-prompt-cache.md](tickets/anthropic-prompt-cache.md) - [ ] ツール設計 - [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md) - [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md) diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs index 44ecd4f0..7a398b65 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/request.rs @@ -2,6 +2,8 @@ //! //! Converts Open Responses native Item model to Anthropic Messages API format. +use std::collections::BTreeSet; + use serde::Serialize; use crate::llm_client::{ @@ -47,25 +49,77 @@ pub(crate) enum AnthropicContent { Parts(Vec), } +/// `cache_control` marker attached to a content part, signalling that the +/// prefix up to and including this part should be cached. +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub(crate) enum CacheControl { + Ephemeral, +} + /// Anthropic content part #[derive(Debug, Serialize)] #[serde(tag = "type")] pub(crate) enum AnthropicContentPart { #[serde(rename = "text")] - Text { text: String }, + Text { + text: String, + #[serde(skip_serializing_if = "Option::is_none")] + cache_control: Option, + }, #[serde(rename = "tool_use")] ToolUse { id: String, name: String, input: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + cache_control: Option, }, #[serde(rename = "tool_result")] ToolResult { tool_use_id: String, content: String, + #[serde(skip_serializing_if = "Option::is_none")] + cache_control: Option, }, } +impl AnthropicContentPart { + fn text(text: String) -> Self { + Self::Text { + text, + cache_control: None, + } + } + + fn tool_use(id: String, name: String, input: serde_json::Value) -> Self { + Self::ToolUse { + id, + name, + input, + cache_control: None, + } + } + + fn tool_result(tool_use_id: String, content: String) -> Self { + Self::ToolResult { + tool_use_id, + content, + cache_control: None, + } + } + + fn set_cache_control(&mut self, cc: CacheControl) { + match self { + Self::Text { cache_control, .. } + | Self::ToolUse { cache_control, .. } + | Self::ToolResult { cache_control, .. } => { + *cache_control = Some(cc); + } + } + } +} + /// Anthropic tool definition #[derive(Debug, Serialize)] pub(crate) struct AnthropicTool { @@ -78,7 +132,8 @@ pub(crate) struct AnthropicTool { impl AnthropicScheme { /// Build Anthropic request from Request pub(crate) fn build_request(&self, model: &str, request: &Request) -> AnthropicRequest { - let messages = self.convert_items_to_messages(&request.items); + let breakpoints = compute_breakpoints(&request.items, request.cache_anchor); + let messages = self.convert_items_to_messages(&request.items, &breakpoints); let tools = request.tools.iter().map(|t| self.convert_tool(t)).collect(); AnthropicRequest { @@ -95,27 +150,37 @@ impl AnthropicScheme { } } - /// Convert Open Responses Items to Anthropic Messages + /// Convert Open Responses Items to Anthropic Messages and attach + /// `cache_control` markers at each breakpoint item's last content + /// part. /// /// Anthropic uses a message-based model where: /// - User messages have role "user" /// - Assistant messages have role "assistant" /// - Tool calls are content parts within assistant messages /// - Tool results are content parts within user messages - fn convert_items_to_messages(&self, items: &[Item]) -> Vec { + /// + /// Each non-`Message` item produces exactly one content part, so + /// "last part for the item" is always well-defined. For breakpoint + /// `Message` items the output is forced into the array form so a + /// marker has a part to attach to. + fn convert_items_to_messages( + &self, + items: &[Item], + breakpoints: &BTreeSet, + ) -> Vec { let mut messages = Vec::new(); - let mut pending_assistant_parts: Vec = Vec::new(); - let mut pending_user_parts: Vec = Vec::new(); + // Pending parts carry their origin item index so we can record + // (msg_idx, part_idx) when we flush them into a message. + let mut pending_assistant: Vec<(usize, AnthropicContentPart)> = Vec::new(); + let mut pending_user: Vec<(usize, AnthropicContentPart)> = Vec::new(); + let mut locations: Vec> = vec![None; items.len()]; - for item in items { + for (i, item) in items.iter().enumerate() { match item { Item::Message { role, content, .. } => { - // Flush pending parts before a new message - self.flush_pending_parts( - &mut messages, - &mut pending_assistant_parts, - &mut pending_user_parts, - ); + flush_pending(&mut messages, &mut pending_assistant, "assistant", &mut locations); + flush_pending(&mut messages, &mut pending_user, "user", &mut locations); let anthropic_role = match role { Role::User | Role::System => "user", @@ -126,32 +191,35 @@ impl AnthropicScheme { .iter() .map(|p| match p { ContentPart::Text { text } => { - AnthropicContentPart::Text { text: text.clone() } + AnthropicContentPart::text(text.clone()) + } + ContentPart::Refusal { refusal } => { + AnthropicContentPart::text(refusal.clone()) } - ContentPart::Refusal { refusal } => AnthropicContentPart::Text { - text: refusal.clone(), - }, }) .collect(); - if parts.len() == 1 { - if let AnthropicContentPart::Text { text } = &parts[0] { + let force_parts = breakpoints.contains(&i); + let msg_idx = messages.len(); + + // Preserve the single-text shorthand unless a + // breakpoint needs a concrete part to live on. + if parts.len() == 1 && !force_parts { + if let AnthropicContentPart::Text { text, .. } = &parts[0] { messages.push(AnthropicMessage { role: anthropic_role.to_string(), content: AnthropicContent::Text(text.clone()), }); - } else { - messages.push(AnthropicMessage { - role: anthropic_role.to_string(), - content: AnthropicContent::Parts(parts), - }); + continue; } - } else { - messages.push(AnthropicMessage { - role: anthropic_role.to_string(), - content: AnthropicContent::Parts(parts), - }); } + + let last_part_idx = parts.len().saturating_sub(1); + messages.push(AnthropicMessage { + role: anthropic_role.to_string(), + content: AnthropicContent::Parts(parts), + }); + locations[i] = Some((msg_idx, last_part_idx)); } Item::ToolCall { @@ -160,25 +228,15 @@ impl AnthropicScheme { arguments, .. } => { - // Flush pending user parts first - if !pending_user_parts.is_empty() { - messages.push(AnthropicMessage { - role: "user".to_string(), - content: AnthropicContent::Parts(std::mem::take( - &mut pending_user_parts, - )), - }); - } - - // Parse arguments JSON string to Value (defensive: normalize - // non-object / legacy "null" payloads to {} so Anthropic API accepts it) + flush_pending(&mut messages, &mut pending_user, "user", &mut locations); + // Parse arguments JSON string to Value (defensive: + // normalize non-object / legacy "null" payloads to + // `{}` so the Anthropic API accepts it). let input = parse_tool_arguments(arguments); - - pending_assistant_parts.push(AnthropicContentPart::ToolUse { - id: call_id.clone(), - name: name.clone(), - input, - }); + pending_assistant.push(( + i, + AnthropicContentPart::tool_use(call_id.clone(), name.clone(), input), + )); } Item::ToolResult { @@ -187,74 +245,44 @@ impl AnthropicScheme { content, .. } => { - // Flush pending assistant parts first - if !pending_assistant_parts.is_empty() { - messages.push(AnthropicMessage { - role: "assistant".to_string(), - content: AnthropicContent::Parts(std::mem::take( - &mut pending_assistant_parts, - )), - }); - } - + flush_pending(&mut messages, &mut pending_assistant, "assistant", &mut locations); let text = match content { Some(c) => format!("{summary}\n{c}"), None => summary.clone(), }; - pending_user_parts.push(AnthropicContentPart::ToolResult { - tool_use_id: call_id.clone(), - content: text, - }); + pending_user.push(( + i, + AnthropicContentPart::tool_result(call_id.clone(), text), + )); } Item::Reasoning { text, .. } => { - // Flush pending user parts first - if !pending_user_parts.is_empty() { - messages.push(AnthropicMessage { - role: "user".to_string(), - content: AnthropicContent::Parts(std::mem::take( - &mut pending_user_parts, - )), - }); - } - + flush_pending(&mut messages, &mut pending_user, "user", &mut locations); // Reasoning is treated as assistant text in Anthropic - // (actual thinking blocks are handled differently in streaming) - pending_assistant_parts.push(AnthropicContentPart::Text { text: text.clone() }); + // (actual thinking blocks are handled differently in streaming). + pending_assistant.push((i, AnthropicContentPart::text(text.clone()))); } } } - // Flush remaining pending parts - self.flush_pending_parts( - &mut messages, - &mut pending_assistant_parts, - &mut pending_user_parts, - ); + flush_pending(&mut messages, &mut pending_assistant, "assistant", &mut locations); + flush_pending(&mut messages, &mut pending_user, "user", &mut locations); + + // Apply cache_control markers at each breakpoint item's last part. + for &bp in breakpoints { + let Some((msg_idx, part_idx)) = locations.get(bp).copied().flatten() else { + continue; + }; + if let AnthropicContent::Parts(parts) = &mut messages[msg_idx].content { + if let Some(part) = parts.get_mut(part_idx) { + part.set_cache_control(CacheControl::Ephemeral); + } + } + } messages } - fn flush_pending_parts( - &self, - messages: &mut Vec, - pending_assistant_parts: &mut Vec, - pending_user_parts: &mut Vec, - ) { - if !pending_assistant_parts.is_empty() { - messages.push(AnthropicMessage { - role: "assistant".to_string(), - content: AnthropicContent::Parts(std::mem::take(pending_assistant_parts)), - }); - } - if !pending_user_parts.is_empty() { - messages.push(AnthropicMessage { - role: "user".to_string(), - content: AnthropicContent::Parts(std::mem::take(pending_user_parts)), - }); - } - } - fn convert_tool(&self, tool: &ToolDefinition) -> AnthropicTool { AnthropicTool { name: tool.name.clone(), @@ -264,6 +292,71 @@ impl AnthropicScheme { } } +/// Flush a pending parts buffer into a new message, recording the +/// emitted `(msg_idx, part_idx)` for each originating item. +fn flush_pending( + messages: &mut Vec, + pending: &mut Vec<(usize, AnthropicContentPart)>, + role: &str, + locations: &mut [Option<(usize, usize)>], +) { + if pending.is_empty() { + return; + } + let msg_idx = messages.len(); + let taken: Vec<(usize, AnthropicContentPart)> = std::mem::take(pending); + let mut parts = Vec::with_capacity(taken.len()); + for (part_idx, (origin, part)) in taken.into_iter().enumerate() { + locations[origin] = Some((msg_idx, part_idx)); + parts.push(part); + } + messages.push(AnthropicMessage { + role: role.to_string(), + content: AnthropicContent::Parts(parts), + }); +} + +/// Compute the set of item indices that should receive a cache_control +/// breakpoint: +/// +/// 1. `cache_anchor` (stable prefix boundary — typically a post-compact +/// summary at index 0). +/// 2. The item immediately preceding the most recent `Role::User` +/// message (i.e. the end of the previous turn). This is the rewind +/// boundary: if a user re-issues the turn from scratch, the cache up +/// to here remains valid. +/// 3. The final item (head of the outgoing request, for the next +/// request in the same turn to read from). +/// +/// Overlapping positions are collapsed to a single breakpoint. +fn compute_breakpoints(items: &[Item], cache_anchor: Option) -> BTreeSet { + let mut bps = BTreeSet::new(); + if items.is_empty() { + return bps; + } + if let Some(anchor) = cache_anchor { + if anchor < items.len() { + bps.insert(anchor); + } + } + let last_user = items.iter().rposition(|it| { + matches!( + it, + Item::Message { + role: Role::User, + .. + } + ) + }); + if let Some(i) = last_user { + if i > 0 { + bps.insert(i - 1); + } + } + bps.insert(items.len() - 1); + bps +} + #[cfg(test)] mod tests { use super::*; @@ -326,4 +419,213 @@ mod tests { assert_eq!(anthropic_req.messages[1].role, "assistant"); assert_eq!(anthropic_req.messages[2].role, "user"); } + + /// Pull out the `cache_control` field from a part regardless of variant. + fn part_cache_control(part: &AnthropicContentPart) -> Option { + match part { + AnthropicContentPart::Text { cache_control, .. } + | AnthropicContentPart::ToolUse { cache_control, .. } + | AnthropicContentPart::ToolResult { cache_control, .. } => *cache_control, + } + } + + /// All `(msg_idx, part_idx, cache_control)` triples whose `cache_control` + /// is set, in iteration order over the output. + fn breakpoint_positions(req: &AnthropicRequest) -> Vec<(usize, usize, CacheControl)> { + let mut out = Vec::new(); + for (mi, msg) in req.messages.iter().enumerate() { + if let AnthropicContent::Parts(parts) = &msg.content { + for (pi, part) in parts.iter().enumerate() { + if let Some(cc) = part_cache_control(part) { + out.push((mi, pi, cc)); + } + } + } + } + out + } + + /// Convenience: a turn that ends with one assistant text, one tool + /// call/result pair, and a final assistant text. Produced at + /// `history[head..]` indices shown alongside, so tests can reason + /// about breakpoint positions. + fn completed_turn() -> Vec { + vec![ + Item::user_message("hello"), // 0 + Item::assistant_message("hi"), // 1 + Item::tool_call("c1", "tool_a", "{}"), // 2 + Item::tool_result("c1", "ok"), // 3 + Item::assistant_message("done"), // 4 + ] + } + + #[test] + fn three_breakpoints_when_compact_plus_prior_turn() { + let scheme = AnthropicScheme::new(); + let mut items = vec![Item::system_message("[Compacted context summary]\n\nprior")]; + items.extend(completed_turn()); // now indices 1..=5 + items.push(Item::user_message("next turn")); + // anchor=0, last user idx=6 → turn_end=5, head=6. + let mut request = Request::new().items(items); + request.cache_anchor = Some(0); + + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + let bps = breakpoint_positions(&req); + assert_eq!(bps.len(), 3, "expected 3 breakpoints, got {:?}", bps); + for (_, _, cc) in bps { + assert_eq!(cc, CacheControl::Ephemeral); + } + } + + #[test] + fn two_breakpoints_without_compaction() { + let scheme = AnthropicScheme::new(); + let mut items = completed_turn(); + items.push(Item::user_message("next turn")); // index 5 = latest user + // cache_anchor=None, turn_end=4, head=5. + let request = Request::new().items(items); + + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + let bps = breakpoint_positions(&req); + assert_eq!(bps.len(), 2, "expected 2 breakpoints, got {:?}", bps); + } + + #[test] + fn single_breakpoint_when_only_first_user_message() { + let scheme = AnthropicScheme::new(); + let request = Request::new().user("first ever turn"); + // latest user at 0 → no turn_end; head=0; no anchor. Collapse → 1. + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + let bps = breakpoint_positions(&req); + assert_eq!(bps.len(), 1, "expected 1 breakpoint, got {:?}", bps); + } + + #[test] + fn overlap_collapses_anchor_and_turn_end() { + // items = [compact_summary(0, Role::System), user(1)] + // anchor=0, latest user=1 → turn_end=0, head=1. Anchor∩turn_end at 0. + let scheme = AnthropicScheme::new(); + let mut request = Request::new().items(vec![ + Item::system_message("[Compacted context summary]\n\nprior"), + Item::user_message("fresh user"), + ]); + request.cache_anchor = Some(0); + + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + let bps = breakpoint_positions(&req); + assert_eq!(bps.len(), 2, "expected collapse to 2, got {:?}", bps); + } + + #[test] + fn breakpoint_on_tool_result_head() { + // Mid-turn second call: items end with a tool_result. Head must + // land on the ToolResult part. + let scheme = AnthropicScheme::new(); + let request = Request::new() + .user("run it") + .item(Item::tool_call("c1", "t", "{}")) + .item(Item::tool_result("c1", "result")); + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + let bps = breakpoint_positions(&req); + assert_eq!(bps.len(), 1); + let (mi, pi, _) = bps[0]; + let part = match &req.messages[mi].content { + AnthropicContent::Parts(parts) => &parts[pi], + _ => panic!("expected Parts for breakpoint-bearing message"), + }; + assert!( + matches!(part, AnthropicContentPart::ToolResult { .. }), + "expected ToolResult, got {:?}", + part, + ); + } + + #[test] + fn single_text_message_uses_text_shorthand_without_breakpoint() { + // Non-breakpoint single-text messages stay on the text shorthand + // so we don't bloat requests with wrapper arrays. Here the Head + // lands on items[1], leaving items[0] without a marker. + let scheme = AnthropicScheme::new(); + let request = Request::new() + .user("hello") + .assistant("hi there"); + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + assert!( + matches!(req.messages[0].content, AnthropicContent::Text(_)), + "non-breakpoint single-text message should use text shorthand", + ); + } + + #[test] + fn single_text_message_is_forced_to_parts_when_breakpoint() { + // A breakpoint on a single-text message must be emitted in the + // array form so cache_control has a part to attach to. + let scheme = AnthropicScheme::new(); + let mut request = Request::new().user("hello"); + request.cache_anchor = Some(0); + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + match &req.messages[0].content { + AnthropicContent::Parts(parts) => { + assert_eq!(parts.len(), 1); + assert_eq!( + part_cache_control(&parts[0]), + Some(CacheControl::Ephemeral) + ); + } + AnthropicContent::Text(_) => panic!("breakpoint item should use Parts form"), + } + } + + #[test] + fn serialized_json_shape_matches_anthropic_spec() { + // Single-sentence smoke test against the exact JSON key shape + // Anthropic expects for cache_control. + let scheme = AnthropicScheme::new(); + let mut request = Request::new().user("hello"); + request.cache_anchor = Some(0); + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + let json = serde_json::to_value(&req).unwrap(); + let part = &json["messages"][0]["content"][0]; + assert_eq!(part["type"], "text"); + assert_eq!(part["text"], "hello"); + assert_eq!(part["cache_control"]["type"], "ephemeral"); + } + + #[test] + fn cache_anchor_out_of_range_is_ignored() { + // Defensive: if a caller passes a stale anchor beyond items.len(), + // we drop it silently rather than panicking. + let scheme = AnthropicScheme::new(); + let mut request = Request::new().user("one"); + request.cache_anchor = Some(99); + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + // Only the Head breakpoint survives. + let bps = breakpoint_positions(&req); + assert_eq!(bps.len(), 1); + } + + #[test] + fn empty_items_produce_no_breakpoints() { + let scheme = AnthropicScheme::new(); + let req = scheme.build_request("claude-sonnet-4-20250514", &Request::new()); + assert!(req.messages.is_empty()); + assert!(breakpoint_positions(&req).is_empty()); + } + + #[test] + fn tool_definitions_carry_no_cache_control() { + // Tool JSON schema must serialise unchanged — no sneak-in of + // cache_control at the tools-array level. + let scheme = AnthropicScheme::new(); + let request = Request::new() + .user("hello") + .tool(ToolDefinition::new("noop").input_schema(serde_json::json!({ + "type": "object", + "properties": {} + }))); + let req = scheme.build_request("claude-sonnet-4-20250514", &request); + let json = serde_json::to_value(&req).unwrap(); + let tool = &json["tools"][0]; + assert!(tool.get("cache_control").is_none()); + } } diff --git a/crates/llm-worker/src/llm_client/types.rs b/crates/llm-worker/src/llm_client/types.rs index 495318a2..030380c5 100644 --- a/crates/llm-worker/src/llm_client/types.rs +++ b/crates/llm-worker/src/llm_client/types.rs @@ -419,6 +419,13 @@ pub struct Request { pub tools: Vec, /// Request configuration pub config: RequestConfig, + /// Index into `items` marking the end of a stable, cacheable prefix. + /// + /// Higher layers that know about durable prefix boundaries (e.g. a + /// post-compaction summary) set this so that caching-aware providers + /// (Anthropic today) can place a long-lived cache breakpoint there. + /// Providers without prompt caching ignore the field. + pub cache_anchor: Option, } impl Request { diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 5cedf79a..e18397d6 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -178,6 +178,9 @@ pub struct Worker { /// by higher layers that own usage measurements. `None` disables /// the prune projection. savings_estimator: Option, + /// Index of the last stable cache prefix item, set by higher layers. + /// Plumbed into [`Request::cache_anchor`] at request build time. + cache_anchor: Option, /// State marker _state: PhantomData, } @@ -340,6 +343,18 @@ impl Worker { self.savings_estimator = estimator; } + /// Mark an index into the current history as a stable, cacheable + /// prefix boundary. The value is included in each outgoing + /// [`Request`] via [`Request::cache_anchor`] — caching-aware + /// providers (Anthropic) place a long-lived breakpoint there. + /// + /// Pass `None` to clear. Typically set by layers that compact the + /// conversation: after a compaction rebuilds history starting with a + /// summary item, the anchor is `Some(0)`. + pub fn set_cache_anchor(&mut self, anchor: Option) { + self.cache_anchor = anchor; + } + /// Get a mutable reference to the timeline (for additional handler registration) pub fn timeline_mut(&mut self) -> &mut Timeline { &mut self.timeline @@ -529,6 +544,13 @@ impl Worker { // Apply request configuration request = request.config(self.request_config.clone()); + // Attach the cache prefix anchor (may be narrower than `context` + // if the prune projection trimmed items from the head — keep it + // in range). + request.cache_anchor = self + .cache_anchor + .filter(|&anchor| anchor < context.len()); + request } @@ -1001,6 +1023,7 @@ impl Worker { tool_output_limits: None, prune_config: None, savings_estimator: None, + cache_anchor: None, _state: PhantomData, } } @@ -1255,6 +1278,7 @@ impl Worker { tool_output_limits: self.tool_output_limits, prune_config: self.prune_config, savings_estimator: self.savings_estimator, + cache_anchor: self.cache_anchor, _state: PhantomData, } } @@ -1328,6 +1352,7 @@ impl Worker { tool_output_limits: self.tool_output_limits, prune_config: self.prune_config, savings_estimator: self.savings_estimator, + cache_anchor: self.cache_anchor, _state: PhantomData, } } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 5967ff22..49f562c9 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -189,10 +189,24 @@ impl Pod { if let Some(ref prompt) = state.system_prompt { worker.set_system_prompt(prompt); } + // A leading `Role::System` item can only come from `compact` + // (the Pod's one and only write path that prepends a summary at + // history[0]). Restoring the anchor lets Anthropic re-use a + // stable cache prefix for long-lived restored sessions. + let anchored_on_summary = matches!( + state.history.first(), + Some(Item::Message { + role: llm_worker::Role::System, + .. + }) + ); worker.set_history(state.history); worker.set_request_config(state.config); worker.set_turn_count(state.turn_count); worker.set_last_run_interrupted(state.last_run_interrupted); + if anchored_on_summary { + worker.set_cache_anchor(Some(0)); + } let mut pod = Self { manifest, @@ -853,7 +867,12 @@ impl Pod { // until its first LLM call. self.session_id = new_session_id; self.head_hash = Some(new_head_hash); - self.worker.as_mut().unwrap().set_history(new_history); + let worker = self.worker.as_mut().unwrap(); + worker.set_history(new_history); + // Anchor the prompt cache at the summary item so that Anthropic + // can place a durable `cache_control` breakpoint there — our + // compact layout guarantees history[0] is the summary. + worker.set_cache_anchor(Some(0)); self.usage_history .lock() .expect("usage_history poisoned") diff --git a/tickets/anthropic-prompt-cache.md b/tickets/anthropic-prompt-cache.md new file mode 100644 index 00000000..d51d38b2 --- /dev/null +++ b/tickets/anthropic-prompt-cache.md @@ -0,0 +1,117 @@ +# Anthropic プロンプトキャッシュの有効化 + +レビュー中: [anthropic-prompt-cache.review.md](anthropic-prompt-cache.review.md) + +## 背景 + +Anthropic プロバイダ経由のリクエストで prompt caching が一切機能していない。セッションログの `cache_read_tokens` / `cache_write_tokens` が常に 0、`AnthropicScheme::build_request` に `cache_control: ephemeral` の breakpoint が一つも入っていない。 + +結果: + +- 毎ターン system prompt + tools + 全履歴が full-price / full-token で再送信される +- 30k ITPM 帯の組織では、数ターン debug を回しただけで `rate_limit_error` (429) に到達する。実例:`ListPods` → `SpawnPod`(失敗)→ `Glob`(大量出力)→ `Read` のデバッグシーケンスで 429 +- 長期セッションのコストが本来の ~10 倍になっている(キャッシュなら cache read は通常 input の ~10%) + +Anthropic の自動キャッシュ(モデル・時期依存)の有無に関わらず、明示的 breakpoint は自分で制御できる確実な手段。 + +## 依存 + +- なし。`crates/llm-worker/src/llm_client/scheme/anthropic/request.rs` 単独 + +## 設計 + +### Breakpoint 戦略 + +Anthropic の breakpoint は「その位置までの prefix をキャッシュする」ので後方が前方を subsume する。前方 breakpoint は後方キャッシュが TTL で失効した時の fallback として機能する。 + +安定度の異なる **3 層**に置く: + +| 位置 | 前進するタイミング | 主な用途 | +|---|---|---| +| **Prune 位置** | compaction 走行時 | 超長期フォールバック | +| **最後のターン末** | ターン完了時 | 次ターン以降の read | +| **新規リクエスト Head** | 毎 LLM コール(= messages 末尾に追従) | 同ターン内の tool round での read | + +### 期待される挙動 + +**1 ターン内で M 回の tool round(agent loop):** + +- Call 1: 最後のターン末 = 前ターンの終わり → read、新規 Head = Call 1 の messages 末尾に creation +- Call 2: 新規 Head(前 Call の末尾)→ read、新しい Head を messages 末尾に creation +- ...(K 回目も同様) + +ターン全体の累積入力コスト: O(K²) → **O(K)** に改善。 + +**ターン N+1 開始時:** + +- 最後のターン末 = ターン N 最終状態 → read +- Head = N+1 最初の Call の末尾に creation + +**compaction 走行時:** + +- Prune 位置が前進、以降 read + +### TTL 耐性 + +5 分 TTL で最新が失効しても段階的に fallback: + +- 新規 Head 失効 → 最後のターン末 で read +- 最後のターン末 失効 → Prune 位置 で read +- Prune 位置 失効 → compaction 境界から re-create + +### 4 枠のうち 3 枠使用 + +残り 1 枠は将来の拡張用(tools 配列を別 TTL で管理したい場合など)に温存。 + +前方に system 単独 / tools 単独の breakpoint を打つ案は subsume されるだけで意味がないので採用しない。 + +### 実装方針 + +`AnthropicContentPart` に `cache_control` フィールドを追加。Anthropic の API 形式: + +```json +{ "type": "text", "text": "...", "cache_control": { "type": "ephemeral" } } +``` + +`Option` で持ち、値がある場合のみシリアライズ(`skip_serializing_if`)する。既存の非 Anthropic プロバイダ(OpenAI / Gemini / Ollama)には影響させない — Anthropic scheme 内部型のみ。 + +3 つの breakpoint 位置の決定: + +- **Prune 位置**: 既存 compaction 機構(`pod::compact_state` か Pod 内で管理される prune 済みインデックス)から取得。`build_request` 経路で Anthropic scheme に prune インデックスを渡す API 拡張が必要 +- **最後のターン末**: Pod / Worker 側が既にターン境界を記録しているならそれを使う。無ければ `messages` 逆走査で「直近 user メッセージの 1 つ前」を探す +- **新規リクエスト Head**: `messages.last()` に付ける(= 現行 request の末尾) + +3 箇所が重なる場合(例: 初回 request で Prune 位置・ターン末・Head が全部同じ item)は重複を除去して実質 1 breakpoint にする。 + +### 自動テスト + +- breakpoint が Prune 位置 / 最後のターン末 / 新規リクエスト Head の 3 箇所に付いたリクエスト JSON が生成されること +- Prune 位置が 0(compaction 未走行)のケースでは 2 箇所(ターン末 + Head)のみに付くこと +- ターン末と Head が重なる最終 request(= 新ターンの最初の Call)では 2 箇所に縮退すること +- 3 箇所が全て重なる初回 request では 1 箇所に縮退すること +- OpenAI / Gemini の request 生成が一切変わらないこと(Anthropic 専用だが回帰防止) + +## 影響範囲 + +- `crates/llm-worker/src/llm_client/scheme/anthropic/request.rs`: 内部型 + breakpoint 配置ロジック +- `crates/llm-worker/src/llm_client`: Prune 境界インデックスを `build_request` 経路で渡す API 拡張(`Request` に prune hint を足すか、別経路で scheme に伝える) +- `crates/pod/src/pod.rs` or `compact_state.rs`: 現在の prune 済み件数を読み出せるようにする(既に内部で管理されているはず、公開 API 化) +- 既存の serde round-trip テスト: 追加フィールドを skip_serializing_if で出さないので差分なし + +## 完了条件 + +- Anthropic リクエストの Prune 位置(compact 済みサマリ末尾)に `cache_control: ephemeral` が付く +- Anthropic リクエストの最後のターン末(直近 user メッセージの直前)に `cache_control: ephemeral` が付く +- Anthropic リクエストの新規リクエスト Head(messages 末尾)に `cache_control: ephemeral` が付く +- Prune 位置 / ターン末 / Head が重なる場合は重複除去される +- 実セッションで `cache_read_tokens` が 2 コール目以降に非 0 になる +- 特に同一ターン内の 2 回目以降の LLM コールで直前の tool_result 以前が cache read されること +- 既存の Anthropic / OpenAI / Gemini テストが全 pass +- cache_control が正しい位置に入ることを検証する新規ユニットテスト + +## 範囲外 + +- OpenAI / Gemini の prompt caching(各プロバイダの API 設計が違うため別チケット) +- 動的な breakpoint 数の調整(4 枠目を状況により使い分ける、など)。まずは固定 3 箇所 +- Cache hit 率の可観測化(TUI 表示など)。集計は `cache_read_tokens` として既に記録されるので、表示は別途 +- Rate limit 429 を受けた際の retry-after honor(別課題) diff --git a/tickets/anthropic-prompt-cache.review.md b/tickets/anthropic-prompt-cache.review.md new file mode 100644 index 00000000..bec466dd --- /dev/null +++ b/tickets/anthropic-prompt-cache.review.md @@ -0,0 +1,75 @@ +# Review: anthropic-prompt-cache + +実装は未コミット(staged + unstaged)。`cargo build` clean、`cargo test --workspace` 467 / 0 fail。 + +## 総評 + +チケット要件を忠実に反映。3 層 breakpoint(Prune 位置 / 最後のターン末 / 新規 Head)を `BTreeSet` で管理し重複除去、Anthropic scheme のみに局所化、OpenAI / Gemini は透過的に無視。エッジケース(empty / out-of-range / overlap / shorthand 保持)まで網羅。軽微な指摘のみで blocker なし。 + +## 完了条件の対応 + +| 要件 | 状態 | 根拠 | +|---|---|---| +| Prune 位置に `cache_control` | ✅ | `compute_breakpoints` anchor 分岐、`three_breakpoints_when_compact_plus_prior_turn` | +| 最後のターン末に `cache_control` | ✅ | `last_user - 1` 計算、同テスト | +| 新規 Head に `cache_control` | ✅ | `items.len() - 1`、`two_breakpoints_without_compaction` | +| 3 箇所重複の除去 | ✅ | `BTreeSet` で自動重複削除、`overlap_collapses_*` / `single_breakpoint_*` | +| 実セッションで cache_read 非 0 | ⚠️ | 結合で要確認(自動テスト範囲外、実運用で観測) | +| 既存テスト全 pass | ✅ | workspace 467 / 0 fail | +| 新規ユニットテスト | ✅ | 10 件(シリアライズ形状検証含む) | + +## 良い点 + +1. **層構造が素直に分離**: `Request::cache_anchor: Option` を追加、`Worker::set_cache_anchor` で透過的に伝播、Anthropic scheme のみが実際の breakpoint 配置を知る。OpenAI / Gemini は透過的に無視して回帰なし。 + +2. **`Pod::compact` / `Pod::from_state` の両方で anchor を復旧**: `from_state` は `history[0]` が `Role::System` かどうかで compact の痕跡を検出して anchor を張る。resume 時のキャッシュ継続性が確保されている。 + +3. **`force_parts` で breakpoint 位置だけ array 形式に落とす**: 通常は text shorthand を保ちつつ、breakpoint を付ける位置だけ array 化。リクエストサイズを最小化する配慮(`single_text_message_uses_text_shorthand_without_breakpoint` で検証)。 + +4. **シリアライズ形状の検証**: `serialized_json_shape_matches_anthropic_spec` で `{"type":"ephemeral"}` が正しい階層に出ることを明示的にテスト。Anthropic spec からずれたら即発見できる。 + +5. **エッジケース網羅**: 空 items / out-of-range anchor / 1 item のみ / tool_result が head になるケース / Parts 強制化 / tool 定義に cache_control が漏れないこと、が全部テスト。 + +6. **冪等な anchor 範囲チェック**: `request.cache_anchor = self.cache_anchor.filter(|&anchor| anchor < context.len())`。prune が先頭をトリムしても安全。 + +## 指摘と判断 + +### 軽微 + +#### 1. `anchor=0` に `compact` が暗黙依存 + +```rust +// pod.rs :872 +worker.set_history(new_history); +worker.set_cache_anchor(Some(0)); +``` + +`compact` が history[0] に summary を置く前提で `Some(0)` をハードコード。将来 compact のレイアウトを変えたら breaker になる。コメントで invariant を明記してあるので read 可能だが、`compact_state` 側に「summary の位置」を返す API を置いて受け渡す方が健全。 + +**判断**: 対応は任意。現状は compact の設計上 history[0] が summary であることが不変、from_state 側の検出ロジックも Role::System を見ているので、レイアウトを変えるなら両方を直すことになる。今回は放置で OK。 + +#### 2. `Request::cache_anchor` が raw フィールド代入 + +既存の `Request` builder は fluent(`Request::new().user(...).item(...)`)だが、`cache_anchor` のみ `request.cache_anchor = Some(0)` の直代入。テスト内で何度も使われていて少し浮く。`fn cache_anchor(self, anchor: Option) -> Self` を足して fluent を揃える方が一貫性が良い。 + +**判断**: スタイルの範囲。機能に影響なし、後で揃えても良い。本チケットでは見送り可。 + +#### 3. 「直近 user msg の 1 つ前」が tool_call になりうるケース + +前ターンが interrupted で最後が `tool_call`(tool_result 未応答)だった場合、turn_end が tool_call を指す。Anthropic 的には tool_use にも cache_control を付けられるので wire 上は OK。意味論的に「ターン末」と呼ぶには微妙だが、キャッシュ位置としての機能は成立(その prefix が安定ならキャッシュが効く)ので実害なし。 + +**判断**: 仕様上許容。注記不要。 + +#### 4. Tools array 自体にキャッシュマーカーを付けない選択 + +ticket で「tools 別 TTL 管理は将来」と明記されており、今は意図的に off。tools は message-level breakpoint の prefix に含まれるので、実質的に system + tools + history のまとまりで cache される。問題なし。 + +**判断**: ticket 通り。 + +## 完了に向けた作業 + +- 必須修正なし +- 指摘 1〜2 は style の範囲、本チケットでは対応不要 +- 実セッションで cache_read_tokens が 2 コール目以降に非 0 になるかは実運用で確認(LLM への実アクセスが必要、CI の範囲外) + +**完了 OK**。