diff --git a/src/body/incoming.rs b/src/body/incoming.rs index 64ee5001a9..e85dceba42 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -270,7 +270,16 @@ impl Body for Incoming { ping.record_non_data(); Poll::Ready(Ok(t.map(Frame::trailers)).transpose()) } - Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), + Err(e) => { + match e.reason() { + // These reasons should cause reading the trailers to stop, but not fail it. + // The same logic as for `Read for H2Upgraded` is applied here. + Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => { + Poll::Ready(None) + } + _ => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), + } + } } } diff --git a/tests/client.rs b/tests/client.rs index d5a0e4e005..b14c1008a0 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -2552,7 +2552,7 @@ mod conn { } #[tokio::test] - async fn http2_responds_before_consuming_request_body() { + async fn http2_responds_before_consuming_request_body_no_trailers() { // Test that a early-response from server works correctly (request body wasn't fully consumed). // https://github.com/hyperium/hyper/issues/2872 use hyper::service::service_fn; @@ -2596,15 +2596,96 @@ mod conn { let resp = client.send_request(req).await.expect("send_request"); assert!(resp.status().is_success()); - let mut body = String::new(); - concat(resp.into_body()) + let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap(); + assert_eq!(body.as_ref(), b"No bread for you!"); + assert!(trailers.is_none()); + } + + #[tokio::test] + async fn http2_responds_before_consuming_request_body_with_trailers() { + // Test that a early-response from server works correctly (request body wasn't fully consumed). + // https://github.com/hyperium/hyper/issues/2872 + use hyper::body::{Body, Frame, SizeHint}; + use hyper::header::{HeaderMap, HeaderValue}; + use hyper::service::service_fn; + + let _ = pretty_env_logger::try_init(); + + let (listener, addr) = setup_tk_test_server().await; + + /// An `HttpBody` implementation whose `is_end_stream()` will + /// return `true` after sending trailers. + pub struct TrailersBody(Option); + + impl Body for TrailersBody { + type Data = bytes::Bytes; + type Error = hyper::Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + if let Some(trailers) = self.0.take() { + Poll::Ready(Some(Ok(Frame::trailers(trailers)))) + } else { + Poll::Ready(None) + } + } + + fn is_end_stream(&self) -> bool { + self.0.is_none() + } + + fn size_hint(&self) -> SizeHint { + SizeHint::with_exact(0) + } + } + + // Spawn an HTTP2 server that responds before reading the whole request body. + // It's normal case to decline the request due to headers or size of the body. + tokio::spawn(async move { + let sock = TokioIo::new(listener.accept().await.unwrap().0); + hyper::server::conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .serve_connection( + sock, + service_fn(|_req| async move { + let mut trailers = HeaderMap::new(); + trailers.insert("grpc", HeaderValue::from_static("0")); + let body = TrailersBody(Some(trailers)); + Ok::<_, hyper::Error>(http::Response::new(body)) + }), + ) + .await + .expect("serve_connection"); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .handshake(io) .await - .unwrap() - .reader() - .read_to_string(&mut body) - .unwrap(); + .expect("http handshake"); + + tokio::spawn(async move { + conn.await.expect("client conn shouldn't error"); + }); + + // Use a channel to keep request stream open + let (_tx, recv) = mpsc::channel::, Box>>(0); + let req = Request::post("/a").body(StreamBody::new(recv)).unwrap(); + let resp = client.send_request(req).await.expect("send_request"); + assert!(resp.status().is_success()); + + let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap(); + + // No body: + assert!(body.is_empty()); - assert_eq!(&body, "No bread for you!"); + // Have our `grpc` trailer: + let trailers = trailers.expect("response has trailers"); + assert_eq!(trailers.len(), 1); + assert_eq!(trailers.get("grpc").unwrap(), "0"); } #[tokio::test]