Atproto Types Overhaul and DB Interface #3

Merged
Julia merged 4 commits from db-ingestion-interface into main 2025-06-17 15:49:17 -07:00
13 changed files with 243 additions and 119 deletions
Showing only changes of commit 8288620f2e - Show all commits

1
Cargo.lock generated
View file

@ -569,6 +569,7 @@ dependencies = [
"async-trait", "async-trait",
"atproto", "atproto",
"sqlx", "sqlx",
"thiserror 2.0.12",
"tokio", "tokio",
] ]

View file

@ -6,5 +6,6 @@ edition = "2024"
[dependencies] [dependencies]
async-trait.workspace = true async-trait.workspace = true
atproto = { workspace = true, features = ["sqlx-support"] } atproto = { workspace = true, features = ["sqlx-support"] }
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] } sqlx.workspace = true
thiserror.workspace = true
tokio.workspace = true tokio.workspace = true

View file

@ -0,0 +1,45 @@
-- Add migration script here
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE TABLE actor (
did VARCHAR PRIMARY KEY,
handle VARCHAR UNIQUE,
indexed_at VARCHAR NOT NULL
);
CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops);
CREATE TABLE session (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
owner VARCHAR NOT NULL,
content VARCHAR NOT NULL,
contentcid VARCHAR NOT NULL,
label VARCHAR,
-- Participants in participant
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE activity (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
session VARCHAR,
sessioncid VARCHAR,
-- Progress in progress
performed_at VARCHAR,
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE participant (
participantdid VARCHAR NOT NULL,
sessionuri VARCHAR NOT NULL,
role VARCHAR NOT NULL
);

View file

@ -1,99 +0,0 @@
use sqlx::{
query,
Database,
Pool,
Postgres,
pool::PoolOptions,
postgres::{
PgConnectOptions,
PgSslMode,
},
Result,
};
use std::string::ToString;
pub struct Db<Dbimp: Database> {
pool: Pool<Dbimp>
}
#[non_exhaustive]
enum Role {
Owner,
Participant
}
impl ToString for Role {
fn to_string(&self) -> String {
match *self {
Role::Owner => "owner".to_string(),
Role::Participant => "participant".to_string(),
}
}
}
pub struct Session {
sessionuri: Uri,
label: Option<String>,
participants: Vec<Participant>,
}
impl Db<Postgres> {
async fn connect() -> Result<Self> {
let conn = PgConnectOptions::new()
.host("localhost")
.port(5432)
.username("postgres")
.password("062217")
.database("anisky")
.ssl_mode(PgSslMode::Disable);
let pool = match PoolOptions::new().connect_with(conn).await {
Ok(p) => p,
Err(e) => return Err(e),
};
Ok(Db { pool })
}
//
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// let mut transaction = self.pool.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }
}

10
db/src/error.rs Normal file
View file

@ -0,0 +1,10 @@
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Database Implementation Error: {0}")]
Backend(#[from] sqlx::Error),
#[error("AT Protocol Implementation Error: {0}")]
Atproto(#[from] atproto::error::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,15 +0,0 @@
use atproto::{
Did,
Uri,
};
pub struct User {
userdid: Did,
handle: Handle,
}
struct Participant {
participantdid: Did,
role: Role,
}

View file

@ -0,0 +1,2 @@
pub mod types;
pub mod functions;

View file

@ -0,0 +1,116 @@
use atproto::types::{
Datetime,
Uri,
};
use crate::{
Error,
interfaces::direct::types::{
Activity,
Participant,
Session,
User
},
};
use sqlx::{
Transaction,
Postgres,
PgPool,
query,
};
pub async fn ingest_activity(
db: PgPool, activity: Activity
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_activity(&mut transaction, activity).await?;
transaction.commit().await.map_err(Error::Backend)
}
pub async fn ingest_session(
db: PgPool, session: Session
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_session(&mut transaction, session).await?;
transaction.commit().await.map_err(Error::Backend)
}
async fn write_activity(
tr: &mut Transaction<'_, Postgres>, activity: Activity
) -> Result<(), Error> {
let (sessionuri, sessioncid) = match activity.session {
Some(sr) => {
let (session, cid) = sr.extract_content();
let sessionuri = session.uri.to_string();
write_session(tr, session).await?;
(sessionuri, cid.to_string())
}
None => ("".to_string(), "".to_string()),
};
query!(r#"
INSERT INTO
activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
&activity.uri.to_string(),
&activity.cid.to_string(),
sessionuri,
sessioncid,
activity.performed_at.map(|dt| dt.to_string()),
activity.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string(),
).execute(&mut **tr).await?;
// TODO: Handle Progress
Ok(())
}
async fn write_session(
tr: &mut Transaction<'_, Postgres>, session: Session
) -> Result<(), Error> {
query!(r#"
INSERT INTO
session(uri, cid, owner, content, contentcid, label, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
&session.uri.to_string(),
&session.cid.to_string(),
&session.uri.authority_as_did().to_string(),
&session.content.get_content().to_string(),
&session.content.get_cid().to_string(),
session.label,
session.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string()
).execute(&mut **tr).await?;
if let Some(participants) = session.other_participants {
for participant in participants {
write_participant(tr, &participant, &session.uri).await?
}
}
Ok(())
}
async fn write_participant(
tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri
) -> Result<(), Error> {
let (participant_type, user): (String, &User) = match participant {
Participant::Owner(user) => ("Owner".to_string(), user),
Participant::Added(user) => ("Participant".to_string(), user),
};
query!(r#"
INSERT INTO
participant(participantdid, sessionuri, role)
VALUES ($1, $2, $3)
"#,
user.did.to_string(),
sessionuri.to_string(),
participant_type,
).execute(&mut **tr).await?;
Ok(())
}

View file

@ -0,0 +1,58 @@
use atproto::types::{
Cid,
Uri,
Datetime,
Did,
StrongRef,
Handle,
};
use std::fmt::{Display, Formatter, Result as FmtResult};
pub struct Activity {
pub uri: Uri,
pub cid: Cid,
pub session: Option<StrongRef<Session>>,
pub progress: Option<Progress>,
pub performed_at: Option<Datetime>,
pub created_at: Option<Datetime>,
}
pub struct Session {
pub uri: Uri,
pub cid: Cid,
pub content: StrongRef<Content>,
pub label: Option<String>,
pub created_at: Option<Datetime>,
pub other_participants: Option<Vec<Participant>>,
}
pub struct User {
pub did: Did,
pub handle: Option<Handle>,
}
#[non_exhaustive]
pub enum Participant {
Owner(User),
Added(User),
}
#[non_exhaustive]
pub enum Content {
UnknownContent
}
impl Display for Content {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
Content::UnknownContent => "UnknownContentType",
})
}
}
#[non_exhaustive]
pub enum Progress {
UnknownProgress
}

1
db/src/interfaces/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod direct;

View file

@ -1 +1,4 @@
pub struct db; pub mod interfaces;
pub mod error;
pub use crate::error::Error;

6
flake.lock generated
View file

@ -41,11 +41,11 @@
"nixpkgs": "nixpkgs_2" "nixpkgs": "nixpkgs_2"
}, },
"locked": { "locked": {
"lastModified": 1746585402, "lastModified": 1749695868,
"narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=", "narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=",
"owner": "oxalica", "owner": "oxalica",
"repo": "rust-overlay", "repo": "rust-overlay",
"rev": "72dd969389583664f87aa348b3458f2813693617", "rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd",
"type": "github" "type": "github"
}, },
"original": { "original": {

View file

@ -42,6 +42,7 @@
packages = (with pkgs; [ packages = (with pkgs; [
# The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt, # The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt,
# rustdoc, rustfmt, and other tools. # rustdoc, rustfmt, and other tools.
sqlx-cli
rustToolchain rustToolchain
]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]); ]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]);
}; };