diff --git a/plume-common/src/activity_pub/mod.rs b/plume-common/src/activity_pub/mod.rs index 5fac258f..ea7850af 100644 --- a/plume-common/src/activity_pub/mod.rs +++ b/plume-common/src/activity_pub/mod.rs @@ -1,4 +1,12 @@ use activitypub::{Activity, Link, Object}; +use activitystreams::{ + activity::AsActivity, + actor::{ApActor, Group, Person}, + iri_string::types::IriString, + object::{ApObject, Article}, + unparsed::UnparsedMutExt, +}; +use activitystreams_ext::{Ext1, UnparsedExtension}; use array_tool::vec::Uniq; use reqwest::{header::HeaderValue, r#async::ClientBuilder, Url}; use rocket::{ @@ -185,6 +193,82 @@ where rt.run().unwrap(); } +pub fn broadcast07(sender: &S, act: A, to: Vec, proxy: Option) +where + S: sign::Signer, + A: AsActivity + serde::Serialize, + T: inbox::AsActor, +{ + let boxes = to + .into_iter() + .map(|u| { + u.get_shared_inbox_url() + .unwrap_or_else(|| u.get_inbox_url()) + }) + .collect::>() + .unique(); + + let mut act = serde_json::to_value(act).expect("activity_pub::broadcast: serialization error"); + act["@context"] = context(); + let signed = act + .sign(sender) + .expect("activity_pub::broadcast: signature error"); + + let mut rt = tokio::runtime::current_thread::Runtime::new() + .expect("Error while initializing tokio runtime for federation"); + for inbox in boxes { + let body = signed.to_string(); + let mut headers = request::headers(); + let url = Url::parse(&inbox); + if url.is_err() { + warn!("Inbox is invalid URL: {:?}", &inbox); + continue; + } + let url = url.unwrap(); + if !url.has_host() { + warn!("Inbox doesn't have host: {:?}", &inbox); + continue; + }; + let host_header_value = HeaderValue::from_str(url.host_str().expect("Unreachable")); + if host_header_value.is_err() { + warn!("Header value is invalid: {:?}", url.host_str()); + continue; + } + headers.insert("Host", host_header_value.unwrap()); + headers.insert("Digest", request::Digest::digest(&body)); + rt.spawn( + if let Some(proxy) = proxy.clone() { + ClientBuilder::new().proxy(proxy) + } else { + ClientBuilder::new() + } + .connect_timeout(std::time::Duration::from_secs(5)) + .build() + .expect("Can't build client") + .post(&inbox) + .headers(headers.clone()) + .header( + "Signature", + request::signature(sender, &headers, ("post", url.path(), url.query())) + .expect("activity_pub::broadcast: request signature error"), + ) + .body(body) + .send() + .and_then(move |r| { + if r.status().is_success() { + debug!("Successfully sent activity to inbox ({})", &inbox); + } else { + warn!("Error while sending to inbox ({:?})", &r) + } + r.into_body().concat2() + }) + .map(move |response| debug!("Response: \"{:?}\"\n", response)) + .map_err(|e| warn!("Error while sending to inbox ({:?})", e)), + ); + } + rt.run().unwrap(); +} + #[derive(Shrinkwrap, Clone, Serialize, Deserialize)] pub struct Id(String); @@ -226,6 +310,41 @@ pub struct PublicKey { pub public_key_pem: Option, } +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ApSignature07 { + public_key: PublicKey07, +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct PublicKey07 { + id: IriString, + owner: IriString, + public_key_pem: String, +} + +impl UnparsedExtension for ApSignature07 +where + U: UnparsedMutExt, +{ + type Error = serde_json::Error; + + fn try_from_unparsed(unparsed_mut: &mut U) -> Result { + Ok(ApSignature07 { + public_key: unparsed_mut.remove("publicKey")?, + }) + } + + fn try_into_unparsed(self, unparsed_mut: &mut U) -> Result<(), Self::Error> { + unparsed_mut.insert("publicKey", self.public_key)?; + Ok(()) + } +} + +pub type CustomPerson = Ext1, ApSignature07>; +pub type CustomGroup = Ext1, ApSignature07>; + #[derive(Clone, Debug, Default, UnitString)] #[activitystreams(Hashtag)] pub struct HashtagType; @@ -262,6 +381,32 @@ pub struct Licensed { impl Object for Licensed {} +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct Licensed07 { + pub license: String, +} + +impl UnparsedExtension for Licensed07 +where + U: UnparsedMutExt, +{ + type Error = serde_json::Error; + + fn try_from_unparsed(unparsed_mut: &mut U) -> Result { + Ok(Licensed07 { + license: unparsed_mut.remove("license")?, + }) + } + + fn try_into_unparsed(self, unparsed_mut: &mut U) -> Result<(), Self::Error> { + unparsed_mut.insert("license", self.license)?; + Ok(()) + } +} + +pub type LicensedArticle = Ext1, Licensed07>; + #[cfg(test)] mod tests { use super::*;