From 3eb7662aefbc42ce31f0d24e163eec2510d3d8a2 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Wed, 4 May 2022 21:04:49 +0900 Subject: [PATCH 01/11] Log inbox URI when broadcast() failed --- plume-common/src/activity_pub/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index bd640f10..aa17d86e 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -177,7 +177,7 @@ where if r.status().is_success() { debug!("Successfully sent activity to inbox ({})", &inbox); } else { - warn!("Error while sending to inbox ({:?})", &r) + warn!("Error while sending to inbox ({} {:?})", &inbox, &r) } r.into_body().concat2() }) From 35aa2374c4ca57b2db4b02a1df2c43c3c9ed4b15 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Wed, 4 May 2022 21:12:35 +0900 Subject: [PATCH 02/11] Execute broadcast synchronously --- plume-common/src/activity_pub/mod.rs | 47 ++++++++++++---------------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index aa17d86e..ebc5536f 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,13 +1,12 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{header::HeaderValue, r#async::ClientBuilder, Url}; +use reqwest::{header::HeaderValue, ClientBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, response::{Responder, Response}, Outcome, }; -use tokio::prelude::*; use tracing::{debug, warn}; use self::sign::Signable; @@ -140,8 +139,6 @@ where .connect_timeout(std::time::Duration::from_secs(5)) .build() .expect("Can't build client"); - let mut rt = tokio::runtime::current_thread::Runtime::new() - .expect("Error while initializing tokio runtime for federation"); for inbox in boxes { let body = signed.to_string(); let mut headers = request::headers(); @@ -162,30 +159,26 @@ where } headers.insert("Host", host_header_value.unwrap()); headers.insert("Digest", request::Digest::digest(&body)); - rt.spawn( - client - .post(&inbox) - .headers(headers.clone()) - .header( - "Signature", - request::signature(sender, &headers, ("post", url.path(), url.query())) - .expect("activity_pub::broadcast: request signature error"), - ) - .body(body) - .send() - .and_then(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - r.into_body().concat2() - }) - .map(move |response| debug!("Response: \"{:?}\"\n", response)) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)), - ); + let _ = client + .post(&inbox) + .headers(headers.clone()) + .header( + "Signature", + request::signature(sender, &headers, ("post", url.path(), url.query())) + .expect("activity_pub::broadcast: request signature error"), + ) + .body(body) + .send() + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &inbox); + } else { + warn!("Error while sending to inbox ({} {:?})", &inbox, &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); } - rt.run().unwrap(); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)] From 528f1bac48a77f3436433f573bcb10e7c0941cbb Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Wed, 4 May 2022 21:12:52 +0900 Subject: [PATCH 03/11] Remove tokio from dependencies --- plume-common/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index 32db34d3..69b1381f 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -20,7 +20,6 @@ serde_derive = "1.0" serde_json = "1.0.80" shrinkwraprs = "0.3.0" syntect = "4.5.0" -tokio = "0.1.22" regex-syntax = { version = "0.6.17", default-features = false, features = ["unicode-perl"] } tracing = "0.1.34" askama_escape = "0.10.3" From 692e6b1c82765bc4fb29c11e2e0a484904bd0125 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Wed, 4 May 2022 21:13:30 +0900 Subject: [PATCH 04/11] Uninstall tokio --- Cargo.lock | 61 ------------------------------------------------------ 1 file changed, 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51ebe624..63834f96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3155,7 +3155,6 @@ dependencies = [ "serde_json", "shrinkwraprs", "syntect", - "tokio 0.1.22", "tracing", ] @@ -4708,18 +4707,13 @@ dependencies = [ "futures 0.1.31", "mio 0.6.23", "num_cpus", - "tokio-codec", "tokio-current-thread", "tokio-executor", - "tokio-fs", "tokio-io", "tokio-reactor", - "tokio-sync", "tokio-tcp", "tokio-threadpool", "tokio-timer", - "tokio-udp", - "tokio-uds", ] [[package]] @@ -4773,17 +4767,6 @@ dependencies = [ "futures 0.1.31", ] -[[package]] -name = "tokio-codec" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", - "tokio-io", -] - [[package]] name = "tokio-current-thread" version = "0.1.7" @@ -4804,17 +4787,6 @@ dependencies = [ "futures 0.1.31", ] -[[package]] -name = "tokio-fs" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "297a1206e0ca6302a0eed35b700d292b275256f596e2f3fea7729d5e629b6ff4" -dependencies = [ - "futures 0.1.31", - "tokio-io", - "tokio-threadpool", -] - [[package]] name = "tokio-io" version = "0.1.13" @@ -4951,39 +4923,6 @@ dependencies = [ "tokio 0.2.25", ] -[[package]] -name = "tokio-udp" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", - "log 0.4.14", - "mio 0.6.23", - "tokio-codec", - "tokio-io", - "tokio-reactor", -] - -[[package]] -name = "tokio-uds" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0" -dependencies = [ - "bytes 0.4.12", - "futures 0.1.31", - "iovec", - "libc", - "log 0.4.14", - "mio 0.6.23", - "mio-uds", - "tokio-codec", - "tokio-io", - "tokio-reactor", -] - [[package]] name = "tokio-util" version = "0.3.1" From 5d711dc47c7fca7895b020a90ded6a90cd4d47a3 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 00:51:16 +0900 Subject: [PATCH 05/11] Upgrade reqwest to 0.11 --- plume-common/Cargo.toml | 3 ++- plume-models/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index 69b1381f..795cec9e 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -14,7 +14,7 @@ heck = "0.4.0" hex = "0.4" openssl = "0.10.22" rocket = "0.4.6" -reqwest = { version = "0.9", features = ["socks"] } +reqwest = { version = "0.11.10", features = ["blocking", "json", "socks"] } serde = "1.0" serde_derive = "1.0" serde_json = "1.0.80" @@ -23,6 +23,7 @@ syntect = "4.5.0" regex-syntax = { version = "0.6.17", default-features = false, features = ["unicode-perl"] } tracing = "0.1.34" askama_escape = "0.10.3" +url = "2.2.2" [dependencies.chrono] features = ["serde"] diff --git a/plume-models/Cargo.toml b/plume-models/Cargo.toml index bb0e8925..5a1765ae 100644 --- a/plume-models/Cargo.toml +++ b/plume-models/Cargo.toml @@ -16,7 +16,7 @@ migrations_internals= "1.4.0" openssl = "0.10.22" rocket = "0.4.6" rocket_i18n = "0.4.1" -reqwest = "0.9" +reqwest = "0.11.10" scheduled-thread-pool = "0.2.2" serde = "1.0" serde_derive = "1.0" From 5c74f598d8470ae11bdced78a3ffdb27d42b40d3 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 01:21:12 +0900 Subject: [PATCH 06/11] Update Cargo.lock --- Cargo.lock | 180 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 157 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63834f96..b7838d43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1664,6 +1664,25 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "h2" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b" +dependencies = [ + "bytes 1.1.0", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.6", + "indexmap", + "slab", + "tokio 1.17.0", + "tokio-util 0.6.9", + "tracing", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -1798,10 +1817,21 @@ dependencies = [ ] [[package]] -name = "httparse" -version = "1.5.1" +name = "http-body" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes 1.1.0", + "http 0.2.6", + "pin-project-lite 0.2.8", +] + +[[package]] +name = "httparse" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" [[package]] name = "httpdate" @@ -1809,6 +1839,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + [[package]] name = "hyper" version = "0.10.16" @@ -1872,7 +1908,7 @@ dependencies = [ "http 0.2.6", "http-body 0.3.1", "httparse", - "httpdate", + "httpdate 0.3.2", "itoa 0.4.8", "pin-project", "socket2 0.3.19", @@ -1882,6 +1918,30 @@ dependencies = [ "want 0.3.0", ] +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.12", + "http 0.2.6", + "http-body 0.4.4", + "httparse", + "httpdate 1.0.2", + "itoa 1.0.1", + "pin-project-lite 0.2.8", + "socket2 0.4.4", + "tokio 1.17.0", + "tower-service", + "tracing", + "want 0.3.0", +] + [[package]] name = "hyper-tls" version = "0.3.2" @@ -1908,6 +1968,19 @@ dependencies = [ "tokio-tls", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.1.0", + "hyper 0.14.18", + "native-tls", + "tokio 1.17.0", + "tokio-native-tls", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -3148,7 +3221,7 @@ dependencies = [ "openssl", "pulldown-cmark", "regex-syntax 0.6.25", - "reqwest 0.9.24", + "reqwest 0.11.10", "rocket", "serde 1.0.137", "serde_derive", @@ -3156,6 +3229,7 @@ dependencies = [ "shrinkwraprs", "syntect", "tracing", + "url 2.2.2", ] [[package]] @@ -3210,7 +3284,7 @@ dependencies = [ "plume-api", "plume-common", "plume-macro", - "reqwest 0.9.24", + "reqwest 0.11.10", "riker", "rocket", "rocket_i18n", @@ -3737,7 +3811,6 @@ dependencies = [ "serde 1.0.137", "serde_json", "serde_urlencoded 0.5.5", - "socks", "time 0.1.43", "tokio 0.1.22", "tokio-executor", @@ -3774,7 +3847,7 @@ dependencies = [ "percent-encoding 2.1.0", "pin-project-lite 0.2.8", "serde 1.0.137", - "serde_urlencoded 0.7.0", + "serde_urlencoded 0.7.1", "tokio 0.2.25", "tokio-tls", "url 2.2.2", @@ -3784,6 +3857,43 @@ dependencies = [ "winreg 0.7.0", ] +[[package]] +name = "reqwest" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46a1f7aa4f35e5e8b4160449f51afc758f0ce6454315a9fa7d0d113e958c41eb" +dependencies = [ + "base64 0.13.0", + "bytes 1.1.0", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.12", + "http 0.2.6", + "http-body 0.4.4", + "hyper 0.14.18", + "hyper-tls 0.5.0", + "ipnet", + "js-sys", + "lazy_static", + "log 0.4.14", + "mime 0.3.16", + "native-tls", + "percent-encoding 2.1.0", + "pin-project-lite 0.2.8", + "serde 1.0.137", + "serde_json", + "serde_urlencoded 0.7.1", + "tokio 1.17.0", + "tokio-native-tls", + "tokio-socks", + "url 2.2.2", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.10.1", +] + [[package]] name = "riker" version = "0.4.2" @@ -4145,12 +4255,12 @@ dependencies = [ [[package]] name = "serde_urlencoded" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa 0.4.8", + "itoa 1.0.1", "ryu", "serde 1.0.137", ] @@ -4281,18 +4391,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "socks" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30f86c7635fadf2814201a4f67efefb0007588ae7422ce299f354ab5c97f61ae" -dependencies = [ - "byteorder", - "libc", - "winapi 0.2.8", - "ws2_32-sys", -] - [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -4750,6 +4848,7 @@ dependencies = [ "libc", "memchr", "mio 0.8.0", + "num_cpus", "pin-project-lite 0.2.8", "socket2 0.4.4", "tokio-macros 1.7.0", @@ -4849,6 +4948,18 @@ dependencies = [ "tokio-sync", ] +[[package]] +name = "tokio-socks" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51165dfa029d2a65969413a6cc96f354b86b464498702f174a4efa13608fd8c0" +dependencies = [ + "either 1.6.1", + "futures-util", + "thiserror", + "tokio 1.17.0", +] + [[package]] name = "tokio-stream" version = "0.1.8" @@ -4937,6 +5048,20 @@ dependencies = [ "tokio 0.2.25", ] +[[package]] +name = "tokio-util" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes 1.1.0", + "futures-core", + "futures-sink", + "log 0.4.14", + "pin-project-lite 0.2.8", + "tokio 1.17.0", +] + [[package]] name = "tokio-util" version = "0.7.0" @@ -5489,6 +5614,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "winutil" version = "0.1.1" From 2e35441483ebd632a1bba82c812d3f07a89867d6 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 01:21:25 +0900 Subject: [PATCH 07/11] Follow reqwest change --- plume-common/src/activity_pub/inbox.rs | 2 +- plume-common/src/activity_pub/mod.rs | 2 +- plume-common/src/activity_pub/request.rs | 7 ++++--- plume-models/src/users.rs | 8 ++++---- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/plume-common/src/activity_pub/inbox.rs b/plume-common/src/activity_pub/inbox.rs index 48ba6c5f..a0c3b75c 100644 --- a/plume-common/src/activity_pub/inbox.rs +++ b/plume-common/src/activity_pub/inbox.rs @@ -366,7 +366,7 @@ pub trait FromId: Sized { ) -> Result, Self::Error)> { request::get(id, Self::get_sender(), proxy) .map_err(|_| (None, InboxError::DerefError)) - .and_then(|mut r| { + .and_then(|r| { let json: serde_json::Value = r .json() .map_err(|_| (None, InboxError::InvalidObject(None)))?; diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index ebc5536f..9b85a4c0 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,6 +1,6 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{header::HeaderValue, ClientBuilder, Url}; +use reqwest::{blocking::ClientBuilder, header::HeaderValue, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, diff --git a/plume-common/src/activity_pub/request.rs b/plume-common/src/activity_pub/request.rs index 4258bd7b..3d60b820 100644 --- a/plume-common/src/activity_pub/request.rs +++ b/plume-common/src/activity_pub/request.rs @@ -1,10 +1,11 @@ use chrono::{offset::Utc, DateTime}; use openssl::hash::{Hasher, MessageDigest}; use reqwest::{ + blocking::{ClientBuilder, Response}, header::{ HeaderMap, HeaderValue, InvalidHeaderValue, ACCEPT, CONTENT_TYPE, DATE, HOST, USER_AGENT, }, - ClientBuilder, Proxy, Response, Url, UrlError, + Proxy, Url, }; use std::ops::Deref; use std::time::SystemTime; @@ -18,8 +19,8 @@ const PLUME_USER_AGENT: &str = concat!("Plume/", env!("CARGO_PKG_VERSION")); #[derive(Debug)] pub struct Error(); -impl From for Error { - fn from(_err: UrlError) -> Self { +impl From for Error { + fn from(_err: url::ParseError) -> Self { Error() } } diff --git a/plume-models/src/users.rs b/plume-models/src/users.rs index 1e27fb19..85aaf941 100644 --- a/plume-models/src/users.rs +++ b/plume-models/src/users.rs @@ -243,7 +243,7 @@ impl User { } fn fetch(url: &str) -> Result { - let mut res = get(url, Self::get_sender(), CONFIG.proxy().cloned())?; + let res = get(url, Self::get_sender(), CONFIG.proxy().cloned())?; let text = &res.text()?; // without this workaround, publicKey is not correctly deserialized let ap_sign = serde_json::from_str::(text)?; @@ -482,7 +482,7 @@ impl User { Ok(coll) } fn fetch_outbox_page(&self, url: &str) -> Result<(Vec, Option)> { - let mut res = get(url, Self::get_sender(), CONFIG.proxy().cloned())?; + let res = get(url, Self::get_sender(), CONFIG.proxy().cloned())?; let text = &res.text()?; let json: serde_json::Value = serde_json::from_str(text)?; let items = json["items"] @@ -496,7 +496,7 @@ impl User { Ok((items, next)) } pub fn fetch_outbox(&self) -> Result> { - let mut res = get( + let res = get( &self.outbox_url[..], Self::get_sender(), CONFIG.proxy().cloned(), @@ -532,7 +532,7 @@ impl User { } pub fn fetch_followers_ids(&self) -> Result> { - let mut res = get( + let res = get( &self.followers_endpoint[..], Self::get_sender(), CONFIG.proxy().cloned(), From 9e5f9255d10f814e9abac581c7a9f8a338a5e83c Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 03:34:11 +0900 Subject: [PATCH 08/11] Add tokio to plume-common's dependencies --- plume-common/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index 795cec9e..847e2d57 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -24,6 +24,7 @@ regex-syntax = { version = "0.6.17", default-features = false, features = ["unic tracing = "0.1.34" askama_escape = "0.10.3" url = "2.2.2" +tokio = { version = "1.18.1", features = ["full"] } [dependencies.chrono] features = ["serde"] From 9016995d920c441304c35c1915f8655114f4dfc6 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 04:33:52 +0900 Subject: [PATCH 09/11] Install tokio --- Cargo.lock | 142 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 110 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7838d43..5ff3c77d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,9 +267,9 @@ checksum = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" [[package]] name = "autocfg" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backtrace" @@ -779,7 +779,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cfg-if 0.1.10", "crossbeam-utils 0.7.2", "lazy_static", @@ -828,7 +828,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cfg-if 0.1.10", "lazy_static", ] @@ -1678,7 +1678,7 @@ dependencies = [ "http 0.2.6", "indexmap", "slab", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-util 0.6.9", "tracing", ] @@ -1936,7 +1936,7 @@ dependencies = [ "itoa 1.0.1", "pin-project-lite 0.2.8", "socket2 0.4.4", - "tokio 1.17.0", + "tokio 1.18.1", "tower-service", "tracing", "want 0.3.0", @@ -1977,7 +1977,7 @@ dependencies = [ "bytes 1.1.0", "hyper 0.14.18", "native-tls", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-native-tls", ] @@ -2021,7 +2021,7 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "hashbrown 0.11.2", ] @@ -2173,7 +2173,7 @@ dependencies = [ "nom 2.2.1", "percent-encoding 2.1.0", "thiserror", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-native-tls", "tokio-stream", "tokio-util 0.7.0", @@ -2366,10 +2366,11 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.5" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" dependencies = [ + "autocfg 1.1.0", "scopeguard", ] @@ -2466,7 +2467,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2475,7 +2476,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2537,7 +2538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" dependencies = [ "adler", - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2561,14 +2562,15 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" dependencies = [ "libc", "log 0.4.14", "miow 0.3.7", "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", "winapi 0.3.9", ] @@ -2802,7 +2804,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "num-integer", "num-traits 0.2.14", ] @@ -2813,7 +2815,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "num-traits 0.2.14", ] @@ -2823,7 +2825,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "num-integer", "num-traits 0.2.14", ] @@ -2843,7 +2845,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2922,7 +2924,7 @@ version = "0.9.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cc", "libc", "pkg-config", @@ -2965,10 +2967,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", - "lock_api 0.4.5", + "lock_api 0.4.7", "parking_lot_core 0.8.5", ] +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api 0.4.7", + "parking_lot_core 0.9.3", +] + [[package]] name = "parking_lot_core" version = "0.6.2" @@ -2998,6 +3010,19 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parking_lot_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall 0.2.10", + "smallvec 1.8.0", + "windows-sys", +] + [[package]] name = "pear" version = "0.1.4" @@ -3228,6 +3253,7 @@ dependencies = [ "serde_json", "shrinkwraprs", "syntect", + "tokio 1.18.1", "tracing", "url 2.2.2", ] @@ -3713,7 +3739,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "crossbeam-deque 0.8.1", "either 1.6.1", "rayon-core", @@ -3884,7 +3910,7 @@ dependencies = [ "serde 1.0.137", "serde_json", "serde_urlencoded 0.7.1", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-native-tls", "tokio-socks", "url 2.2.2", @@ -4840,16 +4866,19 @@ dependencies = [ [[package]] name = "tokio" -version = "1.17.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc" dependencies = [ "bytes 1.1.0", "libc", "memchr", - "mio 0.8.0", + "mio 0.8.2", "num_cpus", + "once_cell", + "parking_lot 0.12.0", "pin-project-lite 0.2.8", + "signal-hook-registry", "socket2 0.4.4", "tokio-macros 1.7.0", "winapi 0.3.9", @@ -4926,7 +4955,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" dependencies = [ "native-tls", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -4957,7 +4986,7 @@ dependencies = [ "either 1.6.1", "futures-util", "thiserror", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -4968,7 +4997,7 @@ checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" dependencies = [ "futures-core", "pin-project-lite 0.2.8", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -5059,7 +5088,7 @@ dependencies = [ "futures-sink", "log 0.4.14", "pin-project-lite 0.2.8", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -5073,7 +5102,7 @@ dependencies = [ "futures-sink", "log 0.4.14", "pin-project-lite 0.2.8", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -5454,6 +5483,12 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.80" @@ -5596,6 +5631,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + [[package]] name = "winreg" version = "0.6.2" From ce4b216722c1917a5e98c8ff55962c7c06dae62a Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 04:34:04 +0900 Subject: [PATCH 10/11] Broadcast asynchronously --- plume-common/src/activity_pub/mod.rs | 94 +++++++++++++++++----------- 1 file changed, 56 insertions(+), 38 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 9b85a4c0..69d40cd5 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,12 +1,13 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{blocking::ClientBuilder, header::HeaderValue, Url}; +use reqwest::{header::HeaderValue, ClientBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, response::{Responder, Response}, Outcome, }; +use tokio::{runtime, sync::mpsc}; use tracing::{debug, warn}; use self::sign::Signable; @@ -139,46 +140,63 @@ where .connect_timeout(std::time::Duration::from_secs(5)) .build() .expect("Can't build client"); - for inbox in boxes { - let body = signed.to_string(); - let mut headers = request::headers(); - let url = Url::parse(&inbox); - if url.is_err() { - warn!("Inbox is invalid URL: {:?}", &inbox); - continue; - } - let url = url.unwrap(); - if !url.has_host() { - warn!("Inbox doesn't have host: {:?}", &inbox); - continue; - }; - let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); - if host_header_value.is_err() { - warn!("Header value is invalid: {:?}", url.host_str()); - continue; - } - headers.insert("Host", host_header_value.unwrap()); - headers.insert("Digest", request::Digest::digest(&body)); - let _ = client - .post(&inbox) - .headers(headers.clone()) - .header( + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Error while initializing tokio runtime for federation"); + rt.block_on(async { + let (tx, mut rx) = mpsc::channel(32); + for inbox in boxes { + let body = signed.to_string(); + let mut headers = request::headers(); + let url = Url::parse(&inbox); + if url.is_err() { + warn!("Inbox is invalid URL: {:?}", &inbox); + continue; + } + let url = url.unwrap(); + if !url.has_host() { + warn!("Inbox doesn't have host: {:?}", &inbox); + continue; + }; + let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); + if host_header_value.is_err() { + warn!("Header value is invalid: {:?}", url.host_str()); + continue; + } + headers.insert("Host", host_header_value.unwrap()); + headers.insert("Digest", request::Digest::digest(&body)); + headers.insert( "Signature", request::signature(sender, &headers, ("post", url.path(), url.query())) .expect("activity_pub::broadcast: request signature error"), - ) - .body(body) - .send() - .map(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - debug!("Response: \"{:?}\"\n", r); - }) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); - } + ); + let client = client.clone(); + let tx = tx.clone(); + rt.spawn(async move { + let _ = tx.send( + client + .post(&inbox) + .headers(headers.clone()) + .body(body) + .send() + .await, + ); + }); + } + while let Some(request) = rx.recv().await { + let _ = request + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &r.url()); + } else { + warn!("Error while sending to inbox ({} {:?})", &r.url(), &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + } + }); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)] From f22c4d5c787dfcafe0b65c2794108176039c9429 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 04:51:42 +0900 Subject: [PATCH 11/11] Await in consumer --- plume-common/src/activity_pub/mod.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 69d40cd5..07a1b908 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -173,26 +173,28 @@ where ); let client = client.clone(); let tx = tx.clone(); - rt.spawn(async move { - let _ = tx.send( + let _ = tx.send( + rt.spawn( client .post(&inbox) .headers(headers.clone()) .body(body) - .send() - .await, - ); - }); + .send(), + ), + ); } while let Some(request) = rx.recv().await { let _ = request + .await .map(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &r.url()); - } else { - warn!("Error while sending to inbox ({} {:?})", &r.url(), &r) - } - debug!("Response: \"{:?}\"\n", r); + r.map(|r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &r.url()); + } else { + warn!("Error while sending to inbox ({} {:?})", &r.url(), &r) + } + debug!("Response: \"{:?}\"\n", r); + }) }) .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); }