use crate::Error; use atproto::{ Cid, Uri, Datetime, Did, StrongRef, Handle, }; use sqlx::{ PgTransaction, PgPool, query, }; pub struct Activity { pub authority: Uri, pub cid: Option, pub session: Option>, pub progress: Option, pub performed_at: Option, pub created_at: Option, } pub struct Session { pub uri: Uri, pub cid: Option, pub content: Option>, pub label: Option, pub created_at: Option, pub other_participants: Option>, } pub struct User { pub did: Did, pub handle: Option, } #[non_exhaustive] pub enum Participant { Owner(User), Added(User), } #[non_exhaustive] pub enum Content { UnknownContent } #[non_exhaustive] pub enum Progress { UnknownProgress } pub async fn ingest_session( db: PgPool, session: Session ) -> Result<(), Error> { let mut transaction = db.begin().await?; write_session(&mut transaction, session).await?; transaction.commit().await.map_err(Error::Backend) } async fn write_session( tr: &mut PgTransaction<'_>, session: Session ) -> Result<(), Error> { let (contenturi, contentcid): (Option, String) = match session.content { Some(sr) => (Some(sr.content), sr.cid.to_string()), None => (None, "".to_string()), }; query!(r#" INSERT INTO sessions( sessionuri, sessioncid, label, created_at, contenturi, contentcid ) VALUES ($1, $2, $3, $4, $5, $6) "#, session.uri, session.cid.map_or("", |cid| cid.to_string()), session.label.unwrap_or(""), session.created_at.map_or("", |dt| (*dt.as_str()).to_string()), contenturi, contentcid, ).execute(&mut tr).await?; session.other_participants.and_then(|participants| { for participant in participants { write_participant(tr, participant, session.uri).await? } }).unwrap_or(Ok(())) } async fn write_participant( tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri ) -> Result<(), Error> { let (participantType, user): (String, User) = match participant { Participant::Owner(user) => ("Owner".to_string(), user), Participant::Added(user) => ("Participant".to_string(), user), }; query!(r#" INSERT INTO participants( sessionuri, userdid, role ) VALUES ($1, $2, $3) "#, sessionuri.to_string(), user.did.to_string, participantType ).execute(&mut tr).await?; Ok(()) } // pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> { // todo!(); // } // // let mut transaction = db.begin().await?; // // query!(r#" // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) // "#, // session.sessionuri, session.label // ).execute(&mut *transaction).await?; // // for participant in session.participants { // query!(r#" // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) // "#, // session.sessionuri, participant.userdid, participant.role.to_string() // ).execute(&mut *transaction).await?; // } // // transaction.commit().await // } // // pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> { // query!(r#" // INSERT INTO sessions(sessionuri, label) VALUES ($1, $2) // "#, // session.sessionuri, session.label // ).execute(&mut *transaction).await?; // // for participant in session.participants { // query!(r#" // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) // "#, // session.sessionuri, participant.userdid, participant.role.to_string() // ).execute(&mut *transaction).await?; // } // // transaction.commit().await // } // // pub async fn add_user(&self, user: &User) -> Result<()> { // query!(r#" // INSERT INTO users(userdid, handle) VALUES ($1, $2) // "#, // user.userdid, user.handle // ).execute(self.pool).await?; // Ok(()) // } // // pub async fn add_session(&self, session: &Session) -> Result<()> { // } // // pub async fn add_participant(&self, session: Session, // participant: Participant) -> Result { // query!(r#" // INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3) // "#, // session.sessionuri, participant.userdid, participant.role.to_string() // ).execute(self.pool).await?; // // session.participants.push(participant); // // Ok(session) // }