From 1d1f8ae60ff53d5e08351ba1cd576934ba97f060 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 13:54:55 -0700 Subject: [PATCH 1/5] Atproto, add getters to StrongRef --- atproto/src/types.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/atproto/src/types.rs b/atproto/src/types.rs index ca30ded..70241b8 100644 --- a/atproto/src/types.rs +++ b/atproto/src/types.rs @@ -68,3 +68,12 @@ pub struct StrongRef { content: T, cid: Cid, } + +impl StrongRef { + pub fn get_content(&self) -> &T { + &self.content + } + pub fn get_cid(&self) -> &Cid { + &self.cid + } +} From df1da0905d09368533033d8c3fe650c83f78764c Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 13:55:30 -0700 Subject: [PATCH 2/5] Db, add atproto error support --- db/migrations/20250612223204_initial_schema.sql | 6 +++--- db/src/error.rs | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql index f52b19b..b95abd4 100644 --- a/db/migrations/20250612223204_initial_schema.sql +++ b/db/migrations/20250612223204_initial_schema.sql @@ -19,7 +19,7 @@ CREATE TABLE session ( label VARCHAR, -- Participants in participant - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); @@ -33,13 +33,13 @@ CREATE TABLE activity ( -- Progress in progress performed_at VARCHAR, - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); CREATE TABLE participant ( participantdid VARCHAR NOT NULL, - session VARCHAR NOT NULL, + sessionuri VARCHAR NOT NULL, role VARCHAR NOT NULL ); diff --git a/db/src/error.rs b/db/src/error.rs index ed1b585..1c330bc 100644 --- a/db/src/error.rs +++ b/db/src/error.rs @@ -3,6 +3,8 @@ pub enum Error { #[error("Database Implementation Error: {0}")] Backend(#[from] sqlx::Error), + #[error("AT Protocol Implementation Error: {0}")] + Atproto(#[from] atproto::error::Error), } pub type Result = std::result::Result; From 8160b161f583dd22d42f09ecf70a7c02c7139813 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 14:06:21 -0700 Subject: [PATCH 3/5] Db, rename spoor to direct, implement sessions --- db/src/interfaces/direct.rs | 2 + .../{spoor.rs => direct/functions.rs} | 119 ++++++------------ db/src/interfaces/direct/types.rs | 59 +++++++++ db/src/interfaces/mod.rs | 2 +- 4 files changed, 103 insertions(+), 79 deletions(-) create mode 100644 db/src/interfaces/direct.rs rename db/src/interfaces/{spoor.rs => direct/functions.rs} (58%) create mode 100644 db/src/interfaces/direct/types.rs diff --git a/db/src/interfaces/direct.rs b/db/src/interfaces/direct.rs new file mode 100644 index 0000000..d0b4ddc --- /dev/null +++ b/db/src/interfaces/direct.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod functions; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/direct/functions.rs similarity index 58% rename from db/src/interfaces/spoor.rs rename to db/src/interfaces/direct/functions.rs index 4a27dcb..caf9cbd 100644 --- a/db/src/interfaces/spoor.rs +++ b/db/src/interfaces/direct/functions.rs @@ -1,59 +1,22 @@ -use crate::Error; -use atproto::{ - Cid, - Uri, +use atproto::types::{ Datetime, - Did, - StrongRef, - Handle, + Uri, +}; +use crate::{ + Error, + interfaces::direct::types::{ + Participant, + Session, + User + }, }; use sqlx::{ - PgTransaction, + Transaction, + Postgres, PgPool, query, }; -pub struct Activity { - pub authority: Uri, - - pub cid: Option, - pub session: Option>, - pub progress: Option, - pub performed_at: Option, - pub created_at: Option, -} - -pub struct Session { - pub uri: Uri, - - pub cid: Option, - pub content: Option>, - pub label: Option, - pub created_at: Option, - pub other_participants: Option>, -} - -pub struct User { - pub did: Did, - pub handle: Option, -} - -#[non_exhaustive] -pub enum Participant { - Owner(User), - Added(User), -} - -#[non_exhaustive] -pub enum Content { - UnknownContent -} - -#[non_exhaustive] -pub enum Progress { - UnknownProgress -} - pub async fn ingest_session( db: PgPool, session: Session ) -> Result<(), Error> { @@ -63,50 +26,50 @@ pub async fn ingest_session( } async fn write_session( - tr: &mut PgTransaction<'_>, session: Session + tr: &mut Transaction<'_, Postgres>, session: Session ) -> Result<(), Error> { - - let (contenturi, contentcid): (Option, String) = - match session.content { - Some(sr) => (Some(sr.content), sr.cid.to_string()), - None => (None, "".to_string()), - }; - query!(r#" - INSERT INTO sessions( - sessionuri, sessioncid, label, created_at, contenturi, contentcid - ) VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO + session(uri, cid, owner, content, contentcid, label, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) "#, - session.uri, - session.cid.map_or("", |cid| cid.to_string()), - session.label.unwrap_or(""), - session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), - contenturi, - contentcid, - ).execute(&mut tr).await?; + &session.uri.to_string(), + &session.cid.to_string(), + &session.uri.authority_as_did().to_string(), + &session.content.get_content().to_string(), + &session.content.get_cid().to_string(), + session.label, + session.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string() + ).execute(&mut **tr).await?; - session.other_participants.and_then(|participants| { + if let Some(participants) = session.other_participants { for participant in participants { - write_participant(tr, participant, session.uri).await? + write_participant(tr, &participant, &session.uri).await? } - }).unwrap_or(Ok(())) + } + + Ok(()) } async fn write_participant( - tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri + tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri ) -> Result<(), Error> { - let (participantType, user): (String, User) = match participant { + let (participant_type, user): (String, &User) = match participant { Participant::Owner(user) => ("Owner".to_string(), user), Participant::Added(user) => ("Participant".to_string(), user), }; query!(r#" - INSERT INTO participants( - sessionuri, userdid, role - ) VALUES ($1, $2, $3) - "#, sessionuri.to_string(), user.did.to_string, participantType - ).execute(&mut tr).await?; - + INSERT INTO + participant(participantdid, sessionuri, role) + VALUES ($1, $2, $3) + "#, + user.did.to_string(), + sessionuri.to_string(), + participant_type, + ).execute(&mut **tr).await?; + Ok(()) } diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs new file mode 100644 index 0000000..91acc73 --- /dev/null +++ b/db/src/interfaces/direct/types.rs @@ -0,0 +1,59 @@ +use crate::Error; +use atproto::types::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use std::fmt::{Display, Formatter, Result as FmtResult}; + +pub struct Activity { + pub authority: Uri, + pub cid: Cid, + + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + pub cid: Cid, + + pub content: StrongRef, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +impl Display for Content { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Content::UnknownContent => "UnknownContentType", + }) + } +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs index d13ba36..afdae97 100644 --- a/db/src/interfaces/mod.rs +++ b/db/src/interfaces/mod.rs @@ -1 +1 @@ -pub mod spoor; +pub mod direct; From 2fa75902715d36b7a7f202fe5ef4051eea8f15af Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 14:38:23 -0700 Subject: [PATCH 4/5] Atproto, add extraction of values from StrongRef --- atproto/src/types.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/atproto/src/types.rs b/atproto/src/types.rs index 70241b8..26ee60d 100644 --- a/atproto/src/types.rs +++ b/atproto/src/types.rs @@ -73,6 +73,11 @@ impl StrongRef { pub fn get_content(&self) -> &T { &self.content } + + pub fn extract_content(self) -> (T, Cid) { + (self.content, self.cid) + } + pub fn get_cid(&self) -> &Cid { &self.cid } From 2dfbc314b2612996f93cd5f36326eba74a1f51f3 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 14:39:35 -0700 Subject: [PATCH 5/5] Db, add activity ingestor, fix type name activity has its type name Authority instead of Uri for some reason? So that is no longer an issue. EVERYTHING IS STILL UNTESTED BABY~ --- db/src/interfaces/direct/functions.rs | 109 ++++++++++---------------- db/src/interfaces/direct/types.rs | 3 +- 2 files changed, 43 insertions(+), 69 deletions(-) diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs index caf9cbd..a61ab37 100644 --- a/db/src/interfaces/direct/functions.rs +++ b/db/src/interfaces/direct/functions.rs @@ -5,6 +5,7 @@ use atproto::types::{ use crate::{ Error, interfaces::direct::types::{ + Activity, Participant, Session, User @@ -17,6 +18,14 @@ use sqlx::{ query, }; +pub async fn ingest_activity( + db: PgPool, activity: Activity +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_activity(&mut transaction, activity).await?; + transaction.commit().await.map_err(Error::Backend) +} + pub async fn ingest_session( db: PgPool, session: Session ) -> Result<(), Error> { @@ -25,6 +34,39 @@ pub async fn ingest_session( transaction.commit().await.map_err(Error::Backend) } +async fn write_activity( + tr: &mut Transaction<'_, Postgres>, activity: Activity +) -> Result<(), Error> { + let (sessionuri, sessioncid) = match activity.session { + Some(sr) => { + let (session, cid) = sr.extract_content(); + let sessionuri = session.uri.to_string(); + write_session(tr, session).await?; + (sessionuri, cid.to_string()) + } + None => ("".to_string(), "".to_string()), + }; + + query!(r#" + INSERT INTO + activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + &activity.uri.to_string(), + &activity.cid.to_string(), + sessionuri, + sessioncid, + activity.performed_at.map(|dt| dt.to_string()), + activity.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string(), + ).execute(&mut **tr).await?; + + // TODO: Handle Progress + + Ok(()) +} + + async fn write_session( tr: &mut Transaction<'_, Postgres>, session: Session ) -> Result<(), Error> { @@ -72,70 +114,3 @@ async fn write_participant( Ok(()) } - -// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { -// todo!(); -// } -// -// let mut transaction = db.begin().await?; -// -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// -// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// - - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs index 91acc73..4a3ea65 100644 --- a/db/src/interfaces/direct/types.rs +++ b/db/src/interfaces/direct/types.rs @@ -1,4 +1,3 @@ -use crate::Error; use atproto::types::{ Cid, Uri, @@ -10,7 +9,7 @@ use atproto::types::{ use std::fmt::{Display, Formatter, Result as FmtResult}; pub struct Activity { - pub authority: Uri, + pub uri: Uri, pub cid: Cid, pub session: Option>,