diff --git a/Cargo.lock b/Cargo.lock index 6c97f48..dbe12ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,9 +118,6 @@ dependencies = [ "lazy-regex", "serde", "serde_json", - "sqlx", - "thiserror 2.0.12", - "time", "tracing", "tracing-subscriber", ] @@ -566,10 +563,7 @@ dependencies = [ name = "db" version = "0.1.0" dependencies = [ - "async-trait", - "atproto", "sqlx", - "thiserror 2.0.12", "tokio", ] @@ -584,15 +578,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "deranged" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" -dependencies = [ - "powerfmt", -] - [[package]] name = "derive_builder" version = "0.20.2" @@ -1482,12 +1467,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "num-conv" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" - [[package]] name = "num-integer" version = "0.1.46" @@ -1643,12 +1622,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "powerfmt" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" - [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2434,37 +2407,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "time" -version = "0.3.41" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" -dependencies = [ - "deranged", - "itoa", - "num-conv", - "powerfmt", - "serde", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" - -[[package]] -name = "time-macros" -version = "0.2.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" -dependencies = [ - "num-conv", - "time-core", -] - [[package]] name = "tinystr" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index c029fb0..908c860 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,12 +3,9 @@ resolver = "3" members = [ "api", "atproto","db", "ingestor"] [workspace.dependencies] -async-trait = "0.1.88" atproto = { path = "./atproto" } serde = "1.0.219" serde_json = "1.0.140" -sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } -thiserror = "2.0.12" tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } tracing = "0.1.41" tracing-subscriber = "0.3.19" diff --git a/atproto/Cargo.toml b/atproto/Cargo.toml index 3f5445b..7d56725 100644 --- a/atproto/Cargo.toml +++ b/atproto/Cargo.toml @@ -8,12 +8,5 @@ atrium-api = { version = "0.25.3", default-features = false } lazy-regex = "3.4.1" serde.workspace = true serde_json.workspace = true -sqlx = { workspace = true, optional = true } -time = { version = "0.3.41", features = ["parsing", "formatting"] } tracing-subscriber.workspace = true tracing.workspace = true -thiserror.workspace = true - -[features] -default = [] -sqlx-support = ["dep:sqlx"] diff --git a/atproto/src/error.rs b/atproto/src/error.rs deleted file mode 100644 index 1db7106..0000000 --- a/atproto/src/error.rs +++ /dev/null @@ -1,31 +0,0 @@ -use thiserror::Error as ThisError; - -#[non_exhaustive] -#[derive(Debug, ThisError)] -pub enum Error { - #[error("Error while parsing")] - Parse { err: ParseError, object: String }, - #[error("Error while formatting")] - Format { err: FormatError, object: String }, -} - -#[non_exhaustive] -#[derive(Debug, ThisError)] -pub enum FormatError { - #[error("Time Parse Error: {0}")] - Datetime(#[from] time::error::Format), -} -#[non_exhaustive] -#[derive(Debug, ThisError)] -pub enum ParseError { - #[error("Time Parse Error: {0}")] - Datetime(#[from] time::error::Parse), - #[error("Length of parsed object too long, max: {max:?}, got: {got:?}.")] - Length { max: usize, got: usize }, - #[error("Currently Did is enforced, cannot use handle, {handle:?}")] - ForceDid { handle: String }, - #[error("Incorrectly formatted")] - Format, -} - -pub type Result = std::result::Result; diff --git a/atproto/src/lib.rs b/atproto/src/lib.rs index 53c8d32..21f8919 100644 --- a/atproto/src/lib.rs +++ b/atproto/src/lib.rs @@ -1,5 +1,60 @@ +use lazy_regex::regex_captures; +use core::str::FromStr; + +pub use atrium_api::types::{ + Collection, + string::{ + Nsid, + RecordKey, + AtIdentifier as Authority, + } +}; + pub mod lexicons; -pub mod types; -pub mod error; -#[cfg(feature = "sqlx-support")] -pub mod sqlx; + +pub struct Uri { + whole: String, + // These fields could be useful in the future, + // so I'm leaving the code for them. + // authority: Authority, + // collection: Option, + // rkey: Option, +} + +impl FromStr for Uri { + type Err = &'static str; + fn from_str(uri: &str) -> Result { + if uri.len() > 8000 { + return Err("Uri too long") + } + + let Some(( + whole, unchecked_authority, unchecked_collection, unchecked_rkey + )) = regex_captures!( + r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", + uri, + ) else { + return Err("Invalid Uri"); + }; + + // This parsing is required, but the values don't need to be used yet. + // No compute cost to use them, just storage cost + let _authority = Authority::from_str(unchecked_authority)?; + + let _collection = if unchecked_collection.is_empty() { None } + else { Some(Nsid::new(unchecked_collection.to_string())?) }; + + let _rkey = if unchecked_rkey.is_empty() { None } + else { Some(RecordKey::new(unchecked_rkey.to_string())?) }; + + // Ok(Uri{ whole: whole.to_string(), authority, collection, rkey }) + Ok(Uri { whole: whole.to_string() }) + } +} + +impl Uri { + pub fn as_str(&self) -> &str { + self.whole.as_str() + } +} + diff --git a/atproto/src/sqlx.rs b/atproto/src/sqlx.rs deleted file mode 100644 index 0bb6683..0000000 --- a/atproto/src/sqlx.rs +++ /dev/null @@ -1,38 +0,0 @@ -use crate::types::{ - Did, - Cid, - Uri, - Handle, - Datetime, -}; - -macro_rules! implement_sqlx_for_string_type { - ($name:ident) => { - impl sqlx::Type for $name { - fn type_info() -> sqlx::postgres::PgTypeInfo { - >::type_info() - } - } - impl<'q> sqlx::Encode<'q, sqlx::Postgres> for $name { - fn encode_by_ref( - &self, buf: &mut sqlx::postgres::PgArgumentBuffer - ) -> Result { - >::encode_by_ref(&self.to_string(), buf) - } - } - impl<'r> sqlx::Decode<'r, sqlx::Postgres> for $name { - fn decode( - value: sqlx::postgres::PgValueRef<'r> - ) -> Result { - let s = >::decode(value)?; - s.parse::<$name>().map_err(|e| Box::new(e) as sqlx::error::BoxDynError) - } - } - } -} - -implement_sqlx_for_string_type!(Did); -implement_sqlx_for_string_type!(Cid); -implement_sqlx_for_string_type!(Uri); -implement_sqlx_for_string_type!(Handle); -implement_sqlx_for_string_type!(Datetime); diff --git a/atproto/src/types.rs b/atproto/src/types.rs deleted file mode 100644 index 26ee60d..0000000 --- a/atproto/src/types.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::error::{Error, ParseError}; - -macro_rules! basic_string_type { - ($name:ident, $regex:literal, $max_len:literal) => { - pub struct $name { value: String, } - - impl std::fmt::Display for $name { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.value) - } - } - - impl std::str::FromStr for $name { - type Err = Error; - fn from_str(s: &str) -> Result { - if s.len() > $max_len { - return Err(Error::Parse { - err: ParseError::Length { max: s.len(), got: $max_len }, - object: stringify!($name).to_string(), - }); - } - - if ! lazy_regex::regex_is_match!( - $regex, - s - ) { - return Err(Error::Parse { - err: ParseError::Format, - object: stringify!($name).to_string(), - }); - } - - Ok(Self { - value: s.to_string(), - }) - } - } - } -} - -mod did; -pub use did::Did; -mod cid; -pub use cid::Cid; -mod authority; -pub use authority::Authority; -mod datetime; -pub use datetime::Datetime; -mod record_key; -pub use record_key::RecordKey; -mod uri; -pub use uri::Uri; - -basic_string_type!(Handle, - r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$", - 253 -); -basic_string_type!(Nsid, - r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$", - 317 -); -basic_string_type!(Tid, - r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$", - 13 -); - -pub struct StrongRef { - content: T, - cid: Cid, -} - -impl StrongRef { - pub fn get_content(&self) -> &T { - &self.content - } - - pub fn extract_content(self) -> (T, Cid) { - (self.content, self.cid) - } - - pub fn get_cid(&self) -> &Cid { - &self.cid - } -} diff --git a/atproto/src/types/authority.rs b/atproto/src/types/authority.rs deleted file mode 100644 index 1c76750..0000000 --- a/atproto/src/types/authority.rs +++ /dev/null @@ -1,38 +0,0 @@ -use crate::{ - types::{Did, Handle}, - error::{Error, ParseError}, -}; -use std::{ - fmt::{Display, Formatter, Result as FmtResult}, - str::FromStr, -}; - -pub enum Authority { - Did(Did), - Handle(Handle), -} - -impl Display for Authority { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", match self { - Authority::Did(did) => did.to_string(), - Authority::Handle(handle) => handle.to_string(), - }) - } -} - -impl FromStr for Authority { - type Err = Error; - fn from_str(s: &str) -> Result { - if let Ok(did) = s.parse::() { - return Ok(Authority::Did(did)); - } - if let Ok(did) = s.parse::() { - return Ok(Authority::Handle(did)); - } - Err(Error::Parse { - err: ParseError::Format, - object: "Authority".to_string(), - }) - } -} diff --git a/atproto/src/types/cid.rs b/atproto/src/types/cid.rs deleted file mode 100644 index 3581ad2..0000000 --- a/atproto/src/types/cid.rs +++ /dev/null @@ -1,16 +0,0 @@ -use crate::error::Error; - -pub struct Cid { value: String, } - -impl std::fmt::Display for Cid { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.value) - } -} - -impl std::str::FromStr for Cid { - type Err = Error; - fn from_str(s: &str) -> Result { - Ok(Self { value: s.to_string() }) - } -} diff --git a/atproto/src/types/datetime.rs b/atproto/src/types/datetime.rs deleted file mode 100644 index 6dcf234..0000000 --- a/atproto/src/types/datetime.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::error::{Error, ParseError, FormatError}; -use time::{ - UtcDateTime, - format_description::well_known::{Rfc3339, Iso8601}, -}; -use std::{ - fmt::{Display, Formatter, Result as FmtResult}, - str::FromStr, -}; -pub use time::{Date, Time}; - -pub struct Datetime { - time: UtcDateTime, - derived_string: String, -} - -impl Datetime { - pub fn now() -> Result { - Datetime::from_utc(UtcDateTime::now()) - } - - pub fn new(date: Date, time: Time) -> Result { - Datetime::from_utc(UtcDateTime::new(date, time)) - } - - fn from_utc(utc: UtcDateTime) -> Result { - Ok(Datetime { - time: utc, - derived_string: match utc.format(&Rfc3339) { - Ok(ds) => ds, - Err(e) => return Err(Error::Format { - err: FormatError::Datetime(e), - object: "Datetime".to_string(), - }), - }, - }) - } -} - -impl Display for Datetime { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", self.derived_string) - } -} - -impl FromStr for Datetime { - type Err = Error; - - fn from_str(s: &str) -> Result { - // Parse as both Rfc3339 and Iso8601 to ensure intersection - let dt_rfc3339 = UtcDateTime::parse(s, &Rfc3339); - let dt_iso8601 = UtcDateTime::parse(s, &Iso8601::DEFAULT); - - let datetime = dt_iso8601 - .and(dt_rfc3339) - .map_err(|e| Error::Parse { - err: ParseError::Datetime(e), - object: "Datetime".to_string(), - })?; - - Ok(Datetime { time: datetime, derived_string: s.to_string() }) - } -} diff --git a/atproto/src/types/did.rs b/atproto/src/types/did.rs deleted file mode 100644 index 6c91db2..0000000 --- a/atproto/src/types/did.rs +++ /dev/null @@ -1,72 +0,0 @@ -use crate::error::{Error, ParseError}; -use std::{ - fmt::{Display, Formatter, Result as FmtResult}, - str::FromStr, -}; -use lazy_regex::regex_captures; - -enum DidMethod { - Web, - Plc, -} -impl Display for DidMethod { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", match self { - DidMethod::Web => String::from("web"), - DidMethod::Plc => String::from("plc"), - }) - } -} -impl FromStr for DidMethod { - type Err = Error; - fn from_str(s: &str) -> Result { - match s { - "web" => Ok(DidMethod::Web), - "plc" => Ok(DidMethod::Plc), - _ => Err(Error::Parse { - err: ParseError::Format, - object: "DidMethod".to_string(), - }), - } - } -} - -pub struct Did { - method: DidMethod, - identifier: String, -} - -impl Display for Did { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "did:{}:{}", self.method, self.identifier) - } -} - -impl FromStr for Did { - type Err = Error; - fn from_str(s: &str) -> Result { - if s.len() > 2048 { - return Err(Error::Parse { - err: ParseError::Length { max: s.len(), got: 2048 }, - object: "Did".to_string(), - }); - } - - let Some(( - _whole, unchecked_method, identifier - )) = regex_captures!( - r"^did:([a-z]+):([a-zA-Z0-9._:%-]*[a-zA-Z0-9._-])$", - s, - ) else { - return Err(Error::Parse { - err: ParseError::Format, - object: "Did".to_string(), - }); - }; - - Ok(Self { - method: unchecked_method.parse::()?, - identifier: identifier.to_string(), - }) - } -} diff --git a/atproto/src/types/record_key.rs b/atproto/src/types/record_key.rs deleted file mode 100644 index 7368b53..0000000 --- a/atproto/src/types/record_key.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::{ - types::{Nsid, Tid}, - error::{Error, ParseError}, -}; -use std::{ - fmt::{Display, Formatter, Result as FmtResult}, - str::FromStr, -}; -use lazy_regex::regex_is_match; - -pub enum RecordKey { - Tid(Tid), - Nsid(Nsid), - Literal(String), - Any(String), -} -impl Display for RecordKey { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", match self { - RecordKey::Tid(tid) => tid.to_string(), - RecordKey::Nsid(nsid) => nsid.to_string(), - RecordKey::Literal(literal) => literal.to_string(), - RecordKey::Any(any) => any.to_string(), - }) - } -} -impl FromStr for RecordKey { - type Err = Error; - fn from_str(s: &str) -> Result { - if s.len() > 512 { - return Err(Error::Parse { - err: ParseError::Length { max: 512, got: s.len() }, - object: "RecordKey".to_string(), - }); - } - - if !( - regex_is_match!( - r"^[a-zA-Z0-9`.-_:~]+$", - s, - ) - && s != "." - && s != ".." - ) { - return Err(Error::Parse { - err: ParseError::Format, - object: "RecordKey".to_string(), - }); - } - - // Valid record key, now decide type - if s.starts_with("literal:") { - return Ok(RecordKey::Literal(s.to_string())); - } - if let Ok(tid) = s.parse::() { - return Ok(RecordKey::Tid(tid)); - } - if let Ok(nsid) = s.parse::() { - return Ok(RecordKey::Nsid(nsid)); - } - Ok(RecordKey::Any(s.to_string())) - } -} diff --git a/atproto/src/types/uri.rs b/atproto/src/types/uri.rs deleted file mode 100644 index 843ee2b..0000000 --- a/atproto/src/types/uri.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::{ - types::{Did, Authority, Nsid, RecordKey}, - error::{Error, ParseError}, -}; -use std::{ - fmt::{Display, Formatter, Result as FmtResult}, - str::FromStr, -}; -use lazy_regex::regex_captures; - -pub struct Uri { - authority: Did, - collection: Option, - rkey: Option, -} - -impl Display for Uri { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "at://{}", self.authority)?; - - if let Some(collection) = &self.collection { - write!(f, "/{}", collection)?; - - if let Some(rkey) = &self.rkey { - write!(f, "/{}", rkey)?; - } - } - - Ok(()) - } -} - -impl FromStr for Uri { - type Err = Error; - fn from_str(s: &str) -> Result { - if s.len() > 8000 { - return Err(Error::Parse { - err: ParseError::Length { max: 8000, got: s.len() }, - object: "Did".to_string(), - }); - } - - let Some(( - _whole, unchecked_authority, unchecked_collection, unchecked_rkey - )) = regex_captures!( - r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i", - s, - ) else { - return Err(Error::Parse { - err: ParseError::Format, - object: "Uri".to_string(), - }); - }; - - let did = match Authority::from_str(unchecked_authority)? { - Authority::Handle(h) => - return Err(Error::Parse { - err: ParseError::ForceDid { handle: h.to_string() }, - object: "Uri".to_string(), - }), - Authority::Did(d) => d, - }; - - let collection = if unchecked_collection.is_empty() { None } - else { Some(unchecked_collection.parse::()?) }; - - let rkey = if unchecked_rkey.is_empty() { None } - else { Some(unchecked_rkey.parse::()?) }; - - Ok(Uri { authority: did, collection, rkey }) - } -} - -impl Uri { - pub fn authority_as_did(&self) -> &Did { - &self.authority - } -} - diff --git a/db/Cargo.toml b/db/Cargo.toml index a07bead..ded38de 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,8 +4,5 @@ version = "0.1.0" edition = "2024" [dependencies] -async-trait.workspace = true -atproto = { workspace = true, features = ["sqlx-support"] } -sqlx.workspace = true -thiserror.workspace = true -tokio.workspace = true +sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } +tokio = "1.45.0" diff --git a/db/migrations/20250612223204_initial_schema.sql b/db/migrations/20250612223204_initial_schema.sql deleted file mode 100644 index b95abd4..0000000 --- a/db/migrations/20250612223204_initial_schema.sql +++ /dev/null @@ -1,45 +0,0 @@ --- Add migration script here - -CREATE EXTENSION IF NOT EXISTS pg_trgm; - -CREATE TABLE actor ( - did VARCHAR PRIMARY KEY, - handle VARCHAR UNIQUE, - indexed_at VARCHAR NOT NULL -); -CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops); - -CREATE TABLE session ( - uri VARCHAR PRIMARY KEY, - cid VARCHAR NOT NULL, - owner VARCHAR NOT NULL, - - content VARCHAR NOT NULL, - contentcid VARCHAR NOT NULL, - label VARCHAR, - -- Participants in participant - - created_at VARCHAR, - indexed_at VARCHAR NOT NULL, - sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL -); - -CREATE TABLE activity ( - uri VARCHAR PRIMARY KEY, - cid VARCHAR NOT NULL, - - session VARCHAR, - sessioncid VARCHAR, - -- Progress in progress - - performed_at VARCHAR, - created_at VARCHAR, - indexed_at VARCHAR NOT NULL, - sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL -); - -CREATE TABLE participant ( - participantdid VARCHAR NOT NULL, - sessionuri VARCHAR NOT NULL, - role VARCHAR NOT NULL -); diff --git a/db/src/connection.rs b/db/src/connection.rs new file mode 100644 index 0000000..ba883cc --- /dev/null +++ b/db/src/connection.rs @@ -0,0 +1,99 @@ +use sqlx::{ + query, + Database, + Pool, + Postgres, + pool::PoolOptions, + postgres::{ + PgConnectOptions, + PgSslMode, + }, + Result, +}; +use std::string::ToString; + +pub struct Db { + pool: Pool +} + +#[non_exhaustive] +enum Role { + Owner, + Participant +} + +impl ToString for Role { + fn to_string(&self) -> String { + match *self { + Role::Owner => "owner".to_string(), + Role::Participant => "participant".to_string(), + } + } +} + +pub struct Session { + sessionuri: Uri, + label: Option, + participants: Vec, +} + +impl Db { + async fn connect() -> Result { + let conn = PgConnectOptions::new() + .host("localhost") + .port(5432) + .username("postgres") + .password("062217") + .database("anisky") + .ssl_mode(PgSslMode::Disable); + + let pool = match PoolOptions::new().connect_with(conn).await { + Ok(p) => p, + Err(e) => return Err(e), + }; + + Ok(Db { pool }) + } + // + // pub async fn add_user(&self, user: &User) -> Result<()> { + // query!(r#" + // INSERT INTO users(userdid, handle) VALUES ($1, $2) + // "#, + // user.userdid, user.handle + // ).execute(self.pool).await?; + // Ok(()) + // } + // + // pub async fn add_session(&self, session: &Session) -> Result<()> { + // let mut transaction = self.pool.begin().await?; + // + // query!(r#" + // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) + // "#, + // session.sessionuri, session.label + // ).execute(&mut *transaction).await?; + // + // for participant in session.participants { + // query!(r#" + // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) + // "#, + // session.sessionuri, participant.userdid, participant.role.to_string() + // ).execute(&mut *transaction).await?; + // } + // + // transaction.commit().await + // } + // + // pub async fn add_participant(&self, session: Session, + // participant: Participant) -> Result { + // query!(r#" + // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) + // "#, + // session.sessionuri, participant.userdid, participant.role.to_string() + // ).execute(self.pool).await?; + // + // session.participants.push(participant); + // + // Ok(session) + // } +} diff --git a/db/src/error.rs b/db/src/error.rs deleted file mode 100644 index 1c330bc..0000000 --- a/db/src/error.rs +++ /dev/null @@ -1,10 +0,0 @@ -#[non_exhaustive] -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Database Implementation Error: {0}")] - Backend(#[from] sqlx::Error), - #[error("AT Protocol Implementation Error: {0}")] - Atproto(#[from] atproto::error::Error), -} - -pub type Result = std::result::Result; diff --git a/db/src/interfaces.rs b/db/src/interfaces.rs new file mode 100644 index 0000000..d3629e1 --- /dev/null +++ b/db/src/interfaces.rs @@ -0,0 +1,15 @@ +use atproto::{ + Did, + Uri, +}; + +pub struct User { + userdid: Did, + handle: Handle, +} + +struct Participant { + participantdid: Did, + role: Role, +} + diff --git a/db/src/interfaces/direct.rs b/db/src/interfaces/direct.rs deleted file mode 100644 index d0b4ddc..0000000 --- a/db/src/interfaces/direct.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod types; -pub mod functions; diff --git a/db/src/interfaces/direct/functions.rs b/db/src/interfaces/direct/functions.rs deleted file mode 100644 index a61ab37..0000000 --- a/db/src/interfaces/direct/functions.rs +++ /dev/null @@ -1,116 +0,0 @@ -use atproto::types::{ - Datetime, - Uri, -}; -use crate::{ - Error, - interfaces::direct::types::{ - Activity, - Participant, - Session, - User - }, -}; -use sqlx::{ - Transaction, - Postgres, - PgPool, - query, -}; - -pub async fn ingest_activity( - db: PgPool, activity: Activity -) -> Result<(), Error> { - let mut transaction = db.begin().await?; - write_activity(&mut transaction, activity).await?; - transaction.commit().await.map_err(Error::Backend) -} - -pub async fn ingest_session( - db: PgPool, session: Session -) -> Result<(), Error> { - let mut transaction = db.begin().await?; - write_session(&mut transaction, session).await?; - transaction.commit().await.map_err(Error::Backend) -} - -async fn write_activity( - tr: &mut Transaction<'_, Postgres>, activity: Activity -) -> Result<(), Error> { - let (sessionuri, sessioncid) = match activity.session { - Some(sr) => { - let (session, cid) = sr.extract_content(); - let sessionuri = session.uri.to_string(); - write_session(tr, session).await?; - (sessionuri, cid.to_string()) - } - None => ("".to_string(), "".to_string()), - }; - - query!(r#" - INSERT INTO - activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at) - VALUES ($1, $2, $3, $4, $5, $6, $7) - "#, - &activity.uri.to_string(), - &activity.cid.to_string(), - sessionuri, - sessioncid, - activity.performed_at.map(|dt| dt.to_string()), - activity.created_at.map(|dt| dt.to_string()), - &Datetime::now()?.to_string(), - ).execute(&mut **tr).await?; - - // TODO: Handle Progress - - Ok(()) -} - - -async fn write_session( - tr: &mut Transaction<'_, Postgres>, session: Session -) -> Result<(), Error> { - query!(r#" - INSERT INTO - session(uri, cid, owner, content, contentcid, label, created_at, indexed_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - "#, - &session.uri.to_string(), - &session.cid.to_string(), - &session.uri.authority_as_did().to_string(), - &session.content.get_content().to_string(), - &session.content.get_cid().to_string(), - session.label, - session.created_at.map(|dt| dt.to_string()), - &Datetime::now()?.to_string() - ).execute(&mut **tr).await?; - - if let Some(participants) = session.other_participants { - for participant in participants { - write_participant(tr, &participant, &session.uri).await? - } - } - - Ok(()) -} - -async fn write_participant( - tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri -) -> Result<(), Error> { - let (participant_type, user): (String, &User) = match participant { - Participant::Owner(user) => ("Owner".to_string(), user), - Participant::Added(user) => ("Participant".to_string(), user), - }; - - query!(r#" - INSERT INTO - participant(participantdid, sessionuri, role) - VALUES ($1, $2, $3) - "#, - user.did.to_string(), - sessionuri.to_string(), - participant_type, - ).execute(&mut **tr).await?; - - Ok(()) -} diff --git a/db/src/interfaces/direct/types.rs b/db/src/interfaces/direct/types.rs deleted file mode 100644 index 4a3ea65..0000000 --- a/db/src/interfaces/direct/types.rs +++ /dev/null @@ -1,58 +0,0 @@ -use atproto::types::{ - Cid, - Uri, - Datetime, - Did, - StrongRef, - Handle, -}; -use std::fmt::{Display, Formatter, Result as FmtResult}; - -pub struct Activity { - pub uri: Uri, - pub cid: Cid, - - pub session: Option>, - pub progress: Option, - pub performed_at: Option, - pub created_at: Option, -} - -pub struct Session { - pub uri: Uri, - pub cid: Cid, - - pub content: StrongRef, - pub label: Option, - pub created_at: Option, - pub other_participants: Option>, -} - -pub struct User { - pub did: Did, - pub handle: Option, -} - -#[non_exhaustive] -pub enum Participant { - Owner(User), - Added(User), -} - -#[non_exhaustive] -pub enum Content { - UnknownContent -} - -impl Display for Content { - fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { - write!(f, "{}", match self { - Content::UnknownContent => "UnknownContentType", - }) - } -} - -#[non_exhaustive] -pub enum Progress { - UnknownProgress -} diff --git a/db/src/interfaces/mod.rs b/db/src/interfaces/mod.rs deleted file mode 100644 index afdae97..0000000 --- a/db/src/interfaces/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod direct; diff --git a/db/src/lib.rs b/db/src/lib.rs index f696f5c..82e9c13 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -1,4 +1 @@ -pub mod interfaces; -pub mod error; - -pub use crate::error::Error; +pub struct db; diff --git a/flake.lock b/flake.lock index 69e701e..8f3aa7b 100644 --- a/flake.lock +++ b/flake.lock @@ -41,11 +41,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1749695868, - "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=", + "lastModified": 1746585402, + "narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd", + "rev": "72dd969389583664f87aa348b3458f2813693617", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 48d94bf..943945b 100644 --- a/flake.nix +++ b/flake.nix @@ -42,7 +42,6 @@ packages = (with pkgs; [ # The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt, # rustdoc, rustfmt, and other tools. - sqlx-cli rustToolchain ]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]); }; diff --git a/ingestor/Cargo.toml b/ingestor/Cargo.toml index 7da302c..a177581 100644 --- a/ingestor/Cargo.toml +++ b/ingestor/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" [dependencies] anyhow = "1.0.98" -async-trait.workspace = true +async-trait = "0.1.88" atproto.workspace = true rocketman = "0.2.0" serde.workspace = true