From 23ba006b8f5857b051f3b7a4e75c26dade44d17b Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Thu, 5 Jun 2025 14:00:07 -0700 Subject: [PATCH] Collections refactor, unused in ingestor --- ingestor/src/collection.rs | 25 ------ ingestor/src/collections.rs | 83 +++++++++++++++++++ ingestor/src/collections/my_spoor_activity.rs | 30 +++++++ ingestor/src/collections/my_spoor_session.rs | 30 +++++++ ingestor/src/main.rs | 3 + 5 files changed, 146 insertions(+), 25 deletions(-) delete mode 100644 ingestor/src/collection.rs create mode 100644 ingestor/src/collections.rs create mode 100644 ingestor/src/collections/my_spoor_activity.rs create mode 100644 ingestor/src/collections/my_spoor_session.rs diff --git a/ingestor/src/collection.rs b/ingestor/src/collection.rs deleted file mode 100644 index 69128d3..0000000 --- a/ingestor/src/collection.rs +++ /dev/null @@ -1,25 +0,0 @@ -use rocketman::types::event::Event; -use anyhow::Result; - -enum Ingestor { - Jetstream(SpoorJetstream) -} - -struct SpoorJetstream; - -#[async_trait] -impl LexiconIngestor for SpoorJetstream { - async fn ingest(&self, message: Event) -> Result<()> { - info!("{:?}", message); - // if let Some(commit) = &message.commit { - // match commit.operation { - // Operation::Create | Operation::Update => {} - // Operation::Delete => {} - // } - // } else { - // return Err("Message has no commit"); - // } - Ok(()) - } -} - diff --git a/ingestor/src/collections.rs b/ingestor/src/collections.rs new file mode 100644 index 0000000..4a4e733 --- /dev/null +++ b/ingestor/src/collections.rs @@ -0,0 +1,83 @@ +use rocketman::ingestion::LexiconIngestor; + +macro_rules! create_commit_collection { + ($collection:ident, $nsid:expr, + change => $change_fn:expr, + delete => $delete_fn:expr $(,)? + ) => ( + create_commit_collection!( + $collection, $nsid, + create => $change_fn, + update => $change_fn, + delete => $delete_fn, + ); + ); + ($collection:ident, $nsid:expr, + create => $create_fn:expr, + update => $update_fn:expr, + delete => $delete_fn:expr $(,)? + ) => ( + pub struct $collection; + struct Ingestor; + + impl $crate::collections::Collection for $collection { + fn get_nsid() -> String { + $nsid + } + + fn get_ingestor( + &self + ) -> Box { + Box::new(Ingestor::new()) + } + } + + impl Ingestor { + pub fn new() -> Self { Self } + + pub async fn handle_commit( + &self, + message: rocketman::types::event::Event + ) -> anyhow::Result<()> { + use rocketman::types::event::Operation; + + let state = $crate::collections::CommitIngestorState { }; + + if let Some(commit) = &message.commit { + match commit.operation { + Operation::Create => ($create_fn)(state, message).await?, + Operation::Update => ($update_fn)(state, message).await?, + Operation::Delete => ($delete_fn)(state, message).await?, + } + } else { + return Err(anyhow::anyhow!("Message has no commit")); + } + Ok(()) + } + } + + #[async_trait::async_trait] + impl rocketman::ingestion::LexiconIngestor for Ingestor { + async fn ingest( + &self, + message: rocketman::types::event::Event + ) -> anyhow::Result<()> { + self.handle_commit(message).await + } + } + ); +} + +pub mod my_spoor_session; +pub use my_spoor_session::MySpoorSession; +pub mod my_spoor_activity; +pub use my_spoor_activity::MySpoorActivity; + + +struct CommitIngestorState; + +pub trait Collection { + fn get_nsid() -> String; + fn get_ingestor(&self) -> Box; +} + diff --git a/ingestor/src/collections/my_spoor_activity.rs b/ingestor/src/collections/my_spoor_activity.rs new file mode 100644 index 0000000..3544cfb --- /dev/null +++ b/ingestor/src/collections/my_spoor_activity.rs @@ -0,0 +1,30 @@ +use crate::collections::CommitIngestorState; +use atproto::{ + Collection as AtprotoCollection, + lexicons::my::spoor::log::Activity, +}; +use rocketman::types::event::Event; +use serde_json::Value; +use anyhow::Result; +use tracing::info; + +async fn handle_change( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +async fn handle_delete( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +create_commit_collection!(MySpoorActivity, Activity::NSID.to_string(), + change => handle_change, + delete => handle_delete, +); diff --git a/ingestor/src/collections/my_spoor_session.rs b/ingestor/src/collections/my_spoor_session.rs new file mode 100644 index 0000000..25ba380 --- /dev/null +++ b/ingestor/src/collections/my_spoor_session.rs @@ -0,0 +1,30 @@ +use crate::collections::CommitIngestorState; +use atproto::{ + Collection as AtprotoCollection, + lexicons::my::spoor::log::Session, +}; +use rocketman::types::event::Event; +use serde_json::Value; +use anyhow::Result; +use tracing::info; + +async fn handle_change( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +async fn handle_delete( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +create_commit_collection!(MySpoorSession, Session::NSID.to_string(), + change => handle_change, + delete => handle_delete, +); diff --git a/ingestor/src/main.rs b/ingestor/src/main.rs index 4289770..fdb8fa2 100644 --- a/ingestor/src/main.rs +++ b/ingestor/src/main.rs @@ -1,4 +1,7 @@ +mod ingestor; +mod collections; + #[tokio::main] async fn main() { let subscriber = tracing_subscriber::FmtSubscriber::new();