pod: add hook system item sink

This commit is contained in:
Keisuke Hirata 2026-06-05 10:16:27 +09:00
parent 3cc3134386
commit 04a4a730fb
No known key found for this signature in database
3 changed files with 252 additions and 14 deletions

View File

@ -1,8 +1,10 @@
//! Pod-layer hook infrastructure //! Pod-layer hook infrastructure
//! //!
//! Hooks are the **public** orchestration extension point. They receive //! Hooks are the **public** orchestration extension point. They receive
//! read-only summary information about each event in the Worker //! event-specific context values about each event in the Worker execution loop
//! execution loop and return a safe public control-flow action. //! and return a safe public control-flow action. Contexts may carry narrow
//! host-created handles for approved side effects; hook return values remain
//! flow-control decisions only.
//! //!
//! Hooks intentionally cannot mutate the Worker's context, history, tool //! Hooks intentionally cannot mutate the Worker's context, history, tool
//! call, or tool result. Internal mechanisms that need such access (e.g. //! call, or tool result. Internal mechanisms that need such access (e.g.
@ -13,12 +15,16 @@
//! extension surfaces (scripting, plugins) in the future without //! extension surfaces (scripting, plugins) in the future without
//! exposing the underlying mutable state. //! exposing the underlying mutable state.
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use async_trait::async_trait; use async_trait::async_trait;
use llm_worker::interceptor::{ use llm_worker::interceptor::{
PostToolAction, PreRequestAction, PreToolAction, PromptAction, TurnEndAction, PostToolAction, PreRequestAction, PreToolAction, PromptAction, TurnEndAction,
}; };
use llm_worker::tool::{ToolOutput, ToolResult}; use llm_worker::tool::{ToolOutput, ToolResult};
use serde_json::Value; use serde_json::Value;
use session_store::{SystemItem, SystemReminder};
/// Hook-facing prompt-submit action. /// Hook-facing prompt-submit action.
/// ///
@ -148,7 +154,42 @@ impl From<HookTurnEndAction> for TurnEndAction {
} }
// ============================================================================= // =============================================================================
// Hook input summary types (read-only) // Hook context handles
// =============================================================================
/// Host-created handle for appending approved durable [`SystemItem`] requests.
///
/// Hook code can use this handle only when the Pod host includes it in an
/// event-specific context. The handle queues typed requests; the host drains the
/// queue, commits each entry through `LogEntry::SystemItem`, and only then makes
/// the matching system message visible to the model. It deliberately exposes no
/// raw `llm_worker::Item`, history writer, event sender, `Pod`, `Worker`, or
/// notification buffer.
pub struct SystemItemAppendHandle {
pending: Arc<Mutex<Vec<SystemItem>>>,
}
impl SystemItemAppendHandle {
pub(crate) fn new(pending: Arc<Mutex<Vec<SystemItem>>>) -> Self {
Self { pending }
}
/// Queue a task-inactivity reminder for durable model-visible append.
///
/// The body should be the unwrapped reminder text; the host-side
/// `SystemReminder` renderer wraps it exactly once in `<system-reminder>`
/// tags before commit.
pub fn append_task_reminder(&self, body: impl Into<String>) {
let item = SystemReminder::task_inactivity(body).into_system_item();
self.pending
.lock()
.expect("system-item append queue poisoned")
.push(item);
}
}
// =============================================================================
// Hook input summary/context types (read-only)
// ============================================================================= // =============================================================================
/// Information passed to `OnPromptSubmit` hooks. /// Information passed to `OnPromptSubmit` hooks.
@ -159,7 +200,7 @@ pub struct PromptSubmitInfo {
pub turn_index: usize, pub turn_index: usize,
} }
/// Information passed to `PreLlmRequest` hooks. /// Summary information included in `PreLlmRequest` contexts.
pub struct PreRequestInfo { pub struct PreRequestInfo {
/// Number of items currently in the Worker context. /// Number of items currently in the Worker context.
pub item_count: usize, pub item_count: usize,
@ -173,6 +214,41 @@ pub struct PreRequestInfo {
pub tool_calls_this_turn: usize, pub tool_calls_this_turn: usize,
} }
/// Context passed to `PreLlmRequest` hooks.
///
/// The summary remains read-only. When the host grants durable system-item
/// append authority for this request, `system_items()` exposes a typed append
/// handle; otherwise it returns `None` and hooks cannot produce model-visible
/// additions.
pub struct PreRequestContext {
info: PreRequestInfo,
system_items: Option<SystemItemAppendHandle>,
}
impl PreRequestContext {
pub(crate) fn new(info: PreRequestInfo, system_items: Option<SystemItemAppendHandle>) -> Self {
Self { info, system_items }
}
/// Read-only request summary.
pub fn info(&self) -> &PreRequestInfo {
&self.info
}
/// Host-provided durable system-item append handle, when available.
pub fn system_items(&self) -> Option<&SystemItemAppendHandle> {
self.system_items.as_ref()
}
}
impl Deref for PreRequestContext {
type Target = PreRequestInfo;
fn deref(&self) -> &Self::Target {
&self.info
}
}
/// Information passed to `PreToolCall` hooks. /// Information passed to `PreToolCall` hooks.
pub struct ToolCallSummary { pub struct ToolCallSummary {
/// Provider-assigned tool call id. /// Provider-assigned tool call id.
@ -252,7 +328,7 @@ impl HookEventKind for OnPromptSubmit {
} }
impl HookEventKind for PreLlmRequest { impl HookEventKind for PreLlmRequest {
type Input = PreRequestInfo; type Input = PreRequestContext;
type Output = HookPreRequestAction; type Output = HookPreRequestAction;
} }
@ -365,6 +441,39 @@ pub struct HookRegistry {
mod tests { mod tests {
use super::*; use super::*;
#[test]
fn system_item_append_handle_queues_only_approved_task_reminder_items() {
let pending = Arc::new(Mutex::new(Vec::new()));
let handle = SystemItemAppendHandle::new(Arc::clone(&pending));
handle.append_task_reminder("remember tasks");
let queued = pending.lock().expect("pending queue poisoned");
assert_eq!(queued.len(), 1);
match &queued[0] {
SystemItem::TaskReminder { body, .. } => {
assert_eq!(body.matches("<system-reminder>").count(), 1);
assert!(body.contains("remember tasks"));
}
other => panic!("unexpected system item: {other:?}"),
}
}
#[test]
fn pre_request_context_exposes_handle_only_when_host_supplies_one() {
let info = PreRequestInfo {
item_count: 3,
estimated_tokens: Some(42),
turn_index: 1,
tool_calls_this_turn: 2,
};
let context = PreRequestContext::new(info, None);
assert_eq!(context.item_count, 3);
assert_eq!(context.info().estimated_tokens, Some(42));
assert!(context.system_items().is_none());
}
#[test] #[test]
fn public_pre_tool_hook_actions_cannot_emit_internal_no_result_skip() { fn public_pre_tool_hook_actions_cannot_emit_internal_no_result_skip() {
let continue_action = HookPreToolAction::Continue.into_worker_action("call_1".into()); let continue_action = HookPreToolAction::Continue.into_worker_action("call_1".into());

View File

@ -4,7 +4,7 @@
//! notification injection / output truncation in the future) and the //! notification injection / output truncation in the future) and the
//! public `HookRegistry`. Internal mechanisms run first and have full //! public `HookRegistry`. Internal mechanisms run first and have full
//! mutable access via the `Interceptor` trait. Hooks then receive //! mutable access via the `Interceptor` trait. Hooks then receive
//! read-only summary information and only return control-flow //! event-specific read-only contexts and only return control-flow
//! decisions (continue / skip / abort / pause). //! decisions (continue / skip / abort / pause).
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@ -28,8 +28,8 @@ use tools::{TaskEntry, TaskStatus, TaskStore};
use crate::hook::{ use crate::hook::{
AbortInfo, HookPostToolAction, HookPreRequestAction, HookPreToolAction, HookPromptAction, AbortInfo, HookPostToolAction, HookPreRequestAction, HookPreToolAction, HookPromptAction,
HookRegistry, HookTurnEndAction, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, HookRegistry, HookTurnEndAction, PreRequestContext, PreRequestInfo, PromptSubmitInfo,
ToolResultSummary, TurnEndInfo, SystemItemAppendHandle, ToolCallSummary, ToolResultSummary, TurnEndInfo,
}; };
use crate::ipc::notify_buffer::{NotifyBuffer, build_system_item}; use crate::ipc::notify_buffer::{NotifyBuffer, build_system_item};
use crate::pod::SystemItemCommitter; use crate::pod::SystemItemCommitter;
@ -342,13 +342,34 @@ impl Interceptor for PodInterceptor {
turn_index: self.current_turn_index(), turn_index: self.current_turn_index(),
tool_calls_this_turn: self.tool_calls_this_turn.load(Ordering::Relaxed), tool_calls_this_turn: self.tool_calls_this_turn.load(Ordering::Relaxed),
}; };
let pending_hook_system_items = Arc::new(Mutex::new(Vec::new()));
let system_item_sink = self
.log_writer
.as_ref()
.map(|_| SystemItemAppendHandle::new(Arc::clone(&pending_hook_system_items)));
let hook_context = PreRequestContext::new(info, system_item_sink);
for hook in &self.registry.pre_llm_request { for hook in &self.registry.pre_llm_request {
let action = hook.call(&info).await; let action = hook.call(&hook_context).await;
if !matches!(action, HookPreRequestAction::Continue) { if !matches!(action, HookPreRequestAction::Continue) {
return action.into(); return action.into();
} }
} }
PreRequestAction::Continue
let system_items: Vec<SystemItem> = std::mem::take(
&mut *pending_hook_system_items
.lock()
.expect("pending hook system-item queue poisoned"),
);
if system_items.is_empty() {
return PreRequestAction::Continue;
}
self.commit_system_items(&system_items);
PreRequestAction::ContinueWith(
system_items
.into_iter()
.map(|item| item.to_history_item())
.collect(),
)
} }
async fn pre_tool_call(&self, info: &mut ToolCallInfo) -> PreToolAction { async fn pre_tool_call(&self, info: &mut ToolCallInfo) -> PreToolAction {
@ -491,7 +512,7 @@ mod tests {
#[async_trait] #[async_trait]
impl Hook<PreLlmRequest> for CountingHook { impl Hook<PreLlmRequest> for CountingHook {
async fn call(&self, _info: &PreRequestInfo) -> HookPreRequestAction { async fn call(&self, _info: &PreRequestContext) -> HookPreRequestAction {
self.0.fetch_add(1, Ordering::Relaxed); self.0.fetch_add(1, Ordering::Relaxed);
HookPreRequestAction::Continue HookPreRequestAction::Continue
} }
@ -503,6 +524,34 @@ mod tests {
Arc::new(builder.build()) Arc::new(builder.build())
} }
struct RecordingSystemItemCommitter {
committed: Arc<Mutex<Vec<SystemItem>>>,
}
impl SystemItemCommitter for RecordingSystemItemCommitter {
fn commit_system_item(&self, item: SystemItem) {
self.committed
.lock()
.expect("committed system-item list poisoned")
.push(item);
}
}
struct AppendingPreRequestHook {
saw_handle: Arc<AtomicBool>,
}
#[async_trait]
impl Hook<PreLlmRequest> for AppendingPreRequestHook {
async fn call(&self, input: &PreRequestContext) -> HookPreRequestAction {
if let Some(system_items) = input.system_items() {
self.saw_handle.store(true, Ordering::Relaxed);
system_items.append_task_reminder("hook reminder");
}
HookPreRequestAction::Continue
}
}
fn interceptor_for_task_reminders( fn interceptor_for_task_reminders(
task_store: TaskStore, task_store: TaskStore,
task_reminder_state: Arc<TaskReminderState>, task_reminder_state: Arc<TaskReminderState>,
@ -743,11 +792,91 @@ mod tests {
assert_eq!(count.load(Ordering::Relaxed), 1); assert_eq!(count.load(Ordering::Relaxed), 1);
} }
#[tokio::test]
async fn pre_llm_request_commits_hook_system_items_before_continue_with() {
let saw_handle = Arc::new(AtomicBool::new(false));
let mut builder = HookRegistryBuilder::new();
builder.add_pre_llm_request(AppendingPreRequestHook {
saw_handle: Arc::clone(&saw_handle),
});
let registry = Arc::new(builder.build());
let committed = Arc::new(Mutex::new(Vec::new()));
let committer = Arc::new(RecordingSystemItemCommitter {
committed: Arc::clone(&committed),
});
let interceptor = PodInterceptor::new(
registry,
None,
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
Some(committer),
);
let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(saw_handle.load(Ordering::Relaxed));
let PreRequestAction::ContinueWith(items) = action else {
panic!("expected ContinueWith for committed hook system item");
};
assert_eq!(items.len(), 1);
assert!(matches!(
&items[0],
Item::Message {
role: llm_worker::Role::System,
..
}
));
assert!(
extract_message_text(&items[0])
.expect("system message text")
.contains("hook reminder")
);
let committed = committed
.lock()
.expect("committed system-item list poisoned");
assert_eq!(committed.len(), 1);
match &committed[0] {
SystemItem::TaskReminder { body, .. } => assert!(body.contains("hook reminder")),
other => panic!("unexpected committed system item: {other:?}"),
}
}
#[tokio::test]
async fn pre_llm_request_without_log_writer_does_not_expose_system_item_handle() {
let saw_handle = Arc::new(AtomicBool::new(false));
let mut builder = HookRegistryBuilder::new();
builder.add_pre_llm_request(AppendingPreRequestHook {
saw_handle: Arc::clone(&saw_handle),
});
let interceptor = PodInterceptor::new(
Arc::new(builder.build()),
None,
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(!saw_handle.load(Ordering::Relaxed));
assert!(matches!(action, PreRequestAction::Continue));
}
struct AbortingHook(Arc<AtomicBool>); struct AbortingHook(Arc<AtomicBool>);
#[async_trait] #[async_trait]
impl Hook<PreLlmRequest> for AbortingHook { impl Hook<PreLlmRequest> for AbortingHook {
async fn call(&self, _info: &PreRequestInfo) -> HookPreRequestAction { async fn call(&self, _info: &PreRequestContext) -> HookPreRequestAction {
self.0.store(true, Ordering::Relaxed); self.0.store(true, Ordering::Relaxed);
HookPreRequestAction::Cancel("nope".into()) HookPreRequestAction::Cancel("nope".into())
} }

View File

@ -31,7 +31,7 @@ use crate::compact::usage_tracker::UsageTracker;
use crate::feature::{FeatureRegistryBuilder, FeatureRegistryInstallReport}; use crate::feature::{FeatureRegistryBuilder, FeatureRegistryInstallReport};
use crate::hook::{ use crate::hook::{
Hook, HookPreRequestAction, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, Hook, HookPreRequestAction, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd,
PostToolCall, PreLlmRequest, PreRequestInfo, PreToolCall, PostToolCall, PreLlmRequest, PreRequestContext, PreToolCall,
}; };
use crate::ipc::alerter::Alerter; use crate::ipc::alerter::Alerter;
use crate::ipc::interceptor::{PodInterceptor, TaskReminderState}; use crate::ipc::interceptor::{PodInterceptor, TaskReminderState};
@ -221,7 +221,7 @@ struct UsageTrackingHook {
#[async_trait] #[async_trait]
impl Hook<PreLlmRequest> for UsageTrackingHook { impl Hook<PreLlmRequest> for UsageTrackingHook {
async fn call(&self, info: &PreRequestInfo) -> HookPreRequestAction { async fn call(&self, info: &PreRequestContext) -> HookPreRequestAction {
self.tracker.note_request(info.item_count); self.tracker.note_request(info.item_count);
HookPreRequestAction::Continue HookPreRequestAction::Continue
} }