Compare commits
No commits in common. "05b33f0243775f25b83076a5504e0958d9e53f0a" and "9adc67cf4d87f3b07b526e7eb3d13460b899a576" have entirely different histories.
05b33f0243
...
9adc67cf4d
7 changed files with 60 additions and 218 deletions
25
ingestor/src/collection.rs
Normal file
25
ingestor/src/collection.rs
Normal file
|
|
@ -0,0 +1,25 @@
|
||||||
|
use rocketman::types::event::Event;
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
enum Ingestor {
|
||||||
|
Jetstream(SpoorJetstream)
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SpoorJetstream;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl LexiconIngestor for SpoorJetstream {
|
||||||
|
async fn ingest(&self, message: Event<Value>) -> Result<()> {
|
||||||
|
info!("{:?}", message);
|
||||||
|
// if let Some(commit) = &message.commit {
|
||||||
|
// match commit.operation {
|
||||||
|
// Operation::Create | Operation::Update => {}
|
||||||
|
// Operation::Delete => {}
|
||||||
|
// }
|
||||||
|
// } else {
|
||||||
|
// return Err("Message has no commit");
|
||||||
|
// }
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -1,85 +0,0 @@
|
||||||
use rocketman::ingestion::LexiconIngestor;
|
|
||||||
|
|
||||||
macro_rules! create_commit_collection {
|
|
||||||
($collection:ident, $nsid:expr,
|
|
||||||
change => $change_fn:expr,
|
|
||||||
delete => $delete_fn:expr $(,)?
|
|
||||||
) => (
|
|
||||||
create_commit_collection!(
|
|
||||||
$collection, $nsid,
|
|
||||||
create => $change_fn,
|
|
||||||
update => $change_fn,
|
|
||||||
delete => $delete_fn,
|
|
||||||
);
|
|
||||||
);
|
|
||||||
($collection:ident, $nsid:expr,
|
|
||||||
create => $create_fn:expr,
|
|
||||||
update => $update_fn:expr,
|
|
||||||
delete => $delete_fn:expr $(,)?
|
|
||||||
) => (
|
|
||||||
pub struct $collection;
|
|
||||||
struct Ingestor;
|
|
||||||
|
|
||||||
impl $crate::collections::Collection for $collection {
|
|
||||||
fn new() -> Self { Self }
|
|
||||||
|
|
||||||
fn get_nsid(&self) -> String {
|
|
||||||
$nsid
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_ingestor(
|
|
||||||
&self
|
|
||||||
) -> Box<dyn rocketman::ingestion::LexiconIngestor + Send + Sync> {
|
|
||||||
Box::new(Ingestor::new())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Ingestor {
|
|
||||||
pub fn new() -> Self { Self }
|
|
||||||
|
|
||||||
pub async fn handle_commit(
|
|
||||||
&self,
|
|
||||||
message: rocketman::types::event::Event<serde_json::Value>
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
use rocketman::types::event::Operation;
|
|
||||||
|
|
||||||
let state = $crate::collections::CommitIngestorState { };
|
|
||||||
|
|
||||||
if let Some(commit) = &message.commit {
|
|
||||||
match commit.operation {
|
|
||||||
Operation::Create => ($create_fn)(state, message).await?,
|
|
||||||
Operation::Update => ($update_fn)(state, message).await?,
|
|
||||||
Operation::Delete => ($delete_fn)(state, message).await?,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Err(anyhow::anyhow!("Message has no commit"));
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl rocketman::ingestion::LexiconIngestor for Ingestor {
|
|
||||||
async fn ingest(
|
|
||||||
&self,
|
|
||||||
message: rocketman::types::event::Event<serde_json::Value>
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
self.handle_commit(message).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub mod my_spoor_log_session;
|
|
||||||
pub use my_spoor_log_session::MySpoorLogSession;
|
|
||||||
pub mod my_spoor_log_activity;
|
|
||||||
pub use my_spoor_log_activity::MySpoorLogActivity;
|
|
||||||
|
|
||||||
struct CommitIngestorState;
|
|
||||||
|
|
||||||
pub trait Collection {
|
|
||||||
fn new() -> Self;
|
|
||||||
fn get_nsid(&self) -> String;
|
|
||||||
fn get_ingestor(&self) -> Box<dyn LexiconIngestor + Send + Sync>;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -1,26 +0,0 @@
|
||||||
use crate::collections::CommitIngestorState;
|
|
||||||
use rocketman::types::event::Event;
|
|
||||||
use serde_json::Value;
|
|
||||||
use anyhow::Result;
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
async fn handle_change(
|
|
||||||
_state: CommitIngestorState,
|
|
||||||
message: Event<Value>
|
|
||||||
) -> Result<()> {
|
|
||||||
info!("{:?}", message);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_delete(
|
|
||||||
_state: CommitIngestorState,
|
|
||||||
message: Event<Value>
|
|
||||||
) -> Result<()> {
|
|
||||||
info!("{:?}", message);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
create_commit_collection!(AppBskyFeedPost, "app.bsky.feed.post".to_string(),
|
|
||||||
change => handle_change,
|
|
||||||
delete => handle_delete,
|
|
||||||
);
|
|
||||||
|
|
@ -1,30 +0,0 @@
|
||||||
use crate::collections::CommitIngestorState;
|
|
||||||
use atproto::{
|
|
||||||
Collection as AtprotoCollection,
|
|
||||||
lexicons::my::spoor::log::Activity,
|
|
||||||
};
|
|
||||||
use rocketman::types::event::Event;
|
|
||||||
use serde_json::Value;
|
|
||||||
use anyhow::Result;
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
async fn handle_change(
|
|
||||||
_state: CommitIngestorState,
|
|
||||||
message: Event<Value>
|
|
||||||
) -> Result<()> {
|
|
||||||
info!("{:?}", message);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_delete(
|
|
||||||
_state: CommitIngestorState,
|
|
||||||
message: Event<Value>
|
|
||||||
) -> Result<()> {
|
|
||||||
info!("{:?}", message);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
create_commit_collection!(MySpoorLogActivity, Activity::NSID.to_string(),
|
|
||||||
change => handle_change,
|
|
||||||
delete => handle_delete,
|
|
||||||
);
|
|
||||||
|
|
@ -1,30 +0,0 @@
|
||||||
use crate::collections::CommitIngestorState;
|
|
||||||
use atproto::{
|
|
||||||
Collection as AtprotoCollection,
|
|
||||||
lexicons::my::spoor::log::Session,
|
|
||||||
};
|
|
||||||
use rocketman::types::event::Event;
|
|
||||||
use serde_json::Value;
|
|
||||||
use anyhow::Result;
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
async fn handle_change(
|
|
||||||
_state: CommitIngestorState,
|
|
||||||
message: Event<Value>
|
|
||||||
) -> Result<()> {
|
|
||||||
info!("{:?}", message);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_delete(
|
|
||||||
_state: CommitIngestorState,
|
|
||||||
message: Event<Value>
|
|
||||||
) -> Result<()> {
|
|
||||||
info!("{:?}", message);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
create_commit_collection!(MySpoorLogSession, Session::NSID.to_string(),
|
|
||||||
change => handle_change,
|
|
||||||
delete => handle_delete,
|
|
||||||
);
|
|
||||||
|
|
@ -1,53 +1,54 @@
|
||||||
use crate::collections::Collection;
|
use atproto::{
|
||||||
|
Collection,
|
||||||
|
lexicons::my::spoor::log::{Activity, Session},
|
||||||
|
};
|
||||||
|
use async_trait::async_trait;
|
||||||
use rocketman::{
|
use rocketman::{
|
||||||
options::JetstreamOptions,
|
options::JetstreamOptions,
|
||||||
ingestion::LexiconIngestor,
|
ingestion::LexiconIngestor,
|
||||||
|
// types::event::{Event, Operation},
|
||||||
|
types::event::Event,
|
||||||
connection::JetstreamConnection,
|
connection::JetstreamConnection,
|
||||||
handler,
|
handler,
|
||||||
};
|
};
|
||||||
|
use serde_json::value::Value;
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
};
|
};
|
||||||
use tracing::{info, error};
|
use tracing::{error, info};
|
||||||
|
|
||||||
pub struct Ingestor {
|
pub async fn start_ingestor() {
|
||||||
ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>>,
|
info!("Starting ingestor");
|
||||||
}
|
let nsids = vec![
|
||||||
|
Activity::NSID.to_string(),
|
||||||
|
Session::NSID.to_string(),
|
||||||
|
];
|
||||||
|
let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build();
|
||||||
|
let jetstream = JetstreamConnection::new(opts);
|
||||||
|
|
||||||
impl Ingestor {
|
let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
|
||||||
pub fn new() -> Self {
|
for nsid in nsids {
|
||||||
Self { ingesters: HashMap::new() }
|
ingesters.insert(nsid, Box::new(SpoorJetstream));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add_collection<C: Collection>(&mut self, collection: C) {
|
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
|
||||||
self.ingesters.insert(collection.get_nsid(), collection.get_ingestor());
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start(self) {
|
let msg_rx = jetstream.get_msg_rx();
|
||||||
info!("Starting ingestor with the following collections: {:?}",
|
let reconnect_tx = jetstream.get_reconnect_tx();
|
||||||
self.ingesters.keys());
|
|
||||||
let opts = JetstreamOptions::builder()
|
|
||||||
.wanted_collections(self.ingesters.keys().cloned().collect())
|
|
||||||
.build();
|
|
||||||
let jetstream = JetstreamConnection::new(opts);
|
|
||||||
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
|
|
||||||
let msg_rx = jetstream.get_msg_rx();
|
|
||||||
let reconnect_tx = jetstream.get_reconnect_tx();
|
|
||||||
|
|
||||||
let cursor_clone = cursor.clone();
|
let cursor_clone = cursor.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Ok(message) = msg_rx.recv_async().await {
|
while let Ok(message) = msg_rx.recv_async().await {
|
||||||
if let Err(e) = handler::handle_message(message, &self.ingesters,
|
if let Err(e) = handler::handle_message(message, &ingesters,
|
||||||
reconnect_tx.clone(), cursor_clone.clone()).await {
|
reconnect_tx.clone(), cursor_clone.clone()).await {
|
||||||
error!("Error processing message: {}", e);
|
error!("Error processing message: {}", e);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(e) = jetstream.connect(cursor.clone()).await {
|
|
||||||
error!("Failed to connect to Jetstream: {}", e);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Err(e) = jetstream.connect(cursor.clone()).await {
|
||||||
|
error!("Failed to connect to Jetstream: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,8 @@
|
||||||
use crate::{
|
|
||||||
collections::{
|
|
||||||
Collection,
|
|
||||||
MySpoorLogActivity, MySpoorLogSession,
|
|
||||||
},
|
|
||||||
ingestor::Ingestor,
|
|
||||||
};
|
|
||||||
|
|
||||||
mod ingestor;
|
|
||||||
mod collections;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let subscriber = tracing_subscriber::FmtSubscriber::new();
|
let subscriber = tracing_subscriber::FmtSubscriber::new();
|
||||||
let _ = tracing::subscriber::set_global_default(subscriber);
|
let _ = tracing::subscriber::set_global_default(subscriber);
|
||||||
|
|
||||||
let mut ingestor = Ingestor::new();
|
|
||||||
ingestor.add_collection(MySpoorLogActivity::new());
|
|
||||||
ingestor.add_collection(MySpoorLogSession::new());
|
|
||||||
ingestor.start().await;
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue