diff --git a/Cargo.lock b/Cargo.lock index 06106f68..16d066c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -493,7 +493,7 @@ dependencies = [ "num-integer", "num-traits 0.2.15", "serde 1.0.137", - "time 0.1.44", + "time 0.1.43", "winapi 0.3.9", ] @@ -619,7 +619,7 @@ dependencies = [ "percent-encoding 2.1.0", "rand 0.8.5", "sha2", - "time 0.1.44", + "time 0.1.43", ] [[package]] @@ -628,7 +628,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" dependencies = [ - "time 0.1.44", + "time 0.1.43", "url 1.7.2", ] @@ -645,7 +645,7 @@ dependencies = [ "publicsuffix", "serde 1.0.137", "serde_json", - "time 0.1.44", + "time 0.1.43", "try_from", "url 1.7.2", ] @@ -1129,7 +1129,7 @@ dependencies = [ "encoding", "lazy_static", "rand 0.4.6", - "time 0.1.44", + "time 0.1.43", "version_check 0.1.5", ] @@ -1281,6 +1281,19 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.10.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1507,8 +1520,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if 1.0.0", + "js-sys", "libc", - "wasi 0.10.0+wasi-snapshot-preview1", + "wasi 0.10.2+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1845,7 +1860,7 @@ dependencies = [ "log 0.3.9", "mime 0.2.6", "num_cpus", - "time 0.1.44", + "time 0.1.43", "traitobject", "typeable", "unicase 1.4.2", @@ -1870,7 +1885,7 @@ dependencies = [ "log 0.4.17", "net2", "rustc_version", - "time 0.1.44", + "time 0.1.43", "tokio 0.1.22", "tokio-buf", "tokio-executor", @@ -2205,7 +2220,7 @@ dependencies = [ "email", "lettre", "mime 0.3.16", - "time 0.1.44", + "time 0.1.43", "uuid 0.7.4", ] @@ -2653,6 +2668,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.6", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -3240,6 +3264,8 @@ dependencies = [ "assert-json-diff", "base64 0.13.0", "chrono", + "flume", + "futures 0.3.21", "heck", "hex", "once_cell", @@ -3798,7 +3824,7 @@ dependencies = [ "serde 1.0.137", "serde_json", "serde_urlencoded 0.5.5", - "time 0.1.44", + "time 0.1.43", "tokio 0.1.22", "tokio-executor", "tokio-io", @@ -3940,7 +3966,7 @@ dependencies = [ "rocket_codegen", "rocket_http", "state", - "time 0.1.44", + "time 0.1.43", "toml 0.4.10", "version_check 0.9.4", "yansi", @@ -3983,7 +4009,7 @@ dependencies = [ "ring", "rocket", "serde 1.0.137", - "time 0.1.44", + "time 0.1.43", ] [[package]] @@ -3999,7 +4025,7 @@ dependencies = [ "percent-encoding 1.0.1", "smallvec 1.8.0", "state", - "time 0.1.44", + "time 0.1.43", "unicode-xid 0.1.0", ] @@ -4378,6 +4404,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "spin" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" +dependencies = [ + "lock_api 0.4.7", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -4729,12 +4764,11 @@ dependencies = [ [[package]] name = "time" -version = "0.1.44" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "wasi 0.10.0+wasi-snapshot-preview1", "winapi 0.3.9", ] @@ -5417,9 +5451,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" [[package]] name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" +version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasi" diff --git a/plume-common/Cargo.toml b/plume-common/Cargo.toml index 96090759..4539dba0 100644 --- a/plume-common/Cargo.toml +++ b/plume-common/Cargo.toml @@ -23,7 +23,9 @@ askama_escape = "0.10.3" activitystreams = "0.7.0-alpha.18" activitystreams-ext = "0.1.0-alpha.2" url = "2.2.2" +flume = "0.10.12" tokio = { version = "1.18.1", features = ["full"] } +futures = "0.3.21" [dependencies.chrono] features = ["serde"] diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index cb86fcf4..2a076f62 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -10,7 +10,8 @@ use activitystreams::{ }; use activitystreams_ext::{Ext1, Ext2, UnparsedExtension}; use array_tool::vec::Uniq; -use reqwest::{header::HeaderValue, ClientBuilder, Url}; +use futures::future::join_all; +use reqwest::{header::HeaderValue, ClientBuilder, RequestBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, @@ -156,7 +157,29 @@ where .build() .expect("Error while initializing tokio runtime for federation"); rt.block_on(async { - let (tx, mut rx) = mpsc::channel(32); + let capacity = 50; + let (tx, rx) = flume::bounded::(capacity); + let mut handles = Vec::with_capacity(capacity); + for _ in 0..capacity { + let rx = rx.clone(); + let handle = rt.spawn(async move { + while let Ok(request_builder) = rx.recv_async().await { + let _ = request_builder + .send() + .await + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &r.url()); + } else { + warn!("Error while sending to inbox ({:?})", &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + } + }); + handles.push(handle); + } for inbox in boxes { let body = signed.to_string(); let mut headers = request::headers(); @@ -182,33 +205,11 @@ where request::signature(sender, &headers, ("post", url.path(), url.query())) .expect("activity_pub::broadcast: request signature error"), ); - let client = client.clone(); - let tx = tx.clone(); - let _ = tx.send( - rt.spawn( - client - .post(&inbox) - .headers(headers.clone()) - .body(body) - .send(), - ), - ); - } - while let Some(request) = rx.recv().await { - let _ = request - .await - .map(move |r| { - r.map(|r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &r.url()); - } else { - warn!("Error while sending to inbox ({} {:?})", &r.url(), &r) - } - debug!("Response: \"{:?}\"\n", r); - }) - }) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + let request_builder = client.post(&inbox).headers(headers.clone()).body(body); + tx.send_async(request_builder).await.unwrap(); } + drop(tx); + join_all(handles).await; }); }