From 7f538f38bfcd22e877538a608b3206d8063b0e48 Mon Sep 17 00:00:00 2001 From: ArcticLampyrid Date: Tue, 9 Jan 2024 17:14:03 +0800 Subject: [PATCH] feat(deps): upgrade dependencies to http v1 & hyper v1 --- Cargo.toml | 16 +++++---- examples/actix.rs | 3 +- examples/hyper.rs | 45 +++++++++++++++---------- examples/sample-litmus-server.rs | 51 ++++++++++++++++------------ examples/warp.rs | 2 +- src/actix.rs | 55 +++++++++++++++--------------- src/body.rs | 29 ++++------------ src/davhandler.rs | 27 +++++++++------ src/handle_gethead.rs | 2 +- src/handle_put.rs | 58 ++++++++++++++++---------------- src/util.rs | 2 +- src/warp.rs | 45 +++++++++++++++++++------ 12 files changed, 184 insertions(+), 151 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cd69b05..7851e42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ path = "src/lib.rs" [features] default = [] actix-compat = [ "actix-web" ] -warp-compat = [ "warp", "hyper" ] +warp-compat = [ "warp" ] all = [ "actix-compat", "warp-compat" ] [[example]] @@ -48,10 +48,11 @@ required-features = [ "warp-compat" ] bytes = "1.5.0" futures = "0.3.29" handlebars = "4.5.0" -headers = "0.3.9" +headers = "0.4.0" htmlescape = "0.3.1" -http = "0.2.11" -http-body = "0.4.5" +http = "1.0.0" +http-body = "1.0.0" +http-body-util = "0.1.0" lazy_static = "1.4.0" libc = "0.2.150" log = "0.4.20" @@ -69,14 +70,15 @@ uuid = { version = "1.6.1", features = ["v4"] } xml-rs = "0.8.19" xmltree = "0.10.3" -hyper = {version = "0.14.27", optional = true } +hyper = {version = "1.1.0", optional = true } warp = { version = "0.3.6", optional = true } -actix-web = { version = "4.4.0", optional = true } +actix-web = { version = "4.4.1", optional = true } [dev-dependencies] clap = { version = "4.4.8", features = ["derive"] } env_logger = "0.10.1" -hyper = { version = "0.14.27", features = [ "http1", "http2", "server", "stream", "runtime" ] } +hyper = { version = "1.1.0", features = [ "http1", "http2", "server" ] } +hyper-util = { version = "0.1.2", features = [ "http1", "http2", "server", "server-auto", "tokio" ] } tokio = { version = "1.34.0", features = ["full"] } time = { version = "0.3.30", default-features = false, features = ["local-offset"] } diff --git a/examples/actix.rs b/examples/actix.rs index 7128b30..147c185 100644 --- a/examples/actix.rs +++ b/examples/actix.rs @@ -1,5 +1,6 @@ use std::io; +use actix_web::web::Data; use actix_web::{web, App, HttpServer}; use webdav_handler::actix::*; use webdav_handler::{fakels::FakeLs, localfs::LocalFs, DavConfig, DavHandler}; @@ -28,7 +29,7 @@ async fn main() -> io::Result<()> { HttpServer::new(move || { App::new() - .data(dav_server.clone()) + .app_data(Data::new(dav_server.clone())) .service(web::resource("/{tail:.*}").to(dav_handler)) }) .bind(addr)? diff --git a/examples/hyper.rs b/examples/hyper.rs index ed25997..51d22dc 100644 --- a/examples/hyper.rs +++ b/examples/hyper.rs @@ -1,31 +1,42 @@ +use hyper::service::service_fn; +use hyper_util::rt::{TokioExecutor, TokioIo}; use std::convert::Infallible; +use std::net::SocketAddr; +use tokio::net::TcpListener; use webdav_handler::{fakels::FakeLs, localfs::LocalFs, DavHandler}; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { env_logger::init(); let dir = "/tmp"; - let addr = ([127, 0, 0, 1], 4918).into(); + let addr = SocketAddr::from(([127, 0, 0, 1], 4918)); let dav_server = DavHandler::builder() .filesystem(LocalFs::new(dir, false, false, false)) .locksystem(FakeLs::new()) + .autoindex(true, None) .build_handler(); - let make_service = hyper::service::make_service_fn(move |_| { - let dav_server = dav_server.clone(); - async move { - let func = move |req| { - let dav_server = dav_server.clone(); - async move { Ok::<_, Infallible>(dav_server.handle(req).await) } - }; - Ok::<_, Infallible>(hyper::service::service_fn(func)) - } - }); - + let listener = TcpListener::bind(addr).await?; println!("hyper example: listening on {:?} serving {}", addr, dir); - let _ = hyper::Server::bind(&addr) - .serve(make_service) - .await - .map_err(|e| eprintln!("server error: {}", e)); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + let dav_server = dav_server.clone(); + tokio::task::spawn(async move { + if let Err(err) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection( + io, + service_fn(move |req| { + let dav_server = dav_server.clone(); + async move { Ok::<_, Infallible>(dav_server.handle(req).await) } + }), + ) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } } diff --git a/examples/sample-litmus-server.rs b/examples/sample-litmus-server.rs index 58b45e2..2fbf46a 100644 --- a/examples/sample-litmus-server.rs +++ b/examples/sample-litmus-server.rs @@ -12,8 +12,9 @@ use std::str::FromStr; use clap::Parser; use env_logger; -use futures::future::TryFutureExt; -use hyper; +use hyper::service::service_fn; +use hyper_util::rt::{TokioExecutor, TokioIo}; +use tokio::net::TcpListener; use headers::{authorization::Basic, Authorization, HeaderMapExt}; @@ -50,7 +51,10 @@ impl Server { } } - async fn handle(&self, req: hyper::Request) -> Result, Infallible> { + async fn handle( + &self, + req: hyper::Request, + ) -> Result, Infallible> { let user = if self.auth { // we want the client to authenticate. match req.headers().typed_get::>() { @@ -113,27 +117,30 @@ async fn main() -> Result<(), Box> { let memls = args.memfs || args.memls; let fakels = args.fakels; let auth = args.auth; - - let dav_server = Server::new(dir.to_string(), memls, fakels, auth); - let make_service = hyper::service::make_service_fn(|_| { - let dav_server = dav_server.clone(); - async move { - let func = move |req| { - let dav_server = dav_server.clone(); - async move { dav_server.clone().handle(req).await } - }; - Ok::<_, hyper::Error>(hyper::service::service_fn(func)) - } - }); - let addr = format!("0.0.0.0:{}", args.port); let addr = SocketAddr::from_str(&addr)?; + let dav_server = Server::new(dir.to_string(), memls, fakels, auth); - let server = hyper::Server::try_bind(&addr)? - .serve(make_service) - .map_err(|e| eprintln!("server error: {}", e)); - + let listener = TcpListener::bind(addr).await?; println!("Serving {} on {}", name, args.port); - let _ = server.await; - Ok(()) + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + let dav_server = dav_server.clone(); + tokio::task::spawn(async move { + if let Err(err) = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()) + .serve_connection( + io, + service_fn(move |req| { + let dav_server = dav_server.clone(); + async move { dav_server.handle(req).await } + }), + ) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } } diff --git a/examples/warp.rs b/examples/warp.rs index eb1e43e..8cc2786 100644 --- a/examples/warp.rs +++ b/examples/warp.rs @@ -8,6 +8,6 @@ async fn main() { let addr: SocketAddr = ([127, 0, 0, 1], 4918).into(); println!("warp example: listening on {:?} serving {}", addr, dir); - let warpdav = dav_dir(dir, true, true); + let warpdav = dav_dir(dir, true, true, None); warp::serve(warpdav).run(addr).await; } diff --git a/src/actix.rs b/src/actix.rs index 70393da..973f66a 100644 --- a/src/actix.rs +++ b/src/actix.rs @@ -18,6 +18,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use actix_web::error::PayloadError; +use actix_web::http::StatusCode; use actix_web::{dev, Error, FromRequest, HttpRequest, HttpResponse}; use bytes::Bytes; use futures::{future, Stream}; @@ -28,7 +29,7 @@ use pin_project::pin_project; /// Wraps `http::Request` and implements `actix_web::FromRequest`. pub struct DavRequest { pub request: http::Request, - prefix: Option, + prefix: Option, } impl DavRequest { @@ -39,17 +40,24 @@ impl DavRequest { } impl FromRequest for DavRequest { - type Config = (); type Error = Error; type Future = future::Ready>; fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future { + let http_version = match req.version() { + actix_web::http::Version::HTTP_09 => http::Version::HTTP_09, + actix_web::http::Version::HTTP_10 => http::Version::HTTP_10, + actix_web::http::Version::HTTP_11 => http::Version::HTTP_11, + actix_web::http::Version::HTTP_2 => http::Version::HTTP_2, + actix_web::http::Version::HTTP_3 => http::Version::HTTP_3, + _ => return future::ready(Err(io::Error::from(io::ErrorKind::Unsupported).into())), + }; let mut builder = http::Request::builder() - .method(req.method().to_owned()) - .uri(req.uri().to_owned()) - .version(req.version().to_owned()); + .method(req.method().as_str()) + .uri(req.uri().to_string()) + .version(http_version); for (name, value) in req.headers().iter() { - builder = builder.header(name, value); + builder = builder.header(name.as_str(), value.as_ref()); } let path = req.match_info().path(); let tail = req.match_info().unprocessed(); @@ -80,34 +88,23 @@ impl http_body::Body for DavBody { type Data = Bytes; type Error = io::Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> - { + ) -> Poll, Self::Error>>> { let this = self.project(); match this.body.poll_next(cx) { - Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(data))), - Poll::Ready(Some(Err(err))) => { - Poll::Ready(Some(Err(match err { - PayloadError::Incomplete(Some(err)) => err, - PayloadError::Incomplete(None) => io::ErrorKind::BrokenPipe.into(), - PayloadError::Io(err) => err, - other => io::Error::new(io::ErrorKind::Other, format!("{:?}", other)), - }))) - }, + Poll::Ready(Some(Ok(data))) => Poll::Ready(Some(Ok(http_body::Frame::data(data)))), + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(match err { + PayloadError::Incomplete(Some(err)) => err, + PayloadError::Incomplete(None) => io::ErrorKind::BrokenPipe.into(), + PayloadError::Io(err) => err, + other => io::Error::new(io::ErrorKind::Other, format!("{:?}", other)), + }))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll, Self::Error>> - { - Poll::Ready(Ok(None)) - } } /// `http::Response` compatibility. @@ -122,14 +119,16 @@ impl From> for DavResponse { } impl actix_web::Responder for DavResponse { + type Body = actix_web::body::BoxBody; fn respond_to(self, _req: &HttpRequest) -> HttpResponse { use crate::body::{Body, BodyType}; let (parts, body) = self.0.into_parts(); - let mut builder = HttpResponse::build(parts.status); + let status = StatusCode::from_u16(parts.status.as_u16()).unwrap(); + let mut builder = HttpResponse::build(status); for (name, value) in parts.headers.into_iter() { - builder.append_header((name.unwrap(), value)); + builder.append_header((name.unwrap().as_str(), value.as_ref())); } // I noticed that actix-web returns an empty chunked body // (\r\n0\r\n\r\n) and _no_ Transfer-Encoding header on diff --git a/src/body.rs b/src/body.rs index 785c414..82d1009 100644 --- a/src/body.rs +++ b/src/body.rs @@ -7,7 +7,6 @@ use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; use futures::stream::Stream; -use http::header::HeaderMap; use http_body::Body as HttpBody; use crate::async_stream::AsyncStream; @@ -52,16 +51,11 @@ impl HttpBody for Body { type Data = Bytes; type Error = io::Error; - fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { - self.poll_next(cx) - } - - fn poll_trailers( + fn poll_frame( self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll, Self::Error>> - { - Poll::Ready(Ok(None)) + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + self.poll_next(cx).map_ok(http_body::Frame::data) } } @@ -117,21 +111,12 @@ where type Data = ReqData; type Error = ReqError; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> - { + ) -> Poll, Self::Error>>> { let this = self.project(); - this.body.poll_next(cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll, Self::Error>> - { - Poll::Ready(Ok(None)) + this.body.poll_next(cx).map_ok(http_body::Frame::data) } } diff --git a/src/davhandler.rs b/src/davhandler.rs index a78969b..151b850 100644 --- a/src/davhandler.rs +++ b/src/davhandler.rs @@ -11,6 +11,7 @@ use futures::stream::Stream; use headers::HeaderMapExt; use http::{Request, Response, StatusCode}; use http_body::Body as HttpBody; +use http_body_util::BodyExt; use crate::body::{Body, StreamBody}; use crate::davheaders; @@ -348,18 +349,22 @@ impl DavInner { { let mut data = Vec::new(); pin_utils::pin_mut!(body); - while let Some(res) = body.data().await { - let mut buf = res.map_err(|_| { - DavError::IoError(io::Error::new(io::ErrorKind::UnexpectedEof, "UnexpectedEof")) - })?; - while buf.has_remaining() { - if data.len() + buf.remaining() > max_size { - return Err(StatusCode::PAYLOAD_TOO_LARGE.into()); + while let Some(frame) = body.frame().await { + if let Ok(mut buf) = frame + .map_err(|_| { + DavError::IoError(io::Error::new(io::ErrorKind::UnexpectedEof, "UnexpectedEof")) + })? + .into_data() + { + while buf.has_remaining() { + if data.len() + buf.remaining() > max_size { + return Err(StatusCode::PAYLOAD_TOO_LARGE.into()); + } + let b = buf.chunk(); + let l = b.len(); + data.extend_from_slice(b); + buf.advance(l); } - let b = buf.chunk(); - let l = b.len(); - data.extend_from_slice(b); - buf.advance(l); } } Ok(data) diff --git a/src/handle_gethead.rs b/src/handle_gethead.rs index 9eefa91..f914646 100644 --- a/src/handle_gethead.rs +++ b/src/handle_gethead.rs @@ -118,7 +118,7 @@ impl crate::DavInner { if let Some(r) = req.headers().typed_get::() { trace!("handle_gethead: range header {:?}", r); use std::ops::Bound::*; - for range in r.iter() { + for range in r.satisfiable_ranges(len) { let (start, mut count, valid) = match range { (Included(s), Included(e)) if e >= s => (s, e - s + 1, true), (Included(s), Unbounded) if s <= len => (s, len - s, true), diff --git a/src/handle_put.rs b/src/handle_put.rs index 9f0de59..f79f5b5 100644 --- a/src/handle_put.rs +++ b/src/handle_put.rs @@ -7,6 +7,7 @@ use headers::HeaderMapExt; use http::StatusCode as SC; use http::{self, Request, Response}; use http_body::Body as HttpBody; +use http_body_util::BodyExt as _; use crate::body::Body; use crate::conditional::if_match_get_tokens; @@ -25,28 +26,26 @@ const SABRE: &'static str = "application/x-sabredav-partialupdate"; // Also, this is senseless. It's not as if we _do_ anything with the // io::Error, other than noticing "oops an error occured". fn to_ioerror(err: E) -> io::Error -where E: StdError + Sync + Send + 'static { +where + E: StdError + Sync + Send + 'static, +{ let e = &err as &dyn Any; if e.is::() || e.is::>() { let err = Box::new(err) as Box; match err.downcast::() { Ok(e) => *e, - Err(e) => { - match e.downcast::>() { - Ok(e) => *(*e), - Err(_) => io::ErrorKind::Other.into(), - } + Err(e) => match e.downcast::>() { + Ok(e) => *(*e), + Err(_) => io::ErrorKind::Other.into(), }, } } else if e.is::() || e.is::>() { let err = Box::new(err) as Box; match err.downcast::() { Ok(e) => (*e).into(), - Err(e) => { - match e.downcast::>() { - Ok(e) => (*(*e)).into(), - Err(_) => io::ErrorKind::Other.into(), - } + Err(e) => match e.downcast::>() { + Ok(e) => (*(*e)).into(), + Err(_) => io::ErrorKind::Other.into(), }, } } else { @@ -211,24 +210,25 @@ impl crate::DavInner { // loop, read body, write to file. let mut total = 0u64; - while let Some(data) = body.data().await { - let mut buf = data.map_err(|e| to_ioerror(e))?; - let buflen = buf.remaining(); - total += buflen as u64; - // consistency check. - if have_count && total > count { - break; - } - // The `Buf` might actually be a `Bytes`. - let b = { - let b: &mut dyn std::any::Any = &mut buf; - b.downcast_mut::() - }; - if let Some(bytes) = b { - let bytes = std::mem::replace(bytes, Bytes::new()); - file.write_bytes(bytes).await?; - } else { - file.write_buf(Box::new(buf)).await?; + while let Some(frame) = body.frame().await { + if let Ok(mut buf) = frame.map_err(|e| to_ioerror(e))?.into_data() { + let buflen = buf.remaining(); + total += buflen as u64; + // consistency check. + if have_count && total > count { + break; + } + // The `Buf` might actually be a `Bytes`. + let b = { + let b: &mut dyn std::any::Any = &mut buf; + b.downcast_mut::() + }; + if let Some(bytes) = b { + let bytes = std::mem::replace(bytes, Bytes::new()); + file.write_bytes(bytes).await?; + } else { + file.write_buf(Box::new(buf)).await?; + } } } file.flush().await?; diff --git a/src/util.rs b/src/util.rs index c1c1528..8b0cc01 100644 --- a/src/util.rs +++ b/src/util.rs @@ -174,7 +174,7 @@ impl Write for MemBuffer { #[cfg(test)] mod tests { - use super::*; + use crate::time::systemtime_to_rfc3339; use std::time::UNIX_EPOCH; #[test] diff --git a/src/warp.rs b/src/warp.rs index 6051ca4..6717a32 100644 --- a/src/warp.rs +++ b/src/warp.rs @@ -8,7 +8,7 @@ use std::convert::Infallible; use std::path::Path; -use crate::{fakels::FakeLs, localfs::LocalFs, DavHandler}; +use crate::{fakels::FakeLs, localfs::LocalFs, time::UtcOffset, DavHandler}; use warp::{filters::BoxedFilter, Filter, Reply}; /// Reply-filter that runs a DavHandler. @@ -16,9 +16,8 @@ use warp::{filters::BoxedFilter, Filter, Reply}; /// Just pass in a pre-configured DavHandler. If a prefix was not /// configured, it will be the request path up to this point. pub fn dav_handler(handler: DavHandler) -> BoxedFilter<(impl Reply,)> { - use http::header::HeaderMap; - use http::uri::Uri; - use http::Response; + use warp::http::header::HeaderMap; + use warp::http::Method; use warp::path::{FullPath, Tail}; warp::method() @@ -27,16 +26,16 @@ pub fn dav_handler(handler: DavHandler) -> BoxedFilter<(impl Reply,)> { .and(warp::header::headers_cloned()) .and(warp::body::stream()) .and_then( - move |method, path_full: FullPath, path_tail: Tail, headers: HeaderMap, body| { + move |method: Method, path_full: FullPath, path_tail: Tail, headers: HeaderMap, body| { let handler = handler.clone(); async move { // rebuild an http::Request struct. let path_str = path_full.as_str(); - let uri = path_str.parse::().unwrap(); - let mut builder = http::Request::builder().method(method).uri(uri); + let uri = path_str.parse::().unwrap(); + let mut builder = http::Request::builder().method(method.as_str()).uri(uri); for (k, v) in headers.iter() { - builder = builder.header(k, v); + builder = builder.header(k.as_str(), v.as_ref()); } let request = builder.body(body).unwrap(); @@ -54,7 +53,26 @@ pub fn dav_handler(handler: DavHandler) -> BoxedFilter<(impl Reply,)> { // Need to remap the http_body::Body to a hyper::Body. let (parts, body) = response.into_parts(); - let response = Response::from_parts(parts, hyper::Body::wrap_stream(body)); + + let http_version = match parts.version { + http::Version::HTTP_09 => Some(warp::http::Version::HTTP_09), + http::Version::HTTP_10 => Some(warp::http::Version::HTTP_10), + http::Version::HTTP_11 => Some(warp::http::Version::HTTP_11), + http::Version::HTTP_2 => Some(warp::http::Version::HTTP_2), + http::Version::HTTP_3 => Some(warp::http::Version::HTTP_3), + _ => None, + }; + let response = match http_version { + Some(http_version) => warp::http::response::Response::builder() + .status(parts.status.as_u16()) + .version(http_version) + .body(warp::hyper::Body::wrap_stream(body)) + .unwrap(), + None => warp::http::response::Response::builder() + .status(warp::http::StatusCode::INTERNAL_SERVER_ERROR) + .body(warp::hyper::Body::empty()) + .unwrap(), + }; Ok::<_, Infallible>(response) } }, @@ -71,11 +89,16 @@ pub fn dav_handler(handler: DavHandler) -> BoxedFilter<(impl Reply,)> { /// - `index_html`: if an `index.html` file is found, serve it. /// - `auto_index`: create a directory listing. /// - no flags set: 404. -pub fn dav_dir(base: impl AsRef, index_html: bool, auto_index: bool) -> BoxedFilter<(impl Reply,)> { +pub fn dav_dir( + base: impl AsRef, + index_html: bool, + auto_index: bool, + utc_offset: Option, +) -> BoxedFilter<(impl Reply,)> { let mut builder = DavHandler::builder() .filesystem(LocalFs::new(base, false, false, false)) .locksystem(FakeLs::new()) - .autoindex(auto_index); + .autoindex(auto_index, utc_offset); if index_html { builder = builder.indexfile("index.html".to_string()) }