From 079e8c5831f1c6896bd4c8bb294aab3ff336c2ec Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Thu, 5 Jun 2025 14:26:55 -0700 Subject: [PATCH] Ingestor refactor complete baby~ --- ingestor/src/collections.rs | 7 ++-- ingestor/src/ingestor.rs | 66 ++++++++++++++++++------------------- ingestor/src/main.rs | 12 ++++++- 3 files changed, 48 insertions(+), 37 deletions(-) diff --git a/ingestor/src/collections.rs b/ingestor/src/collections.rs index 4a4e733..b9ca1c1 100644 --- a/ingestor/src/collections.rs +++ b/ingestor/src/collections.rs @@ -21,7 +21,9 @@ macro_rules! create_commit_collection { struct Ingestor; impl $crate::collections::Collection for $collection { - fn get_nsid() -> String { + fn new() -> Self { Self } + + fn get_nsid(&self) -> String { $nsid } @@ -77,7 +79,8 @@ pub use my_spoor_activity::MySpoorActivity; struct CommitIngestorState; pub trait Collection { - fn get_nsid() -> String; + fn new() -> Self; + fn get_nsid(&self) -> String; fn get_ingestor(&self) -> Box; } diff --git a/ingestor/src/ingestor.rs b/ingestor/src/ingestor.rs index d25f102..2bbdad9 100644 --- a/ingestor/src/ingestor.rs +++ b/ingestor/src/ingestor.rs @@ -1,54 +1,52 @@ -use atproto::{ - Collection, - lexicons::my::spoor::log::{Activity, Session}, -}; -use async_trait::async_trait; +use crate::collections::Collection; use rocketman::{ options::JetstreamOptions, ingestion::LexiconIngestor, - // types::event::{Event, Operation}, - types::event::Event, connection::JetstreamConnection, handler, }; -use serde_json::value::Value; use std::{ collections::HashMap, sync::{Arc, Mutex}, }; -use tracing::{error, info}; +use tracing::{info, error}; -pub async fn start_ingestor() { - info!("Starting ingestor"); - let nsids = vec![ - Activity::NSID.to_string(), - Session::NSID.to_string(), - ]; - let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build(); - let jetstream = JetstreamConnection::new(opts); +pub struct Ingestor { + ingesters: HashMap>, +} - let mut ingesters: HashMap> = HashMap::new(); - for nsid in nsids { - ingesters.insert(nsid, Box::new(SpoorJetstream)); +impl Ingestor { + pub fn new() -> Self { + Self { ingesters: HashMap::new() } } - let cursor: Arc>> = Arc::new(Mutex::new(None)); + pub fn add_collection(&mut self, collection: C) { + self.ingesters.insert(collection.get_nsid(), collection.get_ingestor()); + } - let msg_rx = jetstream.get_msg_rx(); - let reconnect_tx = jetstream.get_reconnect_tx(); + pub async fn start(self) { + info!("Starting ingestor"); + let opts = JetstreamOptions::builder() + .wanted_collections(self.ingesters.keys().cloned().collect()) + .build(); + let jetstream = JetstreamConnection::new(opts); + let cursor: Arc>> = Arc::new(Mutex::new(None)); + let msg_rx = jetstream.get_msg_rx(); + let reconnect_tx = jetstream.get_reconnect_tx(); - let cursor_clone = cursor.clone(); - tokio::spawn(async move { - while let Ok(message) = msg_rx.recv_async().await { - if let Err(e) = handler::handle_message(message, &ingesters, - reconnect_tx.clone(), cursor_clone.clone()).await { - error!("Error processing message: {}", e); + let cursor_clone = cursor.clone(); + tokio::spawn(async move { + while let Ok(message) = msg_rx.recv_async().await { + if let Err(e) = handler::handle_message(message, &self.ingesters, + reconnect_tx.clone(), cursor_clone.clone()).await { + error!("Error processing message: {}", e); + } } - } - }); + }); - if let Err(e) = jetstream.connect(cursor.clone()).await { - error!("Failed to connect to Jetstream: {}", e); - std::process::exit(1); + if let Err(e) = jetstream.connect(cursor.clone()).await { + error!("Failed to connect to Jetstream: {}", e); + std::process::exit(1); + } } } diff --git a/ingestor/src/main.rs b/ingestor/src/main.rs index fdb8fa2..0214287 100644 --- a/ingestor/src/main.rs +++ b/ingestor/src/main.rs @@ -1,3 +1,10 @@ +use crate::{ + collections::{ + Collection, + MySpoorActivity, MySpoorSession, + }, + ingestor::Ingestor, +}; mod ingestor; mod collections; @@ -7,5 +14,8 @@ async fn main() { let subscriber = tracing_subscriber::FmtSubscriber::new(); let _ = tracing::subscriber::set_global_default(subscriber); - + let mut ingestor = Ingestor::new(); + ingestor.add_collection(MySpoorActivity::new()); + ingestor.add_collection(MySpoorSession::new()); + ingestor.start().await; }