merge: pod socket disconnect noise fix
This commit is contained in:
commit
ccb8f96118
|
|
@ -1,4 +1,5 @@
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::io::ErrorKind;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
||||||
|
|
@ -57,6 +58,16 @@ impl Drop for SocketServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_peer_disconnect_read_error(error: &io::Error) -> bool {
|
||||||
|
matches!(
|
||||||
|
error.kind(),
|
||||||
|
ErrorKind::ConnectionReset
|
||||||
|
| ErrorKind::ConnectionAborted
|
||||||
|
| ErrorKind::BrokenPipe
|
||||||
|
| ErrorKind::UnexpectedEof
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
||||||
let (reader, writer) = stream.into_split();
|
let (reader, writer) = stream.into_split();
|
||||||
let mut reader = JsonLineReader::new(reader);
|
let mut reader = JsonLineReader::new(reader);
|
||||||
|
|
@ -206,14 +217,48 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
||||||
let _ = handle.send(method).await;
|
let _ = handle.send(method).await;
|
||||||
}
|
}
|
||||||
Ok(None) => break,
|
Ok(None) => break,
|
||||||
|
Err(e) if is_peer_disconnect_read_error(&e) => break,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let _ = handle.send_event(Event::Error {
|
if writer
|
||||||
code: protocol::ErrorCode::Internal,
|
.write(&Event::Error {
|
||||||
message: format!("invalid method: {e}"),
|
code: protocol::ErrorCode::InvalidRequest,
|
||||||
});
|
message: format!("invalid method: {e}"),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn peer_disconnect_read_errors_are_connection_close() {
|
||||||
|
for kind in [
|
||||||
|
ErrorKind::ConnectionReset,
|
||||||
|
ErrorKind::ConnectionAborted,
|
||||||
|
ErrorKind::BrokenPipe,
|
||||||
|
ErrorKind::UnexpectedEof,
|
||||||
|
] {
|
||||||
|
let error = io::Error::new(kind, "peer disconnected");
|
||||||
|
assert!(
|
||||||
|
is_peer_disconnect_read_error(&error),
|
||||||
|
"{kind:?} should be treated as a normal peer disconnect"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn invalid_data_is_not_peer_disconnect() {
|
||||||
|
let error = io::Error::new(ErrorKind::InvalidData, "malformed method");
|
||||||
|
assert!(!is_peer_disconnect_read_error(&error));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1100,45 +1100,128 @@ async fn socket_pod_event_turn_ended_while_idle_auto_starts_turn() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
async fn socket_error_after_method_line(
|
||||||
async fn socket_invalid_method_returns_error() {
|
handle: &PodHandle,
|
||||||
|
line: &[u8],
|
||||||
|
) -> (pod::ErrorCode, String) {
|
||||||
use protocol::stream::JsonLineReader;
|
use protocol::stream::JsonLineReader;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::net::UnixStream;
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
|
let sock_path = handle.runtime_dir.socket_path();
|
||||||
|
let stream = UnixStream::connect(&sock_path).await.unwrap();
|
||||||
|
let (reader, mut writer) = stream.into_split();
|
||||||
|
let mut reader = JsonLineReader::new(reader);
|
||||||
|
|
||||||
|
writer.write_all(line).await.unwrap();
|
||||||
|
|
||||||
|
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
event = reader.next::<Event>() => {
|
||||||
|
match event {
|
||||||
|
Ok(Some(Event::Error { code, message })) => return (code, message),
|
||||||
|
Ok(Some(_)) => {}
|
||||||
|
Ok(None) => panic!("socket closed before invalid-method error"),
|
||||||
|
Err(e) => panic!("socket read failed before invalid-method error: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = tokio::time::sleep_until(deadline) => {
|
||||||
|
panic!("timed out waiting for invalid-method error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn socket_schema_invalid_method_returns_error() {
|
||||||
|
let client = MockClient::new(simple_text_events());
|
||||||
|
let pod = make_pod(client).await;
|
||||||
|
let handle = spawn_controller(pod).await;
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
let (code, message) = socket_error_after_method_line(&handle, b"{\"bad\":\"json\"}\n").await;
|
||||||
|
|
||||||
|
assert_eq!(code, pod::ErrorCode::InvalidRequest);
|
||||||
|
assert!(
|
||||||
|
message.contains("invalid method"),
|
||||||
|
"expected invalid-method diagnostic, got: {message}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn socket_malformed_method_returns_error() {
|
||||||
|
let client = MockClient::new(simple_text_events());
|
||||||
|
let pod = make_pod(client).await;
|
||||||
|
let handle = spawn_controller(pod).await;
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
let (code, message) = socket_error_after_method_line(&handle, b"{not-json}\n").await;
|
||||||
|
|
||||||
|
assert_eq!(code, pod::ErrorCode::InvalidRequest);
|
||||||
|
assert!(
|
||||||
|
message.contains("invalid method"),
|
||||||
|
"expected invalid-method diagnostic, got: {message}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn socket_peer_close_without_method_does_not_broadcast_error() {
|
||||||
|
use protocol::stream::JsonLineReader;
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
|
||||||
let client = MockClient::new(simple_text_events());
|
let client = MockClient::new(simple_text_events());
|
||||||
let pod = make_pod(client).await;
|
let pod = make_pod(client).await;
|
||||||
let handle = spawn_controller(pod).await;
|
let handle = spawn_controller(pod).await;
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
let mut broadcast_rx = handle.subscribe();
|
||||||
let sock_path = handle.runtime_dir.socket_path();
|
let sock_path = handle.runtime_dir.socket_path();
|
||||||
let stream = UnixStream::connect(&sock_path).await.unwrap();
|
let stream = UnixStream::connect(&sock_path).await.unwrap();
|
||||||
let (reader, mut writer) = stream.into_split();
|
let (reader, writer) = stream.into_split();
|
||||||
let mut reader = JsonLineReader::new(reader);
|
let mut reader = JsonLineReader::new(reader);
|
||||||
|
|
||||||
// Send garbage
|
|
||||||
writer.write_all(b"{\"bad\":\"json\"}\n").await.unwrap();
|
|
||||||
|
|
||||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
|
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1);
|
||||||
let mut saw_error = false;
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
event = reader.next::<Event>() => {
|
event = reader.next::<Event>() => {
|
||||||
match event {
|
match event {
|
||||||
Ok(Some(Event::Error { .. })) => {
|
Ok(Some(Event::Snapshot { .. })) => break,
|
||||||
saw_error = true;
|
Ok(Some(_)) => {}
|
||||||
break;
|
Ok(None) => panic!("socket closed before connect-time snapshot"),
|
||||||
|
Err(e) => panic!("socket read failed before connect-time snapshot: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = tokio::time::sleep_until(deadline) => {
|
||||||
|
panic!("timed out waiting for connect-time snapshot")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(writer);
|
||||||
|
drop(reader);
|
||||||
|
|
||||||
|
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(200);
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
event = broadcast_rx.recv() => {
|
||||||
|
match event {
|
||||||
|
Ok(Event::Error { code, message }) => {
|
||||||
|
panic!("peer close without Method broadcast error {code:?}: {message}")
|
||||||
}
|
}
|
||||||
Ok(None) | Err(_) => break,
|
Ok(_) => {}
|
||||||
_ => {}
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
panic!("broadcast receiver lagged while checking peer close: {n}")
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = tokio::time::sleep_until(deadline) => break,
|
_ = tokio::time::sleep_until(deadline) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(saw_error, "should see error for invalid method");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user