From 35aa2374c4ca57b2db4b02a1df2c43c3c9ed4b15 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Wed, 4 May 2022 21:12:35 +0900 Subject: [PATCH] Execute broadcast synchronously --- plume-common/src/activity_pub/mod.rs | 47 ++++++++++++---------------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index aa17d86e..ebc5536f 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,13 +1,12 @@ use activitypub::{Activity, Link, Object}; use array_tool::vec::Uniq; -use reqwest::{header::HeaderValue, r#async::ClientBuilder, Url}; +use reqwest::{header::HeaderValue, ClientBuilder, Url}; use rocket::{ http::Status, request::{FromRequest, Request}, response::{Responder, Response}, Outcome, }; -use tokio::prelude::*; use tracing::{debug, warn}; use self::sign::Signable; @@ -140,8 +139,6 @@ where .connect_timeout(std::time::Duration::from_secs(5)) .build() .expect("Can't build client"); - let mut rt = tokio::runtime::current_thread::Runtime::new() - .expect("Error while initializing tokio runtime for federation"); for inbox in boxes { let body = signed.to_string(); let mut headers = request::headers(); @@ -162,30 +159,26 @@ where } headers.insert("Host", host_header_value.unwrap()); headers.insert("Digest", request::Digest::digest(&body)); - rt.spawn( - client - .post(&inbox) - .headers(headers.clone()) - .header( - "Signature", - request::signature(sender, &headers, ("post", url.path(), url.query())) - .expect("activity_pub::broadcast: request signature error"), - ) - .body(body) - .send() - .and_then(move |r| { - if r.status().is_success() { - debug!("Successfully sent activity to inbox ({})", &inbox); - } else { - warn!("Error while sending to inbox ({} {:?})", &inbox, &r) - } - r.into_body().concat2() - }) - .map(move |response| debug!("Response: \"{:?}\"\n", response)) - .map_err(|e| warn!("Error while sending to inbox ({:?})", e)), - ); + let _ = client + .post(&inbox) + .headers(headers.clone()) + .header( + "Signature", + request::signature(sender, &headers, ("post", url.path(), url.query())) + .expect("activity_pub::broadcast: request signature error"), + ) + .body(body) + .send() + .map(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &inbox); + } else { + warn!("Error while sending to inbox ({} {:?})", &inbox, &r) + } + debug!("Response: \"{:?}\"\n", r); + }) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)); } - rt.run().unwrap(); } #[derive(Shrinkwrap, Clone, Serialize, Deserialize)]