Try to fetch followers

This commit is contained in:
Bat 2018-07-27 12:53:21 +02:00
parent 812b76b0de
commit 38d99ad5af
5 changed files with 60 additions and 10 deletions

View File

@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE users DROP COLUMN followers_endpoint;

View File

@ -0,0 +1,2 @@
-- Your SQL goes here
ALTER TABLE users ADD COLUMN followers_endpoint VARCHAR NOT NULL DEFAULT '';

View File

@ -135,6 +135,7 @@ table! {
private_key -> Nullable<Text>, private_key -> Nullable<Text>,
public_key -> Text, public_key -> Text,
shared_inbox_url -> Nullable<Varchar>, shared_inbox_url -> Nullable<Varchar>,
followers_endpoint -> Varchar,
} }
} }

View File

@ -63,7 +63,8 @@ pub struct User {
pub ap_url: String, pub ap_url: String,
pub private_key: Option<String>, pub private_key: Option<String>,
pub public_key: String, pub public_key: String,
pub shared_inbox_url: Option<String> pub shared_inbox_url: Option<String>,
pub followers_endpoint: String
} }
#[derive(Insertable)] #[derive(Insertable)]
@ -81,7 +82,8 @@ pub struct NewUser {
pub ap_url: String, pub ap_url: String,
pub private_key: Option<String>, pub private_key: Option<String>,
pub public_key: String, pub public_key: String,
pub shared_inbox_url: Option<String> pub shared_inbox_url: Option<String>,
pub followers_endpoint: String
} }
const USER_PREFIX: &'static str = "@"; const USER_PREFIX: &'static str = "@";
@ -152,7 +154,7 @@ impl User {
} }
} }
fn fetch_from_url(conn: &PgConnection, url: String) -> Option<User> { pub fn fetch_from_url(conn: &PgConnection, url: String) -> Option<User> {
let req = Client::new() let req = Client::new()
.get(&url[..]) .get(&url[..])
.header(Accept(ap_accept_header().into_iter().map(|h| qitem(h.parse::<Mime>().expect("Invalid Content-Type"))).collect())) .header(Accept(ap_accept_header().into_iter().map(|h| qitem(h.parse::<Mime>().expect("Invalid Content-Type"))).collect()))
@ -186,7 +188,6 @@ impl User {
}) })
} }
}; };
println!("User from act : {:?}", acct.custom_props);
User::insert(conn, NewUser { User::insert(conn, NewUser {
username: acct.object.ap_actor_props.preferred_username_string().expect("User::from_activity: preferredUsername error"), username: acct.object.ap_actor_props.preferred_username_string().expect("User::from_activity: preferredUsername error"),
display_name: acct.object.object_props.name_string().expect("User::from_activity: name error"), display_name: acct.object.object_props.name_string().expect("User::from_activity: name error"),
@ -202,7 +203,8 @@ impl User {
.public_key_pem_string().expect("User::from_activity: publicKey.publicKeyPem error"), .public_key_pem_string().expect("User::from_activity: publicKey.publicKeyPem error"),
private_key: None, private_key: None,
shared_inbox_url: acct.object.ap_actor_props.endpoints_endpoint() shared_inbox_url: acct.object.ap_actor_props.endpoints_endpoint()
.and_then(|e| e.shared_inbox_string()).ok() .and_then(|e| e.shared_inbox_string()).ok(),
followers_endpoint: acct.object.ap_actor_props.followers_string().expect("User::from_activity: followers error")
}) })
} }
@ -243,6 +245,12 @@ impl User {
.set(users::shared_inbox_url.eq(ap_url(format!("{}/inbox", Instance::get_local(conn).unwrap().public_domain)))) .set(users::shared_inbox_url.eq(ap_url(format!("{}/inbox", Instance::get_local(conn).unwrap().public_domain))))
.get_result::<User>(conn).expect("Couldn't update shared inbox URL"); .get_result::<User>(conn).expect("Couldn't update shared inbox URL");
} }
if self.followers_endpoint.len() == 0 {
diesel::update(self)
.set(users::followers_endpoint.eq(instance.compute_box(USER_PREFIX, self.username.clone(), "followers")))
.get_result::<User>(conn).expect("Couldn't update followers endpoint");
}
} }
pub fn outbox(&self, conn: &PgConnection) -> ActivityStream<OrderedCollection> { pub fn outbox(&self, conn: &PgConnection) -> ActivityStream<OrderedCollection> {
@ -276,6 +284,28 @@ impl User {
} }
} }
pub fn fetch_followers_ids(&self) -> Vec<String> {
let req = Client::new()
.get(&self.followers_endpoint[..])
.header(Accept(ap_accept_header().into_iter().map(|h| qitem(h.parse::<Mime>().expect("Invalid Content-Type"))).collect()))
.send();
match req {
Ok(mut res) => {
let text = &res.text().unwrap();
let json: serde_json::Value = serde_json::from_str(text).unwrap();
json["items"].as_array()
.expect("Followers.items is not an array")
.into_iter()
.filter_map(|j| serde_json::from_value(j.clone()).ok())
.collect::<Vec<String>>()
},
Err(e) => {
println!("User followers fetch error: {:?}", e);
vec![]
}
}
}
fn get_activities(&self, conn: &PgConnection) -> Vec<serde_json::Value> { fn get_activities(&self, conn: &PgConnection) -> Vec<serde_json::Value> {
use schema::posts; use schema::posts;
use schema::post_authors; use schema::post_authors;
@ -377,6 +407,7 @@ impl User {
actor.ap_actor_props.set_inbox_string(self.inbox_url.clone()).expect("User::into_activity: inbox error"); actor.ap_actor_props.set_inbox_string(self.inbox_url.clone()).expect("User::into_activity: inbox error");
actor.ap_actor_props.set_outbox_string(self.outbox_url.clone()).expect("User::into_activity: outbox error"); actor.ap_actor_props.set_outbox_string(self.outbox_url.clone()).expect("User::into_activity: outbox error");
actor.ap_actor_props.set_preferred_username_string(self.username.clone()).expect("User::into_activity: preferredUsername error"); actor.ap_actor_props.set_preferred_username_string(self.username.clone()).expect("User::into_activity: preferredUsername error");
actor.ap_actor_props.set_followers_string(self.followers_endpoint.clone()).expect("User::into_activity: followers error");
let mut endpoints = Endpoint::default(); let mut endpoints = Endpoint::default();
endpoints.set_shared_inbox_string(ap_url(format!("{}/inbox/", BASE_URL.as_str()))).expect("User::into_activity: endpoints.sharedInbox error"); endpoints.set_shared_inbox_string(ap_url(format!("{}/inbox/", BASE_URL.as_str()))).expect("User::into_activity: endpoints.sharedInbox error");
@ -517,7 +548,8 @@ impl NewUser {
ap_url: String::from(""), ap_url: String::from(""),
public_key: String::from_utf8(pub_key).unwrap(), public_key: String::from_utf8(pub_key).unwrap(),
private_key: Some(String::from_utf8(priv_key).unwrap()), private_key: Some(String::from_utf8(priv_key).unwrap()),
shared_inbox_url: None shared_inbox_url: None,
followers_endpoint: String::from("")
}) })
} }
} }

View File

@ -39,27 +39,40 @@ fn me(user: Option<User>) -> Result<Redirect, Flash<Redirect>> {
} }
#[get("/@/<name>", rank = 2)] #[get("/@/<name>", rank = 2)]
fn details<'r>(name: String, conn: DbConn, account: Option<User>, worker: State<Pool<ThunkWorker<()>>>, other_conn: DbConn) -> Template { fn details<'r>(name: String, conn: DbConn, account: Option<User>, worker: State<Pool<ThunkWorker<()>>>, fecth_articles_conn: DbConn, fecth_followers_conn: DbConn) -> Template {
may_fail!(account, User::find_by_fqn(&*conn, name), "Couldn't find requested user", |user| { may_fail!(account, User::find_by_fqn(&*conn, name), "Couldn't find requested user", |user| {
let recents = Post::get_recents_for_author(&*conn, &user, 6); let recents = Post::get_recents_for_author(&*conn, &user, 6);
let reshares = Reshare::get_recents_for_author(&*conn, &user, 6); let reshares = Reshare::get_recents_for_author(&*conn, &user, 6);
let user_id = user.id.clone(); let user_id = user.id.clone();
let n_followers = user.get_followers(&*conn).len(); let n_followers = user.get_followers(&*conn).len();
// Fetch new articles
if !user.get_instance(&*conn).local { if !user.get_instance(&*conn).local {
// Fetch new articles
let user_clone = user.clone(); let user_clone = user.clone();
worker.execute(Thunk::of(move || { worker.execute(Thunk::of(move || {
for create_act in user_clone.fetch_outbox::<Create>() { for create_act in user_clone.fetch_outbox::<Create>() {
match create_act.create_props.object_object::<Article>() { match create_act.create_props.object_object::<Article>() {
Ok(article) => { Ok(article) => {
Post::from_activity(&*other_conn, article, user_clone.clone().into_id()); Post::from_activity(&*fecth_articles_conn, article, user_clone.clone().into_id());
println!("Fetched article from remote user"); println!("Fetched article from remote user");
} }
Err(e) => println!("Error while fetching articles in background: {:?}", e) Err(e) => println!("Error while fetching articles in background: {:?}", e)
} }
} }
})); }));
// Fetch followers
let user_clone = user.clone();
worker.execute(Thunk::of(move || {
for user_id in user_clone.fetch_followers_ids() {
let follower = User::find_by_ap_url(&*fecth_followers_conn, user_id.clone())
.unwrap_or_else(|| User::fetch_from_url(&*fecth_followers_conn, user_id).expect("Couldn't fetch follower"));
follows::Follow::insert(&*fecth_followers_conn, follows::NewFollow {
follower_id: follower.id,
following_id: user_clone.id
});
}
}));
} }
Template::render("users/details", json!({ Template::render("users/details", json!({
@ -258,7 +271,7 @@ fn ap_followers(name: String, conn: DbConn, _ap: ApRequest) -> ActivityStream<Or
let followers = user.get_followers(&*conn).into_iter().map(|f| Id::new(f.ap_url)).collect::<Vec<Id>>(); let followers = user.get_followers(&*conn).into_iter().map(|f| Id::new(f.ap_url)).collect::<Vec<Id>>();
let mut coll = OrderedCollection::default(); let mut coll = OrderedCollection::default();
coll.object_props.set_id_string(format!("{}/followers", user.ap_url)).expect("Follower collection: id error"); coll.object_props.set_id_string(user.followers_endpoint).expect("Follower collection: id error");
coll.collection_props.set_total_items_u64(followers.len() as u64).expect("Follower collection: totalItems error"); coll.collection_props.set_total_items_u64(followers.len() as u64).expect("Follower collection: totalItems error");
coll.collection_props.set_items_link_vec(followers).expect("Follower collection: items error"); coll.collection_props.set_items_link_vec(followers).expect("Follower collection: items error");
ActivityStream::new(coll) ActivityStream::new(coll)