From ce4b216722c1917a5e98c8ff55962c7c06dae62a Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 04:34:04 +0900 Subject: [PATCH] Broadcast asynchronously --- plume-common/src/activity_pub/mod.rs | 94 +++++++++++++++++----------- 1 file changed, 56 insertions(+), 38 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 9b85a4c0..69d40cd5 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,12 +1,13 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{blocking::ClientBuilder, header::HeaderValue, Url}; +use reqwest::{header::HeaderValue, ClientBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, response::{Responder, Response}, Outcome, }; +use tokio::{runtime, sync::mpsc}; use tracing::{debug, warn}; use self::sign::Signable; @@ -139,46 +140,63 @@ where .connect_timeout(std::time::Duration::from_secs(5)) .build() .expect("Can't build client"); - for inbox in boxes { - let body = signed.to_string(); - let mut headers = request::headers(); - let url = Url::parse(&inbox); - if url.is_err() { - warn!("Inbox is invalid URL: {:?}", &inbox); - continue; - } - let url = url.unwrap(); - if !url.has_host() { - warn!("Inbox doesn't have host: {:?}", &inbox); - continue; - }; - let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); - if host_header_value.is_err() { - warn!("Header value is invalid: {:?}", url.host_str()); - continue; - } - headers.insert("Host", host_header_value.unwrap()); - headers.insert("Digest", request::Digest::digest(&body)); - let _ = client - .post(&inbox) - .headers(headers.clone()) - .header( + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Error while initializing tokio runtime for federation"); + rt.block_on(async { + let (tx, mut rx) = mpsc::channel(32); + for inbox in boxes { + let body = signed.to_string(); + let mut headers = request::headers(); + let url = Url::parse(&inbox); + if url.is_err() { + warn!("Inbox is invalid URL: {:?}", &inbox); + continue; + } + let url = url.unwrap(); + if !url.has_host() { + warn!("Inbox doesn't have host: {:?}", &inbox); + continue; + }; + let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); + if host_header_value.is_err() { + warn!("Header value is invalid: {:?}", url.host_str()); + continue; + } + headers.insert("Host", host_header_value.unwrap()); + headers.insert("Digest", request::Digest::digest(&body)); + headers.insert( "Signature", request::signature(sender, &headers, ("post", url.path(), url.query())) .expect("activity_pub::broadcast: request signature error"), - ) - .body(body) - .send() - .map(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - debug!("Response: \"{:?}\"\n", r); - }) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); - } + ); + let client = client.clone(); + let tx = tx.clone(); + rt.spawn(async move { + let _ = tx.send( + client + .post(&inbox) + .headers(headers.clone()) + .body(body) + .send() + .await, + ); + }); + } + while let Some(request) = rx.recv().await { + let _ = request + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &r.url()); + } else { + warn!("Error while sending to inbox ({} {:?})", &r.url(), &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); + } + }); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)]