From 2dfbc314b2612996f93cd5f36326eba74a1f51f3 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 14:39:35 -0700 Subject: [PATCH] Db, add activity ingestor, fix type name activity has its type name Authority instead of Uri for some reason? So that is no longer an issue. EVERYTHING IS STILL UNTESTED BABY~ --- db/src/interfaces/direct/functions.rs | 109 ++++++++++---------------- db/src/interfaces/direct/types.rs | 3 +- 2 files changed, 43 insertions(+), 69 deletions(-) diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs index caf9cbd..a61ab37 100644 --- a/db/src/interfaces/direct/functions.rs +++ b/db/src/interfaces/direct/functions.rs @@ -5,6 +5,7 @@ use atproto::types::{ use crate::{ Error, interfaces::direct::types::{ + Activity, Participant, Session, User @@ -17,6 +18,14 @@ use sqlx::{ query, }; +pub async fn ingest_activity( + db: PgPool, activity: Activity +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_activity(&mut transaction, activity).await?; + transaction.commit().await.map_err(Error::Backend) +} + pub async fn ingest_session( db: PgPool, session: Session ) -> Result<(), Error> { @@ -25,6 +34,39 @@ pub async fn ingest_session( transaction.commit().await.map_err(Error::Backend) } +async fn write_activity( + tr: &mut Transaction<'_, Postgres>, activity: Activity +) -> Result<(), Error> { + let (sessionuri, sessioncid) = match activity.session { + Some(sr) => { + let (session, cid) = sr.extract_content(); + let sessionuri = session.uri.to_string(); + write_session(tr, session).await?; + (sessionuri, cid.to_string()) + } + None => ("".to_string(), "".to_string()), + }; + + query!(r#" + INSERT INTO + activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + &activity.uri.to_string(), + &activity.cid.to_string(), + sessionuri, + sessioncid, + activity.performed_at.map(|dt| dt.to_string()), + activity.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string(), + ).execute(&mut **tr).await?; + + // TODO: Handle Progress + + Ok(()) +} + + async fn write_session( tr: &mut Transaction<'_, Postgres>, session: Session ) -> Result<(), Error> { @@ -72,70 +114,3 @@ async fn write_participant( Ok(()) } - -// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { -// todo!(); -// } -// -// let mut transaction = db.begin().await?; -// -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// -// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// - - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs index 91acc73..4a3ea65 100644 --- a/db/src/interfaces/direct/types.rs +++ b/db/src/interfaces/direct/types.rs @@ -1,4 +1,3 @@ -use crate::Error; use atproto::types::{ Cid, Uri, @@ -10,7 +9,7 @@ use atproto::types::{ use std::fmt::{Display, Formatter, Result as FmtResult}; pub struct Activity { - pub authority: Uri, + pub uri: Uri, pub cid: Cid, pub session: Option>,