From 496038307f6847e407d3450347e42a78e681e622 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 11 Apr 2026 15:58:52 +0900 Subject: [PATCH] =?UTF-8?q?=E3=83=97=E3=83=AD=E3=83=88=E3=82=B3=E3=83=ABSt?= =?UTF-8?q?ream=E3=81=AE=E3=83=A6=E3=83=BC=E3=83=86=E3=82=A3=E3=83=AA?= =?UTF-8?q?=E3=83=86=E3=82=A3=E5=85=B1=E9=80=9A=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 5 ++- TODO.md | 1 + crates/pod/src/socket_server.rs | 26 +++++------- crates/pod/tests/controller_test.rs | 55 ++++++++++++------------ crates/protocol/Cargo.toml | 1 + crates/protocol/src/lib.rs | 24 ++++------- crates/protocol/src/stream.rs | 65 +++++++++++++++++++++++++++++ crates/tui/src/client.rs | 25 ++++------- tickets/protocol-stream-util.md | 42 ------------------- 9 files changed, 121 insertions(+), 123 deletions(-) create mode 100644 crates/protocol/src/stream.rs delete mode 100644 tickets/protocol-stream-util.md diff --git a/Cargo.lock b/Cargo.lock index 271b69a0..8bc7e633 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1270,6 +1270,7 @@ version = "0.1.0" dependencies = [ "serde", "serde_json", + "tokio", ] [[package]] @@ -1845,9 +1846,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.51.0" +version = "1.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd" +checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" dependencies = [ "bytes", "libc", diff --git a/TODO.md b/TODO.md index 90c3e6d9..5e0f1b66 100644 --- a/TODO.md +++ b/TODO.md @@ -9,3 +9,4 @@ - [x] pod バイナリエントリポイント - [x] セッションエントリのハッシュチェーン - [x] Subscriber → クロージャ API 移行 +- [x] JSONL ストリーム変換ユーティリティ (protocol::stream) diff --git a/crates/pod/src/socket_server.rs b/crates/pod/src/socket_server.rs index 87293b6a..13d79968 100644 --- a/crates/pod/src/socket_server.rs +++ b/crates/pod/src/socket_server.rs @@ -1,7 +1,7 @@ use std::io; use std::path::PathBuf; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use protocol::stream::{JsonLineReader, JsonLineWriter}; use tokio::net::UnixListener; use tokio::task::JoinHandle; @@ -61,34 +61,28 @@ impl Drop for SocketServer { } async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { - let (reader, mut writer) = stream.into_split(); - let mut lines = BufReader::new(reader).lines(); + let (reader, writer) = stream.into_split(); + let mut reader = JsonLineReader::new(reader); + let mut writer = JsonLineWriter::new(writer); let mut rx = handle.subscribe(); // Event writer: broadcast events → socket let write_task = tokio::spawn(async move { while let Ok(event) = rx.recv().await { - if let Ok(line) = event.to_json_line() { - let mut buf = line.into_bytes(); - buf.push(b'\n'); - if writer.write_all(&buf).await.is_err() { - break; - } + if writer.write(&event).await.is_err() { + break; } } }); // Method reader: socket → controller - while let Ok(Some(line)) = lines.next_line().await { - if line.is_empty() { - continue; - } - match Method::from_json_line(&line) { - Ok(method) => { + loop { + match reader.next::().await { + Ok(Some(method)) => { let _ = handle.send(method).await; } + Ok(None) => break, Err(e) => { - // Send parse error back as an event let _ = handle.send_event(Event::Error { code: protocol::ErrorCode::Internal, message: format!("invalid method: {e}"), diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 45714337..8526bbb4 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -331,7 +331,7 @@ async fn status_json_reflects_pod_name() { #[tokio::test] async fn socket_run_receives_events() { - use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + use protocol::stream::{JsonLineReader, JsonLineWriter}; use tokio::net::UnixStream; let client = MockClient::new(simple_text_events()); @@ -343,12 +343,15 @@ async fn socket_run_receives_events() { let sock_path = handle.runtime_dir.socket_path(); let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); - let mut lines = BufReader::new(reader).lines(); + let (reader, writer) = stream.into_split(); + let mut reader = JsonLineReader::new(reader); + let mut writer = JsonLineWriter::new(writer); // Send run method via socket writer - .write_all(b"{\"method\":\"run\",\"params\":{\"input\":\"Hello\"}}\n") + .write(&Method::Run { + input: "Hello".into(), + }) .await .unwrap(); @@ -360,21 +363,16 @@ async fn socket_run_receives_events() { let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); loop { tokio::select! { - line = lines.next_line() => { - match line { - Ok(Some(line)) => { - let parsed: serde_json::Value = serde_json::from_str(&line).unwrap(); - match parsed["event"].as_str() { - Some("turn_start") => saw_turn_start = true, - Some("text_delta") => saw_text_delta = true, - Some("turn_end") => { - saw_turn_end = true; - break; - } - _ => {} - } + event = reader.next::() => { + match event { + Ok(Some(Event::TurnStart { .. })) => saw_turn_start = true, + Ok(Some(Event::TextDelta { .. })) => saw_text_delta = true, + Ok(Some(Event::TurnEnd { .. })) => { + saw_turn_end = true; + break; } - _ => break, + Ok(None) | Err(_) => break, + _ => {} } } _ = tokio::time::sleep_until(deadline) => break, @@ -388,7 +386,8 @@ async fn socket_run_receives_events() { #[tokio::test] async fn socket_invalid_method_returns_error() { - use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + use protocol::stream::JsonLineReader; + use tokio::io::AsyncWriteExt; use tokio::net::UnixStream; let client = MockClient::new(simple_text_events()); @@ -400,7 +399,7 @@ async fn socket_invalid_method_returns_error() { let sock_path = handle.runtime_dir.socket_path(); let stream = UnixStream::connect(&sock_path).await.unwrap(); let (reader, mut writer) = stream.into_split(); - let mut lines = BufReader::new(reader).lines(); + let mut reader = JsonLineReader::new(reader); // Send garbage writer.write_all(b"{\"bad\":\"json\"}\n").await.unwrap(); @@ -409,16 +408,14 @@ async fn socket_invalid_method_returns_error() { let mut saw_error = false; loop { tokio::select! { - line = lines.next_line() => { - match line { - Ok(Some(line)) => { - let parsed: serde_json::Value = serde_json::from_str(&line).unwrap(); - if parsed["event"] == "error" { - saw_error = true; - break; - } + event = reader.next::() => { + match event { + Ok(Some(Event::Error { .. })) => { + saw_error = true; + break; } - _ => break, + Ok(None) | Err(_) => break, + _ => {} } } _ = tokio::time::sleep_until(deadline) => break, diff --git a/crates/protocol/Cargo.toml b/crates/protocol/Cargo.toml index ce38afeb..bb8c90f8 100644 --- a/crates/protocol/Cargo.toml +++ b/crates/protocol/Cargo.toml @@ -7,3 +7,4 @@ license.workspace = true [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +tokio = { version = "1.51.1", features = ["io-util"] } diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 3a23742d..2b2f8dd9 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -1,3 +1,5 @@ +pub mod stream; + use serde::{Deserialize, Serialize}; // --------------------------------------------------------------------------- @@ -12,12 +14,6 @@ pub enum Method { Cancel, } -impl Method { - pub fn from_json_line(line: &str) -> Result { - serde_json::from_str(line) - } -} - // --------------------------------------------------------------------------- // Event (Pod → Client via Unix Socket broadcast) // --------------------------------------------------------------------------- @@ -69,12 +65,6 @@ pub enum Event { }, } -impl Event { - pub fn to_json_line(&self) -> Result { - serde_json::to_string(self) - } -} - // --------------------------------------------------------------------------- // Supporting types // --------------------------------------------------------------------------- @@ -112,7 +102,7 @@ mod tests { #[test] fn method_run_json_roundtrip() { let json = r#"{"method":"run","params":{"input":"Hello"}}"#; - let method = Method::from_json_line(json).unwrap(); + let method: Method = serde_json::from_str(json).unwrap(); assert!(matches!(method, Method::Run { ref input } if input == "Hello")); let serialized = serde_json::to_string(&method).unwrap(); @@ -122,7 +112,7 @@ mod tests { #[test] fn method_without_params() { let json = r#"{"method":"resume"}"#; - let method = Method::from_json_line(json).unwrap(); + let method: Method = serde_json::from_str(json).unwrap(); assert!(matches!(method, Method::Resume)); } @@ -131,7 +121,7 @@ mod tests { let event = Event::TextDelta { text: "Hello".into(), }; - let json = event.to_json_line().unwrap(); + let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["event"], "text_delta"); assert_eq!(parsed["data"]["text"], "Hello"); @@ -142,7 +132,7 @@ mod tests { let event = Event::RunEnd { result: RunResult::LimitReached, }; - let json = event.to_json_line().unwrap(); + let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["event"], "run_end"); assert_eq!(parsed["data"]["result"], "limit_reached"); @@ -154,7 +144,7 @@ mod tests { code: ErrorCode::AlreadyRunning, message: "Pod is already executing a turn".into(), }; - let json = event.to_json_line().unwrap(); + let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["event"], "error"); assert_eq!(parsed["data"]["code"], "already_running"); diff --git a/crates/protocol/src/stream.rs b/crates/protocol/src/stream.rs new file mode 100644 index 00000000..1916b0a5 --- /dev/null +++ b/crates/protocol/src/stream.rs @@ -0,0 +1,65 @@ +use std::io; + +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; + +/// JSONL line reader over an async byte stream. +/// +/// Wraps the inner reader and deserialises each non‑empty line as `T`. +pub struct JsonLineReader { + inner: R, +} + +impl JsonLineReader> { + /// Wrap a raw reader (internally creates a [`BufReader`]). + pub fn new(reader: R) -> Self { + Self { + inner: BufReader::new(reader), + } + } +} + +impl JsonLineReader { + /// Read and deserialise the next non‑empty JSONL line. + /// + /// Returns `Ok(None)` on EOF. + pub async fn next(&mut self) -> Result, io::Error> { + let mut line = String::new(); + loop { + line.clear(); + let n = self.inner.read_line(&mut line).await?; + if n == 0 { + return Ok(None); // EOF + } + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + let value = serde_json::from_str(trimmed) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + return Ok(Some(value)); + } + } +} + +/// JSONL line writer over an async byte stream. +pub struct JsonLineWriter { + inner: W, +} + +impl JsonLineWriter { + pub fn new(writer: W) -> Self { + Self { inner: writer } + } + + /// Serialise `value` as a single JSONL line and flush. + pub async fn write(&mut self, value: &T) -> Result<(), io::Error> { + let json = serde_json::to_string(value) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + self.inner.write_all(json.as_bytes()).await?; + self.inner.write_all(b"\n").await?; + self.inner.flush().await?; + Ok(()) + } +} diff --git a/crates/tui/src/client.rs b/crates/tui/src/client.rs index 632234e1..04d44093 100644 --- a/crates/tui/src/client.rs +++ b/crates/tui/src/client.rs @@ -1,13 +1,13 @@ use std::io; use std::path::Path; +use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::{Event, Method}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; use tokio::sync::mpsc; pub struct PodClient { - writer: tokio::io::WriteHalf, + writer: JsonLineWriter>, event_rx: mpsc::Receiver, } @@ -15,19 +15,15 @@ impl PodClient { pub async fn connect(path: &Path) -> Result { let stream = UnixStream::connect(path).await?; let (reader, writer) = tokio::io::split(stream); + let writer = JsonLineWriter::new(writer); let (event_tx, event_rx) = mpsc::channel::(256); tokio::spawn(async move { - let mut lines = BufReader::new(reader).lines(); - while let Ok(Some(line)) = lines.next_line().await { - if line.is_empty() { - continue; - } - if let Ok(event) = serde_json::from_str::(&line) { - if event_tx.send(event).await.is_err() { - break; - } + let mut reader = JsonLineReader::new(reader); + while let Ok(Some(event)) = reader.next::().await { + if event_tx.send(event).await.is_err() { + break; } } }); @@ -36,12 +32,7 @@ impl PodClient { } pub async fn send(&mut self, method: &Method) -> Result<(), io::Error> { - let json = serde_json::to_string(method) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - self.writer.write_all(json.as_bytes()).await?; - self.writer.write_all(b"\n").await?; - self.writer.flush().await?; - Ok(()) + self.writer.write(method).await } pub async fn next_event(&mut self) -> Option { diff --git a/tickets/protocol-stream-util.md b/tickets/protocol-stream-util.md deleted file mode 100644 index d6e684be..00000000 --- a/tickets/protocol-stream-util.md +++ /dev/null @@ -1,42 +0,0 @@ -# protocol: JSONL ストリーム変換ユーティリティ - -## 背景 - -protocol クレートは現在型定義のみ。JSONL の読み書き処理(BufReader + lines + デシリアライズ / シリアライズ + `\n` + write_all)が socket_server.rs と client.rs で重複している。空行スキップやエラー変換のロジックも各所にベタ書き。 - -## 方針 - -`protocol::stream` モジュールに `JsonLineReader` / `JsonLineWriter` を追加し、JSONL 変換を protocol クレートの責務に含める。 - -```rust -// protocol::stream - -pub struct JsonLineReader { /* BufReader */ } -pub struct JsonLineWriter { /* W */ } - -impl JsonLineReader { - pub fn new(reader: R) -> Self; - pub async fn next(&mut self) -> Result, Error>; -} - -impl JsonLineWriter { - pub fn new(writer: W) -> Self; - pub async fn write(&mut self, value: &T) -> Result<(), Error>; -} -``` - -## 設計ポイント - -- feature gate にはしない。このプロジェクトで protocol を tokio なしで使う場面がない -- tokio の依存は `io-util` のみ(`AsyncBufRead`, `AsyncWrite` に必要な最小限) -- 空行スキップ、改行付与、serde エラーの IO エラー変換を一箇所に集約 -- `JsonLineReader` は内部で `BufReader` を持つ。呼び出し側で `BufReader` を作る必要がない - -## 変更対象 - -- `crates/protocol/Cargo.toml` — tokio (io-util) 依存を追加 -- `crates/protocol/src/stream.rs` — 新規モジュール -- `crates/protocol/src/lib.rs` — `pub mod stream` -- `crates/pod/src/socket_server.rs` — `JsonLineReader` / `JsonLineWriter` に置き換え -- `crates/tui/src/client.rs` — 同上 -- `crates/pod/tests/controller_test.rs` — テストのストリーム処理も置き換え