fix: suppress pod socket peer disconnect noise

This commit is contained in:
Keisuke Hirata 2026-05-30 02:20:33 +09:00
parent a17cd47bdd
commit d5d50a3214
No known key found for this signature in database
2 changed files with 146 additions and 18 deletions

View File

@ -1,4 +1,5 @@
use std::io;
use std::io::ErrorKind;
use std::path::PathBuf;
use protocol::stream::{JsonLineReader, JsonLineWriter};
@ -57,6 +58,16 @@ impl Drop for SocketServer {
}
}
fn is_peer_disconnect_read_error(error: &io::Error) -> bool {
matches!(
error.kind(),
ErrorKind::ConnectionReset
| ErrorKind::ConnectionAborted
| ErrorKind::BrokenPipe
| ErrorKind::UnexpectedEof
)
}
async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let (reader, writer) = stream.into_split();
let mut reader = JsonLineReader::new(reader);
@ -206,14 +217,48 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let _ = handle.send(method).await;
}
Ok(None) => break,
Err(e) if is_peer_disconnect_read_error(&e) => break,
Err(e) => {
let _ = handle.send_event(Event::Error {
code: protocol::ErrorCode::Internal,
message: format!("invalid method: {e}"),
});
if writer
.write(&Event::Error {
code: protocol::ErrorCode::InvalidRequest,
message: format!("invalid method: {e}"),
})
.await
.is_err()
{
break;
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn peer_disconnect_read_errors_are_connection_close() {
for kind in [
ErrorKind::ConnectionReset,
ErrorKind::ConnectionAborted,
ErrorKind::BrokenPipe,
ErrorKind::UnexpectedEof,
] {
let error = io::Error::new(kind, "peer disconnected");
assert!(
is_peer_disconnect_read_error(&error),
"{kind:?} should be treated as a normal peer disconnect"
);
}
}
#[test]
fn invalid_data_is_not_peer_disconnect() {
let error = io::Error::new(ErrorKind::InvalidData, "malformed method");
assert!(!is_peer_disconnect_read_error(&error));
}
}

View File

@ -1100,45 +1100,128 @@ async fn socket_pod_event_turn_ended_while_idle_auto_starts_turn() {
);
}
#[tokio::test]
async fn socket_invalid_method_returns_error() {
async fn socket_error_after_method_line(
handle: &PodHandle,
line: &[u8],
) -> (pod::ErrorCode, String) {
use protocol::stream::JsonLineReader;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
let sock_path = handle.runtime_dir.socket_path();
let stream = UnixStream::connect(&sock_path).await.unwrap();
let (reader, mut writer) = stream.into_split();
let mut reader = JsonLineReader::new(reader);
writer.write_all(line).await.unwrap();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
loop {
tokio::select! {
event = reader.next::<Event>() => {
match event {
Ok(Some(Event::Error { code, message })) => return (code, message),
Ok(Some(_)) => {}
Ok(None) => panic!("socket closed before invalid-method error"),
Err(e) => panic!("socket read failed before invalid-method error: {e}"),
}
}
_ = tokio::time::sleep_until(deadline) => {
panic!("timed out waiting for invalid-method error")
}
}
}
}
#[tokio::test]
async fn socket_schema_invalid_method_returns_error() {
let client = MockClient::new(simple_text_events());
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let (code, message) = socket_error_after_method_line(&handle, b"{\"bad\":\"json\"}\n").await;
assert_eq!(code, pod::ErrorCode::InvalidRequest);
assert!(
message.contains("invalid method"),
"expected invalid-method diagnostic, got: {message}"
);
}
#[tokio::test]
async fn socket_malformed_method_returns_error() {
let client = MockClient::new(simple_text_events());
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let (code, message) = socket_error_after_method_line(&handle, b"{not-json}\n").await;
assert_eq!(code, pod::ErrorCode::InvalidRequest);
assert!(
message.contains("invalid method"),
"expected invalid-method diagnostic, got: {message}"
);
}
#[tokio::test]
async fn socket_peer_close_without_method_does_not_broadcast_error() {
use protocol::stream::JsonLineReader;
use tokio::net::UnixStream;
let client = MockClient::new(simple_text_events());
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut broadcast_rx = handle.subscribe();
let sock_path = handle.runtime_dir.socket_path();
let stream = UnixStream::connect(&sock_path).await.unwrap();
let (reader, mut writer) = stream.into_split();
let (reader, writer) = stream.into_split();
let mut reader = JsonLineReader::new(reader);
// Send garbage
writer.write_all(b"{\"bad\":\"json\"}\n").await.unwrap();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
let mut saw_error = false;
loop {
tokio::select! {
event = reader.next::<Event>() => {
match event {
Ok(Some(Event::Error { .. })) => {
saw_error = true;
break;
Ok(Some(Event::Snapshot { .. })) => break,
Ok(Some(_)) => {}
Ok(None) => panic!("socket closed before connect-time snapshot"),
Err(e) => panic!("socket read failed before connect-time snapshot: {e}"),
}
}
_ = tokio::time::sleep_until(deadline) => {
panic!("timed out waiting for connect-time snapshot")
}
}
}
drop(writer);
drop(reader);
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(200);
loop {
tokio::select! {
event = broadcast_rx.recv() => {
match event {
Ok(Event::Error { code, message }) => {
panic!("peer close without Method broadcast error {code:?}: {message}")
}
Ok(None) | Err(_) => break,
_ => {}
Ok(_) => {}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
panic!("broadcast receiver lagged while checking peer close: {n}")
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
_ = tokio::time::sleep_until(deadline) => break,
}
}
assert!(saw_error, "should see error for invalid method");
}
// ---------------------------------------------------------------------------