Global, mono-binary to libraries and binaries

This separates the previous mono-binary setup into separate libraries
and binaries. Specifically, it splits the old single api/ingestor binary
into an Atproto library and a DB library, as well as an api binary and
an ingestor binary.

Atproto Lib
Was mostly untouched. The original URI implementation was changed to use
FromStr, otherwise only imports were changed.

DB Lib
Is mostly unused, so there wasn't much that needed to be changed. Some
new files were added so that future work on it can hit the ground
running.

Api Binary
Is almost entirely the same. Imports were changed and the ingestor code
of main was removed.

Ingestor Binary
Was almost entirely refactored. An interface for making ingestors was
added, and it was modularized. The only shared code is in
Ingestor.start() and collections.rs's macros, but that is mostly
boilerplate.
This commit is contained in:
Julia Lange 2025-05-22 15:22:43 -07:00
parent 45acaaa601
commit eb28549a0f
Signed by: Julia
SSH key fingerprint: SHA256:5DJcfxa5/fKCYn57dcabJa2vN2e6eT0pBerYi5SUbto
31 changed files with 582 additions and 636 deletions

15
ingestor/Cargo.toml Normal file
View file

@ -0,0 +1,15 @@
# Manifest for the `ingestor` binary crate.
[package]
name = "ingestor"
version = "0.1.0"
edition = "2024"
# Entries written as `<name>.workspace = true` inherit their version (and
# features) from the workspace root's `[workspace.dependencies]` table; the
# remaining entries are pinned here.
[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
atproto.workspace = true
rocketman = "0.2.0"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

View file

@ -0,0 +1,85 @@
use rocketman::ingestion::LexiconIngestor;
/// Generates a commit-driven collection type named `$collection` for the
/// record collection identified by `$nsid` (an expression yielding `String`).
///
/// The expansion defines:
/// * `pub struct $collection`, implementing `crate::collections::Collection`;
/// * a module-private `struct Ingestor` implementing
///   `rocketman::ingestion::LexiconIngestor`, which forwards each event to
///   the handler matching the commit's operation.
///
/// Every handler is invoked as `handler(state, message).await` and must yield
/// a `Result<(), E>` whose error converts into `anyhow::Error`. Events that
/// carry no commit are rejected with an error.
///
/// Two invocation forms are accepted:
/// * `change => f, delete => g` — `f` serves both creates and updates;
/// * `create => f, update => g, delete => h` — one handler per operation.
///
/// Because the expansion introduces a type literally named `Ingestor`, the
/// macro can be invoked at most once per module.
macro_rules! create_commit_collection {
    // Shorthand form: a single `change` handler is reused for create and update.
    (
        $collection:ident, $nsid:expr,
        change => $change_fn:expr,
        delete => $delete_fn:expr $(,)?
    ) => {
        create_commit_collection!(
            $collection, $nsid,
            create => $change_fn,
            update => $change_fn,
            delete => $delete_fn,
        );
    };

    // Full form: one handler per commit operation.
    (
        $collection:ident, $nsid:expr,
        create => $create_fn:expr,
        update => $update_fn:expr,
        delete => $delete_fn:expr $(,)?
    ) => {
        pub struct $collection;

        struct Ingestor;

        impl $crate::collections::Collection for $collection {
            fn new() -> Self {
                Self
            }

            fn get_nsid(&self) -> String {
                $nsid
            }

            fn get_ingestor(
                &self,
            ) -> Box<dyn rocketman::ingestion::LexiconIngestor + Send + Sync> {
                Box::new(Ingestor::new())
            }
        }

        impl Ingestor {
            pub fn new() -> Self {
                Self
            }

            /// Dispatches `message` to the handler for its commit operation.
            pub async fn handle_commit(
                &self,
                message: rocketman::types::event::Event<serde_json::Value>,
            ) -> anyhow::Result<()> {
                use rocketman::types::event::Operation;

                let state = $crate::collections::CommitIngestorState {};

                // Guard clause: an event without a commit cannot be dispatched.
                let Some(commit) = &message.commit else {
                    return Err(anyhow::anyhow!("Message has no commit"));
                };

                match commit.operation {
                    Operation::Create => ($create_fn)(state, message).await?,
                    Operation::Update => ($update_fn)(state, message).await?,
                    Operation::Delete => ($delete_fn)(state, message).await?,
                }

                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl rocketman::ingestion::LexiconIngestor for Ingestor {
            async fn ingest(
                &self,
                message: rocketman::types::event::Event<serde_json::Value>,
            ) -> anyhow::Result<()> {
                self.handle_commit(message).await
            }
        }
    };
}
// These modules must be declared after `create_commit_collection!`:
// `macro_rules!` macros are textually scoped, so the macro is only visible to
// modules declared below its definition.
pub mod my_spoor_log_session;
pub use my_spoor_log_session::MySpoorLogSession;
pub mod my_spoor_log_activity;
pub use my_spoor_log_activity::MySpoorLogActivity;
/// State value handed to every commit handler generated by
/// `create_commit_collection!`. It currently carries no data.
struct CommitIngestorState;
/// A record collection that can be registered with the ingestor.
pub trait Collection {
/// Creates the collection value.
fn new() -> Self;
/// Returns the NSID string identifying this collection.
fn get_nsid(&self) -> String;
/// Returns the boxed `LexiconIngestor` to register for this collection.
fn get_ingestor(&self) -> Box<dyn LexiconIngestor + Send + Sync>;
}

View file

@ -0,0 +1,26 @@
use crate::collections::CommitIngestorState;
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;
/// Handles a created or updated `app.bsky.feed.post` record.
///
/// Placeholder implementation: the event is only written to the log at
/// `info` level, and the call always succeeds.
async fn handle_change(_state: CommitIngestorState, message: Event<Value>) -> Result<()> {
    info!("{message:?}");
    Ok(())
}
/// Handles a deleted `app.bsky.feed.post` record.
///
/// Placeholder implementation: the event is only written to the log at
/// `info` level, and the call always succeeds.
async fn handle_delete(_state: CommitIngestorState, message: Event<Value>) -> Result<()> {
    info!("{message:?}");
    Ok(())
}
// Expands to `pub struct AppBskyFeedPost` (a `Collection`) plus this module's
// private `Ingestor`; the `change` handler serves both create and update
// commits.
// NOTE(review): the `collections` module shown with this change only declares
// `my_spoor_log_session` and `my_spoor_log_activity` — confirm this file is
// actually wired in with a `mod` declaration.
create_commit_collection!(AppBskyFeedPost, "app.bsky.feed.post".to_string(),
change => handle_change,
delete => handle_delete,
);

View file

@ -0,0 +1,30 @@
use crate::collections::CommitIngestorState;
use atproto::{
Collection as AtprotoCollection,
lexicons::my::spoor::log::Activity,
};
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;
/// Handles a created or updated record in the collection identified by
/// `Activity::NSID`.
///
/// Placeholder implementation: the event is only written to the log at
/// `info` level, and the call always succeeds.
async fn handle_change(_state: CommitIngestorState, message: Event<Value>) -> Result<()> {
    info!("{message:?}");
    Ok(())
}
/// Handles a deleted record in the collection identified by `Activity::NSID`.
///
/// Placeholder implementation: the event is only written to the log at
/// `info` level, and the call always succeeds.
async fn handle_delete(_state: CommitIngestorState, message: Event<Value>) -> Result<()> {
    info!("{message:?}");
    Ok(())
}
// Expands to `pub struct MySpoorLogActivity` (a `Collection`) plus this
// module's private `Ingestor`; the `change` handler serves both create and
// update commits.
create_commit_collection!(MySpoorLogActivity, Activity::NSID.to_string(),
change => handle_change,
delete => handle_delete,
);

View file

@ -0,0 +1,30 @@
use crate::collections::CommitIngestorState;
use atproto::{
Collection as AtprotoCollection,
lexicons::my::spoor::log::Session,
};
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;
/// Handles a created or updated record in the collection identified by
/// `Session::NSID`.
///
/// Placeholder implementation: the event is only written to the log at
/// `info` level, and the call always succeeds.
async fn handle_change(_state: CommitIngestorState, message: Event<Value>) -> Result<()> {
    info!("{message:?}");
    Ok(())
}
/// Handles a deleted record in the collection identified by `Session::NSID`.
///
/// Placeholder implementation: the event is only written to the log at
/// `info` level, and the call always succeeds.
async fn handle_delete(_state: CommitIngestorState, message: Event<Value>) -> Result<()> {
    info!("{message:?}");
    Ok(())
}
// Expands to `pub struct MySpoorLogSession` (a `Collection`) plus this
// module's private `Ingestor`; the `change` handler serves both create and
// update commits.
create_commit_collection!(MySpoorLogSession, Session::NSID.to_string(),
change => handle_change,
delete => handle_delete,
);

53
ingestor/src/ingestor.rs Normal file
View file

@ -0,0 +1,53 @@
use crate::collections::Collection;
use rocketman::{
options::JetstreamOptions,
ingestion::LexiconIngestor,
connection::JetstreamConnection,
handler,
};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tracing::{info, error};
/// Holds the registered collection ingestors and drives the Jetstream
/// connection that feeds them (see `start`).
pub struct Ingestor {
// Registered ingestors, keyed by collection NSID (see `add_collection`).
ingestors: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>>,
}
impl Ingestor {
pub fn new() -> Self {
Self { ingestors: HashMap::new() }
}
pub fn add_collection<C: Collection>(&mut self, collection: C) {
self.ingestors.insert(collection.get_nsid(), collection.get_ingestor());
}
pub async fn start(self) {
info!("Starting ingestor with the following collections: {:?}",
self.ingestors.keys());
let opts = JetstreamOptions::builder()
.wanted_collections(self.ingestors.keys().cloned().collect())
.build();
let jetstream = JetstreamConnection::new(opts);
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
let msg_rx = jetstream.get_msg_rx();
let reconnect_tx = jetstream.get_reconnect_tx();
let cursor_clone = cursor.clone();
tokio::spawn(async move {
while let Ok(message) = msg_rx.recv_async().await {
if let Err(e) = handler::handle_message(message, &self.ingestors,
reconnect_tx.clone(), cursor_clone.clone()).await {
error!("Error processing message: {}", e);
}
}
});
if let Err(e) = jetstream.connect(cursor.clone()).await {
error!("Failed to connect to Jetstream: {}", e);
std::process::exit(1);
}
}
}

21
ingestor/src/main.rs Normal file
View file

@ -0,0 +1,21 @@
use crate::{
collections::{
Collection,
MySpoorLogActivity, MySpoorLogSession,
},
ingestor::Ingestor,
};
// `ingestor`: the `Ingestor` type that connects to Jetstream and dispatches
// messages.
mod ingestor;
// `collections`: the `Collection` trait and the per-collection handlers.
mod collections;
/// Entry point: installs a formatting `tracing` subscriber, registers the
/// spoor log collections, and runs the ingestor until it returns.
#[tokio::main]
async fn main() {
    let subscriber = tracing_subscriber::FmtSubscriber::new();
    if let Err(err) = tracing::subscriber::set_global_default(subscriber) {
        // Logging itself failed to initialise, so stderr is the only channel
        // left. Keep running: ingestion does not depend on the subscriber.
        eprintln!("Failed to install the global tracing subscriber: {err}");
    }

    let mut ingestor = Ingestor::new();
    ingestor.add_collection(MySpoorLogActivity::new());
    ingestor.add_collection(MySpoorLogSession::new());
    ingestor.start().await;
}