From 25c5da1a7cf63342ae160ca1af38c8fb7be46eb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20Gali=C4=87?= Date: Wed, 29 Jan 2020 09:53:25 +0100 Subject: [PATCH] add tokio (0.2) as dependency to further async-ify our FromData code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit i'm using this opportunity to also update reqwest (0.10), but it's turning out to be a little trickier, as it requires more modern async setup, and that appears to need a lot of thinking… --- Cargo.lock | 181 +++++++++++++++---------- Cargo.toml | 3 +- plume-common/Cargo.toml | 7 +- plume-common/src/activity_pub/inbox.rs | 2 +- plume-common/src/activity_pub/mod.rs | 21 +-- plume-models/Cargo.toml | 8 +- po/plume/ar.po | 3 +- src/api/mod.rs | 8 +- src/inbox.rs | 42 +++--- 9 files changed, 161 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 238d8215..bd20617b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1439,9 +1439,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" +checksum = "79b7246d7e4b979c03fa093da39cfb3617a96bbeee6310af63991668d7e843ff" dependencies = [ "bytes 0.5.4", "fnv", @@ -1619,12 +1619,13 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.2.4", + "h2 0.2.5", "http 0.2.1", "http-body 0.3.1", "httparse", "itoa", "log 0.4.8", + "net2", "pin-project", "time", "tokio 0.2.20", @@ -1645,6 +1646,19 @@ dependencies = [ "tokio-io", ] +[[package]] +name = "hyper-tls" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3adcd308402b9553630734e9c36b77a7e48b3821251ca2493e8cd596763aafaa" +dependencies = [ + "bytes 0.5.4", + "hyper 0.13.5", + "native-tls", + "tokio 0.2.20", + "tokio-tls", +] + [[package]] name = "idna" version = "0.1.5" @@ -2596,8 +2610,9 @@ dependencies = [ "serde", "serde_json", "serde_qs", - "shrinkwraprs", + "shrinkwraprs 0.2.3", "syntect", + "tokio 0.2.20", "validator", "validator_derive", "webfinger", @@ -2632,20 +2647,21 @@ dependencies = [ "array_tool", "base64 0.10.1", "chrono", + "futures-util", "heck", "hex", - "hyper 0.12.35", + "hyper 0.13.5", "openssl", "pulldown-cmark", "regex-syntax 0.6.17", - "reqwest", + "reqwest 0.10.4", "rocket", "serde", "serde_derive", "serde_json", - "shrinkwraprs", + "shrinkwraprs 0.2.3", "syntect", - "tokio 0.1.22", + "tokio 0.2.20", ] [[package]] @@ -2693,14 +2709,14 @@ dependencies = [ "plume-api", "plume-common", "plume-macro", - "reqwest", + "reqwest 0.10.4", "rocket", "rocket_i18n", "scheduled-thread-pool", "serde", "serde_derive", "serde_json", - "shrinkwraprs", + "shrinkwraprs 0.3.0", "tantivy", "url 2.1.1", "walkdir", @@ -3121,14 +3137,14 @@ dependencies = [ "futures 0.1.29", "http 0.1.21", "hyper 0.12.35", - "hyper-tls", + "hyper-tls 0.3.2", "log 0.4.8", "mime 0.3.16", "mime_guess 2.0.3", "native-tls", "serde", "serde_json", - "serde_urlencoded", + "serde_urlencoded 0.5.5", "time", "tokio 0.1.22", "tokio-executor", @@ -3140,6 +3156,41 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b81e49ddec5109a9dcfc5f2a317ff53377c915e9ae9d4f2fb50914b85614e2" +dependencies = [ + "base64 0.11.0", + "bytes 0.5.4", + "encoding_rs", + "futures-core", + "futures-util", + "http 0.2.1", + "http-body 0.3.1", + "hyper 0.13.5", + "hyper-tls 0.4.1", + "js-sys", + "lazy_static", + "log 0.4.8", + "mime 0.3.16", + "mime_guess 2.0.3", + "native-tls", + "percent-encoding 2.1.0", + "pin-project-lite", + "serde", + "serde_urlencoded 0.6.1", + "time", + "tokio 0.2.20", + "tokio-tls", + "url 2.1.1", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.13" @@ -3467,6 +3518,18 @@ dependencies = [ "url 1.7.2", ] +[[package]] +name = "serde_urlencoded" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" +dependencies = [ + "dtoa", + "itoa", + "serde", + "url 2.1.1", +] + [[package]] name = "sha1" version = "0.6.0" @@ -3492,6 +3555,19 @@ dependencies = [ "syn 1.0.18", ] +[[package]] +name = "shrinkwraprs" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e63e6744142336dfb606fe2b068afa2e1cca1ee6a5d8377277a92945d81fa331" +dependencies = [ + "bitflags 1.2.1", + "itertools", + "proc-macro2 1.0.12", + "quote 1.0.4", + "syn 1.0.18", +] + [[package]] name = "signal-hook-registry" version = "1.2.0" @@ -3915,18 +3991,13 @@ dependencies = [ "futures 0.1.29", "mio", "num_cpus", - "tokio-codec", "tokio-current-thread", "tokio-executor", - "tokio-fs", "tokio-io", "tokio-reactor", - "tokio-sync", "tokio-tcp", "tokio-threadpool", "tokio-timer", - "tokio-udp", - "tokio-uds", ] [[package]] @@ -3962,17 +4033,6 @@ dependencies = [ "futures 0.1.29", ] -[[package]] -name = "tokio-codec" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.29", - "tokio-io", -] - [[package]] name = "tokio-current-thread" version = "0.1.7" @@ -3993,17 +4053,6 @@ dependencies = [ "futures 0.1.29", ] -[[package]] -name = "tokio-fs" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" -dependencies = [ - "futures 0.1.29", - "tokio-io", - "tokio-threadpool", -] - [[package]] name = "tokio-io" version = "0.1.13" @@ -4088,36 +4137,13 @@ dependencies = [ ] [[package]] -name = "tokio-udp" -version = "0.1.6" +name = "tokio-tls" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" +checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" dependencies = [ - "bytes 0.4.12", - "futures 0.1.29", - "log 0.4.8", - "mio", - "tokio-codec", - "tokio-io", - "tokio-reactor", -] - -[[package]] -name = "tokio-uds" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5076db410d6fdc6523df7595447629099a1fdc47b3d9f896220780fa48faf798" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.29", - "iovec", - "libc", - "log 0.4.8", - "mio", - "mio-uds", - "tokio-codec", - "tokio-io", - "tokio-reactor", + "native-tls", + "tokio 0.2.20", ] [[package]] @@ -4415,6 +4441,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c7d40d09cdbf0f4895ae58cf57d92e1e57a9dd8ed2e8390514b54a47cc5551" dependencies = [ "cfg-if", + "serde", + "serde_json", "wasm-bindgen-macro", ] @@ -4433,6 +4461,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a369c5e1dfb7569e14d62af4da642a3cbc2f9a3652fe586e26ac22222aa4b04" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.62" @@ -4474,13 +4514,12 @@ dependencies = [ [[package]] name = "webfinger" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec24b1b0700d4b466d280228ed0f62274eedeaa80206820f071fdc8ed787b664" +checksum = "4b2821f6de671bfbe4792927622004b6f5364068b8edba4f51af14c10b36204e" dependencies = [ - "reqwest", + "reqwest 0.9.24", "serde", - "serde_derive", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 34f98a0a..57fc39df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,9 +30,10 @@ serde_json = "1.0" serde_qs = "0.5" shrinkwraprs = "0.2.1" syntect = "3.3" +tokio = "0.2" validator = "0.8" validator_derive = "0.8" -webfinger = "0.4.1" +webfinger = "0.5" [[bin]] name = "plume" diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index d6a64aba..6309fee2 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -10,18 +10,19 @@ activitystreams-derive = "0.1.1" activitystreams-traits = "0.1.0" array_tool = "1.0" base64 = "0.10" +futures-util = "*" heck = "0.3.0" hex = "0.3" -hyper = "0.12.33" +hyper = "0.13" openssl = "0.10.22" rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" } -reqwest = "0.9" +reqwest = "0.10" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" shrinkwraprs = "0.2.1" syntect = "3.3" -tokio = "0.1.22" +tokio = "0.2" regex-syntax = { version = "0.6.17", default-features = false, features = ["unicode-perl"] } [dependencies.chrono] diff --git a/plume-common/src/activity_pub/inbox.rs b/plume-common/src/activity_pub/inbox.rs index 9c714614..b04b4076 100644 --- a/plume-common/src/activity_pub/inbox.rs +++ b/plume-common/src/activity_pub/inbox.rs @@ -280,7 +280,7 @@ pub trait FromId: Sized { /// Dereferences an ID fn deref(id: &str) -> Result, Self::Error)> { reqwest::ClientBuilder::new() - .connect_timeout(Some(std::time::Duration::from_secs(5))) + .connect_timeout(std::time::Duration::from_secs(5)) .build() .map_err(|_| (None, InboxError::DerefError.into()))? .get(id) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 477f3ba7..154224f0 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,6 +1,6 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::r#async::ClientBuilder; +use reqwest::ClientBuilder; use rocket::{ http::Status, request::{FromRequestFuture, FromRequestAsync, Request}, @@ -118,9 +118,10 @@ impl<'a, 'r> FromRequestAsync<'a, 'r> for ApRequest { }) } } -pub fn broadcast(sender: &S, act: A, to: Vec) +pub fn broadcast(sender: &'static S, act: A, to: Vec) where S: sign::Signer, + S: std::marker::Sync, A: Activity, T: inbox::AsActor, { @@ -140,7 +141,9 @@ where .sign(sender) .expect("activity_pub::broadcast: signature error"); - let mut rt = tokio::runtime::current_thread::Runtime::new() + let mut rt = tokio::runtime::Builder::new() + .threaded_scheduler() + .build() .expect("Error while initializing tokio runtime for federation"); let client = ClientBuilder::new() .connect_timeout(std::time::Duration::from_secs(5)) @@ -150,7 +153,7 @@ where let body = signed.to_string(); let mut headers = request::headers(); headers.insert("Digest", request::Digest::digest(&body)); - rt.spawn( + rt.spawn(async move{ client .post(&inbox) .headers(headers.clone()) @@ -161,15 +164,17 @@ where ) .body(body) .send() - .and_then(|r| r.into_body().concat2()) + .await + .unwrap() + .text() + .await .map(move |response| { println!("Successfully sent activity to inbox ({})", inbox); println!("Response: \"{:?}\"\n", response) }) - .map_err(|e| println!("Error while sending to inbox ({:?})", e)), - ); + .map_err(|e| println!("Error while sending to inbox ({:?})", e)) + }); } - rt.run().unwrap(); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)] diff --git a/plume-models/Cargo.toml b/plume-models/Cargo.toml index 5ad8b1d5..8e22ec07 100644 --- a/plume-models/Cargo.toml +++ b/plume-models/Cargo.toml @@ -16,7 +16,7 @@ lazy_static = "1.0" migrations_internals= "1.4.0" openssl = "0.10.22" rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" } -reqwest = "0.9" +reqwest = "0.10" scheduled-thread-pool = "0.2.2" serde = "1.0" serde_derive = "1.0" @@ -24,9 +24,9 @@ serde_json = "1.0" tantivy = "0.10.1" url = "2.1" walkdir = "2.2" -webfinger = "0.4.1" +webfinger = "0.5" whatlang = "0.7.1" -shrinkwraprs = "0.2.1" +shrinkwraprs = "0.3" diesel-derive-newtype = "0.1.2" glob = "0.3.0" @@ -54,7 +54,7 @@ default-features = false features = ["rocket"] [dev-dependencies] -diesel_migrations = "1.3.0" +diesel_migrations = "1.4.0" [features] postgres = ["diesel/postgres", "plume-macro/postgres" ] diff --git a/po/plume/ar.po b/po/plume/ar.po index 85579f24..f464f224 100644 --- a/po/plume/ar.po +++ b/po/plume/ar.po @@ -10,7 +10,8 @@ msgstr "" "MIME-Version: 1.0\n" "Content-Type: text/plain; charset=UTF-8\n" "Content-Transfer-Encoding: 8bit\n" -"Plural-Forms: nplurals=6; plural=(n==0 ? 0 : n==1 ? 1 : n==2 ? 2 : n%100>=3 && n%100<=10 ? 3 : n%100>=11 && n%100<=99 ? 4 : 5);\n" +"Plural-Forms: nplurals=6; plural=(n==0 ? 0 : n==1 ? 1 : n==2 ? 2 : n%100>=3 " +"&& n%100<=10 ? 3 : n%100>=11 && n%100<=99 ? 4 : 5);\n" "X-Crowdin-Project: plume\n" "X-Crowdin-Language: ar\n" "X-Crowdin-File: /master/po/plume/plume.pot\n" diff --git a/src/api/mod.rs b/src/api/mod.rs index 1ea9f6d3..58d0fec3 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -27,20 +27,20 @@ impl From for ApiError { } impl<'r> Responder<'r> for ApiError { - fn respond_to(self, req: &Request<'_>) -> response::Result<'r> { + fn respond_to(self, req: &'r Request) -> response::ResultFuture<'r> { match self.0 { Error::NotFound => Json(json!({ "error": "Not found" })) - .respond_to(req), + .respond_to(req), Error::Unauthorized => Json(json!({ "error": "You are not authorized to access this resource" })) - .respond_to(req), + .respond_to(req), _ => Json(json!({ "error": "Server error" })) - .respond_to(req), + .respond_to(req), } } } diff --git a/src/inbox.rs b/src/inbox.rs index 71270f53..f5d84a27 100644 --- a/src/inbox.rs +++ b/src/inbox.rs @@ -73,32 +73,32 @@ impl<'a, T: Deserialize<'a>> FromData<'a> for SignedJson { type Owned = String; type Borrowed = str; - fn transform( - r: &Request<'_>, - d: Data, - ) -> Transform> { - let size_limit = r.limits().get("json").unwrap_or(JSON_LIMIT); - let mut s = String::with_capacity(512); - match d.open().take(size_limit).read_to_string(&mut s) { - Ok(_) => Transform::Borrowed(Success(s)), - Err(e) => Transform::Borrowed(Failure((Status::BadRequest, JsonError::Io(e)))), - } + fn transform<'r>( + r: &'r Request, + d: Data + ) -> TransformFuture<'r, Self::Owned, Self::Error> { + Box::pin(async move { + let size_limit = r.limits().get("json").unwrap_or(JSON_LIMIT); + let mut s = String::with_capacity(512); + let outcome = match d.open().take(size_limit).read_to_string(&mut s) { + Ok(_) => Success(s), + Err(e) => Failure((Status::BadRequest, JsonError::Io(e))), + }; + Transform::Borrowed(outcome) + }) } fn from_data( _: &Request<'_>, o: Transformed<'a, Self>, - ) -> rocket::data::Outcome { - let string = o.borrowed()?; - match serde_json::from_str(&string) { - Ok(v) => Success(SignedJson(Digest::from_body(&string), Json(v))), - Err(e) => { - if e.is_data() { - Failure((Status::UnprocessableEntity, JsonError::Parse(string, e))) - } else { - Failure((Status::BadRequest, JsonError::Parse(string, e))) - } + ) -> FromDataFuture<'a, Self, Self::Error> { + Box::pin(async move { + let string = try_outcome!(o.borrowed()); + match serde_json::from_str(&string) { + Ok(v) => Success(SignedJson(Digest::from_body(&string), Json(v))), + Err(e) if e.is_data() => return Failure((Status::UnprocessableEntity, JsonError::Parse(string, e))), + Err(e) => Failure((Status::BadRequest, JsonError::Parse(string, e))), } - } + }) } }