Merge branch 'fix-timeout' into ap07

This commit is contained in:
Kitaiti Makoto 2022-05-05 13:12:04 +09:00
commit 5871ed7301
3 changed files with 83 additions and 46 deletions

70
Cargo.lock generated
View File

@ -493,7 +493,7 @@ dependencies = [
"num-integer", "num-integer",
"num-traits 0.2.15", "num-traits 0.2.15",
"serde 1.0.137", "serde 1.0.137",
"time 0.1.44", "time 0.1.43",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -619,7 +619,7 @@ dependencies = [
"percent-encoding 2.1.0", "percent-encoding 2.1.0",
"rand 0.8.5", "rand 0.8.5",
"sha2", "sha2",
"time 0.1.44", "time 0.1.43",
] ]
[[package]] [[package]]
@ -628,7 +628,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" checksum = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5"
dependencies = [ dependencies = [
"time 0.1.44", "time 0.1.43",
"url 1.7.2", "url 1.7.2",
] ]
@ -645,7 +645,7 @@ dependencies = [
"publicsuffix", "publicsuffix",
"serde 1.0.137", "serde 1.0.137",
"serde_json", "serde_json",
"time 0.1.44", "time 0.1.43",
"try_from", "try_from",
"url 1.7.2", "url 1.7.2",
] ]
@ -1129,7 +1129,7 @@ dependencies = [
"encoding", "encoding",
"lazy_static", "lazy_static",
"rand 0.4.6", "rand 0.4.6",
"time 0.1.44", "time 0.1.43",
"version_check 0.1.5", "version_check 0.1.5",
] ]
@ -1281,6 +1281,19 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "flume"
version = "0.10.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843c03199d0c0ca54bc1ea90ac0d507274c28abcc4f691ae8b4eaa375087c76a"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -1507,8 +1520,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"js-sys",
"libc", "libc",
"wasi 0.10.0+wasi-snapshot-preview1", "wasi 0.10.2+wasi-snapshot-preview1",
"wasm-bindgen",
] ]
[[package]] [[package]]
@ -1845,7 +1860,7 @@ dependencies = [
"log 0.3.9", "log 0.3.9",
"mime 0.2.6", "mime 0.2.6",
"num_cpus", "num_cpus",
"time 0.1.44", "time 0.1.43",
"traitobject", "traitobject",
"typeable", "typeable",
"unicase 1.4.2", "unicase 1.4.2",
@ -1870,7 +1885,7 @@ dependencies = [
"log 0.4.17", "log 0.4.17",
"net2", "net2",
"rustc_version", "rustc_version",
"time 0.1.44", "time 0.1.43",
"tokio 0.1.22", "tokio 0.1.22",
"tokio-buf", "tokio-buf",
"tokio-executor", "tokio-executor",
@ -2205,7 +2220,7 @@ dependencies = [
"email", "email",
"lettre", "lettre",
"mime 0.3.16", "mime 0.3.16",
"time 0.1.44", "time 0.1.43",
"uuid 0.7.4", "uuid 0.7.4",
] ]
@ -2653,6 +2668,15 @@ dependencies = [
"byteorder", "byteorder",
] ]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom 0.2.6",
]
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.10" version = "0.2.10"
@ -3240,6 +3264,8 @@ dependencies = [
"assert-json-diff", "assert-json-diff",
"base64 0.13.0", "base64 0.13.0",
"chrono", "chrono",
"flume",
"futures 0.3.21",
"heck", "heck",
"hex", "hex",
"once_cell", "once_cell",
@ -3798,7 +3824,7 @@ dependencies = [
"serde 1.0.137", "serde 1.0.137",
"serde_json", "serde_json",
"serde_urlencoded 0.5.5", "serde_urlencoded 0.5.5",
"time 0.1.44", "time 0.1.43",
"tokio 0.1.22", "tokio 0.1.22",
"tokio-executor", "tokio-executor",
"tokio-io", "tokio-io",
@ -3940,7 +3966,7 @@ dependencies = [
"rocket_codegen", "rocket_codegen",
"rocket_http", "rocket_http",
"state", "state",
"time 0.1.44", "time 0.1.43",
"toml 0.4.10", "toml 0.4.10",
"version_check 0.9.4", "version_check 0.9.4",
"yansi", "yansi",
@ -3983,7 +4009,7 @@ dependencies = [
"ring", "ring",
"rocket", "rocket",
"serde 1.0.137", "serde 1.0.137",
"time 0.1.44", "time 0.1.43",
] ]
[[package]] [[package]]
@ -3999,7 +4025,7 @@ dependencies = [
"percent-encoding 1.0.1", "percent-encoding 1.0.1",
"smallvec 1.8.0", "smallvec 1.8.0",
"state", "state",
"time 0.1.44", "time 0.1.43",
"unicode-xid 0.1.0", "unicode-xid 0.1.0",
] ]
@ -4378,6 +4404,15 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "spin"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d"
dependencies = [
"lock_api 0.4.7",
]
[[package]] [[package]]
name = "stable_deref_trait" name = "stable_deref_trait"
version = "1.2.0" version = "1.2.0"
@ -4729,12 +4764,11 @@ dependencies = [
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.44" version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [ dependencies = [
"libc", "libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -5417,9 +5451,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.10.0+wasi-snapshot-preview1" version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]] [[package]]
name = "wasi" name = "wasi"

View File

@ -23,7 +23,9 @@ askama_escape = "0.10.3"
activitystreams = "0.7.0-alpha.18" activitystreams = "0.7.0-alpha.18"
activitystreams-ext = "0.1.0-alpha.2" activitystreams-ext = "0.1.0-alpha.2"
url = "2.2.2" url = "2.2.2"
flume = "0.10.12"
tokio = { version = "1.18.1", features = ["full"] } tokio = { version = "1.18.1", features = ["full"] }
futures = "0.3.21"
[dependencies.chrono] [dependencies.chrono]
features = ["serde"] features = ["serde"]

View File

@ -10,7 +10,8 @@ use activitystreams::{
}; };
use activitystreams_ext::{Ext1, Ext2, UnparsedExtension}; use activitystreams_ext::{Ext1, Ext2, UnparsedExtension};
use array_tool::vec::Uniq; use array_tool::vec::Uniq;
use reqwest::{header::HeaderValue, ClientBuilder, Url}; use futures::future::join_all;
use reqwest::{header::HeaderValue, ClientBuilder, RequestBuilder, Url};
use rocket::{ use rocket::{
http::Status, http::Status,
request::{FromRequest, Request}, request::{FromRequest, Request},
@ -156,7 +157,29 @@ where
.build() .build()
.expect("Error while initializing tokio runtime for federation"); .expect("Error while initializing tokio runtime for federation");
rt.block_on(async { rt.block_on(async {
let (tx, mut rx) = mpsc::channel(32); let capacity = 50;
let (tx, rx) = flume::bounded::<RequestBuilder>(capacity);
let mut handles = Vec::with_capacity(capacity);
for _ in 0..capacity {
let rx = rx.clone();
let handle = rt.spawn(async move {
while let Ok(request_builder) = rx.recv_async().await {
let _ = request_builder
.send()
.await
.map(move |r| {
if r.status().is_success() {
debug!("Successfully sent activity to inbox ({})", &r.url());
} else {
warn!("Error while sending to inbox ({:?})", &r)
}
debug!("Response: \"{:?}\"\n", r);
})
.map_err(|e| warn!("Error while sending to inbox ({:?})", e));
}
});
handles.push(handle);
}
for inbox in boxes { for inbox in boxes {
let body = signed.to_string(); let body = signed.to_string();
let mut headers = request::headers(); let mut headers = request::headers();
@ -182,33 +205,11 @@ where
request::signature(sender, &headers, ("post", url.path(), url.query())) request::signature(sender, &headers, ("post", url.path(), url.query()))
.expect("activity_pub::broadcast: request signature error"), .expect("activity_pub::broadcast: request signature error"),
); );
let client = client.clone(); let request_builder = client.post(&inbox).headers(headers.clone()).body(body);
let tx = tx.clone(); tx.send_async(request_builder).await.unwrap();
let _ = tx.send(
rt.spawn(
client
.post(&inbox)
.headers(headers.clone())
.body(body)
.send(),
),
);
}
while let Some(request) = rx.recv().await {
let _ = request
.await
.map(move |r| {
r.map(|r| {
if r.status().is_success() {
debug!("Successfully sent activity to inbox ({})", &r.url());
} else {
warn!("Error while sending to inbox ({} {:?})", &r.url(), &r)
}
debug!("Response: \"{:?}\"\n", r);
})
})
.map_err(|e| warn!("Error while sending to inbox ({:?})", e));
} }
drop(tx);
join_all(handles).await;
}); });
} }