Collections refactor, unused in ingestor
This commit is contained in:
parent
9adc67cf4d
commit
23ba006b8f
5 changed files with 146 additions and 25 deletions
|
|
@ -1,25 +0,0 @@
|
||||||
use rocketman::types::event::Event;
|
|
||||||
use anyhow::Result;
|
|
||||||
|
|
||||||
enum Ingestor {
|
|
||||||
Jetstream(SpoorJetstream)
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SpoorJetstream;
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl LexiconIngestor for SpoorJetstream {
|
|
||||||
async fn ingest(&self, message: Event<Value>) -> Result<()> {
|
|
||||||
info!("{:?}", message);
|
|
||||||
// if let Some(commit) = &message.commit {
|
|
||||||
// match commit.operation {
|
|
||||||
// Operation::Create | Operation::Update => {}
|
|
||||||
// Operation::Delete => {}
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// return Err("Message has no commit");
|
|
||||||
// }
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
83
ingestor/src/collections.rs
Normal file
83
ingestor/src/collections.rs
Normal file
|
|
@ -0,0 +1,83 @@
|
||||||
|
use rocketman::ingestion::LexiconIngestor;
|
||||||
|
|
||||||
|
macro_rules! create_commit_collection {
|
||||||
|
($collection:ident, $nsid:expr,
|
||||||
|
change => $change_fn:expr,
|
||||||
|
delete => $delete_fn:expr $(,)?
|
||||||
|
) => (
|
||||||
|
create_commit_collection!(
|
||||||
|
$collection, $nsid,
|
||||||
|
create => $change_fn,
|
||||||
|
update => $change_fn,
|
||||||
|
delete => $delete_fn,
|
||||||
|
);
|
||||||
|
);
|
||||||
|
($collection:ident, $nsid:expr,
|
||||||
|
create => $create_fn:expr,
|
||||||
|
update => $update_fn:expr,
|
||||||
|
delete => $delete_fn:expr $(,)?
|
||||||
|
) => (
|
||||||
|
pub struct $collection;
|
||||||
|
struct Ingestor;
|
||||||
|
|
||||||
|
impl $crate::collections::Collection for $collection {
|
||||||
|
fn get_nsid() -> String {
|
||||||
|
$nsid
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_ingestor(
|
||||||
|
&self
|
||||||
|
) -> Box<dyn rocketman::ingestion::LexiconIngestor + Send + Sync> {
|
||||||
|
Box::new(Ingestor::new())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ingestor {
|
||||||
|
pub fn new() -> Self { Self }
|
||||||
|
|
||||||
|
pub async fn handle_commit(
|
||||||
|
&self,
|
||||||
|
message: rocketman::types::event::Event<serde_json::Value>
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
use rocketman::types::event::Operation;
|
||||||
|
|
||||||
|
let state = $crate::collections::CommitIngestorState { };
|
||||||
|
|
||||||
|
if let Some(commit) = &message.commit {
|
||||||
|
match commit.operation {
|
||||||
|
Operation::Create => ($create_fn)(state, message).await?,
|
||||||
|
Operation::Update => ($update_fn)(state, message).await?,
|
||||||
|
Operation::Delete => ($delete_fn)(state, message).await?,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::anyhow!("Message has no commit"));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl rocketman::ingestion::LexiconIngestor for Ingestor {
|
||||||
|
async fn ingest(
|
||||||
|
&self,
|
||||||
|
message: rocketman::types::event::Event<serde_json::Value>
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
self.handle_commit(message).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod my_spoor_session;
|
||||||
|
pub use my_spoor_session::MySpoorSession;
|
||||||
|
pub mod my_spoor_activity;
|
||||||
|
pub use my_spoor_activity::MySpoorActivity;
|
||||||
|
|
||||||
|
|
||||||
|
struct CommitIngestorState;
|
||||||
|
|
||||||
|
pub trait Collection {
|
||||||
|
fn get_nsid() -> String;
|
||||||
|
fn get_ingestor(&self) -> Box<dyn LexiconIngestor + Send + Sync>;
|
||||||
|
}
|
||||||
|
|
||||||
30
ingestor/src/collections/my_spoor_activity.rs
Normal file
30
ingestor/src/collections/my_spoor_activity.rs
Normal file
|
|
@ -0,0 +1,30 @@
|
||||||
|
use crate::collections::CommitIngestorState;
|
||||||
|
use atproto::{
|
||||||
|
Collection as AtprotoCollection,
|
||||||
|
lexicons::my::spoor::log::Activity,
|
||||||
|
};
|
||||||
|
use rocketman::types::event::Event;
|
||||||
|
use serde_json::Value;
|
||||||
|
use anyhow::Result;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
async fn handle_change(
|
||||||
|
_state: CommitIngestorState,
|
||||||
|
message: Event<Value>
|
||||||
|
) -> Result<()> {
|
||||||
|
info!("{:?}", message);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_delete(
|
||||||
|
_state: CommitIngestorState,
|
||||||
|
message: Event<Value>
|
||||||
|
) -> Result<()> {
|
||||||
|
info!("{:?}", message);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
create_commit_collection!(MySpoorActivity, Activity::NSID.to_string(),
|
||||||
|
change => handle_change,
|
||||||
|
delete => handle_delete,
|
||||||
|
);
|
||||||
30
ingestor/src/collections/my_spoor_session.rs
Normal file
30
ingestor/src/collections/my_spoor_session.rs
Normal file
|
|
@ -0,0 +1,30 @@
|
||||||
|
use crate::collections::CommitIngestorState;
|
||||||
|
use atproto::{
|
||||||
|
Collection as AtprotoCollection,
|
||||||
|
lexicons::my::spoor::log::Session,
|
||||||
|
};
|
||||||
|
use rocketman::types::event::Event;
|
||||||
|
use serde_json::Value;
|
||||||
|
use anyhow::Result;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
async fn handle_change(
|
||||||
|
_state: CommitIngestorState,
|
||||||
|
message: Event<Value>
|
||||||
|
) -> Result<()> {
|
||||||
|
info!("{:?}", message);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_delete(
|
||||||
|
_state: CommitIngestorState,
|
||||||
|
message: Event<Value>
|
||||||
|
) -> Result<()> {
|
||||||
|
info!("{:?}", message);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
create_commit_collection!(MySpoorSession, Session::NSID.to_string(),
|
||||||
|
change => handle_change,
|
||||||
|
delete => handle_delete,
|
||||||
|
);
|
||||||
|
|
@ -1,4 +1,7 @@
|
||||||
|
|
||||||
|
mod ingestor;
|
||||||
|
mod collections;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let subscriber = tracing_subscriber::FmtSubscriber::new();
|
let subscriber = tracing_subscriber::FmtSubscriber::new();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue