Db, add activity ingestor, fix type name

activity has its type name Authority instead of Uri for some reason? So
that is no longer an issue.

EVERYTHING IS STILL UNTESTED BABY~
This commit is contained in:
Julia Lange 2025-06-17 14:39:35 -07:00
parent 2fa7590271
commit 2dfbc314b2
Signed by: Julia
SSH key fingerprint: SHA256:5DJcfxa5/fKCYn57dcabJa2vN2e6eT0pBerYi5SUbto
2 changed files with 43 additions and 69 deletions

View file

@ -5,6 +5,7 @@ use atproto::types::{
use crate::{ use crate::{
Error, Error,
interfaces::direct::types::{ interfaces::direct::types::{
Activity,
Participant, Participant,
Session, Session,
User User
@ -17,6 +18,14 @@ use sqlx::{
query, query,
}; };
pub async fn ingest_activity(
db: PgPool, activity: Activity
) -> Result<(), Error> {
let mut transaction = db.begin().await?;
write_activity(&mut transaction, activity).await?;
transaction.commit().await.map_err(Error::Backend)
}
pub async fn ingest_session( pub async fn ingest_session(
db: PgPool, session: Session db: PgPool, session: Session
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -25,6 +34,39 @@ pub async fn ingest_session(
transaction.commit().await.map_err(Error::Backend) transaction.commit().await.map_err(Error::Backend)
} }
async fn write_activity(
tr: &mut Transaction<'_, Postgres>, activity: Activity
) -> Result<(), Error> {
let (sessionuri, sessioncid) = match activity.session {
Some(sr) => {
let (session, cid) = sr.extract_content();
let sessionuri = session.uri.to_string();
write_session(tr, session).await?;
(sessionuri, cid.to_string())
}
None => ("".to_string(), "".to_string()),
};
query!(r#"
INSERT INTO
activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"#,
&activity.uri.to_string(),
&activity.cid.to_string(),
sessionuri,
sessioncid,
activity.performed_at.map(|dt| dt.to_string()),
activity.created_at.map(|dt| dt.to_string()),
&Datetime::now()?.to_string(),
).execute(&mut **tr).await?;
// TODO: Handle Progress
Ok(())
}
async fn write_session( async fn write_session(
tr: &mut Transaction<'_, Postgres>, session: Session tr: &mut Transaction<'_, Postgres>, session: Session
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -72,70 +114,3 @@ async fn write_participant(
Ok(()) Ok(())
} }
// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> {
// todo!();
// }
//
// let mut transaction = db.begin().await?;
//
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> {
// query!(r#"
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
// "#,
// session.sessionuri, session.label
// ).execute(&mut *transaction).await?;
//
// for participant in session.participants {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(&mut *transaction).await?;
// }
//
// transaction.commit().await
// }
//
// pub async fn add_user(&self, user: &User) -> Result<()> {
// query!(r#"
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
// "#,
// user.userdid, user.handle
// ).execute(self.pool).await?;
// Ok(())
// }
//
// pub async fn add_session(&self, session: &Session) -> Result<()> {
// }
//
// pub async fn add_participant(&self, session: Session,
// participant: Participant) -> Result<Session> {
// query!(r#"
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
// "#,
// session.sessionuri, participant.userdid, participant.role.to_string()
// ).execute(self.pool).await?;
//
// session.participants.push(participant);
//
// Ok(session)
// }

View file

@ -1,4 +1,3 @@
use crate::Error;
use atproto::types::{ use atproto::types::{
Cid, Cid,
Uri, Uri,
@ -10,7 +9,7 @@ use atproto::types::{
use std::fmt::{Display, Formatter, Result as FmtResult}; use std::fmt::{Display, Formatter, Result as FmtResult};
pub struct Activity { pub struct Activity {
pub authority: Uri, pub uri: Uri,
pub cid: Cid, pub cid: Cid,
pub session: Option<StrongRef<Session>>, pub session: Option<StrongRef<Session>>,