プロトコルStreamのユーティリティ共通化

This commit is contained in:
Keisuke Hirata 2026-04-11 15:58:52 +09:00
parent 3d2a49e1e4
commit 496038307f
9 changed files with 121 additions and 123 deletions

5
Cargo.lock generated
View File

@ -1270,6 +1270,7 @@ version = "0.1.0"
dependencies = [
"serde",
"serde_json",
"tokio",
]
[[package]]
@ -1845,9 +1846,9 @@ dependencies = [
[[package]]
name = "tokio"
version = "1.51.0"
version = "1.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd"
checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c"
dependencies = [
"bytes",
"libc",

View File

@ -9,3 +9,4 @@
- [x] pod バイナリエントリポイント
- [x] セッションエントリのハッシュチェーン
- [x] Subscriber → クロージャ API 移行
- [x] JSONL ストリーム変換ユーティリティ (protocol::stream)

View File

@ -1,7 +1,7 @@
use std::io;
use std::path::PathBuf;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use protocol::stream::{JsonLineReader, JsonLineWriter};
use tokio::net::UnixListener;
use tokio::task::JoinHandle;
@ -61,34 +61,28 @@ impl Drop for SocketServer {
}
async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
let (reader, writer) = stream.into_split();
let mut reader = JsonLineReader::new(reader);
let mut writer = JsonLineWriter::new(writer);
let mut rx = handle.subscribe();
// Event writer: broadcast events → socket
let write_task = tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if let Ok(line) = event.to_json_line() {
let mut buf = line.into_bytes();
buf.push(b'\n');
if writer.write_all(&buf).await.is_err() {
if writer.write(&event).await.is_err() {
break;
}
}
}
});
// Method reader: socket → controller
while let Ok(Some(line)) = lines.next_line().await {
if line.is_empty() {
continue;
}
match Method::from_json_line(&line) {
Ok(method) => {
loop {
match reader.next::<Method>().await {
Ok(Some(method)) => {
let _ = handle.send(method).await;
}
Ok(None) => break,
Err(e) => {
// Send parse error back as an event
let _ = handle.send_event(Event::Error {
code: protocol::ErrorCode::Internal,
message: format!("invalid method: {e}"),

View File

@ -331,7 +331,7 @@ async fn status_json_reflects_pod_name() {
#[tokio::test]
async fn socket_run_receives_events() {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use protocol::stream::{JsonLineReader, JsonLineWriter};
use tokio::net::UnixStream;
let client = MockClient::new(simple_text_events());
@ -343,12 +343,15 @@ async fn socket_run_receives_events() {
let sock_path = handle.runtime_dir.socket_path();
let stream = UnixStream::connect(&sock_path).await.unwrap();
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
let (reader, writer) = stream.into_split();
let mut reader = JsonLineReader::new(reader);
let mut writer = JsonLineWriter::new(writer);
// Send run method via socket
writer
.write_all(b"{\"method\":\"run\",\"params\":{\"input\":\"Hello\"}}\n")
.write(&Method::Run {
input: "Hello".into(),
})
.await
.unwrap();
@ -360,23 +363,18 @@ async fn socket_run_receives_events() {
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
tokio::select! {
line = lines.next_line() => {
match line {
Ok(Some(line)) => {
let parsed: serde_json::Value = serde_json::from_str(&line).unwrap();
match parsed["event"].as_str() {
Some("turn_start") => saw_turn_start = true,
Some("text_delta") => saw_text_delta = true,
Some("turn_end") => {
event = reader.next::<Event>() => {
match event {
Ok(Some(Event::TurnStart { .. })) => saw_turn_start = true,
Ok(Some(Event::TextDelta { .. })) => saw_text_delta = true,
Ok(Some(Event::TurnEnd { .. })) => {
saw_turn_end = true;
break;
}
Ok(None) | Err(_) => break,
_ => {}
}
}
_ => break,
}
}
_ = tokio::time::sleep_until(deadline) => break,
}
}
@ -388,7 +386,8 @@ async fn socket_run_receives_events() {
#[tokio::test]
async fn socket_invalid_method_returns_error() {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use protocol::stream::JsonLineReader;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
let client = MockClient::new(simple_text_events());
@ -400,7 +399,7 @@ async fn socket_invalid_method_returns_error() {
let sock_path = handle.runtime_dir.socket_path();
let stream = UnixStream::connect(&sock_path).await.unwrap();
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
let mut reader = JsonLineReader::new(reader);
// Send garbage
writer.write_all(b"{\"bad\":\"json\"}\n").await.unwrap();
@ -409,16 +408,14 @@ async fn socket_invalid_method_returns_error() {
let mut saw_error = false;
loop {
tokio::select! {
line = lines.next_line() => {
match line {
Ok(Some(line)) => {
let parsed: serde_json::Value = serde_json::from_str(&line).unwrap();
if parsed["event"] == "error" {
event = reader.next::<Event>() => {
match event {
Ok(Some(Event::Error { .. })) => {
saw_error = true;
break;
}
}
_ => break,
Ok(None) | Err(_) => break,
_ => {}
}
}
_ = tokio::time::sleep_until(deadline) => break,

View File

@ -7,3 +7,4 @@ license.workspace = true
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.51.1", features = ["io-util"] }

View File

@ -1,3 +1,5 @@
pub mod stream;
use serde::{Deserialize, Serialize};
// ---------------------------------------------------------------------------
@ -12,12 +14,6 @@ pub enum Method {
Cancel,
}
impl Method {
pub fn from_json_line(line: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(line)
}
}
// ---------------------------------------------------------------------------
// Event (Pod → Client via Unix Socket broadcast)
// ---------------------------------------------------------------------------
@ -69,12 +65,6 @@ pub enum Event {
},
}
impl Event {
pub fn to_json_line(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
}
// ---------------------------------------------------------------------------
// Supporting types
// ---------------------------------------------------------------------------
@ -112,7 +102,7 @@ mod tests {
#[test]
fn method_run_json_roundtrip() {
let json = r#"{"method":"run","params":{"input":"Hello"}}"#;
let method = Method::from_json_line(json).unwrap();
let method: Method = serde_json::from_str(json).unwrap();
assert!(matches!(method, Method::Run { ref input } if input == "Hello"));
let serialized = serde_json::to_string(&method).unwrap();
@ -122,7 +112,7 @@ mod tests {
#[test]
fn method_without_params() {
let json = r#"{"method":"resume"}"#;
let method = Method::from_json_line(json).unwrap();
let method: Method = serde_json::from_str(json).unwrap();
assert!(matches!(method, Method::Resume));
}
@ -131,7 +121,7 @@ mod tests {
let event = Event::TextDelta {
text: "Hello".into(),
};
let json = event.to_json_line().unwrap();
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "text_delta");
assert_eq!(parsed["data"]["text"], "Hello");
@ -142,7 +132,7 @@ mod tests {
let event = Event::RunEnd {
result: RunResult::LimitReached,
};
let json = event.to_json_line().unwrap();
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "run_end");
assert_eq!(parsed["data"]["result"], "limit_reached");
@ -154,7 +144,7 @@ mod tests {
code: ErrorCode::AlreadyRunning,
message: "Pod is already executing a turn".into(),
};
let json = event.to_json_line().unwrap();
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "error");
assert_eq!(parsed["data"]["code"], "already_running");

View File

@ -0,0 +1,65 @@
use std::io;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
/// JSONL line reader over an async byte stream.
///
/// Wraps the inner reader and deserialises each nonempty line as `T`.
pub struct JsonLineReader<R> {
inner: R,
}
impl<R: tokio::io::AsyncRead + Unpin> JsonLineReader<BufReader<R>> {
/// Wrap a raw reader (internally creates a [`BufReader`]).
pub fn new(reader: R) -> Self {
Self {
inner: BufReader::new(reader),
}
}
}
impl<R: AsyncBufRead + Unpin> JsonLineReader<R> {
/// Read and deserialise the next nonempty JSONL line.
///
/// Returns `Ok(None)` on EOF.
pub async fn next<T: DeserializeOwned>(&mut self) -> Result<Option<T>, io::Error> {
let mut line = String::new();
loop {
line.clear();
let n = self.inner.read_line(&mut line).await?;
if n == 0 {
return Ok(None); // EOF
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let value = serde_json::from_str(trimmed)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
return Ok(Some(value));
}
}
}
/// JSONL line writer over an async byte stream.
pub struct JsonLineWriter<W> {
inner: W,
}
impl<W: AsyncWrite + Unpin> JsonLineWriter<W> {
pub fn new(writer: W) -> Self {
Self { inner: writer }
}
/// Serialise `value` as a single JSONL line and flush.
pub async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), io::Error> {
let json = serde_json::to_string(value)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.inner.write_all(json.as_bytes()).await?;
self.inner.write_all(b"\n").await?;
self.inner.flush().await?;
Ok(())
}
}

View File

@ -1,13 +1,13 @@
use std::io;
use std::path::Path;
use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{Event, Method};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use tokio::sync::mpsc;
pub struct PodClient {
writer: tokio::io::WriteHalf<UnixStream>,
writer: JsonLineWriter<tokio::io::WriteHalf<UnixStream>>,
event_rx: mpsc::Receiver<Event>,
}
@ -15,33 +15,24 @@ impl PodClient {
pub async fn connect(path: &Path) -> Result<Self, io::Error> {
let stream = UnixStream::connect(path).await?;
let (reader, writer) = tokio::io::split(stream);
let writer = JsonLineWriter::new(writer);
let (event_tx, event_rx) = mpsc::channel::<Event>(256);
tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.is_empty() {
continue;
}
if let Ok(event) = serde_json::from_str::<Event>(&line) {
let mut reader = JsonLineReader::new(reader);
while let Ok(Some(event)) = reader.next::<Event>().await {
if event_tx.send(event).await.is_err() {
break;
}
}
}
});
Ok(Self { writer, event_rx })
}
pub async fn send(&mut self, method: &Method) -> Result<(), io::Error> {
let json = serde_json::to_string(method)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.writer.write_all(json.as_bytes()).await?;
self.writer.write_all(b"\n").await?;
self.writer.flush().await?;
Ok(())
self.writer.write(method).await
}
pub async fn next_event(&mut self) -> Option<Event> {

View File

@ -1,42 +0,0 @@
# protocol: JSONL ストリーム変換ユーティリティ
## 背景
protocol クレートは現在型定義のみ。JSONL の読み書き処理BufReader + lines + デシリアライズ / シリアライズ + `\n` + write_allが socket_server.rs と client.rs で重複している。空行スキップやエラー変換のロジックも各所にベタ書き。
## 方針
`protocol::stream` モジュールに `JsonLineReader` / `JsonLineWriter` を追加し、JSONL 変換を protocol クレートの責務に含める。
```rust
// protocol::stream
pub struct JsonLineReader<R> { /* BufReader<R> */ }
pub struct JsonLineWriter<W> { /* W */ }
impl<R: AsyncBufRead + Unpin> JsonLineReader<R> {
pub fn new(reader: R) -> Self;
pub async fn next<T: DeserializeOwned>(&mut self) -> Result<Option<T>, Error>;
}
impl<W: AsyncWrite + Unpin> JsonLineWriter<W> {
pub fn new(writer: W) -> Self;
pub async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), Error>;
}
```
## 設計ポイント
- feature gate にはしない。このプロジェクトで protocol を tokio なしで使う場面がない
- tokio の依存は `io-util` のみ(`AsyncBufRead`, `AsyncWrite` に必要な最小限)
- 空行スキップ、改行付与、serde エラーの IO エラー変換を一箇所に集約
- `JsonLineReader` は内部で `BufReader` を持つ。呼び出し側で `BufReader` を作る必要がない
## 変更対象
- `crates/protocol/Cargo.toml` — tokio (io-util) 依存を追加
- `crates/protocol/src/stream.rs` — 新規モジュール
- `crates/protocol/src/lib.rs``pub mod stream`
- `crates/pod/src/socket_server.rs``JsonLineReader` / `JsonLineWriter` に置き換え
- `crates/tui/src/client.rs` — 同上
- `crates/pod/tests/controller_test.rs` — テストのストリーム処理も置き換え