add tokio (0.2) as dependency to further async-ify our FromData code

i'm using this opportunity to also update reqwest (0.10), but it's
turning out to be a little trickier, as it requires more modern async
setup, and that appears to need a lot of thinking…
This commit is contained in:
Igor Galić 2020-01-29 09:53:25 +01:00
parent 022e037eea
commit 25c5da1a7c
No known key found for this signature in database
GPG Key ID: ACFEFF7F6A123A86
9 changed files with 161 additions and 114 deletions

181
Cargo.lock generated
View File

@ -1439,9 +1439,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42"
checksum = "79b7246d7e4b979c03fa093da39cfb3617a96bbeee6310af63991668d7e843ff"
dependencies = [
"bytes 0.5.4",
"fnv",
@ -1619,12 +1619,13 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2 0.2.4",
"h2 0.2.5",
"http 0.2.1",
"http-body 0.3.1",
"httparse",
"itoa",
"log 0.4.8",
"net2",
"pin-project",
"time",
"tokio 0.2.20",
@ -1645,6 +1646,19 @@ dependencies = [
"tokio-io",
]
[[package]]
name = "hyper-tls"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3adcd308402b9553630734e9c36b77a7e48b3821251ca2493e8cd596763aafaa"
dependencies = [
"bytes 0.5.4",
"hyper 0.13.5",
"native-tls",
"tokio 0.2.20",
"tokio-tls",
]
[[package]]
name = "idna"
version = "0.1.5"
@ -2596,8 +2610,9 @@ dependencies = [
"serde",
"serde_json",
"serde_qs",
"shrinkwraprs",
"shrinkwraprs 0.2.3",
"syntect",
"tokio 0.2.20",
"validator",
"validator_derive",
"webfinger",
@ -2632,20 +2647,21 @@ dependencies = [
"array_tool",
"base64 0.10.1",
"chrono",
"futures-util",
"heck",
"hex",
"hyper 0.12.35",
"hyper 0.13.5",
"openssl",
"pulldown-cmark",
"regex-syntax 0.6.17",
"reqwest",
"reqwest 0.10.4",
"rocket",
"serde",
"serde_derive",
"serde_json",
"shrinkwraprs",
"shrinkwraprs 0.2.3",
"syntect",
"tokio 0.1.22",
"tokio 0.2.20",
]
[[package]]
@ -2693,14 +2709,14 @@ dependencies = [
"plume-api",
"plume-common",
"plume-macro",
"reqwest",
"reqwest 0.10.4",
"rocket",
"rocket_i18n",
"scheduled-thread-pool",
"serde",
"serde_derive",
"serde_json",
"shrinkwraprs",
"shrinkwraprs 0.3.0",
"tantivy",
"url 2.1.1",
"walkdir",
@ -3121,14 +3137,14 @@ dependencies = [
"futures 0.1.29",
"http 0.1.21",
"hyper 0.12.35",
"hyper-tls",
"hyper-tls 0.3.2",
"log 0.4.8",
"mime 0.3.16",
"mime_guess 2.0.3",
"native-tls",
"serde",
"serde_json",
"serde_urlencoded",
"serde_urlencoded 0.5.5",
"time",
"tokio 0.1.22",
"tokio-executor",
@ -3140,6 +3156,41 @@ dependencies = [
"winreg",
]
[[package]]
name = "reqwest"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02b81e49ddec5109a9dcfc5f2a317ff53377c915e9ae9d4f2fb50914b85614e2"
dependencies = [
"base64 0.11.0",
"bytes 0.5.4",
"encoding_rs",
"futures-core",
"futures-util",
"http 0.2.1",
"http-body 0.3.1",
"hyper 0.13.5",
"hyper-tls 0.4.1",
"js-sys",
"lazy_static",
"log 0.4.8",
"mime 0.3.16",
"mime_guess 2.0.3",
"native-tls",
"percent-encoding 2.1.0",
"pin-project-lite",
"serde",
"serde_urlencoded 0.6.1",
"time",
"tokio 0.2.20",
"tokio-tls",
"url 2.1.1",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "ring"
version = "0.16.13"
@ -3467,6 +3518,18 @@ dependencies = [
"url 1.7.2",
]
[[package]]
name = "serde_urlencoded"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97"
dependencies = [
"dtoa",
"itoa",
"serde",
"url 2.1.1",
]
[[package]]
name = "sha1"
version = "0.6.0"
@ -3492,6 +3555,19 @@ dependencies = [
"syn 1.0.18",
]
[[package]]
name = "shrinkwraprs"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e63e6744142336dfb606fe2b068afa2e1cca1ee6a5d8377277a92945d81fa331"
dependencies = [
"bitflags 1.2.1",
"itertools",
"proc-macro2 1.0.12",
"quote 1.0.4",
"syn 1.0.18",
]
[[package]]
name = "signal-hook-registry"
version = "1.2.0"
@ -3915,18 +3991,13 @@ dependencies = [
"futures 0.1.29",
"mio",
"num_cpus",
"tokio-codec",
"tokio-current-thread",
"tokio-executor",
"tokio-fs",
"tokio-io",
"tokio-reactor",
"tokio-sync",
"tokio-tcp",
"tokio-threadpool",
"tokio-timer",
"tokio-udp",
"tokio-uds",
]
[[package]]
@ -3962,17 +4033,6 @@ dependencies = [
"futures 0.1.29",
]
[[package]]
name = "tokio-codec"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b"
dependencies = [
"bytes 0.4.12",
"futures 0.1.29",
"tokio-io",
]
[[package]]
name = "tokio-current-thread"
version = "0.1.7"
@ -3993,17 +4053,6 @@ dependencies = [
"futures 0.1.29",
]
[[package]]
name = "tokio-fs"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4"
dependencies = [
"futures 0.1.29",
"tokio-io",
"tokio-threadpool",
]
[[package]]
name = "tokio-io"
version = "0.1.13"
@ -4088,36 +4137,13 @@ dependencies = [
]
[[package]]
name = "tokio-udp"
version = "0.1.6"
name = "tokio-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82"
checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343"
dependencies = [
"bytes 0.4.12",
"futures 0.1.29",
"log 0.4.8",
"mio",
"tokio-codec",
"tokio-io",
"tokio-reactor",
]
[[package]]
name = "tokio-uds"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5076db410d6fdc6523df7595447629099a1fdc47b3d9f896220780fa48faf798"
dependencies = [
"bytes 0.4.12",
"futures 0.1.29",
"iovec",
"libc",
"log 0.4.8",
"mio",
"mio-uds",
"tokio-codec",
"tokio-io",
"tokio-reactor",
"native-tls",
"tokio 0.2.20",
]
[[package]]
@ -4415,6 +4441,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c7d40d09cdbf0f4895ae58cf57d92e1e57a9dd8ed2e8390514b54a47cc5551"
dependencies = [
"cfg-if",
"serde",
"serde_json",
"wasm-bindgen-macro",
]
@ -4433,6 +4461,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a369c5e1dfb7569e14d62af4da642a3cbc2f9a3652fe586e26ac22222aa4b04"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.62"
@ -4474,13 +4514,12 @@ dependencies = [
[[package]]
name = "webfinger"
version = "0.4.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec24b1b0700d4b466d280228ed0f62274eedeaa80206820f071fdc8ed787b664"
checksum = "4b2821f6de671bfbe4792927622004b6f5364068b8edba4f51af14c10b36204e"
dependencies = [
"reqwest",
"reqwest 0.9.24",
"serde",
"serde_derive",
]
[[package]]

View File

@ -30,9 +30,10 @@ serde_json = "1.0"
serde_qs = "0.5"
shrinkwraprs = "0.2.1"
syntect = "3.3"
tokio = "0.2"
validator = "0.8"
validator_derive = "0.8"
webfinger = "0.4.1"
webfinger = "0.5"
[[bin]]
name = "plume"

View File

@ -10,18 +10,19 @@ activitystreams-derive = "0.1.1"
activitystreams-traits = "0.1.0"
array_tool = "1.0"
base64 = "0.10"
futures-util = "*"
heck = "0.3.0"
hex = "0.3"
hyper = "0.12.33"
hyper = "0.13"
openssl = "0.10.22"
rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" }
reqwest = "0.9"
reqwest = "0.10"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
shrinkwraprs = "0.2.1"
syntect = "3.3"
tokio = "0.1.22"
tokio = "0.2"
regex-syntax = { version = "0.6.17", default-features = false, features = ["unicode-perl"] }
[dependencies.chrono]

View File

@ -280,7 +280,7 @@ pub trait FromId<C>: Sized {
/// Dereferences an ID
fn deref(id: &str) -> Result<Self::Object, (Option<serde_json::Value>, Self::Error)> {
reqwest::ClientBuilder::new()
.connect_timeout(Some(std::time::Duration::from_secs(5)))
.connect_timeout(std::time::Duration::from_secs(5))
.build()
.map_err(|_| (None, InboxError::DerefError.into()))?
.get(id)

View File

@ -1,6 +1,6 @@
use activitypub::{Activity, Link, Object};
use array_tool::vec::Uniq;
use reqwest::r#async::ClientBuilder;
use reqwest::ClientBuilder;
use rocket::{
http::Status,
request::{FromRequestFuture, FromRequestAsync, Request},
@ -118,9 +118,10 @@ impl<'a, 'r> FromRequestAsync<'a, 'r> for ApRequest {
})
}
}
pub fn broadcast<S, A, T, C>(sender: &S, act: A, to: Vec<T>)
pub fn broadcast<S, A, T, C>(sender: &'static S, act: A, to: Vec<T>)
where
S: sign::Signer,
S: std::marker::Sync,
A: Activity,
T: inbox::AsActor<C>,
{
@ -140,7 +141,9 @@ where
.sign(sender)
.expect("activity_pub::broadcast: signature error");
let mut rt = tokio::runtime::current_thread::Runtime::new()
let mut rt = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.expect("Error while initializing tokio runtime for federation");
let client = ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(5))
@ -150,7 +153,7 @@ where
let body = signed.to_string();
let mut headers = request::headers();
headers.insert("Digest", request::Digest::digest(&body));
rt.spawn(
rt.spawn(async move{
client
.post(&inbox)
.headers(headers.clone())
@ -161,15 +164,17 @@ where
)
.body(body)
.send()
.and_then(|r| r.into_body().concat2())
.await
.unwrap()
.text()
.await
.map(move |response| {
println!("Successfully sent activity to inbox ({})", inbox);
println!("Response: \"{:?}\"\n", response)
})
.map_err(|e| println!("Error while sending to inbox ({:?})", e)),
);
.map_err(|e| println!("Error while sending to inbox ({:?})", e))
});
}
rt.run().unwrap();
}
#[derive(Shrinkwrap, Clone, Serialize, Deserialize)]

View File

@ -16,7 +16,7 @@ lazy_static = "1.0"
migrations_internals= "1.4.0"
openssl = "0.10.22"
rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "async" }
reqwest = "0.9"
reqwest = "0.10"
scheduled-thread-pool = "0.2.2"
serde = "1.0"
serde_derive = "1.0"
@ -24,9 +24,9 @@ serde_json = "1.0"
tantivy = "0.10.1"
url = "2.1"
walkdir = "2.2"
webfinger = "0.4.1"
webfinger = "0.5"
whatlang = "0.7.1"
shrinkwraprs = "0.2.1"
shrinkwraprs = "0.3"
diesel-derive-newtype = "0.1.2"
glob = "0.3.0"
@ -54,7 +54,7 @@ default-features = false
features = ["rocket"]
[dev-dependencies]
diesel_migrations = "1.3.0"
diesel_migrations = "1.4.0"
[features]
postgres = ["diesel/postgres", "plume-macro/postgres" ]

View File

@ -10,7 +10,8 @@ msgstr ""
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=6; plural=(n==0 ? 0 : n==1 ? 1 : n==2 ? 2 : n%100>=3 && n%100<=10 ? 3 : n%100>=11 && n%100<=99 ? 4 : 5);\n"
"Plural-Forms: nplurals=6; plural=(n==0 ? 0 : n==1 ? 1 : n==2 ? 2 : n%100>=3 "
"&& n%100<=10 ? 3 : n%100>=11 && n%100<=99 ? 4 : 5);\n"
"X-Crowdin-Project: plume\n"
"X-Crowdin-Language: ar\n"
"X-Crowdin-File: /master/po/plume/plume.pot\n"

View File

@ -27,20 +27,20 @@ impl From<std::option::NoneError> for ApiError {
}
impl<'r> Responder<'r> for ApiError {
fn respond_to(self, req: &Request<'_>) -> response::Result<'r> {
fn respond_to(self, req: &'r Request) -> response::ResultFuture<'r> {
match self.0 {
Error::NotFound => Json(json!({
"error": "Not found"
}))
.respond_to(req),
.respond_to(req),
Error::Unauthorized => Json(json!({
"error": "You are not authorized to access this resource"
}))
.respond_to(req),
.respond_to(req),
_ => Json(json!({
"error": "Server error"
}))
.respond_to(req),
.respond_to(req),
}
}
}

View File

@ -73,32 +73,32 @@ impl<'a, T: Deserialize<'a>> FromData<'a> for SignedJson<T> {
type Owned = String;
type Borrowed = str;
fn transform(
r: &Request<'_>,
d: Data,
) -> Transform<rocket::data::Outcome<Self::Owned, Self::Error>> {
let size_limit = r.limits().get("json").unwrap_or(JSON_LIMIT);
let mut s = String::with_capacity(512);
match d.open().take(size_limit).read_to_string(&mut s) {
Ok(_) => Transform::Borrowed(Success(s)),
Err(e) => Transform::Borrowed(Failure((Status::BadRequest, JsonError::Io(e)))),
}
fn transform<'r>(
r: &'r Request,
d: Data
) -> TransformFuture<'r, Self::Owned, Self::Error> {
Box::pin(async move {
let size_limit = r.limits().get("json").unwrap_or(JSON_LIMIT);
let mut s = String::with_capacity(512);
let outcome = match d.open().take(size_limit).read_to_string(&mut s) {
Ok(_) => Success(s),
Err(e) => Failure((Status::BadRequest, JsonError::Io(e))),
};
Transform::Borrowed(outcome)
})
}
fn from_data(
_: &Request<'_>,
o: Transformed<'a, Self>,
) -> rocket::data::Outcome<Self, Self::Error> {
let string = o.borrowed()?;
match serde_json::from_str(&string) {
Ok(v) => Success(SignedJson(Digest::from_body(&string), Json(v))),
Err(e) => {
if e.is_data() {
Failure((Status::UnprocessableEntity, JsonError::Parse(string, e)))
} else {
Failure((Status::BadRequest, JsonError::Parse(string, e)))
}
) -> FromDataFuture<'a, Self, Self::Error> {
Box::pin(async move {
let string = try_outcome!(o.borrowed());
match serde_json::from_str(&string) {
Ok(v) => Success(SignedJson(Digest::from_body(&string), Json(v))),
Err(e) if e.is_data() => return Failure((Status::UnprocessableEntity, JsonError::Parse(string, e))),
Err(e) => Failure((Status::BadRequest, JsonError::Parse(string, e))),
}
}
})
}
}