From 62f32da92745f49b60e16b03bf3bdb2fe2a3298f Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Wed, 11 Jun 2025 14:09:02 -0700 Subject: [PATCH 01/10] Async-trait in workspace, tokio in db --- Cargo.lock | 1 + Cargo.toml | 1 + db/Cargo.toml | 3 ++- ingestor/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbe12ef..00ea69c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -563,6 +563,7 @@ dependencies = [ name = "db" version = "0.1.0" dependencies = [ + "async-trait", "sqlx", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 908c860..9129ba7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "3" members = [ "api", "atproto","db", "ingestor"] [workspace.dependencies] +async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" diff --git a/db/Cargo.toml b/db/Cargo.toml index ded38de..d555eb9 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] +async-trait.workspace = true sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } -tokio = "1.45.0" +tokio.workspace = true diff --git a/ingestor/Cargo.toml b/ingestor/Cargo.toml index a177581..7da302c 100644 --- a/ingestor/Cargo.toml +++ b/ingestor/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] anyhow = "1.0.98" -async-trait = "0.1.88" +async-trait.workspace = true atproto.workspace = true rocketman = "0.2.0" serde.workspace = true From 9d8fb730ba10a801a3ddd632ea5993f1eb473119 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Wed, 11 Jun 2025 17:29:59 -0700 Subject: [PATCH 02/10] Some work on interfaces I'm not happy. It kinda sucks right now. I hope I can fix it~~~~~ But for now I must move to my laptop --- Cargo.lock | 2 + atproto/src/lib.rs | 22 ++++- db/Cargo.toml | 2 + db/src/connection.rs | 45 +--------- db/src/interfaces.rs | 15 ---- db/src/interfaces/mod.rs | 1 + db/src/interfaces/spoor.rs | 178 +++++++++++++++++++++++++++++++++++++ db/src/lib.rs | 13 ++- 8 files changed, 218 insertions(+), 60 deletions(-) delete mode 100644 db/src/interfaces.rs create mode 100644 db/src/interfaces/mod.rs create mode 100644 db/src/interfaces/spoor.rs diff --git a/Cargo.lock b/Cargo.lock index 00ea69c..89997b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -564,7 +564,9 @@ name = "db" version = "0.1.0" dependencies = [ "async-trait", + "atproto", "sqlx", + "thiserror 2.0.12", "tokio", ] diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index 21f8919..a327896 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,17 +1,37 @@ use lazy_regex::regex_captures; use core::str::FromStr; +use std::fmt::{ + Display, Formatter, Result as FmtResult +}; pub use atrium_api::types::{ Collection, string::{ + AtIdentifier as Authority, + Datetime, + Did, Nsid, RecordKey, - AtIdentifier as Authority, + Tid, + Handle, } }; pub mod lexicons; +pub struct Cid(String); + +impl Display for Cid { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", self.0) + } +} + +pub struct StrongRef { + pub content: T, + pub cid: Cid, +} + pub struct Uri { whole: String, // These fields could be useful in the future, diff --git a/db/Cargo.toml b/db/Cargo.toml index d555eb9..dd1d0b9 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] +thiserror = "2.0.12" +atproto.workspace = true async-trait.workspace = true sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } tokio.workspace = true diff --git a/db/src/connection.rs b/db/src/connection.rs index ba883cc..1c66086 100644 --- a/db/src/connection.rs +++ b/db/src/connection.rs @@ -12,8 +12,8 @@ use sqlx::{ }; use std::string::ToString; -pub struct Db { - pool: Pool +pub struct Db { + pool: Pool } #[non_exhaustive] @@ -55,45 +55,4 @@ impl Db { Ok(Db { pool }) } // - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // let mut transaction = self.pool.begin().await?; - // - // query!(r#" - // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) - // "#, - // session.sessionuri, session.label - // ).execute(&mut *transaction).await?; - // - // for participant in session.participants { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(&mut *transaction).await?; - // } - // - // transaction.commit().await - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } } diff --git a/db/src/interfaces.rs b/db/src/interfaces.rs deleted file mode 100644 index d3629e1..0000000 --- a/db/src/interfaces.rs +++ /dev/null @@ -1,15 +0,0 @@ -use atproto::{ - Did, - Uri, -}; - -pub struct User { - userdid: Did, - handle: Handle, -} - -struct Participant { - participantdid: Did, - role: Role, -} - diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs new file mode 100644 index 0000000..d13ba36 --- /dev/null +++ b/db/src/interfaces/mod.rs @@ -0,0 +1 @@ +pub mod spoor; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs new file mode 100644 index 0000000..f11386e --- /dev/null +++ b/db/src/interfaces/spoor.rs @@ -0,0 +1,178 @@ +use crate::DbError; +use atproto::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use sqlx::{ + PgTransaction, + PgPool, + query, +}; + +pub struct Activity { + pub authority: Uri, + + pub cid: Option, + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + + pub cid: Option, + pub content: Option>, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} + +pub async fn ingest_session( + db: PgPool, session: Session +) -> Result<(), DbError> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(DbError::Backend) +} + +async fn write_session( + tr: &mut PgTransaction<'_>, session: Session +) -> Result<(), DbError> { + + let (contenturi, contentcid): (Option, String) = + match session.content { + Some(sr) => (Some(sr.content), sr.cid.to_string()), + None => (None, "".to_string()), + }; + + query!(r#" + INSERT INTO sessions( + sessionuri, sessioncid, label, created_at, contenturi, contentcid + ) VALUES ($1, $2, $3, $4, $5, $6) + "#, + session.uri, + session.cid.map_or("", |cid| cid.to_string()), + session.label.unwrap_or(""), + session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), + contenturi, + contentcid, + ).execute(&mut tr).await?; + + session.other_participants.and_then(|participants| { + for participant in participants { + write_participant(tr, participant, session.uri).await? + } + }).unwrap_or(Ok(())) +} + +async fn write_participant( + tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri +) -> Result<(), DbError> { + let (participantType, user): (String, User) = match participant { + Participant::Owner(user) => ("Owner".to_string(), user), + Participant::Added(user) => ("Participant".to_string(), user), + }; + + query!(r#" + INSERT INTO participants( + sessionuri, userdid, role + ) VALUES ($1, $2, $3) + "#, sessionuri.to_string(), user.did.to_string, participantType + ).execute(&mut tr).await?; + + Ok(()) +} + +// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { +// todo!(); +// } +// +// let mut transaction = db.begin().await?; +// +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// +// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// + + // pub async fn add_user(&self, user: &User) -> Result<()> { + // query!(r#" + // INSERT INTO users(userdid, handle) VALUES ($1, $2) + // "#, + // user.userdid, user.handle + // ).execute(self.pool).await?; + // Ok(()) + // } + // + // pub async fn add_session(&self, session: &Session) -> Result<()> { + // } + // + // pub async fn add_participant(&self, session: Session, + // participant: Participant) -> Result { + // query!(r#" + // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) + // "#, + // session.sessionuri, participant.userdid, participant.role.to_string() + // ).execute(self.pool).await?; + // + // session.participants.push(participant); + // + // Ok(session) + // } diff --git a/db/src/lib.rs b/db/src/lib.rs index 82e9c13..c93205e 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1 +1,12 @@ -pub struct db; +use thiserror::Error; + +pub mod interfaces; + +#[non_exhaustive] +#[derive(Debug, Error)] +pub enum DbError { + #[error("Database Implementation Error: {0}")] + Backend(#[from] sqlx::Error), +} + +// pub struct db; From 1abdb7f13355130a75249ad8340860bd4984d83b Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Fri, 13 Jun 2025 10:02:01 -0700 Subject: [PATCH 03/10] Db working & Migrations --- .../20250612223204_initial_schema.sql | 45 +++++++++++++++++++ db/src/connection.rs | 19 -------- db/src/error.rs | 8 ++++ db/src/interfaces/spoor.rs | 10 ++--- db/src/lib.rs | 11 ++--- flake.lock | 6 +-- flake.nix | 1 + 7 files changed, 65 insertions(+), 35 deletions(-) create mode 100644 db/migrations/20250612223204_initial_schema.sql create mode 100644 db/src/error.rs diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql new file mode 100644 index 0000000..f52b19b --- /dev/null +++ b/db/migrations/20250612223204_initial_schema.sql @@ -0,0 +1,45 @@ +-- Add migration script here + +CREATE EXTENSION IF NOT EXISTS pg_trgm; + +CREATE TABLE actor ( + did VARCHAR PRIMARY KEY, + handle VARCHAR UNIQUE, + indexed_at VARCHAR NOT NULL +); +CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops); + +CREATE TABLE session ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + owner VARCHAR NOT NULL, + + content VARCHAR NOT NULL, + contentcid VARCHAR NOT NULL, + label VARCHAR, + -- Participants in participant + + created_at VARCHAR NOT NULL, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE activity ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + + session VARCHAR, + sessioncid VARCHAR, + -- Progress in progress + + performed_at VARCHAR, + created_at VARCHAR NOT NULL, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE participant ( + participantdid VARCHAR NOT NULL, + session VARCHAR NOT NULL, + role VARCHAR NOT NULL +); diff --git a/db/src/connection.rs b/db/src/connection.rs index 1c66086..71e9c58 100644 --- a/db/src/connection.rs +++ b/db/src/connection.rs @@ -1,15 +1,3 @@ -use sqlx::{ - query, - Database, - Pool, - Postgres, - pool::PoolOptions, - postgres::{ - PgConnectOptions, - PgSslMode, - }, - Result, -}; use std::string::ToString; pub struct Db { @@ -39,13 +27,6 @@ pub struct Session { impl Db { async fn connect() -> Result { - let conn = PgConnectOptions::new() - .host("localhost") - .port(5432) - .username("postgres") - .password("062217") - .database("anisky") - .ssl_mode(PgSslMode::Disable); let pool = match PoolOptions::new().connect_with(conn).await { Ok(p) => p, diff --git a/db/src/error.rs b/db/src/error.rs new file mode 100644 index 0000000..ed1b585 --- /dev/null +++ b/db/src/error.rs @@ -0,0 +1,8 @@ +#[non_exhaustive] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Database Implementation Error: {0}")] + Backend(#[from] sqlx::Error), +} + +pub type Result = std::result::Result; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs index f11386e..4a27dcb 100644 --- a/db/src/interfaces/spoor.rs +++ b/db/src/interfaces/spoor.rs @@ -1,4 +1,4 @@ -use crate::DbError; +use crate::Error; use atproto::{ Cid, Uri, @@ -56,15 +56,15 @@ pub enum Progress { pub async fn ingest_session( db: PgPool, session: Session -) -> Result<(), DbError> { +) -> Result<(), Error> { let mut transaction = db.begin().await?; write_session(&mut transaction, session).await?; - transaction.commit().await.map_err(DbError::Backend) + transaction.commit().await.map_err(Error::Backend) } async fn write_session( tr: &mut PgTransaction<'_>, session: Session -) -> Result<(), DbError> { +) -> Result<(), Error> { let (contenturi, contentcid): (Option, String) = match session.content { @@ -94,7 +94,7 @@ async fn write_session( async fn write_participant( tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri -) -> Result<(), DbError> { +) -> Result<(), Error> { let (participantType, user): (String, User) = match participant { Participant::Owner(user) => ("Owner".to_string(), user), Participant::Added(user) => ("Participant".to_string(), user), diff --git a/db/src/lib.rs b/db/src/lib.rs index c93205e..6683e08 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1,12 +1,7 @@ -use thiserror::Error; - pub mod interfaces; +pub mod error; + +pub use crate::error::Error; -#[non_exhaustive] -#[derive(Debug, Error)] -pub enum DbError { - #[error("Database Implementation Error: {0}")] - Backend(#[from] sqlx::Error), -} // pub struct db; diff --git a/flake.lock b/flake.lock index 8f3aa7b..69e701e 100644 --- a/flake.lock +++ b/flake.lock @@ -41,11 +41,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1746585402, - "narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=", + "lastModified": 1749695868, + "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "72dd969389583664f87aa348b3458f2813693617", + "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 943945b..48d94bf 100644 --- a/flake.nix +++ b/flake.nix @@ -42,6 +42,7 @@ packages = (with pkgs; [ # The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt, # rustdoc, rustfmt, and other tools. + sqlx-cli rustToolchain ]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]); }; From 781a56028f0a00d3f80334fd73e08adbd87cc073 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Mon, 16 Jun 2025 15:29:27 -0700 Subject: [PATCH 04/10] Atproto, types overhaul and error handling Breaks off from Atrium-rs's types because they are implemented inconsistently, which makes them harder to use. Additionally, I wanted sqlx support so I decided I'd need to reimplement them to some extent anyways. This was done with reference to the atproto documentation but specifically not the atrium-rs codebase so I wouldn't have to think about licenses. This adds the types and error module in atproto. It also touches Cargo.toml for some new dependencies and some shared dependencies. It required thiserror, so I looped that into the workspace meaning that this commit touches db. some things to keep in mind: - There is no CID parsing - None of this is tested, nor are there any tests written. We're playing fast and loose baby~ --- Cargo.toml | 1 + atproto/Cargo.toml | 2 + atproto/src/error.rs | 31 +++++++++++++ atproto/src/lib.rs | 81 +-------------------------------- atproto/src/types.rs | 70 ++++++++++++++++++++++++++++ atproto/src/types/authority.rs | 38 ++++++++++++++++ atproto/src/types/cid.rs | 16 +++++++ atproto/src/types/datetime.rs | 63 +++++++++++++++++++++++++ atproto/src/types/did.rs | 72 +++++++++++++++++++++++++++++ atproto/src/types/record_key.rs | 63 +++++++++++++++++++++++++ atproto/src/types/uri.rs | 79 ++++++++++++++++++++++++++++++++ db/Cargo.toml | 4 +- 12 files changed, 439 insertions(+), 81 deletions(-) create mode 100644 atproto/src/error.rs create mode 100644 atproto/src/types.rs create mode 100644 atproto/src/types/authority.rs create mode 100644 atproto/src/types/cid.rs create mode 100644 atproto/src/types/datetime.rs create mode 100644 atproto/src/types/did.rs create mode 100644 atproto/src/types/record_key.rs create mode 100644 atproto/src/types/uri.rs diff --git a/Cargo.toml b/Cargo.toml index 9129ba7..779f393 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" +thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } tracing = "0.1.41" tracing-subscriber = "0.3.19" diff --git a/atproto/Cargo.toml b/atproto/Cargo.toml index 7d56725..033ff0d 100644 --- a/atproto/Cargo.toml +++ b/atproto/Cargo.toml @@ -8,5 +8,7 @@ atrium-api = { version = "0.25.3", default-features = false } lazy-regex = "3.4.1" serde.workspace = true serde_json.workspace = true +time = { version = "0.3.41", features = ["parsing", "formatting"] } tracing-subscriber.workspace = true tracing.workspace = true +thiserror.workspace = true diff --git a/atproto/src/error.rs b/atproto/src/error.rs new file mode 100644 index 0000000..1db7106 --- /dev/null +++ b/atproto/src/error.rs @@ -0,0 +1,31 @@ +use thiserror::Error as ThisError; + +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum Error { + #[error("Error while parsing")] + Parse { err: ParseError, object: String }, + #[error("Error while formatting")] + Format { err: FormatError, object: String }, +} + +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum FormatError { + #[error("Time Parse Error: {0}")] + Datetime(#[from] time::error::Format), +} +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum ParseError { + #[error("Time Parse Error: {0}")] + Datetime(#[from] time::error::Parse), + #[error("Length of parsed object too long, max: {max:?}, got: {got:?}.")] + Length { max: usize, got: usize }, + #[error("Currently Did is enforced, cannot use handle, {handle:?}")] + ForceDid { handle: String }, + #[error("Incorrectly formatted")] + Format, +} + +pub type Result = std::result::Result; diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index a327896..c3b56ee 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,80 +1,3 @@ -use lazy_regex::regex_captures; -use core::str::FromStr; -use std::fmt::{ - Display, Formatter, Result as FmtResult -}; - -pub use atrium_api::types::{ - Collection, - string::{ - AtIdentifier as Authority, - Datetime, - Did, - Nsid, - RecordKey, - Tid, - Handle, - } -}; - pub mod lexicons; - -pub struct Cid(String); - -impl Display for Cid { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", self.0) - } -} - -pub struct StrongRef { - pub content: T, - pub cid: Cid, -} - -pub struct Uri { - whole: String, - // These fields could be useful in the future, - // so I'm leaving the code for them. - // authority: Authority, - // collection: Option, - // rkey: Option, -} - -impl FromStr for Uri { - type Err = &'static str; - fn from_str(uri: &str) -> Result { - if uri.len() > 8000 { - return Err("Uri too long") - } - - let Some(( - whole, unchecked_authority, unchecked_collection, unchecked_rkey - )) = regex_captures!( - r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", - uri, - ) else { - return Err("Invalid Uri"); - }; - - // This parsing is required, but the values don't need to be used yet. - // No compute cost to use them, just storage cost - let _authority = Authority::from_str(unchecked_authority)?; - - let _collection = if unchecked_collection.is_empty() { None } - else { Some(Nsid::new(unchecked_collection.to_string())?) }; - - let _rkey = if unchecked_rkey.is_empty() { None } - else { Some(RecordKey::new(unchecked_rkey.to_string())?) }; - - // Ok(Uri{ whole: whole.to_string(), authority, collection, rkey }) - Ok(Uri { whole: whole.to_string() }) - } -} - -impl Uri { - pub fn as_str(&self) -> &str { - self.whole.as_str() - } -} - +pub mod types; +pub mod error; diff --git a/atproto/src/types.rs b/atproto/src/types.rs new file mode 100644 index 0000000..ca30ded --- /dev/null +++ b/atproto/src/types.rs @@ -0,0 +1,70 @@ +use crate::error::{Error, ParseError}; + +macro_rules! basic_string_type { + ($name:ident, $regex:literal, $max_len:literal) => { + pub struct $name { value: String, } + + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value) + } + } + + impl std::str::FromStr for $name { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > $max_len { + return Err(Error::Parse { + err: ParseError::Length { max: s.len(), got: $max_len }, + object: stringify!($name).to_string(), + }); + } + + if ! lazy_regex::regex_is_match!( + $regex, + s + ) { + return Err(Error::Parse { + err: ParseError::Format, + object: stringify!($name).to_string(), + }); + } + + Ok(Self { + value: s.to_string(), + }) + } + } + } +} + +mod did; +pub use did::Did; +mod cid; +pub use cid::Cid; +mod authority; +pub use authority::Authority; +mod datetime; +pub use datetime::Datetime; +mod record_key; +pub use record_key::RecordKey; +mod uri; +pub use uri::Uri; + +basic_string_type!(Handle, + r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$", + 253 +); +basic_string_type!(Nsid, + r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$", + 317 +); +basic_string_type!(Tid, + r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$", + 13 +); + +pub struct StrongRef { + content: T, + cid: Cid, +} diff --git a/atproto/src/types/authority.rs b/atproto/src/types/authority.rs new file mode 100644 index 0000000..1c76750 --- /dev/null +++ b/atproto/src/types/authority.rs @@ -0,0 +1,38 @@ +use crate::{ + types::{Did, Handle}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; + +pub enum Authority { + Did(Did), + Handle(Handle), +} + +impl Display for Authority { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Authority::Did(did) => did.to_string(), + Authority::Handle(handle) => handle.to_string(), + }) + } +} + +impl FromStr for Authority { + type Err = Error; + fn from_str(s: &str) -> Result { + if let Ok(did) = s.parse::() { + return Ok(Authority::Did(did)); + } + if let Ok(did) = s.parse::() { + return Ok(Authority::Handle(did)); + } + Err(Error::Parse { + err: ParseError::Format, + object: "Authority".to_string(), + }) + } +} diff --git a/atproto/src/types/cid.rs b/atproto/src/types/cid.rs new file mode 100644 index 0000000..3581ad2 --- /dev/null +++ b/atproto/src/types/cid.rs @@ -0,0 +1,16 @@ +use crate::error::Error; + +pub struct Cid { value: String, } + +impl std::fmt::Display for Cid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value) + } +} + +impl std::str::FromStr for Cid { + type Err = Error; + fn from_str(s: &str) -> Result { + Ok(Self { value: s.to_string() }) + } +} diff --git a/atproto/src/types/datetime.rs b/atproto/src/types/datetime.rs new file mode 100644 index 0000000..6dcf234 --- /dev/null +++ b/atproto/src/types/datetime.rs @@ -0,0 +1,63 @@ +use crate::error::{Error, ParseError, FormatError}; +use time::{ + UtcDateTime, + format_description::well_known::{Rfc3339, Iso8601}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +pub use time::{Date, Time}; + +pub struct Datetime { + time: UtcDateTime, + derived_string: String, +} + +impl Datetime { + pub fn now() -> Result { + Datetime::from_utc(UtcDateTime::now()) + } + + pub fn new(date: Date, time: Time) -> Result { + Datetime::from_utc(UtcDateTime::new(date, time)) + } + + fn from_utc(utc: UtcDateTime) -> Result { + Ok(Datetime { + time: utc, + derived_string: match utc.format(&Rfc3339) { + Ok(ds) => ds, + Err(e) => return Err(Error::Format { + err: FormatError::Datetime(e), + object: "Datetime".to_string(), + }), + }, + }) + } +} + +impl Display for Datetime { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", self.derived_string) + } +} + +impl FromStr for Datetime { + type Err = Error; + + fn from_str(s: &str) -> Result { + // Parse as both Rfc3339 and Iso8601 to ensure intersection + let dt_rfc3339 = UtcDateTime::parse(s, &Rfc3339); + let dt_iso8601 = UtcDateTime::parse(s, &Iso8601::DEFAULT); + + let datetime = dt_iso8601 + .and(dt_rfc3339) + .map_err(|e| Error::Parse { + err: ParseError::Datetime(e), + object: "Datetime".to_string(), + })?; + + Ok(Datetime { time: datetime, derived_string: s.to_string() }) + } +} diff --git a/atproto/src/types/did.rs b/atproto/src/types/did.rs new file mode 100644 index 0000000..6c91db2 --- /dev/null +++ b/atproto/src/types/did.rs @@ -0,0 +1,72 @@ +use crate::error::{Error, ParseError}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_captures; + +enum DidMethod { + Web, + Plc, +} +impl Display for DidMethod { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + DidMethod::Web => String::from("web"), + DidMethod::Plc => String::from("plc"), + }) + } +} +impl FromStr for DidMethod { + type Err = Error; + fn from_str(s: &str) -> Result { + match s { + "web" => Ok(DidMethod::Web), + "plc" => Ok(DidMethod::Plc), + _ => Err(Error::Parse { + err: ParseError::Format, + object: "DidMethod".to_string(), + }), + } + } +} + +pub struct Did { + method: DidMethod, + identifier: String, +} + +impl Display for Did { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "did:{}:{}", self.method, self.identifier) + } +} + +impl FromStr for Did { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 2048 { + return Err(Error::Parse { + err: ParseError::Length { max: s.len(), got: 2048 }, + object: "Did".to_string(), + }); + } + + let Some(( + _whole, unchecked_method, identifier + )) = regex_captures!( + r"^did:([a-z]+):([a-zA-Z0-9._:%-]*[a-zA-Z0-9._-])$", + s, + ) else { + return Err(Error::Parse { + err: ParseError::Format, + object: "Did".to_string(), + }); + }; + + Ok(Self { + method: unchecked_method.parse::()?, + identifier: identifier.to_string(), + }) + } +} diff --git a/atproto/src/types/record_key.rs b/atproto/src/types/record_key.rs new file mode 100644 index 0000000..7368b53 --- /dev/null +++ b/atproto/src/types/record_key.rs @@ -0,0 +1,63 @@ +use crate::{ + types::{Nsid, Tid}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_is_match; + +pub enum RecordKey { + Tid(Tid), + Nsid(Nsid), + Literal(String), + Any(String), +} +impl Display for RecordKey { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + RecordKey::Tid(tid) => tid.to_string(), + RecordKey::Nsid(nsid) => nsid.to_string(), + RecordKey::Literal(literal) => literal.to_string(), + RecordKey::Any(any) => any.to_string(), + }) + } +} +impl FromStr for RecordKey { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 512 { + return Err(Error::Parse { + err: ParseError::Length { max: 512, got: s.len() }, + object: "RecordKey".to_string(), + }); + } + + if !( + regex_is_match!( + r"^[a-zA-Z0-9`.-_:~]+$", + s, + ) + && s != "." + && s != ".." + ) { + return Err(Error::Parse { + err: ParseError::Format, + object: "RecordKey".to_string(), + }); + } + + // Valid record key, now decide type + if s.starts_with("literal:") { + return Ok(RecordKey::Literal(s.to_string())); + } + if let Ok(tid) = s.parse::() { + return Ok(RecordKey::Tid(tid)); + } + if let Ok(nsid) = s.parse::() { + return Ok(RecordKey::Nsid(nsid)); + } + Ok(RecordKey::Any(s.to_string())) + } +} diff --git a/atproto/src/types/uri.rs b/atproto/src/types/uri.rs new file mode 100644 index 0000000..843ee2b --- /dev/null +++ b/atproto/src/types/uri.rs @@ -0,0 +1,79 @@ +use crate::{ + types::{Did, Authority, Nsid, RecordKey}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_captures; + +pub struct Uri { + authority: Did, + collection: Option, + rkey: Option, +} + +impl Display for Uri { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "at://{}", self.authority)?; + + if let Some(collection) = &self.collection { + write!(f, "/{}", collection)?; + + if let Some(rkey) = &self.rkey { + write!(f, "/{}", rkey)?; + } + } + + Ok(()) + } +} + +impl FromStr for Uri { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 8000 { + return Err(Error::Parse { + err: ParseError::Length { max: 8000, got: s.len() }, + object: "Did".to_string(), + }); + } + + let Some(( + _whole, unchecked_authority, unchecked_collection, unchecked_rkey + )) = regex_captures!( + r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", + s, + ) else { + return Err(Error::Parse { + err: ParseError::Format, + object: "Uri".to_string(), + }); + }; + + let did = match Authority::from_str(unchecked_authority)? { + Authority::Handle(h) => + return Err(Error::Parse { + err: ParseError::ForceDid { handle: h.to_string() }, + object: "Uri".to_string(), + }), + Authority::Did(d) => d, + }; + + let collection = if unchecked_collection.is_empty() { None } + else { Some(unchecked_collection.parse::()?) }; + + let rkey = if unchecked_rkey.is_empty() { None } + else { Some(unchecked_rkey.parse::()?) }; + + Ok(Uri { authority: did, collection, rkey }) + } +} + +impl Uri { + pub fn authority_as_did(&self) -> &Did { + &self.authority + } +} + diff --git a/db/Cargo.toml b/db/Cargo.toml index dd1d0b9..13b55ae 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] -thiserror = "2.0.12" atproto.workspace = true async-trait.workspace = true -sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } +sqlx.workspace = true +thiserror.workspace = true tokio.workspace = true From 0573e1aa431b3048db15827e1f14e8dccda349c8 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Mon, 16 Jun 2025 15:54:20 -0700 Subject: [PATCH 05/10] Atproto, add sqlx support --- Cargo.lock | 55 +++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + atproto/Cargo.toml | 5 +++++ atproto/src/lib.rs | 2 ++ atproto/src/sqlx.rs | 38 +++++++++++++++++++++++++++++++ db/Cargo.toml | 2 +- 6 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 atproto/src/sqlx.rs diff --git a/Cargo.lock b/Cargo.lock index 89997b8..6c97f48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,9 @@ dependencies = [ "lazy-regex", "serde", "serde_json", + "sqlx", + "thiserror 2.0.12", + "time", "tracing", "tracing-subscriber", ] @@ -581,6 +584,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive_builder" version = "0.20.2" @@ -1470,6 +1482,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1625,6 +1643,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2410,6 +2434,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 779f393..c029fb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" +sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } tracing = "0.1.41" diff --git a/atproto/Cargo.toml b/atproto/Cargo.toml index 033ff0d..3f5445b 100644 --- a/atproto/Cargo.toml +++ b/atproto/Cargo.toml @@ -8,7 +8,12 @@ atrium-api = { version = "0.25.3", default-features = false } lazy-regex = "3.4.1" serde.workspace = true serde_json.workspace = true +sqlx = { workspace = true, optional = true } time = { version = "0.3.41", features = ["parsing", "formatting"] } tracing-subscriber.workspace = true tracing.workspace = true thiserror.workspace = true + +[features] +default = [] +sqlx-support = ["dep:sqlx"] diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index c3b56ee..53c8d32 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,3 +1,5 @@ pub mod lexicons; pub mod types; pub mod error; +#[cfg(feature = "sqlx-support")] +pub mod sqlx; diff --git a/atproto/src/sqlx.rs b/atproto/src/sqlx.rs new file mode 100644 index 0000000..0bb6683 --- /dev/null +++ b/atproto/src/sqlx.rs @@ -0,0 +1,38 @@ +use crate::types::{ + Did, + Cid, + Uri, + Handle, + Datetime, +}; + +macro_rules! implement_sqlx_for_string_type { + ($name:ident) => { + impl sqlx::Type for $name { + fn type_info() -> sqlx::postgres::PgTypeInfo { + >::type_info() + } + } + impl<'q> sqlx::Encode<'q, sqlx::Postgres> for $name { + fn encode_by_ref( + &self, buf: &mut sqlx::postgres::PgArgumentBuffer + ) -> Result { + >::encode_by_ref(&self.to_string(), buf) + } + } + impl<'r> sqlx::Decode<'r, sqlx::Postgres> for $name { + fn decode( + value: sqlx::postgres::PgValueRef<'r> + ) -> Result { + let s = >::decode(value)?; + s.parse::<$name>().map_err(|e| Box::new(e) as sqlx::error::BoxDynError) + } + } + } +} + +implement_sqlx_for_string_type!(Did); +implement_sqlx_for_string_type!(Cid); +implement_sqlx_for_string_type!(Uri); +implement_sqlx_for_string_type!(Handle); +implement_sqlx_for_string_type!(Datetime); diff --git a/db/Cargo.toml b/db/Cargo.toml index 13b55ae..a07bead 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] -atproto.workspace = true async-trait.workspace = true +atproto = { workspace = true, features = ["sqlx-support"] } sqlx.workspace = true thiserror.workspace = true tokio.workspace = true From 1d1f8ae60ff53d5e08351ba1cd576934ba97f060 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 13:54:55 -0700 Subject: [PATCH 06/10] Atproto, add getters to StrongRef --- atproto/src/types.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/atproto/src/types.rs b/atproto/src/types.rs index ca30ded..70241b8 100644 --- a/atproto/src/types.rs +++ b/atproto/src/types.rs @@ -68,3 +68,12 @@ pub struct StrongRef { content: T, cid: Cid, } + +impl StrongRef { + pub fn get_content(&self) -> &T { + &self.content + } + pub fn get_cid(&self) -> &Cid { + &self.cid + } +} From df1da0905d09368533033d8c3fe650c83f78764c Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 13:55:30 -0700 Subject: [PATCH 07/10] Db, add atproto error support --- db/migrations/20250612223204_initial_schema.sql | 6 +++--- db/src/error.rs | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql index f52b19b..b95abd4 100644 --- a/db/migrations/20250612223204_initial_schema.sql +++ b/db/migrations/20250612223204_initial_schema.sql @@ -19,7 +19,7 @@ CREATE TABLE session ( label VARCHAR, -- Participants in participant - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); @@ -33,13 +33,13 @@ CREATE TABLE activity ( -- Progress in progress performed_at VARCHAR, - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); CREATE TABLE participant ( participantdid VARCHAR NOT NULL, - session VARCHAR NOT NULL, + sessionuri VARCHAR NOT NULL, role VARCHAR NOT NULL ); diff --git a/db/src/error.rs b/db/src/error.rs index ed1b585..1c330bc 100644 --- a/db/src/error.rs +++ b/db/src/error.rs @@ -3,6 +3,8 @@ pub enum Error { #[error("Database Implementation Error: {0}")] Backend(#[from] sqlx::Error), + #[error("AT Protocol Implementation Error: {0}")] + Atproto(#[from] atproto::error::Error), } pub type Result = std::result::Result; From 8160b161f583dd22d42f09ecf70a7c02c7139813 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 14:06:21 -0700 Subject: [PATCH 08/10] Db, rename spoor to direct, implement sessions --- db/src/interfaces/direct.rs | 2 + .../{spoor.rs => direct/functions.rs} | 119 ++++++------------ db/src/interfaces/direct/types.rs | 59 +++++++++ db/src/interfaces/mod.rs | 2 +- 4 files changed, 103 insertions(+), 79 deletions(-) create mode 100644 db/src/interfaces/direct.rs rename db/src/interfaces/{spoor.rs => direct/functions.rs} (58%) create mode 100644 db/src/interfaces/direct/types.rs diff --git a/db/src/interfaces/direct.rs b/db/src/interfaces/direct.rs new file mode 100644 index 0000000..d0b4ddc --- /dev/null +++ b/db/src/interfaces/direct.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod functions; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/direct/functions.rs similarity index 58% rename from db/src/interfaces/spoor.rs rename to db/src/interfaces/direct/functions.rs index 4a27dcb..caf9cbd 100644 --- a/db/src/interfaces/spoor.rs +++ b/db/src/interfaces/direct/functions.rs @@ -1,59 +1,22 @@ -use crate::Error; -use atproto::{ - Cid, - Uri, +use atproto::types::{ Datetime, - Did, - StrongRef, - Handle, + Uri, +}; +use crate::{ + Error, + interfaces::direct::types::{ + Participant, + Session, + User + }, }; use sqlx::{ - PgTransaction, + Transaction, + Postgres, PgPool, query, }; -pub struct Activity { - pub authority: Uri, - - pub cid: Option, - pub session: Option>, - pub progress: Option, - pub performed_at: Option, - pub created_at: Option, -} - -pub struct Session { - pub uri: Uri, - - pub cid: Option, - pub content: Option>, - pub label: Option, - pub created_at: Option, - pub other_participants: Option>, -} - -pub struct User { - pub did: Did, - pub handle: Option, -} - -#[non_exhaustive] -pub enum Participant { - Owner(User), - Added(User), -} - -#[non_exhaustive] -pub enum Content { - UnknownContent -} - -#[non_exhaustive] -pub enum Progress { - UnknownProgress -} - pub async fn ingest_session( db: PgPool, session: Session ) -> Result<(), Error> { @@ -63,50 +26,50 @@ pub async fn ingest_session( } async fn write_session( - tr: &mut PgTransaction<'_>, session: Session + tr: &mut Transaction<'_, Postgres>, session: Session ) -> Result<(), Error> { - - let (contenturi, contentcid): (Option, String) = - match session.content { - Some(sr) => (Some(sr.content), sr.cid.to_string()), - None => (None, "".to_string()), - }; - query!(r#" - INSERT INTO sessions( - sessionuri, sessioncid, label, created_at, contenturi, contentcid - ) VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO + session(uri, cid, owner, content, contentcid, label, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) "#, - session.uri, - session.cid.map_or("", |cid| cid.to_string()), - session.label.unwrap_or(""), - session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), - contenturi, - contentcid, - ).execute(&mut tr).await?; + &session.uri.to_string(), + &session.cid.to_string(), + &session.uri.authority_as_did().to_string(), + &session.content.get_content().to_string(), + &session.content.get_cid().to_string(), + session.label, + session.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string() + ).execute(&mut **tr).await?; - session.other_participants.and_then(|participants| { + if let Some(participants) = session.other_participants { for participant in participants { - write_participant(tr, participant, session.uri).await? + write_participant(tr, &participant, &session.uri).await? } - }).unwrap_or(Ok(())) + } + + Ok(()) } async fn write_participant( - tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri + tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri ) -> Result<(), Error> { - let (participantType, user): (String, User) = match participant { + let (participant_type, user): (String, &User) = match participant { Participant::Owner(user) => ("Owner".to_string(), user), Participant::Added(user) => ("Participant".to_string(), user), }; query!(r#" - INSERT INTO participants( - sessionuri, userdid, role - ) VALUES ($1, $2, $3) - "#, sessionuri.to_string(), user.did.to_string, participantType - ).execute(&mut tr).await?; - + INSERT INTO + participant(participantdid, sessionuri, role) + VALUES ($1, $2, $3) + "#, + user.did.to_string(), + sessionuri.to_string(), + participant_type, + ).execute(&mut **tr).await?; + Ok(()) } diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs new file mode 100644 index 0000000..91acc73 --- /dev/null +++ b/db/src/interfaces/direct/types.rs @@ -0,0 +1,59 @@ +use crate::Error; +use atproto::types::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use std::fmt::{Display, Formatter, Result as FmtResult}; + +pub struct Activity { + pub authority: Uri, + pub cid: Cid, + + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + pub cid: Cid, + + pub content: StrongRef, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +impl Display for Content { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Content::UnknownContent => "UnknownContentType", + }) + } +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs index d13ba36..afdae97 100644 --- a/db/src/interfaces/mod.rs +++ b/db/src/interfaces/mod.rs @@ -1 +1 @@ -pub mod spoor; +pub mod direct; From 2fa75902715d36b7a7f202fe5ef4051eea8f15af Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 14:38:23 -0700 Subject: [PATCH 09/10] Atproto, add extraction of values from StrongRef --- atproto/src/types.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/atproto/src/types.rs b/atproto/src/types.rs index 70241b8..26ee60d 100644 --- a/atproto/src/types.rs +++ b/atproto/src/types.rs @@ -73,6 +73,11 @@ impl StrongRef { pub fn get_content(&self) -> &T { &self.content } + + pub fn extract_content(self) -> (T, Cid) { + (self.content, self.cid) + } + pub fn get_cid(&self) -> &Cid { &self.cid } From 2dfbc314b2612996f93cd5f36326eba74a1f51f3 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 14:39:35 -0700 Subject: [PATCH 10/10] Db, add activity ingestor, fix type name activity has its type name Authority instead of Uri for some reason? So that is no longer an issue. EVERYTHING IS STILL UNTESTED BABY~ --- db/src/interfaces/direct/functions.rs | 109 ++++++++++---------------- db/src/interfaces/direct/types.rs | 3 +- 2 files changed, 43 insertions(+), 69 deletions(-) diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs index caf9cbd..a61ab37 100644 --- a/db/src/interfaces/direct/functions.rs +++ b/db/src/interfaces/direct/functions.rs @@ -5,6 +5,7 @@ use atproto::types::{ use crate::{ Error, interfaces::direct::types::{ + Activity, Participant, Session, User @@ -17,6 +18,14 @@ use sqlx::{ query, }; +pub async fn ingest_activity( + db: PgPool, activity: Activity +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_activity(&mut transaction, activity).await?; + transaction.commit().await.map_err(Error::Backend) +} + pub async fn ingest_session( db: PgPool, session: Session ) -> Result<(), Error> { @@ -25,6 +34,39 @@ pub async fn ingest_session( transaction.commit().await.map_err(Error::Backend) } +async fn write_activity( + tr: &mut Transaction<'_, Postgres>, activity: Activity +) -> Result<(), Error> { + let (sessionuri, sessioncid) = match activity.session { + Some(sr) => { + let (session, cid) = sr.extract_content(); + let sessionuri = session.uri.to_string(); + write_session(tr, session).await?; + (sessionuri, cid.to_string()) + } + None => ("".to_string(), "".to_string()), + }; + + query!(r#" + INSERT INTO + activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + &activity.uri.to_string(), + &activity.cid.to_string(), + sessionuri, + sessioncid, + activity.performed_at.map(|dt| dt.to_string()), + activity.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string(), + ).execute(&mut **tr).await?; + + // TODO: Handle Progress + + Ok(()) +} + + async fn write_session( tr: &mut Transaction<'_, Postgres>, session: Session ) -> Result<(), Error> { @@ -72,70 +114,3 @@ async fn write_participant( Ok(()) } - -// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { -// todo!(); -// } -// -// let mut transaction = db.begin().await?; -// -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// -// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// - - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs index 91acc73..4a3ea65 100644 --- a/db/src/interfaces/direct/types.rs +++ b/db/src/interfaces/direct/types.rs @@ -1,4 +1,3 @@ -use crate::Error; use atproto::types::{ Cid, Uri, @@ -10,7 +9,7 @@ use atproto::types::{ use std::fmt::{Display, Formatter, Result as FmtResult}; pub struct Activity { - pub authority: Uri, + pub uri: Uri, pub cid: Cid, pub session: Option>,