Compare commits
5 commits
0573e1aa43
...
2dfbc314b2
| Author | SHA1 | Date | |
|---|---|---|---|
| 2dfbc314b2 | |||
| 2fa7590271 | |||
| 8160b161f5 | |||
| df1da0905d | |||
| 1d1f8ae60f |
8 changed files with 196 additions and 182 deletions
|
|
@ -68,3 +68,17 @@ pub struct StrongRef<T> {
|
||||||
content: T,
|
content: T,
|
||||||
cid: Cid,
|
cid: Cid,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> StrongRef<T> {
|
||||||
|
pub fn get_content(&self) -> &T {
|
||||||
|
&self.content
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn extract_content(self) -> (T, Cid) {
|
||||||
|
(self.content, self.cid)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_cid(&self) -> &Cid {
|
||||||
|
&self.cid
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ CREATE TABLE session (
|
||||||
label VARCHAR,
|
label VARCHAR,
|
||||||
-- Participants in participant
|
-- Participants in participant
|
||||||
|
|
||||||
created_at VARCHAR NOT NULL,
|
created_at VARCHAR,
|
||||||
indexed_at VARCHAR NOT NULL,
|
indexed_at VARCHAR NOT NULL,
|
||||||
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
|
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
|
||||||
);
|
);
|
||||||
|
|
@ -33,13 +33,13 @@ CREATE TABLE activity (
|
||||||
-- Progress in progress
|
-- Progress in progress
|
||||||
|
|
||||||
performed_at VARCHAR,
|
performed_at VARCHAR,
|
||||||
created_at VARCHAR NOT NULL,
|
created_at VARCHAR,
|
||||||
indexed_at VARCHAR NOT NULL,
|
indexed_at VARCHAR NOT NULL,
|
||||||
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
|
sort_at VARCHAR GENERATED ALWAYS AS (LEAST(created_at,indexed_at)) STORED NOT NULL
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE participant (
|
CREATE TABLE participant (
|
||||||
participantdid VARCHAR NOT NULL,
|
participantdid VARCHAR NOT NULL,
|
||||||
session VARCHAR NOT NULL,
|
sessionuri VARCHAR NOT NULL,
|
||||||
role VARCHAR NOT NULL
|
role VARCHAR NOT NULL
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,8 @@
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error("Database Implementation Error: {0}")]
|
#[error("Database Implementation Error: {0}")]
|
||||||
Backend(#[from] sqlx::Error),
|
Backend(#[from] sqlx::Error),
|
||||||
|
#[error("AT Protocol Implementation Error: {0}")]
|
||||||
|
Atproto(#[from] atproto::error::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
|
||||||
2
db/src/interfaces/direct.rs
Normal file
2
db/src/interfaces/direct.rs
Normal file
|
|
@ -0,0 +1,2 @@
|
||||||
|
pub mod types;
|
||||||
|
pub mod functions;
|
||||||
116
db/src/interfaces/direct/functions.rs
Normal file
116
db/src/interfaces/direct/functions.rs
Normal file
|
|
@ -0,0 +1,116 @@
|
||||||
|
use atproto::types::{
|
||||||
|
Datetime,
|
||||||
|
Uri,
|
||||||
|
};
|
||||||
|
use crate::{
|
||||||
|
Error,
|
||||||
|
interfaces::direct::types::{
|
||||||
|
Activity,
|
||||||
|
Participant,
|
||||||
|
Session,
|
||||||
|
User
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use sqlx::{
|
||||||
|
Transaction,
|
||||||
|
Postgres,
|
||||||
|
PgPool,
|
||||||
|
query,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn ingest_activity(
|
||||||
|
db: PgPool, activity: Activity
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut transaction = db.begin().await?;
|
||||||
|
write_activity(&mut transaction, activity).await?;
|
||||||
|
transaction.commit().await.map_err(Error::Backend)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn ingest_session(
|
||||||
|
db: PgPool, session: Session
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut transaction = db.begin().await?;
|
||||||
|
write_session(&mut transaction, session).await?;
|
||||||
|
transaction.commit().await.map_err(Error::Backend)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_activity(
|
||||||
|
tr: &mut Transaction<'_, Postgres>, activity: Activity
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let (sessionuri, sessioncid) = match activity.session {
|
||||||
|
Some(sr) => {
|
||||||
|
let (session, cid) = sr.extract_content();
|
||||||
|
let sessionuri = session.uri.to_string();
|
||||||
|
write_session(tr, session).await?;
|
||||||
|
(sessionuri, cid.to_string())
|
||||||
|
}
|
||||||
|
None => ("".to_string(), "".to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
query!(r#"
|
||||||
|
INSERT INTO
|
||||||
|
activity(uri, cid, session, sessioncid, performed_at, created_at, indexed_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
|
"#,
|
||||||
|
&activity.uri.to_string(),
|
||||||
|
&activity.cid.to_string(),
|
||||||
|
sessionuri,
|
||||||
|
sessioncid,
|
||||||
|
activity.performed_at.map(|dt| dt.to_string()),
|
||||||
|
activity.created_at.map(|dt| dt.to_string()),
|
||||||
|
&Datetime::now()?.to_string(),
|
||||||
|
).execute(&mut **tr).await?;
|
||||||
|
|
||||||
|
// TODO: Handle Progress
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async fn write_session(
|
||||||
|
tr: &mut Transaction<'_, Postgres>, session: Session
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
query!(r#"
|
||||||
|
INSERT INTO
|
||||||
|
session(uri, cid, owner, content, contentcid, label, created_at, indexed_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
|
"#,
|
||||||
|
&session.uri.to_string(),
|
||||||
|
&session.cid.to_string(),
|
||||||
|
&session.uri.authority_as_did().to_string(),
|
||||||
|
&session.content.get_content().to_string(),
|
||||||
|
&session.content.get_cid().to_string(),
|
||||||
|
session.label,
|
||||||
|
session.created_at.map(|dt| dt.to_string()),
|
||||||
|
&Datetime::now()?.to_string()
|
||||||
|
).execute(&mut **tr).await?;
|
||||||
|
|
||||||
|
if let Some(participants) = session.other_participants {
|
||||||
|
for participant in participants {
|
||||||
|
write_participant(tr, &participant, &session.uri).await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_participant(
|
||||||
|
tr: &mut Transaction<'_, Postgres>, participant: &Participant, sessionuri: &Uri
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let (participant_type, user): (String, &User) = match participant {
|
||||||
|
Participant::Owner(user) => ("Owner".to_string(), user),
|
||||||
|
Participant::Added(user) => ("Participant".to_string(), user),
|
||||||
|
};
|
||||||
|
|
||||||
|
query!(r#"
|
||||||
|
INSERT INTO
|
||||||
|
participant(participantdid, sessionuri, role)
|
||||||
|
VALUES ($1, $2, $3)
|
||||||
|
"#,
|
||||||
|
user.did.to_string(),
|
||||||
|
sessionuri.to_string(),
|
||||||
|
participant_type,
|
||||||
|
).execute(&mut **tr).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
58
db/src/interfaces/direct/types.rs
Normal file
58
db/src/interfaces/direct/types.rs
Normal file
|
|
@ -0,0 +1,58 @@
|
||||||
|
use atproto::types::{
|
||||||
|
Cid,
|
||||||
|
Uri,
|
||||||
|
Datetime,
|
||||||
|
Did,
|
||||||
|
StrongRef,
|
||||||
|
Handle,
|
||||||
|
};
|
||||||
|
use std::fmt::{Display, Formatter, Result as FmtResult};
|
||||||
|
|
||||||
|
pub struct Activity {
|
||||||
|
pub uri: Uri,
|
||||||
|
pub cid: Cid,
|
||||||
|
|
||||||
|
pub session: Option<StrongRef<Session>>,
|
||||||
|
pub progress: Option<Progress>,
|
||||||
|
pub performed_at: Option<Datetime>,
|
||||||
|
pub created_at: Option<Datetime>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Session {
|
||||||
|
pub uri: Uri,
|
||||||
|
pub cid: Cid,
|
||||||
|
|
||||||
|
pub content: StrongRef<Content>,
|
||||||
|
pub label: Option<String>,
|
||||||
|
pub created_at: Option<Datetime>,
|
||||||
|
pub other_participants: Option<Vec<Participant>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct User {
|
||||||
|
pub did: Did,
|
||||||
|
pub handle: Option<Handle>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum Participant {
|
||||||
|
Owner(User),
|
||||||
|
Added(User),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum Content {
|
||||||
|
UnknownContent
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for Content {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
|
||||||
|
write!(f, "{}", match self {
|
||||||
|
Content::UnknownContent => "UnknownContentType",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum Progress {
|
||||||
|
UnknownProgress
|
||||||
|
}
|
||||||
|
|
@ -1 +1 @@
|
||||||
pub mod spoor;
|
pub mod direct;
|
||||||
|
|
|
||||||
|
|
@ -1,178 +0,0 @@
|
||||||
use crate::Error;
|
|
||||||
use atproto::{
|
|
||||||
Cid,
|
|
||||||
Uri,
|
|
||||||
Datetime,
|
|
||||||
Did,
|
|
||||||
StrongRef,
|
|
||||||
Handle,
|
|
||||||
};
|
|
||||||
use sqlx::{
|
|
||||||
PgTransaction,
|
|
||||||
PgPool,
|
|
||||||
query,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct Activity {
|
|
||||||
pub authority: Uri,
|
|
||||||
|
|
||||||
pub cid: Option<Cid>,
|
|
||||||
pub session: Option<StrongRef<Session>>,
|
|
||||||
pub progress: Option<Progress>,
|
|
||||||
pub performed_at: Option<Datetime>,
|
|
||||||
pub created_at: Option<Datetime>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Session {
|
|
||||||
pub uri: Uri,
|
|
||||||
|
|
||||||
pub cid: Option<Cid>,
|
|
||||||
pub content: Option<StrongRef<Content>>,
|
|
||||||
pub label: Option<String>,
|
|
||||||
pub created_at: Option<Datetime>,
|
|
||||||
pub other_participants: Option<Vec<Participant>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct User {
|
|
||||||
pub did: Did,
|
|
||||||
pub handle: Option<Handle>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum Participant {
|
|
||||||
Owner(User),
|
|
||||||
Added(User),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum Content {
|
|
||||||
UnknownContent
|
|
||||||
}
|
|
||||||
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum Progress {
|
|
||||||
UnknownProgress
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn ingest_session(
|
|
||||||
db: PgPool, session: Session
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let mut transaction = db.begin().await?;
|
|
||||||
write_session(&mut transaction, session).await?;
|
|
||||||
transaction.commit().await.map_err(Error::Backend)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_session(
|
|
||||||
tr: &mut PgTransaction<'_>, session: Session
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
|
|
||||||
let (contenturi, contentcid): (Option<Content>, String) =
|
|
||||||
match session.content {
|
|
||||||
Some(sr) => (Some(sr.content), sr.cid.to_string()),
|
|
||||||
None => (None, "".to_string()),
|
|
||||||
};
|
|
||||||
|
|
||||||
query!(r#"
|
|
||||||
INSERT INTO sessions(
|
|
||||||
sessionuri, sessioncid, label, created_at, contenturi, contentcid
|
|
||||||
) VALUES ($1, $2, $3, $4, $5, $6)
|
|
||||||
"#,
|
|
||||||
session.uri,
|
|
||||||
session.cid.map_or("", |cid| cid.to_string()),
|
|
||||||
session.label.unwrap_or(""),
|
|
||||||
session.created_at.map_or("", |dt| (*dt.as_str()).to_string()),
|
|
||||||
contenturi,
|
|
||||||
contentcid,
|
|
||||||
).execute(&mut tr).await?;
|
|
||||||
|
|
||||||
session.other_participants.and_then(|participants| {
|
|
||||||
for participant in participants {
|
|
||||||
write_participant(tr, participant, session.uri).await?
|
|
||||||
}
|
|
||||||
}).unwrap_or(Ok(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_participant(
|
|
||||||
tr: &mut PgTransaction<'_>, participant: Participant, sessionuri: Uri
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let (participantType, user): (String, User) = match participant {
|
|
||||||
Participant::Owner(user) => ("Owner".to_string(), user),
|
|
||||||
Participant::Added(user) => ("Participant".to_string(), user),
|
|
||||||
};
|
|
||||||
|
|
||||||
query!(r#"
|
|
||||||
INSERT INTO participants(
|
|
||||||
sessionuri, userdid, role
|
|
||||||
) VALUES ($1, $2, $3)
|
|
||||||
"#, sessionuri.to_string(), user.did.to_string, participantType
|
|
||||||
).execute(&mut tr).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// pub fn injest_activity(db: PgPool, activity: Activity) -> Result<(), Error> {
|
|
||||||
// todo!();
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// let mut transaction = db.begin().await?;
|
|
||||||
//
|
|
||||||
// query!(r#"
|
|
||||||
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
|
|
||||||
// "#,
|
|
||||||
// session.sessionuri, session.label
|
|
||||||
// ).execute(&mut *transaction).await?;
|
|
||||||
//
|
|
||||||
// for participant in session.participants {
|
|
||||||
// query!(r#"
|
|
||||||
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
|
|
||||||
// "#,
|
|
||||||
// session.sessionuri, participant.userdid, participant.role.to_string()
|
|
||||||
// ).execute(&mut *transaction).await?;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// transaction.commit().await
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// pub async fn ingest_participant(db: PgPool, participant: Participant) -> Result<(), Error> {
|
|
||||||
// query!(r#"
|
|
||||||
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
|
|
||||||
// "#,
|
|
||||||
// session.sessionuri, session.label
|
|
||||||
// ).execute(&mut *transaction).await?;
|
|
||||||
//
|
|
||||||
// for participant in session.participants {
|
|
||||||
// query!(r#"
|
|
||||||
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
|
|
||||||
// "#,
|
|
||||||
// session.sessionuri, participant.userdid, participant.role.to_string()
|
|
||||||
// ).execute(&mut *transaction).await?;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// transaction.commit().await
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
|
|
||||||
// pub async fn add_user(&self, user: &User) -> Result<()> {
|
|
||||||
// query!(r#"
|
|
||||||
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
|
|
||||||
// "#,
|
|
||||||
// user.userdid, user.handle
|
|
||||||
// ).execute(self.pool).await?;
|
|
||||||
// Ok(())
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// pub async fn add_session(&self, session: &Session) -> Result<()> {
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// pub async fn add_participant(&self, session: Session,
|
|
||||||
// participant: Participant) -> Result<Session> {
|
|
||||||
// query!(r#"
|
|
||||||
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
|
|
||||||
// "#,
|
|
||||||
// session.sessionuri, participant.userdid, participant.role.to_string()
|
|
||||||
// ).execute(self.pool).await?;
|
|
||||||
//
|
|
||||||
// session.participants.push(participant);
|
|
||||||
//
|
|
||||||
// Ok(session)
|
|
||||||
// }
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue