diff --git a/Cargo.lock b/Cargo.lock index dbe12ef..6c97f48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,9 @@ dependencies = [ "lazy-regex", "serde", "serde_json", + "sqlx", + "thiserror 2.0.12", + "time", "tracing", "tracing-subscriber", ] @@ -563,7 +566,10 @@ dependencies = [ name = "db" version = "0.1.0" dependencies = [ + "async-trait", + "atproto", "sqlx", + "thiserror 2.0.12", "tokio", ] @@ -578,6 +584,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + [[package]] name = "derive_builder" version = "0.20.2" @@ -1467,6 +1482,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1622,6 +1643,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2407,6 +2434,37 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 908c860..c029fb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,9 +3,12 @@ resolver = "3" members = [ "api", "atproto","db", "ingestor"] [workspace.dependencies] +async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" +sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } +thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } tracing = "0.1.41" tracing-subscriber = "0.3.19" diff --git a/atproto/Cargo.toml b/atproto/Cargo.toml index 7d56725..3f5445b 100644 --- a/atproto/Cargo.toml +++ b/atproto/Cargo.toml @@ -8,5 +8,12 @@ atrium-api = { version = "0.25.3", default-features = false } lazy-regex = "3.4.1" serde.workspace = true serde_json.workspace = true +sqlx = { workspace = true, optional = true } +time = { version = "0.3.41", features = ["parsing", "formatting"] } tracing-subscriber.workspace = true tracing.workspace = true +thiserror.workspace = true + +[features] +default = [] +sqlx-support = ["dep:sqlx"] diff --git a/atproto/src/error.rs b/atproto/src/error.rs new file mode 100644 index 0000000..1db7106 --- /dev/null +++ b/atproto/src/error.rs @@ -0,0 +1,31 @@ +use thiserror::Error as ThisError; + +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum Error { + #[error("Error while parsing")] + Parse { err: ParseError, object: String }, + #[error("Error while formatting")] + Format { err: FormatError, object: String }, +} + +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum FormatError { + #[error("Time Parse Error: {0}")] + Datetime(#[from] time::error::Format), +} +#[non_exhaustive] +#[derive(Debug, ThisError)] +pub enum ParseError { + #[error("Time Parse Error: {0}")] + Datetime(#[from] time::error::Parse), + #[error("Length of parsed object too long, max: {max:?}, got: {got:?}.")] + Length { max: usize, got: usize }, + #[error("Currently Did is enforced, cannot use handle, {handle:?}")] + ForceDid { handle: String }, + #[error("Incorrectly formatted")] + Format, +} + +pub type Result = std::result::Result; diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index 21f8919..53c8d32 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,60 +1,5 @@ -use lazy_regex::regex_captures; -use core::str::FromStr; - -pub use atrium_api::types::{ - Collection, - string::{ - Nsid, - RecordKey, - AtIdentifier as Authority, - } -}; - pub mod lexicons; - -pub struct Uri { - whole: String, - // These fields could be useful in the future, - // so I'm leaving the code for them. - // authority: Authority, - // collection: Option, - // rkey: Option, -} - -impl FromStr for Uri { - type Err = &'static str; - fn from_str(uri: &str) -> Result { - if uri.len() > 8000 { - return Err("Uri too long") - } - - let Some(( - whole, unchecked_authority, unchecked_collection, unchecked_rkey - )) = regex_captures!( - r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", - uri, - ) else { - return Err("Invalid Uri"); - }; - - // This parsing is required, but the values don't need to be used yet. - // No compute cost to use them, just storage cost - let _authority = Authority::from_str(unchecked_authority)?; - - let _collection = if unchecked_collection.is_empty() { None } - else { Some(Nsid::new(unchecked_collection.to_string())?) }; - - let _rkey = if unchecked_rkey.is_empty() { None } - else { Some(RecordKey::new(unchecked_rkey.to_string())?) }; - - // Ok(Uri{ whole: whole.to_string(), authority, collection, rkey }) - Ok(Uri { whole: whole.to_string() }) - } -} - -impl Uri { - pub fn as_str(&self) -> &str { - self.whole.as_str() - } -} - +pub mod types; +pub mod error; +#[cfg(feature = "sqlx-support")] +pub mod sqlx; diff --git a/atproto/src/sqlx.rs b/atproto/src/sqlx.rs new file mode 100644 index 0000000..0bb6683 --- /dev/null +++ b/atproto/src/sqlx.rs @@ -0,0 +1,38 @@ +use crate::types::{ + Did, + Cid, + Uri, + Handle, + Datetime, +}; + +macro_rules! implement_sqlx_for_string_type { + ($name:ident) => { + impl sqlx::Type for $name { + fn type_info() -> sqlx::postgres::PgTypeInfo { + >::type_info() + } + } + impl<'q> sqlx::Encode<'q, sqlx::Postgres> for $name { + fn encode_by_ref( + &self, buf: &mut sqlx::postgres::PgArgumentBuffer + ) -> Result { + >::encode_by_ref(&self.to_string(), buf) + } + } + impl<'r> sqlx::Decode<'r, sqlx::Postgres> for $name { + fn decode( + value: sqlx::postgres::PgValueRef<'r> + ) -> Result { + let s = >::decode(value)?; + s.parse::<$name>().map_err(|e| Box::new(e) as sqlx::error::BoxDynError) + } + } + } +} + +implement_sqlx_for_string_type!(Did); +implement_sqlx_for_string_type!(Cid); +implement_sqlx_for_string_type!(Uri); +implement_sqlx_for_string_type!(Handle); +implement_sqlx_for_string_type!(Datetime); diff --git a/atproto/src/types.rs b/atproto/src/types.rs new file mode 100644 index 0000000..26ee60d --- /dev/null +++ b/atproto/src/types.rs @@ -0,0 +1,84 @@ +use crate::error::{Error, ParseError}; + +macro_rules! basic_string_type { + ($name:ident, $regex:literal, $max_len:literal) => { + pub struct $name { value: String, } + + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value) + } + } + + impl std::str::FromStr for $name { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > $max_len { + return Err(Error::Parse { + err: ParseError::Length { max: s.len(), got: $max_len }, + object: stringify!($name).to_string(), + }); + } + + if ! lazy_regex::regex_is_match!( + $regex, + s + ) { + return Err(Error::Parse { + err: ParseError::Format, + object: stringify!($name).to_string(), + }); + } + + Ok(Self { + value: s.to_string(), + }) + } + } + } +} + +mod did; +pub use did::Did; +mod cid; +pub use cid::Cid; +mod authority; +pub use authority::Authority; +mod datetime; +pub use datetime::Datetime; +mod record_key; +pub use record_key::RecordKey; +mod uri; +pub use uri::Uri; + +basic_string_type!(Handle, + r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$", + 253 +); +basic_string_type!(Nsid, + r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$", + 317 +); +basic_string_type!(Tid, + r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$", + 13 +); + +pub struct StrongRef { + content: T, + cid: Cid, +} + +impl StrongRef { + pub fn get_content(&self) -> &T { + &self.content + } + + pub fn extract_content(self) -> (T, Cid) { + (self.content, self.cid) + } + + pub fn get_cid(&self) -> &Cid { + &self.cid + } +} diff --git a/atproto/src/types/authority.rs b/atproto/src/types/authority.rs new file mode 100644 index 0000000..1c76750 --- /dev/null +++ b/atproto/src/types/authority.rs @@ -0,0 +1,38 @@ +use crate::{ + types::{Did, Handle}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; + +pub enum Authority { + Did(Did), + Handle(Handle), +} + +impl Display for Authority { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Authority::Did(did) => did.to_string(), + Authority::Handle(handle) => handle.to_string(), + }) + } +} + +impl FromStr for Authority { + type Err = Error; + fn from_str(s: &str) -> Result { + if let Ok(did) = s.parse::() { + return Ok(Authority::Did(did)); + } + if let Ok(did) = s.parse::() { + return Ok(Authority::Handle(did)); + } + Err(Error::Parse { + err: ParseError::Format, + object: "Authority".to_string(), + }) + } +} diff --git a/atproto/src/types/cid.rs b/atproto/src/types/cid.rs new file mode 100644 index 0000000..3581ad2 --- /dev/null +++ b/atproto/src/types/cid.rs @@ -0,0 +1,16 @@ +use crate::error::Error; + +pub struct Cid { value: String, } + +impl std::fmt::Display for Cid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.value) + } +} + +impl std::str::FromStr for Cid { + type Err = Error; + fn from_str(s: &str) -> Result { + Ok(Self { value: s.to_string() }) + } +} diff --git a/atproto/src/types/datetime.rs b/atproto/src/types/datetime.rs new file mode 100644 index 0000000..6dcf234 --- /dev/null +++ b/atproto/src/types/datetime.rs @@ -0,0 +1,63 @@ +use crate::error::{Error, ParseError, FormatError}; +use time::{ + UtcDateTime, + format_description::well_known::{Rfc3339, Iso8601}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +pub use time::{Date, Time}; + +pub struct Datetime { + time: UtcDateTime, + derived_string: String, +} + +impl Datetime { + pub fn now() -> Result { + Datetime::from_utc(UtcDateTime::now()) + } + + pub fn new(date: Date, time: Time) -> Result { + Datetime::from_utc(UtcDateTime::new(date, time)) + } + + fn from_utc(utc: UtcDateTime) -> Result { + Ok(Datetime { + time: utc, + derived_string: match utc.format(&Rfc3339) { + Ok(ds) => ds, + Err(e) => return Err(Error::Format { + err: FormatError::Datetime(e), + object: "Datetime".to_string(), + }), + }, + }) + } +} + +impl Display for Datetime { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", self.derived_string) + } +} + +impl FromStr for Datetime { + type Err = Error; + + fn from_str(s: &str) -> Result { + // Parse as both Rfc3339 and Iso8601 to ensure intersection + let dt_rfc3339 = UtcDateTime::parse(s, &Rfc3339); + let dt_iso8601 = UtcDateTime::parse(s, &Iso8601::DEFAULT); + + let datetime = dt_iso8601 + .and(dt_rfc3339) + .map_err(|e| Error::Parse { + err: ParseError::Datetime(e), + object: "Datetime".to_string(), + })?; + + Ok(Datetime { time: datetime, derived_string: s.to_string() }) + } +} diff --git a/atproto/src/types/did.rs b/atproto/src/types/did.rs new file mode 100644 index 0000000..6c91db2 --- /dev/null +++ b/atproto/src/types/did.rs @@ -0,0 +1,72 @@ +use crate::error::{Error, ParseError}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_captures; + +enum DidMethod { + Web, + Plc, +} +impl Display for DidMethod { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + DidMethod::Web => String::from("web"), + DidMethod::Plc => String::from("plc"), + }) + } +} +impl FromStr for DidMethod { + type Err = Error; + fn from_str(s: &str) -> Result { + match s { + "web" => Ok(DidMethod::Web), + "plc" => Ok(DidMethod::Plc), + _ => Err(Error::Parse { + err: ParseError::Format, + object: "DidMethod".to_string(), + }), + } + } +} + +pub struct Did { + method: DidMethod, + identifier: String, +} + +impl Display for Did { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "did:{}:{}", self.method, self.identifier) + } +} + +impl FromStr for Did { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 2048 { + return Err(Error::Parse { + err: ParseError::Length { max: s.len(), got: 2048 }, + object: "Did".to_string(), + }); + } + + let Some(( + _whole, unchecked_method, identifier + )) = regex_captures!( + r"^did:([a-z]+):([a-zA-Z0-9._:%-]*[a-zA-Z0-9._-])$", + s, + ) else { + return Err(Error::Parse { + err: ParseError::Format, + object: "Did".to_string(), + }); + }; + + Ok(Self { + method: unchecked_method.parse::()?, + identifier: identifier.to_string(), + }) + } +} diff --git a/atproto/src/types/record_key.rs b/atproto/src/types/record_key.rs new file mode 100644 index 0000000..7368b53 --- /dev/null +++ b/atproto/src/types/record_key.rs @@ -0,0 +1,63 @@ +use crate::{ + types::{Nsid, Tid}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_is_match; + +pub enum RecordKey { + Tid(Tid), + Nsid(Nsid), + Literal(String), + Any(String), +} +impl Display for RecordKey { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + RecordKey::Tid(tid) => tid.to_string(), + RecordKey::Nsid(nsid) => nsid.to_string(), + RecordKey::Literal(literal) => literal.to_string(), + RecordKey::Any(any) => any.to_string(), + }) + } +} +impl FromStr for RecordKey { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 512 { + return Err(Error::Parse { + err: ParseError::Length { max: 512, got: s.len() }, + object: "RecordKey".to_string(), + }); + } + + if !( + regex_is_match!( + r"^[a-zA-Z0-9`.-_:~]+$", + s, + ) + && s != "." + && s != ".." + ) { + return Err(Error::Parse { + err: ParseError::Format, + object: "RecordKey".to_string(), + }); + } + + // Valid record key, now decide type + if s.starts_with("literal:") { + return Ok(RecordKey::Literal(s.to_string())); + } + if let Ok(tid) = s.parse::() { + return Ok(RecordKey::Tid(tid)); + } + if let Ok(nsid) = s.parse::() { + return Ok(RecordKey::Nsid(nsid)); + } + Ok(RecordKey::Any(s.to_string())) + } +} diff --git a/atproto/src/types/uri.rs b/atproto/src/types/uri.rs new file mode 100644 index 0000000..843ee2b --- /dev/null +++ b/atproto/src/types/uri.rs @@ -0,0 +1,79 @@ +use crate::{ + types::{Did, Authority, Nsid, RecordKey}, + error::{Error, ParseError}, +}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + str::FromStr, +}; +use lazy_regex::regex_captures; + +pub struct Uri { + authority: Did, + collection: Option, + rkey: Option, +} + +impl Display for Uri { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "at://{}", self.authority)?; + + if let Some(collection) = &self.collection { + write!(f, "/{}", collection)?; + + if let Some(rkey) = &self.rkey { + write!(f, "/{}", rkey)?; + } + } + + Ok(()) + } +} + +impl FromStr for Uri { + type Err = Error; + fn from_str(s: &str) -> Result { + if s.len() > 8000 { + return Err(Error::Parse { + err: ParseError::Length { max: 8000, got: s.len() }, + object: "Did".to_string(), + }); + } + + let Some(( + _whole, unchecked_authority, unchecked_collection, unchecked_rkey + )) = regex_captures!( + r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", + s, + ) else { + return Err(Error::Parse { + err: ParseError::Format, + object: "Uri".to_string(), + }); + }; + + let did = match Authority::from_str(unchecked_authority)? { + Authority::Handle(h) => + return Err(Error::Parse { + err: ParseError::ForceDid { handle: h.to_string() }, + object: "Uri".to_string(), + }), + Authority::Did(d) => d, + }; + + let collection = if unchecked_collection.is_empty() { None } + else { Some(unchecked_collection.parse::()?) }; + + let rkey = if unchecked_rkey.is_empty() { None } + else { Some(unchecked_rkey.parse::()?) }; + + Ok(Uri { authority: did, collection, rkey }) + } +} + +impl Uri { + pub fn authority_as_did(&self) -> &Did { + &self.authority + } +} + diff --git a/db/Cargo.toml b/db/Cargo.toml index ded38de..a07bead 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,5 +4,8 @@ version = "0.1.0" edition = "2024" [dependencies] -sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } -tokio = "1.45.0" +async-trait.workspace = true +atproto = { workspace = true, features = ["sqlx-support"] } +sqlx.workspace = true +thiserror.workspace = true +tokio.workspace = true diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql new file mode 100644 index 0000000..b95abd4 --- /dev/null +++ b/db/migrations/20250612223204_initial_schema.sql @@ -0,0 +1,45 @@ +-- Add migration script here + +CREATE EXTENSION IF NOT EXISTS pg_trgm; + +CREATE TABLE actor ( + did VARCHAR PRIMARY KEY, + handle VARCHAR UNIQUE, + indexed_at VARCHAR NOT NULL +); +CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops); + +CREATE TABLE session ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + owner VARCHAR NOT NULL, + + content VARCHAR NOT NULL, + contentcid VARCHAR NOT NULL, + label VARCHAR, + -- Participants in participant + + created_at VARCHAR, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE activity ( + uri VARCHAR PRIMARY KEY, + cid VARCHAR NOT NULL, + + session VARCHAR, + sessioncid VARCHAR, + -- Progress in progress + + performed_at VARCHAR, + created_at VARCHAR, + indexed_at VARCHAR NOT NULL, + sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL +); + +CREATE TABLE participant ( + participantdid VARCHAR NOT NULL, + sessionuri VARCHAR NOT NULL, + role VARCHAR NOT NULL +); diff --git a/db/src/connection.rs b/db/src/connection.rs deleted file mode 100644 index ba883cc..0000000 --- a/db/src/connection.rs +++ /dev/null @@ -1,99 +0,0 @@ -use sqlx::{ - query, - Database, - Pool, - Postgres, - pool::PoolOptions, - postgres::{ - PgConnectOptions, - PgSslMode, - }, - Result, -}; -use std::string::ToString; - -pub struct Db { - pool: Pool -} - -#[non_exhaustive] -enum Role { - Owner, - Participant -} - -impl ToString for Role { - fn to_string(&self) -> String { - match *self { - Role::Owner => "owner".to_string(), - Role::Participant => "participant".to_string(), - } - } -} - -pub struct Session { - sessionuri: Uri, - label: Option, - participants: Vec, -} - -impl Db { - async fn connect() -> Result { - let conn = PgConnectOptions::new() - .host("localhost") - .port(5432) - .username("postgres") - .password("062217") - .database("anisky") - .ssl_mode(PgSslMode::Disable); - - let pool = match PoolOptions::new().connect_with(conn).await { - Ok(p) => p, - Err(e) => return Err(e), - }; - - Ok(Db { pool }) - } - // - // pub async fn add_user(&self, user: &User) -> Result<()> { - // query!(r#" - // INSERT INTO users(userdid, handle) VALUES ($1, $2) - // "#, - // user.userdid, user.handle - // ).execute(self.pool).await?; - // Ok(()) - // } - // - // pub async fn add_session(&self, session: &Session) -> Result<()> { - // let mut transaction = self.pool.begin().await?; - // - // query!(r#" - // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) - // "#, - // session.sessionuri, session.label - // ).execute(&mut *transaction).await?; - // - // for participant in session.participants { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(&mut *transaction).await?; - // } - // - // transaction.commit().await - // } - // - // pub async fn add_participant(&self, session: Session, - // participant: Participant) -> Result { - // query!(r#" - // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) - // "#, - // session.sessionuri, participant.userdid, participant.role.to_string() - // ).execute(self.pool).await?; - // - // session.participants.push(participant); - // - // Ok(session) - // } -} diff --git a/db/src/error.rs b/db/src/error.rs new file mode 100644 index 0000000..1c330bc --- /dev/null +++ b/db/src/error.rs @@ -0,0 +1,10 @@ +#[non_exhaustive] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Database Implementation Error: {0}")] + Backend(#[from] sqlx::Error), + #[error("AT Protocol Implementation Error: {0}")] + Atproto(#[from] atproto::error::Error), +} + +pub type Result = std::result::Result; diff --git a/db/src/interfaces.rs b/db/src/interfaces.rs deleted file mode 100644 index d3629e1..0000000 --- a/db/src/interfaces.rs +++ /dev/null @@ -1,15 +0,0 @@ -use atproto::{ - Did, - Uri, -}; - -pub struct User { - userdid: Did, - handle: Handle, -} - -struct Participant { - participantdid: Did, - role: Role, -} - diff --git a/db/src/interfaces/direct.rs b/db/src/interfaces/direct.rs new file mode 100644 index 0000000..d0b4ddc --- /dev/null +++ b/db/src/interfaces/direct.rs @@ -0,0 +1,2 @@ +pub mod types; +pub mod functions; diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs new file mode 100644 index 0000000..a61ab37 --- /dev/null +++ b/db/src/interfaces/direct/functions.rs @@ -0,0 +1,116 @@ +use atproto::types::{ + Datetime, + Uri, +}; +use crate::{ + Error, + interfaces::direct::types::{ + Activity, + Participant, + Session, + User + }, +}; +use sqlx::{ + Transaction, + Postgres, + PgPool, + query, +}; + +pub async fn ingest_activity( + db: PgPool, activity: Activity +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_activity(&mut transaction, activity).await?; + transaction.commit().await.map_err(Error::Backend) +} + +pub async fn ingest_session( + db: PgPool, session: Session +) -> Result<(), Error> { + let mut transaction = db.begin().await?; + write_session(&mut transaction, session).await?; + transaction.commit().await.map_err(Error::Backend) +} + +async fn write_activity( + tr: &mut Transaction<'_, Postgres>, activity: Activity +) -> Result<(), Error> { + let (sessionuri, sessioncid) = match activity.session { + Some(sr) => { + let (session, cid) = sr.extract_content(); + let sessionuri = session.uri.to_string(); + write_session(tr, session).await?; + (sessionuri, cid.to_string()) + } + None => ("".to_string(), "".to_string()), + }; + + query!(r#" + INSERT INTO + activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + "#, + &activity.uri.to_string(), + &activity.cid.to_string(), + sessionuri, + sessioncid, + activity.performed_at.map(|dt| dt.to_string()), + activity.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string(), + ).execute(&mut **tr).await?; + + // TODO: Handle Progress + + Ok(()) +} + + +async fn write_session( + tr: &mut Transaction<'_, Postgres>, session: Session +) -> Result<(), Error> { + query!(r#" + INSERT INTO + session(uri, cid, owner, content, contentcid, label, created_at, indexed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + "#, + &session.uri.to_string(), + &session.cid.to_string(), + &session.uri.authority_as_did().to_string(), + &session.content.get_content().to_string(), + &session.content.get_cid().to_string(), + session.label, + session.created_at.map(|dt| dt.to_string()), + &Datetime::now()?.to_string() + ).execute(&mut **tr).await?; + + if let Some(participants) = session.other_participants { + for participant in participants { + write_participant(tr, &participant, &session.uri).await? + } + } + + Ok(()) +} + +async fn write_participant( + tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri +) -> Result<(), Error> { + let (participant_type, user): (String, &User) = match participant { + Participant::Owner(user) => ("Owner".to_string(), user), + Participant::Added(user) => ("Participant".to_string(), user), + }; + + query!(r#" + INSERT INTO + participant(participantdid, sessionuri, role) + VALUES ($1, $2, $3) + "#, + user.did.to_string(), + sessionuri.to_string(), + participant_type, + ).execute(&mut **tr).await?; + + Ok(()) +} diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs new file mode 100644 index 0000000..4a3ea65 --- /dev/null +++ b/db/src/interfaces/direct/types.rs @@ -0,0 +1,58 @@ +use atproto::types::{ + Cid, + Uri, + Datetime, + Did, + StrongRef, + Handle, +}; +use std::fmt::{Display, Formatter, Result as FmtResult}; + +pub struct Activity { + pub uri: Uri, + pub cid: Cid, + + pub session: Option>, + pub progress: Option, + pub performed_at: Option, + pub created_at: Option, +} + +pub struct Session { + pub uri: Uri, + pub cid: Cid, + + pub content: StrongRef, + pub label: Option, + pub created_at: Option, + pub other_participants: Option>, +} + +pub struct User { + pub did: Did, + pub handle: Option, +} + +#[non_exhaustive] +pub enum Participant { + Owner(User), + Added(User), +} + +#[non_exhaustive] +pub enum Content { + UnknownContent +} + +impl Display for Content { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + write!(f, "{}", match self { + Content::UnknownContent => "UnknownContentType", + }) + } +} + +#[non_exhaustive] +pub enum Progress { + UnknownProgress +} diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs new file mode 100644 index 0000000..afdae97 --- /dev/null +++ b/db/src/interfaces/mod.rs @@ -0,0 +1 @@ +pub mod direct; diff --git a/db/src/lib.rs b/db/src/lib.rs index 82e9c13..f696f5c 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1 +1,4 @@ -pub struct db; +pub mod interfaces; +pub mod error; + +pub use crate::error::Error; diff --git a/flake.lock b/flake.lock index 8f3aa7b..69e701e 100644 --- a/flake.lock +++ b/flake.lock @@ -41,11 +41,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1746585402, - "narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=", + "lastModified": 1749695868, + "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "72dd969389583664f87aa348b3458f2813693617", + "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 943945b..48d94bf 100644 --- a/flake.nix +++ b/flake.nix @@ -42,6 +42,7 @@ packages = (with pkgs; [ # The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt, # rustdoc, rustfmt, and other tools. + sqlx-cli rustToolchain ]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]); }; diff --git a/ingestor/Cargo.toml b/ingestor/Cargo.toml index a177581..7da302c 100644 --- a/ingestor/Cargo.toml +++ b/ingestor/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] anyhow = "1.0.98" -async-trait = "0.1.88" +async-trait.workspace = true atproto.workspace = true rocketman = "0.2.0" serde.workspace = true