Db, first commit of work

This also touches atproto/src/lib.rs which may be a problem for the
rebase~

Adds migrations and all the other stuff.
This commit is contained in:
Julia Lange 2025-06-11 17:29:59 -07:00
parent 3eb6aab10f
commit aadea9757b
Signed by: Julia
SSH key fingerprint: SHA256:5DJcfxa5/fKCYn57dcabJa2vN2e6eT0pBerYi5SUbto
12 changed files with 270 additions and 82 deletions

View file

@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2024"
[dependencies]
thiserror = "2.0.12"
atproto.workspace = true
async-trait.workspace = true
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }
tokio.workspace = true

View file

@ -0,0 +1,45 @@
-- Add migration script here
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE TABLE actor (
did VARCHAR PRIMARY KEY,
handle VARCHAR UNIQUE,
indexed_at VARCHAR NOT NULL
);
CREATE INDEX actor_handle_trgm_idx ON actor USING gist (handle gist_trgm_ops);
CREATE TABLE session (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
owner VARCHAR NOT NULL,
content VARCHAR NOT NULL,
contentcid VARCHAR NOT NULL,
label VARCHAR,
-- Participants in participant
created_at VARCHAR NOT NULL,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE activity (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
session VARCHAR,
sessioncid VARCHAR,
-- Progress in progress
performed_at VARCHAR,
created_at VARCHAR NOT NULL,
indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
);
CREATE TABLE participant (
participantdid VARCHAR NOT NULL,
session VARCHAR NOT NULL,
role VARCHAR NOT NULL
);

View file

@ -1,19 +1,7 @@
use sqlx::{
query,
Database,
Pool,
Postgres,
pool::PoolOptions,
postgres::{
PgConnectOptions,
PgSslMode,
},
Result,
};
use std::string::ToString;
pub struct Db<Dbimp: Database> {
pool: Pool<Dbimp>
pub struct Db<Db: Database> {
pool: Pool<Db>
}
#[non_exhaustive]
@ -39,13 +27,6 @@ pub struct Session {
impl Db<Postgres> {
async fn connect() -> Result<Self> {
let conn = PgConnectOptions::new()
.host("localhost")
.port(5432)
.username("postgres")
.password("062217")
.database("anisky")
.ssl_mode(PgSslMode::Disable);
let pool = match PoolOptions::new().connect_with(conn).await {
Ok(p) => p,
@ -55,45 +36,4 @@ impl Db<Postgres> {
Ok(Db { pool })
}
//
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// let mut transaction = self.pool.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }
}

8
db/src/error.rs Normal file
View file

@ -0,0 +1,8 @@
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Database Implementation Error: {0}")]
Backend(#[from] sqlx::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

View file

@ -1,15 +0,0 @@
use atproto::{
Did,
Uri,
};
pub struct User {
userdid: Did,
handle: Handle,
}
struct Participant {
participantdid: Did,
role: Role,
}

1
db/src/interfaces/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod spoor;

178
db/src/interfaces/spoor.rs Normal file
View file

@ -0,0 +1,178 @@
use crate::Error;
use atproto::{
Cid,
Uri,
Datetime,
Did,
StrongRef,
Handle,
};
use sqlx::{
PgTransaction,
PgPool,
query,
};
pub struct Activity {
pub authority: Uri,
pub cid: Option<Cid>,
pub session: Option<StrongRef<Session>>,
pub progress: Option<Progress>,
pub performed_at: Option<Datetime>,
pub created_at: Option<Datetime>,
}
pub struct Session {
pub uri: Uri,
pub cid: Option<Cid>,
pub content: Option<StrongRef<Content>>,
pub label: Option<String>,
pub created_at: Option<Datetime>,
pub other_participants: Option<Vec<Participant>>,
}
pub struct User {
pub did: Did,
pub handle: Option<Handle>,
}
#[non_exhaustive]
pub enum Participant {
Owner(User),
Added(User),
}
#[non_exhaustive]
pub enum Content {
UnknownContent
}
#[non_exhaustive]
pub enum Progress {
UnknownProgress
}
pub async fn ingest_session(
db: PgPool, session: Session
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_session(&mut transaction, session).await?;
transaction.commit().await.map_err(Error::Backend)
}
async fn write_session(
tr: &mut PgTransaction<'_>, session: Session
) -> Result<(), Error> {
let (contenturi, contentcid): (Option<Content>, String) =
match session.content {
Some(sr) => (Some(sr.content), sr.cid.to_string()),
None => (None, "".to_string()),
};
query!(r#"
INSERT INTO sessions(
sessionuri, sessioncid, label, created_at, contenturi, contentcid
) VALUES ($1, $2, $3, $4, $5, $6)
"#,
session.uri,
session.cid.map_or("", |cid| cid.to_string()),
session.label.unwrap_or(""),
session.created_at.map_or("", |dt| (*dt.as_str()).to_string()),
contenturi,
contentcid,
).execute(&mut tr).await?;
session.other_participants.and_then(|participants| {
for participant in participants {
write_participant(tr, participant, session.uri).await?
}
}).unwrap_or(Ok(()))
}
async fn write_participant(
tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri
) -> Result<(), Error> {
let (participantType, user): (String, User) = match participant {
Participant::Owner(user) => ("Owner".to_string(), user),
Participant::Added(user) => ("Participant".to_string(), user),
};
query!(r#"
INSERT INTO participants(
sessionuri, userdid, role
) VALUES ($1, $2, $3)
"#, sessionuri.to_string(), user.did.to_string, participantType
).execute(&mut tr).await?;
Ok(())
}
// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> {
// todo!();
// }
//
// let mut transaction = db.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> {
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }

View file

@ -1 +1,7 @@
pub struct db;
pub mod interfaces;
pub mod error;
pub use crate::error::Error;
// pub struct db;