From 4a767ea1ad000c0e653b9f32144dabc4e1da0c6a Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 13:55:30 -0700 Subject: [PATCH] Db, Commit two, everything is looking pretty Db, add atproto error support Db, rename spoor to direct, implement sessions Db, add activity ingestor, fix type name activity has its type name Authority instead of Uri for some reason? So that is no longer an issue. EVERYTHING IS STILL UNTESTED BABY~ --- .../20250612223204_initial_schema.sql | 6 +- db/src/error.rs | 2 + db/src/interfaces/direct.rs | 2 + db/src/interfaces/direct/functions.rs | 116 ++++++++++++ db/src/interfaces/direct/types.rs | 58 ++++++ db/src/interfaces/mod.rs | 2 +- db/src/interfaces/spoor.rs | 178 ------------------ 7 files changed, 182 insertions(+), 182 deletions(-) create mode 100644 db/src/interfaces/direct.rs create mode 100644 db/src/interfaces/direct/functions.rs create mode 100644 db/src/interfaces/direct/types.rs delete mode 100644 db/src/interfaces/spoor.rs diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql index f52b19b..b95abd4 100644 --- a/db/migrations/20250612223204_initial_schema.sql +++ b/db/migrations/20250612223204_initial_schema.sql @@ -19,7 +19,7 @@ CREATE TABLE session ( label VARCHAR, -- Participants in participant - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); @@ -33,13 +33,13 @@ CREATE TABLE activity ( -- Progress in progress performed_at VARCHAR, - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); CREATE TABLE participant ( participantdid VARCHAR NOT NULL, - session VARCHAR NOT NULL, + sessionuri VARCHAR NOT NULL, role VARCHAR NOT NULL ); diff --git a/db/src/error.rs b/db/src/error.rs index ed1b585..1c330bc 100644 --- a/db/src/error.rs +++ b/db/src/error.rs @@ -3,6 +3,8 @@ pub enum Error { #[error("Database Implementation Error: {0}")] Backend(#[from] sqlx::Error), + #[error("AT Protocol Implementation Error: {0}")] + Atproto(#[from] atproto::error::Error), } pub type Result = std::result::Result; diff --git a/db/src/interfaces/direct.rs b/db/src/interfaces/direct.rs new file mode 100644 index 0000000..d0b4ddc --- /dev/null +++ b/db/src/interfaces/direct.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod functions; diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs new file mode 100644 index 0000000..a61ab37 --- /dev/null +++ b/db/src/interfaces/direct/functions.rs @@ -0,0 +1,116 @@ +use atproto::types::{ + Datetime, + Uri, +}; +use crate::{ + Error, + interfaces::direct::types::{ + Activity, + Participant, + Session, + User + }, +}; +use sqlx::{ + Transaction, + Postgres, + PgPool, + query, +}; + +pub async fn ingest_activity( + db: PgPool, activity: Activity +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_activity(&mut transaction, activity).await?; + transaction.commit().await.map_err(Error::Backend) +} + +pub async fn ingest_session( + db: PgPool, session: Session +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(Error::Backend) +} + +async fn write_activity( + tr: &mut Transaction<'_, Postgres>, activity: Activity +) -> Result<(), Error> { + let (sessionuri, sessioncid) = match activity.session { + Some(sr) => { + let (session, cid) = sr.extract_content(); + let sessionuri = session.uri.to_string(); + write_session(tr, session).await?; + (sessionuri, cid.to_string()) + } + None => ("".to_string(), "".to_string()), + }; + + query!(r#" + INSERT INTO + activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + &activity.uri.to_string(), + &activity.cid.to_string(), + sessionuri, + sessioncid, + activity.performed_at.map(|dt| dt.to_string()), + activity.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string(), + ).execute(&mut **tr).await?; + + // TODO: Handle Progress + + Ok(()) +} + + +async fn write_session( + tr: &mut Transaction<'_, Postgres>, session: Session +) -> Result<(), Error> { + query!(r#" + INSERT INTO + session(uri, cid, owner, content, contentcid, label, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + "#, + &session.uri.to_string(), + &session.cid.to_string(), + &session.uri.authority_as_did().to_string(), + &session.content.get_content().to_string(), + &session.content.get_cid().to_string(), + session.label, + session.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string() + ).execute(&mut **tr).await?; + + if let Some(participants) = session.other_participants { + for participant in participants { + write_participant(tr, &participant, &session.uri).await? + } + } + + Ok(()) +} + +async fn write_participant( + tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri +) -> Result<(), Error> { + let (participant_type, user): (String, &User) = match participant { + Participant::Owner(user) => ("Owner".to_string(), user), + Participant::Added(user) => ("Participant".to_string(), user), + }; + + query!(r#" + INSERT INTO + participant(participantdid, sessionuri, role) + VALUES ($1, $2, $3) + "#, + user.did.to_string(), + sessionuri.to_string(), + participant_type, + ).execute(&mut **tr).await?; + + Ok(()) +} diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs new file mode 100644 index 0000000..4a3ea65 --- /dev/null +++ b/db/src/interfaces/direct/types.rs @@ -0,0 +1,58 @@ +use atproto::types::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use std::fmt::{Display, Formatter, Result as FmtResult}; + +pub struct Activity { + pub uri: Uri, + pub cid: Cid, + + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + pub cid: Cid, + + pub content: StrongRef, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +impl Display for Content { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Content::UnknownContent => "UnknownContentType", + }) + } +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs index d13ba36..afdae97 100644 --- a/db/src/interfaces/mod.rs +++ b/db/src/interfaces/mod.rs @@ -1 +1 @@ -pub mod spoor; +pub mod direct; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs deleted file mode 100644 index 4a27dcb..0000000 --- a/db/src/interfaces/spoor.rs +++ /dev/null @@ -1,178 +0,0 @@ -use crate::Error; -use atproto::{ - Cid, - Uri, - Datetime, - Did, - StrongRef, - Handle, -}; -use sqlx::{ - PgTransaction, - PgPool, - query, -}; - -pub struct Activity { - pub authority: Uri, - - pub cid: Option, - pub session: Option>, - pub progress: Option, - pub performed_at: Option, - pub created_at: Option, -} - -pub struct Session { - pub uri: Uri, - - pub cid: Option, - pub content: Option>, - pub label: Option, - pub created_at: Option, - pub other_participants: Option>, -} - -pub struct User { - pub did: Did, - pub handle: Option, -} - -#[non_exhaustive] -pub enum Participant { - Owner(User), - Added(User), -} - -#[non_exhaustive] -pub enum Content { - UnknownContent -} - -#[non_exhaustive] -pub enum Progress { - UnknownProgress -} - -pub async fn ingest_session( - db: PgPool, session: Session -) -> Result<(), Error> { - let mut transaction = db.begin().await?; - write_session(&mut transaction, session).await?; - transaction.commit().await.map_err(Error::Backend) -} - -async fn write_session( - tr: &mut PgTransaction<'_>, session: Session -) -> Result<(), Error> { - - let (contenturi, contentcid): (Option, String) = - match session.content { - Some(sr) => (Some(sr.content), sr.cid.to_string()), - None => (None, "".to_string()), - }; - - query!(r#" - INSERT INTO sessions( - sessionuri, sessioncid, label, created_at, contenturi, contentcid - ) VALUES ($1, $2, $3, $4, $5, $6) - "#, - session.uri, - session.cid.map_or("", |cid| cid.to_string()), - session.label.unwrap_or(""), - session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), - contenturi, - contentcid, - ).execute(&mut tr).await?; - - session.other_participants.and_then(|participants| { - for participant in participants { - write_participant(tr, participant, session.uri).await? - } - }).unwrap_or(Ok(())) -} - -async fn write_participant( - tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri -) -> Result<(), Error> { - let (participantType, user): (String, User) = match participant { - Participant::Owner(user) => ("Owner".to_string(), user), - Participant::Added(user) => ("Participant".to_string(), user), - }; - - query!(r#" - INSERT INTO participants( - sessionuri, userdid, role - ) VALUES ($1, $2, $3) - "#, sessionuri.to_string(), user.did.to_string, participantType - ).execute(&mut tr).await?; - - Ok(()) -} - -// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { -// todo!(); -// } -// -// let mut transaction = db.begin().await?; -// -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// -// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// - - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // }