diff --git a/Cargo.lock b/Cargo.lock index dbe12ef..89997b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -563,7 +563,10 @@ dependencies = [ name = "db" version = "0.1.0" dependencies = [ + "async-trait", + "atproto", "sqlx", + "thiserror 2.0.12", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 908c860..9129ba7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "3" members = [ "api", "atproto","db", "ingestor"] [workspace.dependencies] +async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index 21f8919..15a875a 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -4,14 +4,24 @@ use core::str::FromStr; pub use atrium_api::types::{ Collection, string::{ + AtIdentifier as Authority, + Cid, + Datetime, + Did, Nsid, RecordKey, - AtIdentifier as Authority, + Tid, + Handle, } }; pub mod lexicons; +pub struct StrongRef { + pub content: T, + pub cid: Cid, +} + pub struct Uri { whole: String, // These fields could be useful in the future, diff --git a/db/Cargo.toml b/db/Cargo.toml index ded38de..dd1d0b9 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,5 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] +thiserror = "2.0.12" +atproto.workspace = true +async-trait.workspace = true sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } -tokio = "1.45.0" +tokio.workspace = true diff --git a/db/src/connection.rs b/db/src/connection.rs index ba883cc..1c66086 100644 --- a/db/src/connection.rs +++ b/db/src/connection.rs @@ -12,8 +12,8 @@ use sqlx::{ }; use std::string::ToString; -pub struct Db { - pool: Pool +pub struct Db { + pool: Pool } #[non_exhaustive] @@ -55,45 +55,4 @@ impl Db { Ok(Db { pool }) } // - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // let mut transaction = self.pool.begin().await?; - // - // query!(r#" - // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) - // "#, - // session.sessionuri, session.label - // ).execute(&mut *transaction).await?; - // - // for participant in session.participants { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(&mut *transaction).await?; - // } - // - // transaction.commit().await - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } } diff --git a/db/src/interfaces.rs b/db/src/interfaces.rs deleted file mode 100644 index d3629e1..0000000 --- a/db/src/interfaces.rs +++ /dev/null @@ -1,15 +0,0 @@ -use atproto::{ - Did, - Uri, -}; - -pub struct User { - userdid: Did, - handle: Handle, -} - -struct Participant { - participantdid: Did, - role: Role, -} - diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs new file mode 100644 index 0000000..d13ba36 --- /dev/null +++ b/db/src/interfaces/mod.rs @@ -0,0 +1 @@ +pub mod spoor; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs new file mode 100644 index 0000000..8f97b57 --- /dev/null +++ b/db/src/interfaces/spoor.rs @@ -0,0 +1,137 @@ +use crate::DbError; +use atproto::{ + Cid, + Datetime, + Did, + StrongRef, + Tid, + Handle, +}; +use sqlx::{ + PgTransaction, + PgPool, + query, +}; + +pub struct Activity { + pub authority: User, + pub key: Tid, + + pub cid: Option, + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub authority: User, + pub key: Tid, + + pub cid: Option, + pub content: Option>, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} + +pub async fn ingest_session(db: PgPool, session: Session) -> Result<(), DbError> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(DbError::Backend) +} + +async fn write_session(tr: &mut PgTransaction<'_>, session: Session) -> Result<(), DbError> { + query!(r#" + INSERT INTO sessions(sessionuri, label, created_at, contenturi, contentcid) + "#).execute(&mut tr).await?; + Ok(()) +} + +// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { +// todo!(); +// } +// +// let mut transaction = db.begin().await?; +// +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// +// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// + + // pub async fn add_user(&self, user: &User) -> Result<()> { + // query!(r#" + // INSERT INTO users(userdid, handle) VALUES ($1, $2) + // "#, + // user.userdid, user.handle + // ).execute(self.pool).await?; + // Ok(()) + // } + // + // pub async fn add_session(&self, session: &Session) -> Result<()> { + // } + // + // pub async fn add_participant(&self, session: Session, + // participant: Participant) -> Result { + // query!(r#" + // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) + // "#, + // session.sessionuri, participant.userdid, participant.role.to_string() + // ).execute(self.pool).await?; + // + // session.participants.push(participant); + // + // Ok(session) + // } diff --git a/db/src/lib.rs b/db/src/lib.rs index 82e9c13..30652a5 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1 +1,14 @@ -pub struct db; +use thiserror::Error; + +pub mod interfaces; + +#[non_exhaustive] +#[derive(Debug, Error)] +pub enum DbError { + #[error("Database Implementation Error: {0}")] + Backend(#[from] sqlx::Error), +} + + + +// pub struct db; diff --git a/ingestor/Cargo.toml b/ingestor/Cargo.toml index a177581..7da302c 100644 --- a/ingestor/Cargo.toml +++ b/ingestor/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] anyhow = "1.0.98" -async-trait = "0.1.88" +async-trait.workspace = true atproto.workspace = true rocketman = "0.2.0" serde.workspace = true