From 2f2e653d130a1ca690162bc96ea0a9343b088d24 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 27 May 2025 15:12:46 -0700 Subject: [PATCH] Ingestor, scaffolding --- ingestor/Cargo.toml | 15 +++++++++++++++ ingestor/src/collection.rs | 25 +++++++++++++++++++++++++ {src => ingestor/src}/ingestor.rs | 29 ++++------------------------- ingestor/src/main.rs | 8 ++++++++ 4 files changed, 52 insertions(+), 25 deletions(-) create mode 100644 ingestor/Cargo.toml create mode 100644 ingestor/src/collection.rs rename {src => ingestor/src}/ingestor.rs (70%) create mode 100644 ingestor/src/main.rs diff --git a/ingestor/Cargo.toml b/ingestor/Cargo.toml new file mode 100644 index 0000000..a177581 --- /dev/null +++ b/ingestor/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ingestor" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0.98" +async-trait = "0.1.88" +atproto.workspace = true +rocketman = "0.2.0" +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true diff --git a/ingestor/src/collection.rs b/ingestor/src/collection.rs new file mode 100644 index 0000000..69128d3 --- /dev/null +++ b/ingestor/src/collection.rs @@ -0,0 +1,25 @@ +use rocketman::types::event::Event; +use anyhow::Result; + +enum Ingestor { + Jetstream(SpoorJetstream) +} + +struct SpoorJetstream; + +#[async_trait] +impl LexiconIngestor for SpoorJetstream { + async fn ingest(&self, message: Event) -> Result<()> { + info!("{:?}", message); + // if let Some(commit) = &message.commit { + // match commit.operation { + // Operation::Create | Operation::Update => {} + // Operation::Delete => {} + // } + // } else { + // return Err("Message has no commit"); + // } + Ok(()) + } +} + diff --git a/src/ingestor.rs b/ingestor/src/ingestor.rs similarity index 70% rename from src/ingestor.rs rename to ingestor/src/ingestor.rs index 1dc31cc..d25f102 100644 --- a/src/ingestor.rs +++ b/ingestor/src/ingestor.rs @@ -1,6 +1,8 @@ -use crate::lexicons::my::spoor::log::{Activity, Session}; +use atproto::{ + Collection, + lexicons::my::spoor::log::{Activity, Session}, +}; use async_trait::async_trait; -use atrium_api::types::Collection; use rocketman::{ options::JetstreamOptions, ingestion::LexiconIngestor, @@ -15,29 +17,6 @@ use std::{ sync::{Arc, Mutex}, }; use tracing::{error, info}; -use anyhow::Result; - -enum Ingestor { - Jetstream(SpoorJetstream) -} - -struct SpoorJetstream; - -#[async_trait] -impl LexiconIngestor for SpoorJetstream { - async fn ingest(&self, message: Event) -> Result<()> { - info!("{:?}", message); - // if let Some(commit) = &message.commit { - // match commit.operation { - // Operation::Create | Operation::Update => {} - // Operation::Delete => {} - // } - // } else { - // return Err("Message has no commit"); - // } - Ok(()) - } -} pub async fn start_ingestor() { info!("Starting ingestor"); diff --git a/ingestor/src/main.rs b/ingestor/src/main.rs new file mode 100644 index 0000000..4289770 --- /dev/null +++ b/ingestor/src/main.rs @@ -0,0 +1,8 @@ + +#[tokio::main] +async fn main() { + let subscriber = tracing_subscriber::FmtSubscriber::new(); + let _ = tracing::subscriber::set_global_default(subscriber); + + +}