diff --git a/atproto/src/types.rs b/atproto/src/types.rs index 26ee60d..ca30ded 100644 --- a/atproto/src/types.rs +++ b/atproto/src/types.rs @@ -68,17 +68,3 @@ pub struct StrongRef { content: T, cid: Cid, } - -impl StrongRef { - pub fn get_content(&self) -> &T { - &self.content - } - - pub fn extract_content(self) -> (T, Cid) { - (self.content, self.cid) - } - - pub fn get_cid(&self) -> &Cid { - &self.cid - } -} diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql index b95abd4..f52b19b 100644 --- a/db/migrations/20250612223204_initial_schema.sql +++ b/db/migrations/20250612223204_initial_schema.sql @@ -19,7 +19,7 @@ CREATE TABLE session ( label VARCHAR, -- Participants in participant - created_at VARCHAR, + created_at VARCHAR NOT NULL, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); @@ -33,13 +33,13 @@ CREATE TABLE activity ( -- Progress in progress performed_at VARCHAR, - created_at VARCHAR, + created_at VARCHAR NOT NULL, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); CREATE TABLE participant ( participantdid VARCHAR NOT NULL, - sessionuri VARCHAR NOT NULL, + session VARCHAR NOT NULL, role VARCHAR NOT NULL ); diff --git a/db/src/error.rs b/db/src/error.rs index 1c330bc..ed1b585 100644 --- a/db/src/error.rs +++ b/db/src/error.rs @@ -3,8 +3,6 @@ pub enum Error { #[error("Database Implementation Error: {0}")] Backend(#[from] sqlx::Error), - #[error("AT Protocol Implementation Error: {0}")] - Atproto(#[from] atproto::error::Error), } pub type Result = std::result::Result; diff --git a/db/src/interfaces/direct.rs b/db/src/interfaces/direct.rs deleted file mode 100644 index d0b4ddc..0000000 --- a/db/src/interfaces/direct.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod types; -pub mod functions; diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs deleted file mode 100644 index a61ab37..0000000 --- a/db/src/interfaces/direct/functions.rs +++ /dev/null @@ -1,116 +0,0 @@ -use atproto::types::{ - Datetime, - Uri, -}; -use crate::{ - Error, - interfaces::direct::types::{ - Activity, - Participant, - Session, - User - }, -}; -use sqlx::{ - Transaction, - Postgres, - PgPool, - query, -}; - -pub async fn ingest_activity( - db: PgPool, activity: Activity -) -> Result<(), Error> { - let mut transaction = db.begin().await?; - write_activity(&mut transaction, activity).await?; - transaction.commit().await.map_err(Error::Backend) -} - -pub async fn ingest_session( - db: PgPool, session: Session -) -> Result<(), Error> { - let mut transaction = db.begin().await?; - write_session(&mut transaction, session).await?; - transaction.commit().await.map_err(Error::Backend) -} - -async fn write_activity( - tr: &mut Transaction<'_, Postgres>, activity: Activity -) -> Result<(), Error> { - let (sessionuri, sessioncid) = match activity.session { - Some(sr) => { - let (session, cid) = sr.extract_content(); - let sessionuri = session.uri.to_string(); - write_session(tr, session).await?; - (sessionuri, cid.to_string()) - } - None => ("".to_string(), "".to_string()), - }; - - query!(r#" - INSERT INTO - activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - &activity.uri.to_string(), - &activity.cid.to_string(), - sessionuri, - sessioncid, - activity.performed_at.map(|dt| dt.to_string()), - activity.created_at.map(|dt| dt.to_string()), - &Datetime::now()?.to_string(), - ).execute(&mut **tr).await?; - - // TODO: Handle Progress - - Ok(()) -} - - -async fn write_session( - tr: &mut Transaction<'_, Postgres>, session: Session -) -> Result<(), Error> { - query!(r#" - INSERT INTO - session(uri, cid, owner, content, contentcid, label, created_at, indexed_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - "#, - &session.uri.to_string(), - &session.cid.to_string(), - &session.uri.authority_as_did().to_string(), - &session.content.get_content().to_string(), - &session.content.get_cid().to_string(), - session.label, - session.created_at.map(|dt| dt.to_string()), - &Datetime::now()?.to_string() - ).execute(&mut **tr).await?; - - if let Some(participants) = session.other_participants { - for participant in participants { - write_participant(tr, &participant, &session.uri).await? - } - } - - Ok(()) -} - -async fn write_participant( - tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri -) -> Result<(), Error> { - let (participant_type, user): (String, &User) = match participant { - Participant::Owner(user) => ("Owner".to_string(), user), - Participant::Added(user) => ("Participant".to_string(), user), - }; - - query!(r#" - INSERT INTO - participant(participantdid, sessionuri, role) - VALUES ($1, $2, $3) - "#, - user.did.to_string(), - sessionuri.to_string(), - participant_type, - ).execute(&mut **tr).await?; - - Ok(()) -} diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs deleted file mode 100644 index 4a3ea65..0000000 --- a/db/src/interfaces/direct/types.rs +++ /dev/null @@ -1,58 +0,0 @@ -use atproto::types::{ - Cid, - Uri, - Datetime, - Did, - StrongRef, - Handle, -}; -use std::fmt::{Display, Formatter, Result as FmtResult}; - -pub struct Activity { - pub uri: Uri, - pub cid: Cid, - - pub session: Option>, - pub progress: Option, - pub performed_at: Option, - pub created_at: Option, -} - -pub struct Session { - pub uri: Uri, - pub cid: Cid, - - pub content: StrongRef, - pub label: Option, - pub created_at: Option, - pub other_participants: Option>, -} - -pub struct User { - pub did: Did, - pub handle: Option, -} - -#[non_exhaustive] -pub enum Participant { - Owner(User), - Added(User), -} - -#[non_exhaustive] -pub enum Content { - UnknownContent -} - -impl Display for Content { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", match self { - Content::UnknownContent => "UnknownContentType", - }) - } -} - -#[non_exhaustive] -pub enum Progress { - UnknownProgress -} diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs index afdae97..d13ba36 100644 --- a/db/src/interfaces/mod.rs +++ b/db/src/interfaces/mod.rs @@ -1 +1 @@ -pub mod direct; +pub mod spoor; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs new file mode 100644 index 0000000..4a27dcb --- /dev/null +++ b/db/src/interfaces/spoor.rs @@ -0,0 +1,178 @@ +use crate::Error; +use atproto::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use sqlx::{ + PgTransaction, + PgPool, + query, +}; + +pub struct Activity { + pub authority: Uri, + + pub cid: Option, + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + + pub cid: Option, + pub content: Option>, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} + +pub async fn ingest_session( + db: PgPool, session: Session +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(Error::Backend) +} + +async fn write_session( + tr: &mut PgTransaction<'_>, session: Session +) -> Result<(), Error> { + + let (contenturi, contentcid): (Option, String) = + match session.content { + Some(sr) => (Some(sr.content), sr.cid.to_string()), + None => (None, "".to_string()), + }; + + query!(r#" + INSERT INTO sessions( + sessionuri, sessioncid, label, created_at, contenturi, contentcid + ) VALUES ($1, $2, $3, $4, $5, $6) + "#, + session.uri, + session.cid.map_or("", |cid| cid.to_string()), + session.label.unwrap_or(""), + session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), + contenturi, + contentcid, + ).execute(&mut tr).await?; + + session.other_participants.and_then(|participants| { + for participant in participants { + write_participant(tr, participant, session.uri).await? + } + }).unwrap_or(Ok(())) +} + +async fn write_participant( + tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri +) -> Result<(), Error> { + let (participantType, user): (String, User) = match participant { + Participant::Owner(user) => ("Owner".to_string(), user), + Participant::Added(user) => ("Participant".to_string(), user), + }; + + query!(r#" + INSERT INTO participants( + sessionuri, userdid, role + ) VALUES ($1, $2, $3) + "#, sessionuri.to_string(), user.did.to_string, participantType + ).execute(&mut tr).await?; + + Ok(()) +} + +// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { +// todo!(); +// } +// +// let mut transaction = db.begin().await?; +// +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// +// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// + + // pub async fn add_user(&self, user: &User) -> Result<()> { + // query!(r#" + // INSERT INTO users(userdid, handle) VALUES ($1, $2) + // "#, + // user.userdid, user.handle + // ).execute(self.pool).await?; + // Ok(()) + // } + // + // pub async fn add_session(&self, session: &Session) -> Result<()> { + // } + // + // pub async fn add_participant(&self, session: Session, + // participant: Participant) -> Result { + // query!(r#" + // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) + // "#, + // session.sessionuri, participant.userdid, participant.role.to_string() + // ).execute(self.pool).await?; + // + // session.participants.push(participant); + // + // Ok(session) + // }