diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 930b6270..9ad57ace 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -1,4 +1,5 @@ use std::io; +use std::io::ErrorKind; use std::path::PathBuf; use protocol::stream::{JsonLineReader, JsonLineWriter}; @@ -57,6 +58,16 @@ impl Drop for SocketServer { } } +fn is_peer_disconnect_read_error(error: &io::Error) -> bool { + matches!( + error.kind(), + ErrorKind::ConnectionReset + | ErrorKind::ConnectionAborted + | ErrorKind::BrokenPipe + | ErrorKind::UnexpectedEof + ) +} + async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { let (reader, writer) = stream.into_split(); let mut reader = JsonLineReader::new(reader); @@ -206,14 +217,48 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { let _ = handle.send(method).await; } Ok(None) => break, + Err(e) if is_peer_disconnect_read_error(&e) => break, Err(e) => { - let _ = handle.send_event(Event::Error { - code: protocol::ErrorCode::Internal, - message: format!("invalid method: {e}"), - }); + if writer + .write(&Event::Error { + code: protocol::ErrorCode::InvalidRequest, + message: format!("invalid method: {e}"), + }) + .await + .is_err() + { + break; + } } } } } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn peer_disconnect_read_errors_are_connection_close() { + for kind in [ + ErrorKind::ConnectionReset, + ErrorKind::ConnectionAborted, + ErrorKind::BrokenPipe, + ErrorKind::UnexpectedEof, + ] { + let error = io::Error::new(kind, "peer disconnected"); + assert!( + is_peer_disconnect_read_error(&error), + "{kind:?} should be treated as a normal peer disconnect" + ); + } + } + + #[test] + fn invalid_data_is_not_peer_disconnect() { + let error = io::Error::new(ErrorKind::InvalidData, "malformed method"); + assert!(!is_peer_disconnect_read_error(&error)); + } +} diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 937816ff..0252cb31 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -1100,45 +1100,128 @@ async fn socket_pod_event_turn_ended_while_idle_auto_starts_turn() { ); } -#[tokio::test] -async fn socket_invalid_method_returns_error() { +async fn socket_error_after_method_line( + handle: &PodHandle, + line: &[u8], +) -> (pod::ErrorCode, String) { use protocol::stream::JsonLineReader; use tokio::io::AsyncWriteExt; use tokio::net::UnixStream; + let sock_path = handle.runtime_dir.socket_path(); + let stream = UnixStream::connect(&sock_path).await.unwrap(); + let (reader, mut writer) = stream.into_split(); + let mut reader = JsonLineReader::new(reader); + + writer.write_all(line).await.unwrap(); + + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1); + loop { + tokio::select! { + event = reader.next::() => { + match event { + Ok(Some(Event::Error { code, message })) => return (code, message), + Ok(Some(_)) => {} + Ok(None) => panic!("socket closed before invalid-method error"), + Err(e) => panic!("socket read failed before invalid-method error: {e}"), + } + } + _ = tokio::time::sleep_until(deadline) => { + panic!("timed out waiting for invalid-method error") + } + } + } +} + +#[tokio::test] +async fn socket_schema_invalid_method_returns_error() { + let client = MockClient::new(simple_text_events()); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let (code, message) = socket_error_after_method_line(&handle, b"{\"bad\":\"json\"}\n").await; + + assert_eq!(code, pod::ErrorCode::InvalidRequest); + assert!( + message.contains("invalid method"), + "expected invalid-method diagnostic, got: {message}" + ); +} + +#[tokio::test] +async fn socket_malformed_method_returns_error() { + let client = MockClient::new(simple_text_events()); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let (code, message) = socket_error_after_method_line(&handle, b"{not-json}\n").await; + + assert_eq!(code, pod::ErrorCode::InvalidRequest); + assert!( + message.contains("invalid method"), + "expected invalid-method diagnostic, got: {message}" + ); +} + +#[tokio::test] +async fn socket_peer_close_without_method_does_not_broadcast_error() { + use protocol::stream::JsonLineReader; + use tokio::net::UnixStream; + let client = MockClient::new(simple_text_events()); let pod = make_pod(client).await; let handle = spawn_controller(pod).await; tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let mut broadcast_rx = handle.subscribe(); let sock_path = handle.runtime_dir.socket_path(); let stream = UnixStream::connect(&sock_path).await.unwrap(); - let (reader, mut writer) = stream.into_split(); + let (reader, writer) = stream.into_split(); let mut reader = JsonLineReader::new(reader); - // Send garbage - writer.write_all(b"{\"bad\":\"json\"}\n").await.unwrap(); - let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1); - let mut saw_error = false; loop { tokio::select! { event = reader.next::() => { match event { - Ok(Some(Event::Error { .. })) => { - saw_error = true; - break; + Ok(Some(Event::Snapshot { .. })) => break, + Ok(Some(_)) => {} + Ok(None) => panic!("socket closed before connect-time snapshot"), + Err(e) => panic!("socket read failed before connect-time snapshot: {e}"), + } + } + _ = tokio::time::sleep_until(deadline) => { + panic!("timed out waiting for connect-time snapshot") + } + } + } + + drop(writer); + drop(reader); + + let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(200); + loop { + tokio::select! { + event = broadcast_rx.recv() => { + match event { + Ok(Event::Error { code, message }) => { + panic!("peer close without Method broadcast error {code:?}: {message}") } - Ok(None) | Err(_) => break, - _ => {} + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + panic!("broadcast receiver lagged while checking peer close: {n}") + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } _ = tokio::time::sleep_until(deadline) => break, } } - - assert!(saw_error, "should see error for invalid method"); } // ---------------------------------------------------------------------------