yoi/crates/pod/src/ipc/server.rs

198 lines
8.1 KiB
Rust

use std::io;
use std::path::PathBuf;
use protocol::stream::{JsonLineReader, JsonLineWriter};
use tokio::net::UnixListener;
use tokio::task::JoinHandle;
use crate::controller::PodHandle;
use protocol::{Event, Method};
/// Unix socket server for Pod Protocol.
///
/// Listens on the Pod's runtime directory socket path.
/// Each client connection gets bidirectional JSONL:
/// - Client writes Method lines → answered directly (`ListCompletions`,
///   `GetHistory`) or forwarded through the `PodHandle`
/// - Pod events → written as Event lines to all connected clients
///
/// Dropping the server removes the socket file from disk.
pub struct SocketServer {
    /// Handle of the spawned task that runs the accept loop.
    _accept_task: JoinHandle<()>,
    /// Filesystem path the listener was bound to; removed again on drop.
    path: PathBuf,
}
impl SocketServer {
    /// Bind the Pod's socket and spawn the background accept loop.
    ///
    /// Any file already present at the socket path is removed first; the
    /// outcome of that removal is ignored. Each accepted connection is served
    /// by its own spawned task. The accept loop ends on the first `accept`
    /// error.
    ///
    /// # Errors
    ///
    /// Returns the error from `UnixListener::bind` when the socket cannot be
    /// bound.
    pub async fn start(handle: &PodHandle) -> Result<Self, io::Error> {
        let path = handle.runtime_dir.socket_path();

        // Best-effort cleanup of a socket file left over at this path.
        let _ = tokio::fs::remove_file(&path).await;
        let listener = UnixListener::bind(&path)?;

        let pod = handle.clone();
        let _accept_task = tokio::spawn(async move {
            // Every connection gets its own task and its own handle clone.
            while let Ok((stream, _peer)) = listener.accept().await {
                tokio::spawn(handle_connection(stream, pod.clone()));
            }
        });

        Ok(Self { _accept_task, path })
    }

    /// Path of the socket file this server is bound to.
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}
impl Drop for SocketServer {
    /// Stop accepting connections and remove the socket file.
    ///
    /// Dropping a tokio `JoinHandle` only detaches its task, so the accept
    /// loop is aborted explicitly. Otherwise it would keep running and hold
    /// the listener and its `PodHandle` clone after the server is gone.
    /// Connection tasks that were already spawned are not affected.
    fn drop(&mut self) {
        self._accept_task.abort();
        // Best-effort: the file may already be gone.
        let _ = std::fs::remove_file(&self.path);
    }
}
/// Serve a single client connection until it ends.
///
/// Buffered alerts are replayed to the client first. After that the task
/// multiplexes two sources: events received from the alerter subscription are
/// written to the client, and `Method` lines read from the client are either
/// answered directly (`ListCompletions`, `GetHistory`) or passed on through
/// `handle.send`.
///
/// The function returns when a write to the client fails, when the
/// subscription yields an error, or when the reader yields `Ok(None)`. A read
/// error does not end the connection: it is reported through
/// `handle.send_event` and the loop continues.
async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
    let (read_half, write_half) = stream.into_split();
    let mut reader = JsonLineReader::new(read_half);
    let mut writer = JsonLineWriter::new(write_half);

    // Subscription and snapshot are taken in one call so that alerts raised
    // before this client connected are replayed exactly once: older alerts
    // are in the snapshot, later ones arrive through `rx`.
    let (backlog, mut rx) = handle.alerter.subscribe_with_snapshot();
    for alert in backlog {
        if writer.write(&Event::Alert(alert)).await.is_err() {
            return;
        }
    }

    loop {
        tokio::select! {
            // Broadcast events → this client
            received = rx.recv() => {
                let event = match received {
                    Ok(event) => event,
                    Err(_) => break,
                };
                if writer.write(&event).await.is_err() {
                    break;
                }
            }
            // Client methods → handle or forward to controller
            incoming = reader.next::<Method>() => {
                match incoming {
                    Ok(None) => break,
                    Err(e) => {
                        let report = Event::Error {
                            code: protocol::ErrorCode::Internal,
                            message: format!("invalid method: {e}"),
                        };
                        let _ = handle.send_event(report);
                    }
                    Ok(Some(Method::ListCompletions { kind, prefix })) => {
                        let entries = match kind {
                            protocol::CompletionKind::File => {
                                // No filesystem view → empty completion list.
                                let found = handle
                                    .shared_state
                                    .fs_view()
                                    .map(|view| view.list_file_completions(&prefix))
                                    .unwrap_or_default();
                                found
                                    .into_iter()
                                    .map(|c| protocol::CompletionEntry {
                                        value: c.path,
                                        is_dir: c.is_dir,
                                    })
                                    .collect()
                            }
                            protocol::CompletionKind::Knowledge => {
                                let found = handle.shared_state.list_knowledge_completions(&prefix);
                                found
                                    .into_iter()
                                    .map(|c| protocol::CompletionEntry {
                                        value: c.slug,
                                        is_dir: false,
                                    })
                                    .collect()
                            }
                            protocol::CompletionKind::Workflow => {
                                let found = handle.shared_state.list_workflow_completions(&prefix);
                                found
                                    .into_iter()
                                    .map(|c| protocol::CompletionEntry {
                                        value: c.slug,
                                        is_dir: false,
                                    })
                                    .collect()
                            }
                        };
                        let reply = Event::Completions { kind, entries };
                        if writer.write(&reply).await.is_err() {
                            break;
                        }
                    }
                    Ok(Some(Method::GetHistory)) => {
                        let items = handle.shared_state.history();
                        let segments_per_user = handle.shared_state.user_segments();

                        // `segments` is embedded into user-message JSON so the
                        // TUI can re-render typed atoms on restore.
                        //
                        // Alignment: segments exist only for submissions made
                        // during the live session, never for seed history
                        // loaded via `SessionStart.history` (post-compaction).
                        // Seed user messages always come first in worker
                        // history, so the trailing `segments_per_user.len()`
                        // user messages map 1:1 onto the segments list.
                        let user_msg_total =
                            items.iter().filter(|i| i.is_user_message()).count();
                        let first_live = user_msg_total.saturating_sub(segments_per_user.len());
                        let mut user_seen = 0usize;

                        let values = items
                            .iter()
                            .map(|item| {
                                let mut value =
                                    serde_json::to_value(item).expect("Item is Serialize");
                                if !item.is_user_message() {
                                    return value;
                                }

                                // Ordinal of this user message among all user messages.
                                let position = user_seen;
                                user_seen += 1;

                                // `None` while still inside the seed prefix.
                                if let Some(seg_idx) = position.checked_sub(first_live) {
                                    if let Some(obj) = value.as_object_mut() {
                                        let segs =
                                            serde_json::to_value(&segments_per_user[seg_idx])
                                                .expect("Segment is Serialize");
                                        obj.insert("segments".into(), segs);
                                    }
                                }
                                value
                            })
                            .collect();

                        let reply = Event::History {
                            items: values,
                            greeting: handle.shared_state.greeting.clone(),
                            status: handle.shared_state.get_status(),
                        };
                        if writer.write(&reply).await.is_err() {
                            break;
                        }
                    }
                    Ok(Some(other)) => {
                        let _ = handle.send(other).await;
                    }
                }
            }
        }
    }
}