diff --git a/ingestor/src/collection.rs b/ingestor/src/collection.rs new file mode 100644 index 0000000..69128d3 --- /dev/null +++ b/ingestor/src/collection.rs @@ -0,0 +1,25 @@ +use rocketman::types::event::Event; +use anyhow::Result; + +enum Ingestor { + Jetstream(SpoorJetstream) +} + +struct SpoorJetstream; + +#[async_trait] +impl LexiconIngestor for SpoorJetstream { + async fn ingest(&self, message: Event) -> Result<()> { + info!("{:?}", message); + // if let Some(commit) = &message.commit { + // match commit.operation { + // Operation::Create | Operation::Update => {} + // Operation::Delete => {} + // } + // } else { + // return Err("Message has no commit"); + // } + Ok(()) + } +} + diff --git a/ingestor/src/collections.rs b/ingestor/src/collections.rs deleted file mode 100644 index 3b9cd5d..0000000 --- a/ingestor/src/collections.rs +++ /dev/null @@ -1,85 +0,0 @@ -use rocketman::ingestion::LexiconIngestor; - -macro_rules! create_commit_collection { - ($collection:ident, $nsid:expr, - change => $change_fn:expr, - delete => $delete_fn:expr $(,)? - ) => ( - create_commit_collection!( - $collection, $nsid, - create => $change_fn, - update => $change_fn, - delete => $delete_fn, - ); - ); - ($collection:ident, $nsid:expr, - create => $create_fn:expr, - update => $update_fn:expr, - delete => $delete_fn:expr $(,)? - ) => ( - pub struct $collection; - struct Ingestor; - - impl $crate::collections::Collection for $collection { - fn new() -> Self { Self } - - fn get_nsid(&self) -> String { - $nsid - } - - fn get_ingestor( - &self - ) -> Box { - Box::new(Ingestor::new()) - } - } - - impl Ingestor { - pub fn new() -> Self { Self } - - pub async fn handle_commit( - &self, - message: rocketman::types::event::Event - ) -> anyhow::Result<()> { - use rocketman::types::event::Operation; - - let state = $crate::collections::CommitIngestorState { }; - - if let Some(commit) = &message.commit { - match commit.operation { - Operation::Create => ($create_fn)(state, message).await?, - Operation::Update => ($update_fn)(state, message).await?, - Operation::Delete => ($delete_fn)(state, message).await?, - } - } else { - return Err(anyhow::anyhow!("Message has no commit")); - } - Ok(()) - } - } - - #[async_trait::async_trait] - impl rocketman::ingestion::LexiconIngestor for Ingestor { - async fn ingest( - &self, - message: rocketman::types::event::Event - ) -> anyhow::Result<()> { - self.handle_commit(message).await - } - } - ); -} - -pub mod my_spoor_log_session; -pub use my_spoor_log_session::MySpoorLogSession; -pub mod my_spoor_log_activity; -pub use my_spoor_log_activity::MySpoorLogActivity; - -struct CommitIngestorState; - -pub trait Collection { - fn new() -> Self; - fn get_nsid(&self) -> String; - fn get_ingestor(&self) -> Box; -} - diff --git a/ingestor/src/collections/app_bsky_feed_post.rs b/ingestor/src/collections/app_bsky_feed_post.rs deleted file mode 100644 index d57bb1d..0000000 --- a/ingestor/src/collections/app_bsky_feed_post.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::collections::CommitIngestorState; -use rocketman::types::event::Event; -use serde_json::Value; -use anyhow::Result; -use tracing::info; - -async fn handle_change( - _state: CommitIngestorState, - message: Event -) -> Result<()> { - info!("{:?}", message); - Ok(()) -} - -async fn handle_delete( - _state: CommitIngestorState, - message: Event -) -> Result<()> { - info!("{:?}", message); - Ok(()) -} - -create_commit_collection!(AppBskyFeedPost, "app.bsky.feed.post".to_string(), - change => handle_change, - delete => handle_delete, -); diff --git a/ingestor/src/collections/my_spoor_log_activity.rs b/ingestor/src/collections/my_spoor_log_activity.rs deleted file mode 100644 index b664220..0000000 --- a/ingestor/src/collections/my_spoor_log_activity.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::collections::CommitIngestorState; -use atproto::{ - Collection as AtprotoCollection, - lexicons::my::spoor::log::Activity, -}; -use rocketman::types::event::Event; -use serde_json::Value; -use anyhow::Result; -use tracing::info; - -async fn handle_change( - _state: CommitIngestorState, - message: Event -) -> Result<()> { - info!("{:?}", message); - Ok(()) -} - -async fn handle_delete( - _state: CommitIngestorState, - message: Event -) -> Result<()> { - info!("{:?}", message); - Ok(()) -} - -create_commit_collection!(MySpoorLogActivity, Activity::NSID.to_string(), - change => handle_change, - delete => handle_delete, -); diff --git a/ingestor/src/collections/my_spoor_log_session.rs b/ingestor/src/collections/my_spoor_log_session.rs deleted file mode 100644 index c90e7de..0000000 --- a/ingestor/src/collections/my_spoor_log_session.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::collections::CommitIngestorState; -use atproto::{ - Collection as AtprotoCollection, - lexicons::my::spoor::log::Session, -}; -use rocketman::types::event::Event; -use serde_json::Value; -use anyhow::Result; -use tracing::info; - -async fn handle_change( - _state: CommitIngestorState, - message: Event -) -> Result<()> { - info!("{:?}", message); - Ok(()) -} - -async fn handle_delete( - _state: CommitIngestorState, - message: Event -) -> Result<()> { - info!("{:?}", message); - Ok(()) -} - -create_commit_collection!(MySpoorLogSession, Session::NSID.to_string(), - change => handle_change, - delete => handle_delete, -); diff --git a/ingestor/src/ingestor.rs b/ingestor/src/ingestor.rs index e2a80f3..d25f102 100644 --- a/ingestor/src/ingestor.rs +++ b/ingestor/src/ingestor.rs @@ -1,53 +1,54 @@ -use crate::collections::Collection; +use atproto::{ + Collection, + lexicons::my::spoor::log::{Activity, Session}, +}; +use async_trait::async_trait; use rocketman::{ options::JetstreamOptions, ingestion::LexiconIngestor, + // types::event::{Event, Operation}, + types::event::Event, connection::JetstreamConnection, handler, }; +use serde_json::value::Value; use std::{ collections::HashMap, sync::{Arc, Mutex}, }; -use tracing::{info, error}; +use tracing::{error, info}; -pub struct Ingestor { - ingesters: HashMap>, -} +pub async fn start_ingestor() { + info!("Starting ingestor"); + let nsids = vec![ + Activity::NSID.to_string(), + Session::NSID.to_string(), + ]; + let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build(); + let jetstream = JetstreamConnection::new(opts); -impl Ingestor { - pub fn new() -> Self { - Self { ingesters: HashMap::new() } + let mut ingesters: HashMap> = HashMap::new(); + for nsid in nsids { + ingesters.insert(nsid, Box::new(SpoorJetstream)); } - pub fn add_collection(&mut self, collection: C) { - self.ingesters.insert(collection.get_nsid(), collection.get_ingestor()); - } + let cursor: Arc>> = Arc::new(Mutex::new(None)); - pub async fn start(self) { - info!("Starting ingestor with the following collections: {:?}", - self.ingesters.keys()); - let opts = JetstreamOptions::builder() - .wanted_collections(self.ingesters.keys().cloned().collect()) - .build(); - let jetstream = JetstreamConnection::new(opts); - let cursor: Arc>> = Arc::new(Mutex::new(None)); - let msg_rx = jetstream.get_msg_rx(); - let reconnect_tx = jetstream.get_reconnect_tx(); + let msg_rx = jetstream.get_msg_rx(); + let reconnect_tx = jetstream.get_reconnect_tx(); - let cursor_clone = cursor.clone(); - tokio::spawn(async move { - while let Ok(message) = msg_rx.recv_async().await { - if let Err(e) = handler::handle_message(message, &self.ingesters, - reconnect_tx.clone(), cursor_clone.clone()).await { - error!("Error processing message: {}", e); - } + let cursor_clone = cursor.clone(); + tokio::spawn(async move { + while let Ok(message) = msg_rx.recv_async().await { + if let Err(e) = handler::handle_message(message, &ingesters, + reconnect_tx.clone(), cursor_clone.clone()).await { + error!("Error processing message: {}", e); } - }); - - if let Err(e) = jetstream.connect(cursor.clone()).await { - error!("Failed to connect to Jetstream: {}", e); - std::process::exit(1); } + }); + + if let Err(e) = jetstream.connect(cursor.clone()).await { + error!("Failed to connect to Jetstream: {}", e); + std::process::exit(1); } } diff --git a/ingestor/src/main.rs b/ingestor/src/main.rs index 08af557..4289770 100644 --- a/ingestor/src/main.rs +++ b/ingestor/src/main.rs @@ -1,21 +1,8 @@ -use crate::{ - collections::{ - Collection, - MySpoorLogActivity, MySpoorLogSession, - }, - ingestor::Ingestor, -}; - -mod ingestor; -mod collections; #[tokio::main] async fn main() { let subscriber = tracing_subscriber::FmtSubscriber::new(); let _ = tracing::subscriber::set_global_default(subscriber); - let mut ingestor = Ingestor::new(); - ingestor.add_collection(MySpoorLogActivity::new()); - ingestor.add_collection(MySpoorLogSession::new()); - ingestor.start().await; + }