Initialize database; Add consumer; split off server

This commit is contained in:
Julia Lange 2025-10-27 15:24:46 -07:00
parent ec8799c7fc
commit aa8f931862
Signed by: Julia
SSH key fingerprint: SHA256:50XUMcOFYPUs9/1j7p9SPnwASZ7QnxXm7THF7HkbqzQ
9 changed files with 1632 additions and 30 deletions

1486
backend/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,7 +4,9 @@ version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.100"
async-trait = "0.1.89"
atproto-jetstream = "0.12.0"
axum = { version = "0.8.6", features = ["json"] }
serde = "1.0.228"
serde_json = "1.0.145"

View file

@ -0,0 +1,41 @@
-- Add migration script here
CREATE TABLE actor (
did VARCHAR PRIMARY KEY,
handle VARCHAR UNIQUE,
indexed_at VARCHAR NOT NULL
);
CREATE TABLE event (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
organizer VARCHAR NOT NULL,
summary VARCHAR,
description VARCHAR,
location VARCHAR,
status VARCHAR,
timezone VARCHAR,
starts_at_original VARCHAR NOT NULL,
starts_at TIMESTAMPTZ NOT NULL,
ends_ats VARCHAR,
created_at VARCHAR,
indexed_at VARCHAR NOT NULL
);
CREATE INDEX idx_event_starts_at ON event(starts_at);
CREATE TABLE rsvp (
uri VARCHAR PRIMARY KEY,
cid VARCHAR NOT NULL,
attendee VARCHAR NOT NULL,
event_uri VARCHAR NOT NULL,
event_cid VARCHAR NOT NULL,
status VARCHAR NOT NULL DEFAULT 'GOING',
indexed_at VARCHAR NOT NULL
);
CREATE INDEX idx_rsvp_event_uri ON rsvp(event_uri);
CREATE INDEX idx_rsvp_attendee_event ON rsvp(attendee, event_uri);

View file

@ -0,0 +1,23 @@
use backend::{
create_db_pool,
create_consumer,
jetstream::Handler,
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let db_connection_str = std::env::var("DATABASE_URL").unwrap();
let pool = create_db_pool(&db_connection_str).await;
let jetstream_consumer = create_consumer(vec![
"app.bsky.feed.post".to_string()
]);
jetstream_consumer.register_handler(std::sync::Arc::new(
Handler::new(pool.clone())
)).await?;
let jetstream_cancel_token = atproto_jetstream::CancellationToken::new();
jetstream_consumer.run_background(jetstream_cancel_token).await
}

35
backend/src/bin/server.rs Normal file
View file

@ -0,0 +1,35 @@
use backend::{
create_db_pool,
internal_error,
};
use axum::{
Router,
extract::State,
http::StatusCode,
routing::get,
};
use sqlx::postgres::PgPool;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let db_connection_str = std::env::var("DATABASE_URL").unwrap();
let pool = create_db_pool(&db_connection_str).await;
let app = Router::new()
.route("/", get(root))
.with_state(pool);
let listener = tokio::net::TcpListener::bind("0.0.0.0:4142").await.unwrap();
Ok(axum::serve(listener, app).await?)
}
async fn root(
State(pool): State<PgPool>,
) -> Result<String, (StatusCode, String)> {
sqlx::query_scalar("select 'hello world from pg'")
.fetch_one(&pool)
.await
.map_err(internal_error)
}

29
backend/src/jetstream.rs Normal file
View file

@ -0,0 +1,29 @@
use atproto_jetstream::{
EventHandler,
JetstreamEvent,
};
use async_trait::async_trait;
use sqlx::postgres::PgPool;
pub struct Handler {
db: PgPool,
}
impl Handler {
pub fn new(db: PgPool) -> Self {
Handler { db }
}
}
#[async_trait]
impl EventHandler for Handler {
async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
// println!("Received event: {:?}", event);
Ok(())
}
fn handler_id(&self) -> String {
"my-handler".to_string()
}
}

42
backend/src/lib.rs Normal file
View file

@ -0,0 +1,42 @@
use atproto_jetstream::{
Consumer,
ConsumerTaskConfig,
};
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::time::Duration;
pub mod jetstream;
pub fn internal_error<E>(err: E) -> (axum::http::StatusCode, String)
where
E: std::error::Error,
{
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
}
pub async fn create_db_pool(url: &str) -> PgPool {
PgPoolOptions::new()
.max_connections(5)
.acquire_timeout(Duration::from_secs(3))
.connect(&url)
.await
.expect("can't connect to database")
}
pub fn create_consumer(
collections: Vec<String>
) -> Consumer {
let jetstream_consumer_config = ConsumerTaskConfig {
user_agent: "my-app/1.0".to_string(),
compression: false,
zstd_dictionary_location: String::new(),
jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
collections: collections,
dids: vec![],
max_message_size_bytes: None,
cursor: None,
require_hello: false,
};
Consumer::new(jetstream_consumer_config)
}

View file

@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}

View file

@ -26,6 +26,7 @@
rust-analyzer
rustfmt
clippy
sqlx-cli
];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";