diff --git a/Cargo.lock b/Cargo.lock index f0b2fca5..06106f68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,7 +491,7 @@ checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" dependencies = [ "libc", "num-integer", - "num-traits 0.2.14", + "num-traits 0.2.15", "serde 1.0.137", "time 0.1.44", "winapi 0.3.9", @@ -1652,6 +1652,25 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "h2" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" +dependencies = [ + "bytes 1.1.0", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.7", + "indexmap", + "slab", + "tokio 1.18.1", + "tokio-util 0.7.1", + "tracing", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -1785,6 +1804,17 @@ dependencies = [ "http 0.2.7", ] +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes 1.1.0", + "http 0.2.7", + "pin-project-lite 0.2.9", +] + [[package]] name = "httparse" version = "1.7.1" @@ -1797,6 +1827,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + [[package]] name = "hyper" version = "0.10.16" @@ -1860,7 +1896,7 @@ dependencies = [ "http 0.2.7", "http-body 0.3.1", "httparse", - "httpdate", + "httpdate 0.3.2", "itoa 0.4.8", "pin-project", "socket2 0.3.19", @@ -1870,6 +1906,30 @@ dependencies = [ "want 0.3.0", ] +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.13", + "http 0.2.7", + "http-body 0.4.4", + "httparse", + "httpdate 1.0.2", + "itoa 1.0.1", + "pin-project-lite 0.2.9", + "socket2 0.4.4", + "tokio 1.18.1", + "tower-service", + "tracing", + "want 0.3.0", +] + [[package]] name = "hyper-tls" version = "0.3.2" @@ -1896,6 +1956,19 @@ dependencies = [ "tokio-tls", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.1.0", + "hyper 0.14.18", + "native-tls", + "tokio 1.18.1", + "tokio-native-tls", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -2728,7 +2801,7 @@ checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" dependencies = [ "autocfg 1.1.0", "num-integer", - "num-traits 0.2.14", + "num-traits 0.2.15", ] [[package]] @@ -2738,7 +2811,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ "autocfg 1.1.0", - "num-traits 0.2.14", + "num-traits 0.2.15", ] [[package]] @@ -2749,7 +2822,7 @@ checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a" dependencies = [ "autocfg 1.1.0", "num-integer", - "num-traits 0.2.14", + "num-traits 0.2.15", ] [[package]] @@ -2758,14 +2831,14 @@ version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92e5113e9fd4cc14ded8e499429f396a20f98c772a47cc8622a736e1ec843c31" dependencies = [ - "num-traits 0.2.14", + "num-traits 0.2.15", ] [[package]] name = "num-traits" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg 1.1.0", ] @@ -2782,9 +2855,9 @@ dependencies = [ [[package]] name = "num_threads" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" dependencies = [ "libc", ] @@ -2834,9 +2907,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.39" +version = "0.10.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28f3916d46d9d813a62d7b7d2724d7b14785ac999fb623d990ee4603f9122742" +checksum = "fb81a6430ac911acb25fe5ac8f1d2af1b4ea8a4fdfda0f1ee4292af2e2d8eb0e" dependencies = [ "bitflags 1.3.2", "cfg-if 1.0.0", @@ -3173,15 +3246,16 @@ dependencies = [ "openssl", "pulldown-cmark", "regex-syntax 0.6.25", - "reqwest 0.9.24", + "reqwest 0.11.10", "rocket", "serde 1.0.137", "serde_derive", "serde_json", "shrinkwraprs", "syntect", - "tokio 0.1.22", + "tokio 1.18.1", "tracing", + "url 2.2.2", ] [[package]] @@ -3236,7 +3310,7 @@ dependencies = [ "plume-api", "plume-common", "plume-macro", - "reqwest 0.9.24", + "reqwest 0.11.10", "riker", "rocket", "rocket_i18n", @@ -3724,7 +3798,6 @@ dependencies = [ "serde 1.0.137", "serde_json", "serde_urlencoded 0.5.5", - "socks", "time 0.1.44", "tokio 0.1.22", "tokio-executor", @@ -3771,6 +3844,43 @@ dependencies = [ "winreg 0.7.0", ] +[[package]] +name = "reqwest" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" +dependencies = [ + "base64 0.13.0", + "bytes 1.1.0", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.13", + "http 0.2.7", + "http-body 0.4.4", + "hyper 0.14.18", + "hyper-tls 0.5.0", + "ipnet", + "js-sys", + "lazy_static", + "log 0.4.17", + "mime 0.3.16", + "native-tls", + "percent-encoding 2.1.0", + "pin-project-lite 0.2.9", + "serde 1.0.137", + "serde_json", + "serde_urlencoded 0.7.1", + "tokio 1.18.1", + "tokio-native-tls", + "tokio-socks", + "url 2.2.2", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.10.1", +] + [[package]] name = "riker" version = "0.4.2" @@ -3928,7 +4038,7 @@ dependencies = [ "num-bigint", "num-integer", "num-rational", - "num-traits 0.2.14", + "num-traits 0.2.15", ] [[package]] @@ -4100,9 +4210,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f972498cf015f7c0746cac89ebe1d6ef10c293b94175a243a2d9442c163d9944" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" dependencies = [ "itoa 1.0.1", "ryu", @@ -4268,17 +4378,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "socks" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c3dbbd9ae980613c6dd8e28a9407b50509d3803b57624d5dfe8315218cd58b" -dependencies = [ - "byteorder", - "libc", - "winapi 0.3.9", -] - [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -4684,18 +4783,13 @@ dependencies = [ "futures 0.1.31", "mio 0.6.23", "num_cpus", - "tokio-codec", "tokio-current-thread", "tokio-executor", - "tokio-fs", "tokio-io", "tokio-reactor", - "tokio-sync", "tokio-tcp", "tokio-threadpool", "tokio-timer", - "tokio-udp", - "tokio-uds", ] [[package]] @@ -4732,8 +4826,11 @@ dependencies = [ "libc", "memchr", "mio 0.8.2", + "num_cpus", "once_cell", + "parking_lot 0.12.0", "pin-project-lite 0.2.9", + "signal-hook-registry", "socket2 0.4.4", "tokio-macros 1.7.0", "winapi 0.3.9", @@ -4750,17 +4847,6 @@ dependencies = [ "futures 0.1.31", ] -[[package]] -name = "tokio-codec" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", - "tokio-io", -] - [[package]] name = "tokio-current-thread" version = "0.1.7" @@ -4781,17 +4867,6 @@ dependencies = [ "futures 0.1.31", ] -[[package]] -name = "tokio-fs" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" -dependencies = [ - "futures 0.1.31", - "tokio-io", - "tokio-threadpool", -] - [[package]] name = "tokio-io" version = "0.1.13" @@ -4854,6 +4929,18 @@ dependencies = [ "tokio-sync", ] +[[package]] +name = "tokio-socks" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51165dfa029d2a65969413a6cc96f354b86b464498702f174a4efa13608fd8c0" +dependencies = [ + "either 1.6.1", + "futures-util", + "thiserror", + "tokio 1.18.1", +] + [[package]] name = "tokio-stream" version = "0.1.8" @@ -4928,39 +5015,6 @@ dependencies = [ "tokio 0.2.25", ] -[[package]] -name = "tokio-udp" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", - "log 0.4.17", - "mio 0.6.23", - "tokio-codec", - "tokio-io", - "tokio-reactor", -] - -[[package]] -name = "tokio-uds" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", - "iovec", - "libc", - "log 0.4.17", - "mio 0.6.23", - "mio-uds", - "tokio-codec", - "tokio-io", - "tokio-reactor", -] - [[package]] name = "tokio-util" version = "0.3.1" @@ -5576,6 +5630,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "winutil" version = "0.1.1" @@ -5597,9 +5660,9 @@ dependencies = [ [[package]] name = "xattr" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c" +checksum = "6d1526bbe5aaeb5eb06885f4d987bcdfa5e23187055de9b83fe00156a821fabc" dependencies = [ "libc", ] diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index 00b4ec02..96090759 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -11,18 +11,19 @@ heck = "0.4.0" hex = "0.4" openssl = "0.10.22" rocket = "0.4.6" -reqwest = { version = "0.9", features = ["socks"] } +reqwest = { version = "0.11.10", features = ["blocking", "json", "socks"] } serde = "1.0" serde_derive = "1.0" serde_json = "1.0.80" shrinkwraprs = "0.3.0" syntect = "4.5.0" -tokio = "0.1.22" regex-syntax = { version = "0.6.17", default-features = false, features = ["unicode-perl"] } tracing = "0.1.34" askama_escape = "0.10.3" activitystreams = "0.7.0-alpha.18" activitystreams-ext = "0.1.0-alpha.2" +url = "2.2.2" +tokio = { version = "1.18.1", features = ["full"] } [dependencies.chrono] features = ["serde"] diff --git a/plume-common/src/activity_pub/inbox.rs b/plume-common/src/activity_pub/inbox.rs index 2a77c28c..e2a769ee 100644 --- a/plume-common/src/activity_pub/inbox.rs +++ b/plume-common/src/activity_pub/inbox.rs @@ -366,7 +366,7 @@ pub trait FromId: Sized { ) -> Result, Self::Error)> { request::get(id, Self::get_sender(), proxy) .map_err(|_| (None, InboxError::DerefError)) - .and_then(|mut r| { + .and_then(|r| { let json: serde_json::Value = r .json() .map_err(|_| (None, InboxError::InvalidObject(None)))?; diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index afbc3de7..cb86fcf4 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -10,14 +10,14 @@ use activitystreams::{ }; use activitystreams_ext::{Ext1, Ext2, UnparsedExtension}; use array_tool::vec::Uniq; -use reqwest::{header::HeaderValue, r#async::ClientBuilder, Url}; +use reqwest::{header::HeaderValue, ClientBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, response::{Responder, Response}, Outcome, }; -use tokio::prelude::*; +use tokio::{runtime, sync::mpsc}; use tracing::{debug, warn}; use self::sign::Signable; @@ -151,52 +151,65 @@ where .connect_timeout(std::time::Duration::from_secs(5)) .build() .expect("Can't build client"); - let mut rt = tokio::runtime::current_thread::Runtime::new() + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() .expect("Error while initializing tokio runtime for federation"); - for inbox in boxes { - let body = signed.to_string(); - let mut headers = request::headers(); - let url = Url::parse(&inbox); - if url.is_err() { - warn!("Inbox is invalid URL: {:?}", &inbox); - continue; + rt.block_on(async { + let (tx, mut rx) = mpsc::channel(32); + for inbox in boxes { + let body = signed.to_string(); + let mut headers = request::headers(); + let url = Url::parse(&inbox); + if url.is_err() { + warn!("Inbox is invalid URL: {:?}", &inbox); + continue; + } + let url = url.unwrap(); + if !url.has_host() { + warn!("Inbox doesn't have host: {:?}", &inbox); + continue; + }; + let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); + if host_header_value.is_err() { + warn!("Header value is invalid: {:?}", url.host_str()); + continue; + } + headers.insert("Host", host_header_value.unwrap()); + headers.insert("Digest", request::Digest::digest(&body)); + headers.insert( + "Signature", + request::signature(sender, &headers, ("post", url.path(), url.query())) + .expect("activity_pub::broadcast: request signature error"), + ); + let client = client.clone(); + let tx = tx.clone(); + let _ = tx.send( + rt.spawn( + client + .post(&inbox) + .headers(headers.clone()) + .body(body) + .send(), + ), + ); } - let url = url.unwrap(); - if !url.has_host() { - warn!("Inbox doesn't have host: {:?}", &inbox); - continue; - }; - let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); - if host_header_value.is_err() { - warn!("Header value is invalid: {:?}", url.host_str()); - continue; - } - headers.insert("Host", host_header_value.unwrap()); - headers.insert("Digest", request::Digest::digest(&body)); - rt.spawn( - client - .post(&inbox) - .headers(headers.clone()) - .header( - "Signature", - request::signature(sender, &headers, ("post", url.path(), url.query())) - .expect("activity_pub::broadcast: request signature error"), - ) - .body(body) - .send() - .and_then(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({:?})", &r) - } - r.into_body().concat2() + while let Some(request) = rx.recv().await { + let _ = request + .await + .map(move |r| { + r.map(|r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &r.url()); + } else { + warn!("Error while sending to inbox ({} {:?})", &r.url(), &r) + } + debug!("Response: \"{:?}\"\n", r); + }) }) - .map(move |response| debug!("Response: \"{:?}\"\n", response)) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)), - ); - } - rt.run().unwrap(); + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + } + }); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)] diff --git a/plume-common/src/activity_pub/request.rs b/plume-common/src/activity_pub/request.rs index 4258bd7b..3d60b820 100644 --- a/plume-common/src/activity_pub/request.rs +++ b/plume-common/src/activity_pub/request.rs @@ -1,10 +1,11 @@ use chrono::{offset::Utc, DateTime}; use openssl::hash::{Hasher, MessageDigest}; use reqwest::{ + blocking::{ClientBuilder, Response}, header::{ HeaderMap, HeaderValue, InvalidHeaderValue, ACCEPT, CONTENT_TYPE, DATE, HOST, USER_AGENT, }, - ClientBuilder, Proxy, Response, Url, UrlError, + Proxy, Url, }; use std::ops::Deref; use std::time::SystemTime; @@ -18,8 +19,8 @@ const PLUME_USER_AGENT: &str = concat!("Plume/", env!("CARGO_PKG_VERSION")); #[derive(Debug)] pub struct Error(); -impl From for Error { - fn from(_err: UrlError) -> Self { +impl From for Error { + fn from(_err: url::ParseError) -> Self { Error() } } diff --git a/plume-models/Cargo.toml b/plume-models/Cargo.toml index 1d6572a6..6436e739 100644 --- a/plume-models/Cargo.toml +++ b/plume-models/Cargo.toml @@ -15,7 +15,7 @@ migrations_internals= "1.4.0" openssl = "0.10.22" rocket = "0.4.6" rocket_i18n = "0.4.1" -reqwest = "0.9" +reqwest = "0.11.10" scheduled-thread-pool = "0.2.2" serde = "1.0" serde_derive = "1.0" diff --git a/plume-models/src/users.rs b/plume-models/src/users.rs index 3086b7a8..4c572b3b 100644 --- a/plume-models/src/users.rs +++ b/plume-models/src/users.rs @@ -244,7 +244,7 @@ impl User { } fn fetch(url: &str) -> Result { - let mut res = get(url, Self::get_sender(), CONFIG.proxy().cloned())?; + let res = get(url, Self::get_sender(), CONFIG.proxy().cloned())?; let text = &res.text()?; // without this workaround, publicKey is not correctly deserialized let ap_sign = serde_json::from_str::(text)?; @@ -568,7 +568,7 @@ impl User { } pub fn fetch_followers_ids(&self) -> Result> { - let mut res = get( + let res = get( &self.followers_endpoint[..], Self::get_sender(), CONFIG.proxy().cloned(),