From 9d37408535481b39b7c60825b07e88c2b4088019 Mon Sep 17 00:00:00 2001 From: Kitaiti Makoto Date: Sun, 31 Jan 2021 22:56:17 +0900 Subject: [PATCH] Define RemoteFetchActor --- plume-models/src/lib.rs | 1 + plume-models/src/remote_fetch_actor.rs | 103 +++++++++++++++++++++++++ 2 files changed, 104 insertions(+) create mode 100644 plume-models/src/remote_fetch_actor.rs diff --git a/plume-models/src/lib.rs b/plume-models/src/lib.rs index 8bfb2002..a3d10755 100755 --- a/plume-models/src/lib.rs +++ b/plume-models/src/lib.rs @@ -356,6 +356,7 @@ pub mod password_reset_requests; pub mod plume_rocket; pub mod post_authors; pub mod posts; +pub mod remote_fetch_actor; pub mod reshares; pub mod safe_string; #[allow(unused_imports)] diff --git a/plume-models/src/remote_fetch_actor.rs b/plume-models/src/remote_fetch_actor.rs new file mode 100644 index 00000000..49163404 --- /dev/null +++ b/plume-models/src/remote_fetch_actor.rs @@ -0,0 +1,103 @@ +use crate::{ + db_conn::{DbConn, DbPool}, + follows, + posts::{LicensedArticle, Post}, + users::{User, UserEvent}, + ACTOR_SYS, CONFIG, USER_CHAN, +}; +use activitypub::activity::Create; +use plume_common::activity_pub::inbox::FromId; +use riker::actors::{Actor, ActorFactoryArgs, ActorRefFactory, Context, Sender, Subscribe, Tell}; +use std::sync::Arc; +use tracing::{error, info, warn}; + +pub struct RemoteFetchActor { + conn: DbPool, +} + +impl RemoteFetchActor { + pub fn init(conn: DbPool) { + ACTOR_SYS + .actor_of_args::("remote-fetch", conn) + .expect("Failed to initialize remote fetch actor"); + } +} + +impl Actor for RemoteFetchActor { + type Msg = UserEvent; + + fn pre_start(&mut self, ctx: &Context) { + USER_CHAN.tell( + Subscribe { + actor: Box::new(ctx.myself()), + topic: "*".into(), + }, + None, + ) + } + + fn recv(&mut self, _ctx: &Context, msg: Self::Msg, _sender: Sender) { + use UserEvent::*; + + match msg { + RemoteUserFound(user) => match self.conn.get() { + Ok(conn) => { + let conn = DbConn(conn); + fetch_and_cache_articles(&user, &conn); + fetch_and_cache_followers(&user, &conn); + if user.needs_update() { + fetch_and_cache_user(&user, &conn); + } + } + _ => { + error!("Failed to get database connection"); + } + }, + } + } +} + +impl ActorFactoryArgs for RemoteFetchActor { + fn create_args(conn: DbPool) -> Self { + Self { conn } + } +} + +fn fetch_and_cache_articles(user: &Arc, conn: &DbConn) { + for create_act in user + .fetch_outbox::() + .expect("Remote user: outbox couldn't be fetched") + { + match create_act.create_props.object_object::() { + Ok(article) => { + Post::from_activity(conn, article) + .expect("Article from remote user couldn't be saved"); + info!("Fetched article from remote user"); + } + Err(e) => warn!("Error while fetching articles in background: {:?}", e), + } + } +} + +fn fetch_and_cache_followers(user: &Arc, conn: &DbConn) { + for user_id in user + .fetch_followers_ids() + .expect("Remote user: fetching followers error") + { + let follower = User::from_id(conn, &user_id, None, CONFIG.proxy()) + .expect("user::details: Couldn't fetch follower"); + follows::Follow::insert( + conn, + follows::NewFollow { + follower_id: follower.id, + following_id: user.id, + ap_url: String::new(), + }, + ) + .expect("Couldn't save follower for remote user"); + } +} + +fn fetch_and_cache_user(user: &Arc, conn: &DbConn) { + user.refetch(conn).expect("Couldn't update user info"); +}