Atproto Types Overhaul and DB Interface #3

Merged
Julia merged 4 commits from db-ingestion-interface into main 2025-06-17 15:49:17 -07:00
26 changed files with 802 additions and 180 deletions

58
Cargo.lock generated
View file

@ -118,6 +118,9 @@ dependencies = [
"lazy-regex",
"serde",
"serde_json",
"sqlx",
"thiserror 2.0.12",
"time",
"tracing",
"tracing-subscriber",
]
@ -563,7 +566,10 @@ dependencies = [
name = "db"
version = "0.1.0"
dependencies = [
"async-trait",
"atproto",
"sqlx",
"thiserror 2.0.12",
"tokio",
]
@ -578,6 +584,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "deranged"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
dependencies = [
"powerfmt",
]
[[package]]
name = "derive_builder"
version = "0.20.2"
@ -1467,6 +1482,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "num-conv"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-integer"
version = "0.1.46"
@ -1622,6 +1643,12 @@ dependencies = [
"zerovec",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
@ -2407,6 +2434,37 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.3.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
dependencies = [
"deranged",
"itoa",
"num-conv",
"powerfmt",
"serde",
"time-core",
"time-macros",
]
[[package]]
name = "time-core"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
[[package]]
name = "time-macros"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
dependencies = [
"num-conv",
"time-core",
]
[[package]]
name = "tinystr"
version = "0.8.1"

View file

@ -3,9 +3,12 @@ resolver = "3"
members = [ "api", "atproto","db", "ingestor"]
[workspace.dependencies]
async-trait = "0.1.88"
atproto = { path = "./atproto" }
serde = "1.0.219"
serde_json = "1.0.140"
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }
thiserror = "2.0.12"
tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"

View file

@ -8,5 +8,12 @@ atrium-api = { version = "0.25.3", default-features = false }
lazy-regex = "3.4.1"
serde.workspace = true
serde_json.workspace = true
sqlx = { workspace = true, optional = true }
time = { version = "0.3.41", features = ["parsing", "formatting"] }
tracing-subscriber.workspace = true
tracing.workspace = true
thiserror.workspace = true
[features]
default = []
sqlx-support = ["dep:sqlx"]

31
atproto/src/error.rs Normal file
View file

@ -0,0 +1,31 @@
use thiserror::Error as ThisError;
#[non_exhaustive]
#[derive(Debug, ThisError)]
pub enum Error {
#[error("Error while parsing")]
Parse { err: ParseError, object: String },
#[error("Error while formatting")]
Format { err: FormatError, object: String },
}
#[non_exhaustive]
#[derive(Debug, ThisError)]
pub enum FormatError {
#[error("Time Parse Error: {0}")]
Datetime(#[from] time::error::Format),
}
#[non_exhaustive]
#[derive(Debug, ThisError)]
pub enum ParseError {
#[error("Time Parse Error: {0}")]
Datetime(#[from] time::error::Parse),
#[error("Length of parsed object too long, max: {max:?}, got: {got:?}.")]
Length { max: usize, got: usize },
#[error("Currently Did is enforced, cannot use handle, {handle:?}")]
ForceDid { handle: String },
#[error("Incorrectly formatted")]
Format,
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,60 +1,5 @@
use lazy_regex::regex_captures;
use core::str::FromStr;
pub use atrium_api::types::{
Collection,
string::{
Nsid,
RecordKey,
AtIdentifier as Authority,
}
};
pub mod lexicons;
pub struct Uri {
whole: String,
// These fields could be useful in the future,
// so I'm leaving the code for them.
// authority: Authority,
// collection: Option<Nsid>,
// rkey: Option<RecordKey>,
}
impl FromStr for Uri {
type Err = &'static str;
fn from_str(uri: &str) -> Result<Self, Self::Err> {
if uri.len() > 8000 {
return Err("Uri too long")
}
let Some((
whole, unchecked_authority, unchecked_collection, unchecked_rkey
)) = regex_captures!(
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i",
uri,
) else {
return Err("Invalid Uri");
};
// This parsing is required, but the values don't need to be used yet.
// No compute cost to use them, just storage cost
let _authority = Authority::from_str(unchecked_authority)?;
let _collection = if unchecked_collection.is_empty() { None }
else { Some(Nsid::new(unchecked_collection.to_string())?) };
let _rkey = if unchecked_rkey.is_empty() { None }
else { Some(RecordKey::new(unchecked_rkey.to_string())?) };
// Ok(Uri{ whole: whole.to_string(), authority, collection, rkey })
Ok(Uri { whole: whole.to_string() })
}
}
impl Uri {
pub fn as_str(&self) -> &str {
self.whole.as_str()
}
}
pub mod types;
pub mod error;
#[cfg(feature = "sqlx-support")]
pub mod sqlx;

38
atproto/src/sqlx.rs Normal file
View file

@ -0,0 +1,38 @@
use crate::types::{
Did,
Cid,
Uri,
Handle,
Datetime,
};
macro_rules! implement_sqlx_for_string_type {
($name:ident) => {
impl sqlx::Type<sqlx::Postgres> for $name {
fn type_info() -> sqlx::postgres::PgTypeInfo {
<String as sqlx::Type<sqlx::Postgres>>::type_info()
}
}
impl<'q> sqlx::Encode<'q, sqlx::Postgres> for $name {
fn encode_by_ref(
&self, buf: &mut sqlx::postgres::PgArgumentBuffer
) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> {
<String as sqlx::Encode<sqlx::Postgres>>::encode_by_ref(&self.to_string(), buf)
}
}
impl<'r> sqlx::Decode<'r, sqlx::Postgres> for $name {
fn decode(
value: sqlx::postgres::PgValueRef<'r>
) -> Result<Self, sqlx::error::BoxDynError> {
let s = <String as sqlx::Decode<sqlx::Postgres>>::decode(value)?;
s.parse::<$name>().map_err(|e| Box::new(e) as sqlx::error::BoxDynError)
}
}
}
}
implement_sqlx_for_string_type!(Did);
implement_sqlx_for_string_type!(Cid);
implement_sqlx_for_string_type!(Uri);
implement_sqlx_for_string_type!(Handle);
implement_sqlx_for_string_type!(Datetime);

84
atproto/src/types.rs Normal file
View file

@ -0,0 +1,84 @@
use crate::error::{Error, ParseError};
macro_rules! basic_string_type {
($name:ident, $regex:literal, $max_len:literal) => {
pub struct $name { value: String, }
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.value)
}
}
impl std::str::FromStr for $name {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > $max_len {
return Err(Error::Parse {
err: ParseError::Length { max: s.len(), got: $max_len },
object: stringify!($name).to_string(),
});
}
if ! lazy_regex::regex_is_match!(
$regex,
s
) {
return Err(Error::Parse {
err: ParseError::Format,
object: stringify!($name).to_string(),
});
}
Ok(Self {
value: s.to_string(),
})
}
}
}
}
mod did;
pub use did::Did;
mod cid;
pub use cid::Cid;
mod authority;
pub use authority::Authority;
mod datetime;
pub use datetime::Datetime;
mod record_key;
pub use record_key::RecordKey;
mod uri;
pub use uri::Uri;
basic_string_type!(Handle,
r"^([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?$",
253
);
basic_string_type!(Nsid,
r"^[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+(\.[a-zA-Z]([a-zA-Z0-9]{0,62})?)$",
317
);
basic_string_type!(Tid,
r"^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$",
13
);
pub struct StrongRef<T> {
content: T,
cid: Cid,
}
impl<T> StrongRef<T> {
pub fn get_content(&self) -> &T {
&self.content
}
pub fn extract_content(self) -> (T, Cid) {
(self.content, self.cid)
}
pub fn get_cid(&self) -> &Cid {
&self.cid
}
}

View file

@ -0,0 +1,38 @@
use crate::{
types::{Did, Handle},
error::{Error, ParseError},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
pub enum Authority {
Did(Did),
Handle(Handle),
}
impl Display for Authority {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
Authority::Did(did) => did.to_string(),
Authority::Handle(handle) => handle.to_string(),
})
}
}
impl FromStr for Authority {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(did) = s.parse::<Did>() {
return Ok(Authority::Did(did));
}
if let Ok(did) = s.parse::<Handle>() {
return Ok(Authority::Handle(did));
}
Err(Error::Parse {
err: ParseError::Format,
object: "Authority".to_string(),
})
}
}

16
atproto/src/types/cid.rs Normal file
View file

@ -0,0 +1,16 @@
use crate::error::Error;
pub struct Cid { value: String, }
impl std::fmt::Display for Cid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.value)
}
}
impl std::str::FromStr for Cid {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self { value: s.to_string() })
}
}

View file

@ -0,0 +1,63 @@
use crate::error::{Error, ParseError, FormatError};
use time::{
UtcDateTime,
format_description::well_known::{Rfc3339, Iso8601},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
pub use time::{Date, Time};
pub struct Datetime {
time: UtcDateTime,
derived_string: String,
}
impl Datetime {
pub fn now() -> Result<Self, Error> {
Datetime::from_utc(UtcDateTime::now())
}
pub fn new(date: Date, time: Time) -> Result<Self, Error> {
Datetime::from_utc(UtcDateTime::new(date, time))
}
fn from_utc(utc: UtcDateTime) -> Result<Self, Error> {
Ok(Datetime {
time: utc,
derived_string: match utc.format(&Rfc3339) {
Ok(ds) => ds,
Err(e) => return Err(Error::Format {
err: FormatError::Datetime(e),
object: "Datetime".to_string(),
}),
},
})
}
}
impl Display for Datetime {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", self.derived_string)
}
}
impl FromStr for Datetime {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Parse as both Rfc3339 and Iso8601 to ensure intersection
let dt_rfc3339 = UtcDateTime::parse(s, &Rfc3339);
let dt_iso8601 = UtcDateTime::parse(s, &Iso8601::DEFAULT);
let datetime = dt_iso8601
.and(dt_rfc3339)
.map_err(|e| Error::Parse {
err: ParseError::Datetime(e),
object: "Datetime".to_string(),
})?;
Ok(Datetime { time: datetime, derived_string: s.to_string() })
}
}

72
atproto/src/types/did.rs Normal file
View file

@ -0,0 +1,72 @@
use crate::error::{Error, ParseError};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
use lazy_regex::regex_captures;
enum DidMethod {
Web,
Plc,
}
impl Display for DidMethod {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
DidMethod::Web => String::from("web"),
DidMethod::Plc => String::from("plc"),
})
}
}
impl FromStr for DidMethod {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"web" => Ok(DidMethod::Web),
"plc" => Ok(DidMethod::Plc),
_ => Err(Error::Parse {
err: ParseError::Format,
object: "DidMethod".to_string(),
}),
}
}
}
pub struct Did {
method: DidMethod,
identifier: String,
}
impl Display for Did {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "did:{}:{}", self.method, self.identifier)
}
}
impl FromStr for Did {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > 2048 {
return Err(Error::Parse {
err: ParseError::Length { max: s.len(), got: 2048 },
object: "Did".to_string(),
});
}
let Some((
_whole, unchecked_method, identifier
)) = regex_captures!(
r"^did:([a-z]+):([a-zA-Z0-9._:%-]*[a-zA-Z0-9._-])$",
s,
) else {
return Err(Error::Parse {
err: ParseError::Format,
object: "Did".to_string(),
});
};
Ok(Self {
method: unchecked_method.parse::<DidMethod>()?,
identifier: identifier.to_string(),
})
}
}

View file

@ -0,0 +1,63 @@
use crate::{
types::{Nsid, Tid},
error::{Error, ParseError},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
use lazy_regex::regex_is_match;
pub enum RecordKey {
Tid(Tid),
Nsid(Nsid),
Literal(String),
Any(String),
}
impl Display for RecordKey {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
RecordKey::Tid(tid) => tid.to_string(),
RecordKey::Nsid(nsid) => nsid.to_string(),
RecordKey::Literal(literal) => literal.to_string(),
RecordKey::Any(any) => any.to_string(),
})
}
}
impl FromStr for RecordKey {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > 512 {
return Err(Error::Parse {
err: ParseError::Length { max: 512, got: s.len() },
object: "RecordKey".to_string(),
});
}
if !(
regex_is_match!(
r"^[a-zA-Z0-9`.-_:~]+$",
s,
)
&& s != "."
&& s != ".."
) {
return Err(Error::Parse {
err: ParseError::Format,
object: "RecordKey".to_string(),
});
}
// Valid record key, now decide type
if s.starts_with("literal:") {
return Ok(RecordKey::Literal(s.to_string()));
}
if let Ok(tid) = s.parse::<Tid>() {
return Ok(RecordKey::Tid(tid));
}
if let Ok(nsid) = s.parse::<Nsid>() {
return Ok(RecordKey::Nsid(nsid));
}
Ok(RecordKey::Any(s.to_string()))
}
}

79
atproto/src/types/uri.rs Normal file
View file

@ -0,0 +1,79 @@
use crate::{
types::{Did, Authority, Nsid, RecordKey},
error::{Error, ParseError},
};
use std::{
fmt::{Display, Formatter, Result as FmtResult},
str::FromStr,
};
use lazy_regex::regex_captures;
pub struct Uri {
authority: Did,
collection: Option<Nsid>,
rkey: Option<RecordKey>,
}
impl Display for Uri {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "at://{}", self.authority)?;
if let Some(collection) = &self.collection {
write!(f, "/{}", collection)?;
if let Some(rkey) = &self.rkey {
write!(f, "/{}", rkey)?;
}
}
Ok(())
}
}
impl FromStr for Uri {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() > 8000 {
return Err(Error::Parse {
err: ParseError::Length { max: 8000, got: s.len() },
object: "Did".to_string(),
});
}
let Some((
_whole, unchecked_authority, unchecked_collection, unchecked_rkey
)) = regex_captures!(
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i",
s,
) else {
return Err(Error::Parse {
err: ParseError::Format,
object: "Uri".to_string(),
});
};
let did = match Authority::from_str(unchecked_authority)? {
Authority::Handle(h) =>
return Err(Error::Parse {
err: ParseError::ForceDid { handle: h.to_string() },
object: "Uri".to_string(),
}),
Authority::Did(d) => d,
};
let collection = if unchecked_collection.is_empty() { None }
else { Some(unchecked_collection.parse::<Nsid>()?) };
let rkey = if unchecked_rkey.is_empty() { None }
else { Some(unchecked_rkey.parse::<RecordKey>()?) };
Ok(Uri { authority: did, collection, rkey })
}
}
impl Uri {
pub fn authority_as_did(&self) -> &Did {
&self.authority
}
}

View file

@ -4,5 +4,8 @@ version = "0.1.0"
edition = "2024"
[dependencies]
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }
tokio = "1.45.0"
async-trait.workspace = true
atproto = { workspace = true, features = ["sqlx-support"] }
sqlx.workspace = true
thiserror.workspace = true
tokio.workspace = true

View file

@ -0,0 +1,45 @@
-- Add migration script here
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE TABLE actor (
did VARCHAR PRIMARY KEY,
handle VARCHAR UNIQUE,
indexed_at VARCHAR NOT NULL
);
CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops);
CREATE TABLE session (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
owner VARCHAR NOT NULL,
content VARCHAR NOT NULL,
contentcid VARCHAR NOT NULL,
label VARCHAR,
-- Participants in participant
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE activity (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
session VARCHAR,
sessioncid VARCHAR,
-- Progress in progress
performed_at VARCHAR,
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE participant (
participantdid VARCHAR NOT NULL,
sessionuri VARCHAR NOT NULL,
role VARCHAR NOT NULL
);

View file

@ -1,99 +0,0 @@
use sqlx::{
query,
Database,
Pool,
Postgres,
pool::PoolOptions,
postgres::{
PgConnectOptions,
PgSslMode,
},
Result,
};
use std::string::ToString;
pub struct Db<Dbimp: Database> {
pool: Pool<Dbimp>
}
#[non_exhaustive]
enum Role {
Owner,
Participant
}
impl ToString for Role {
fn to_string(&self) -> String {
match *self {
Role::Owner => "owner".to_string(),
Role::Participant => "participant".to_string(),
}
}
}
pub struct Session {
sessionuri: Uri,
label: Option<String>,
participants: Vec<Participant>,
}
impl Db<Postgres> {
async fn connect() -> Result<Self> {
let conn = PgConnectOptions::new()
.host("localhost")
.port(5432)
.username("postgres")
.password("062217")
.database("anisky")
.ssl_mode(PgSslMode::Disable);
let pool = match PoolOptions::new().connect_with(conn).await {
Ok(p) => p,
Err(e) => return Err(e),
};
Ok(Db { pool })
}
//
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// let mut transaction = self.pool.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }
}

10
db/src/error.rs Normal file
View file

@ -0,0 +1,10 @@
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Database Implementation Error: {0}")]
Backend(#[from] sqlx::Error),
#[error("AT Protocol Implementation Error: {0}")]
Atproto(#[from] atproto::error::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,15 +0,0 @@
use atproto::{
Did,
Uri,
};
pub struct User {
userdid: Did,
handle: Handle,
}
struct Participant {
participantdid: Did,
role: Role,
}

View file

@ -0,0 +1,2 @@
pub mod types;
pub mod functions;

View file

@ -0,0 +1,116 @@
use atproto::types::{
Datetime,
Uri,
};
use crate::{
Error,
interfaces::direct::types::{
Activity,
Participant,
Session,
User
},
};
use sqlx::{
Transaction,
Postgres,
PgPool,
query,
};
pub async fn ingest_activity(
db: PgPool, activity: Activity
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_activity(&mut transaction, activity).await?;
transaction.commit().await.map_err(Error::Backend)
}
pub async fn ingest_session(
db: PgPool, session: Session
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_session(&mut transaction, session).await?;
transaction.commit().await.map_err(Error::Backend)
}
async fn write_activity(
tr: &mut Transaction<'_, Postgres>, activity: Activity
) -> Result<(), Error> {
let (sessionuri, sessioncid) = match activity.session {
Some(sr) => {
let (session, cid) = sr.extract_content();
let sessionuri = session.uri.to_string();
write_session(tr, session).await?;
(sessionuri, cid.to_string())
}
None => ("".to_string(), "".to_string()),
};
query!(r#"
INSERT INTO
activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
&activity.uri.to_string(),
&activity.cid.to_string(),
sessionuri,
sessioncid,
activity.performed_at.map(|dt| dt.to_string()),
activity.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string(),
).execute(&mut **tr).await?;
// TODO: Handle Progress
Ok(())
}
async fn write_session(
tr: &mut Transaction<'_, Postgres>, session: Session
) -> Result<(), Error> {
query!(r#"
INSERT INTO
session(uri, cid, owner, content, contentcid, label, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
&session.uri.to_string(),
&session.cid.to_string(),
&session.uri.authority_as_did().to_string(),
&session.content.get_content().to_string(),
&session.content.get_cid().to_string(),
session.label,
session.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string()
).execute(&mut **tr).await?;
if let Some(participants) = session.other_participants {
for participant in participants {
write_participant(tr, &participant, &session.uri).await?
}
}
Ok(())
}
async fn write_participant(
tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri
) -> Result<(), Error> {
let (participant_type, user): (String, &User) = match participant {
Participant::Owner(user) => ("Owner".to_string(), user),
Participant::Added(user) => ("Participant".to_string(), user),
};
query!(r#"
INSERT INTO
participant(participantdid, sessionuri, role)
VALUES ($1, $2, $3)
"#,
user.did.to_string(),
sessionuri.to_string(),
participant_type,
).execute(&mut **tr).await?;
Ok(())
}

View file

@ -0,0 +1,58 @@
use atproto::types::{
Cid,
Uri,
Datetime,
Did,
StrongRef,
Handle,
};
use std::fmt::{Display, Formatter, Result as FmtResult};
pub struct Activity {
pub uri: Uri,
pub cid: Cid,
pub session: Option<StrongRef<Session>>,
pub progress: Option<Progress>,
pub performed_at: Option<Datetime>,
pub created_at: Option<Datetime>,
}
pub struct Session {
pub uri: Uri,
pub cid: Cid,
pub content: StrongRef<Content>,
pub label: Option<String>,
pub created_at: Option<Datetime>,
pub other_participants: Option<Vec<Participant>>,
}
pub struct User {
pub did: Did,
pub handle: Option<Handle>,
}
#[non_exhaustive]
pub enum Participant {
Owner(User),
Added(User),
}
#[non_exhaustive]
pub enum Content {
UnknownContent
}
impl Display for Content {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
Content::UnknownContent => "UnknownContentType",
})
}
}
#[non_exhaustive]
pub enum Progress {
UnknownProgress
}

1
db/src/interfaces/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod direct;

View file

@ -1 +1,4 @@
pub struct db;
pub mod interfaces;
pub mod error;
pub use crate::error::Error;

6
flake.lock generated
View file

@ -41,11 +41,11 @@
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1746585402,
"narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=",
"lastModified": 1749695868,
"narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "72dd969389583664f87aa348b3458f2813693617",
"rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd",
"type": "github"
},
"original": {

View file

@ -42,6 +42,7 @@
packages = (with pkgs; [
# The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt,
# rustdoc, rustfmt, and other tools.
sqlx-cli
rustToolchain
]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]);
};

View file

@ -5,7 +5,7 @@ edition = "2024"
[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
async-trait.workspace = true
atproto.workspace = true
rocketman = "0.2.0"
serde.workspace = true