From 504d41d88762edba54d8e5c200af8afa058b9def Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 08:38:57 +0900 Subject: [PATCH 1/7] Add flume to plume-common's dependencies --- plume-common/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index 795cec9e..c3ad3684 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -24,6 +24,7 @@ regex-syntax = { version = "0.6.17", default-features = false, features = ["unic tracing = "0.1.34" askama_escape = "0.10.3" url = "2.2.2" +flume = "0.10.12" [dependencies.chrono] features = ["serde"] From 2326eb77cd6f5901f4ad7010b0e2480ad2b394ea Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 08:42:26 +0900 Subject: [PATCH 2/7] Add tokio to plume-common's dependencies --- plume-common/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index c3ad3684..45387658 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -25,6 +25,7 @@ tracing = "0.1.34" askama_escape = "0.10.3" url = "2.2.2" flume = "0.10.12" +tokio = { version = "1.18.1", features = ["full"] } [dependencies.chrono] features = ["serde"] From e0258003b98b17117c07c1642ac012068b4adee7 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 09:04:34 +0900 Subject: [PATCH 3/7] Install tokio and flume --- Cargo.lock | 190 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 151 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7838d43..15a905f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,7 +137,7 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.4", + "getrandom 0.2.6", "once_cell", "version_check 0.9.4", ] @@ -267,9 +267,9 @@ checksum = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" [[package]] name = "autocfg" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backtrace" @@ -350,7 +350,7 @@ checksum = "6fe4fef31efb0f76133ae8e3576a88e58edb7cfc5584c81c758c349ba46b43fc" dependencies = [ "base64 0.13.0", "blowfish", - "getrandom 0.2.4", + "getrandom 0.2.6", "zeroize", ] @@ -604,7 +604,7 @@ version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "615f6e27d000a2bffbc7f2f6a8669179378fa27ee4d0a509e985dfc0a7defb40" dependencies = [ - "getrandom 0.2.4", + "getrandom 0.2.6", "lazy_static", "proc-macro-hack 0.5.19", "tiny-keccak", @@ -779,7 +779,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cfg-if 0.1.10", "crossbeam-utils 0.7.2", "lazy_static", @@ -828,7 +828,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cfg-if 0.1.10", "lazy_static", ] @@ -1293,6 +1293,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1514,13 +1527,15 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.4" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if 1.0.0", + "js-sys", "libc", "wasi 0.10.2+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1678,7 +1693,7 @@ dependencies = [ "http 0.2.6", "indexmap", "slab", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-util 0.6.9", "tracing", ] @@ -1936,7 +1951,7 @@ dependencies = [ "itoa 1.0.1", "pin-project-lite 0.2.8", "socket2 0.4.4", - "tokio 1.17.0", + "tokio 1.18.1", "tower-service", "tracing", "want 0.3.0", @@ -1977,7 +1992,7 @@ dependencies = [ "bytes 1.1.0", "hyper 0.14.18", "native-tls", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-native-tls", ] @@ -2021,7 +2036,7 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "hashbrown 0.11.2", ] @@ -2173,7 +2188,7 @@ dependencies = [ "nom 2.2.1", "percent-encoding 2.1.0", "thiserror", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-native-tls", "tokio-stream", "tokio-util 0.7.0", @@ -2366,10 +2381,11 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.5" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" dependencies = [ + "autocfg 1.1.0", "scopeguard", ] @@ -2466,7 +2482,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2475,7 +2491,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2537,7 +2553,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" dependencies = [ "adler", - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2561,14 +2577,15 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.0" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" dependencies = [ "libc", "log 0.4.14", "miow 0.3.7", "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", "winapi 0.3.9", ] @@ -2655,6 +2672,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.6", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -2802,7 +2828,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "num-integer", "num-traits 0.2.14", ] @@ -2813,7 +2839,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "num-traits 0.2.14", ] @@ -2823,7 +2849,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "num-integer", "num-traits 0.2.14", ] @@ -2843,7 +2869,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", ] [[package]] @@ -2922,7 +2948,7 @@ version = "0.9.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "cc", "libc", "pkg-config", @@ -2965,10 +2991,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" dependencies = [ "instant", - "lock_api 0.4.5", + "lock_api 0.4.7", "parking_lot_core 0.8.5", ] +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api 0.4.7", + "parking_lot_core 0.9.3", +] + [[package]] name = "parking_lot_core" version = "0.6.2" @@ -2998,6 +3034,19 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "parking_lot_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall 0.2.10", + "smallvec 1.8.0", + "windows-sys", +] + [[package]] name = "pear" version = "0.1.4" @@ -3215,6 +3264,7 @@ dependencies = [ "askama_escape", "base64 0.13.0", "chrono", + "flume", "heck", "hex", "once_cell", @@ -3228,6 +3278,7 @@ dependencies = [ "serde_json", "shrinkwraprs", "syntect", + "tokio 1.18.1", "tracing", "url 2.2.2", ] @@ -3615,7 +3666,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.2.4", + "getrandom 0.2.6", ] [[package]] @@ -3713,7 +3764,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" dependencies = [ - "autocfg 1.0.1", + "autocfg 1.1.0", "crossbeam-deque 0.8.1", "either 1.6.1", "rayon-core", @@ -3884,7 +3935,7 @@ dependencies = [ "serde 1.0.137", "serde_json", "serde_urlencoded 0.7.1", - "tokio 1.17.0", + "tokio 1.18.1", "tokio-native-tls", "tokio-socks", "url 2.2.2", @@ -4391,6 +4442,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "spin" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" +dependencies = [ + "lock_api 0.4.7", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -4840,16 +4900,19 @@ dependencies = [ [[package]] name = "tokio" -version = "1.17.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc" dependencies = [ "bytes 1.1.0", "libc", "memchr", - "mio 0.8.0", + "mio 0.8.2", "num_cpus", + "once_cell", + "parking_lot 0.12.0", "pin-project-lite 0.2.8", + "signal-hook-registry", "socket2 0.4.4", "tokio-macros 1.7.0", "winapi 0.3.9", @@ -4926,7 +4989,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" dependencies = [ "native-tls", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -4957,7 +5020,7 @@ dependencies = [ "either 1.6.1", "futures-util", "thiserror", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -4968,7 +5031,7 @@ checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" dependencies = [ "futures-core", "pin-project-lite 0.2.8", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -5059,7 +5122,7 @@ dependencies = [ "futures-sink", "log 0.4.14", "pin-project-lite 0.2.8", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -5073,7 +5136,7 @@ dependencies = [ "futures-sink", "log 0.4.14", "pin-project-lite 0.2.8", - "tokio 1.17.0", + "tokio 1.18.1", ] [[package]] @@ -5328,7 +5391,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" dependencies = [ - "getrandom 0.2.4", + "getrandom 0.2.6", "serde 1.0.137", ] @@ -5454,6 +5517,12 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "wasm-bindgen" version = "0.2.80" @@ -5596,6 +5665,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" + [[package]] name = "winreg" version = "0.6.2" From a7b899817acbf951d0b56c9e69c439eae332d63b Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 09:04:54 +0900 Subject: [PATCH 4/7] Run HTTP request in broadcast() on tokio runtime --- plume-common/src/activity_pub/mod.rs | 88 +++++++++++++++------------- 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 9b85a4c0..3715aadd 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,12 +1,13 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{blocking::ClientBuilder, header::HeaderValue, Url}; +use reqwest::{header::HeaderValue, ClientBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, response::{Responder, Response}, Outcome, }; +use tokio::runtime; use tracing::{debug, warn}; use self::sign::Signable; @@ -139,46 +140,53 @@ where .connect_timeout(std::time::Duration::from_secs(5)) .build() .expect("Can't build client"); - for inbox in boxes { - let body = signed.to_string(); - let mut headers = request::headers(); - let url = Url::parse(&inbox); - if url.is_err() { - warn!("Inbox is invalid URL: {:?}", &inbox); - continue; + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Error while initializing tokio runtime for federation"); + rt.block_on(async { + for inbox in boxes { + let body = signed.to_string(); + let mut headers = request::headers(); + let url = Url::parse(&inbox); + if url.is_err() { + warn!("Inbox is invalid URL: {:?}", &inbox); + continue; + } + let url = url.unwrap(); + if !url.has_host() { + warn!("Inbox doesn't have host: {:?}", &inbox); + continue; + }; + let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); + if host_header_value.is_err() { + warn!("Header value is invalid: {:?}", url.host_str()); + continue; + } + headers.insert("Host", host_header_value.unwrap()); + headers.insert("Digest", request::Digest::digest(&body)); + let _ = client + .post(&inbox) + .headers(headers.clone()) + .header( + "Signature", + request::signature(sender, &headers, ("post", url.path(), url.query())) + .expect("activity_pub::broadcast: request signature error"), + ) + .body(body) + .send() + .await + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &inbox); + } else { + warn!("Error while sending to inbox ({} {:?})", &inbox, &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); } - let url = url.unwrap(); - if !url.has_host() { - warn!("Inbox doesn't have host: {:?}", &inbox); - continue; - }; - let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); - if host_header_value.is_err() { - warn!("Header value is invalid: {:?}", url.host_str()); - continue; - } - headers.insert("Host", host_header_value.unwrap()); - headers.insert("Digest", request::Digest::digest(&body)); - let _ = client - .post(&inbox) - .headers(headers.clone()) - .header( - "Signature", - request::signature(sender, &headers, ("post", url.path(), url.query())) - .expect("activity_pub::broadcast: request signature error"), - ) - .body(body) - .send() - .map(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - debug!("Response: \"{:?}\"\n", r); - }) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); - } + }); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)] From 76ca7c14629722f01e5715fcb26b998e61ce4702 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 12:40:37 +0900 Subject: [PATCH 5/7] Add futures to plume-common's dependencies --- plume-common/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index 45387658..f1763ad1 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -26,6 +26,7 @@ askama_escape = "0.10.3" url = "2.2.2" flume = "0.10.12" tokio = { version = "1.18.1", features = ["full"] } +futures = "0.3.21" [dependencies.chrono] features = ["serde"] From 1f8da7e63db53fb6706e5d92a81697b81c3f66e5 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 12:41:16 +0900 Subject: [PATCH 6/7] Install futures --- Cargo.lock | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15a905f3..c20116f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1406,9 +1406,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" dependencies = [ "futures-channel", "futures-core", @@ -1421,9 +1421,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", "futures-sink", @@ -1431,9 +1431,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" [[package]] name = "futures-cpupool" @@ -1447,9 +1447,9 @@ dependencies = [ [[package]] name = "futures-executor" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" dependencies = [ "futures-core", "futures-task", @@ -1459,15 +1459,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" [[package]] name = "futures-macro" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", @@ -1476,21 +1476,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" [[package]] name = "futures-task" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" [[package]] name = "futures-util" -version = "0.3.19" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ "futures-channel", "futures-core", @@ -2179,7 +2179,7 @@ checksum = "52110b91cbe3a92ba70d4bd64366bfe5c8b8698516155db7041ae3dd155a4fc3" dependencies = [ "async-trait", "bytes 1.1.0", - "futures 0.3.19", + "futures 0.3.21", "futures-util", "lazy_static", "lber", @@ -3265,6 +3265,7 @@ dependencies = [ "base64 0.13.0", "chrono", "flume", + "futures 0.3.21", "heck", "hex", "once_cell", @@ -3954,7 +3955,7 @@ dependencies = [ "chrono", "config", "dashmap", - "futures 0.3.19", + "futures 0.3.21", "num_cpus", "pin-utils", "rand 0.7.3", @@ -4630,7 +4631,7 @@ dependencies = [ "failure", "fnv", "fs2", - "futures 0.3.19", + "futures 0.3.21", "htmlescape", "levenshtein_automata", "log 0.4.14", @@ -4673,7 +4674,7 @@ dependencies = [ "fail", "fnv", "fs2", - "futures 0.3.19", + "futures 0.3.21", "htmlescape", "levenshtein_automata", "log 0.4.14", From 97632fdbfe439a25d311d7c8b07b895fcb846c04 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 13:03:41 +0900 Subject: [PATCH 7/7] Broadcast asynchronously --- plume-common/src/activity_pub/mod.rs | 44 +++++++++++++++++++--------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 3715aadd..d83bc735 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,6 +1,7 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{header::HeaderValue, ClientBuilder, Url}; +use futures::future::join_all; +use reqwest::{header::HeaderValue, ClientBuilder, RequestBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, @@ -145,6 +146,29 @@ where .build() .expect("Error while initializing tokio runtime for federation"); rt.block_on(async { + let capacity = 50; + let (tx, rx) = flume::bounded::(capacity); + let mut handles = Vec::with_capacity(capacity); + for _ in 0..capacity { + let rx = rx.clone(); + let handle = rt.spawn(async move { + while let Ok(request_builder) = rx.recv_async().await { + let _ = request_builder + .send() + .await + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &r.url()); + } else { + warn!("Error while sending to inbox ({:?})", &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + } + }); + handles.push(handle); + } for inbox in boxes { let body = signed.to_string(); let mut headers = request::headers(); @@ -165,7 +189,7 @@ where } headers.insert("Host", host_header_value.unwrap()); headers.insert("Digest", request::Digest::digest(&body)); - let _ = client + let request_builder = client .post(&inbox) .headers(headers.clone()) .header( @@ -173,19 +197,11 @@ where request::signature(sender, &headers, ("post", url.path(), url.query())) .expect("activity_pub::broadcast: request signature error"), ) - .body(body) - .send() - .await - .map(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - debug!("Response: \"{:?}\"\n", r); - }) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + .body(body); + tx.send_async(request_builder).await.unwrap(); } + drop(tx); + join_all(handles).await; }); }