From a7b899817acbf951d0b56c9e69c439eae332d63b Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Thu, 5 May 2022 09:04:54 +0900 Subject: [PATCH] Run HTTP request in broadcast() on tokio runtime --- plume-common/src/activity_pub/mod.rs | 88 +++++++++++++++------------- 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 9b85a4c0..3715aadd 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,12 +1,13 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{blocking::ClientBuilder, header::HeaderValue, Url}; +use reqwest::{header::HeaderValue, ClientBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, response::{Responder, Response}, Outcome, }; +use tokio::runtime; use tracing::{debug, warn}; use self::sign::Signable; @@ -139,46 +140,53 @@ where .connect_timeout(std::time::Duration::from_secs(5)) .build() .expect("Can't build client"); - for inbox in boxes { - let body = signed.to_string(); - let mut headers = request::headers(); - let url = Url::parse(&inbox); - if url.is_err() { - warn!("Inbox is invalid URL: {:?}", &inbox); - continue; + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Error while initializing tokio runtime for federation"); + rt.block_on(async { + for inbox in boxes { + let body = signed.to_string(); + let mut headers = request::headers(); + let url = Url::parse(&inbox); + if url.is_err() { + warn!("Inbox is invalid URL: {:?}", &inbox); + continue; + } + let url = url.unwrap(); + if !url.has_host() { + warn!("Inbox doesn't have host: {:?}", &inbox); + continue; + }; + let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); + if host_header_value.is_err() { + warn!("Header value is invalid: {:?}", url.host_str()); + continue; + } + headers.insert("Host", host_header_value.unwrap()); + headers.insert("Digest", request::Digest::digest(&body)); + let _ = client + .post(&inbox) + .headers(headers.clone()) + .header( + "Signature", + request::signature(sender, &headers, ("post", url.path(), url.query())) + .expect("activity_pub::broadcast: request signature error"), + ) + .body(body) + .send() + .await + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &inbox); + } else { + warn!("Error while sending to inbox ({} {:?})", &inbox, &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); } - let url = url.unwrap(); - if !url.has_host() { - warn!("Inbox doesn't have host: {:?}", &inbox); - continue; - }; - let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); - if host_header_value.is_err() { - warn!("Header value is invalid: {:?}", url.host_str()); - continue; - } - headers.insert("Host", host_header_value.unwrap()); - headers.insert("Digest", request::Digest::digest(&body)); - let _ = client - .post(&inbox) - .headers(headers.clone()) - .header( - "Signature", - request::signature(sender, &headers, ("post", url.path(), url.query())) - .expect("activity_pub::broadcast: request signature error"), - ) - .body(body) - .send() - .map(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - debug!("Response: \"{:?}\"\n", r); - }) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); - } + }); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)]