From 3453e919b413f380d9e0ef94d7a69841459e9964 Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Tue, 28 Oct 2025 11:33:23 -0700 Subject: [PATCH] Collection Consuming interface --- backend/src/bin/consumer.rs | 46 ++++++++++++++---- backend/src/jetstream.rs | 93 +++++++++++++++++++++++++++++++++---- backend/src/lib.rs | 22 --------- 3 files changed, 122 insertions(+), 39 deletions(-) diff --git a/backend/src/bin/consumer.rs b/backend/src/bin/consumer.rs index ee5320b..f3c6636 100644 --- a/backend/src/bin/consumer.rs +++ b/backend/src/bin/consumer.rs @@ -1,8 +1,16 @@ use backend::{ create_db_pool, - create_consumer, - jetstream::Handler, + jetstream::{ + Handler, + create_consumer, + CollectionIngestor, + IngestorCommitData, + IngestorDeleteData, + }, }; +use async_trait::async_trait; +use sqlx::PgPool; +use std::boxed::Box; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -11,13 +19,33 @@ async fn main() -> anyhow::Result<()> { let db_connection_str = std::env::var("DATABASE_URL").unwrap(); let pool = create_db_pool(&db_connection_str).await; - let jetstream_consumer = create_consumer(vec![ - "app.bsky.feed.post".to_string() - ]); - jetstream_consumer.register_handler(std::sync::Arc::new( - Handler::new(pool.clone()) - )).await?; + let jetstream_consumer = create_consumer(Handler::new(vec![ + Box::new(BskyIngestor { _db: pool }) + ])).await?; let jetstream_cancel_token = atproto_jetstream::CancellationToken::new(); - jetstream_consumer.run_background(jetstream_cancel_token).await } + +struct BskyIngestor { + _db: PgPool +} + +#[async_trait] +impl CollectionIngestor for BskyIngestor { + async fn handle_commit( + &self, data: IngestorCommitData + ) -> anyhow::Result<()> { + println!("{:?}", data.did); + Ok(()) + } + + async fn handle_delete( + &self, _data: IngestorDeleteData + ) -> anyhow::Result<()> { + Ok(()) + } + + fn collection(&self) -> String { + "app.bsky.feed.post".to_string() + } +} diff --git a/backend/src/jetstream.rs b/backend/src/jetstream.rs index 7f43cec..b214508 100644 --- a/backend/src/jetstream.rs +++ b/backend/src/jetstream.rs @@ -1,25 +1,102 @@ use atproto_jetstream::{ - EventHandler, - JetstreamEvent, + Consumer, + ConsumerTaskConfig, + EventHandler, + JetstreamEvent, + JetstreamEventCommit, + JetstreamEventDelete, }; use async_trait::async_trait; -use sqlx::postgres::PgPool; +use std::{ + collections::HashMap, + boxed::Box, +}; + +#[async_trait] +pub trait CollectionIngestor: Send + Sync { + async fn handle_commit(&self, data: IngestorCommitData) -> anyhow::Result<()>; + async fn handle_delete(&self, data: IngestorDeleteData) -> anyhow::Result<()>; + fn collection(&self) -> String; +} + +pub struct IngestorCommitData { + pub did: String, + pub indexed_at: u64, + pub commit: JetstreamEventCommit, +} +pub struct IngestorDeleteData { + pub did: String, + pub indexed_at: u64, + pub commit: JetstreamEventDelete, +} + +pub async fn create_consumer(handler: Handler) -> anyhow::Result { + let jetstream_consumer_config = ConsumerTaskConfig { + user_agent: "my-app/1.0".to_string(), + compression: false, + zstd_dictionary_location: String::new(), + jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(), + collections: handler.get_collections(), + dids: vec![], + max_message_size_bytes: None, + cursor: None, + require_hello: false, + }; + + let consumer = Consumer::new(jetstream_consumer_config); + consumer.register_handler(std::sync::Arc::new(handler)).await?; + + Ok(consumer) +} pub struct Handler { - db: PgPool, + collection_ingestors: + HashMap>, } impl Handler { - pub fn new(db: PgPool) -> Self { - Handler { db } + pub fn new( + collection_ingestors: Vec> + ) -> Self { + let mut ingestors_hashmap = HashMap::new(); + for collection_ingestor in collection_ingestors { + ingestors_hashmap.insert( + collection_ingestor.collection(), collection_ingestor + ); + } + + Self { collection_ingestors: ingestors_hashmap } + } + + pub fn get_collections(&self) -> Vec { + self.collection_ingestors.keys().cloned().collect() } } #[async_trait] impl EventHandler for Handler { async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> { - // println!("Received event: {:?}", event); - Ok(()) + match event { + JetstreamEvent::Commit { did, time_us, commit, .. } => { + if let Some(ingestor) = + self.collection_ingestors.get(&commit.collection) { + ingestor.handle_commit(IngestorCommitData { + did, indexed_at: time_us, commit + }).await?; + } + Ok(()) + }, + JetstreamEvent::Delete { did, time_us, commit, .. } => { + if let Some(ingestor) = + self.collection_ingestors.get(&commit.collection) { + ingestor.handle_delete(IngestorDeleteData { + did, indexed_at: time_us, commit + }).await?; + } + Ok(()) + }, + _ => Ok(()), + } } fn handler_id(&self) -> String { diff --git a/backend/src/lib.rs b/backend/src/lib.rs index c930e2f..f6c942a 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,7 +1,3 @@ -use atproto_jetstream::{ - Consumer, - ConsumerTaskConfig, -}; use sqlx::postgres::{PgPool, PgPoolOptions}; use std::time::Duration; @@ -22,21 +18,3 @@ pub async fn create_db_pool(url: &str) -> PgPool { .await .expect("can't connect to database") } - -pub fn create_consumer( - collections: Vec -) -> Consumer { - let jetstream_consumer_config = ConsumerTaskConfig { - user_agent: "my-app/1.0".to_string(), - compression: false, - zstd_dictionary_location: String::new(), - jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(), - collections: collections, - dids: vec![], - max_message_size_bytes: None, - cursor: None, - require_hello: false, - }; - - Consumer::new(jetstream_consumer_config) -}