Collection Consuming interface

This commit is contained in:
Julia Lange 2025-10-28 11:33:23 -07:00
parent aa8f931862
commit 3453e919b4
Signed by: Julia
SSH key fingerprint: SHA256:50XUMcOFYPUs9/1j7p9SPnwASZ7QnxXm7THF7HkbqzQ
3 changed files with 122 additions and 39 deletions

View file

@ -1,8 +1,16 @@
use backend::{ use backend::{
create_db_pool, create_db_pool,
create_consumer, jetstream::{
jetstream::Handler, Handler,
create_consumer,
CollectionIngestor,
IngestorCommitData,
IngestorDeleteData,
},
}; };
use async_trait::async_trait;
use sqlx::PgPool;
use std::boxed::Box;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -11,13 +19,33 @@ async fn main() -> anyhow::Result<()> {
let db_connection_str = std::env::var("DATABASE_URL").unwrap(); let db_connection_str = std::env::var("DATABASE_URL").unwrap();
let pool = create_db_pool(&db_connection_str).await; let pool = create_db_pool(&db_connection_str).await;
let jetstream_consumer = create_consumer(vec![ let jetstream_consumer = create_consumer(Handler::new(vec![
"app.bsky.feed.post".to_string() Box::new(BskyIngestor { _db: pool })
]); ])).await?;
jetstream_consumer.register_handler(std::sync::Arc::new(
Handler::new(pool.clone())
)).await?;
let jetstream_cancel_token = atproto_jetstream::CancellationToken::new(); let jetstream_cancel_token = atproto_jetstream::CancellationToken::new();
jetstream_consumer.run_background(jetstream_cancel_token).await jetstream_consumer.run_background(jetstream_cancel_token).await
} }
struct BskyIngestor {
_db: PgPool
}
#[async_trait]
impl CollectionIngestor for BskyIngestor {
async fn handle_commit(
&self, data: IngestorCommitData
) -> anyhow::Result<()> {
println!("{:?}", data.did);
Ok(())
}
async fn handle_delete(
&self, _data: IngestorDeleteData
) -> anyhow::Result<()> {
Ok(())
}
fn collection(&self) -> String {
"app.bsky.feed.post".to_string()
}
}

View file

@ -1,25 +1,102 @@
use atproto_jetstream::{ use atproto_jetstream::{
Consumer,
ConsumerTaskConfig,
EventHandler, EventHandler,
JetstreamEvent, JetstreamEvent,
JetstreamEventCommit,
JetstreamEventDelete,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use sqlx::postgres::PgPool; use std::{
collections::HashMap,
boxed::Box,
};
#[async_trait]
pub trait CollectionIngestor: Send + Sync {
async fn handle_commit(&self, data: IngestorCommitData) -> anyhow::Result<()>;
async fn handle_delete(&self, data: IngestorDeleteData) -> anyhow::Result<()>;
fn collection(&self) -> String;
}
pub struct IngestorCommitData {
pub did: String,
pub indexed_at: u64,
pub commit: JetstreamEventCommit,
}
pub struct IngestorDeleteData {
pub did: String,
pub indexed_at: u64,
pub commit: JetstreamEventDelete,
}
pub async fn create_consumer(handler: Handler) -> anyhow::Result<Consumer> {
let jetstream_consumer_config = ConsumerTaskConfig {
user_agent: "my-app/1.0".to_string(),
compression: false,
zstd_dictionary_location: String::new(),
jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
collections: handler.get_collections(),
dids: vec![],
max_message_size_bytes: None,
cursor: None,
require_hello: false,
};
let consumer = Consumer::new(jetstream_consumer_config);
consumer.register_handler(std::sync::Arc::new(handler)).await?;
Ok(consumer)
}
pub struct Handler { pub struct Handler {
db: PgPool, collection_ingestors:
HashMap<String, Box<dyn CollectionIngestor>>,
} }
impl Handler { impl Handler {
pub fn new(db: PgPool) -> Self { pub fn new(
Handler { db } collection_ingestors: Vec<Box<dyn CollectionIngestor>>
) -> Self {
let mut ingestors_hashmap = HashMap::new();
for collection_ingestor in collection_ingestors {
ingestors_hashmap.insert(
collection_ingestor.collection(), collection_ingestor
);
}
Self { collection_ingestors: ingestors_hashmap }
}
pub fn get_collections(&self) -> Vec<String> {
self.collection_ingestors.keys().cloned().collect()
} }
} }
#[async_trait] #[async_trait]
impl EventHandler for Handler { impl EventHandler for Handler {
async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> { async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
// println!("Received event: {:?}", event); match event {
Ok(()) JetstreamEvent::Commit { did, time_us, commit, .. } => {
if let Some(ingestor) =
self.collection_ingestors.get(&commit.collection) {
ingestor.handle_commit(IngestorCommitData {
did, indexed_at: time_us, commit
}).await?;
}
Ok(())
},
JetstreamEvent::Delete { did, time_us, commit, .. } => {
if let Some(ingestor) =
self.collection_ingestors.get(&commit.collection) {
ingestor.handle_delete(IngestorDeleteData {
did, indexed_at: time_us, commit
}).await?;
}
Ok(())
},
_ => Ok(()),
}
} }
fn handler_id(&self) -> String { fn handler_id(&self) -> String {

View file

@ -1,7 +1,3 @@
use atproto_jetstream::{
Consumer,
ConsumerTaskConfig,
};
use sqlx::postgres::{PgPool, PgPoolOptions}; use sqlx::postgres::{PgPool, PgPoolOptions};
use std::time::Duration; use std::time::Duration;
@ -22,21 +18,3 @@ pub async fn create_db_pool(url: &str) -> PgPool {
.await .await
.expect("can't connect to database") .expect("can't connect to database")
} }
pub fn create_consumer(
collections: Vec<String>
) -> Consumer {
let jetstream_consumer_config = ConsumerTaskConfig {
user_agent: "my-app/1.0".to_string(),
compression: false,
zstd_dictionary_location: String::new(),
jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
collections: collections,
dids: vec![],
max_message_size_bytes: None,
cursor: None,
require_hello: false,
};
Consumer::new(jetstream_consumer_config)
}