merge: integrate orchestration branch

This commit is contained in:
Keisuke Hirata 2026-06-22 01:06:48 +09:00
commit 2bad74046e
No known key found for this signature in database
33 changed files with 3432 additions and 85 deletions

View File

@ -0,0 +1 @@
{"id":"orch-plan-20260621-113559-1","ticket_id":"00001KVMGAEJN","kind":"accepted_plan","accepted_plan":{"summary":"Implement separate `host_api.websocket` Plugin capability with manifest WebSocket target declarations, enablement grants, static inspection/CLI diagnostics, host-owned bounded connection handles, runtime allow/deny policy, request API continued WebSocket rejection, docs/WIT/API updates, and focused tests.","branch":"impl/00001KVMGAEJN-plugin-websocket-host-api","worktree":"/home/hare/Projects/yoi/.worktree/00001KVMGAEJN-plugin-websocket-host-api","role_plan":"Orchestrator creates a dedicated child worktree and spawns a narrow-scope Coder. Reviewer will be spawned read-only after Coder reports implementation commit(s). After approval, Orchestrator integrates into `orchestration`, validates plugin manifest/runtime/CLI/docs tests and Nix if dependency changes occur, records closure, and cleans only the child worktree/branch."},"author":"yoi-orchestrator","at":"2026-06-21T11:35:59Z"}

View File

@ -1,8 +1,8 @@
--- ---
title: 'Plugin: URL 権限ベースの WebSocket host API を実装する' title: 'Plugin: URL 権限ベースの WebSocket host API を実装する'
state: 'queued' state: 'closed'
created_at: '2026-06-21T07:11:34Z' created_at: '2026-06-21T07:11:34Z'
updated_at: '2026-06-21T11:34:07Z' updated_at: '2026-06-21T13:27:28Z'
assignee: null assignee: null
readiness: 'implementation_ready' readiness: 'implementation_ready'
risk_flags: ['plugin', 'host-api', 'websocket', 'service', 'ingress', 'lifecycle', 'permissions', 'security', 'persistence'] risk_flags: ['plugin', 'host-api', 'websocket', 'service', 'ingress', 'lifecycle', 'permissions', 'security', 'persistence']

View File

@ -0,0 +1,28 @@
URL permission based Plugin WebSocket host API を実装し、Orchestrator worktree の `orchestration` branch に統合した。
主な成果:
- `host_api.websocket``host_api.request` とは別 capability として追加。
- Manifest `[[websocket]]` target declaration と enablement `grants.websocket` を追加し、request targets/grants とは独立させた。
- Static inspection / `yoi plugin show` が WebSocket requested/granted/missing/grant-only/broad diagnostics を request diagnostics とは別に表示するようにした。
- Runtime connect は manifest target と enablement grant の両方が URL を許可する場合のみ network I/O に進む。
- URL checks cover scheme (`ws`/`wss`), host, port, and path prefix。
- Local/private/loopback WebSocket targets は ambient ではなく、明示 declaration + grant が必要。
- Host-owned WebSocket handle API を追加: open, send_text / send-text, recv, close。
- Text-only / explicit bounded receive とし、binary receive は fail closed / unsupported。
- Guest arbitrary handshake headers / embedded credentials を reject。
- Request API は WebSocket/SSE/persistent attempts を引き続き reject。
- Open path は pre-dial capacity reservation と bounded async `tokio-tungstenite` open under `tokio::time::timeout` により max-open / timeout semantics を network I/O 前から enforce。
- Reservation cleanup on open failure / failed commit を追加。
- WIT resource `yoi:host/websocket@1.0.0` と docs を更新。
- `tungstenite`, `tokio-tungstenite`, `futures-util` dependencies と `Cargo.lock` / `package.nix` cargo hash を更新。
統合・検証:
- Merge commit: `354f1e10 merge: plugin websocket host api`
- Implementation commits: `4c1b8c3d`, `ce62d235`, `a766048f`
- Reviewer final verdict: approve
- Validation passed: `cargo fmt --check`, `git diff --check HEAD^1..HEAD`, `cargo test -p pod websocket`, `cargo test -p manifest websocket`, `cargo test -p yoi render_show_distinguishes_request_grant_statuses_and_broad_targets`, `cargo test -p manifest request_host_api_manifest_and_grant_parse_with_request_names`, `cargo check -p manifest -p pod -p yoi`, `cargo run -p yoi -- ticket doctor`, and `nix build .#yoi --no-link`
範囲外:
- Discord bridge 本体は実装していない。
- Reconnect/backoff/heartbeat scheduler、hidden context/history injection、Dashboard channel、Ticket mutation、direct model Tool invocation は追加していない。
- SecretRef-based credential injection は future follow-up。

View File

@ -94,4 +94,520 @@ Marked ready by `yoi ticket state`.
Ticket を `workspace-panel` が queued にしました。 Ticket を `workspace-panel` が queued にしました。
---
<!-- event: decision author: yoi-orchestrator at: 2026-06-21T11:35:44Z -->
## Decision
Routing decision: `implementation_ready`
Reason:
- Ticket was previously returned to planning because it needed concrete WebSocket connection ownership / lifecycle / permission model decisions。
- Current Ticket body is now retitled and refined as implementation work: `Plugin: URL 権限ベースの WebSocket host API を実装する`
- It defines explicit `host_api.websocket` semantics, URL permission/grant model, lifecycle/bounds/non-goals, acceptance criteria, tests, docs, and validation。
- Recorded dependency `00001KVMGAEJN depends_on 00001KVMG8FTW` is now satisfied because `00001KVMG8FTW` is `closed` and `host_api.request` URL permission model is integrated。
- Relations / orchestration plan have no remaining blockers。
- Active `00001KVMT2J25` work is protocol/pod/TUI reconnect snapshot and is semantically separate from Plugin WebSocket host API; parallel implementation is acceptable。
- Orchestrator worktree is clean on `orchestration` at `d8027781` before routing side effects; target worktree / branch is not present。
Evidence checked:
- Ticket body / thread / artifacts via `TicketShow` and direct `item.md` read。
- `TicketRelationQuery(00001KVMGAEJN)`: one `depends_on 00001KVMG8FTW`, target Ticket is `closed`
- `TicketOrchestrationPlanQuery(00001KVMGAEJN)`: no records。
- `TicketList(state=queued)`: this Ticket is the only queued Ticket。
- `ListPods`: only active child for other work is `yoi-reviewer-00001KVMT2J25-r1`
- Orchestrator git state / worktree list / branch list checked from `/home/hare/Projects/yoi/.worktree/orchestration` only。
- Bounded code map:
- `crates/manifest/src/plugin.rs` now has `host_api.request`, `PluginRequestGrant`, and manifest request target schema。
- `crates/pod/src/feature/plugin.rs` has `PluginRequestClient`, `validate_plugin_request_request`, request allowlist inspection, and explicit WebSocket rejection in request path。
- No existing tungstenite/tokio-tungstenite/websocket dependency found in Cargo manifests。
- Docs currently state WebSocket/persistent transports require a separate Plugin capability。
IntentPacket:
Intent:
- Add a separate URL-permission-based Plugin WebSocket host API, not an extension of `host_api.request`, suitable as a foundation for Discord/gateway-like integrations without implementing Discord itself。
Binding decisions / invariants:
- API name is `host_api.websocket`; do not fold WebSocket into `host_api.request`
- URL permission model should mirror/reuse the `host_api.request` target/grant review semantics where sensible, while keeping websocket-specific lifecycle/bounds explicit。
- Authority requires both manifest-declared WebSocket target and enablement grant before opening a connection。
- WebSocket connection is host-owned and Plugin-driven: guest requests open/send/recv/close via host API, but host enforces handles, bounds, timeouts, and shutdown cleanup。
- No ambient network/socket access, no raw WASI sockets, no arbitrary URL by default。
- Secrets/auth headers are not solved by guest-memory arbitrary credential headers; keep credential-bearing header policy conservative and explicit。
- Incoming messages from WebSocket are delivered to the guest through explicit host API return values or bounded polling/receive operations, not hidden model context injection。
- No direct model Tool calls, Ticket mutation, Dashboard UI channel, or hidden history/context mutation。
- `host_api.request` must keep rejecting WebSocket/SSE/persistent connection attempts。
- First slice should avoid full background daemon scheduler unless it is minimal and bounded; preserve instance lifecycle cleanup。
Requirements / acceptance criteria:
- Manifest can declare WebSocket targets independently from request targets。
- Enablement config can grant WebSocket targets independently from request grants。
- Static inspection / `yoi plugin show` reports WebSocket requested/granted/missing/broad diagnostics separately from request。
- Runtime refuses connect unless manifest target and grant both allow the URL。
- URL checks cover scheme (`ws`/`wss`), host, port, path prefix, and any method/protocol constraints chosen for handshake。
- Local/private/loopback WebSocket targets require explicit declaration+grant。
- WebSocket API has bounded handle lifetime, max frame/message size, max open connections per Plugin instance, timeout/cancellation behavior, and cleanup on instance stop/trap/drop。
- Send/receive operations are bounded and typed; binary/text behavior is documented。
- Credential-like headers are rejected or explicitly not supported until SecretRef/grants exist。
- Tests cover allow/deny, grant-only/missing-grant, loopback allow/deny, broad diagnostics, request API still rejecting WebSocket, bounds/cleanup, and no hidden context mutation。
Implementation latitude:
- Rust dependency choice is Coders decision, e.g. `tokio-tungstenite` if suitable, but dependency/package/Nix implications must be handled。
- WIT/API shape can be handle-based with `open`, `send_text`/`send_binary`, `recv`, `close`, or similar. Keep it minimal and reviewable。
- If a fully live network integration test is hard, use local test server / mock client abstraction to validate runtime policy and handle lifecycle。
- Reuse request target/grant matching helpers where appropriate, but avoid overgeneralizing if it obscures WebSocket semantics。
Escalate if:
- Implementing safe host-owned connection handles requires a broader Plugin service/instance actor redesign than current runtime supports。
- SecretRef/auth header requirements become necessary for useful functionality。
- Background receive delivery requires hidden context/history mutation or direct model Tool invocation。
- Cargo/Nix dependency updates become blocked by unavailable crates or source filtering。
- Public API compatibility for new WIT world/import names becomes unclear。
Validation plan:
- `cargo fmt --check`
- Focused manifest tests for websocket permission/target/grant parsing and independence from request。
- Focused pod plugin tests for connection allow/deny policy, loopback explicit grants, grant-only/missing-grant, bounds, handle cleanup, and request API rejection of WebSocket。
- Focused yoi plugin CLI tests for WebSocket diagnostics。
- `cargo check` for affected crates。
- `git diff --check`
- stale/boundary grep to ensure request docs still reject WebSocket and WebSocket docs do not claim hidden event injection。
- `yoi ticket doctor`
- `nix build .#yoi --no-link` if dependencies/package/source-filter changed。
---
<!-- event: state_changed author: yoi-orchestrator at: 2026-06-21T11:35:51Z from: queued to: inprogress reason: human_authorized_dependency_satisfied_websocket_implementation field: state -->
## State changed
Human authorized queue routing from Workspace Dashboard. Previously recorded dependency on `00001KVMG8FTW` is now satisfied because `host_api.request` is closed/integrated, and this Ticket now has implementation-ready WebSocket host API requirements. Orchestrator accepts implementation.
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T11:37:01Z -->
## Implementation report
Implementation start report:
- Created child implementation worktree:
- `/home/hare/Projects/yoi/.worktree/00001KVMGAEJN-plugin-websocket-host-api`
- Created branch:
- `impl/00001KVMGAEJN-plugin-websocket-host-api`
- Base commit:
- `3eac7f8e ticket: accept plugin websocket host api`
- Confirmed tracked Ticket project records are visible in the child worktree。
- Confirmed `.yoi/memory` has no tracked/untracked entries in the child worktree check。
- Spawned sibling Coder Pod:
- `yoi-coder-00001KVMGAEJN`
- Coder scope:
- read: `/home/hare/Projects/yoi`
- write: `/home/hare/Projects/yoi/.worktree/00001KVMGAEJN-plugin-websocket-host-api`
Parallelization note:
- `00001KVMT2J25` remains active in review and targets protocol/pod/TUI reconnect snapshot semantics. This Ticket should stay limited to Plugin WebSocket host API / manifest / runtime / CLI / docs / tests。
Next action:
- Wait for Coder implementation report, then spawn Reviewer read-only for the implementation diff。
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T12:20:55Z -->
## Implementation report
Coder implementation report received from `yoi-coder-00001KVMGAEJN`.
Implementation commit reported:
- `4c1b8c3d feat: add plugin websocket host api`
Changed areas reported:
- Manifest/config schema:
- Added independent `host_api.websocket` capability。
- Added manifest `[[websocket]]` targets and enablement `grants.websocket`
- Runtime:
- Added host-owned WebSocket handle lifecycle for raw Wasm and component imports。
- Added `open`, `send_text` / `send-text`, `recv`, and `close`
- Enforces manifest+grant URL allowlist before network I/O。
- Bounds open descriptor size, text/message size, max open connections, recv timeout, handle lifetime, and cleanup。
- Rejects embedded credentials, guest handshake headers, unsupported subprotocols, non-`ws`/`wss` schemes, and binary message receive。
- Static inspection / CLI:
- Reports WebSocket requested/granted/missing/grant-only/broad diagnostics separately from request diagnostics。
- `yoi plugin show` now includes `configured_websocket_grants`
- Docs/WIT:
- Added `yoi:host/websocket@1.0.0` WIT interface。
- Documented text-only API, explicit bounded receive, no hidden context/history injection, and future SecretRef credential follow-up。
- Dependency/package:
- Added `tungstenite` to `crates/pod`
- Updated `Cargo.lock`
- Updated `package.nix` cargo hash to `sha256-TZrw6nJclXVRpFIUlYvimGTDXlxBMaQt6oM5C5DIGIU=`
Coder validation reported:
- `cargo fmt --check`: passed
- `cargo test -p manifest websocket`: passed
- `cargo test -p pod websocket`: passed
- `cargo test -p yoi render_show_distinguishes_request_grant_statuses_and_broad_targets`: passed
- `cargo test -p manifest request_host_api_manifest_and_grant_parse_with_request_names`: passed
- `cargo check -p manifest -p pod -p yoi`: passed
- `git diff --check`: passed
- stale/boundary grep for request/WebSocket docs and hidden context claims: passed
- `cargo run -p yoi -- ticket doctor`: passed (`doctor: ok`)
Nix status reported:
- First `nix build .#yoi --no-link` exposed expected cargo hash mismatch; Coder updated `package.nix`
- After hash update, build advanced into dependency compilation then failed in sandbox compiling `aws-lc-sys` with `No space left on device`
- Coder treated this as environmental disk-space failure, not Rust/package hash failure。
Orchestrator evidence checked:
- Worktree clean at `4c1b8c3d`
- Diff `3eac7f8e..HEAD` includes expected code/docs/package areas but also includes child worktree edits to `.yoi/tickets/00001KVMGAEJN/{item.md,thread.md}`
- Because Orchestrator owns durable Ticket records in this workflow, Coder was asked to remove those `.yoi/tickets` edits from the implementation branch before Reviewer routing。
Next action:
- Wait for Coder hygiene fix, then route code-only implementation diff to Reviewer。
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T12:32:21Z -->
## Implementation report
Coder hygiene fix received from `yoi-coder-00001KVMGAEJN`.
New commit:
- `ce62d235 chore: keep plugin websocket branch code-only`
Result:
- Implementation branch now contains code/docs/package changes only; child-worktree Ticket record edits were removed from the branch diff。
- `git diff --name-status 3eac7f8e..HEAD -- .yoi/tickets/00001KVMGAEJN`: no output。
- Worktree clean at `ce62d235`
- Diff `3eac7f8e..HEAD` now touches 11 expected files: Cargo/package, manifest/pod/yoi plugin CLI, docs, and WIT resources。
- `git diff --check 3eac7f8e..HEAD`: passed。
Next action:
- Route to read-only Reviewer Pod for acceptance review against the Ticket。
---
<!-- event: comment author: yoi-orchestrator at: 2026-06-21T12:33:03Z -->
## Comment
Review routing:
Spawned read-only Reviewer Pod:
- `yoi-reviewer-00001KVMGAEJN-r1`
Review target:
- Worktree: `/home/hare/Projects/yoi/.worktree/00001KVMGAEJN-plugin-websocket-host-api`
- Branch: `impl/00001KVMGAEJN-plugin-websocket-host-api`
- Baseline: `3eac7f8e`
- Implementation commits:
- `4c1b8c3d feat: add plugin websocket host api`
- `ce62d235 chore: keep plugin websocket branch code-only`
- Diff: `3eac7f8e..ce62d235`
Reviewer scope:
- read-only `/home/hare/Projects/yoi`
Reviewer focus:
- `host_api.websocket` remains distinct from `host_api.request`
- Manifest target + enablement grant are both required before opening a connection。
- Host-owned WebSocket handle lifecycle is bounded and cleaned up。
- Credential/header, binary/text, local/private/loopback, broad diagnostics, and no-hidden-context semantics are safe。
- Request API still rejects WebSocket/SSE/persistent attempts。
- Cargo/package/Nix changes are sound; Coder reported Nix advanced past hash check then failed due `No space left on device` in `aws-lc-sys` build。
Orchestrator will wait for reviewer verdict before integration。
---
<!-- event: review author: yoi-reviewer-00001KVMGAEJN-r1 at: 2026-06-21T12:43:06Z status: request_changes -->
## Review: request changes
Verdict: `request_changes`
確認範囲:
- Diff `3eac7f8e..ce62d235` in worktree `/home/hare/Projects/yoi/.worktree/00001KVMGAEJN-plugin-websocket-host-api`:
- `Cargo.lock`
- `crates/manifest/src/plugin.rs`
- `crates/pod/Cargo.toml`
- `crates/pod/src/feature/plugin.rs`
- `crates/pod/src/pod.rs`
- `crates/yoi/src/plugin_cli.rs`
- `docs/development/plugin-development.md`
- `package.nix`
- `resources/plugin/wit/deps/yoi-host/yoi-host-v1.wit`
- `resources/plugin/wit/yoi-plugin-instance-v1.wit`
- `resources/plugin/wit/yoi-plugin-tool-v1.wit`
- Ticket authority:
- `.yoi/tickets/00001KVMGAEJN/item.md`
- `.yoi/tickets/00001KVMGAEJN/thread.md`
Blocking issue:
1. WebSocket open が実際には bounded / cancellable ではなく、open-connection limit も network I/O 後にしか適用されていない。
Evidence:
- `crates/pod/src/feature/plugin.rs:922-931`
- `execute_plugin_websocket_open` は URL/grant validation 後に `client.open(...)` を呼び、その後で handle table に insert している。
- `crates/pod/src/feature/plugin.rs:2536-2558`
- `TungstenitePluginWebSocketClient::open` は最初に `tungstenite::client::connect_with_config(...)` を呼ぶ。
- `set_system_websocket_timeouts(...)` は connection established 後にしか呼ばれない。
- `crates/pod/src/feature/plugin.rs:2687-2698`
- `PLUGIN_WEBSOCKET_MAX_OPEN_CONNECTIONS``PluginWebSocketHandleTable::insert` 内で check されるため、`client.open(...)` 完了後、つまり network I/O 後にしか適用されない。
Implications:
- DNS/TCP/TLS/WebSocket handshake が Yoi の WebSocket timeout 適用前に OS/library behavior に従って block し得る。
- `PLUGIN_WASM_TIMEOUT` wrapper は plugin Tool execution に timeout を返すが、spawned blocking task の open 自体は cancel しないため background で継続し得る。
- handle table が満杯でも、新しい outbound WebSocket connection を開いた後で reject するため、max open connections の resource-bound intent を破り、意図しない network side effect が発生し得る。
Why this blocks:
- Ticket は WebSocket API に bounded handle lifetime、max open connections、timeout/cancellation behavior、cleanup を要求している。
- capacity / timeout / cancellation の enforcement が network I/O 前に fail closed していないため、この acceptance criteria を満たしていない。
Required fix:
- Network I/O 前に capacity を reserve/check する。
- Open path を bounded/cancellable にする、または bound を保証できない場合は dialing 前に fail closed する。
- Timeout/cancellation/resource-bound semantics を tests で固定する。
Non-blocking concerns:
- WebSocket host API は component/tool WIT surface に追加され、raw helper path tests はあるが、「long-lived WebSocket connection を Tool call 内に隠さず Plugin instance/Service lifecycle として扱う」という Ticket invariant を pin する integration test/diagnostic は見当たらなかった。現行 Plugin instance が Tool/Service state を共有するなら許容可能だが、lifecycle boundary の regression test/diagnostic があるとより明確。
- `close_all` / drop cleanup と component `stop` cleanup はあるが、active blocking host calls は blocking operation が戻るまで handle-table mutex を保持し得る。これは open timeout/cancellation blocker の副次的問題。
Reviewer validation:
- `cargo fmt --check`: passed
- `git diff --check 3eac7f8e..HEAD`: passed
- `cargo test -p manifest websocket`: passed
- `cargo test -p pod websocket`: passed, 6 focused pod WebSocket tests
- `cargo test -p yoi render_show_distinguishes_request_grant_statuses_and_broad_targets`: passed
- `cargo test -p manifest request_host_api_manifest_and_grant_parse_with_request_names`: passed
- `cargo check -p manifest -p pod -p yoi`: passed
- `cargo run -p yoi -- ticket doctor`: passed (`doctor: ok`)
- `nix build .#yoi --no-link`: passed; reviewer did not reproduce earlier disk-space failure。
- Boundary/stale grep over docs/resources/manifest/pod/yoi CLI for `host_api.request`, `host_api.websocket`, WebSocket/SSE, hidden context/history, Dashboard/Ticket/Discord/reconnect terms found no hidden context injection, Ticket mutation, Dashboard channel, reconnect daemon, or Discord-specific overreach in inspected changes。
Worktree status:
- Clean on `impl/00001KVMGAEJN-plugin-websocket-host-api`
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T13:16:05Z -->
## Implementation report
Coder r1-fix report received from `yoi-coder-00001KVMGAEJN`.
New fix commit:
- `a766048f fix: bound plugin websocket open`
Branch commits now:
- `4c1b8c3d feat: add plugin websocket host api`
- `ce62d235 chore: keep plugin websocket branch code-only`
- `a766048f fix: bound plugin websocket open`
Fix summary:
- `crates/pod/src/feature/plugin.rs`
- Added pre-dial WebSocket capacity reservation / permit flow。
- Max-open-connections now fails before invoking network client/open path。
- Reservation is released on open failure。
- Replaced sync `tungstenite` open path with bounded async `tokio-tungstenite` open under `tokio::time::timeout`
- Added deterministic fail-closed abstraction for clients that cannot guarantee bounded/cancellable open。
- Added focused tests for max-open pre-dial rejection, open failure reservation cleanup, and unbounded open client fail-closed before dialing。
- `crates/pod/Cargo.toml` / `Cargo.lock`
- Added `tokio-tungstenite` and `futures-util`
- `package.nix`
- Updated `cargoHash` to `sha256-cZxkmM42kbDp1Rv9gn4sCD5WIQLc0wCbjj4GbKjuA9Q=`
Coder validation reported:
- `cargo fmt --check`: passed
- `cargo test -p pod websocket`: passed
- `cargo test -p manifest websocket`: passed
- `cargo test -p yoi render_show_distinguishes_request_grant_statuses_and_broad_targets`: passed
- `cargo test -p manifest request_host_api_manifest_and_grant_parse_with_request_names`: passed
- `cargo check -p manifest -p pod -p yoi`: passed
- `git diff --check 3eac7f8e..HEAD`: passed
- `cargo run -p yoi -- ticket doctor`: passed (`doctor: ok`)
- `nix build .#yoi --no-link`: passed
Orchestrator evidence checked:
- Worktree clean on `impl/00001KVMGAEJN-plugin-websocket-host-api` at `a766048f`
- r1-fix diff `ce62d235..HEAD` touches only `Cargo.lock`, `crates/pod/Cargo.toml`, `crates/pod/src/feature/plugin.rs`, and `package.nix`
- `git diff --name-status 3eac7f8e..HEAD -- .yoi/tickets/00001KVMGAEJN`: no output。
- `git diff --check 3eac7f8e..HEAD`: passed。
Next action:
- Route back to Reviewer for r2 verification of WebSocket open capacity/timeout/cancellation bounds and final acceptance。
---
<!-- event: review author: yoi-reviewer-00001KVMGAEJN-r1 at: 2026-06-21T13:20:53Z status: approve -->
## Review: approve
Verdict: `approve`
r1-fix diff inspected:
- Reviewed `ce62d235..a766048f`:
- `Cargo.lock`
- `crates/pod/Cargo.toml`
- `crates/pod/src/feature/plugin.rs`
- `package.nix`
- Also re-checked overall implementation diff `3eac7f8e..a766048f` against Ticket authority and previous r1 blocker。
Previous blocker verification:
- Resolved。
Evidence in `crates/pod/src/feature/plugin.rs`:
- `execute_plugin_websocket_open` now:
- validates URL / manifest / grant before dialing。
- fails closed if selected client cannot guarantee bounded open。
- reserves WebSocket capacity before `client.open(...)`
- commits reservation only after successful open。
- `PluginWebSocketHandles` / `PluginWebSocketHandleTable` now track reserved slots separately from active handles, so `PLUGIN_WEBSOCKET_MAX_OPEN_CONNECTIONS` rejects before network I/O。
- Reservation cleanup is RAII-style:
- failed open drops the reservation。
- failed commit also drops/releases reservation。
- `TungstenitePluginWebSocketClient` now uses `tokio-tungstenite` async connect under `tokio::time::timeout(...)`, replacing previous synchronous `tungstenite::client::connect_with_config(...)` path。
- Focused tests cover:
- max-open rejecting before network open。
- open failure releasing reservation。
- unbounded clients failing closed before dialing。
Overall Ticket fit:
- `host_api.websocket` remains separate from `host_api.request`
- Manifest `[[websocket]]` targets and `grants.websocket` remain independent from request targets/grants。
- Runtime connect requires both manifest target and grant before network I/O。
- URL matching covers scheme, host, port, and path prefix。
- WebSocket API is text-only, explicit receive only, and does not inject incoming messages into hidden context/history。
- Guest arbitrary handshake headers / embedded credentials remain rejected。
- Request API continues to reject WebSocket/SSE/persistent request attempts。
- Static inspection / `yoi plugin show` diagnostics distinguish request and WebSocket grant/target states。
- No Ticket mutation, Dashboard channel, scheduler/reconnect daemon, direct model Tool call, or Discord-specific overreach found in reviewed branch。
Blocking issues:
- None。
Non-blocking concerns:
- None requiring changes。
Reviewer validation:
- `cargo fmt --check`: passed
- `git diff --check 3eac7f8e..HEAD`: passed
- `cargo test -p pod websocket`: passed
- `cargo test -p manifest websocket`: passed
- `cargo test -p yoi render_show_distinguishes_request_grant_statuses_and_broad_targets`: passed
- `cargo test -p manifest request_host_api_manifest_and_grant_parse_with_request_names`: passed
- `cargo check -p manifest -p pod -p yoi`: passed
- `cargo run -p yoi -- ticket doctor`: passed
- `nix build .#yoi --no-link`: passed; disk-space failure did not reproduce。
Worktree status:
- Clean on `impl/00001KVMGAEJN-plugin-websocket-host-api` at `a766048f`
- Branch diff no longer includes `.yoi/tickets/00001KVMGAEJN/*`
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T13:26:05Z -->
## Implementation report
Reviewer r2 approvalを受けて、実装ブランチを Orchestrator worktree の `orchestration` branch に統合した。
統合:
- Merge commit: `354f1e10 merge: plugin websocket host api`
- Merged branch: `impl/00001KVMGAEJN-plugin-websocket-host-api`
- Implementation commits:
- `4c1b8c3d feat: add plugin websocket host api`
- `ce62d235 chore: keep plugin websocket branch code-only`
- `a766048f fix: bound plugin websocket open`
Orchestrator worktree validation:
- `cargo fmt --check`: passed
- `git diff --check HEAD^1..HEAD`: passed
- `cargo test -p pod websocket`: passed
- `cargo test -p manifest websocket`: passed
- `cargo test -p yoi render_show_distinguishes_request_grant_statuses_and_broad_targets`: passed
- `cargo test -p manifest request_host_api_manifest_and_grant_parse_with_request_names`: passed
- `cargo check -p manifest -p pod -p yoi`: passed
- `cargo run -p yoi -- ticket doctor`: passed (`doctor: ok`)
- `nix build .#yoi --no-link`: passed
Final Orchestrator worktree status after validation: clean on `orchestration` at `354f1e10`
---
<!-- event: state_changed author: yoi-orchestrator at: 2026-06-21T13:26:12Z from: inprogress to: done reason: implementation_merged_and_validated field: state -->
## State changed
Reviewer approval、Orchestrator worktree への統合、focused manifest/pod/yoi plugin tests、cargo check、Ticket doctor、Nix build が完了したため `done` に遷移する。
---
<!-- event: state_changed author: hare at: 2026-06-21T13:26:27Z from: done to: closed reason: closed field: state -->
## State changed
Ticket を closed にしました。
---
<!-- event: close author: hare at: 2026-06-21T13:26:27Z status: closed -->
## 完了
URL permission based Plugin WebSocket host API を実装し、Orchestrator worktree の `orchestration` branch に統合した。
主な成果:
- `host_api.websocket``host_api.request` とは別 capability として追加。
- Manifest `[[websocket]]` target declaration と enablement `grants.websocket` を追加し、request targets/grants とは独立させた。
- Static inspection / `yoi plugin show` が WebSocket requested/granted/missing/grant-only/broad diagnostics を request diagnostics とは別に表示するようにした。
- Runtime connect は manifest target と enablement grant の両方が URL を許可する場合のみ network I/O に進む。
- URL checks cover scheme (`ws`/`wss`), host, port, and path prefix。
- Local/private/loopback WebSocket targets は ambient ではなく、明示 declaration + grant が必要。
- Host-owned WebSocket handle API を追加: open, send_text / send-text, recv, close。
- Text-only / explicit bounded receive とし、binary receive は fail closed / unsupported。
- Guest arbitrary handshake headers / embedded credentials を reject。
- Request API は WebSocket/SSE/persistent attempts を引き続き reject。
- Open path は pre-dial capacity reservation と bounded async `tokio-tungstenite` open under `tokio::time::timeout` により max-open / timeout semantics を network I/O 前から enforce。
- Reservation cleanup on open failure / failed commit を追加。
- WIT resource `yoi:host/websocket@1.0.0` と docs を更新。
- `tungstenite`, `tokio-tungstenite`, `futures-util` dependencies と `Cargo.lock` / `package.nix` cargo hash を更新。
統合・検証:
- Merge commit: `354f1e10 merge: plugin websocket host api`
- Implementation commits: `4c1b8c3d`, `ce62d235`, `a766048f`
- Reviewer final verdict: approve
- Validation passed: `cargo fmt --check`, `git diff --check HEAD^1..HEAD`, `cargo test -p pod websocket`, `cargo test -p manifest websocket`, `cargo test -p yoi render_show_distinguishes_request_grant_statuses_and_broad_targets`, `cargo test -p manifest request_host_api_manifest_and_grant_parse_with_request_names`, `cargo check -p manifest -p pod -p yoi`, `cargo run -p yoi -- ticket doctor`, and `nix build .#yoi --no-link`
範囲外:
- Discord bridge 本体は実装していない。
- Reconnect/backoff/heartbeat scheduler、hidden context/history injection、Dashboard channel、Ticket mutation、direct model Tool invocation は追加していない。
- SecretRef-based credential injection は future follow-up。
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T13:27:28Z -->
## Implementation report
Post-close cleanup completed。
- Stopped child Pods and reclaimed scope:
- `yoi-coder-00001KVMGAEJN`
- `yoi-reviewer-00001KVMGAEJN-r1`
- Removed implementation worktree:
- `/home/hare/Projects/yoi/.worktree/00001KVMGAEJN-plugin-websocket-host-api`
- Deleted implementation branch:
- `impl/00001KVMGAEJN-plugin-websocket-host-api`
- Orchestrator worktree remains clean on `orchestration` at `e8103477`
Root/original workspace was not used for merge/validation/cleanup operations。
--- ---

View File

@ -1,8 +1,8 @@
--- ---
title: 'Pod protocol: in-flight LLM response reconnect snapshot should include unfinished blocks' title: 'Pod protocol: in-flight LLM response reconnect snapshot should include unfinished blocks'
state: 'inprogress' state: 'closed'
created_at: '2026-06-21T10:02:01Z' created_at: '2026-06-21T10:02:01Z'
updated_at: '2026-06-21T11:31:55Z' updated_at: '2026-06-21T12:00:31Z'
assignee: null assignee: null
readiness: 'implementation_ready' readiness: 'implementation_ready'
risk_flags: ['protocol', 'session-history', 'persistence', 'tui-reconnect', 'stream-state'] risk_flags: ['protocol', 'session-history', 'persistence', 'tui-reconnect', 'stream-state']

View File

@ -0,0 +1,20 @@
In-flight LLM response 中の reconnect / late attach snapshot に unfinished blocks を含める protocol/pod/TUI 実装を統合した。
主な成果:
- `Event::Snapshot` に typed `InFlightSnapshot` / `InFlightBlock` を追加。
- Pod 側に assistant text / thinking / tool-call args の in-flight accumulator を追加。
- Streaming callbacks が accumulator 更新と live delta broadcast を同じ stream path で行うようにした。
- Connect-time snapshot が in-flight stream state を含むようにした。
- Session-log mirror snapshot と in-flight snapshot、および finalized `AssistantItem` publish/clear の critical section を揃え、mirror-only assistant commit が snapshot/live boundary で消える race を防止した。
- Finalized assistant item が committed snapshot に含まれる場合は matching in-flight state を clear して duplicate を防ぐ。
- TUI snapshot restore が unfinished text/thinking/tool-call args blocks を seed し、後続 live deltas が同じ logical block に continuation されるようにした。
- Serialization/default compatibility、snapshot/live no-gap/no-duplicate、TUI continuation の focused regression tests を追加。
統合・検証:
- Merge commit: `b21638f5 merge: inflight reconnect snapshot`
- Implementation commits: `74aca6f6`, `061136d7`
- Reviewer final verdict: approve
- Validation passed: `cargo fmt --all --check`, `git diff --check HEAD^1..HEAD`, `cargo test -p protocol`, `cargo test -p pod --lib in_flight`, `cargo test -p pod session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap`, `cargo test -p pod committed_assistant_snapshot_does_not_duplicate_in_flight_block`, `cargo test -p tui snapshot_in_flight_blocks_continue_with_live_deltas`, `cargo test -p tui`, `cargo check -p protocol -p pod -p tui`, and `cargo run -p yoi -- ticket doctor`
既知の無関係事項:
- Full `cargo test -p pod` は既存の prompt-resource assertion 2 件で失敗することが reviewer により確認済み。この Ticket の差分とは無関係。

View File

@ -308,3 +308,276 @@ Reviewer focus:
Orchestrator will wait for reviewer verdict before integration。 Orchestrator will wait for reviewer verdict before integration。
--- ---
<!-- event: review author: yoi-reviewer-00001KVMT2J25-r1 at: 2026-06-21T11:37:35Z status: request_changes -->
## Review: request changes
Verdict: `request_changes`
確認範囲:
- Ticket authority:
- `.yoi/tickets/00001KVMT2J25/item.md`
- `.yoi/tickets/00001KVMT2J25/thread.md`
- Diff `155e039e..74aca6f6`, including:
- `crates/protocol/src/lib.rs`
- `crates/pod/src/in_flight.rs`
- `crates/pod/src/controller.rs`
- `crates/pod/src/ipc/alerter.rs`
- `crates/pod/src/ipc/server.rs`
- `crates/pod/src/pod.rs`
- `crates/pod/src/segment_log_sink.rs`
- `crates/tui/src/app.rs`
- constructor/test updates in pod/TUI/discovery/spawn/dashboard-related test files
Blocking issue:
1. Snapshot/live boundary still has a gap for finalized assistant items committed between the session-log snapshot and the in-flight snapshot。
The implementation makes in-flight streaming deltas mostly gap-free by holding the in-flight mutex across subscribe/snapshot, but the full connection snapshot is not atomic across the committed session-log mirror and the new in-flight accumulator。
Relevant path:
- `crates/pod/src/ipc/server.rs:112` takes `handle.sink.subscribe_with_snapshot()` first。
- `crates/pod/src/ipc/server.rs:118-120` then subscribes/snapshots alerts + in-flight state。
- `crates/pod/src/pod.rs:182-191` persists an entry, clears matching in-flight state for `LogEntry::AssistantItem`, then publishes to the session-log sink。
- `crates/pod/src/segment_log_sink.rs:90-103` documents that `AssistantItem` is mirror-only and not live-broadcast。
- `crates/pod/src/segment_log_sink.rs:120-127` confirms only `SegmentStart`, `UserInput`, `SystemItem`, and `Invoke` are live-relevant。
Race:
1. New client connects。
2. Server snapshots session-log entries at N。
3. LLM response finalizes and appends `LogEntry::AssistantItem`
4. `append_entry` clears the matching in-flight text/thinking block。
5. `SegmentLogSink::publish` mirrors the `AssistantItem`, but does not broadcast it live。
6. Server snapshots in-flight state, now empty/cleared。
7. Client receives `Event::Snapshot` with no finalized assistant item in `entries`, no unfinished/finalized in-flight block, and no live committed assistant event forthcoming。
This violates the Ticket invariant that Snapshot and post-Snapshot live events must be gap-free and duplicate-free。
Required fix:
- Make the connection snapshot atomic enough across session-log mirror and in-flight state, or otherwise ensure committed assistant items cannot disappear in this boundary。
- Add an integrated regression test around session-log snapshot + in-flight snapshot + mirror-only assistant commit boundary。
- Ensure the fix still avoids persisting unfinished output as final history and still does not duplicate content when final AssistantItem is included in committed snapshot。
Non-blocking concerns:
- The suggested focused test name `subscribe_with_alerts_and_in_flight_snapshot_splits_snapshot_and_live_deltas` selected 0 tests; no matching test exists。Add/rename an integrated focused test so validation can target the boundary reliably。
- Minor protocol doc issue: `Greeting` doc comment appears displaced so “Pod self-description...” now sits immediately before `InFlightSnapshot` in `crates/protocol/src/lib.rs`。Not blocking but should be cleaned up if easy。
Reviewer validation:
- `git diff --check 155e039e..HEAD`: passed
- `cargo fmt --all --check`: passed
- `cargo test -p protocol`: passed, 40 tests
- `cargo test -p pod in_flight`: passed selected tests
- `cargo test -p pod subscribe_with_alerts_and_in_flight_snapshot_splits_snapshot_and_live_deltas`: selected 0 tests / no matching test
- `cargo test -p tui snapshot_in_flight_blocks_continue_with_live_deltas`: passed
- `cargo test -p tui`: passed, 367 tests
- `cargo check -p protocol -p pod -p tui`: passed
- `target/debug/yoi ticket doctor`: passed (`doctor: ok`)
- Full `cargo test -p pod`: failed only on the two known unrelated prompt-resource assertion tests。
Worktree status:
- Clean on `impl/00001KVMT2J25-inflight-snapshot`
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T11:53:00Z -->
## Implementation report
Coder r1-fix report received from `yoi-coder-00001KVMT2J25`.
New fix commit:
- `061136d7 fix: close in-flight snapshot commit race`
Branch commits now:
- `74aca6f6 fix: snapshot in-flight stream state`
- `061136d7 fix: close in-flight snapshot commit race`
Fix summary:
- `crates/pod/src/ipc/server.rs`
- Connect-time snapshot now holds the in-flight stream lock while taking the session-log mirror snapshot, preventing finalized mirror-only `AssistantItem` entries from falling between committed snapshot entries and in-flight snapshot state。
- `crates/pod/src/pod.rs`
- `append_entry` now clears matching in-flight assistant state and publishes finalized `AssistantItem` to the mirror under the same in-flight critical section。
- `crates/pod/src/in_flight.rs`
- Added `clear_for_committed_item_then` helper for atomic clear + publish ordering。
- Added integrated regression tests:
- `session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap`
- `committed_assistant_snapshot_does_not_duplicate_in_flight_block`
- `crates/pod/src/ipc/alerter.rs`
- Removed earlier non-atomic combined alert/in-flight snapshot helper。
- `crates/protocol/src/lib.rs`
- Fixed displaced `Greeting` doc comment。
Coder validation reported:
- `cargo fmt --all --check`: passed
- `cargo test -p protocol`: passed
- `cargo test -p pod --lib in_flight`: passed
- `cargo test -p pod session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap`: passed
- `cargo test -p pod committed_assistant_snapshot_does_not_duplicate_in_flight_block`: passed
- `cargo test -p tui snapshot_in_flight_blocks_continue_with_live_deltas`: passed
- `cargo test -p tui`: passed
- `cargo check -p protocol -p pod -p tui`: passed
- `git diff --check`: passed
- `target/debug/yoi ticket doctor`: passed
Known unrelated validation note:
- Full `cargo test -p pod` still fails only on the known unrelated prompt-resource assertions:
- `prompt::catalog::tests::pod_orchestration_guidance_section_renders_resource_body`
- `prompt::system::tests::pod_orchestration_guidance_is_included_for_pod_management_tools`
Orchestrator evidence checked:
- Implementation worktree clean on `impl/00001KVMT2J25-inflight-snapshot`
- HEAD is `061136d7`
- r1-fix diff `74aca6f6..HEAD` touches expected protocol/pod files: `crates/pod/src/in_flight.rs`, `crates/pod/src/ipc/alerter.rs`, `crates/pod/src/ipc/server.rs`, `crates/pod/src/pod.rs`, and `crates/protocol/src/lib.rs`
- `git diff --check 155e039e..HEAD` produced no diagnostics。
Next action:
- Route back to Reviewer for r2 verification of the commit-boundary race fix and final acceptance。
---
<!-- event: review author: yoi-reviewer-00001KVMT2J25-r1 at: 2026-06-21T11:56:44Z status: approve -->
## Review: approve
Verdict: `approve`
r2 review result:
- r1 blocker is resolved。
- The fix establishes a single critical section around session-log mirror snapshot and in-flight snapshot, and uses the same in-flight lock while clearing/publishing finalized `AssistantItem`s。
Relevant safe interleavings:
- If connect-time snapshot gets the in-flight lock first: session-log snapshot may not include finalized mirror-only `AssistantItem`, but unfinished/finished in-flight block is still present in `Event::Snapshot.in_flight`
- If commit gets the in-flight lock first: it clears in-flight and publishes to session-log mirror under that lock, so later session-log snapshot includes finalized `AssistantItem` and `in_flight` is empty。
- Since `AssistantItem` remains mirror-only for live clients, this avoids the previous “absent from both snapshot and live events” gap。
- Duplicate prevention is preserved by clearing matching in-flight state before publishing finalized assistant entry。
Files / diff inspected:
- r1-fix diff `74aca6f6..061136d7`:
- `crates/pod/src/in_flight.rs`
- `crates/pod/src/ipc/alerter.rs`
- `crates/pod/src/ipc/server.rs`
- `crates/pod/src/pod.rs`
- `crates/protocol/src/lib.rs`
- Overall scope/diff `155e039e..061136d7` against Ticket authority:
- `.yoi/tickets/00001KVMT2J25/item.md`
- `.yoi/tickets/00001KVMT2J25/thread.md`
Blocking issues:
- None。
Non-blocking concerns:
- None requiring changes. Integrated regression tests cover the r1 commit-boundary race and duplicate case。
Reviewer validation:
- `git diff --check 155e039e..HEAD`: passed
- `cargo fmt --all --check`: passed
- `cargo test -p protocol`: passed, 40 tests
- `cargo test -p pod --lib in_flight`: passed, 5 selected tests
- `cargo test -p pod session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap`: passed
- `cargo test -p pod committed_assistant_snapshot_does_not_duplicate_in_flight_block`: passed
- `cargo test -p tui snapshot_in_flight_blocks_continue_with_live_deltas`: passed
- `cargo test -p tui`: passed, 367 tests
- `cargo check -p protocol -p pod -p tui`: passed
- `target/debug/yoi ticket doctor`: passed (`doctor: ok`)
- `cargo test -p pod`: failed only on the known unrelated prompt-resource assertions; summary 424 passed, 2 failed。
Worktree status:
- Clean on `impl/00001KVMT2J25-inflight-snapshot` at `061136d7`
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T11:59:39Z -->
## Implementation report
Reviewer r2 approvalを受けて、実装ブランチを Orchestrator worktree の `orchestration` branch に統合した。
統合:
- Merge commit: `b21638f5 merge: inflight reconnect snapshot`
- Merged branch: `impl/00001KVMT2J25-inflight-snapshot`
- Implementation commits:
- `74aca6f6 fix: snapshot in-flight stream state`
- `061136d7 fix: close in-flight snapshot commit race`
Orchestrator worktree validation:
- `cargo fmt --all --check`: passed
- `git diff --check HEAD^1..HEAD`: passed
- `cargo test -p protocol`: passed (`40 passed`)
- `cargo test -p pod --lib in_flight`: passed (`5 selected tests`)
- `cargo test -p pod session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap`: passed
- `cargo test -p pod committed_assistant_snapshot_does_not_duplicate_in_flight_block`: passed
- `cargo test -p tui snapshot_in_flight_blocks_continue_with_live_deltas`: passed
- `cargo test -p tui`: passed (`367 passed`)
- `cargo check -p protocol -p pod -p tui`: passed
- `cargo run -p yoi -- ticket doctor`: passed (`doctor: ok`)
Known unrelated validation note:
- Full `cargo test -p pod` was not repeated in Orchestrator after merge because Reviewer confirmed it still fails only on pre-existing prompt-resource assertions unrelated to this diff。
Final Orchestrator worktree status after validation: clean on `orchestration` at `b21638f5`
---
<!-- event: state_changed author: yoi-orchestrator at: 2026-06-21T11:59:47Z from: inprogress to: done reason: implementation_merged_and_validated field: state -->
## State changed
Reviewer approval、Orchestrator worktree への統合、protocol/pod/TUI focused tests、cargo check、Ticket doctor が完了したため `done` に遷移する。
---
<!-- event: state_changed author: hare at: 2026-06-21T12:00:02Z from: done to: closed reason: closed field: state -->
## State changed
Ticket を closed にしました。
---
<!-- event: close author: hare at: 2026-06-21T12:00:02Z status: closed -->
## 完了
In-flight LLM response 中の reconnect / late attach snapshot に unfinished blocks を含める protocol/pod/TUI 実装を統合した。
主な成果:
- `Event::Snapshot` に typed `InFlightSnapshot` / `InFlightBlock` を追加。
- Pod 側に assistant text / thinking / tool-call args の in-flight accumulator を追加。
- Streaming callbacks が accumulator 更新と live delta broadcast を同じ stream path で行うようにした。
- Connect-time snapshot が in-flight stream state を含むようにした。
- Session-log mirror snapshot と in-flight snapshot、および finalized `AssistantItem` publish/clear の critical section を揃え、mirror-only assistant commit が snapshot/live boundary で消える race を防止した。
- Finalized assistant item が committed snapshot に含まれる場合は matching in-flight state を clear して duplicate を防ぐ。
- TUI snapshot restore が unfinished text/thinking/tool-call args blocks を seed し、後続 live deltas が同じ logical block に continuation されるようにした。
- Serialization/default compatibility、snapshot/live no-gap/no-duplicate、TUI continuation の focused regression tests を追加。
統合・検証:
- Merge commit: `b21638f5 merge: inflight reconnect snapshot`
- Implementation commits: `74aca6f6`, `061136d7`
- Reviewer final verdict: approve
- Validation passed: `cargo fmt --all --check`, `git diff --check HEAD^1..HEAD`, `cargo test -p protocol`, `cargo test -p pod --lib in_flight`, `cargo test -p pod session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap`, `cargo test -p pod committed_assistant_snapshot_does_not_duplicate_in_flight_block`, `cargo test -p tui snapshot_in_flight_blocks_continue_with_live_deltas`, `cargo test -p tui`, `cargo check -p protocol -p pod -p tui`, and `cargo run -p yoi -- ticket doctor`
既知の無関係事項:
- Full `cargo test -p pod` は既存の prompt-resource assertion 2 件で失敗することが reviewer により確認済み。この Ticket の差分とは無関係。
---
<!-- event: implementation_report author: yoi-orchestrator at: 2026-06-21T12:00:31Z -->
## Implementation report
Post-close cleanup completed。
- Stopped child Pods and reclaimed scope:
- `yoi-coder-00001KVMT2J25`
- `yoi-reviewer-00001KVMT2J25-r1`
- Removed implementation worktree:
- `/home/hare/Projects/yoi/.worktree/00001KVMT2J25-inflight-snapshot`
- Deleted implementation branch:
- `impl/00001KVMT2J25-inflight-snapshot`
- Orchestrator worktree remains clean on `orchestration` at `77b5276f`
Root/original workspace was not used for merge/validation/cleanup operations。
---

53
Cargo.lock generated
View File

@ -876,6 +876,12 @@ dependencies = [
"syn 2.0.117", "syn 2.0.117",
] ]
[[package]]
name = "data-encoding"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8"
[[package]] [[package]]
name = "deadpool" name = "deadpool"
version = "0.12.3" version = "0.12.3"
@ -2879,6 +2885,7 @@ dependencies = [
"dotenv", "dotenv",
"fs4", "fs4",
"futures", "futures",
"futures-util",
"include_dir", "include_dir",
"libc", "libc",
"llm-worker", "llm-worker",
@ -2900,9 +2907,11 @@ dependencies = [
"thiserror 2.0.18", "thiserror 2.0.18",
"ticket", "ticket",
"tokio", "tokio",
"tokio-tungstenite",
"toml", "toml",
"tools", "tools",
"tracing", "tracing",
"tungstenite",
"uuid", "uuid",
"wasmi", "wasmi",
"wasmtime", "wasmtime",
@ -3903,6 +3912,17 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures 0.2.17",
"digest 0.10.7",
]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.9" version = "0.10.9"
@ -4453,6 +4473,20 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-tungstenite"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857"
dependencies = [
"futures-util",
"log",
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.18" version = "0.7.18"
@ -4696,6 +4730,25 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "tungstenite"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442"
dependencies = [
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"native-tls",
"rand 0.9.4",
"sha1",
"thiserror 2.0.18",
"url",
"utf-8",
]
[[package]] [[package]]
name = "type1-encoding-parser" name = "type1-encoding-parser"
version = "0.1.1" version = "0.1.1"

View File

@ -152,13 +152,18 @@ pub struct PluginGrantConfig {
pub permissions: Vec<PluginPermission>, pub permissions: Vec<PluginPermission>,
/// Bounded outbound request allowlist entries for `host_api.request`. /// Bounded outbound request allowlist entries for `host_api.request`.
pub request: Vec<PluginRequestGrant>, pub request: Vec<PluginRequestGrant>,
/// Bounded outbound WebSocket target allowlist entries for `host_api.websocket`.
pub websocket: Vec<PluginWebSocketGrant>,
/// Scoped filesystem allowlist entries for `host_api.fs`. /// Scoped filesystem allowlist entries for `host_api.fs`.
pub fs: Vec<PluginFsGrant>, pub fs: Vec<PluginFsGrant>,
} }
impl PluginGrantConfig { impl PluginGrantConfig {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.permissions.is_empty() && self.request.is_empty() && self.fs.is_empty() self.permissions.is_empty()
&& self.request.is_empty()
&& self.websocket.is_empty()
&& self.fs.is_empty()
} }
pub fn binding_error( pub fn binding_error(
@ -261,6 +266,50 @@ impl PluginRequestGrant {
} }
} }
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct PluginWebSocketGrant {
/// Exact URL scheme allowed by this WebSocket target: `wss` or `ws`; `*` is broad.
pub scheme: String,
/// Exact WebSocket host allowed by this target. `*` is broad and must be surfaced in diagnostics.
pub host: String,
/// Optional exact port. `None` means the scheme default or any explicit port for that host.
pub port: Option<u16>,
/// Optional path prefixes allowed for this target. Empty means any absolute path on the host.
pub path_prefixes: Vec<String>,
}
impl PluginWebSocketGrant {
pub fn label(&self) -> String {
let scheme = if self.scheme.trim().is_empty() {
"<no-scheme>"
} else {
self.scheme.as_str()
};
let host = if self.host.trim().is_empty() {
"<no-host>"
} else {
self.host.as_str()
};
let port = self.port.map(|port| format!(":{port}")).unwrap_or_default();
let paths = if self.path_prefixes.is_empty() {
"*".to_string()
} else {
self.path_prefixes.join(",")
};
let broad = if self.is_broad() {
" [broad-websocket]"
} else {
""
};
format!("{scheme}://{host}{port} {paths}{broad}")
}
pub fn is_broad(&self) -> bool {
self.scheme.trim() == "*" || self.host.trim() == "*" || self.path_prefixes.is_empty()
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(default, deny_unknown_fields)] #[serde(default, deny_unknown_fields)]
pub struct PluginFsGrant { pub struct PluginFsGrant {
@ -347,6 +396,8 @@ impl PluginPermission {
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum PluginHostApi { pub enum PluginHostApi {
Request, Request,
#[serde(rename = "websocket")]
WebSocket,
Fs, Fs,
} }
@ -354,6 +405,7 @@ impl fmt::Display for PluginHostApi {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Self::Request => f.write_str("request"), Self::Request => f.write_str("request"),
Self::WebSocket => f.write_str("websocket"),
Self::Fs => f.write_str("fs"), Self::Fs => f.write_str("fs"),
} }
} }
@ -480,6 +532,10 @@ pub struct PluginPackageManifest {
/// enablement grants must explicitly approve matching targets. /// enablement grants must explicitly approve matching targets.
#[serde(default)] #[serde(default)]
pub request: Vec<PluginRequestGrant>, pub request: Vec<PluginRequestGrant>,
/// Manifest-declared URL targets for `host_api.websocket`. These are independent from
/// `host_api.request` targets and require independent enablement grants.
#[serde(default)]
pub websocket: Vec<PluginWebSocketGrant>,
} }
impl PluginPackageManifest { impl PluginPackageManifest {
@ -3190,6 +3246,7 @@ input_schema = { type = "object", properties = { query = { type = "string" } },
digest: Some(digest.clone()), digest: Some(digest.clone()),
permissions: vec![PluginPermission::surface(PluginSurface::Hook)], permissions: vec![PluginPermission::surface(PluginSurface::Hook)],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}; };
let resolution = resolve_enabled_plugins( let resolution = resolve_enabled_plugins(
@ -3217,6 +3274,7 @@ input_schema = { type = "object", properties = { query = { type = "string" } },
digest: Some(digest.clone()), digest: Some(digest.clone()),
permissions: vec![PluginPermission::surface(PluginSurface::Hook)], permissions: vec![PluginPermission::surface(PluginSurface::Hook)],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
PluginGrantConfig { PluginGrantConfig {
@ -3225,6 +3283,7 @@ input_schema = { type = "object", properties = { query = { type = "string" } },
digest: Some(digest.clone()), digest: Some(digest.clone()),
permissions: vec![PluginPermission::surface(PluginSurface::Hook)], permissions: vec![PluginPermission::surface(PluginSurface::Hook)],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
PluginGrantConfig { PluginGrantConfig {
@ -3233,6 +3292,7 @@ input_schema = { type = "object", properties = { query = { type = "string" } },
digest: Some("sha256:unrelated".to_string()), digest: Some("sha256:unrelated".to_string()),
permissions: vec![PluginPermission::surface(PluginSurface::Hook)], permissions: vec![PluginPermission::surface(PluginSurface::Hook)],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
] { ] {
@ -3449,4 +3509,75 @@ kind = "ambient_shell"
fn write_u32(out: &mut Vec<u8>, value: u32) { fn write_u32(out: &mut Vec<u8>, value: u32) {
out.extend_from_slice(&value.to_le_bytes()); out.extend_from_slice(&value.to_le_bytes());
} }
#[test]
fn websocket_manifest_and_grants_parse_independently_from_request() {
let manifest: PluginPackageManifest = toml::from_str(
r#"
schema_version = 1
id = "project:example"
name = "example"
version = "1.0.0"
surfaces = ["tool"]
[runtime]
kind = "wasm"
entry = "plugin.wasm"
abi = "yoi-plugin-wasm-1"
[[permissions]]
kind = "host_api"
api = "request"
[[permissions]]
kind = "host_api"
api = "websocket"
[[request]]
scheme = "https"
host = "api.example.com"
methods = ["GET"]
path_prefixes = ["/v1"]
[[websocket]]
scheme = "wss"
host = "gateway.example.com"
path_prefixes = ["/gateway"]
"#,
)
.unwrap();
assert_eq!(manifest.request.len(), 1);
assert_eq!(manifest.websocket.len(), 1);
assert_eq!(
manifest.request[0].label(),
"https://api.example.com GET /v1"
);
assert_eq!(
manifest.websocket[0].label(),
"wss://gateway.example.com /gateway"
);
assert_eq!(
manifest.permissions[1],
PluginPermission::host_api(PluginHostApi::WebSocket)
);
let grants: PluginGrantConfig = toml::from_str(
r#"
[[request]]
scheme = "https"
host = "api.example.com"
methods = ["GET"]
path_prefixes = ["/v1"]
[[websocket]]
scheme = "wss"
host = "gateway.example.com"
path_prefixes = ["/gateway"]
"#,
)
.unwrap();
assert_eq!(grants.request.len(), 1);
assert_eq!(grants.websocket.len(), 1);
assert!(!grants.is_empty());
}
} }

View File

@ -39,6 +39,9 @@ session-metrics = { workspace = true }
arc-swap = "1.9.1" arc-swap = "1.9.1"
wasmi = { version = "0.51.1", default-features = false, features = ["std", "extra-checks"] } wasmi = { version = "0.51.1", default-features = false, features = ["std", "extra-checks"] }
wasmtime = { version = "45.0.2", default-features = false, features = ["std", "runtime", "cranelift", "component-model"] } wasmtime = { version = "45.0.2", default-features = false, features = ["std", "runtime", "cranelift", "component-model"] }
tungstenite = { version = "0.28.0", default-features = false, features = ["handshake", "native-tls", "url"] }
tokio-tungstenite = { version = "0.28.0", default-features = false, features = ["native-tls", "connect"] }
futures-util = { version = "0.3", features = ["sink"] }
[dev-dependencies] [dev-dependencies]
dotenv = "0.15.0" dotenv = "0.15.0"

View File

@ -14,6 +14,7 @@ use tracing::{debug, warn};
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool}; use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool};
use crate::feature::FeatureRegistryBuilder; use crate::feature::FeatureRegistryBuilder;
use crate::in_flight::InFlightEvents;
use crate::ipc::alerter::Alerter; use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer; use crate::ipc::server::SocketServer;
@ -47,6 +48,7 @@ pub struct PodHandle {
pub shared_state: Arc<PodSharedState>, pub shared_state: Arc<PodSharedState>,
pub runtime_dir: Arc<RuntimeDir>, pub runtime_dir: Arc<RuntimeDir>,
pub alerter: Alerter, pub alerter: Alerter,
pub in_flight: InFlightEvents,
/// Segment-log mirror + broadcast handle. The IPC server snapshots /// Segment-log mirror + broadcast handle. The IPC server snapshots
/// it on every new connection (Event::Snapshot) and forwards /// it on every new connection (Event::Snapshot) and forwards
/// subsequent commits (Event::Entry) on the receiver. /// subsequent commits (Event::Entry) on the receiver.
@ -159,6 +161,8 @@ impl PodController {
let (method_tx, method_rx) = mpsc::channel::<Method>(32); let (method_tx, method_rx) = mpsc::channel::<Method>(32);
let (event_tx, _) = broadcast::channel::<Event>(256); let (event_tx, _) = broadcast::channel::<Event>(256);
let alerter = Alerter::new(event_tx.clone()); let alerter = Alerter::new(event_tx.clone());
let in_flight = InFlightEvents::new(event_tx.clone());
pod.attach_in_flight_events(in_flight.clone());
// Runtime directory is created before tool registration because // Runtime directory is created before tool registration because
// the spawn-tool factories need its socket path, and before the // the spawn-tool factories need its socket path, and before the
@ -225,7 +229,7 @@ impl PodController {
pod.wire_history_persistence(); pod.wire_history_persistence();
// === 2. Worker event bridge wiring === // === 2. Worker event bridge wiring ===
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter); wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, &in_flight);
// === 3. Tool registration (builtin / memory / spawn-orchestration) === // === 3. Tool registration (builtin / memory / spawn-orchestration) ===
let fs_for_view = register_pod_tools( let fs_for_view = register_pod_tools(
@ -289,6 +293,7 @@ impl PodController {
shared_state: shared_state.clone(), shared_state: shared_state.clone(),
runtime_dir: runtime_dir.clone(), runtime_dir: runtime_dir.clone(),
alerter: alerter.clone(), alerter: alerter.clone(),
in_flight: in_flight.clone(),
sink: pod.sink(), sink: pod.sink(),
}; };
@ -333,6 +338,7 @@ fn wire_event_bridges_on_worker<C, St>(
pod: &mut Pod<C, St>, pod: &mut Pod<C, St>,
event_tx: &broadcast::Sender<Event>, event_tx: &broadcast::Sender<Event>,
alerter: &Alerter, alerter: &Alerter,
in_flight: &InFlightEvents,
) where ) where
C: LlmClient + Clone + 'static, C: LlmClient + Clone + 'static,
St: Store + PodMetadataStore + Clone + 'static, St: Store + PodMetadataStore + Clone + 'static,
@ -386,83 +392,66 @@ fn wire_event_bridges_on_worker<C, St>(
}); });
}); });
let tx = event_tx.clone(); let in_flight_text = in_flight.clone();
let activity = ai_activity.clone(); let activity = ai_activity.clone();
worker.on_text_block(move |block| { worker.on_text_block(move |block| {
let tx_d = tx.clone(); let block_id = in_flight_text.start_text_block();
let in_flight_d = in_flight_text.clone();
let activity_d = activity.clone(); let activity_d = activity.clone();
block.on_delta(move |text| { block.on_delta(move |text| {
activity_d.fetch_add(1, Ordering::SeqCst); activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::TextDelta { in_flight_d.text_delta(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
let tx_s = tx.clone(); let in_flight_s = in_flight_text.clone();
let activity_s = activity.clone(); let activity_s = activity.clone();
block.on_stop(move |text| { block.on_stop(move |text| {
if !text.is_empty() { if !text.is_empty() {
activity_s.fetch_add(1, Ordering::SeqCst); activity_s.fetch_add(1, Ordering::SeqCst);
} }
let _ = tx_s.send(Event::TextDone { in_flight_s.text_done(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
}); });
let tx = event_tx.clone(); let in_flight_thinking = in_flight.clone();
let activity = ai_activity.clone(); let activity = ai_activity.clone();
worker.on_thinking_block(move |block| { worker.on_thinking_block(move |block| {
// Start fires unconditionally so the TUI can show "Thinking..." // Start fires unconditionally so the TUI can show "Thinking..."
// even when the provider doesn't emit plaintext deltas. // even when the provider doesn't emit plaintext deltas.
activity.fetch_add(1, Ordering::SeqCst); activity.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(Event::ThinkingStart); let block_id = in_flight_thinking.thinking_start();
let tx_d = tx.clone(); let in_flight_d = in_flight_thinking.clone();
let activity_d = activity.clone(); let activity_d = activity.clone();
block.on_delta(move |text| { block.on_delta(move |text| {
activity_d.fetch_add(1, Ordering::SeqCst); activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::ThinkingDelta { in_flight_d.thinking_delta(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
let tx_s = tx.clone(); let in_flight_s = in_flight_thinking.clone();
let activity_s = activity.clone(); let activity_s = activity.clone();
block.on_stop(move |text| { block.on_stop(move |text| {
if !text.is_empty() { if !text.is_empty() {
activity_s.fetch_add(1, Ordering::SeqCst); activity_s.fetch_add(1, Ordering::SeqCst);
} }
let _ = tx_s.send(Event::ThinkingDone { in_flight_s.thinking_done(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
}); });
let tx = event_tx.clone(); let in_flight_tool = in_flight.clone();
let activity = ai_activity.clone(); let activity = ai_activity.clone();
worker.on_tool_use_block(move |start, block| { worker.on_tool_use_block(move |start, block| {
activity.fetch_add(1, Ordering::SeqCst); activity.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(Event::ToolCallStart { let block_id = in_flight_tool.tool_call_start(start.id.clone(), start.name.clone());
id: start.id.clone(),
name: start.name.clone(),
});
let id_for_delta = start.id.clone(); let id_for_delta = start.id.clone();
let tx_d = tx.clone(); let in_flight_d = in_flight_tool.clone();
let activity_d = activity.clone(); let activity_d = activity.clone();
block.on_delta(move |json| { block.on_delta(move |json| {
activity_d.fetch_add(1, Ordering::SeqCst); activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::ToolCallArgsDelta { in_flight_d.tool_call_args_delta(block_id, id_for_delta.clone(), json.to_owned());
id: id_for_delta.clone(),
json: json.to_owned(),
});
}); });
let tx_s = tx.clone(); let in_flight_s = in_flight_tool.clone();
let activity_s = activity.clone(); let activity_s = activity.clone();
block.on_stop(move |call| { block.on_stop(move |call| {
activity_s.fetch_add(1, Ordering::SeqCst); activity_s.fetch_add(1, Ordering::SeqCst);
let _ = tx_s.send(Event::ToolCallDone { in_flight_s.tool_call_done(block_id, call.id.clone(), call.input.to_string());
id: call.id.clone(),
name: call.name.clone(),
arguments: call.input.to_string(),
});
}); });
}); });
@ -1535,6 +1524,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.ok()?; .ok()?;

View File

@ -1463,6 +1463,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1494,6 +1495,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1579,6 +1581,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1601,6 +1604,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1700,6 +1704,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Paused, status: PodStatus::Paused,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1748,6 +1753,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await; .await;
}); });

File diff suppressed because it is too large Load Diff

477
crates/pod/src/in_flight.rs Normal file
View File

@ -0,0 +1,477 @@
use std::sync::{Arc, Mutex, MutexGuard};
use protocol::{Event, InFlightBlock, InFlightSnapshot, InFlightToolCallState};
use session_store::{LoggedContentPart, LoggedItem};
use tokio::sync::broadcast;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InFlightBlockId(u64);
#[derive(Debug, Clone)]
pub struct InFlightEvents {
inner: Arc<Mutex<InFlightInner>>,
event_tx: broadcast::Sender<Event>,
}
#[derive(Debug)]
pub(crate) struct InFlightInner {
next_block_id: u64,
blocks: Vec<TrackedBlock>,
}
#[derive(Debug, Clone)]
enum TrackedBlock {
Text {
block_id: InFlightBlockId,
text: String,
finished: bool,
},
Thinking {
block_id: InFlightBlockId,
text: String,
finished: bool,
},
ToolCall {
block_id: InFlightBlockId,
id: String,
name: String,
args: String,
state: InFlightToolCallState,
},
}
impl InFlightEvents {
pub(crate) fn new(event_tx: broadcast::Sender<Event>) -> Self {
Self {
inner: Arc::new(Mutex::new(InFlightInner {
next_block_id: 1,
blocks: Vec::new(),
})),
event_tx,
}
}
pub(crate) fn snapshot_guard(&self) -> MutexGuard<'_, InFlightInner> {
self.inner.lock().expect("in-flight event mutex poisoned")
}
pub(crate) fn start_text_block(&self) -> InFlightBlockId {
let mut inner = self.lock();
let block_id = inner.next_id();
inner.blocks.push(TrackedBlock::Text {
block_id,
text: String::new(),
finished: false,
});
block_id
}
pub(crate) fn text_delta(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Text {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
current.push_str(&text);
*finished = false;
}
let _ = self.event_tx.send(Event::TextDelta { text });
}
pub(crate) fn text_done(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Text {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
if current.is_empty() {
*current = text.clone();
}
*finished = true;
}
let _ = self.event_tx.send(Event::TextDone { text });
}
pub(crate) fn thinking_start(&self) -> InFlightBlockId {
let mut inner = self.lock();
let block_id = inner.next_id();
inner.blocks.push(TrackedBlock::Thinking {
block_id,
text: String::new(),
finished: false,
});
let _ = self.event_tx.send(Event::ThinkingStart);
block_id
}
pub(crate) fn thinking_delta(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Thinking {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
current.push_str(&text);
*finished = false;
}
let _ = self.event_tx.send(Event::ThinkingDelta { text });
}
pub(crate) fn thinking_done(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Thinking {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
if current.is_empty() {
*current = text.clone();
}
*finished = true;
}
let _ = self.event_tx.send(Event::ThinkingDone { text });
}
pub(crate) fn tool_call_start(&self, id: String, name: String) -> InFlightBlockId {
let mut inner = self.lock();
let block_id = inner.next_id();
inner.blocks.push(TrackedBlock::ToolCall {
block_id,
id: id.clone(),
name: name.clone(),
args: String::new(),
state: InFlightToolCallState::Pending,
});
let _ = self.event_tx.send(Event::ToolCallStart { id, name });
block_id
}
pub(crate) fn tool_call_args_delta(
&self,
block_id: InFlightBlockId,
id: String,
delta: String,
) {
let mut inner = self.lock();
if let Some(TrackedBlock::ToolCall { args, state, .. }) = inner.find_block_mut(block_id) {
args.push_str(&delta);
*state = InFlightToolCallState::StreamingArgs;
}
let _ = self
.event_tx
.send(Event::ToolCallArgsDelta { id, json: delta });
}
pub(crate) fn tool_call_done(&self, block_id: InFlightBlockId, id: String, args: String) {
let mut inner = self.lock();
let mut name = String::new();
if let Some(TrackedBlock::ToolCall {
name: current_name,
args: current,
state,
..
}) = inner.find_block_mut(block_id)
{
name = current_name.clone();
if current.is_empty() {
*current = args.clone();
}
*state = InFlightToolCallState::Done;
}
let _ = self.event_tx.send(Event::ToolCallDone {
id,
name,
arguments: args,
});
}
pub(crate) fn clear_for_committed_item_then<R>(
&self,
item: &LoggedItem,
f: impl FnOnce() -> R,
) -> R {
let mut inner = self.lock();
inner.clear_for_committed_item(item);
f()
}
fn lock(&self) -> MutexGuard<'_, InFlightInner> {
self.inner.lock().expect("in-flight event mutex poisoned")
}
}
impl InFlightInner {
fn next_id(&mut self) -> InFlightBlockId {
let id = InFlightBlockId(self.next_block_id);
self.next_block_id = self.next_block_id.saturating_add(1);
id
}
fn find_block_mut(&mut self, block_id: InFlightBlockId) -> Option<&mut TrackedBlock> {
self.blocks
.iter_mut()
.find(|block| block.block_id() == block_id)
}
fn clear_for_committed_item(&mut self, item: &LoggedItem) {
match item {
LoggedItem::Message { role, content }
if matches!(role, session_store::LoggedRole::Assistant) =>
{
let text = content
.iter()
.filter_map(|part| match part {
LoggedContentPart::Text { text } => Some(text.as_str()),
LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()),
})
.collect::<String>();
if !text.is_empty() {
self.remove_first_text_matching(&text);
}
}
LoggedItem::Reasoning { text, .. } => {
self.remove_first_thinking_matching(text);
}
LoggedItem::ToolCall { call_id, .. } => {
self.remove_tool_call(call_id);
}
_ => {}
}
}
fn snapshot(&self) -> InFlightSnapshot {
InFlightSnapshot {
blocks: self
.blocks
.iter()
.filter_map(TrackedBlock::to_snapshot_block)
.collect(),
}
}
fn remove_first_text_matching(&mut self, committed: &str) {
if let Some(index) = self.blocks.iter().position(|block| match block {
TrackedBlock::Text { text, .. } => text == committed,
_ => false,
}) {
self.blocks.remove(index);
}
}
fn remove_first_thinking_matching(&mut self, committed: &str) {
if let Some(index) = self.blocks.iter().position(|block| match block {
TrackedBlock::Thinking { text, .. } => text == committed,
_ => false,
}) {
self.blocks.remove(index);
}
}
fn remove_tool_call(&mut self, call_id: &str) {
if let Some(index) = self.blocks.iter().position(|block| match block {
TrackedBlock::ToolCall { id, .. } => id == call_id,
_ => false,
}) {
self.blocks.remove(index);
}
}
}
impl TrackedBlock {
fn block_id(&self) -> InFlightBlockId {
match self {
TrackedBlock::Text { block_id, .. }
| TrackedBlock::Thinking { block_id, .. }
| TrackedBlock::ToolCall { block_id, .. } => *block_id,
}
}
fn to_snapshot_block(&self) -> Option<InFlightBlock> {
match self {
TrackedBlock::Text { text, finished, .. } => {
if text.is_empty() {
None
} else {
Some(InFlightBlock::Text {
text: text.clone(),
finished: *finished,
})
}
}
TrackedBlock::Thinking { text, finished, .. } => Some(InFlightBlock::Thinking {
text: text.clone(),
finished: *finished,
}),
TrackedBlock::ToolCall {
id,
name,
args,
state,
..
} => Some(InFlightBlock::ToolCall {
id: id.clone(),
name: name.clone(),
args: args.clone(),
state: *state,
}),
}
}
}
pub(crate) fn snapshot_from_guard(guard: &MutexGuard<'_, InFlightInner>) -> InFlightSnapshot {
guard.snapshot()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn snapshot_boundary_does_not_duplicate_or_gap_delta_sent_after_subscribe() {
let (event_tx, _) = broadcast::channel(16);
let in_flight = InFlightEvents::new(event_tx.clone());
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "hel".into());
let guard = in_flight.snapshot_guard();
let mut rx = event_tx.subscribe();
let snapshot = snapshot_from_guard(&guard);
drop(guard);
in_flight.text_delta(block_id, "lo".into());
assert_eq!(
snapshot.blocks,
vec![InFlightBlock::Text {
text: "hel".into(),
finished: false,
}]
);
assert!(matches!(
rx.try_recv().unwrap(),
Event::TextDelta { text } if text == "lo"
));
assert!(rx.try_recv().is_err());
}
#[test]
fn session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap() {
use std::sync::mpsc;
use std::thread;
use crate::segment_log_sink::SegmentLogSink;
use session_store::{LogEntry, LoggedRole};
let (event_tx, _) = broadcast::channel(16);
let sink = SegmentLogSink::new();
let in_flight = InFlightEvents::new(event_tx);
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "done".into());
in_flight.text_done(block_id, "done".into());
let assistant_item = LoggedItem::Message {
role: LoggedRole::Assistant,
content: vec![LoggedContentPart::Text {
text: "done".into(),
}],
};
let assistant_entry = LogEntry::AssistantItem {
ts: 1,
item: assistant_item.clone(),
};
let in_flight_guard = in_flight.snapshot_guard();
let in_flight_for_commit = in_flight.clone();
let sink_for_commit = sink.clone();
let (committed_tx, committed_rx) = mpsc::channel();
let commit_thread = thread::spawn(move || {
// This mirrors Pod::append_entry ordering: clear in-flight first,
// then publish the finalized AssistantItem. AssistantItem entries
// are mirror-only and are not delivered as live entry events.
in_flight_for_commit.clear_for_committed_item_then(&assistant_item, || {
sink_for_commit.publish(assistant_entry);
});
committed_tx.send(()).unwrap();
});
let (entries_snapshot, mut entry_rx) = sink.subscribe_with_snapshot();
let in_flight_snapshot = snapshot_from_guard(&in_flight_guard);
drop(in_flight_guard);
committed_rx.recv().unwrap();
commit_thread.join().unwrap();
assert!(entries_snapshot.is_empty());
assert!(matches!(
in_flight_snapshot.blocks.as_slice(),
[InFlightBlock::Text { text, finished: true }] if text == "done"
));
assert!(entry_rx.try_recv().is_err());
let post_commit_guard = in_flight.snapshot_guard();
assert!(snapshot_from_guard(&post_commit_guard).is_empty());
}
#[test]
fn committed_assistant_snapshot_does_not_duplicate_in_flight_block() {
use crate::segment_log_sink::SegmentLogSink;
use session_store::{LogEntry, LoggedRole};
let (event_tx, _) = broadcast::channel(16);
let sink = SegmentLogSink::new();
let in_flight = InFlightEvents::new(event_tx);
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "done".into());
in_flight.text_done(block_id, "done".into());
let assistant_item = LoggedItem::Message {
role: LoggedRole::Assistant,
content: vec![LoggedContentPart::Text {
text: "done".into(),
}],
};
let assistant_entry = LogEntry::AssistantItem {
ts: 1,
item: assistant_item.clone(),
};
in_flight.clear_for_committed_item_then(&assistant_item, || {
sink.publish(assistant_entry);
});
let in_flight_guard = in_flight.snapshot_guard();
let (entries_snapshot, _entry_rx) = sink.subscribe_with_snapshot();
let in_flight_snapshot = snapshot_from_guard(&in_flight_guard);
assert!(matches!(
entries_snapshot.as_slice(),
[LogEntry::AssistantItem { item, .. }] if item == &assistant_item
));
assert!(in_flight_snapshot.is_empty());
}
#[test]
fn committed_item_clears_matching_in_flight_block() {
let (event_tx, _) = broadcast::channel(16);
let in_flight = InFlightEvents::new(event_tx);
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "done".into());
in_flight.clear_for_committed_item_then(
&LoggedItem::Message {
role: session_store::LoggedRole::Assistant,
content: vec![LoggedContentPart::Text {
text: "done".into(),
}],
},
|| (),
);
let guard = in_flight.snapshot_guard();
assert!(snapshot_from_guard(&guard).is_empty());
}
}

View File

@ -7,6 +7,7 @@ use tokio::net::UnixListener;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::controller::PodHandle; use crate::controller::PodHandle;
use crate::in_flight::snapshot_from_guard;
use protocol::{Event, Method}; use protocol::{Event, Method};
/// Unix socket server for Pod Protocol. /// Unix socket server for Pod Protocol.
@ -104,18 +105,22 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let mut reader = JsonLineReader::new(reader); let mut reader = JsonLineReader::new(reader);
let mut writer = JsonLineWriter::new(writer); let mut writer = JsonLineWriter::new(writer);
// Atomically subscribe to the session-log mirror first. The // Hold the in-flight stream lock while taking the session-log mirror
// returned (snapshot, rx) pair partitions the entry timeline: // snapshot. `LogEntry::AssistantItem` is mirror-only for live clients,
// entries committed before this call appear in `entries`, every // so a finalized assistant block must be observed either as an already
// entry after lands on `entry_rx`. Doing this before the alert // committed entry or as the still-present in-flight block. This lock
// snapshot keeps both ordering pairs internally consistent. // order matches `append_entry` (in-flight clear before sink publish) and
let (entries_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot(); // keeps the snapshot/live boundary gap-free.
let (entries_snapshot, mut entry_rx, alert_snapshot, mut rx, in_flight) = {
let in_flight_guard = handle.in_flight.snapshot_guard();
let (entries_snapshot, entry_rx) = handle.sink.subscribe_with_snapshot();
// Atomically subscribe and snapshot buffered alerts so that // Atomically subscribe and snapshot buffered alerts so that warnings
// warnings emitted before this client connected are replayed // emitted before this client connected are replayed exactly once.
// exactly once — they appear in the snapshot, and any alert let (alert_snapshot, rx) = handle.alerter.subscribe_with_snapshot();
// arriving afterwards reaches us through `rx`. let in_flight = snapshot_from_guard(&in_flight_guard);
let (alert_snapshot, mut rx) = handle.alerter.subscribe_with_snapshot(); (entries_snapshot, entry_rx, alert_snapshot, rx, in_flight)
};
for alert in alert_snapshot { for alert in alert_snapshot {
if writer.write(&Event::Alert(alert)).await.is_err() { if writer.write(&Event::Alert(alert)).await.is_err() {
return; return;
@ -131,6 +136,7 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
.collect(), .collect(),
greeting: handle.shared_state.greeting.clone(), greeting: handle.shared_state.greeting.clone(),
status: handle.shared_state.get_status(), status: handle.shared_state.get_status(),
in_flight,
}; };
if writer.write(&snapshot_event).await.is_err() { if writer.write(&snapshot_event).await.is_err() {
return; return;

View File

@ -6,6 +6,7 @@ pub mod entrypoint;
pub mod feature; pub mod feature;
pub mod fs_view; pub mod fs_view;
pub mod hook; pub mod hook;
pub(crate) mod in_flight;
pub mod ipc; pub mod ipc;
pub mod prompt; pub mod prompt;
pub mod runtime; pub mod runtime;

View File

@ -35,6 +35,7 @@ use crate::hook::{
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest, Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
PreToolCall, PreToolCall,
}; };
use crate::in_flight::InFlightEvents;
use crate::ipc::alerter::Alerter; use crate::ipc::alerter::Alerter;
use crate::ipc::interceptor::PodInterceptor; use crate::ipc::interceptor::PodInterceptor;
use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::notify_buffer::NotifyBuffer;
@ -167,6 +168,7 @@ pub struct LogWriterHandle<St: Clone> {
pub store: St, pub store: St,
pub state: Arc<SegmentState>, pub state: Arc<SegmentState>,
pub sink: SegmentLogSink, pub sink: SegmentLogSink,
pub in_flight: Option<InFlightEvents>,
} }
impl<St> LogWriterHandle<St> impl<St> LogWriterHandle<St>
@ -181,6 +183,15 @@ where
let loc = self.state.location(); let loc = self.state.location();
self.store.append(loc.session_id, loc.segment_id, &entry)?; self.store.append(loc.session_id, loc.segment_id, &entry)?;
self.state.increment_entries(); self.state.increment_entries();
if let Some(in_flight) = &self.in_flight {
if let LogEntry::AssistantItem { item, .. } = &entry {
let item_for_clear = item.clone();
in_flight.clear_for_committed_item_then(&item_for_clear, || {
self.sink.publish(entry);
});
return Ok(());
}
}
self.sink.publish(entry); self.sink.publish(entry);
Ok(()) Ok(())
} }
@ -296,6 +307,7 @@ pub struct Pod<C: LlmClient, St: Store> {
/// notifications, events sent here are NOT replayed to clients that /// notifications, events sent here are NOT replayed to clients that
/// connect after the fact — they are fire-and-forget broadcasts. /// connect after the fact — they are fire-and-forget broadcasts.
event_tx: Option<broadcast::Sender<Event>>, event_tx: Option<broadcast::Sender<Event>>,
in_flight: Option<InFlightEvents>,
/// Monotonic counter incremented by worker event bridges when an /// Monotonic counter incremented by worker event bridges when an
/// assistant-side execution artifact becomes visible to clients before /// assistant-side execution artifact becomes visible to clients before
/// it is necessarily committed to history (e.g. streaming text deltas). /// it is necessarily committed to history (e.g. streaming text deltas).
@ -449,6 +461,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
system_prompt_template: None, system_prompt_template: None,
alerter: self.alerter.clone(), alerter: self.alerter.clone(),
event_tx: self.event_tx.clone(), event_tx: self.event_tx.clone(),
in_flight: self.in_flight.clone(),
ai_activity_counter: self.ai_activity_counter.clone(), ai_activity_counter: self.ai_activity_counter.clone(),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -484,6 +497,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
store: self.store.clone(), store: self.store.clone(),
state: self.segment_state.clone(), state: self.segment_state.clone(),
sink: self.sink.clone(), sink: self.sink.clone(),
in_flight: self.in_flight.clone(),
} }
} }
@ -495,6 +509,10 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
self.log_writer = Some(writer); self.log_writer = Some(writer);
} }
pub fn attach_in_flight_events(&mut self, in_flight: InFlightEvents) {
self.in_flight = Some(in_flight);
}
/// Wire `Worker::on_history_append` to commit each appended item /// Wire `Worker::on_history_append` to commit each appended item
/// directly as a singular `LogEntry::AssistantItem` / `ToolResult` /// directly as a singular `LogEntry::AssistantItem` / `ToolResult`
/// through the writer. The controller calls this once per spawned /// through the writer. The controller calls this once per spawned
@ -633,6 +651,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
system_prompt_template: None, system_prompt_template: None,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -3842,6 +3861,7 @@ where
system_prompt_template: common.system_prompt_template, system_prompt_template: common.system_prompt_template,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -3951,6 +3971,7 @@ where
system_prompt_template: common.system_prompt_template, system_prompt_template: common.system_prompt_template,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -4187,6 +4208,7 @@ where
system_prompt_template: None, system_prompt_template: None,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -5379,6 +5401,7 @@ permission = "read"
ingresses: vec![], ingresses: vec![],
permissions: vec![], permissions: vec![],
request: vec![], request: vec![],
websocket: vec![],
}, },
enabled_surfaces: vec![manifest::plugin::PluginSurface::Hook], enabled_surfaces: vec![manifest::plugin::PluginSurface::Hook],
grants: manifest::plugin::PluginGrantConfig::default(), grants: manifest::plugin::PluginGrantConfig::default(),

View File

@ -515,6 +515,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
} }
} }

View File

@ -435,6 +435,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -457,6 +458,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();

View File

@ -129,6 +129,7 @@ fn empty_snapshot() -> Event {
context_tokens: 0, context_tokens: 0,
}, },
status: protocol::PodStatus::Idle, status: protocol::PodStatus::Idle,
in_flight: Default::default(),
} }
} }
@ -203,6 +204,7 @@ fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
context_tokens: 0, context_tokens: 0,
}, },
status: protocol::PodStatus::Idle, status: protocol::PodStatus::Idle,
in_flight: Default::default(),
}; };
let _ = writer.write(&event).await; let _ = writer.write(&event).await;
} }

View File

@ -91,6 +91,7 @@ fn empty_snapshot() -> Event {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
} }
} }

View File

@ -123,6 +123,7 @@ fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle<Option<M
context_tokens: 0, context_tokens: 0,
}, },
status: protocol::PodStatus::Idle, status: protocol::PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.is_err() .is_err()

View File

@ -12,6 +12,10 @@ fn is_true(value: &bool) -> bool {
*value *value
} }
fn is_false(value: &bool) -> bool {
!*value
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Method (Client → Pod via Unix Socket) // Method (Client → Pod via Unix Socket)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -453,6 +457,10 @@ pub enum Event {
greeting: Greeting, greeting: Greeting,
#[serde(default)] #[serde(default)]
status: PodStatus, status: PodStatus,
/// Unfinished model output that has already streamed in the current
/// run but is not yet represented by committed snapshot entries.
#[serde(default, skip_serializing_if = "InFlightSnapshot::is_empty")]
in_flight: InFlightSnapshot,
}, },
/// Server-side segment log rotated to a fresh `SegmentStart`. /// Server-side segment log rotated to a fresh `SegmentStart`.
/// ///
@ -631,6 +639,62 @@ pub struct RewindSummary {
pub tool_side_effect_warning: bool, pub tool_side_effect_warning: bool,
} }
/// Unfinished model output included in `Event::Snapshot` for clients that
/// attach while an LLM response is still streaming.
///
/// These blocks are presentation state only: they are reconstructed from the
/// active Pod controller and must not be treated as committed assistant
/// history. Finalized assistant items continue to come from ordinary snapshot
/// entries.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct InFlightSnapshot {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub blocks: Vec<InFlightBlock>,
}
impl InFlightSnapshot {
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum InFlightBlock {
Text {
text: String,
#[serde(default, skip_serializing_if = "is_false")]
finished: bool,
},
Thinking {
text: String,
#[serde(default, skip_serializing_if = "is_false")]
finished: bool,
},
ToolCall {
id: String,
name: String,
args: String,
#[serde(default, skip_serializing_if = "InFlightToolCallState::is_pending")]
state: InFlightToolCallState,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum InFlightToolCallState {
#[default]
Pending,
StreamingArgs,
Done,
}
impl InFlightToolCallState {
pub fn is_pending(&self) -> bool {
matches!(self, Self::Pending)
}
}
/// Pod self-description rendered by the TUI when a session starts empty. /// Pod self-description rendered by the TUI when a session starts empty.
/// ///
/// Built once in the Pod controller from the resolved manifest and /// Built once in the Pod controller from the resolved manifest and
@ -1129,6 +1193,7 @@ mod tests {
context_tokens: 42_000, context_tokens: 42_000,
}, },
status: PodStatus::Paused, status: PodStatus::Paused,
in_flight: InFlightSnapshot::default(),
}; };
let json = serde_json::to_string(&event).unwrap(); let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
@ -1142,6 +1207,62 @@ mod tests {
assert_eq!(parsed["data"]["status"], "paused"); assert_eq!(parsed["data"]["status"], "paused");
} }
#[test]
fn event_snapshot_in_flight_roundtrip_and_default() {
let inbound = r#"{"event":"snapshot","data":{"entries":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"p","model":"m","scope_summary":"s","tools":[]},"status":"running"}}"#;
let decoded: Event = serde_json::from_str(inbound).unwrap();
match decoded {
Event::Snapshot { in_flight, .. } => assert!(in_flight.is_empty()),
other => panic!("expected Snapshot, got {other:?}"),
}
let event = Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
pod_name: "test".into(),
cwd: "/tmp".into(),
provider: "p".into(),
model: "m".into(),
scope_summary: "s".into(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Running,
in_flight: InFlightSnapshot {
blocks: vec![
InFlightBlock::Text {
text: "hel".into(),
finished: false,
},
InFlightBlock::Thinking {
text: "why".into(),
finished: true,
},
InFlightBlock::ToolCall {
id: "call_1".into(),
name: "Read".into(),
args: r#"{"file"#.into(),
state: InFlightToolCallState::StreamingArgs,
},
],
},
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["data"]["in_flight"]["blocks"][0]["text"], "hel");
assert_eq!(parsed["data"]["in_flight"]["blocks"][1]["finished"], true);
assert_eq!(
parsed["data"]["in_flight"]["blocks"][2]["state"],
"streaming_args"
);
match serde_json::from_str::<Event>(&json).unwrap() {
Event::Snapshot { in_flight, .. } => assert_eq!(in_flight.blocks.len(), 3),
other => panic!("expected Snapshot, got {other:?}"),
}
}
#[test] #[test]
fn event_segment_rotated_roundtrip() { fn event_segment_rotated_roundtrip() {
let event = Event::SegmentRotated { let event = Event::SegmentRotated {

View File

@ -3,8 +3,8 @@ use std::path::Path;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use protocol::{ use protocol::{
AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, Method, PodStatus, AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, InFlightBlock,
RewindTarget, RunResult, Segment, InFlightSnapshot, InFlightToolCallState, Method, PodStatus, RewindTarget, RunResult, Segment,
}; };
use crate::block::{ use crate::block::{
@ -1279,9 +1279,10 @@ impl App {
entries, entries,
greeting, greeting,
status, status,
in_flight,
} => { } => {
self.rewind_refresh_fence = false; self.rewind_refresh_fence = false;
self.restore_snapshot(&entries, greeting); self.restore_snapshot(&entries, greeting, in_flight);
self.set_pod_status(status); self.set_pod_status(status);
} }
Event::Status { status } => { Event::Status { status } => {
@ -1410,6 +1411,50 @@ impl App {
}); });
} }
fn apply_in_flight_snapshot(&mut self, snapshot: InFlightSnapshot) {
for block in snapshot.blocks {
match block {
InFlightBlock::Text { text, finished } => {
self.blocks.push(Block::AssistantText { text });
self.assistant_streaming = !finished;
}
InFlightBlock::Thinking { text, finished } => {
let state = if finished {
ThinkingState::Finished { elapsed_secs: None }
} else {
ThinkingState::Streaming {
started_at: Instant::now(),
}
};
self.blocks
.push(Block::Thinking(ThinkingBlock { text, state }));
}
InFlightBlock::ToolCall {
id,
name,
args,
state,
} => {
let (tool_state, arguments) = match state {
InFlightToolCallState::Pending => (ToolCallState::Pending, None),
InFlightToolCallState::StreamingArgs => (ToolCallState::Streaming, None),
InFlightToolCallState::Done => {
(ToolCallState::Executing, Some(args.clone()))
}
};
self.blocks.push(Block::ToolCall(ToolCallBlock {
id,
name,
args_stream: args,
arguments,
state: tool_state,
edit_snapshot: None,
}));
}
}
}
}
fn append_assistant_text(&mut self, text: &str) { fn append_assistant_text(&mut self, text: &str) {
if self.assistant_streaming { if self.assistant_streaming {
if let Some(Block::AssistantText { text: existing }) = self.blocks.last_mut() { if let Some(Block::AssistantText { text: existing }) = self.blocks.last_mut() {
@ -1913,11 +1958,17 @@ impl App {
/// LogEntry variant into the same blocks live events would have /// LogEntry variant into the same blocks live events would have
/// produced. Followed by `Event::Entry` updates for anything /// produced. Followed by `Event::Entry` updates for anything
/// committed after the snapshot. /// committed after the snapshot.
fn restore_snapshot(&mut self, entries: &[serde_json::Value], greeting: protocol::Greeting) { fn restore_snapshot(
&mut self,
entries: &[serde_json::Value],
greeting: protocol::Greeting,
in_flight: InFlightSnapshot,
) {
self.greeting = Some(greeting.clone()); self.greeting = Some(greeting.clone());
self.context_window = greeting.context_window; self.context_window = greeting.context_window;
self.session_context_tokens = greeting.context_tokens; self.session_context_tokens = greeting.context_tokens;
self.restore_entries(entries, Some(greeting)); self.restore_entries(entries, Some(greeting));
self.apply_in_flight_snapshot(in_flight);
} }
/// Restore after a successful destructive rewind. The Pod's /// Restore after a successful destructive rewind. The Pod's
@ -3151,6 +3202,7 @@ mod completion_flow_tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: vec![session_start_value], entries: vec![session_start_value],
status: PodStatus::Running, status: PodStatus::Running,
in_flight: Default::default(),
}); });
assert!(matches!(app.pod_status, PodStatus::Running)); assert!(matches!(app.pod_status, PodStatus::Running));
@ -3161,6 +3213,54 @@ mod completion_flow_tests {
)); ));
} }
#[test]
fn snapshot_in_flight_blocks_continue_with_live_deltas() {
let mut app = App::new("test".into());
app.handle_pod_event(Event::Snapshot {
greeting: test_greeting(),
entries: Vec::new(),
status: PodStatus::Running,
in_flight: InFlightSnapshot {
blocks: vec![
InFlightBlock::Thinking {
text: "why".into(),
finished: false,
},
InFlightBlock::ToolCall {
id: "call_1".into(),
name: "Read".into(),
args: r#"{\"file"#.into(),
state: InFlightToolCallState::StreamingArgs,
},
InFlightBlock::Text {
text: "hel".into(),
finished: false,
},
],
},
});
app.handle_pod_event(Event::TextDelta { text: "lo".into() });
app.handle_pod_event(Event::ThinkingDelta { text: "?".into() });
app.handle_pod_event(Event::ToolCallArgsDelta {
id: "call_1".into(),
json: r#"\":\"src/lib.rs\"}"#.into(),
});
assert!(matches!(
app.blocks.iter().find(|block| matches!(block, Block::AssistantText { .. })),
Some(Block::AssistantText { text }) if text == "hello"
));
assert!(matches!(
app.blocks.iter().find(|block| matches!(block, Block::Thinking(_))),
Some(Block::Thinking(thinking)) if thinking.text == "why?"
));
assert!(matches!(
app.blocks.iter().find(|block| matches!(block, Block::ToolCall(_))),
Some(Block::ToolCall(call)) if call.args_stream == r#"{\"file\":\"src/lib.rs\"}"#
));
}
#[test] #[test]
fn live_system_item_workflow_appends_system_message_block() { fn live_system_item_workflow_appends_system_message_block() {
let mut app = App::new("test".into()); let mut app = App::new("test".into());
@ -3294,6 +3394,7 @@ mod completion_flow_tests {
entries: Vec::new(), entries: Vec::new(),
greeting, greeting,
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}); });
assert_eq!(app.context_window, 123_000); assert_eq!(app.context_window, 123_000);
@ -3492,6 +3593,7 @@ mod completion_flow_tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: assistant_item_entries, entries: assistant_item_entries,
status: PodStatus::Running, status: PodStatus::Running,
in_flight: Default::default(),
}); });
let tasks = app.task_store.tasks(); let tasks = app.task_store.tasks();

View File

@ -1922,6 +1922,7 @@ mod tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: vec![], entries: vec![],
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}); });
app.handle_pod_event(Event::RewindApplied { app.handle_pod_event(Event::RewindApplied {
entries: vec![], entries: vec![],
@ -1947,6 +1948,7 @@ mod tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: vec![], entries: vec![],
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}); });
type_keys(&mut app, "draft"); type_keys(&mut app, "draft");

View File

@ -868,6 +868,7 @@ async fn ticket_queue_notification_sends_notify_when_socket_available() {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -908,6 +909,7 @@ async fn send_notify_only_can_deliver_weak_notification_without_auto_run() {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();

View File

@ -819,6 +819,7 @@ mod tests {
entries: vec![], entries: vec![],
greeting: test_greeting(), greeting: test_greeting(),
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}, },
]; ];

View File

@ -293,6 +293,7 @@ fn inspect_materialized_package(
digest: Some(materialized.package.digest.clone()), digest: Some(materialized.package.digest.clone()),
permissions: requested_permissions, permissions: requested_permissions,
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,
@ -802,6 +803,11 @@ fn render_item_human(item: &PluginInspectionItem) -> Result<String> {
" configured_request_grants: {}", " configured_request_grants: {}",
join_or_none(&item.configured_request_grants) join_or_none(&item.configured_request_grants)
)?; )?;
writeln!(
out,
" configured_websocket_grants: {}",
join_or_none(&item.configured_websocket_grants)
)?;
writeln!( writeln!(
out, out,
" configured_fs_grants: {}", " configured_fs_grants: {}",
@ -977,6 +983,7 @@ fn snapshot_from_resolution(
builder.enabled_surfaces = surface_strings(enablement.surfaces.iter().copied()); builder.enabled_surfaces = surface_strings(enablement.surfaces.iter().copied());
builder.configured_grants = permission_strings(&enablement.grants.permissions); builder.configured_grants = permission_strings(&enablement.grants.permissions);
builder.configured_request_grants = request_grant_strings(&enablement.grants.request); builder.configured_request_grants = request_grant_strings(&enablement.grants.request);
builder.configured_websocket_grants = websocket_grant_strings(&enablement.grants.websocket);
builder.configured_fs_grants = fs_grant_strings(&enablement.grants.fs); builder.configured_fs_grants = fs_grant_strings(&enablement.grants.fs);
if let Ok(identity) = SourceQualifiedPluginId::parse(&enablement.id) { if let Ok(identity) = SourceQualifiedPluginId::parse(&enablement.id) {
builder builder
@ -1070,6 +1077,7 @@ fn fill_resolved(builder: &mut ItemBuilder, resolved: &ResolvedPlugin) {
builder.requested_permissions = permission_strings(&resolved.manifest.permissions); builder.requested_permissions = permission_strings(&resolved.manifest.permissions);
builder.configured_grants = permission_strings(&resolved.grants.permissions); builder.configured_grants = permission_strings(&resolved.grants.permissions);
builder.configured_request_grants = request_grant_strings(&resolved.grants.request); builder.configured_request_grants = request_grant_strings(&resolved.grants.request);
builder.configured_websocket_grants = websocket_grant_strings(&resolved.grants.websocket);
builder.configured_fs_grants = fs_grant_strings(&resolved.grants.fs); builder.configured_fs_grants = fs_grant_strings(&resolved.grants.fs);
let record = ResolvedPluginRecord::from_resolved(resolved); let record = ResolvedPluginRecord::from_resolved(resolved);
@ -1185,6 +1193,13 @@ fn request_grant_strings(grants: &[manifest::plugin::PluginRequestGrant]) -> Vec
values values
} }
fn websocket_grant_strings(grants: &[manifest::plugin::PluginWebSocketGrant]) -> Vec<String> {
let mut values: Vec<_> = grants.iter().map(|grant| grant.label()).collect();
values.sort();
values.dedup();
values
}
fn fs_grant_strings(grants: &[manifest::plugin::PluginFsGrant]) -> Vec<String> { fn fs_grant_strings(grants: &[manifest::plugin::PluginFsGrant]) -> Vec<String> {
let mut values: Vec<_> = grants.iter().map(|grant| grant.label()).collect(); let mut values: Vec<_> = grants.iter().map(|grant| grant.label()).collect();
values.sort(); values.sort();
@ -1263,6 +1278,7 @@ struct PluginInspectionItem {
requested_permissions: Vec<String>, requested_permissions: Vec<String>,
configured_grants: Vec<String>, configured_grants: Vec<String>,
configured_request_grants: Vec<String>, configured_request_grants: Vec<String>,
configured_websocket_grants: Vec<String>,
configured_fs_grants: Vec<String>, configured_fs_grants: Vec<String>,
tools: Vec<ToolSummary>, tools: Vec<ToolSummary>,
static_runtime: Option<PluginStaticInspection>, static_runtime: Option<PluginStaticInspection>,
@ -1332,6 +1348,7 @@ struct ItemBuilder {
requested_permissions: Vec<String>, requested_permissions: Vec<String>,
configured_grants: Vec<String>, configured_grants: Vec<String>,
configured_request_grants: Vec<String>, configured_request_grants: Vec<String>,
configured_websocket_grants: Vec<String>,
configured_fs_grants: Vec<String>, configured_fs_grants: Vec<String>,
tools: Vec<ToolSummary>, tools: Vec<ToolSummary>,
static_runtime: Option<PluginStaticInspection>, static_runtime: Option<PluginStaticInspection>,
@ -1359,6 +1376,7 @@ impl ItemBuilder {
requested_permissions: Vec::new(), requested_permissions: Vec::new(),
configured_grants: Vec::new(), configured_grants: Vec::new(),
configured_request_grants: Vec::new(), configured_request_grants: Vec::new(),
configured_websocket_grants: Vec::new(),
configured_fs_grants: Vec::new(), configured_fs_grants: Vec::new(),
tools: Vec::new(), tools: Vec::new(),
static_runtime: None, static_runtime: None,
@ -1431,6 +1449,7 @@ impl ItemBuilder {
requested_permissions: self.requested_permissions, requested_permissions: self.requested_permissions,
configured_grants: self.configured_grants, configured_grants: self.configured_grants,
configured_request_grants: self.configured_request_grants, configured_request_grants: self.configured_request_grants,
configured_websocket_grants: self.configured_websocket_grants,
configured_fs_grants: self.configured_fs_grants, configured_fs_grants: self.configured_fs_grants,
tools: self.tools, tools: self.tools,
static_runtime: self.static_runtime, static_runtime: self.static_runtime,
@ -1523,9 +1542,10 @@ mod tests {
static_eligible: true, static_eligible: true,
declared_surfaces: vec!["tool".to_string()], declared_surfaces: vec!["tool".to_string()],
enabled_surfaces: vec!["tool".to_string()], enabled_surfaces: vec!["tool".to_string()],
requested_permissions: vec!["host_api.request".to_string()], requested_permissions: vec!["host_api.request".to_string(), "host_api.websocket".to_string()],
configured_grants: vec!["host_api.request".to_string()], configured_grants: vec!["host_api.request".to_string(), "host_api.websocket".to_string()],
configured_request_grants: vec!["*://* GET * [broad-request]".to_string()], configured_request_grants: vec!["*://* GET * [broad-request]".to_string()],
configured_websocket_grants: vec!["*://* * [broad-websocket]".to_string()],
configured_fs_grants: Vec::new(), configured_fs_grants: Vec::new(),
tools: Vec::new(), tools: Vec::new(),
static_runtime: Some(PluginStaticInspection { static_runtime: Some(PluginStaticInspection {
@ -1556,6 +1576,27 @@ mod tests {
.to_string(), .to_string(),
), ),
}, },
PluginPermissionEligibility {
permission: "host_api.websocket target wss://gateway.example.test /gateway"
.to_string(),
requested: true,
granted: false,
eligible: false,
diagnostic: Some(
"missing enabled WebSocket grant for manifest target".to_string(),
),
},
PluginPermissionEligibility {
permission: "host_api.websocket grant-only *://* * [broad-websocket]"
.to_string(),
requested: false,
granted: true,
eligible: false,
diagnostic: Some(
"enabled WebSocket grant has no matching manifest declaration; broad/arbitrary target"
.to_string(),
),
},
], ],
tools: Vec::new(), tools: Vec::new(),
services: Vec::new(), services: Vec::new(),
@ -1569,10 +1610,18 @@ mod tests {
json["configured_request_grants"][0], json["configured_request_grants"][0],
"*://* GET * [broad-request]" "*://* GET * [broad-request]"
); );
assert_eq!(
json["configured_websocket_grants"][0],
"*://* * [broad-websocket]"
);
let human = render_item_human(&item).unwrap(); let human = render_item_human(&item).unwrap();
assert!(human.contains("configured_websocket_grants: *://* * [broad-websocket]"));
assert!(human.contains("host_api.request target https://api.example.test")); assert!(human.contains("host_api.request target https://api.example.test"));
assert!(human.contains("requested=true granted=true eligible=true")); assert!(human.contains("requested=true granted=true eligible=true"));
assert!(human.contains("host_api.request grant *://*")); assert!(human.contains("host_api.request grant *://*"));
assert!(human.contains("host_api.websocket target wss://gateway.example.test"));
assert!(human.contains("host_api.websocket grant-only *://*"));
assert!(human.contains("missing enabled WebSocket grant"));
assert!(human.contains("broad/arbitrary")); assert!(human.contains("broad/arbitrary"));
} }
@ -1596,6 +1645,7 @@ mod tests {
PluginPermission::service("svc"), PluginPermission::service("svc"),
], ],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,
@ -1650,6 +1700,7 @@ mod tests {
PluginPermission::tool("Echo"), PluginPermission::tool("Echo"),
], ],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,
@ -1668,6 +1719,7 @@ mod tests {
PluginPermission::tool("Echo"), PluginPermission::tool("Echo"),
], ],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,
@ -1786,6 +1838,7 @@ mod tests {
PluginPermission::tool("Echo"), PluginPermission::tool("Echo"),
], ],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,
@ -2321,6 +2374,7 @@ lifecycle = "host-managed"
PluginPermission::tool("Echo"), PluginPermission::tool("Echo"),
], ],
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,
@ -2352,6 +2406,7 @@ lifecycle = "host-managed"
digest: Some(digest), digest: Some(digest),
permissions, permissions,
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,
@ -2380,6 +2435,7 @@ lifecycle = "host-managed"
digest: None, digest: None,
permissions, permissions,
request: Vec::new(), request: Vec::new(),
websocket: Vec::new(),
fs: Vec::new(), fs: Vec::new(),
}, },
config: None, config: None,

View File

@ -335,6 +335,43 @@ path_prefixes = ["/v1/"]
Yoi checks method, scheme, host, optional port, and path prefix against both the manifest declaration and enablement grant before any network I/O. `http://localhost`, loopback, private, and other local targets are never ambient; they require an explicit manifest request target and an explicit matching grant. The explicit request target is the declared URL authority; a granted DNS hostname may resolve to a loopback/private address without requiring a separate literal-IP grant, so reviewers should grant hostnames only when that resolution behavior is intended. Broad targets such as `host = "*"` are supported only as visibly broad request permissions in inspection/diagnostics. Embedded credentials, credential-like headers, oversize requests/responses, WebSocket URLs/upgrades, and SSE/event-stream requests are rejected. Yoi checks method, scheme, host, optional port, and path prefix against both the manifest declaration and enablement grant before any network I/O. `http://localhost`, loopback, private, and other local targets are never ambient; they require an explicit manifest request target and an explicit matching grant. The explicit request target is the declared URL authority; a granted DNS hostname may resolve to a loopback/private address without requiring a separate literal-IP grant, so reviewers should grant hostnames only when that resolution behavior is intended. Broad targets such as `host = "*"` are supported only as visibly broad request permissions in inspection/diagnostics. Embedded credentials, credential-like headers, oversize requests/responses, WebSocket URLs/upgrades, and SSE/event-stream requests are rejected.
## `websocket` host API
The `websocket` host API is a separate grant-gated capability named `host_api.websocket`, not an extension of `host_api.request`. It opens host-owned WebSocket connections only when both the package manifest and enablement config declare matching targets. Plugin code drives the lifecycle explicitly through `open`, `send-text`, `recv`, and `close`; incoming messages are returned only from bounded `recv` calls and are not injected into model context, history, Dashboard state, or Ticket state.
Example manifest shape:
```toml
permissions = [
{ kind = "surface", surface = "tool" },
{ kind = "tool", name = "gateway_step" },
{ kind = "host_api", api = "websocket" },
]
[[websocket]]
scheme = "wss"
host = "gateway.example.com"
path_prefixes = ["/gateway"]
```
Example enablement grant shape:
```toml
[plugins.enabled.grants]
permissions = [
{ kind = "surface", surface = "tool" },
{ kind = "tool", name = "gateway_step" },
{ kind = "host_api", api = "websocket" },
]
[[plugins.enabled.grants.websocket]]
scheme = "wss"
host = "gateway.example.com"
path_prefixes = ["/gateway"]
```
Yoi checks scheme (`ws`/`wss`), host, optional port, and path prefix against both declarations before opening the connection. Loopback/private/local targets are not ambient; they require explicit matching manifest and grant entries. Broad WebSocket targets such as `host = "*"` are reported as broad WebSocket diagnostics. v1 is text-only: `send-text` requires UTF-8, binary receive fails closed, guest-supplied handshake headers and embedded URL credentials are rejected, and SecretRef-based credential/header injection is future work. The host bounds open descriptors, text/message size, receive timeout, connection count, handle lifetime, and cleanup on close/instance stop/drop.
## `fs` host API ## `fs` host API
The `fs` host API is Plugin-scoped and grant-gated. Plugins do not inherit the Pod/workspace filesystem authority automatically. The `fs` host API is Plugin-scoped and grant-gated. Plugins do not inherit the Pod/workspace filesystem authority automatically.

View File

@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
filter = sourceFilter; filter = sourceFilter;
}; };
cargoHash = "sha256-RER/UXd74C2VhPHAeF36u6ruNBg0oLnR4YeQ/zLag88="; cargoHash = "sha256-cZxkmM42kbDp1Rv9gn4sCD5WIQLc0wCbjj4GbKjuA9Q=";
depsExtraArgs = { depsExtraArgs = {
# Older fetchCargoVendor utilities used crates.io's API download endpoint, # Older fetchCargoVendor utilities used crates.io's API download endpoint,

View File

@ -7,6 +7,16 @@ interface request {
request: func(request-json: string) -> string; request: func(request-json: string) -> string;
} }
/// Grant-bound host-owned WebSocket API. Authority requires a manifest `host_api.websocket`
/// target and an enablement grant; messages are delivered only by explicit bounded recv calls.
/// v1 supports text messages only and rejects guest-supplied handshake headers.
interface websocket {
open: func(request-json: string) -> string;
send-text: func(handle: u32, text: string) -> string;
recv: func(handle: u32, timeout-ms: u32) -> string;
close: func(handle: u32) -> string;
}
/// Grant-bound filesystem host API. No ambient WASI filesystem is exposed. /// Grant-bound filesystem host API. No ambient WASI filesystem is exposed.
interface fs { interface fs {
read: func(request-json: string) -> string; read: func(request-json: string) -> string;

View File

@ -2,6 +2,7 @@ package yoi:plugin@1.0.0;
world instance { world instance {
import yoi:host/request@1.0.0; import yoi:host/request@1.0.0;
import yoi:host/websocket@1.0.0;
import yoi:host/fs@1.0.0; import yoi:host/fs@1.0.0;
export start: func(config-json: string) -> string; export start: func(config-json: string) -> string;

View File

@ -2,6 +2,7 @@ package yoi:plugin@1.0.0;
world tool { world tool {
import yoi:host/request@1.0.0; import yoi:host/request@1.0.0;
import yoi:host/websocket@1.0.0;
import yoi:host/fs@1.0.0; import yoi:host/fs@1.0.0;
/// Execute a manifest-declared Tool. `input-json` is the normal Tool input /// Execute a manifest-declared Tool. `input-json` is the normal Tool input