Anthropicのキャッシュポイントを打つ実装

This commit is contained in:
Keisuke Hirata 2026-04-19 11:57:55 +09:00
parent 68885a03d8
commit 663ec91b45
7 changed files with 646 additions and 100 deletions

View File

@ -1,4 +1,5 @@
- [ ] テスト設計 → [tickets/test-design.md](tickets/test-design.md)
- [ ] Anthropic プロンプトキャッシュの有効化 → [tickets/anthropic-prompt-cache.md](tickets/anthropic-prompt-cache.md)
- [ ] ツール設計
- [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md)
- [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md)

View File

@ -2,6 +2,8 @@
//!
//! Converts Open Responses native Item model to Anthropic Messages API format.
use std::collections::BTreeSet;
use serde::Serialize;
use crate::llm_client::{
@ -47,25 +49,77 @@ pub(crate) enum AnthropicContent {
Parts(Vec<AnthropicContentPart>),
}
/// `cache_control` marker attached to a content part, signalling that the
/// prefix up to and including this part should be cached.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(crate) enum CacheControl {
Ephemeral,
}
/// Anthropic content part
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
pub(crate) enum AnthropicContentPart {
#[serde(rename = "text")]
Text { text: String },
Text {
text: String,
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<CacheControl>,
},
#[serde(rename = "tool_use")]
ToolUse {
id: String,
name: String,
input: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<CacheControl>,
},
#[serde(rename = "tool_result")]
ToolResult {
tool_use_id: String,
content: String,
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<CacheControl>,
},
}
impl AnthropicContentPart {
fn text(text: String) -> Self {
Self::Text {
text,
cache_control: None,
}
}
fn tool_use(id: String, name: String, input: serde_json::Value) -> Self {
Self::ToolUse {
id,
name,
input,
cache_control: None,
}
}
fn tool_result(tool_use_id: String, content: String) -> Self {
Self::ToolResult {
tool_use_id,
content,
cache_control: None,
}
}
fn set_cache_control(&mut self, cc: CacheControl) {
match self {
Self::Text { cache_control, .. }
| Self::ToolUse { cache_control, .. }
| Self::ToolResult { cache_control, .. } => {
*cache_control = Some(cc);
}
}
}
}
/// Anthropic tool definition
#[derive(Debug, Serialize)]
pub(crate) struct AnthropicTool {
@ -78,7 +132,8 @@ pub(crate) struct AnthropicTool {
impl AnthropicScheme {
/// Build Anthropic request from Request
pub(crate) fn build_request(&self, model: &str, request: &Request) -> AnthropicRequest {
let messages = self.convert_items_to_messages(&request.items);
let breakpoints = compute_breakpoints(&request.items, request.cache_anchor);
let messages = self.convert_items_to_messages(&request.items, &breakpoints);
let tools = request.tools.iter().map(|t| self.convert_tool(t)).collect();
AnthropicRequest {
@ -95,27 +150,37 @@ impl AnthropicScheme {
}
}
/// Convert Open Responses Items to Anthropic Messages
/// Convert Open Responses Items to Anthropic Messages and attach
/// `cache_control` markers at each breakpoint item's last content
/// part.
///
/// Anthropic uses a message-based model where:
/// - User messages have role "user"
/// - Assistant messages have role "assistant"
/// - Tool calls are content parts within assistant messages
/// - Tool results are content parts within user messages
fn convert_items_to_messages(&self, items: &[Item]) -> Vec<AnthropicMessage> {
///
/// Each non-`Message` item produces exactly one content part, so
/// "last part for the item" is always well-defined. For breakpoint
/// `Message` items the output is forced into the array form so a
/// marker has a part to attach to.
fn convert_items_to_messages(
&self,
items: &[Item],
breakpoints: &BTreeSet<usize>,
) -> Vec<AnthropicMessage> {
let mut messages = Vec::new();
let mut pending_assistant_parts: Vec<AnthropicContentPart> = Vec::new();
let mut pending_user_parts: Vec<AnthropicContentPart> = Vec::new();
// Pending parts carry their origin item index so we can record
// (msg_idx, part_idx) when we flush them into a message.
let mut pending_assistant: Vec<(usize, AnthropicContentPart)> = Vec::new();
let mut pending_user: Vec<(usize, AnthropicContentPart)> = Vec::new();
let mut locations: Vec<Option<(usize, usize)>> = vec![None; items.len()];
for item in items {
for (i, item) in items.iter().enumerate() {
match item {
Item::Message { role, content, .. } => {
// Flush pending parts before a new message
self.flush_pending_parts(
&mut messages,
&mut pending_assistant_parts,
&mut pending_user_parts,
);
flush_pending(&mut messages, &mut pending_assistant, "assistant", &mut locations);
flush_pending(&mut messages, &mut pending_user, "user", &mut locations);
let anthropic_role = match role {
Role::User | Role::System => "user",
@ -126,32 +191,35 @@ impl AnthropicScheme {
.iter()
.map(|p| match p {
ContentPart::Text { text } => {
AnthropicContentPart::Text { text: text.clone() }
AnthropicContentPart::text(text.clone())
}
ContentPart::Refusal { refusal } => {
AnthropicContentPart::text(refusal.clone())
}
ContentPart::Refusal { refusal } => AnthropicContentPart::Text {
text: refusal.clone(),
},
})
.collect();
if parts.len() == 1 {
if let AnthropicContentPart::Text { text } = &parts[0] {
let force_parts = breakpoints.contains(&i);
let msg_idx = messages.len();
// Preserve the single-text shorthand unless a
// breakpoint needs a concrete part to live on.
if parts.len() == 1 && !force_parts {
if let AnthropicContentPart::Text { text, .. } = &parts[0] {
messages.push(AnthropicMessage {
role: anthropic_role.to_string(),
content: AnthropicContent::Text(text.clone()),
});
} else {
messages.push(AnthropicMessage {
role: anthropic_role.to_string(),
content: AnthropicContent::Parts(parts),
});
continue;
}
} else {
messages.push(AnthropicMessage {
role: anthropic_role.to_string(),
content: AnthropicContent::Parts(parts),
});
}
let last_part_idx = parts.len().saturating_sub(1);
messages.push(AnthropicMessage {
role: anthropic_role.to_string(),
content: AnthropicContent::Parts(parts),
});
locations[i] = Some((msg_idx, last_part_idx));
}
Item::ToolCall {
@ -160,25 +228,15 @@ impl AnthropicScheme {
arguments,
..
} => {
// Flush pending user parts first
if !pending_user_parts.is_empty() {
messages.push(AnthropicMessage {
role: "user".to_string(),
content: AnthropicContent::Parts(std::mem::take(
&mut pending_user_parts,
)),
});
}
// Parse arguments JSON string to Value (defensive: normalize
// non-object / legacy "null" payloads to {} so Anthropic API accepts it)
flush_pending(&mut messages, &mut pending_user, "user", &mut locations);
// Parse arguments JSON string to Value (defensive:
// normalize non-object / legacy "null" payloads to
// `{}` so the Anthropic API accepts it).
let input = parse_tool_arguments(arguments);
pending_assistant_parts.push(AnthropicContentPart::ToolUse {
id: call_id.clone(),
name: name.clone(),
input,
});
pending_assistant.push((
i,
AnthropicContentPart::tool_use(call_id.clone(), name.clone(), input),
));
}
Item::ToolResult {
@ -187,74 +245,44 @@ impl AnthropicScheme {
content,
..
} => {
// Flush pending assistant parts first
if !pending_assistant_parts.is_empty() {
messages.push(AnthropicMessage {
role: "assistant".to_string(),
content: AnthropicContent::Parts(std::mem::take(
&mut pending_assistant_parts,
)),
});
}
flush_pending(&mut messages, &mut pending_assistant, "assistant", &mut locations);
let text = match content {
Some(c) => format!("{summary}\n{c}"),
None => summary.clone(),
};
pending_user_parts.push(AnthropicContentPart::ToolResult {
tool_use_id: call_id.clone(),
content: text,
});
pending_user.push((
i,
AnthropicContentPart::tool_result(call_id.clone(), text),
));
}
Item::Reasoning { text, .. } => {
// Flush pending user parts first
if !pending_user_parts.is_empty() {
messages.push(AnthropicMessage {
role: "user".to_string(),
content: AnthropicContent::Parts(std::mem::take(
&mut pending_user_parts,
)),
});
}
flush_pending(&mut messages, &mut pending_user, "user", &mut locations);
// Reasoning is treated as assistant text in Anthropic
// (actual thinking blocks are handled differently in streaming)
pending_assistant_parts.push(AnthropicContentPart::Text { text: text.clone() });
// (actual thinking blocks are handled differently in streaming).
pending_assistant.push((i, AnthropicContentPart::text(text.clone())));
}
}
}
// Flush remaining pending parts
self.flush_pending_parts(
&mut messages,
&mut pending_assistant_parts,
&mut pending_user_parts,
);
flush_pending(&mut messages, &mut pending_assistant, "assistant", &mut locations);
flush_pending(&mut messages, &mut pending_user, "user", &mut locations);
// Apply cache_control markers at each breakpoint item's last part.
for &bp in breakpoints {
let Some((msg_idx, part_idx)) = locations.get(bp).copied().flatten() else {
continue;
};
if let AnthropicContent::Parts(parts) = &mut messages[msg_idx].content {
if let Some(part) = parts.get_mut(part_idx) {
part.set_cache_control(CacheControl::Ephemeral);
}
}
}
messages
}
fn flush_pending_parts(
&self,
messages: &mut Vec<AnthropicMessage>,
pending_assistant_parts: &mut Vec<AnthropicContentPart>,
pending_user_parts: &mut Vec<AnthropicContentPart>,
) {
if !pending_assistant_parts.is_empty() {
messages.push(AnthropicMessage {
role: "assistant".to_string(),
content: AnthropicContent::Parts(std::mem::take(pending_assistant_parts)),
});
}
if !pending_user_parts.is_empty() {
messages.push(AnthropicMessage {
role: "user".to_string(),
content: AnthropicContent::Parts(std::mem::take(pending_user_parts)),
});
}
}
fn convert_tool(&self, tool: &ToolDefinition) -> AnthropicTool {
AnthropicTool {
name: tool.name.clone(),
@ -264,6 +292,71 @@ impl AnthropicScheme {
}
}
/// Flush a pending parts buffer into a new message, recording the
/// emitted `(msg_idx, part_idx)` for each originating item.
fn flush_pending(
messages: &mut Vec<AnthropicMessage>,
pending: &mut Vec<(usize, AnthropicContentPart)>,
role: &str,
locations: &mut [Option<(usize, usize)>],
) {
if pending.is_empty() {
return;
}
let msg_idx = messages.len();
let taken: Vec<(usize, AnthropicContentPart)> = std::mem::take(pending);
let mut parts = Vec::with_capacity(taken.len());
for (part_idx, (origin, part)) in taken.into_iter().enumerate() {
locations[origin] = Some((msg_idx, part_idx));
parts.push(part);
}
messages.push(AnthropicMessage {
role: role.to_string(),
content: AnthropicContent::Parts(parts),
});
}
/// Compute the set of item indices that should receive a cache_control
/// breakpoint:
///
/// 1. `cache_anchor` (stable prefix boundary — typically a post-compact
/// summary at index 0).
/// 2. The item immediately preceding the most recent `Role::User`
/// message (i.e. the end of the previous turn). This is the rewind
/// boundary: if a user re-issues the turn from scratch, the cache up
/// to here remains valid.
/// 3. The final item (head of the outgoing request, for the next
/// request in the same turn to read from).
///
/// Overlapping positions are collapsed to a single breakpoint.
fn compute_breakpoints(items: &[Item], cache_anchor: Option<usize>) -> BTreeSet<usize> {
let mut bps = BTreeSet::new();
if items.is_empty() {
return bps;
}
if let Some(anchor) = cache_anchor {
if anchor < items.len() {
bps.insert(anchor);
}
}
let last_user = items.iter().rposition(|it| {
matches!(
it,
Item::Message {
role: Role::User,
..
}
)
});
if let Some(i) = last_user {
if i > 0 {
bps.insert(i - 1);
}
}
bps.insert(items.len() - 1);
bps
}
#[cfg(test)]
mod tests {
use super::*;
@ -326,4 +419,213 @@ mod tests {
assert_eq!(anthropic_req.messages[1].role, "assistant");
assert_eq!(anthropic_req.messages[2].role, "user");
}
/// Pull out the `cache_control` field from a part regardless of variant.
fn part_cache_control(part: &AnthropicContentPart) -> Option<CacheControl> {
match part {
AnthropicContentPart::Text { cache_control, .. }
| AnthropicContentPart::ToolUse { cache_control, .. }
| AnthropicContentPart::ToolResult { cache_control, .. } => *cache_control,
}
}
/// All `(msg_idx, part_idx, cache_control)` triples whose `cache_control`
/// is set, in iteration order over the output.
fn breakpoint_positions(req: &AnthropicRequest) -> Vec<(usize, usize, CacheControl)> {
let mut out = Vec::new();
for (mi, msg) in req.messages.iter().enumerate() {
if let AnthropicContent::Parts(parts) = &msg.content {
for (pi, part) in parts.iter().enumerate() {
if let Some(cc) = part_cache_control(part) {
out.push((mi, pi, cc));
}
}
}
}
out
}
/// Convenience: a turn that ends with one assistant text, one tool
/// call/result pair, and a final assistant text. Produced at
/// `history[head..]` indices shown alongside, so tests can reason
/// about breakpoint positions.
fn completed_turn() -> Vec<Item> {
vec![
Item::user_message("hello"), // 0
Item::assistant_message("hi"), // 1
Item::tool_call("c1", "tool_a", "{}"), // 2
Item::tool_result("c1", "ok"), // 3
Item::assistant_message("done"), // 4
]
}
#[test]
fn three_breakpoints_when_compact_plus_prior_turn() {
let scheme = AnthropicScheme::new();
let mut items = vec![Item::system_message("[Compacted context summary]\n\nprior")];
items.extend(completed_turn()); // now indices 1..=5
items.push(Item::user_message("next turn"));
// anchor=0, last user idx=6 → turn_end=5, head=6.
let mut request = Request::new().items(items);
request.cache_anchor = Some(0);
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
let bps = breakpoint_positions(&req);
assert_eq!(bps.len(), 3, "expected 3 breakpoints, got {:?}", bps);
for (_, _, cc) in bps {
assert_eq!(cc, CacheControl::Ephemeral);
}
}
#[test]
fn two_breakpoints_without_compaction() {
let scheme = AnthropicScheme::new();
let mut items = completed_turn();
items.push(Item::user_message("next turn")); // index 5 = latest user
// cache_anchor=None, turn_end=4, head=5.
let request = Request::new().items(items);
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
let bps = breakpoint_positions(&req);
assert_eq!(bps.len(), 2, "expected 2 breakpoints, got {:?}", bps);
}
#[test]
fn single_breakpoint_when_only_first_user_message() {
let scheme = AnthropicScheme::new();
let request = Request::new().user("first ever turn");
// latest user at 0 → no turn_end; head=0; no anchor. Collapse → 1.
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
let bps = breakpoint_positions(&req);
assert_eq!(bps.len(), 1, "expected 1 breakpoint, got {:?}", bps);
}
#[test]
fn overlap_collapses_anchor_and_turn_end() {
// items = [compact_summary(0, Role::System), user(1)]
// anchor=0, latest user=1 → turn_end=0, head=1. Anchor∩turn_end at 0.
let scheme = AnthropicScheme::new();
let mut request = Request::new().items(vec![
Item::system_message("[Compacted context summary]\n\nprior"),
Item::user_message("fresh user"),
]);
request.cache_anchor = Some(0);
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
let bps = breakpoint_positions(&req);
assert_eq!(bps.len(), 2, "expected collapse to 2, got {:?}", bps);
}
#[test]
fn breakpoint_on_tool_result_head() {
// Mid-turn second call: items end with a tool_result. Head must
// land on the ToolResult part.
let scheme = AnthropicScheme::new();
let request = Request::new()
.user("run it")
.item(Item::tool_call("c1", "t", "{}"))
.item(Item::tool_result("c1", "result"));
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
let bps = breakpoint_positions(&req);
assert_eq!(bps.len(), 1);
let (mi, pi, _) = bps[0];
let part = match &req.messages[mi].content {
AnthropicContent::Parts(parts) => &parts[pi],
_ => panic!("expected Parts for breakpoint-bearing message"),
};
assert!(
matches!(part, AnthropicContentPart::ToolResult { .. }),
"expected ToolResult, got {:?}",
part,
);
}
#[test]
fn single_text_message_uses_text_shorthand_without_breakpoint() {
// Non-breakpoint single-text messages stay on the text shorthand
// so we don't bloat requests with wrapper arrays. Here the Head
// lands on items[1], leaving items[0] without a marker.
let scheme = AnthropicScheme::new();
let request = Request::new()
.user("hello")
.assistant("hi there");
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
assert!(
matches!(req.messages[0].content, AnthropicContent::Text(_)),
"non-breakpoint single-text message should use text shorthand",
);
}
#[test]
fn single_text_message_is_forced_to_parts_when_breakpoint() {
// A breakpoint on a single-text message must be emitted in the
// array form so cache_control has a part to attach to.
let scheme = AnthropicScheme::new();
let mut request = Request::new().user("hello");
request.cache_anchor = Some(0);
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
match &req.messages[0].content {
AnthropicContent::Parts(parts) => {
assert_eq!(parts.len(), 1);
assert_eq!(
part_cache_control(&parts[0]),
Some(CacheControl::Ephemeral)
);
}
AnthropicContent::Text(_) => panic!("breakpoint item should use Parts form"),
}
}
#[test]
fn serialized_json_shape_matches_anthropic_spec() {
// Single-sentence smoke test against the exact JSON key shape
// Anthropic expects for cache_control.
let scheme = AnthropicScheme::new();
let mut request = Request::new().user("hello");
request.cache_anchor = Some(0);
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
let json = serde_json::to_value(&req).unwrap();
let part = &json["messages"][0]["content"][0];
assert_eq!(part["type"], "text");
assert_eq!(part["text"], "hello");
assert_eq!(part["cache_control"]["type"], "ephemeral");
}
#[test]
fn cache_anchor_out_of_range_is_ignored() {
// Defensive: if a caller passes a stale anchor beyond items.len(),
// we drop it silently rather than panicking.
let scheme = AnthropicScheme::new();
let mut request = Request::new().user("one");
request.cache_anchor = Some(99);
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
// Only the Head breakpoint survives.
let bps = breakpoint_positions(&req);
assert_eq!(bps.len(), 1);
}
#[test]
fn empty_items_produce_no_breakpoints() {
let scheme = AnthropicScheme::new();
let req = scheme.build_request("claude-sonnet-4-20250514", &Request::new());
assert!(req.messages.is_empty());
assert!(breakpoint_positions(&req).is_empty());
}
#[test]
fn tool_definitions_carry_no_cache_control() {
// Tool JSON schema must serialise unchanged — no sneak-in of
// cache_control at the tools-array level.
let scheme = AnthropicScheme::new();
let request = Request::new()
.user("hello")
.tool(ToolDefinition::new("noop").input_schema(serde_json::json!({
"type": "object",
"properties": {}
})));
let req = scheme.build_request("claude-sonnet-4-20250514", &request);
let json = serde_json::to_value(&req).unwrap();
let tool = &json["tools"][0];
assert!(tool.get("cache_control").is_none());
}
}

View File

@ -419,6 +419,13 @@ pub struct Request {
pub tools: Vec<ToolDefinition>,
/// Request configuration
pub config: RequestConfig,
/// Index into `items` marking the end of a stable, cacheable prefix.
///
/// Higher layers that know about durable prefix boundaries (e.g. a
/// post-compaction summary) set this so that caching-aware providers
/// (Anthropic today) can place a long-lived cache breakpoint there.
/// Providers without prompt caching ignore the field.
pub cache_anchor: Option<usize>,
}
impl Request {

View File

@ -178,6 +178,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
/// by higher layers that own usage measurements. `None` disables
/// the prune projection.
savings_estimator: Option<crate::prune::SavingsEstimator>,
/// Index of the last stable cache prefix item, set by higher layers.
/// Plumbed into [`Request::cache_anchor`] at request build time.
cache_anchor: Option<usize>,
/// State marker
_state: PhantomData<S>,
}
@ -340,6 +343,18 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
self.savings_estimator = estimator;
}
/// Mark an index into the current history as a stable, cacheable
/// prefix boundary. The value is included in each outgoing
/// [`Request`] via [`Request::cache_anchor`] — caching-aware
/// providers (Anthropic) place a long-lived breakpoint there.
///
/// Pass `None` to clear. Typically set by layers that compact the
/// conversation: after a compaction rebuilds history starting with a
/// summary item, the anchor is `Some(0)`.
pub fn set_cache_anchor(&mut self, anchor: Option<usize>) {
self.cache_anchor = anchor;
}
/// Get a mutable reference to the timeline (for additional handler registration)
pub fn timeline_mut(&mut self) -> &mut Timeline {
&mut self.timeline
@ -529,6 +544,13 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// Apply request configuration
request = request.config(self.request_config.clone());
// Attach the cache prefix anchor (may be narrower than `context`
// if the prune projection trimmed items from the head — keep it
// in range).
request.cache_anchor = self
.cache_anchor
.filter(|&anchor| anchor < context.len());
request
}
@ -1001,6 +1023,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
tool_output_limits: None,
prune_config: None,
savings_estimator: None,
cache_anchor: None,
_state: PhantomData,
}
}
@ -1255,6 +1278,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
tool_output_limits: self.tool_output_limits,
prune_config: self.prune_config,
savings_estimator: self.savings_estimator,
cache_anchor: self.cache_anchor,
_state: PhantomData,
}
}
@ -1328,6 +1352,7 @@ impl<C: LlmClient> Worker<C, Locked> {
tool_output_limits: self.tool_output_limits,
prune_config: self.prune_config,
savings_estimator: self.savings_estimator,
cache_anchor: self.cache_anchor,
_state: PhantomData,
}
}

View File

@ -189,10 +189,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
if let Some(ref prompt) = state.system_prompt {
worker.set_system_prompt(prompt);
}
// A leading `Role::System` item can only come from `compact`
// (the Pod's one and only write path that prepends a summary at
// history[0]). Restoring the anchor lets Anthropic re-use a
// stable cache prefix for long-lived restored sessions.
let anchored_on_summary = matches!(
state.history.first(),
Some(Item::Message {
role: llm_worker::Role::System,
..
})
);
worker.set_history(state.history);
worker.set_request_config(state.config);
worker.set_turn_count(state.turn_count);
worker.set_last_run_interrupted(state.last_run_interrupted);
if anchored_on_summary {
worker.set_cache_anchor(Some(0));
}
let mut pod = Self {
manifest,
@ -853,7 +867,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// until its first LLM call.
self.session_id = new_session_id;
self.head_hash = Some(new_head_hash);
self.worker.as_mut().unwrap().set_history(new_history);
let worker = self.worker.as_mut().unwrap();
worker.set_history(new_history);
// Anchor the prompt cache at the summary item so that Anthropic
// can place a durable `cache_control` breakpoint there — our
// compact layout guarantees history[0] is the summary.
worker.set_cache_anchor(Some(0));
self.usage_history
.lock()
.expect("usage_history poisoned")

View File

@ -0,0 +1,117 @@
# Anthropic プロンプトキャッシュの有効化
レビュー中: [anthropic-prompt-cache.review.md](anthropic-prompt-cache.review.md)
## 背景
Anthropic プロバイダ経由のリクエストで prompt caching が一切機能していない。セッションログの `cache_read_tokens` / `cache_write_tokens` が常に 0、`AnthropicScheme::build_request` に `cache_control: ephemeral` の breakpoint が一つも入っていない。
結果:
- 毎ターン system prompt + tools + 全履歴が full-price / full-token で再送信される
- 30k ITPM 帯の組織では、数ターン debug を回しただけで `rate_limit_error` (429) に到達する。実例:`ListPods` → `SpawnPod`(失敗)→ `Glob`(大量出力)→ `Read` のデバッグシーケンスで 429
- 長期セッションのコストが本来の ~10 倍になっている(キャッシュなら cache read は通常 input の ~10%
Anthropic の自動キャッシュ(モデル・時期依存)の有無に関わらず、明示的 breakpoint は自分で制御できる確実な手段。
## 依存
- なし。`crates/llm-worker/src/llm_client/scheme/anthropic/request.rs` 単独
## 設計
### Breakpoint 戦略
Anthropic の breakpoint は「その位置までの prefix をキャッシュする」ので後方が前方を subsume する。前方 breakpoint は後方キャッシュが TTL で失効した時の fallback として機能する。
安定度の異なる **3 層**に置く:
| 位置 | 前進するタイミング | 主な用途 |
|---|---|---|
| **Prune 位置** | compaction 走行時 | 超長期フォールバック |
| **最後のターン末** | ターン完了時 | 次ターン以降の read |
| **新規リクエスト Head** | 毎 LLM コール(= messages 末尾に追従) | 同ターン内の tool round での read |
### 期待される挙動
**1 ターン内で M 回の tool roundagent loop:**
- Call 1: 最後のターン末 = 前ターンの終わり → read、新規 Head = Call 1 の messages 末尾に creation
- Call 2: 新規 Head前 Call の末尾)→ read、新しい Head を messages 末尾に creation
- ...K 回目も同様)
ターン全体の累積入力コスト: O(K²) → **O(K)** に改善。
**ターン N+1 開始時:**
- 最後のターン末 = ターン N 最終状態 → read
- Head = N+1 最初の Call の末尾に creation
**compaction 走行時:**
- Prune 位置が前進、以降 read
### TTL 耐性
5 分 TTL で最新が失効しても段階的に fallback
- 新規 Head 失効 → 最後のターン末 で read
- 最後のターン末 失効 → Prune 位置 で read
- Prune 位置 失効 → compaction 境界から re-create
### 4 枠のうち 3 枠使用
残り 1 枠は将来の拡張用tools 配列を別 TTL で管理したい場合など)に温存。
前方に system 単独 / tools 単独の breakpoint を打つ案は subsume されるだけで意味がないので採用しない。
### 実装方針
`AnthropicContentPart``cache_control` フィールドを追加。Anthropic の API 形式:
```json
{ "type": "text", "text": "...", "cache_control": { "type": "ephemeral" } }
```
`Option<CacheControl>` で持ち、値がある場合のみシリアライズ(`skip_serializing_if`)する。既存の非 Anthropic プロバイダOpenAI / Gemini / Ollamaには影響させない — Anthropic scheme 内部型のみ。
3 つの breakpoint 位置の決定:
- **Prune 位置**: 既存 compaction 機構(`pod::compact_state` か Pod 内で管理される prune 済みインデックス)から取得。`build_request` 経路で Anthropic scheme に prune インデックスを渡す API 拡張が必要
- **最後のターン末**: Pod / Worker 側が既にターン境界を記録しているならそれを使う。無ければ `messages` 逆走査で「直近 user メッセージの 1 つ前」を探す
- **新規リクエスト Head**: `messages.last()` に付ける(= 現行 request の末尾)
3 箇所が重なる場合(例: 初回 request で Prune 位置・ターン末・Head が全部同じ itemは重複を除去して実質 1 breakpoint にする。
### 自動テスト
- breakpoint が Prune 位置 / 最後のターン末 / 新規リクエスト Head の 3 箇所に付いたリクエスト JSON が生成されること
- Prune 位置が 0compaction 未走行)のケースでは 2 箇所(ターン末 + Headのみに付くこと
- ターン末と Head が重なる最終 request= 新ターンの最初の Callでは 2 箇所に縮退すること
- 3 箇所が全て重なる初回 request では 1 箇所に縮退すること
- OpenAI / Gemini の request 生成が一切変わらないことAnthropic 専用だが回帰防止)
## 影響範囲
- `crates/llm-worker/src/llm_client/scheme/anthropic/request.rs`: 内部型 + breakpoint 配置ロジック
- `crates/llm-worker/src/llm_client`: Prune 境界インデックスを `build_request` 経路で渡す API 拡張(`Request` に prune hint を足すか、別経路で scheme に伝える)
- `crates/pod/src/pod.rs` or `compact_state.rs`: 現在の prune 済み件数を読み出せるようにする(既に内部で管理されているはず、公開 API 化)
- 既存の serde round-trip テスト: 追加フィールドを skip_serializing_if で出さないので差分なし
## 完了条件
- Anthropic リクエストの Prune 位置compact 済みサマリ末尾)に `cache_control: ephemeral` が付く
- Anthropic リクエストの最後のターン末(直近 user メッセージの直前)に `cache_control: ephemeral` が付く
- Anthropic リクエストの新規リクエスト Headmessages 末尾)に `cache_control: ephemeral` が付く
- Prune 位置 / ターン末 / Head が重なる場合は重複除去される
- 実セッションで `cache_read_tokens` が 2 コール目以降に非 0 になる
- 特に同一ターン内の 2 回目以降の LLM コールで直前の tool_result 以前が cache read されること
- 既存の Anthropic / OpenAI / Gemini テストが全 pass
- cache_control が正しい位置に入ることを検証する新規ユニットテスト
## 範囲外
- OpenAI / Gemini の prompt caching各プロバイダの API 設計が違うため別チケット)
- 動的な breakpoint 数の調整4 枠目を状況により使い分ける、など)。まずは固定 3 箇所
- Cache hit 率の可観測化TUI 表示など)。集計は `cache_read_tokens` として既に記録されるので、表示は別途
- Rate limit 429 を受けた際の retry-after honor別課題

View File

@ -0,0 +1,75 @@
# Review: anthropic-prompt-cache
実装は未コミットstaged + unstaged。`cargo build` clean、`cargo test --workspace` 467 / 0 fail。
## 総評
チケット要件を忠実に反映。3 層 breakpointPrune 位置 / 最後のターン末 / 新規 Head`BTreeSet` で管理し重複除去、Anthropic scheme のみに局所化、OpenAI / Gemini は透過的に無視。エッジケースempty / out-of-range / overlap / shorthand 保持)まで網羅。軽微な指摘のみで blocker なし。
## 完了条件の対応
| 要件 | 状態 | 根拠 |
|---|---|---|
| Prune 位置に `cache_control` | ✅ | `compute_breakpoints` anchor 分岐、`three_breakpoints_when_compact_plus_prior_turn` |
| 最後のターン末に `cache_control` | ✅ | `last_user - 1` 計算、同テスト |
| 新規 Head に `cache_control` | ✅ | `items.len() - 1`、`two_breakpoints_without_compaction` |
| 3 箇所重複の除去 | ✅ | `BTreeSet` で自動重複削除、`overlap_collapses_*` / `single_breakpoint_*` |
| 実セッションで cache_read 非 0 | ⚠️ | 結合で要確認(自動テスト範囲外、実運用で観測) |
| 既存テスト全 pass | ✅ | workspace 467 / 0 fail |
| 新規ユニットテスト | ✅ | 10 件(シリアライズ形状検証含む) |
## 良い点
1. **層構造が素直に分離**: `Request::cache_anchor: Option<usize>` を追加、`Worker::set_cache_anchor` で透過的に伝播、Anthropic scheme のみが実際の breakpoint 配置を知る。OpenAI / Gemini は透過的に無視して回帰なし。
2. **`Pod::compact` / `Pod::from_state` の両方で anchor を復旧**: `from_state``history[0]``Role::System` かどうかで compact の痕跡を検出して anchor を張る。resume 時のキャッシュ継続性が確保されている。
3. **`force_parts` で breakpoint 位置だけ array 形式に落とす**: 通常は text shorthand を保ちつつ、breakpoint を付ける位置だけ array 化。リクエストサイズを最小化する配慮(`single_text_message_uses_text_shorthand_without_breakpoint` で検証)。
4. **シリアライズ形状の検証**: `serialized_json_shape_matches_anthropic_spec``{"type":"ephemeral"}` が正しい階層に出ることを明示的にテスト。Anthropic spec からずれたら即発見できる。
5. **エッジケース網羅**: 空 items / out-of-range anchor / 1 item のみ / tool_result が head になるケース / Parts 強制化 / tool 定義に cache_control が漏れないこと、が全部テスト。
6. **冪等な anchor 範囲チェック**: `request.cache_anchor = self.cache_anchor.filter(|&anchor| anchor < context.len())`。prune が先頭をトリムしても安全。
## 指摘と判断
### 軽微
#### 1. `anchor=0``compact` が暗黙依存
```rust
// pod.rs :872
worker.set_history(new_history);
worker.set_cache_anchor(Some(0));
```
`compact` が history[0] に summary を置く前提で `Some(0)` をハードコード。将来 compact のレイアウトを変えたら breaker になる。コメントで invariant を明記してあるので read 可能だが、`compact_state` 側に「summary の位置」を返す API を置いて受け渡す方が健全。
**判断**: 対応は任意。現状は compact の設計上 history[0] が summary であることが不変、from_state 側の検出ロジックも Role::System を見ているので、レイアウトを変えるなら両方を直すことになる。今回は放置で OK。
#### 2. `Request::cache_anchor` が raw フィールド代入
既存の `Request` builder は fluent`Request::new().user(...).item(...)`)だが、`cache_anchor` のみ `request.cache_anchor = Some(0)` の直代入。テスト内で何度も使われていて少し浮く。`fn cache_anchor(self, anchor: Option<usize>) -> Self` を足して fluent を揃える方が一貫性が良い。
**判断**: スタイルの範囲。機能に影響なし、後で揃えても良い。本チケットでは見送り可。
#### 3. 「直近 user msg の 1 つ前」が tool_call になりうるケース
前ターンが interrupted で最後が `tool_call`tool_result 未応答だった場合、turn_end が tool_call を指す。Anthropic 的には tool_use にも cache_control を付けられるので wire 上は OK。意味論的に「ターン末」と呼ぶには微妙だが、キャッシュ位置としての機能は成立その prefix が安定ならキャッシュが効く)ので実害なし。
**判断**: 仕様上許容。注記不要。
#### 4. Tools array 自体にキャッシュマーカーを付けない選択
ticket で「tools 別 TTL 管理は将来」と明記されており、今は意図的に off。tools は message-level breakpoint の prefix に含まれるので、実質的に system + tools + history のまとまりで cache される。問題なし。
**判断**: ticket 通り。
## 完了に向けた作業
- 必須修正なし
- 指摘 1〜2 は style の範囲、本チケットでは対応不要
- 実セッションで cache_read_tokens が 2 コール目以降に非 0 になるかは実運用で確認LLM への実アクセスが必要、CI の範囲外)
**完了 OK**。