add async/.await until all our errors are the same:

that our Connection is not Send-safe.
once we get there, we can start thinking about restructing the way we
pass along our connection, or consider using #[database].
This commit is contained in:
Mina Galić 2020-05-22 21:41:33 +02:00
parent 44ebce516c
commit 850b3c1337
No known key found for this signature in database
GPG Key ID: ACFEFF7F6A123A86
3 changed files with 61 additions and 46 deletions

View File

@ -8,6 +8,7 @@ edition = "2018"
[dependencies] [dependencies]
activitypub = "0.1.3" activitypub = "0.1.3"
askama_escape = "0.1" askama_escape = "0.1"
async-trait = "*"
atom_syndication = "0.6" atom_syndication = "0.6"
clap = "2.33" clap = "2.33"
colored = "1.8" colored = "1.8"
@ -32,7 +33,7 @@ syntect = "3.3"
tokio = "0.2" tokio = "0.2"
validator = "0.8" validator = "0.8"
validator_derive = "0.8" validator_derive = "0.8"
webfinger = { git = "https://github.com/Plume-org/webfinger", rev = "update-deps" } webfinger = { git = "https://github.com/Plume-org/webfinger", rev = "4e8f12810c4a7ba7a07bbcb722cd265fdff512b6", features = ["async"] }
[[bin]] [[bin]]
name = "plume" name = "plume"

View File

@ -36,8 +36,12 @@ pub fn create(
rockets: PlumeRocket, rockets: PlumeRocket,
) -> Result<Flash<Redirect>, Ructe> { ) -> Result<Flash<Redirect>, Ructe> {
let conn = &*rockets.conn; let conn = &*rockets.conn;
let blog = Blog::find_by_fqn(&rockets, &blog_name).expect("comments::create: blog error"); let blog = Blog::find_by_fqn(&rockets, &blog_name)
let post = Post::find_by_slug(&*conn, &slug, blog.id).expect("comments::create: post error"); .await
.expect("comments::create: blog error");
let post = Post::find_by_slug(&*conn, &slug, blog.id)
.await
.expect("comments::create: post error");
form.validate() form.validate()
.map(|_| { .map(|_| {
let (html, mentions, _hashtags) = utils::md_to_html( let (html, mentions, _hashtags) = utils::md_to_html(

View File

@ -42,7 +42,7 @@ pub fn me(user: Option<User>) -> RespondOrRedirect {
} }
#[get("/@/<name>", rank = 2)] #[get("/@/<name>", rank = 2)]
pub fn details( pub async fn details(
name: String, name: String,
rockets: PlumeRocket, rockets: PlumeRocket,
fetch_rockets: PlumeRocket, fetch_rockets: PlumeRocket,
@ -61,6 +61,7 @@ pub fn details(
worker.execute(move || { worker.execute(move || {
for create_act in user_clone for create_act in user_clone
.fetch_outbox::<Create>() .fetch_outbox::<Create>()
.await
.expect("Remote user: outbox couldn't be fetched") .expect("Remote user: outbox couldn't be fetched")
{ {
match create_act.create_props.object_object::<LicensedArticle>() { match create_act.create_props.object_object::<LicensedArticle>() {
@ -79,6 +80,7 @@ pub fn details(
worker.execute(move || { worker.execute(move || {
for user_id in user_clone for user_id in user_clone
.fetch_followers_ids() .fetch_followers_ids()
.await
.expect("Remote user: fetching followers error") .expect("Remote user: fetching followers error")
{ {
let follower = User::from_id(&fetch_followers_rockets, &user_id, None) let follower = User::from_id(&fetch_followers_rockets, &user_id, None)
@ -101,6 +103,7 @@ pub fn details(
worker.execute(move || { worker.execute(move || {
user_clone user_clone
.refetch(&*update_conn) .refetch(&*update_conn)
.await
.expect("Couldn't update user info"); .expect("Couldn't update user info");
}); });
} }
@ -152,7 +155,7 @@ pub fn follow(
rockets: PlumeRocket, rockets: PlumeRocket,
) -> Result<Flash<Redirect>, ErrorPage> { ) -> Result<Flash<Redirect>, ErrorPage> {
let conn = &*rockets.conn; let conn = &*rockets.conn;
let target = User::find_by_fqn(&rockets, &name)?; let target = User::find_by_fqn(&rockets, &name).await?;
let message = if let Ok(follow) = follows::Follow::find(&*conn, user.id, target.id) { let message = if let Ok(follow) = follows::Follow::find(&*conn, user.id, target.id) {
let delete_act = follow.build_undo(&*conn)?; let delete_act = follow.build_undo(&*conn)?;
local_inbox( local_inbox(
@ -196,9 +199,10 @@ pub fn follow_not_connected(
remote_form: Option<LenientForm<RemoteForm>>, remote_form: Option<LenientForm<RemoteForm>>,
i18n: I18n, i18n: I18n,
) -> Result<RespondOrRedirect, ErrorPage> { ) -> Result<RespondOrRedirect, ErrorPage> {
let target = User::find_by_fqn(&rockets, &name)?; let target = User::find_by_fqn(&rockets, &name).await?;
if let Some(remote_form) = remote_form { if let Some(remote_form) = remote_form {
if let Some(uri) = User::fetch_remote_interact_uri(&remote_form) if let Some(uri) = User::fetch_remote_interact_uri(&remote_form)
.await
.ok() .ok()
.and_then(|uri| { .and_then(|uri| {
uri.replace( uri.replace(
@ -265,15 +269,15 @@ pub fn follow_auth(name: String, i18n: I18n) -> Flash<Redirect> {
} }
#[get("/@/<name>/followers?<page>", rank = 2)] #[get("/@/<name>/followers?<page>", rank = 2)]
pub fn followers( pub async fn followers(
name: String, String,
page: Option<Page>, page: Option<Page>,
rockets: PlumeRocket, rockets: PlumeRocket,
) -> Result<Ructe, ErrorPage> { ) -> Result<Ructe, ErrorPage> {
let conn = &*rockets.conn; let conn = &*rockets.conn;
let page = page.unwrap_or_default(); let page = page.unwrap_or_default();
let user = User::find_by_fqn(&rockets, &name)?; let user = User::find_by_fqn(&rockets, &name).await?;
let followers_count = user.count_followers(&*conn)?; let followers_count = user.count_followers(&*;
Ok(render!(users::followers( Ok(render!(users::followers(
&rockets.to_context(), &rockets.to_context(),
@ -299,7 +303,7 @@ pub fn followed(
) -> Result<Ructe, ErrorPage> { ) -> Result<Ructe, ErrorPage> {
let conn = &*rockets.conn; let conn = &*rockets.conn;
let page = page.unwrap_or_default(); let page = page.unwrap_or_default();
let user = User::find_by_fqn(&rockets, &name)?; let user = User::find_by_fqn(&rockets, &name).await?;
let followed_count = user.count_followed(conn)?; let followed_count = user.count_followed(conn)?;
Ok(render!(users::followed( Ok(render!(users::followed(
@ -319,13 +323,13 @@ pub fn followed(
} }
#[get("/@/<name>", rank = 1)] #[get("/@/<name>", rank = 1)]
pub fn activity_details( pub async fn activity_details(
name: String, name: String,
rockets: PlumeRocket, rockets: PlumeRocket,
_ap: ApRequest, _ap: ApRequest,
) -> Option<ActivityStream<CustomPerson>> { ) -> Option<ActivityStream<CustomPerson>> {
let user = User::find_by_fqn(&rockets, &name).ok()?; let user = User::find_by_fqn(&rockets, &name).await?.ok()?;
Some(ActivityStream::new(user.to_activity(&*rockets.conn).ok()?)) Some(ActivityStream::new(user.to_activity(&*roonn).ok()?))
} }
#[get("/users/new")] #[get("/users/new")]
@ -411,39 +415,38 @@ pub fn update(
} }
#[post("/@/<name>/delete")] #[post("/@/<name>/delete")]
pub fn delete( pub async fn delete(
name: String, name: String,
user: User, user: User,
mut cookies: Cookies<'_>, mut cookies: Cookies<'_>,
rockets: PlumeRocket, rockets: PlumeRocket,
) -> Result<Flash<Redirect>, ErrorPage> { ) -> Result<Flash<Redirect>, ErrorPage> {
let account = User::find_by_fqn(&rockets, &name)?; let account = User::find_by_fqn(&rockets, &name).await?;
if user.id == account.id { if user.id != account.id {
account.delete(&*rockets.conn, &rockets.searcher)?; return Ok(Flash::error(
let target = User::one_by_instance(&*rockets.conn)?;
let delete_act = account.delete_activity(&*rockets.conn)?;
rockets
.worker
.execute(move || broadcast(&account, delete_act, target));
if let Some(cookie) = cookies.get_private(AUTH_COOKIE) {
cookies.remove_private(cookie);
}
Ok(Flash::success(
Redirect::to(uri!(super::instance::index)),
i18n!(rockets.intl.catalog, "Your account has been deleted."),
))
} else {
Ok(Flash::error(
Redirect::to(uri!(edit: name = name)), Redirect::to(uri!(edit: name = name)),
i18n!( i18n!(
rockets.intl.catalog, rockets.intl.catalog,
"You can't delete someone else's account." "You can't delete someone else's account."
), ),
)) ));
} }
account.delete(&*rockets.conn, &rockets.searcher)?;
let target = User::one_by_instance(&*rockets.conn)?;
let delete_act = account.delete_activity(&*rockets.conn)?;
rockets
.worker
.execute(move || broadcast(&account, delete_act, target));
if let Some(cookie) = cookies.get_private(AUTH_COOKIE) {
cookies.remove_private(cookie);
}
Ok(Flash::success(
Redirect::to(uri!(super::instance::index)),
i18n!(rockets.intl.catalog, "Your account has been deleted."),
))
} }
#[derive(Default, FromForm, Validate)] #[derive(Default, FromForm, Validate)]
@ -564,37 +567,44 @@ pub fn create(
} }
#[get("/@/<name>/outbox")] #[get("/@/<name>/outbox")]
pub fn outbox(name: String, rockets: PlumeRocket) -> Option<ActivityStream<OrderedCollection>> { pub async fn outbox(
let user = User::find_by_fqn(&rockets, &name).ok()?; name: String,
rockets: PlumeRocket,
) -> Option<ActivityStream<OrderedCollection>> {
let user = User::find_by_fqn(&rockets, &name).await.ok()?;
user.outbox(&*rockets.conn).ok() user.outbox(&*rockets.conn).ok()
} }
#[get("/@/<name>/outbox?<page>")] #[get("/@/<name>/outbox?<page>")]
pub fn outbox_page( pub async fn outbox_page(
name: String, name: String,
page: Page, page: Page,
rockets: PlumeRocket, rockets: PlumeRocket,
) -> Option<ActivityStream<OrderedCollectionPage>> { ) -> Option<ActivityStream<OrderedCollectionPage>> {
let user = User::find_by_fqn(&rockets, &name).ok()?; let user = User::find_by_fqn(&rockets, &name).await.ok()?;
user.outbox_page(&*rockets.conn, page.limits()).ok() user.outbox_page(&*rockets.conn, page.limits()).ok()
} }
#[post("/@/<name>/inbox", data = "<data>")] #[post("/@/<name>/inbox", data = "<data>")]
pub fn inbox( pub async fn inbox(
name: String, name: String,
data: inbox::SignedJson<serde_json::Value>, data: inbox::SignedJson<serde_json::Value>,
headers: Headers<'_>, headers: Headers<'_>,
rockets: PlumeRocket, rockets: PlumeRocket,
) -> Result<String, status::BadRequest<&'static str>> { ) -> Result<String, status::BadRequest<&'static str>> {
User::find_by_fqn(&rockets, &name).map_err(|_| status::BadRequest(Some("User not found")))?; User::find_by_fqn(&rockets, &name)
.await
.map_err(|_| status::BadRequest(Some("User not found")))?;
inbox::handle_incoming(rockets, data, headers) inbox::handle_incoming(rockets, data, headers)
} }
#[get("/@/<name>/followers", rank = 1)] #[get("/@/<name>/followers", rank = 1)]
pub fn ap_followers( pub async fn ap_followers(
name: String, name: String,
rockets: PlumeRocket, rockets: PlumeRocket,
_ap: ApRequest, _ap: ApRequest,
) -> Option<ActivityStream<OrderedCollection>> { ) -> Option<ActivityStream<OrderedCollection>> {
let user = User::find_by_fqn(&rockets, &name).ok()?; let user = User::find_by_fqn(&rockets, &name).await?.ok()?;
let followers = user let followers = user
.get_followers(&*rockets.conn) .get_followers(&*rockets.conn)
.ok()? .ok()?
@ -614,9 +624,9 @@ pub fn ap_followers(
} }
#[get("/@/<name>/atom.xml")] #[get("/@/<name>/atom.xml")]
pub fn atom_feed(name: String, rockets: PlumeRocket) -> Option<Content<String>> { pub async fn atom_feed(name: String, rockets: PlumeRocket) -> Option<Content<String>> {
let conn = &*rockets.conn; let conn = &*rockets.conn;
let author = User::find_by_fqn(&rockets, &name).ok()?; let author = User::find_by_fqn(&rockets, &name).await?.ok()?;
let entries = Post::get_recents_for_author(conn, &author, 15).ok()?; let entries = Post::get_recents_for_author(conn, &author, 15).ok()?;
let uri = Instance::get_local() let uri = Instance::get_local()
.ok()? .ok()?