Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,16 @@ impl Body for Incoming {
ping.record_non_data();
Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
}
Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
Err(e) => {
match e.reason() {
// These reasons should cause reading the trailers to stop, but not fail it.
// The same logic as for `Read for H2Upgraded` is applied here.
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
Poll::Ready(None)
}
_ => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
}
}
}
}

Expand Down
97 changes: 89 additions & 8 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2552,7 +2552,7 @@ mod conn {
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body() {
async fn http2_responds_before_consuming_request_body_no_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::service::service_fn;
Expand Down Expand Up @@ -2596,15 +2596,96 @@ mod conn {
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let mut body = String::new();
concat(resp.into_body())
let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap();
assert_eq!(body.as_ref(), b"No bread for you!");
assert!(trailers.is_none());
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body_with_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::body::{Body, Frame, SizeHint};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();

let (listener, addr) = setup_tk_test_server().await;

/// An `HttpBody` implementation whose `is_end_stream()` will
/// return `true` after sending trailers.
pub struct TrailersBody(Option<HeaderMap>);

impl Body for TrailersBody {
type Data = bytes::Bytes;
type Error = hyper::Error;

fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if let Some(trailers) = self.0.take() {
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
} else {
Poll::Ready(None)
}
}

fn is_end_stream(&self) -> bool {
self.0.is_none()
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(0)
}
}

// Spawn an HTTP2 server that responds before reading the whole request body.
// It's normal case to decline the request due to headers or size of the body.
tokio::spawn(async move {
let sock = TokioIo::new(listener.accept().await.unwrap().0);
hyper::server::conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.serve_connection(
sock,
service_fn(|_req| async move {
let mut trailers = HeaderMap::new();
trailers.insert("grpc", HeaderValue::from_static("0"));
let body = TrailersBody(Some(trailers));
Ok::<_, hyper::Error>(http::Response::new(body))
}),
)
.await
.expect("serve_connection");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.handshake(io)
.await
.unwrap()
.reader()
.read_to_string(&mut body)
.unwrap();
.expect("http handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

// Use a channel to keep request stream open
let (_tx, recv) = mpsc::channel::<Result<Frame<Bytes>, Box<dyn Error + Send + Sync>>>(0);
let req = Request::post("/a").body(StreamBody::new(recv)).unwrap();
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap();

// No body:
assert!(body.is_empty());

assert_eq!(&body, "No bread for you!");
// Have our `grpc` trailer:
let trailers = trailers.expect("response has trailers");
assert_eq!(trailers.len(), 1);
assert_eq!(trailers.get("grpc").unwrap(), "0");
}

#[tokio::test]
Expand Down