From aadea9757ba3caa228f1b48c63c6a91d4f5e287e Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Wed, 11 Jun 2025 17:29:59 -0700 Subject: [PATCH] Db, first commit of work This also touches atproto/src/lib.rs which may be a problem for the rebase~ Adds migrations and all the other stuff. --- Cargo.lock | 2 + atproto/src/lib.rs | 22 ++- db/Cargo.toml | 2 + .../20250612223204_initial_schema.sql | 45 +++++ db/src/connection.rs | 64 +------ db/src/error.rs | 8 + db/src/interfaces.rs | 15 -- db/src/interfaces/mod.rs | 1 + db/src/interfaces/spoor.rs | 178 ++++++++++++++++++ db/src/lib.rs | 8 +- flake.lock | 6 +- flake.nix | 1 + 12 files changed, 270 insertions(+), 82 deletions(-) create mode 100644 db/migrations/20250612223204_initial_schema.sql create mode 100644 db/src/error.rs delete mode 100644 db/src/interfaces.rs create mode 100644 db/src/interfaces/mod.rs create mode 100644 db/src/interfaces/spoor.rs diff --git a/Cargo.lock b/Cargo.lock index 00ea69c..89997b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -564,7 +564,9 @@ name = "db" version = "0.1.0" dependencies = [ "async-trait", + "atproto", "sqlx", + "thiserror 2.0.12", "tokio", ] diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index 21f8919..a327896 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,17 +1,37 @@ use lazy_regex::regex_captures; use core::str::FromStr; +use std::fmt::{ + Display, Formatter, Result as FmtResult +}; pub use atrium_api::types::{ Collection, string::{ + AtIdentifier as Authority, + Datetime, + Did, Nsid, RecordKey, - AtIdentifier as Authority, + Tid, + Handle, } }; pub mod lexicons; +pub struct Cid(String); + +impl Display for Cid { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", self.0) + } +} + +pub struct StrongRef { + pub content: T, + pub cid: Cid, +} + pub struct Uri { whole: String, // These fields could be useful in the future, diff --git a/db/Cargo.toml b/db/Cargo.toml index d555eb9..dd1d0b9 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] +thiserror = "2.0.12" +atproto.workspace = true async-trait.workspace = true sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } tokio.workspace = true diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql new file mode 100644 index 0000000..f52b19b --- /dev/null +++ b/db/migrations/20250612223204_initial_schema.sql @@ -0,0 +1,45 @@ +-- Add migration script here + +CREATE EXTENSION IF NOT EXISTS pg_trgm; + +CREATE TABLE actor ( + did VARCHAR PRIMARY KEY, + handle VARCHAR UNIQUE, + indexed_at VARCHAR NOT NULL +); +CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops); + +CREATE TABLE session ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + owner VARCHAR NOT NULL, + + content VARCHAR NOT NULL, + contentcid VARCHAR NOT NULL, + label VARCHAR, + -- Participants in participant + + created_at VARCHAR NOT NULL, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE activity ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + + session VARCHAR, + sessioncid VARCHAR, + -- Progress in progress + + performed_at VARCHAR, + created_at VARCHAR NOT NULL, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE participant ( + participantdid VARCHAR NOT NULL, + session VARCHAR NOT NULL, + role VARCHAR NOT NULL +); diff --git a/db/src/connection.rs b/db/src/connection.rs index ba883cc..71e9c58 100644 --- a/db/src/connection.rs +++ b/db/src/connection.rs @@ -1,19 +1,7 @@ -use sqlx::{ - query, - Database, - Pool, - Postgres, - pool::PoolOptions, - postgres::{ - PgConnectOptions, - PgSslMode, - }, - Result, -}; use std::string::ToString; -pub struct Db { - pool: Pool +pub struct Db { + pool: Pool } #[non_exhaustive] @@ -39,13 +27,6 @@ pub struct Session { impl Db { async fn connect() -> Result { - let conn = PgConnectOptions::new() - .host("localhost") - .port(5432) - .username("postgres") - .password("062217") - .database("anisky") - .ssl_mode(PgSslMode::Disable); let pool = match PoolOptions::new().connect_with(conn).await { Ok(p) => p, @@ -55,45 +36,4 @@ impl Db { Ok(Db { pool }) } // - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // let mut transaction = self.pool.begin().await?; - // - // query!(r#" - // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) - // "#, - // session.sessionuri, session.label - // ).execute(&mut *transaction).await?; - // - // for participant in session.participants { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(&mut *transaction).await?; - // } - // - // transaction.commit().await - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } } diff --git a/db/src/error.rs b/db/src/error.rs new file mode 100644 index 0000000..ed1b585 --- /dev/null +++ b/db/src/error.rs @@ -0,0 +1,8 @@ +#[non_exhaustive] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Database Implementation Error: {0}")] + Backend(#[from] sqlx::Error), +} + +pub type Result = std::result::Result; diff --git a/db/src/interfaces.rs b/db/src/interfaces.rs deleted file mode 100644 index d3629e1..0000000 --- a/db/src/interfaces.rs +++ /dev/null @@ -1,15 +0,0 @@ -use atproto::{ - Did, - Uri, -}; - -pub struct User { - userdid: Did, - handle: Handle, -} - -struct Participant { - participantdid: Did, - role: Role, -} - diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs new file mode 100644 index 0000000..d13ba36 --- /dev/null +++ b/db/src/interfaces/mod.rs @@ -0,0 +1 @@ +pub mod spoor; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs new file mode 100644 index 0000000..4a27dcb --- /dev/null +++ b/db/src/interfaces/spoor.rs @@ -0,0 +1,178 @@ +use crate::Error; +use atproto::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use sqlx::{ + PgTransaction, + PgPool, + query, +}; + +pub struct Activity { + pub authority: Uri, + + pub cid: Option, + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + + pub cid: Option, + pub content: Option>, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} + +pub async fn ingest_session( + db: PgPool, session: Session +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(Error::Backend) +} + +async fn write_session( + tr: &mut PgTransaction<'_>, session: Session +) -> Result<(), Error> { + + let (contenturi, contentcid): (Option, String) = + match session.content { + Some(sr) => (Some(sr.content), sr.cid.to_string()), + None => (None, "".to_string()), + }; + + query!(r#" + INSERT INTO sessions( + sessionuri, sessioncid, label, created_at, contenturi, contentcid + ) VALUES ($1, $2, $3, $4, $5, $6) + "#, + session.uri, + session.cid.map_or("", |cid| cid.to_string()), + session.label.unwrap_or(""), + session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), + contenturi, + contentcid, + ).execute(&mut tr).await?; + + session.other_participants.and_then(|participants| { + for participant in participants { + write_participant(tr, participant, session.uri).await? + } + }).unwrap_or(Ok(())) +} + +async fn write_participant( + tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri +) -> Result<(), Error> { + let (participantType, user): (String, User) = match participant { + Participant::Owner(user) => ("Owner".to_string(), user), + Participant::Added(user) => ("Participant".to_string(), user), + }; + + query!(r#" + INSERT INTO participants( + sessionuri, userdid, role + ) VALUES ($1, $2, $3) + "#, sessionuri.to_string(), user.did.to_string, participantType + ).execute(&mut tr).await?; + + Ok(()) +} + +// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { +// todo!(); +// } +// +// let mut transaction = db.begin().await?; +// +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// +// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// + + // pub async fn add_user(&self, user: &User) -> Result<()> { + // query!(r#" + // INSERT INTO users(userdid, handle) VALUES ($1, $2) + // "#, + // user.userdid, user.handle + // ).execute(self.pool).await?; + // Ok(()) + // } + // + // pub async fn add_session(&self, session: &Session) -> Result<()> { + // } + // + // pub async fn add_participant(&self, session: Session, + // participant: Participant) -> Result { + // query!(r#" + // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) + // "#, + // session.sessionuri, participant.userdid, participant.role.to_string() + // ).execute(self.pool).await?; + // + // session.participants.push(participant); + // + // Ok(session) + // } diff --git a/db/src/lib.rs b/db/src/lib.rs index 82e9c13..6683e08 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1 +1,7 @@ -pub struct db; +pub mod interfaces; +pub mod error; + +pub use crate::error::Error; + + +// pub struct db; diff --git a/flake.lock b/flake.lock index 8f3aa7b..69e701e 100644 --- a/flake.lock +++ b/flake.lock @@ -41,11 +41,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1746585402, - "narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=", + "lastModified": 1749695868, + "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "72dd969389583664f87aa348b3458f2813693617", + "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 943945b..48d94bf 100644 --- a/flake.nix +++ b/flake.nix @@ -42,6 +42,7 @@ packages = (with pkgs; [ # The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt, # rustdoc, rustfmt, and other tools. + sqlx-cli rustToolchain ]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]); };