diff --git a/Cargo.lock b/Cargo.lock index 9032f39..3283355 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1879,6 +1879,8 @@ dependencies = [ name = "rust" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "atrium-api", "axum", "axum-macros", @@ -1890,6 +1892,8 @@ dependencies = [ "serde_json", "sqlx", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4f4b2c9..4e7eb84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.98" +async-trait = "0.1.88" atrium-api = { version = "0.25.2", default-features = false } axum = { version = "0.8.3", features = ["json"] } axum-macros = "0.5.0" @@ -16,6 +18,8 @@ serde = "1.0.219" serde_json = "1.0.140" sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] } tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } +tracing = "0.1.41" +tracing-subscriber = "0.3.19" [build-dependencies] esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" } diff --git a/src/atproto.rs b/src/atproto.rs index 0fdc930..10bf5d4 100644 --- a/src/atproto.rs +++ b/src/atproto.rs @@ -1,8 +1,5 @@ -use atrium_api::types::string::{ - AtIdentifier, - RecordKey, -}; -use regex::Regex; +use atrium_api::types::string::RecordKey; +// use regex::Regex; pub use atrium_api::types::string::{ Nsid, @@ -15,10 +12,10 @@ enum Authority { Handle(Handle), } -impl Authority { - pub fn new(authority: String) -> Result { - } -} +// impl Authority { +// pub fn new(authority: String) -> Result { +// } +// } // This implementation does not support Query or Fragments, and thus follows // the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ] @@ -29,42 +26,42 @@ pub struct Uri { } // TODO: Replace super basic URI regex with real uri parsing -const URI_REGEX: Regex = Regex::new( - r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i" - ).expect("valid regex"); - -impl Uri { - pub fn new(uri: String) -> Result { - let Some(captures) = URI_REGEX.captures(&uri) else { - return Err("Invalid Uri"); - }; - // TODO: Convert authority if its a did or a handle - let Some(Ok(authority)) = captures.get(1).map(|mtch| { - Authority::new(mtch.as_str().to_string()) - }) else { - return Err("Invalid Authority") - }; - let collection = captures.get(2).map(|mtch| { - Nsid::new(mtch.as_str().to_string()) - }); - let rkey = captures.get(3).map(|mtch| { - RecordKey::new(mtch.as_str().to_string()) - }); - Ok(Uri { authority, collection, rkey }) - } - - pub fn as_string(&self) -> String { - let mut uri = String::from("at://"); - uri.push_str(match &self.authority { - Authority::Handle(h) => &*h, - Authority::Did(d) => &*d, - }); - if let Some(nsid) = &self.collection { - uri.push_str(&*nsid); - } - if let Some(rkey) = &self.rkey { - uri.push_str(&*rkey); - } - uri - } -} +// const URI_REGEX: Regex = Regex::new( +// r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i" +// ).expect("valid regex"); +// +// impl Uri { +// pub fn new(uri: String) -> Result { +// let Some(captures) = URI_REGEX.captures(&uri) else { +// return Err("Invalid Uri"); +// }; +// // TODO: Convert authority if its a did or a handle +// let Some(Ok(authority)) = captures.get(1).map(|mtch| { +// Authority::new(mtch.as_str().to_string()) +// }) else { +// return Err("Invalid Authority") +// }; +// let collection = captures.get(2).map(|mtch| { +// Nsid::new(mtch.as_str().to_string()) +// }); +// let rkey = captures.get(3).map(|mtch| { +// RecordKey::new(mtch.as_str().to_string()) +// }); +// Ok(Uri { authority, collection, rkey }) +// } +// +// pub fn as_string(&self) -> String { +// let mut uri = String::from("at://"); +// uri.push_str(match &self.authority { +// Authority::Handle(h) => &*h, +// Authority::Did(d) => &*d, +// }); +// if let Some(nsid) = &self.collection { +// uri.push_str(&*nsid); +// } +// if let Some(rkey) = &self.rkey { +// uri.push_str(&*rkey); +// } +// uri +// } +// } diff --git a/src/ingestor.rs b/src/ingestor.rs new file mode 100644 index 0000000..72c6311 --- /dev/null +++ b/src/ingestor.rs @@ -0,0 +1,74 @@ +use crate::lexicons::my::spoor::log::{Activity, Session}; +use async_trait::async_trait; +use atrium_api::types::Collection; +use rocketman::{ + options::JetstreamOptions, + ingestion::LexiconIngestor, + // types::event::{Event, Operation}, + types::event::Event, + connection::JetstreamConnection, + handler, +}; +use serde_json::value::Value; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; +use tracing::{error, info}; +use anyhow::Result; + +enum Ingestor { + Jetstream(SpoorJetstream) +} + +struct SpoorJetstream; + +#[async_trait] +impl LexiconIngestor for SpoorJetstream { + async fn ingest(&self, message: Event) -> Result<()> { + info!("{:?}", message); + // if let Some(commit) = &message.commit { + // match commit.operation { + // Operation::Create | Operation::Update => {} + // Operation::Delete => {} + // } + // } else { + // return Err("Message has no commit"); + // } + Ok(()) + } +} + +pub async fn start_ingestor() { + let nsids = vec![ + Activity::NSID.to_string(), + Session::NSID.to_string() + ]; + let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build(); + let jetstream = JetstreamConnection::new(opts); + + let mut ingesters: HashMap> = HashMap::new(); + for nsid in nsids { + ingesters.insert(nsid, Box::new(SpoorJetstream)); + } + + let cursor: Arc>> = Arc::new(Mutex::new(None)); + + let msg_rx = jetstream.get_msg_rx(); + let reconnect_tx = jetstream.get_reconnect_tx(); + + let cursor_clone = cursor.clone(); + tokio::spawn(async move { + while let Ok(message) = msg_rx.recv_async().await { + if let Err(e) = handler::handle_message(message, &ingesters, + reconnect_tx.clone(), cursor_clone.clone()).await { + error!("Error processing message: {}", e); + } + } + }); + + if let Err(e) = jetstream.connect(cursor.clone()).await { + error!("Failed to connect to Jetstream: {}", e); + std::process::exit(1); + } +} diff --git a/src/injester.rs b/src/injester.rs deleted file mode 100644 index 722390e..0000000 --- a/src/injester.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::db::Db; -use rocketman::{ - options::JetstreamOptions - ingestion::LexiconIngestor -}; - -enum Injester { - Jetstream(Jetstream) -} - -struct SpoorJetstream; - -impl LexiconIngestor for SpoorJetstream { - async fn ingest(&self, message: Event) -> Result<()> { - if let Some(commit) = &message.commit { - match commit.operation { - Operation::Create | Operation::Update => { - - } - } - } else { - return Err("Message has no commit"); - } - Ok(()) - } -} - -pub async fn start_ingester(db: Db) { - let opts = JetstreamOptions::builder() -} diff --git a/src/main.rs b/src/main.rs index 993e55f..dd97114 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use crate::{ atproto::Nsid, - injester::start_injester, + ingestor::start_ingestor, router::{ Router, Endpoint, @@ -15,19 +15,23 @@ use crate::{ use http::status::StatusCode; mod atproto; -mod injester; +mod ingestor; +mod lexicons; mod router; -mod db; +// mod db; #[tokio::main] async fn main() { + let subscriber = tracing_subscriber::FmtSubscriber::new(); + tracing::subscriber::set_global_default(subscriber); + let mut router = Router::new(); let get_nsid = Nsid::new(String::from("me.woach.get")).expect("me.woach.get is a valid nsid"); let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid"); router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test)); router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2)); tokio::spawn(async move { - start_injester(); + start_ingestor(); }); router.serve().await; }