Collections refactor, unused in ingestor

Ingestor refactor complete baby~

Just make sure everything works

Fix naming convention
This commit is contained in:
Julia Lange 2025-06-05 14:00:07 -07:00
parent 9adc67cf4d
commit 6d60756348
Signed by: Julia
SSH key fingerprint: SHA256:5DJcfxa5/fKCYn57dcabJa2vN2e6eT0pBerYi5SUbto
7 changed files with 218 additions and 60 deletions

View file

@ -1,25 +0,0 @@
use rocketman::types::event::Event;
use anyhow::Result;
/// Enumeration of ingestor implementations.
///
/// The only variant at present wraps a `SpoorJetstream`.
enum Ingestor {
// Carries the `SpoorJetstream` ingestor value.
Jetstream(SpoorJetstream)
}
/// Field-less unit type that implements `LexiconIngestor`; its `ingest`
/// implementation follows directly below.
struct SpoorJetstream;
#[async_trait]
impl LexiconIngestor for SpoorJetstream {
    /// Logs the received event's `Debug` representation at info level and
    /// reports success.
    ///
    /// NOTE(review): per-operation handling was only ever sketched here as
    /// commented-out code — an empty branch for `Operation::Create | Update`,
    /// an empty branch for `Operation::Delete`, and an error
    /// ("Message has no commit") when the event carries no commit. None of it
    /// is active; every event currently succeeds.
    async fn ingest(&self, message: Event<Value>) -> Result<()> {
        info!("{:?}", message);
        Ok(())
    }
}

View file

@ -0,0 +1,85 @@
use rocketman::ingestion::LexiconIngestor;
/// Generates a public unit struct `$collection` implementing
/// `crate::collections::Collection`, together with a private `Ingestor` type
/// that implements `rocketman::ingestion::LexiconIngestor` and routes each
/// commit to one of the supplied handlers.
///
/// Two invocation forms are accepted:
///
/// * `change => f, delete => g` — `f` is used for both create and update
///   commits.
/// * `create => f, update => g, delete => h` — one handler per operation.
///
/// Every handler is called as `handler(state, message).await?`, where `state`
/// is a fresh `CommitIngestorState` and `message` is the event being ingested.
/// An event without a commit yields the error "Message has no commit".
///
/// Because the generated ingestor is always named `Ingestor`, the macro can be
/// invoked at most once per module.
macro_rules! create_commit_collection {
    // Shorthand: a single `change` handler covers both create and update.
    (
        $collection:ident, $nsid:expr,
        change => $change_fn:expr,
        delete => $delete_fn:expr $(,)?
    ) => {
        create_commit_collection!(
            $collection, $nsid,
            create => $change_fn,
            update => $change_fn,
            delete => $delete_fn,
        );
    };
    // Full form: one handler per commit operation.
    (
        $collection:ident, $nsid:expr,
        create => $create_fn:expr,
        update => $update_fn:expr,
        delete => $delete_fn:expr $(,)?
    ) => {
        pub struct $collection;

        struct Ingestor;

        impl Ingestor {
            pub fn new() -> Self {
                Ingestor
            }

            pub async fn handle_commit(
                &self,
                message: rocketman::types::event::Event<serde_json::Value>,
            ) -> anyhow::Result<()> {
                use rocketman::types::event::Operation;

                let state = $crate::collections::CommitIngestorState {};

                // Events without a commit cannot be dispatched.
                let commit = match &message.commit {
                    Some(commit) => commit,
                    None => return Err(anyhow::anyhow!("Message has no commit")),
                };

                match commit.operation {
                    Operation::Create => ($create_fn)(state, message).await?,
                    Operation::Update => ($update_fn)(state, message).await?,
                    Operation::Delete => ($delete_fn)(state, message).await?,
                }

                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl rocketman::ingestion::LexiconIngestor for Ingestor {
            async fn ingest(
                &self,
                message: rocketman::types::event::Event<serde_json::Value>,
            ) -> anyhow::Result<()> {
                self.handle_commit(message).await
            }
        }

        impl $crate::collections::Collection for $collection {
            fn new() -> Self {
                Self
            }

            fn get_nsid(&self) -> String {
                $nsid
            }

            fn get_ingestor(
                &self,
            ) -> Box<dyn rocketman::ingestion::LexiconIngestor + Send + Sync> {
                Box::new(Ingestor::new())
            }
        }
    };
}
pub mod my_spoor_log_session;
pub use my_spoor_log_session::MySpoorLogSession;
pub mod my_spoor_log_activity;
pub use my_spoor_log_activity::MySpoorLogActivity;
/// State value passed by value to every commit handler dispatched by
/// `create_commit_collection!`. It currently has no fields.
struct CommitIngestorState;
/// A collection that can be registered with the ingestor: it supplies an NSID
/// string and the `LexiconIngestor` that handles events for it.
pub trait Collection {
/// Constructs the collection value.
fn new() -> Self;
/// Returns this collection's NSID as an owned string.
fn get_nsid(&self) -> String;
/// Returns a boxed ingestor for this collection's events.
fn get_ingestor(&self) -> Box<dyn LexiconIngestor + Send + Sync>;
}

View file

@ -0,0 +1,26 @@
use crate::collections::CommitIngestorState;
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;
/// Handler wired to the `change` slot (create and update commits) of this
/// module's `create_commit_collection!` invocation.
///
/// Currently a stub: it logs the event's `Debug` form at info level and
/// returns `Ok(())`. `_state` is accepted but not used.
async fn handle_change(
    _state: CommitIngestorState,
    message: Event<Value>,
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}
/// Handler wired to the `delete` slot of this module's
/// `create_commit_collection!` invocation.
///
/// Currently a stub: it logs the event's `Debug` form at info level and
/// returns `Ok(())`. `_state` is accepted but not used.
async fn handle_delete(
    _state: CommitIngestorState,
    message: Event<Value>,
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}
// Generates the `AppBskyFeedPost` collection with NSID "app.bsky.feed.post".
// `handle_change` serves both create and update commits; `handle_delete`
// serves delete commits.
// NOTE(review): no `pub mod` line for this file is visible next to the other
// collection modules — confirm whether this collection is actually compiled in.
create_commit_collection!(AppBskyFeedPost, "app.bsky.feed.post".to_string(),
change => handle_change,
delete => handle_delete,
);

View file

@ -0,0 +1,30 @@
use crate::collections::CommitIngestorState;
use atproto::{
Collection as AtprotoCollection,
lexicons::my::spoor::log::Activity,
};
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;
/// Handler wired to the `change` slot (create and update commits) of the
/// `MySpoorLogActivity` collection.
///
/// Currently a stub: it logs the event's `Debug` form at info level and
/// returns `Ok(())`. `_state` is accepted but not used.
async fn handle_change(
    _state: CommitIngestorState,
    message: Event<Value>,
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}
/// Handler wired to the `delete` slot of the `MySpoorLogActivity` collection.
///
/// Currently a stub: it logs the event's `Debug` form at info level and
/// returns `Ok(())`. `_state` is accepted but not used.
async fn handle_delete(
    _state: CommitIngestorState,
    message: Event<Value>,
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}
// Generates the `MySpoorLogActivity` collection, whose NSID is taken from
// `Activity::NSID`. `handle_change` serves both create and update commits;
// `handle_delete` serves delete commits.
create_commit_collection!(MySpoorLogActivity, Activity::NSID.to_string(),
change => handle_change,
delete => handle_delete,
);

View file

@ -0,0 +1,30 @@
use crate::collections::CommitIngestorState;
use atproto::{
Collection as AtprotoCollection,
lexicons::my::spoor::log::Session,
};
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;
/// Handler wired to the `change` slot (create and update commits) of the
/// `MySpoorLogSession` collection.
///
/// Currently a stub: it logs the event's `Debug` form at info level and
/// returns `Ok(())`. `_state` is accepted but not used.
async fn handle_change(
    _state: CommitIngestorState,
    message: Event<Value>,
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}
/// Handler wired to the `delete` slot of the `MySpoorLogSession` collection.
///
/// Currently a stub: it logs the event's `Debug` form at info level and
/// returns `Ok(())`. `_state` is accepted but not used.
async fn handle_delete(
    _state: CommitIngestorState,
    message: Event<Value>,
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}
// Generates the `MySpoorLogSession` collection, whose NSID is taken from
// `Session::NSID`. `handle_change` serves both create and update commits;
// `handle_delete` serves delete commits.
create_commit_collection!(MySpoorLogSession, Session::NSID.to_string(),
change => handle_change,
delete => handle_delete,
);

View file

@ -1,46 +1,44 @@
use atproto::{
Collection,
lexicons::my::spoor::log::{Activity, Session},
};
use async_trait::async_trait;
use crate::collections::Collection;
use rocketman::{
options::JetstreamOptions,
ingestion::LexiconIngestor,
// types::event::{Event, Operation},
types::event::Event,
connection::JetstreamConnection,
handler,
};
use serde_json::value::Value;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tracing::{error, info};
use tracing::{info, error};
pub async fn start_ingestor() {
info!("Starting ingestor");
let nsids = vec![
Activity::NSID.to_string(),
Session::NSID.to_string(),
];
let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build();
let jetstream = JetstreamConnection::new(opts);
let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
for nsid in nsids {
ingesters.insert(nsid, Box::new(SpoorJetstream));
/// Holds the lexicon ingestors registered through `add_collection`.
pub struct Ingestor {
// Keyed by the NSID string returned from `Collection::get_nsid`.
ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>>,
}
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
impl Ingestor {
/// Creates an ingestor with no collections registered.
pub fn new() -> Self {
    let ingesters = HashMap::new();
    Self { ingesters }
}
/// Registers `collection`, storing its ingestor under its NSID.
///
/// Registering a second collection with the same NSID replaces the earlier
/// entry, as with any `HashMap::insert`.
pub fn add_collection<C: Collection>(&mut self, collection: C) {
    let nsid = collection.get_nsid();
    let ingestor = collection.get_ingestor();
    self.ingesters.insert(nsid, ingestor);
}
pub async fn start(self) {
info!("Starting ingestor with the following collections: {:?}",
self.ingesters.keys());
let opts = JetstreamOptions::builder()
.wanted_collections(self.ingesters.keys().cloned().collect())
.build();
let jetstream = JetstreamConnection::new(opts);
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
let msg_rx = jetstream.get_msg_rx();
let reconnect_tx = jetstream.get_reconnect_tx();
let cursor_clone = cursor.clone();
tokio::spawn(async move {
while let Ok(message) = msg_rx.recv_async().await {
if let Err(e) = handler::handle_message(message, &ingesters,
if let Err(e) = handler::handle_message(message, &self.ingesters,
reconnect_tx.clone(), cursor_clone.clone()).await {
error!("Error processing message: {}", e);
}
@ -52,3 +50,4 @@ pub async fn start_ingestor() {
std::process::exit(1);
}
}
}

View file

@ -1,8 +1,21 @@
use crate::{
collections::{
Collection,
MySpoorLogActivity, MySpoorLogSession,
},
ingestor::Ingestor,
};
mod ingestor;
mod collections;
/// Program entry point.
///
/// Installs a formatting `tracing` subscriber as the global default, registers
/// the `MySpoorLogActivity` and `MySpoorLogSession` collections with a fresh
/// `Ingestor`, and then runs the ingestor.
#[tokio::main]
async fn main() {
    let subscriber = tracing_subscriber::FmtSubscriber::new();
    // A failed install would otherwise leave the process without any log
    // output and no hint as to why; report it on stderr and keep going, since
    // ingestion itself does not depend on logging.
    if let Err(err) = tracing::subscriber::set_global_default(subscriber) {
        eprintln!("Failed to install global tracing subscriber: {}", err);
    }

    let mut ingestor = Ingestor::new();
    ingestor.add_collection(MySpoorLogActivity::new());
    ingestor.add_collection(MySpoorLogSession::new());
    ingestor.start().await;
}