Ingestor refactor complete baby~

This commit is contained in:
Julia Lange 2025-06-05 14:26:55 -07:00
parent 23ba006b8f
commit 079e8c5831
Signed by: Julia
SSH key fingerprint: SHA256:5DJcfxa5/fKCYn57dcabJa2vN2e6eT0pBerYi5SUbto
3 changed files with 48 additions and 37 deletions

View file

@ -21,7 +21,9 @@ macro_rules! create_commit_collection {
struct Ingestor; struct Ingestor;
impl $crate::collections::Collection for $collection { impl $crate::collections::Collection for $collection {
fn get_nsid() -> String { fn new() -> Self { Self }
fn get_nsid(&self) -> String {
$nsid $nsid
} }
@ -77,7 +79,8 @@ pub use my_spoor_activity::MySpoorActivity;
struct CommitIngestorState; struct CommitIngestorState;
pub trait Collection { pub trait Collection {
fn get_nsid() -> String; fn new() -> Self;
fn get_nsid(&self) -> String;
fn get_ingestor(&self) -> Box<dyn LexiconIngestor + Send + Sync>; fn get_ingestor(&self) -> Box<dyn LexiconIngestor + Send + Sync>;
} }

View file

@ -1,54 +1,52 @@
use atproto::{ use crate::collections::Collection;
Collection,
lexicons::my::spoor::log::{Activity, Session},
};
use async_trait::async_trait;
use rocketman::{ use rocketman::{
options::JetstreamOptions, options::JetstreamOptions,
ingestion::LexiconIngestor, ingestion::LexiconIngestor,
// types::event::{Event, Operation},
types::event::Event,
connection::JetstreamConnection, connection::JetstreamConnection,
handler, handler,
}; };
use serde_json::value::Value;
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use tracing::{error, info}; use tracing::{info, error};
pub async fn start_ingestor() { pub struct Ingestor {
info!("Starting ingestor"); ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>>,
let nsids = vec![ }
Activity::NSID.to_string(),
Session::NSID.to_string(),
];
let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build();
let jetstream = JetstreamConnection::new(opts);
let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new(); impl Ingestor {
for nsid in nsids { pub fn new() -> Self {
ingesters.insert(nsid, Box::new(SpoorJetstream)); Self { ingesters: HashMap::new() }
} }
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None)); pub fn add_collection<C: Collection>(&mut self, collection: C) {
self.ingesters.insert(collection.get_nsid(), collection.get_ingestor());
}
let msg_rx = jetstream.get_msg_rx(); pub async fn start(self) {
let reconnect_tx = jetstream.get_reconnect_tx(); info!("Starting ingestor");
let opts = JetstreamOptions::builder()
.wanted_collections(self.ingesters.keys().cloned().collect())
.build();
let jetstream = JetstreamConnection::new(opts);
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
let msg_rx = jetstream.get_msg_rx();
let reconnect_tx = jetstream.get_reconnect_tx();
let cursor_clone = cursor.clone(); let cursor_clone = cursor.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Ok(message) = msg_rx.recv_async().await { while let Ok(message) = msg_rx.recv_async().await {
if let Err(e) = handler::handle_message(message, &ingesters, if let Err(e) = handler::handle_message(message, &self.ingesters,
reconnect_tx.clone(), cursor_clone.clone()).await { reconnect_tx.clone(), cursor_clone.clone()).await {
error!("Error processing message: {}", e); error!("Error processing message: {}", e);
}
} }
} });
});
if let Err(e) = jetstream.connect(cursor.clone()).await { if let Err(e) = jetstream.connect(cursor.clone()).await {
error!("Failed to connect to Jetstream: {}", e); error!("Failed to connect to Jetstream: {}", e);
std::process::exit(1); std::process::exit(1);
}
} }
} }

View file

@ -1,3 +1,10 @@
use crate::{
collections::{
Collection,
MySpoorActivity, MySpoorSession,
},
ingestor::Ingestor,
};
mod ingestor; mod ingestor;
mod collections; mod collections;
@ -7,5 +14,8 @@ async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::new(); let subscriber = tracing_subscriber::FmtSubscriber::new();
let _ = tracing::subscriber::set_global_default(subscriber); let _ = tracing::subscriber::set_global_default(subscriber);
let mut ingestor = Ingestor::new();
ingestor.add_collection(MySpoorActivity::new());
ingestor.add_collection(MySpoorSession::new());
ingestor.start().await;
} }