Plume/plume-models/src/remote_fetch_actor.rs

132 lines
4.3 KiB
Rust
Raw Normal View History

2021-01-31 14:56:17 +01:00
use crate::{
db_conn::{DbConn, DbPool},
follows,
2022-05-02 05:58:01 +02:00
posts::Post,
2021-01-31 14:56:17 +01:00
users::{User, UserEvent},
ACTOR_SYS, CONFIG, USER_CHAN,
};
2022-05-02 05:58:01 +02:00
use activitystreams::{
activity::{ActorAndObjectRef, Create},
2022-05-02 05:58:01 +02:00
base::AnyBase,
object::kind::ArticleType,
};
use plume_common::activity_pub::{inbox::FromId, LicensedArticle};
2021-01-31 14:56:17 +01:00
use riker::actors::{Actor, ActorFactoryArgs, ActorRefFactory, Context, Sender, Subscribe, Tell};
use std::sync::Arc;
use tracing::{error, info, warn};
pub struct RemoteFetchActor {
conn: DbPool,
}
impl RemoteFetchActor {
pub fn init(conn: DbPool) {
let actor = ACTOR_SYS
2021-01-31 14:56:17 +01:00
.actor_of_args::<RemoteFetchActor, _>("remote-fetch", conn)
.expect("Failed to initialize remote fetch actor");
USER_CHAN.tell(
Subscribe {
actor: Box::new(actor),
2021-01-31 14:56:17 +01:00
topic: "*".into(),
},
None,
)
}
}
impl Actor for RemoteFetchActor {
type Msg = UserEvent;
2021-01-31 14:56:17 +01:00
fn recv(&mut self, _ctx: &Context<Self::Msg>, msg: Self::Msg, _sender: Sender) {
use UserEvent::*;
match msg {
RemoteUserFound(user) => match self.conn.get() {
Ok(conn) => {
let conn = DbConn(conn);
2021-01-31 17:16:52 +01:00
// Don't call these functions in parallel
// for the case database connections limit is too small
2021-01-31 14:56:17 +01:00
fetch_and_cache_articles(&user, &conn);
fetch_and_cache_followers(&user, &conn);
if user.needs_update() {
fetch_and_cache_user(&user, &conn);
}
}
_ => {
error!("Failed to get database connection");
}
},
}
}
}
impl ActorFactoryArgs<DbPool> for RemoteFetchActor {
fn create_args(conn: DbPool) -> Self {
Self { conn }
}
}
fn fetch_and_cache_articles(user: &Arc<User>, conn: &DbConn) {
2022-05-02 19:11:46 +02:00
let create_acts = user.fetch_outbox::<Create>();
2021-01-31 17:36:29 +01:00
match create_acts {
Ok(create_acts) => {
for create_act in create_acts {
2022-05-03 03:04:22 +02:00
match create_act.object_field_ref().as_single_base().map(|base| {
let any_base = AnyBase::from_base(base.clone()); // FIXME: Don't clone()
any_base.extend::<LicensedArticle, ArticleType>()
}) {
Some(Ok(Some(article))) => {
Post::from_activity(conn, article)
2021-01-31 17:36:29 +01:00
.expect("Article from remote user couldn't be saved");
info!("Fetched article from remote user");
}
2022-05-03 03:04:22 +02:00
Some(Err(e)) => warn!("Error while fetching articles in background: {:?}", e),
2022-05-02 05:58:01 +02:00
_ => warn!("Error while fetching articles in background"),
2021-01-31 17:36:29 +01:00
}
2021-01-31 14:56:17 +01:00
}
2021-01-31 17:36:29 +01:00
}
Err(err) => {
error!("Failed to fetch outboxes: {:?}", err);
2021-01-31 14:56:17 +01:00
}
}
}
fn fetch_and_cache_followers(user: &Arc<User>, conn: &DbConn) {
2021-01-31 17:36:29 +01:00
let follower_ids = user.fetch_followers_ids();
match follower_ids {
Ok(user_ids) => {
for user_id in user_ids {
2022-05-02 12:24:36 +02:00
let follower = User::from_id(conn, &user_id, None, CONFIG.proxy());
2021-01-31 17:36:29 +01:00
match follower {
Ok(follower) => {
let inserted = follows::Follow::insert(
conn,
follows::NewFollow {
follower_id: follower.id,
following_id: user.id,
ap_url: String::new(),
},
);
if inserted.is_err() {
error!("Couldn't save follower for remote user: {:?}", user_id);
}
}
Err(err) => {
error!("Couldn't fetch follower: {:?}", err);
}
}
}
}
Err(err) => {
error!("Failed to fetch follower: {:?}", err);
}
2021-01-31 14:56:17 +01:00
}
}
fn fetch_and_cache_user(user: &Arc<User>, conn: &DbConn) {
2021-01-31 17:36:29 +01:00
if user.refetch(conn).is_err() {
error!("Couldn't update user info: {:?}", user);
}
2021-01-31 14:56:17 +01:00
}