diff --git a/ingestor/src/collection.rs b/ingestor/src/collection.rs deleted file mode 100644 index 69128d3..0000000 --- a/ingestor/src/collection.rs +++ /dev/null @@ -1,25 +0,0 @@ -use rocketman::types::event::Event; -use anyhow::Result; - -enum Ingestor { - Jetstream(SpoorJetstream) -} - -struct SpoorJetstream; - -#[async_trait] -impl LexiconIngestor for SpoorJetstream { - async fn ingest(&self, message: Event) -> Result<()> { - info!("{:?}", message); - // if let Some(commit) = &message.commit { - // match commit.operation { - // Operation::Create | Operation::Update => {} - // Operation::Delete => {} - // } - // } else { - // return Err("Message has no commit"); - // } - Ok(()) - } -} - diff --git a/ingestor/src/collections.rs b/ingestor/src/collections.rs new file mode 100644 index 0000000..3b9cd5d --- /dev/null +++ b/ingestor/src/collections.rs @@ -0,0 +1,85 @@ +use rocketman::ingestion::LexiconIngestor; + +macro_rules! create_commit_collection { + ($collection:ident, $nsid:expr, + change => $change_fn:expr, + delete => $delete_fn:expr $(,)? + ) => ( + create_commit_collection!( + $collection, $nsid, + create => $change_fn, + update => $change_fn, + delete => $delete_fn, + ); + ); + ($collection:ident, $nsid:expr, + create => $create_fn:expr, + update => $update_fn:expr, + delete => $delete_fn:expr $(,)? + ) => ( + pub struct $collection; + struct Ingestor; + + impl $crate::collections::Collection for $collection { + fn new() -> Self { Self } + + fn get_nsid(&self) -> String { + $nsid + } + + fn get_ingestor( + &self + ) -> Box { + Box::new(Ingestor::new()) + } + } + + impl Ingestor { + pub fn new() -> Self { Self } + + pub async fn handle_commit( + &self, + message: rocketman::types::event::Event + ) -> anyhow::Result<()> { + use rocketman::types::event::Operation; + + let state = $crate::collections::CommitIngestorState { }; + + if let Some(commit) = &message.commit { + match commit.operation { + Operation::Create => ($create_fn)(state, message).await?, + Operation::Update => ($update_fn)(state, message).await?, + Operation::Delete => ($delete_fn)(state, message).await?, + } + } else { + return Err(anyhow::anyhow!("Message has no commit")); + } + Ok(()) + } + } + + #[async_trait::async_trait] + impl rocketman::ingestion::LexiconIngestor for Ingestor { + async fn ingest( + &self, + message: rocketman::types::event::Event + ) -> anyhow::Result<()> { + self.handle_commit(message).await + } + } + ); +} + +pub mod my_spoor_log_session; +pub use my_spoor_log_session::MySpoorLogSession; +pub mod my_spoor_log_activity; +pub use my_spoor_log_activity::MySpoorLogActivity; + +struct CommitIngestorState; + +pub trait Collection { + fn new() -> Self; + fn get_nsid(&self) -> String; + fn get_ingestor(&self) -> Box; +} + diff --git a/ingestor/src/collections/app_bsky_feed_post.rs b/ingestor/src/collections/app_bsky_feed_post.rs new file mode 100644 index 0000000..d57bb1d --- /dev/null +++ b/ingestor/src/collections/app_bsky_feed_post.rs @@ -0,0 +1,26 @@ +use crate::collections::CommitIngestorState; +use rocketman::types::event::Event; +use serde_json::Value; +use anyhow::Result; +use tracing::info; + +async fn handle_change( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +async fn handle_delete( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +create_commit_collection!(AppBskyFeedPost, "app.bsky.feed.post".to_string(), + change => handle_change, + delete => handle_delete, +); diff --git a/ingestor/src/collections/my_spoor_log_activity.rs b/ingestor/src/collections/my_spoor_log_activity.rs new file mode 100644 index 0000000..b664220 --- /dev/null +++ b/ingestor/src/collections/my_spoor_log_activity.rs @@ -0,0 +1,30 @@ +use crate::collections::CommitIngestorState; +use atproto::{ + Collection as AtprotoCollection, + lexicons::my::spoor::log::Activity, +}; +use rocketman::types::event::Event; +use serde_json::Value; +use anyhow::Result; +use tracing::info; + +async fn handle_change( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +async fn handle_delete( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +create_commit_collection!(MySpoorLogActivity, Activity::NSID.to_string(), + change => handle_change, + delete => handle_delete, +); diff --git a/ingestor/src/collections/my_spoor_log_session.rs b/ingestor/src/collections/my_spoor_log_session.rs new file mode 100644 index 0000000..c90e7de --- /dev/null +++ b/ingestor/src/collections/my_spoor_log_session.rs @@ -0,0 +1,30 @@ +use crate::collections::CommitIngestorState; +use atproto::{ + Collection as AtprotoCollection, + lexicons::my::spoor::log::Session, +}; +use rocketman::types::event::Event; +use serde_json::Value; +use anyhow::Result; +use tracing::info; + +async fn handle_change( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +async fn handle_delete( + _state: CommitIngestorState, + message: Event +) -> Result<()> { + info!("{:?}", message); + Ok(()) +} + +create_commit_collection!(MySpoorLogSession, Session::NSID.to_string(), + change => handle_change, + delete => handle_delete, +); diff --git a/ingestor/src/ingestor.rs b/ingestor/src/ingestor.rs index d25f102..e2a80f3 100644 --- a/ingestor/src/ingestor.rs +++ b/ingestor/src/ingestor.rs @@ -1,54 +1,53 @@ -use atproto::{ - Collection, - lexicons::my::spoor::log::{Activity, Session}, -}; -use async_trait::async_trait; +use crate::collections::Collection; use rocketman::{ options::JetstreamOptions, ingestion::LexiconIngestor, - // types::event::{Event, Operation}, - types::event::Event, connection::JetstreamConnection, handler, }; -use serde_json::value::Value; use std::{ collections::HashMap, sync::{Arc, Mutex}, }; -use tracing::{error, info}; +use tracing::{info, error}; -pub async fn start_ingestor() { - info!("Starting ingestor"); - let nsids = vec![ - Activity::NSID.to_string(), - Session::NSID.to_string(), - ]; - let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build(); - let jetstream = JetstreamConnection::new(opts); +pub struct Ingestor { + ingesters: HashMap>, +} - let mut ingesters: HashMap> = HashMap::new(); - for nsid in nsids { - ingesters.insert(nsid, Box::new(SpoorJetstream)); +impl Ingestor { + pub fn new() -> Self { + Self { ingesters: HashMap::new() } } - let cursor: Arc>> = Arc::new(Mutex::new(None)); + pub fn add_collection(&mut self, collection: C) { + self.ingesters.insert(collection.get_nsid(), collection.get_ingestor()); + } - let msg_rx = jetstream.get_msg_rx(); - let reconnect_tx = jetstream.get_reconnect_tx(); + pub async fn start(self) { + info!("Starting ingestor with the following collections: {:?}", + self.ingesters.keys()); + let opts = JetstreamOptions::builder() + .wanted_collections(self.ingesters.keys().cloned().collect()) + .build(); + let jetstream = JetstreamConnection::new(opts); + let cursor: Arc>> = Arc::new(Mutex::new(None)); + let msg_rx = jetstream.get_msg_rx(); + let reconnect_tx = jetstream.get_reconnect_tx(); - let cursor_clone = cursor.clone(); - tokio::spawn(async move { - while let Ok(message) = msg_rx.recv_async().await { - if let Err(e) = handler::handle_message(message, &ingesters, - reconnect_tx.clone(), cursor_clone.clone()).await { - error!("Error processing message: {}", e); + let cursor_clone = cursor.clone(); + tokio::spawn(async move { + while let Ok(message) = msg_rx.recv_async().await { + if let Err(e) = handler::handle_message(message, &self.ingesters, + reconnect_tx.clone(), cursor_clone.clone()).await { + error!("Error processing message: {}", e); + } } - } - }); + }); - if let Err(e) = jetstream.connect(cursor.clone()).await { - error!("Failed to connect to Jetstream: {}", e); - std::process::exit(1); + if let Err(e) = jetstream.connect(cursor.clone()).await { + error!("Failed to connect to Jetstream: {}", e); + std::process::exit(1); + } } } diff --git a/ingestor/src/main.rs b/ingestor/src/main.rs index 4289770..08af557 100644 --- a/ingestor/src/main.rs +++ b/ingestor/src/main.rs @@ -1,8 +1,21 @@ +use crate::{ + collections::{ + Collection, + MySpoorLogActivity, MySpoorLogSession, + }, + ingestor::Ingestor, +}; + +mod ingestor; +mod collections; #[tokio::main] async fn main() { let subscriber = tracing_subscriber::FmtSubscriber::new(); let _ = tracing::subscriber::set_global_default(subscriber); - + let mut ingestor = Ingestor::new(); + ingestor.add_collection(MySpoorLogActivity::new()); + ingestor.add_collection(MySpoorLogSession::new()); + ingestor.start().await; }