From 3eb6aab10fe71fcd3b2f16ea415faa2a607865ab Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Wed, 11 Jun 2025 14:09:02 -0700 Subject: [PATCH 1/5] Cargo, moving async-trait and tokio (DB, Ingestor) Move async-trait from ingestor to workspace so DB can share. Update the reference to tokio in DB from version to workspace --- Cargo.lock | 1 + Cargo.toml | 1 + db/Cargo.toml | 3 ++- ingestor/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbe12ef..00ea69c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -563,6 +563,7 @@ dependencies = [ name = "db" version = "0.1.0" dependencies = [ + "async-trait", "sqlx", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 908c860..9129ba7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "3" members = [ "api", "atproto","db", "ingestor"] [workspace.dependencies] +async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" diff --git a/db/Cargo.toml b/db/Cargo.toml index ded38de..d555eb9 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,5 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] +async-trait.workspace = true sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } -tokio = "1.45.0" +tokio.workspace = true diff --git a/ingestor/Cargo.toml b/ingestor/Cargo.toml index a177581..7da302c 100644 --- a/ingestor/Cargo.toml +++ b/ingestor/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] anyhow = "1.0.98" -async-trait = "0.1.88" +async-trait.workspace = true atproto.workspace = true rocketman = "0.2.0" serde.workspace = true From aadea9757ba3caa228f1b48c63c6a91d4f5e287e Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Wed, 11 Jun 2025 17:29:59 -0700 Subject: [PATCH 2/5] Db, first commit of work This also touches atproto/src/lib.rs which may be a problem for the rebase~ Adds migrations and all the other stuff. --- Cargo.lock | 2 + atproto/src/lib.rs | 22 ++- db/Cargo.toml | 2 + .../20250612223204_initial_schema.sql | 45 +++++ db/src/connection.rs | 64 +------ db/src/error.rs | 8 + db/src/interfaces.rs | 15 -- db/src/interfaces/mod.rs | 1 + db/src/interfaces/spoor.rs | 178 ++++++++++++++++++ db/src/lib.rs | 8 +- flake.lock | 6 +- flake.nix | 1 + 12 files changed, 270 insertions(+), 82 deletions(-) create mode 100644 db/migrations/20250612223204_initial_schema.sql create mode 100644 db/src/error.rs delete mode 100644 db/src/interfaces.rs create mode 100644 db/src/interfaces/mod.rs create mode 100644 db/src/interfaces/spoor.rs diff --git a/Cargo.lock b/Cargo.lock index 00ea69c..89997b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -564,7 +564,9 @@ name = "db" version = "0.1.0" dependencies = [ "async-trait", + "atproto", "sqlx", + "thiserror 2.0.12", "tokio", ] diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index 21f8919..a327896 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,17 +1,37 @@ use lazy_regex::regex_captures; use core::str::FromStr; +use std::fmt::{ + Display, Formatter, Result as FmtResult +}; pub use atrium_api::types::{ Collection, string::{ + AtIdentifier as Authority, + Datetime, + Did, Nsid, RecordKey, - AtIdentifier as Authority, + Tid, + Handle, } }; pub mod lexicons; +pub struct Cid(String); + +impl Display for Cid { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", self.0) + } +} + +pub struct StrongRef { + pub content: T, + pub cid: Cid, +} + pub struct Uri { whole: String, // These fields could be useful in the future, diff --git a/db/Cargo.toml b/db/Cargo.toml index d555eb9..dd1d0b9 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] +thiserror = "2.0.12" +atproto.workspace = true async-trait.workspace = true sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } tokio.workspace = true diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql new file mode 100644 index 0000000..f52b19b --- /dev/null +++ b/db/migrations/20250612223204_initial_schema.sql @@ -0,0 +1,45 @@ +-- Add migration script here + +CREATE EXTENSION IF NOT EXISTS pg_trgm; + +CREATE TABLE actor ( + did VARCHAR PRIMARY KEY, + handle VARCHAR UNIQUE, + indexed_at VARCHAR NOT NULL +); +CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops); + +CREATE TABLE session ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + owner VARCHAR NOT NULL, + + content VARCHAR NOT NULL, + contentcid VARCHAR NOT NULL, + label VARCHAR, + -- Participants in participant + + created_at VARCHAR NOT NULL, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE activity ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + + session VARCHAR, + sessioncid VARCHAR, + -- Progress in progress + + performed_at VARCHAR, + created_at VARCHAR NOT NULL, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE participant ( + participantdid VARCHAR NOT NULL, + session VARCHAR NOT NULL, + role VARCHAR NOT NULL +); diff --git a/db/src/connection.rs b/db/src/connection.rs index ba883cc..71e9c58 100644 --- a/db/src/connection.rs +++ b/db/src/connection.rs @@ -1,19 +1,7 @@ -use sqlx::{ - query, - Database, - Pool, - Postgres, - pool::PoolOptions, - postgres::{ - PgConnectOptions, - PgSslMode, - }, - Result, -}; use std::string::ToString; -pub struct Db { - pool: Pool +pub struct Db { + pool: Pool } #[non_exhaustive] @@ -39,13 +27,6 @@ pub struct Session { impl Db { async fn connect() -> Result { - let conn = PgConnectOptions::new() - .host("localhost") - .port(5432) - .username("postgres") - .password("062217") - .database("anisky") - .ssl_mode(PgSslMode::Disable); let pool = match PoolOptions::new().connect_with(conn).await { Ok(p) => p, @@ -55,45 +36,4 @@ impl Db { Ok(Db { pool }) } // - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // let mut transaction = self.pool.begin().await?; - // - // query!(r#" - // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) - // "#, - // session.sessionuri, session.label - // ).execute(&mut *transaction).await?; - // - // for participant in session.participants { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(&mut *transaction).await?; - // } - // - // transaction.commit().await - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } } diff --git a/db/src/error.rs b/db/src/error.rs new file mode 100644 index 0000000..ed1b585 --- /dev/null +++ b/db/src/error.rs @@ -0,0 +1,8 @@ +#[non_exhaustive] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Database Implementation Error: {0}")] + Backend(#[from] sqlx::Error), +} + +pub type Result = std::result::Result; diff --git a/db/src/interfaces.rs b/db/src/interfaces.rs deleted file mode 100644 index d3629e1..0000000 --- a/db/src/interfaces.rs +++ /dev/null @@ -1,15 +0,0 @@ -use atproto::{ - Did, - Uri, -}; - -pub struct User { - userdid: Did, - handle: Handle, -} - -struct Participant { - participantdid: Did, - role: Role, -} - diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs new file mode 100644 index 0000000..d13ba36 --- /dev/null +++ b/db/src/interfaces/mod.rs @@ -0,0 +1 @@ +pub mod spoor; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs new file mode 100644 index 0000000..4a27dcb --- /dev/null +++ b/db/src/interfaces/spoor.rs @@ -0,0 +1,178 @@ +use crate::Error; +use atproto::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use sqlx::{ + PgTransaction, + PgPool, + query, +}; + +pub struct Activity { + pub authority: Uri, + + pub cid: Option, + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + + pub cid: Option, + pub content: Option>, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} + +pub async fn ingest_session( + db: PgPool, session: Session +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(Error::Backend) +} + +async fn write_session( + tr: &mut PgTransaction<'_>, session: Session +) -> Result<(), Error> { + + let (contenturi, contentcid): (Option, String) = + match session.content { + Some(sr) => (Some(sr.content), sr.cid.to_string()), + None => (None, "".to_string()), + }; + + query!(r#" + INSERT INTO sessions( + sessionuri, sessioncid, label, created_at, contenturi, contentcid + ) VALUES ($1, $2, $3, $4, $5, $6) + "#, + session.uri, + session.cid.map_or("", |cid| cid.to_string()), + session.label.unwrap_or(""), + session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), + contenturi, + contentcid, + ).execute(&mut tr).await?; + + session.other_participants.and_then(|participants| { + for participant in participants { + write_participant(tr, participant, session.uri).await? + } + }).unwrap_or(Ok(())) +} + +async fn write_participant( + tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri +) -> Result<(), Error> { + let (participantType, user): (String, User) = match participant { + Participant::Owner(user) => ("Owner".to_string(), user), + Participant::Added(user) => ("Participant".to_string(), user), + }; + + query!(r#" + INSERT INTO participants( + sessionuri, userdid, role + ) VALUES ($1, $2, $3) + "#, sessionuri.to_string(), user.did.to_string, participantType + ).execute(&mut tr).await?; + + Ok(()) +} + +// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { +// todo!(); +// } +// +// let mut transaction = db.begin().await?; +// +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// +// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { +// query!(r#" +// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) +// "#, +// session.sessionuri, session.label +// ).execute(&mut *transaction).await?; +// +// for participant in session.participants { +// query!(r#" +// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) +// "#, +// session.sessionuri, participant.userdid, participant.role.to_string() +// ).execute(&mut *transaction).await?; +// } +// +// transaction.commit().await +// } +// + + // pub async fn add_user(&self, user: &User) -> Result<()> { + // query!(r#" + // INSERT INTO users(userdid, handle) VALUES ($1, $2) + // "#, + // user.userdid, user.handle + // ).execute(self.pool).await?; + // Ok(()) + // } + // + // pub async fn add_session(&self, session: &Session) -> Result<()> { + // } + // + // pub async fn add_participant(&self, session: Session, + // participant: Participant) -> Result { + // query!(r#" + // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) + // "#, + // session.sessionuri, participant.userdid, participant.role.to_string() + // ).execute(self.pool).await?; + // + // session.participants.push(participant); + // + // Ok(session) + // } diff --git a/db/src/lib.rs b/db/src/lib.rs index 82e9c13..6683e08 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1 +1,7 @@ -pub struct db; +pub mod interfaces; +pub mod error; + +pub use crate::error::Error; + + +// pub struct db; diff --git a/flake.lock b/flake.lock index 8f3aa7b..69e701e 100644 --- a/flake.lock +++ b/flake.lock @@ -41,11 +41,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1746585402, - "narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=", + "lastModified": 1749695868, + "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "72dd969389583664f87aa348b3458f2813693617", + "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 943945b..48d94bf 100644 --- a/flake.nix +++ b/flake.nix @@ -42,6 +42,7 @@ packages = (with pkgs; [ # The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt, # rustdoc, rustfmt, and other tools. + sqlx-cli rustToolchain ]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]); }; From ab78d1fb7b5ca4b5ff7dcefe925c4a6e8c373f3d Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Mon, 16 Jun 2025 15:29:27 -0700 Subject: [PATCH 3/5] Atproto, types overhaul and error handling Breaks off from Atrium-rs's types because they are implemented inconsistently, which makes them harder to use. This was done with reference to the atproto documentation but specifically not the atrium-rs codebase so I wouldn't have to think about licenses. This adds the types and error module in atproto. It also touches Cargo.toml for some new dependencies and some shared dependencies. It required thiserror, so I looped that into the workspace meaning that this commit touches db. some things to keep in mind: - There is no CID parsing - None of this is tested, nor are there any tests written. We're playing fast and loose baby~ --- Cargo.toml | 1 + atproto/Cargo.toml | 2 + atproto/src/error.rs | 31 ++++++++++++ atproto/src/lib.rs | 81 +------------------------------ atproto/src/types.rs | 84 +++++++++++++++++++++++++++++++++ atproto/src/types/authority.rs | 38 +++++++++++++++ atproto/src/types/cid.rs | 16 +++++++ atproto/src/types/datetime.rs | 63 +++++++++++++++++++++++++ atproto/src/types/did.rs | 72 ++++++++++++++++++++++++++++ atproto/src/types/record_key.rs | 63 +++++++++++++++++++++++++ atproto/src/types/uri.rs | 79 +++++++++++++++++++++++++++++++ db/Cargo.toml | 4 +- 12 files changed, 453 insertions(+), 81 deletions(-) create mode 100644 atproto/src/error.rs create mode 100644 atproto/src/types.rs create mode 100644 atproto/src/types/authority.rs create mode 100644 atproto/src/types/cid.rs create mode 100644 atproto/src/types/datetime.rs create mode 100644 atproto/src/types/did.rs create mode 100644 atproto/src/types/record_key.rs create mode 100644 atproto/src/types/uri.rs diff --git a/Cargo.toml b/Cargo.toml index 9129ba7..779f393 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" +thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } tracing = "0.1.41" tracing-subscriber = "0.3.19" diff --git a/atproto/Cargo.toml b/atproto/Cargo.toml index 7d56725..033ff0d 100644 --- a/atproto/Cargo.toml +++ b/atproto/Cargo.toml @@ -8,5 +8,7 @@ atrium-api = { version = "0.25.3", default-features = false } lazy-regex = "3.4.1" serde.workspace = true serde_json.workspace = true +time = { version = "0.3.41", features = ["parsing", "formatting"] } tracing-subscriber.workspace = true tracing.workspace = true +thiserror.workspace = true diff --git a/atproto/src/error.rs b/atproto/src/error.rs new file mode 100644 index 0000000..1db7106 --- /dev/null +++ b/atproto/src/error.rs @@ -0,0 +1,31 @@ +use thiserror::Error as ThisError; + +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum Error { + #[error("Error while parsing")] + Parse { err: ParseError, object: String }, + #[error("Error while formatting")] + Format { err: FormatError, object: String }, +} + +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum FormatError { + #[error("Time Parse Error: {0}")] + Datetime(#[from] time::error::Format), +} +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum ParseError { + #[error("Time Parse Error: {0}")] + Datetime(#[from] time::error::Parse), + #[error("Length of parsed object too long, max: {max:?}, got: {got:?}.")] + Length { max: usize, got: usize }, + #[error("Currently Did is enforced, cannot use handle, {handle:?}")] + ForceDid { handle: String }, + #[error("Incorrectly formatted")] + Format, +} + +pub type Result = std::result::Result; diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index a327896..c3b56ee 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,80 +1,3 @@ -use lazy_regex::regex_captures; -use core::str::FromStr; -use std::fmt::{ - Display, Formatter, Result as FmtResult -}; - -pub use atrium_api::types::{ - Collection, - string::{ - AtIdentifier as Authority, - Datetime, - Did, - Nsid, - RecordKey, - Tid, - Handle, - } -}; - pub mod lexicons; - -pub struct Cid(String); - -impl Display for Cid { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", self.0) - } -} - -pub struct StrongRef { - pub content: T, - pub cid: Cid, -} - -pub struct Uri { - whole: String, - // These fields could be useful in the future, - // so I'm leaving the code for them. - // authority: Authority, - // collection: Option, - // rkey: Option, -} - -impl FromStr for Uri { - type Err = &'static str; - fn from_str(uri: &str) -> Result { - if uri.len() > 8000 { - return Err("Uri too long") - } - - let Some(( - whole, unchecked_authority, unchecked_collection, unchecked_rkey - )) = regex_captures!( - r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", - uri, - ) else { - return Err("Invalid Uri"); - }; - - // This parsing is required, but the values don't need to be used yet. - // No compute cost to use them, just storage cost - let _authority = Authority::from_str(unchecked_authority)?; - - let _collection = if unchecked_collection.is_empty() { None } - else { Some(Nsid::new(unchecked_collection.to_string())?) }; - - let _rkey = if unchecked_rkey.is_empty() { None } - else { Some(RecordKey::new(unchecked_rkey.to_string())?) }; - - // Ok(Uri{ whole: whole.to_string(), authority, collection, rkey }) - Ok(Uri { whole: whole.to_string() }) - } -} - -impl Uri { - pub fn as_str(&self) -> &str { - self.whole.as_str() - } -} - +pub mod types; +pub mod error; diff --git a/atproto/src/types.rs b/atproto/src/types.rs new file mode 100644 index 0000000..26ee60d --- /dev/null +++ b/atproto/src/types.rs @@ -0,0 +1,84 @@ +use crate::error::{Error, ParseError}; + +macro_rules! basic_string_type { + ($name:ident, $regex:literal, $max_len:literal) => { + pub struct $name { value: String, } + + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value) + } + } + + impl std::str::FromStr for $name { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > $max_len { + return Err(Error::Parse { + err: ParseError::Length { max: s.len(), got: $max_len }, + object: stringify!($name).to_string(), + }); + } + + if ! lazy_regex::regex_is_match!( + $regex, + s + ) { + return Err(Error::Parse { + err: ParseError::Format, + object: stringify!($name).to_string(), + }); + } + + Ok(Self { + value: s.to_string(), + }) + } + } + } +} + +mod did; +pub use did::Did; +mod cid; +pub use cid::Cid; +mod authority; +pub use authority::Authority; +mod datetime; +pub use datetime::Datetime; +mod record_key; +pub use record_key::RecordKey; +mod uri; +pub use uri::Uri; + +basic_string_type!(Handle, + r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$", + 253 +); +basic_string_type!(Nsid, + r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$", + 317 +); +basic_string_type!(Tid, + r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$", + 13 +); + +pub struct StrongRef { + content: T, + cid: Cid, +} + +impl StrongRef { + pub fn get_content(&self) -> &T { + &self.content + } + + pub fn extract_content(self) -> (T, Cid) { + (self.content, self.cid) + } + + pub fn get_cid(&self) -> &Cid { + &self.cid + } +} diff --git a/atproto/src/types/authority.rs b/atproto/src/types/authority.rs new file mode 100644 index 0000000..1c76750 --- /dev/null +++ b/atproto/src/types/authority.rs @@ -0,0 +1,38 @@ +use crate::{ + types::{Did, Handle}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; + +pub enum Authority { + Did(Did), + Handle(Handle), +} + +impl Display for Authority { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Authority::Did(did) => did.to_string(), + Authority::Handle(handle) => handle.to_string(), + }) + } +} + +impl FromStr for Authority { + type Err = Error; + fn from_str(s: &str) -> Result { + if let Ok(did) = s.parse::() { + return Ok(Authority::Did(did)); + } + if let Ok(did) = s.parse::() { + return Ok(Authority::Handle(did)); + } + Err(Error::Parse { + err: ParseError::Format, + object: "Authority".to_string(), + }) + } +} diff --git a/atproto/src/types/cid.rs b/atproto/src/types/cid.rs new file mode 100644 index 0000000..3581ad2 --- /dev/null +++ b/atproto/src/types/cid.rs @@ -0,0 +1,16 @@ +use crate::error::Error; + +pub struct Cid { value: String, } + +impl std::fmt::Display for Cid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value) + } +} + +impl std::str::FromStr for Cid { + type Err = Error; + fn from_str(s: &str) -> Result { + Ok(Self { value: s.to_string() }) + } +} diff --git a/atproto/src/types/datetime.rs b/atproto/src/types/datetime.rs new file mode 100644 index 0000000..6dcf234 --- /dev/null +++ b/atproto/src/types/datetime.rs @@ -0,0 +1,63 @@ +use crate::error::{Error, ParseError, FormatError}; +use time::{ + UtcDateTime, + format_description::well_known::{Rfc3339, Iso8601}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +pub use time::{Date, Time}; + +pub struct Datetime { + time: UtcDateTime, + derived_string: String, +} + +impl Datetime { + pub fn now() -> Result { + Datetime::from_utc(UtcDateTime::now()) + } + + pub fn new(date: Date, time: Time) -> Result { + Datetime::from_utc(UtcDateTime::new(date, time)) + } + + fn from_utc(utc: UtcDateTime) -> Result { + Ok(Datetime { + time: utc, + derived_string: match utc.format(&Rfc3339) { + Ok(ds) => ds, + Err(e) => return Err(Error::Format { + err: FormatError::Datetime(e), + object: "Datetime".to_string(), + }), + }, + }) + } +} + +impl Display for Datetime { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", self.derived_string) + } +} + +impl FromStr for Datetime { + type Err = Error; + + fn from_str(s: &str) -> Result { + // Parse as both Rfc3339 and Iso8601 to ensure intersection + let dt_rfc3339 = UtcDateTime::parse(s, &Rfc3339); + let dt_iso8601 = UtcDateTime::parse(s, &Iso8601::DEFAULT); + + let datetime = dt_iso8601 + .and(dt_rfc3339) + .map_err(|e| Error::Parse { + err: ParseError::Datetime(e), + object: "Datetime".to_string(), + })?; + + Ok(Datetime { time: datetime, derived_string: s.to_string() }) + } +} diff --git a/atproto/src/types/did.rs b/atproto/src/types/did.rs new file mode 100644 index 0000000..6c91db2 --- /dev/null +++ b/atproto/src/types/did.rs @@ -0,0 +1,72 @@ +use crate::error::{Error, ParseError}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_captures; + +enum DidMethod { + Web, + Plc, +} +impl Display for DidMethod { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + DidMethod::Web => String::from("web"), + DidMethod::Plc => String::from("plc"), + }) + } +} +impl FromStr for DidMethod { + type Err = Error; + fn from_str(s: &str) -> Result { + match s { + "web" => Ok(DidMethod::Web), + "plc" => Ok(DidMethod::Plc), + _ => Err(Error::Parse { + err: ParseError::Format, + object: "DidMethod".to_string(), + }), + } + } +} + +pub struct Did { + method: DidMethod, + identifier: String, +} + +impl Display for Did { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "did:{}:{}", self.method, self.identifier) + } +} + +impl FromStr for Did { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 2048 { + return Err(Error::Parse { + err: ParseError::Length { max: s.len(), got: 2048 }, + object: "Did".to_string(), + }); + } + + let Some(( + _whole, unchecked_method, identifier + )) = regex_captures!( + r"^did:([a-z]+):([a-zA-Z0-9._:%-]*[a-zA-Z0-9._-])$", + s, + ) else { + return Err(Error::Parse { + err: ParseError::Format, + object: "Did".to_string(), + }); + }; + + Ok(Self { + method: unchecked_method.parse::()?, + identifier: identifier.to_string(), + }) + } +} diff --git a/atproto/src/types/record_key.rs b/atproto/src/types/record_key.rs new file mode 100644 index 0000000..7368b53 --- /dev/null +++ b/atproto/src/types/record_key.rs @@ -0,0 +1,63 @@ +use crate::{ + types::{Nsid, Tid}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_is_match; + +pub enum RecordKey { + Tid(Tid), + Nsid(Nsid), + Literal(String), + Any(String), +} +impl Display for RecordKey { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + RecordKey::Tid(tid) => tid.to_string(), + RecordKey::Nsid(nsid) => nsid.to_string(), + RecordKey::Literal(literal) => literal.to_string(), + RecordKey::Any(any) => any.to_string(), + }) + } +} +impl FromStr for RecordKey { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 512 { + return Err(Error::Parse { + err: ParseError::Length { max: 512, got: s.len() }, + object: "RecordKey".to_string(), + }); + } + + if !( + regex_is_match!( + r"^[a-zA-Z0-9`.-_:~]+$", + s, + ) + && s != "." + && s != ".." + ) { + return Err(Error::Parse { + err: ParseError::Format, + object: "RecordKey".to_string(), + }); + } + + // Valid record key, now decide type + if s.starts_with("literal:") { + return Ok(RecordKey::Literal(s.to_string())); + } + if let Ok(tid) = s.parse::() { + return Ok(RecordKey::Tid(tid)); + } + if let Ok(nsid) = s.parse::() { + return Ok(RecordKey::Nsid(nsid)); + } + Ok(RecordKey::Any(s.to_string())) + } +} diff --git a/atproto/src/types/uri.rs b/atproto/src/types/uri.rs new file mode 100644 index 0000000..843ee2b --- /dev/null +++ b/atproto/src/types/uri.rs @@ -0,0 +1,79 @@ +use crate::{ + types::{Did, Authority, Nsid, RecordKey}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_captures; + +pub struct Uri { + authority: Did, + collection: Option, + rkey: Option, +} + +impl Display for Uri { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "at://{}", self.authority)?; + + if let Some(collection) = &self.collection { + write!(f, "/{}", collection)?; + + if let Some(rkey) = &self.rkey { + write!(f, "/{}", rkey)?; + } + } + + Ok(()) + } +} + +impl FromStr for Uri { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 8000 { + return Err(Error::Parse { + err: ParseError::Length { max: 8000, got: s.len() }, + object: "Did".to_string(), + }); + } + + let Some(( + _whole, unchecked_authority, unchecked_collection, unchecked_rkey + )) = regex_captures!( + r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", + s, + ) else { + return Err(Error::Parse { + err: ParseError::Format, + object: "Uri".to_string(), + }); + }; + + let did = match Authority::from_str(unchecked_authority)? { + Authority::Handle(h) => + return Err(Error::Parse { + err: ParseError::ForceDid { handle: h.to_string() }, + object: "Uri".to_string(), + }), + Authority::Did(d) => d, + }; + + let collection = if unchecked_collection.is_empty() { None } + else { Some(unchecked_collection.parse::()?) }; + + let rkey = if unchecked_rkey.is_empty() { None } + else { Some(unchecked_rkey.parse::()?) }; + + Ok(Uri { authority: did, collection, rkey }) + } +} + +impl Uri { + pub fn authority_as_did(&self) -> &Did { + &self.authority + } +} + diff --git a/db/Cargo.toml b/db/Cargo.toml index dd1d0b9..13b55ae 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] -thiserror = "2.0.12" atproto.workspace = true async-trait.workspace = true -sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } +sqlx.workspace = true +thiserror.workspace = true tokio.workspace = true From 0707b5eed4f840f98b6cdbc3a427d3f15b83f02d Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Mon, 16 Jun 2025 15:54:20 -0700 Subject: [PATCH 4/5] Atproto, add sqlx support --- Cargo.lock | 55 +++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + atproto/Cargo.toml | 5 +++++ atproto/src/lib.rs | 2 ++ atproto/src/sqlx.rs | 38 +++++++++++++++++++++++++++++++ db/Cargo.toml | 2 +- 6 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 atproto/src/sqlx.rs diff --git a/Cargo.lock b/Cargo.lock index 89997b8..6c97f48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,9 @@ dependencies = [ "lazy-regex", "serde", "serde_json", + "sqlx", + "thiserror 2.0.12", + "time", "tracing", "tracing-subscriber", ] @@ -581,6 +584,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive_builder" version = "0.20.2" @@ -1470,6 +1482,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1625,6 +1643,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2410,6 +2434,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 779f393..c029fb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" +sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } tracing = "0.1.41" diff --git a/atproto/Cargo.toml b/atproto/Cargo.toml index 033ff0d..3f5445b 100644 --- a/atproto/Cargo.toml +++ b/atproto/Cargo.toml @@ -8,7 +8,12 @@ atrium-api = { version = "0.25.3", default-features = false } lazy-regex = "3.4.1" serde.workspace = true serde_json.workspace = true +sqlx = { workspace = true, optional = true } time = { version = "0.3.41", features = ["parsing", "formatting"] } tracing-subscriber.workspace = true tracing.workspace = true thiserror.workspace = true + +[features] +default = [] +sqlx-support = ["dep:sqlx"] diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index c3b56ee..53c8d32 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,3 +1,5 @@ pub mod lexicons; pub mod types; pub mod error; +#[cfg(feature = "sqlx-support")] +pub mod sqlx; diff --git a/atproto/src/sqlx.rs b/atproto/src/sqlx.rs new file mode 100644 index 0000000..0bb6683 --- /dev/null +++ b/atproto/src/sqlx.rs @@ -0,0 +1,38 @@ +use crate::types::{ + Did, + Cid, + Uri, + Handle, + Datetime, +}; + +macro_rules! implement_sqlx_for_string_type { + ($name:ident) => { + impl sqlx::Type for $name { + fn type_info() -> sqlx::postgres::PgTypeInfo { + >::type_info() + } + } + impl<'q> sqlx::Encode<'q, sqlx::Postgres> for $name { + fn encode_by_ref( + &self, buf: &mut sqlx::postgres::PgArgumentBuffer + ) -> Result { + >::encode_by_ref(&self.to_string(), buf) + } + } + impl<'r> sqlx::Decode<'r, sqlx::Postgres> for $name { + fn decode( + value: sqlx::postgres::PgValueRef<'r> + ) -> Result { + let s = >::decode(value)?; + s.parse::<$name>().map_err(|e| Box::new(e) as sqlx::error::BoxDynError) + } + } + } +} + +implement_sqlx_for_string_type!(Did); +implement_sqlx_for_string_type!(Cid); +implement_sqlx_for_string_type!(Uri); +implement_sqlx_for_string_type!(Handle); +implement_sqlx_for_string_type!(Datetime); diff --git a/db/Cargo.toml b/db/Cargo.toml index 13b55ae..a07bead 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] -atproto.workspace = true async-trait.workspace = true +atproto = { workspace = true, features = ["sqlx-support"] } sqlx.workspace = true thiserror.workspace = true tokio.workspace = true From 4a767ea1ad000c0e653b9f32144dabc4e1da0c6a Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 17 Jun 2025 13:55:30 -0700 Subject: [PATCH 5/5] Db, Commit two, everything is looking pretty Db, add atproto error support Db, rename spoor to direct, implement sessions Db, add activity ingestor, fix type name activity has its type name Authority instead of Uri for some reason? So that is no longer an issue. EVERYTHING IS STILL UNTESTED BABY~ --- .../20250612223204_initial_schema.sql | 6 +- db/src/error.rs | 2 + db/src/interfaces/direct.rs | 2 + db/src/interfaces/direct/functions.rs | 116 ++++++++++++ db/src/interfaces/direct/types.rs | 58 ++++++ db/src/interfaces/mod.rs | 2 +- db/src/interfaces/spoor.rs | 178 ------------------ 7 files changed, 182 insertions(+), 182 deletions(-) create mode 100644 db/src/interfaces/direct.rs create mode 100644 db/src/interfaces/direct/functions.rs create mode 100644 db/src/interfaces/direct/types.rs delete mode 100644 db/src/interfaces/spoor.rs diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql index f52b19b..b95abd4 100644 --- a/db/migrations/20250612223204_initial_schema.sql +++ b/db/migrations/20250612223204_initial_schema.sql @@ -19,7 +19,7 @@ CREATE TABLE session ( label VARCHAR, -- Participants in participant - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); @@ -33,13 +33,13 @@ CREATE TABLE activity ( -- Progress in progress performed_at VARCHAR, - created_at VARCHAR NOT NULL, + created_at VARCHAR, indexed_at VARCHAR NOT NULL, sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL ); CREATE TABLE participant ( participantdid VARCHAR NOT NULL, - session VARCHAR NOT NULL, + sessionuri VARCHAR NOT NULL, role VARCHAR NOT NULL ); diff --git a/db/src/error.rs b/db/src/error.rs index ed1b585..1c330bc 100644 --- a/db/src/error.rs +++ b/db/src/error.rs @@ -3,6 +3,8 @@ pub enum Error { #[error("Database Implementation Error: {0}")] Backend(#[from] sqlx::Error), + #[error("AT Protocol Implementation Error: {0}")] + Atproto(#[from] atproto::error::Error), } pub type Result = std::result::Result; diff --git a/db/src/interfaces/direct.rs b/db/src/interfaces/direct.rs new file mode 100644 index 0000000..d0b4ddc --- /dev/null +++ b/db/src/interfaces/direct.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod functions; diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs new file mode 100644 index 0000000..a61ab37 --- /dev/null +++ b/db/src/interfaces/direct/functions.rs @@ -0,0 +1,116 @@ +use atproto::types::{ + Datetime, + Uri, +}; +use crate::{ + Error, + interfaces::direct::types::{ + Activity, + Participant, + Session, + User + }, +}; +use sqlx::{ + Transaction, + Postgres, + PgPool, + query, +}; + +pub async fn ingest_activity( + db: PgPool, activity: Activity +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_activity(&mut transaction, activity).await?; + transaction.commit().await.map_err(Error::Backend) +} + +pub async fn ingest_session( + db: PgPool, session: Session +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(Error::Backend) +} + +async fn write_activity( + tr: &mut Transaction<'_, Postgres>, activity: Activity +) -> Result<(), Error> { + let (sessionuri, sessioncid) = match activity.session { + Some(sr) => { + let (session, cid) = sr.extract_content(); + let sessionuri = session.uri.to_string(); + write_session(tr, session).await?; + (sessionuri, cid.to_string()) + } + None => ("".to_string(), "".to_string()), + }; + + query!(r#" + INSERT INTO + activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + &activity.uri.to_string(), + &activity.cid.to_string(), + sessionuri, + sessioncid, + activity.performed_at.map(|dt| dt.to_string()), + activity.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string(), + ).execute(&mut **tr).await?; + + // TODO: Handle Progress + + Ok(()) +} + + +async fn write_session( + tr: &mut Transaction<'_, Postgres>, session: Session +) -> Result<(), Error> { + query!(r#" + INSERT INTO + session(uri, cid, owner, content, contentcid, label, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + "#, + &session.uri.to_string(), + &session.cid.to_string(), + &session.uri.authority_as_did().to_string(), + &session.content.get_content().to_string(), + &session.content.get_cid().to_string(), + session.label, + session.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string() + ).execute(&mut **tr).await?; + + if let Some(participants) = session.other_participants { + for participant in participants { + write_participant(tr, &participant, &session.uri).await? + } + } + + Ok(()) +} + +async fn write_participant( + tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri +) -> Result<(), Error> { + let (participant_type, user): (String, &User) = match participant { + Participant::Owner(user) => ("Owner".to_string(), user), + Participant::Added(user) => ("Participant".to_string(), user), + }; + + query!(r#" + INSERT INTO + participant(participantdid, sessionuri, role) + VALUES ($1, $2, $3) + "#, + user.did.to_string(), + sessionuri.to_string(), + participant_type, + ).execute(&mut **tr).await?; + + Ok(()) +} diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs new file mode 100644 index 0000000..4a3ea65 --- /dev/null +++ b/db/src/interfaces/direct/types.rs @@ -0,0 +1,58 @@ +use atproto::types::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use std::fmt::{Display, Formatter, Result as FmtResult}; + +pub struct Activity { + pub uri: Uri, + pub cid: Cid, + + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + pub cid: Cid, + + pub content: StrongRef, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +impl Display for Content { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Content::UnknownContent => "UnknownContentType", + }) + } +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs index d13ba36..afdae97 100644 --- a/db/src/interfaces/mod.rs +++ b/db/src/interfaces/mod.rs @@ -1 +1 @@ -pub mod spoor; +pub mod direct; diff --git a/db/src/interfaces/spoor.rs b/db/src/interfaces/spoor.rs deleted file mode 100644 index 4a27dcb..0000000 --- a/db/src/interfaces/spoor.rs +++ /dev/null @@ -1,178 +0,0 @@ -use crate::Error; -use atproto::{ - Cid, - Uri, - Datetime, - Did, - StrongRef, - Handle, -}; -use sqlx::{ - PgTransaction, - PgPool, - query, -}; - -pub struct Activity { - pub authority: Uri, - - pub cid: Option, - pub session: Option>, - pub progress: Option, - pub performed_at: Option, - pub created_at: Option, -} - -pub struct Session { - pub uri: Uri, - - pub cid: Option, - pub content: Option>, - pub label: Option, - pub created_at: Option, - pub other_participants: Option>, -} - -pub struct User { - pub did: Did, - pub handle: Option, -} - -#[non_exhaustive] -pub enum Participant { - Owner(User), - Added(User), -} - -#[non_exhaustive] -pub enum Content { - UnknownContent -} - -#[non_exhaustive] -pub enum Progress { - UnknownProgress -} - -pub async fn ingest_session( - db: PgPool, session: Session -) -> Result<(), Error> { - let mut transaction = db.begin().await?; - write_session(&mut transaction, session).await?; - transaction.commit().await.map_err(Error::Backend) -} - -async fn write_session( - tr: &mut PgTransaction<'_>, session: Session -) -> Result<(), Error> { - - let (contenturi, contentcid): (Option, String) = - match session.content { - Some(sr) => (Some(sr.content), sr.cid.to_string()), - None => (None, "".to_string()), - }; - - query!(r#" - INSERT INTO sessions( - sessionuri, sessioncid, label, created_at, contenturi, contentcid - ) VALUES ($1, $2, $3, $4, $5, $6) - "#, - session.uri, - session.cid.map_or("", |cid| cid.to_string()), - session.label.unwrap_or(""), - session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), - contenturi, - contentcid, - ).execute(&mut tr).await?; - - session.other_participants.and_then(|participants| { - for participant in participants { - write_participant(tr, participant, session.uri).await? - } - }).unwrap_or(Ok(())) -} - -async fn write_participant( - tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri -) -> Result<(), Error> { - let (participantType, user): (String, User) = match participant { - Participant::Owner(user) => ("Owner".to_string(), user), - Participant::Added(user) => ("Participant".to_string(), user), - }; - - query!(r#" - INSERT INTO participants( - sessionuri, userdid, role - ) VALUES ($1, $2, $3) - "#, sessionuri.to_string(), user.did.to_string, participantType - ).execute(&mut tr).await?; - - Ok(()) -} - -// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { -// todo!(); -// } -// -// let mut transaction = db.begin().await?; -// -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// -// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { -// query!(r#" -// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) -// "#, -// session.sessionuri, session.label -// ).execute(&mut *transaction).await?; -// -// for participant in session.participants { -// query!(r#" -// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) -// "#, -// session.sessionuri, participant.userdid, participant.role.to_string() -// ).execute(&mut *transaction).await?; -// } -// -// transaction.commit().await -// } -// - - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // }