Compare commits

...

5 commits

Author SHA1 Message Date
4a767ea1ad
Db, Commit two, everything is looking pretty
Db, add atproto error support

Db, rename spoor to direct, implement sessions

Db, add activity ingestor, fix type name

activity has its type name Authority instead of Uri for some reason? So
that is no longer an issue.

EVERYTHING IS STILL UNTESTED BABY~
2025-06-17 15:15:34 -07:00
0707b5eed4
Atproto, add sqlx support 2025-06-17 15:15:34 -07:00
ab78d1fb7b
Atproto, types overhaul and error handling
Breaks off from Atrium-rs's types because they are implemented
inconsistently, which makes them harder to use.

This was done with reference to the atproto documentation but
specifically not the atrium-rs codebase so I wouldn't have to think
about licenses.

This adds the types and error module in atproto. It also touches
Cargo.toml for some new dependencies and some shared dependencies. It
required thiserror, so I looped that into the workspace meaning that
this commit touches db.

some things to keep in mind:
- There is no CID parsing
- None of this is tested, nor are there any tests written. We're playing
  fast and loose baby~
2025-06-17 15:14:45 -07:00
aadea9757b
Db, first commit of work
This also touches atproto/src/lib.rs which may be a problem for the
rebase~

Adds migrations and all the other stuff.
2025-06-17 15:13:53 -07:00
3eb6aab10f
Cargo, moving async-trait and tokio (DB, Ingestor)
Move async-trait from ingestor to workspace so DB can share.

Update the reference to tokio in DB from version to workspace
2025-06-17 15:12:10 -07:00
26 changed files with 807 additions and 143 deletions

58
Cargo.lock generated
View file

@ -118,6 +118,9 @@ dependencies = [
"lazy-regex", "lazy-regex",
"serde", "serde",
"serde_json", "serde_json",
"sqlx",
"thiserror 2.0.12",
"time",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
] ]
@ -563,7 +566,10 @@ dependencies = [
name = "db" name = "db"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait",
"atproto",
"sqlx", "sqlx",
"thiserror 2.0.12",
"tokio", "tokio",
] ]
@ -578,6 +584,15 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "deranged"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
dependencies = [
"powerfmt",
]
[[package]] [[package]]
name = "derive_builder" name = "derive_builder"
version = "0.20.2" version = "0.20.2"
@ -1467,6 +1482,12 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "num-conv"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.46" version = "0.1.46"
@ -1622,6 +1643,12 @@ dependencies = [
"zerovec", "zerovec",
] ]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.21" version = "0.2.21"
@ -2407,6 +2434,37 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "time"
version = "0.3.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
[[package]]
name = "time-macros"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
dependencies = [
"num-conv",
"time-core",
]
[[package]] [[package]]
name = "tinystr" name = "tinystr"
version = "0.8.1" version = "0.8.1"

View file

@ -3,9 +3,12 @@ resolver = "3"
members = [ "api", "atproto","db", "ingestor"] members = [ "api", "atproto","db", "ingestor"]
[workspace.dependencies] [workspace.dependencies]
async-trait = "0.1.88"
atproto = { path = "./atproto" } atproto = { path = "./atproto" }
serde = "1.0.219" serde = "1.0.219"
serde_json = "1.0.140" serde_json = "1.0.140"
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }
thiserror = "2.0.12"
tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.41" tracing = "0.1.41"
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"

View file

@ -8,5 +8,12 @@ atrium-api = { version = "0.25.3", default-features = false }
lazy-regex = "3.4.1" lazy-regex = "3.4.1"
serde.workspace = true serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
sqlx = { workspace = true, optional = true }
time = { version = "0.3.41", features = ["parsing", "formatting"] }
tracing-subscriber.workspace = true tracing-subscriber.workspace = true
tracing.workspace = true tracing.workspace = true
thiserror.workspace = true
[features]
default = []
sqlx-support = ["dep:sqlx"]

31
atproto/src/error.rs Normal file
View file

@ -0,0 +1,31 @@
use thiserror::Error as ThisError;
#[non_exhaustive]
#[derive(Debug, ThisError)]
pub enum Error {
#[error("Error while parsing")]
Parse { err: ParseError, object: String },
#[error("Error while formatting")]
Format { err: FormatError, object: String },
}
#[non_exhaustive]
#[derive(Debug, ThisError)]
pub enum FormatError {
#[error("Time Parse Error: {0}")]
Datetime(#[from] time::error::Format),
}
#[non_exhaustive]
#[derive(Debug, ThisError)]
pub enum ParseError {
#[error("Time Parse Error: {0}")]
Datetime(#[from] time::error::Parse),
#[error("Length of parsed object too long, max: {max:?}, got: {got:?}.")]
Length { max: usize, got: usize },
#[error("Currently Did is enforced, cannot use handle, {handle:?}")]
ForceDid { handle: String },
#[error("Incorrectly formatted")]
Format,
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,60 +1,5 @@
use lazy_regex::regex_captures;
use core::str::FromStr;
pub use atrium_api::types::{
Collection,
string::{
Nsid,
RecordKey,
AtIdentifier as Authority,
}
};
pub mod lexicons; pub mod lexicons;
pub mod types;
pub struct Uri { pub mod error;
whole: String, #[cfg(feature = "sqlx-support")]
// These fields could be useful in the future, pub mod sqlx;
// so I'm leaving the code for them.
// authority: Authority,
// collection: Option<Nsid>,
// rkey: Option<RecordKey>,
}
impl FromStr for Uri {
type Err = &'static str;
fn from_str(uri: &str) -> Result<Self, Self::Err> {
if uri.len() > 8000 {
return Err("Uri too long")
}
let Some((
whole, unchecked_authority, unchecked_collection, unchecked_rkey
)) = regex_captures!(
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i",
uri,
) else {
return Err("Invalid Uri");
};
// This parsing is required, but the values don't need to be used yet.
// No compute cost to use them, just storage cost
let _authority = Authority::from_str(unchecked_authority)?;
let _collection = if unchecked_collection.is_empty() { None }
else { Some(Nsid::new(unchecked_collection.to_string())?) };
let _rkey = if unchecked_rkey.is_empty() { None }
else { Some(RecordKey::new(unchecked_rkey.to_string())?) };
// Ok(Uri{ whole: whole.to_string(), authority, collection, rkey })
Ok(Uri { whole: whole.to_string() })
}
}
impl Uri {
pub fn as_str(&self) -> &str {
self.whole.as_str()
}
}

38
atproto/src/sqlx.rs Normal file
View file

@ -0,0 +1,38 @@
use crate::types::{
Did,
Cid,
Uri,
Handle,
Datetime,
};
macro_rules! implement_sqlx_for_string_type {
($name:ident) => {
impl sqlx::Type<sqlx::Postgres> for $name {
fn type_info() -> sqlx::postgres::PgTypeInfo {
<String as sqlx::Type<sqlx::Postgres>>::type_info()
}
}
impl<'q> sqlx::Encode<'q, sqlx::Postgres> for $name {
fn encode_by_ref(
&self, buf: &mut sqlx::postgres::PgArgumentBuffer
) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
<String as sqlx::Encode<sqlx::Postgres>>::encode_by_ref(&self.to_string(), buf)
}
}
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for $name {
fn decode(
value: sqlx::postgres::PgValueRef<'r>
) -> Result<Self, sqlx::error::BoxDynError> {
let s = <String as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
s.parse::<$name>().map_err(|e| Box::new(e) as sqlx::error::BoxDynError)
}
}
}
}
implement_sqlx_for_string_type!(Did);
implement_sqlx_for_string_type!(Cid);
implement_sqlx_for_string_type!(Uri);
implement_sqlx_for_string_type!(Handle);
implement_sqlx_for_string_type!(Datetime);

84
atproto/src/types.rs Normal file
View file

@ -0,0 +1,84 @@
use crate::error::{Error, ParseError};
macro_rules! basic_string_type {
($name:ident, $regex:literal, $max_len:literal) => {
pub struct $name { value: String, }
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.value)
}
}
impl std::str::FromStr for $name {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > $max_len {
return Err(Error::Parse {
err: ParseError::Length { max: s.len(), got: $max_len },
object: stringify!($name).to_string(),
});
}
if ! lazy_regex::regex_is_match!(
$regex,
s
) {
return Err(Error::Parse {
err: ParseError::Format,
object: stringify!($name).to_string(),
});
}
Ok(Self {
value: s.to_string(),
})
}
}
}
}
mod did;
pub use did::Did;
mod cid;
pub use cid::Cid;
mod authority;
pub use authority::Authority;
mod datetime;
pub use datetime::Datetime;
mod record_key;
pub use record_key::RecordKey;
mod uri;
pub use uri::Uri;
basic_string_type!(Handle,
r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$",
253
);
basic_string_type!(Nsid,
r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$",
317
);
basic_string_type!(Tid,
r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$",
13
);
pub struct StrongRef<T> {
content: T,
cid: Cid,
}
impl<T> StrongRef<T> {
pub fn get_content(&self) -> &T {
&self.content
}
pub fn extract_content(self) -> (T, Cid) {
(self.content, self.cid)
}
pub fn get_cid(&self) -> &Cid {
&self.cid
}
}

View file

@ -0,0 +1,38 @@
use crate::{
types::{Did, Handle},
error::{Error, ParseError},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
pub enum Authority {
Did(Did),
Handle(Handle),
}
impl Display for Authority {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
Authority::Did(did) => did.to_string(),
Authority::Handle(handle) => handle.to_string(),
})
}
}
impl FromStr for Authority {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(did) = s.parse::<Did>() {
return Ok(Authority::Did(did));
}
if let Ok(did) = s.parse::<Handle>() {
return Ok(Authority::Handle(did));
}
Err(Error::Parse {
err: ParseError::Format,
object: "Authority".to_string(),
})
}
}

16
atproto/src/types/cid.rs Normal file
View file

@ -0,0 +1,16 @@
use crate::error::Error;
pub struct Cid { value: String, }
impl std::fmt::Display for Cid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.value)
}
}
impl std::str::FromStr for Cid {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self { value: s.to_string() })
}
}

View file

@ -0,0 +1,63 @@
use crate::error::{Error, ParseError, FormatError};
use time::{
UtcDateTime,
format_description::well_known::{Rfc3339, Iso8601},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
pub use time::{Date, Time};
pub struct Datetime {
time: UtcDateTime,
derived_string: String,
}
impl Datetime {
pub fn now() -> Result<Self, Error> {
Datetime::from_utc(UtcDateTime::now())
}
pub fn new(date: Date, time: Time) -> Result<Self, Error> {
Datetime::from_utc(UtcDateTime::new(date, time))
}
fn from_utc(utc: UtcDateTime) -> Result<Self, Error> {
Ok(Datetime {
time: utc,
derived_string: match utc.format(&Rfc3339) {
Ok(ds) => ds,
Err(e) => return Err(Error::Format {
err: FormatError::Datetime(e),
object: "Datetime".to_string(),
}),
},
})
}
}
impl Display for Datetime {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", self.derived_string)
}
}
impl FromStr for Datetime {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Parse as both Rfc3339 and Iso8601 to ensure intersection
let dt_rfc3339 = UtcDateTime::parse(s, &Rfc3339);
let dt_iso8601 = UtcDateTime::parse(s, &Iso8601::DEFAULT);
let datetime = dt_iso8601
.and(dt_rfc3339)
.map_err(|e| Error::Parse {
err: ParseError::Datetime(e),
object: "Datetime".to_string(),
})?;
Ok(Datetime { time: datetime, derived_string: s.to_string() })
}
}

72
atproto/src/types/did.rs Normal file
View file

@ -0,0 +1,72 @@
use crate::error::{Error, ParseError};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
use lazy_regex::regex_captures;
enum DidMethod {
Web,
Plc,
}
impl Display for DidMethod {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
DidMethod::Web => String::from("web"),
DidMethod::Plc => String::from("plc"),
})
}
}
impl FromStr for DidMethod {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"web" => Ok(DidMethod::Web),
"plc" => Ok(DidMethod::Plc),
_ => Err(Error::Parse {
err: ParseError::Format,
object: "DidMethod".to_string(),
}),
}
}
}
pub struct Did {
method: DidMethod,
identifier: String,
}
impl Display for Did {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "did:{}:{}", self.method, self.identifier)
}
}
impl FromStr for Did {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > 2048 {
return Err(Error::Parse {
err: ParseError::Length { max: s.len(), got: 2048 },
object: "Did".to_string(),
});
}
let Some((
_whole, unchecked_method, identifier
)) = regex_captures!(
r"^did:([a-z]+):([a-zA-Z0-9._:%-]*[a-zA-Z0-9._-])$",
s,
) else {
return Err(Error::Parse {
err: ParseError::Format,
object: "Did".to_string(),
});
};
Ok(Self {
method: unchecked_method.parse::<DidMethod>()?,
identifier: identifier.to_string(),
})
}
}

View file

@ -0,0 +1,63 @@
use crate::{
types::{Nsid, Tid},
error::{Error, ParseError},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
use lazy_regex::regex_is_match;
pub enum RecordKey {
Tid(Tid),
Nsid(Nsid),
Literal(String),
Any(String),
}
impl Display for RecordKey {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
RecordKey::Tid(tid) => tid.to_string(),
RecordKey::Nsid(nsid) => nsid.to_string(),
RecordKey::Literal(literal) => literal.to_string(),
RecordKey::Any(any) => any.to_string(),
})
}
}
impl FromStr for RecordKey {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > 512 {
return Err(Error::Parse {
err: ParseError::Length { max: 512, got: s.len() },
object: "RecordKey".to_string(),
});
}
if !(
regex_is_match!(
r"^[a-zA-Z0-9`.-_:~]+$",
s,
)
&& s != "."
&& s != ".."
) {
return Err(Error::Parse {
err: ParseError::Format,
object: "RecordKey".to_string(),
});
}
// Valid record key, now decide type
if s.starts_with("literal:") {
return Ok(RecordKey::Literal(s.to_string()));
}
if let Ok(tid) = s.parse::<Tid>() {
return Ok(RecordKey::Tid(tid));
}
if let Ok(nsid) = s.parse::<Nsid>() {
return Ok(RecordKey::Nsid(nsid));
}
Ok(RecordKey::Any(s.to_string()))
}
}

79
atproto/src/types/uri.rs Normal file
View file

@ -0,0 +1,79 @@
use crate::{
types::{Did, Authority, Nsid, RecordKey},
error::{Error, ParseError},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
use lazy_regex::regex_captures;
pub struct Uri {
authority: Did,
collection: Option<Nsid>,
rkey: Option<RecordKey>,
}
impl Display for Uri {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "at://{}", self.authority)?;
if let Some(collection) = &self.collection {
write!(f, "/{}", collection)?;
if let Some(rkey) = &self.rkey {
write!(f, "/{}", rkey)?;
}
}
Ok(())
}
}
impl FromStr for Uri {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > 8000 {
return Err(Error::Parse {
err: ParseError::Length { max: 8000, got: s.len() },
object: "Did".to_string(),
});
}
let Some((
_whole, unchecked_authority, unchecked_collection, unchecked_rkey
)) = regex_captures!(
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i",
s,
) else {
return Err(Error::Parse {
err: ParseError::Format,
object: "Uri".to_string(),
});
};
let did = match Authority::from_str(unchecked_authority)? {
Authority::Handle(h) =>
return Err(Error::Parse {
err: ParseError::ForceDid { handle: h.to_string() },
object: "Uri".to_string(),
}),
Authority::Did(d) => d,
};
let collection = if unchecked_collection.is_empty() { None }
else { Some(unchecked_collection.parse::<Nsid>()?) };
let rkey = if unchecked_rkey.is_empty() { None }
else { Some(unchecked_rkey.parse::<RecordKey>()?) };
Ok(Uri { authority: did, collection, rkey })
}
}
impl Uri {
pub fn authority_as_did(&self) -> &Did {
&self.authority
}
}

View file

@ -4,5 +4,8 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } async-trait.workspace = true
tokio = "1.45.0" atproto = { workspace = true, features = ["sqlx-support"] }
sqlx.workspace = true
thiserror.workspace = true
tokio.workspace = true

View file

@ -0,0 +1,45 @@
-- Add migration script here
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE TABLE actor (
did VARCHAR PRIMARY KEY,
handle VARCHAR UNIQUE,
indexed_at VARCHAR NOT NULL
);
CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops);
CREATE TABLE session (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
owner VARCHAR NOT NULL,
content VARCHAR NOT NULL,
contentcid VARCHAR NOT NULL,
label VARCHAR,
-- Participants in participant
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE activity (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
session VARCHAR,
sessioncid VARCHAR,
-- Progress in progress
performed_at VARCHAR,
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE participant (
participantdid VARCHAR NOT NULL,
sessionuri VARCHAR NOT NULL,
role VARCHAR NOT NULL
);

View file

@ -1,19 +1,7 @@
use sqlx::{
query,
Database,
Pool,
Postgres,
pool::PoolOptions,
postgres::{
PgConnectOptions,
PgSslMode,
},
Result,
};
use std::string::ToString; use std::string::ToString;
pub struct Db<Dbimp: Database> { pub struct Db<Db: Database> {
pool: Pool<Dbimp> pool: Pool<Db>
} }
#[non_exhaustive] #[non_exhaustive]
@ -39,13 +27,6 @@ pub struct Session {
impl Db<Postgres> { impl Db<Postgres> {
async fn connect() -> Result<Self> { async fn connect() -> Result<Self> {
let conn = PgConnectOptions::new()
.host("localhost")
.port(5432)
.username("postgres")
.password("062217")
.database("anisky")
.ssl_mode(PgSslMode::Disable);
let pool = match PoolOptions::new().connect_with(conn).await { let pool = match PoolOptions::new().connect_with(conn).await {
Ok(p) => p, Ok(p) => p,
@ -55,45 +36,4 @@ impl Db<Postgres> {
Ok(Db { pool }) Ok(Db { pool })
} }
// //
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// let mut transaction = self.pool.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }
} }

10
db/src/error.rs Normal file
View file

@ -0,0 +1,10 @@
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Database Implementation Error: {0}")]
Backend(#[from] sqlx::Error),
#[error("AT Protocol Implementation Error: {0}")]
Atproto(#[from] atproto::error::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,15 +0,0 @@
use atproto::{
Did,
Uri,
};
pub struct User {
userdid: Did,
handle: Handle,
}
struct Participant {
participantdid: Did,
role: Role,
}

View file

@ -0,0 +1,2 @@
pub mod types;
pub mod functions;

View file

@ -0,0 +1,116 @@
use atproto::types::{
Datetime,
Uri,
};
use crate::{
Error,
interfaces::direct::types::{
Activity,
Participant,
Session,
User
},
};
use sqlx::{
Transaction,
Postgres,
PgPool,
query,
};
pub async fn ingest_activity(
db: PgPool, activity: Activity
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_activity(&mut transaction, activity).await?;
transaction.commit().await.map_err(Error::Backend)
}
pub async fn ingest_session(
db: PgPool, session: Session
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_session(&mut transaction, session).await?;
transaction.commit().await.map_err(Error::Backend)
}
async fn write_activity(
tr: &mut Transaction<'_, Postgres>, activity: Activity
) -> Result<(), Error> {
let (sessionuri, sessioncid) = match activity.session {
Some(sr) => {
let (session, cid) = sr.extract_content();
let sessionuri = session.uri.to_string();
write_session(tr, session).await?;
(sessionuri, cid.to_string())
}
None => ("".to_string(), "".to_string()),
};
query!(r#"
INSERT INTO
activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
&activity.uri.to_string(),
&activity.cid.to_string(),
sessionuri,
sessioncid,
activity.performed_at.map(|dt| dt.to_string()),
activity.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string(),
).execute(&mut **tr).await?;
// TODO: Handle Progress
Ok(())
}
async fn write_session(
tr: &mut Transaction<'_, Postgres>, session: Session
) -> Result<(), Error> {
query!(r#"
INSERT INTO
session(uri, cid, owner, content, contentcid, label, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
&session.uri.to_string(),
&session.cid.to_string(),
&session.uri.authority_as_did().to_string(),
&session.content.get_content().to_string(),
&session.content.get_cid().to_string(),
session.label,
session.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string()
).execute(&mut **tr).await?;
if let Some(participants) = session.other_participants {
for participant in participants {
write_participant(tr, &participant, &session.uri).await?
}
}
Ok(())
}
async fn write_participant(
tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri
) -> Result<(), Error> {
let (participant_type, user): (String, &User) = match participant {
Participant::Owner(user) => ("Owner".to_string(), user),
Participant::Added(user) => ("Participant".to_string(), user),
};
query!(r#"
INSERT INTO
participant(participantdid, sessionuri, role)
VALUES ($1, $2, $3)
"#,
user.did.to_string(),
sessionuri.to_string(),
participant_type,
).execute(&mut **tr).await?;
Ok(())
}

View file

@ -0,0 +1,58 @@
use atproto::types::{
Cid,
Uri,
Datetime,
Did,
StrongRef,
Handle,
};
use std::fmt::{Display, Formatter, Result as FmtResult};
pub struct Activity {
pub uri: Uri,
pub cid: Cid,
pub session: Option<StrongRef<Session>>,
pub progress: Option<Progress>,
pub performed_at: Option<Datetime>,
pub created_at: Option<Datetime>,
}
pub struct Session {
pub uri: Uri,
pub cid: Cid,
pub content: StrongRef<Content>,
pub label: Option<String>,
pub created_at: Option<Datetime>,
pub other_participants: Option<Vec<Participant>>,
}
pub struct User {
pub did: Did,
pub handle: Option<Handle>,
}
#[non_exhaustive]
pub enum Participant {
Owner(User),
Added(User),
}
#[non_exhaustive]
pub enum Content {
UnknownContent
}
impl Display for Content {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
Content::UnknownContent => "UnknownContentType",
})
}
}
#[non_exhaustive]
pub enum Progress {
UnknownProgress
}

1
db/src/interfaces/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod direct;

View file

@ -1 +1,7 @@
pub struct db; pub mod interfaces;
pub mod error;
pub use crate::error::Error;
// pub struct db;

6
flake.lock generated
View file

@ -41,11 +41,11 @@
"nixpkgs": "nixpkgs_2" "nixpkgs": "nixpkgs_2"
}, },
"locked": { "locked": {
"lastModified": 1746585402, "lastModified": 1749695868,
"narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=", "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=",
"owner": "oxalica", "owner": "oxalica",
"repo": "rust-overlay", "repo": "rust-overlay",
"rev": "72dd969389583664f87aa348b3458f2813693617", "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd",
"type": "github" "type": "github"
}, },
"original": { "original": {

View file

@ -42,6 +42,7 @@
packages = (with pkgs; [ packages = (with pkgs; [
# The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt, # The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt,
# rustdoc, rustfmt, and other tools. # rustdoc, rustfmt, and other tools.
sqlx-cli
rustToolchain rustToolchain
]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]); ]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]);
}; };

View file

@ -5,7 +5,7 @@ edition = "2024"
[dependencies] [dependencies]
anyhow = "1.0.98" anyhow = "1.0.98"
async-trait = "0.1.88" async-trait.workspace = true
atproto.workspace = true atproto.workspace = true
rocketman = "0.2.0" rocketman = "0.2.0"
serde.workspace = true serde.workspace = true