appview/ingestor/src/ingestor.rs

54 lines
1.5 KiB
Rust
Raw Normal View History

use crate::collections::Collection;
use rocketman::{
options::JetstreamOptions,
ingestion::LexiconIngestor,
connection::JetstreamConnection,
handler,
};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tracing::{info, error};
/// Holds the set of registered collection handlers and drives their ingestion.
///
/// Handlers are registered with `add_collection` and consumed by `start`.
pub struct Ingestor {
/// Registered handlers, keyed by the string each collection's `get_nsid()`
/// returned at registration time; the value is the boxed `LexiconIngestor`
/// returned by the same collection's `get_ingestor()`.
ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>>,
}
impl Ingestor {
    /// Creates an ingestor with no registered collections.
    pub fn new() -> Self {
        Self {
            ingestors: HashMap::new(),
        }
    }

    /// Registers `collection` under the key returned by its `get_nsid()`.
    ///
    /// The stored handler is whatever `collection.get_ingestor()` returns.
    /// The backing store is a `HashMap`, so registering another collection
    /// under an already-present key replaces the earlier handler.
    pub fn add_collection<C: Collection>(&mut self, collection: C) {
        self.ingestors
            .insert(collection.get_nsid(), collection.get_ingestor());
    }

    /// Consumes the ingestor and runs it against a Jetstream connection.
    ///
    /// Steps, in order:
    /// 1. Logs the registered collection keys.
    /// 2. Builds `JetstreamOptions` whose wanted collections are exactly those
    ///    keys and creates a `JetstreamConnection` from them.
    /// 3. Spawns a detached Tokio task that receives messages from the
    ///    connection's message channel and passes each one to
    ///    `handler::handle_message` together with the registered handlers,
    ///    a reconnect sender and the shared cursor. A failure on one message
    ///    is logged and does not stop the task.
    /// 4. Awaits `jetstream.connect(..)` with the same shared cursor.
    ///
    /// If `connect` returns an error, it is logged and the whole process is
    /// terminated with exit code 1. Must be called from within a Tokio
    /// runtime, because it uses `tokio::spawn`.
    pub async fn start(self) {
        info!(
            "Starting ingestor with the following collections: {:?}",
            self.ingestors.keys()
        );

        let opts = JetstreamOptions::builder()
            .wanted_collections(self.ingestors.keys().cloned().collect())
            .build();
        let jetstream = JetstreamConnection::new(opts);

        // The cursor starts out unset and is shared between the
        // message-handling task and the connection.
        let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
        let msg_rx = jetstream.get_msg_rx();
        let reconnect_tx = jetstream.get_reconnect_tx();

        // Move only the handler map into the task; nothing else of `self`
        // is needed past this point.
        let ingestors = self.ingestors;
        let task_cursor = Arc::clone(&cursor);
        tokio::spawn(async move {
            // The loop ends the first time `recv_async` yields an `Err`.
            while let Ok(message) = msg_rx.recv_async().await {
                // `handle_message` takes the sender and cursor by value, so
                // each message gets its own cheap handle clones.
                if let Err(e) = handler::handle_message(
                    message,
                    &ingestors,
                    reconnect_tx.clone(),
                    Arc::clone(&task_cursor),
                )
                .await
                {
                    error!("Error processing message: {}", e);
                }
            }
        });

        // `cursor` is not used afterwards, so hand it over without cloning.
        if let Err(e) = jetstream.connect(cursor).await {
            error!("Failed to connect to Jetstream: {}", e);
            std::process::exit(1);
        }
    }
}

impl Default for Ingestor {
    /// Equivalent to [`Ingestor::new`]: an ingestor with no collections.
    fn default() -> Self {
        Self::new()
    }
}