Db, first commit of work

This also touches atproto/src/lib.rs which may be a problem for the
rebase~

Adds migrations and all the other stuff.

Db, Commit two, everything is looking pretty

Db, add atproto error support

Db, rename spoor to direct, implement sessions

Db, add activity ingestor, fix type name

activity has its type name Authority instead of Uri for some reason? So
that is no longer an issue.

EVERYTHING IS STILL UNTESTED BABY~
This commit is contained in:
Julia Lange 2025-06-11 17:29:59 -07:00
parent c1b5b774d5
commit 7274be1d91
Signed by: Julia
SSH key fingerprint: SHA256:5DJcfxa5/fKCYn57dcabJa2vN2e6eT0pBerYi5SUbto
13 changed files with 248 additions and 81 deletions

1
Cargo.lock generated
View file

@ -569,6 +569,7 @@ dependencies = [
"async-trait",
"atproto",
"sqlx",
"thiserror 2.0.12",
"tokio",
]

View file

@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2024"
[dependencies]
thiserror = "2.0.12"
atproto.workspace = true
async-trait.workspace = true
atproto = { workspace = true, features = ["sqlx-support"] }
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }

View file

@ -0,0 +1,45 @@
-- Add migration script here
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE TABLE actor (
did VARCHAR PRIMARY KEY,
handle VARCHAR UNIQUE,
indexed_at VARCHAR NOT NULL
);
CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops);
CREATE TABLE session (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
owner VARCHAR NOT NULL,
content VARCHAR NOT NULL,
contentcid VARCHAR NOT NULL,
label VARCHAR,
-- Participants in participant
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE activity (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
session VARCHAR,
sessioncid VARCHAR,
-- Progress in progress
performed_at VARCHAR,
created_at VARCHAR,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE participant (
participantdid VARCHAR NOT NULL,
sessionuri VARCHAR NOT NULL,
role VARCHAR NOT NULL
);

View file

@ -1,19 +1,7 @@
use sqlx::{
query,
Database,
Pool,
Postgres,
pool::PoolOptions,
postgres::{
PgConnectOptions,
PgSslMode,
},
Result,
};
use std::string::ToString;
pub struct Db<Dbimp: Database> {
pool: Pool<Dbimp>
pub struct Db<Db: Database> {
pool: Pool<Db>
}
#[non_exhaustive]
@ -39,13 +27,6 @@ pub struct Session {
impl Db<Postgres> {
async fn connect() -> Result<Self> {
let conn = PgConnectOptions::new()
.host("localhost")
.port(5432)
.username("postgres")
.password("062217")
.database("anisky")
.ssl_mode(PgSslMode::Disable);
let pool = match PoolOptions::new().connect_with(conn).await {
Ok(p) => p,
@ -55,45 +36,4 @@ impl Db<Postgres> {
Ok(Db { pool })
}
//
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// let mut transaction = self.pool.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }
}

10
db/src/error.rs Normal file
View file

@ -0,0 +1,10 @@
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Database Implementation Error: {0}")]
Backend(#[from] sqlx::Error),
#[error("AT Protocol Implementation Error: {0}")]
Atproto(#[from] atproto::error::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,15 +0,0 @@
use atproto::{
Did,
Uri,
};
pub struct User {
userdid: Did,
handle: Handle,
}
struct Participant {
participantdid: Did,
role: Role,
}

View file

@ -0,0 +1,2 @@
pub mod types;
pub mod functions;

View file

@ -0,0 +1,116 @@
use atproto::types::{
Datetime,
Uri,
};
use crate::{
Error,
interfaces::direct::types::{
Activity,
Participant,
Session,
User
},
};
use sqlx::{
Transaction,
Postgres,
PgPool,
query,
};
pub async fn ingest_activity(
db: PgPool, activity: Activity
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_activity(&mut transaction, activity).await?;
transaction.commit().await.map_err(Error::Backend)
}
pub async fn ingest_session(
db: PgPool, session: Session
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_session(&mut transaction, session).await?;
transaction.commit().await.map_err(Error::Backend)
}
async fn write_activity(
tr: &mut Transaction<'_, Postgres>, activity: Activity
) -> Result<(), Error> {
let (sessionuri, sessioncid) = match activity.session {
Some(sr) => {
let (session, cid) = sr.extract_content();
let sessionuri = session.uri.to_string();
write_session(tr, session).await?;
(sessionuri, cid.to_string())
}
None => ("".to_string(), "".to_string()),
};
query!(r#"
INSERT INTO
activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
&activity.uri.to_string(),
&activity.cid.to_string(),
sessionuri,
sessioncid,
activity.performed_at.map(|dt| dt.to_string()),
activity.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string(),
).execute(&mut **tr).await?;
// TODO: Handle Progress
Ok(())
}
async fn write_session(
tr: &mut Transaction<'_, Postgres>, session: Session
) -> Result<(), Error> {
query!(r#"
INSERT INTO
session(uri, cid, owner, content, contentcid, label, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
&session.uri.to_string(),
&session.cid.to_string(),
&session.uri.authority_as_did().to_string(),
&session.content.get_content().to_string(),
&session.content.get_cid().to_string(),
session.label,
session.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string()
).execute(&mut **tr).await?;
if let Some(participants) = session.other_participants {
for participant in participants {
write_participant(tr, &participant, &session.uri).await?
}
}
Ok(())
}
async fn write_participant(
tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri
) -> Result<(), Error> {
let (participant_type, user): (String, &User) = match participant {
Participant::Owner(user) => ("Owner".to_string(), user),
Participant::Added(user) => ("Participant".to_string(), user),
};
query!(r#"
INSERT INTO
participant(participantdid, sessionuri, role)
VALUES ($1, $2, $3)
"#,
user.did.to_string(),
sessionuri.to_string(),
participant_type,
).execute(&mut **tr).await?;
Ok(())
}

View file

@ -0,0 +1,58 @@
use atproto::types::{
Cid,
Uri,
Datetime,
Did,
StrongRef,
Handle,
};
use std::fmt::{Display, Formatter, Result as FmtResult};
pub struct Activity {
pub uri: Uri,
pub cid: Cid,
pub session: Option<StrongRef<Session>>,
pub progress: Option<Progress>,
pub performed_at: Option<Datetime>,
pub created_at: Option<Datetime>,
}
pub struct Session {
pub uri: Uri,
pub cid: Cid,
pub content: StrongRef<Content>,
pub label: Option<String>,
pub created_at: Option<Datetime>,
pub other_participants: Option<Vec<Participant>>,
}
pub struct User {
pub did: Did,
pub handle: Option<Handle>,
}
#[non_exhaustive]
pub enum Participant {
Owner(User),
Added(User),
}
#[non_exhaustive]
pub enum Content {
UnknownContent
}
impl Display for Content {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
Content::UnknownContent => "UnknownContentType",
})
}
}
#[non_exhaustive]
pub enum Progress {
UnknownProgress
}

1
db/src/interfaces/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod direct;

View file

@ -1 +1,7 @@
pub struct db;
pub mod interfaces;
pub mod error;
pub use crate::error::Error;
// pub struct db;

6
flake.lock generated
View file

@ -41,11 +41,11 @@
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1746585402,
"narHash": "sha256-Pf+ufu6bYNA1+KQKHnGMNEfTwpD9ZIcAeLoE2yPWIP0=",
"lastModified": 1749695868,
"narHash": "sha256-debjTLOyqqsYOUuUGQsAHskFXH5+Kx2t3dOo/FCoNRA=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "72dd969389583664f87aa348b3458f2813693617",
"rev": "55f914d5228b5c8120e9e0f9698ed5b7214d09cd",
"type": "github"
},
"original": {

View file

@ -42,6 +42,7 @@
packages = (with pkgs; [
# The package provided by our custom overlay. Includes cargo, Clippy, cargo-fmt,
# rustdoc, rustfmt, and other tools.
sqlx-cli
rustToolchain
]) ++ pkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs; [ libiconv ]);
};