Compare commits

...

5 commits

Author SHA1 Message Date
2dfbc314b2
Db, add activity ingestor, fix type name
activity has its type name Authority instead of Uri for some reason? So
that is no longer an issue.

EVERYTHING IS STILL UNTESTED BABY~
2025-06-17 14:39:35 -07:00
2fa7590271
Atproto, add extraction of values from StrongRef 2025-06-17 14:38:23 -07:00
8160b161f5
Db, rename spoor to direct, implement sessions 2025-06-17 14:06:21 -07:00
df1da0905d
Db, add atproto error support 2025-06-17 13:55:30 -07:00
1d1f8ae60f
Atproto, add getters to StrongRef 2025-06-17 13:54:55 -07:00
8 changed files with 196 additions and 182 deletions

View file

@ -68,3 +68,17 @@ pub struct StrongRef<T> {
content: T, content: T,
cid: Cid, cid: Cid,
} }
impl<T> StrongRef<T> {
pub fn get_content(&self) -> &T {
&self.content
}
pub fn extract_content(self) -> (T, Cid) {
(self.content, self.cid)
}
pub fn get_cid(&self) -> &Cid {
&self.cid
}
}

View file

@ -19,7 +19,7 @@ CREATE TABLE session (
label VARCHAR, label VARCHAR,
-- Participants in participant -- Participants in participant
created_at VARCHAR NOT NULL, created_at VARCHAR,
indexed_at VARCHAR NOT NULL, indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
); );
@ -33,13 +33,13 @@ CREATE TABLE activity (
-- Progress in progress -- Progress in progress
performed_at VARCHAR, performed_at VARCHAR,
created_at VARCHAR NOT NULL, created_at VARCHAR,
indexed_at VARCHAR NOT NULL, indexed_at VARCHAR NOT NULL,
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
); );
CREATE TABLE participant ( CREATE TABLE participant (
participantdid VARCHAR NOT NULL, participantdid VARCHAR NOT NULL,
session VARCHAR NOT NULL, sessionuri VARCHAR NOT NULL,
role VARCHAR NOT NULL role VARCHAR NOT NULL
); );

View file

@ -3,6 +3,8 @@
pub enum Error { pub enum Error {
#[error("Database Implementation Error: {0}")] #[error("Database Implementation Error: {0}")]
Backend(#[from] sqlx::Error), Backend(#[from] sqlx::Error),
#[error("AT Protocol Implementation Error: {0}")]
Atproto(#[from] atproto::error::Error),
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View file

@ -0,0 +1,2 @@
pub mod types;
pub mod functions;

View file

@ -0,0 +1,116 @@
use atproto::types::{
Datetime,
Uri,
};
use crate::{
Error,
interfaces::direct::types::{
Activity,
Participant,
Session,
User
},
};
use sqlx::{
Transaction,
Postgres,
PgPool,
query,
};
pub async fn ingest_activity(
db: PgPool, activity: Activity
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_activity(&mut transaction, activity).await?;
transaction.commit().await.map_err(Error::Backend)
}
pub async fn ingest_session(
db: PgPool, session: Session
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_session(&mut transaction, session).await?;
transaction.commit().await.map_err(Error::Backend)
}
async fn write_activity(
tr: &mut Transaction<'_, Postgres>, activity: Activity
) -> Result<(), Error> {
let (sessionuri, sessioncid) = match activity.session {
Some(sr) => {
let (session, cid) = sr.extract_content();
let sessionuri = session.uri.to_string();
write_session(tr, session).await?;
(sessionuri, cid.to_string())
}
None => ("".to_string(), "".to_string()),
};
query!(r#"
INSERT INTO
activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
&activity.uri.to_string(),
&activity.cid.to_string(),
sessionuri,
sessioncid,
activity.performed_at.map(|dt| dt.to_string()),
activity.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string(),
).execute(&mut **tr).await?;
// TODO: Handle Progress
Ok(())
}
async fn write_session(
tr: &mut Transaction<'_, Postgres>, session: Session
) -> Result<(), Error> {
query!(r#"
INSERT INTO
session(uri, cid, owner, content, contentcid, label, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
&session.uri.to_string(),
&session.cid.to_string(),
&session.uri.authority_as_did().to_string(),
&session.content.get_content().to_string(),
&session.content.get_cid().to_string(),
session.label,
session.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string()
).execute(&mut **tr).await?;
if let Some(participants) = session.other_participants {
for participant in participants {
write_participant(tr, &participant, &session.uri).await?
}
}
Ok(())
}
async fn write_participant(
tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri
) -> Result<(), Error> {
let (participant_type, user): (String, &User) = match participant {
Participant::Owner(user) => ("Owner".to_string(), user),
Participant::Added(user) => ("Participant".to_string(), user),
};
query!(r#"
INSERT INTO
participant(participantdid, sessionuri, role)
VALUES ($1, $2, $3)
"#,
user.did.to_string(),
sessionuri.to_string(),
participant_type,
).execute(&mut **tr).await?;
Ok(())
}

View file

@ -0,0 +1,58 @@
use atproto::types::{
Cid,
Uri,
Datetime,
Did,
StrongRef,
Handle,
};
use std::fmt::{Display, Formatter, Result as FmtResult};
pub struct Activity {
pub uri: Uri,
pub cid: Cid,
pub session: Option<StrongRef<Session>>,
pub progress: Option<Progress>,
pub performed_at: Option<Datetime>,
pub created_at: Option<Datetime>,
}
pub struct Session {
pub uri: Uri,
pub cid: Cid,
pub content: StrongRef<Content>,
pub label: Option<String>,
pub created_at: Option<Datetime>,
pub other_participants: Option<Vec<Participant>>,
}
pub struct User {
pub did: Did,
pub handle: Option<Handle>,
}
#[non_exhaustive]
pub enum Participant {
Owner(User),
Added(User),
}
#[non_exhaustive]
pub enum Content {
UnknownContent
}
impl Display for Content {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "{}", match self {
Content::UnknownContent => "UnknownContentType",
})
}
}
#[non_exhaustive]
pub enum Progress {
UnknownProgress
}

View file

@ -1 +1 @@
pub mod spoor; pub mod direct;

View file

@ -1,178 +0,0 @@
use crate::Error;
use atproto::{
Cid,
Uri,
Datetime,
Did,
StrongRef,
Handle,
};
use sqlx::{
PgTransaction,
PgPool,
query,
};
pub struct Activity {
pub authority: Uri,
pub cid: Option<Cid>,
pub session: Option<StrongRef<Session>>,
pub progress: Option<Progress>,
pub performed_at: Option<Datetime>,
pub created_at: Option<Datetime>,
}
pub struct Session {
pub uri: Uri,
pub cid: Option<Cid>,
pub content: Option<StrongRef<Content>>,
pub label: Option<String>,
pub created_at: Option<Datetime>,
pub other_participants: Option<Vec<Participant>>,
}
pub struct User {
pub did: Did,
pub handle: Option<Handle>,
}
#[non_exhaustive]
pub enum Participant {
Owner(User),
Added(User),
}
#[non_exhaustive]
pub enum Content {
UnknownContent
}
#[non_exhaustive]
pub enum Progress {
UnknownProgress
}
pub async fn ingest_session(
db: PgPool, session: Session
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_session(&mut transaction, session).await?;
transaction.commit().await.map_err(Error::Backend)
}
async fn write_session(
tr: &mut PgTransaction<'_>, session: Session
) -> Result<(), Error> {
let (contenturi, contentcid): (Option<Content>, String) =
match session.content {
Some(sr) => (Some(sr.content), sr.cid.to_string()),
None => (None, "".to_string()),
};
query!(r#"
INSERT INTO sessions(
sessionuri, sessioncid, label, created_at, contenturi, contentcid
) VALUES ($1, $2, $3, $4, $5, $6)
"#,
session.uri,
session.cid.map_or("", |cid| cid.to_string()),
session.label.unwrap_or(""),
session.created_at.map_or("", |dt| (*dt.as_str()).to_string()),
contenturi,
contentcid,
).execute(&mut tr).await?;
session.other_participants.and_then(|participants| {
for participant in participants {
write_participant(tr, participant, session.uri).await?
}
}).unwrap_or(Ok(()))
}
async fn write_participant(
tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri
) -> Result<(), Error> {
let (participantType, user): (String, User) = match participant {
Participant::Owner(user) => ("Owner".to_string(), user),
Participant::Added(user) => ("Participant".to_string(), user),
};
query!(r#"
INSERT INTO participants(
sessionuri, userdid, role
) VALUES ($1, $2, $3)
"#, sessionuri.to_string(), user.did.to_string, participantType
).execute(&mut tr).await?;
Ok(())
}
// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> {
// todo!();
// }
//
// let mut transaction = db.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> {
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }