Ingestor, scaffolding
This commit is contained in:
parent
cee9bf78ef
commit
2f2e653d13
4 changed files with 52 additions and 25 deletions
15
ingestor/Cargo.toml
Normal file
15
ingestor/Cargo.toml
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
[package]
|
||||
name = "ingestor"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.98"
|
||||
async-trait = "0.1.88"
|
||||
atproto.workspace = true
|
||||
rocketman = "0.2.0"
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
tracing.workspace = true
|
||||
25
ingestor/src/collection.rs
Normal file
25
ingestor/src/collection.rs
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
use rocketman::types::event::Event;
|
||||
use anyhow::Result;
|
||||
|
||||
enum Ingestor {
|
||||
Jetstream(SpoorJetstream)
|
||||
}
|
||||
|
||||
struct SpoorJetstream;
|
||||
|
||||
#[async_trait]
|
||||
impl LexiconIngestor for SpoorJetstream {
|
||||
async fn ingest(&self, message: Event<Value>) -> Result<()> {
|
||||
info!("{:?}", message);
|
||||
// if let Some(commit) = &message.commit {
|
||||
// match commit.operation {
|
||||
// Operation::Create | Operation::Update => {}
|
||||
// Operation::Delete => {}
|
||||
// }
|
||||
// } else {
|
||||
// return Err("Message has no commit");
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
54
ingestor/src/ingestor.rs
Normal file
54
ingestor/src/ingestor.rs
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
use atproto::{
|
||||
Collection,
|
||||
lexicons::my::spoor::log::{Activity, Session},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use rocketman::{
|
||||
options::JetstreamOptions,
|
||||
ingestion::LexiconIngestor,
|
||||
// types::event::{Event, Operation},
|
||||
types::event::Event,
|
||||
connection::JetstreamConnection,
|
||||
handler,
|
||||
};
|
||||
use serde_json::value::Value;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tracing::{error, info};
|
||||
|
||||
pub async fn start_ingestor() {
|
||||
info!("Starting ingestor");
|
||||
let nsids = vec![
|
||||
Activity::NSID.to_string(),
|
||||
Session::NSID.to_string(),
|
||||
];
|
||||
let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build();
|
||||
let jetstream = JetstreamConnection::new(opts);
|
||||
|
||||
let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
|
||||
for nsid in nsids {
|
||||
ingesters.insert(nsid, Box::new(SpoorJetstream));
|
||||
}
|
||||
|
||||
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
|
||||
|
||||
let msg_rx = jetstream.get_msg_rx();
|
||||
let reconnect_tx = jetstream.get_reconnect_tx();
|
||||
|
||||
let cursor_clone = cursor.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(message) = msg_rx.recv_async().await {
|
||||
if let Err(e) = handler::handle_message(message, &ingesters,
|
||||
reconnect_tx.clone(), cursor_clone.clone()).await {
|
||||
error!("Error processing message: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = jetstream.connect(cursor.clone()).await {
|
||||
error!("Failed to connect to Jetstream: {}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
8
ingestor/src/main.rs
Normal file
8
ingestor/src/main.rs
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let subscriber = tracing_subscriber::FmtSubscriber::new();
|
||||
let _ = tracing::subscriber::set_global_default(subscriber);
|
||||
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue