Jetstream, Tracing, basic ingestor + tracing
I'm mainly just commiting this so I can work on my laptop, but I'm too tired to manage branches and then later rebasing things
This commit is contained in:
parent
c0c90c3001
commit
028219ded1
6 changed files with 135 additions and 82 deletions
4
Cargo.lock
generated
4
Cargo.lock
generated
|
|
@ -1879,6 +1879,8 @@ dependencies = [
|
|||
name = "rust"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"atrium-api",
|
||||
"axum",
|
||||
"axum-macros",
|
||||
|
|
@ -1890,6 +1892,8 @@ dependencies = [
|
|||
"serde_json",
|
||||
"sqlx",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.98"
|
||||
async-trait = "0.1.88"
|
||||
atrium-api = { version = "0.25.2", default-features = false }
|
||||
axum = { version = "0.8.3", features = ["json"] }
|
||||
axum-macros = "0.5.0"
|
||||
|
|
@ -16,6 +18,8 @@ serde = "1.0.219"
|
|||
serde_json = "1.0.140"
|
||||
sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] }
|
||||
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
|
||||
tracing = "0.1.41"
|
||||
tracing-subscriber = "0.3.19"
|
||||
|
||||
[build-dependencies]
|
||||
esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" }
|
||||
|
|
|
|||
|
|
@ -1,8 +1,5 @@
|
|||
use atrium_api::types::string::{
|
||||
AtIdentifier,
|
||||
RecordKey,
|
||||
};
|
||||
use regex::Regex;
|
||||
use atrium_api::types::string::RecordKey;
|
||||
// use regex::Regex;
|
||||
|
||||
pub use atrium_api::types::string::{
|
||||
Nsid,
|
||||
|
|
@ -15,10 +12,10 @@ enum Authority {
|
|||
Handle(Handle),
|
||||
}
|
||||
|
||||
impl Authority {
|
||||
pub fn new(authority: String) -> Result<Self, &'static str> {
|
||||
}
|
||||
}
|
||||
// impl Authority {
|
||||
// pub fn new(authority: String) -> Result<Self, &'static str> {
|
||||
// }
|
||||
// }
|
||||
|
||||
// This implementation does not support Query or Fragments, and thus follows
|
||||
// the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ]
|
||||
|
|
@ -29,42 +26,42 @@ pub struct Uri {
|
|||
}
|
||||
|
||||
// TODO: Replace super basic URI regex with real uri parsing
|
||||
const URI_REGEX: Regex = Regex::new(
|
||||
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i"
|
||||
).expect("valid regex");
|
||||
|
||||
impl Uri {
|
||||
pub fn new(uri: String) -> Result<Self, &'static str> {
|
||||
let Some(captures) = URI_REGEX.captures(&uri) else {
|
||||
return Err("Invalid Uri");
|
||||
};
|
||||
// TODO: Convert authority if its a did or a handle
|
||||
let Some(Ok(authority)) = captures.get(1).map(|mtch| {
|
||||
Authority::new(mtch.as_str().to_string())
|
||||
}) else {
|
||||
return Err("Invalid Authority")
|
||||
};
|
||||
let collection = captures.get(2).map(|mtch| {
|
||||
Nsid::new(mtch.as_str().to_string())
|
||||
});
|
||||
let rkey = captures.get(3).map(|mtch| {
|
||||
RecordKey::new(mtch.as_str().to_string())
|
||||
});
|
||||
Ok(Uri { authority, collection, rkey })
|
||||
}
|
||||
|
||||
pub fn as_string(&self) -> String {
|
||||
let mut uri = String::from("at://");
|
||||
uri.push_str(match &self.authority {
|
||||
Authority::Handle(h) => &*h,
|
||||
Authority::Did(d) => &*d,
|
||||
});
|
||||
if let Some(nsid) = &self.collection {
|
||||
uri.push_str(&*nsid);
|
||||
}
|
||||
if let Some(rkey) = &self.rkey {
|
||||
uri.push_str(&*rkey);
|
||||
}
|
||||
uri
|
||||
}
|
||||
}
|
||||
// const URI_REGEX: Regex = Regex::new(
|
||||
// r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i"
|
||||
// ).expect("valid regex");
|
||||
//
|
||||
// impl Uri {
|
||||
// pub fn new(uri: String) -> Result<Self, &'static str> {
|
||||
// let Some(captures) = URI_REGEX.captures(&uri) else {
|
||||
// return Err("Invalid Uri");
|
||||
// };
|
||||
// // TODO: Convert authority if its a did or a handle
|
||||
// let Some(Ok(authority)) = captures.get(1).map(|mtch| {
|
||||
// Authority::new(mtch.as_str().to_string())
|
||||
// }) else {
|
||||
// return Err("Invalid Authority")
|
||||
// };
|
||||
// let collection = captures.get(2).map(|mtch| {
|
||||
// Nsid::new(mtch.as_str().to_string())
|
||||
// });
|
||||
// let rkey = captures.get(3).map(|mtch| {
|
||||
// RecordKey::new(mtch.as_str().to_string())
|
||||
// });
|
||||
// Ok(Uri { authority, collection, rkey })
|
||||
// }
|
||||
//
|
||||
// pub fn as_string(&self) -> String {
|
||||
// let mut uri = String::from("at://");
|
||||
// uri.push_str(match &self.authority {
|
||||
// Authority::Handle(h) => &*h,
|
||||
// Authority::Did(d) => &*d,
|
||||
// });
|
||||
// if let Some(nsid) = &self.collection {
|
||||
// uri.push_str(&*nsid);
|
||||
// }
|
||||
// if let Some(rkey) = &self.rkey {
|
||||
// uri.push_str(&*rkey);
|
||||
// }
|
||||
// uri
|
||||
// }
|
||||
// }
|
||||
|
|
|
|||
74
src/ingestor.rs
Normal file
74
src/ingestor.rs
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
use crate::lexicons::my::spoor::log::{Activity, Session};
|
||||
use async_trait::async_trait;
|
||||
use atrium_api::types::Collection;
|
||||
use rocketman::{
|
||||
options::JetstreamOptions,
|
||||
ingestion::LexiconIngestor,
|
||||
// types::event::{Event, Operation},
|
||||
types::event::Event,
|
||||
connection::JetstreamConnection,
|
||||
handler,
|
||||
};
|
||||
use serde_json::value::Value;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tracing::{error, info};
|
||||
use anyhow::Result;
|
||||
|
||||
enum Ingestor {
|
||||
Jetstream(SpoorJetstream)
|
||||
}
|
||||
|
||||
struct SpoorJetstream;
|
||||
|
||||
#[async_trait]
|
||||
impl LexiconIngestor for SpoorJetstream {
|
||||
async fn ingest(&self, message: Event<Value>) -> Result<()> {
|
||||
info!("{:?}", message);
|
||||
// if let Some(commit) = &message.commit {
|
||||
// match commit.operation {
|
||||
// Operation::Create | Operation::Update => {}
|
||||
// Operation::Delete => {}
|
||||
// }
|
||||
// } else {
|
||||
// return Err("Message has no commit");
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_ingestor() {
|
||||
let nsids = vec![
|
||||
Activity::NSID.to_string(),
|
||||
Session::NSID.to_string()
|
||||
];
|
||||
let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build();
|
||||
let jetstream = JetstreamConnection::new(opts);
|
||||
|
||||
let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
|
||||
for nsid in nsids {
|
||||
ingesters.insert(nsid, Box::new(SpoorJetstream));
|
||||
}
|
||||
|
||||
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
|
||||
|
||||
let msg_rx = jetstream.get_msg_rx();
|
||||
let reconnect_tx = jetstream.get_reconnect_tx();
|
||||
|
||||
let cursor_clone = cursor.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Ok(message) = msg_rx.recv_async().await {
|
||||
if let Err(e) = handler::handle_message(message, &ingesters,
|
||||
reconnect_tx.clone(), cursor_clone.clone()).await {
|
||||
error!("Error processing message: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = jetstream.connect(cursor.clone()).await {
|
||||
error!("Failed to connect to Jetstream: {}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,30 +0,0 @@
|
|||
use crate::db::Db;
|
||||
use rocketman::{
|
||||
options::JetstreamOptions
|
||||
ingestion::LexiconIngestor
|
||||
};
|
||||
|
||||
enum Injester {
|
||||
Jetstream(Jetstream)
|
||||
}
|
||||
|
||||
struct SpoorJetstream;
|
||||
|
||||
impl LexiconIngestor for SpoorJetstream {
|
||||
async fn ingest(&self, message: Event<Value>) -> Result<()> {
|
||||
if let Some(commit) = &message.commit {
|
||||
match commit.operation {
|
||||
Operation::Create | Operation::Update => {
|
||||
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err("Message has no commit");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_ingester(db: Db) {
|
||||
let opts = JetstreamOptions::builder()
|
||||
}
|
||||
12
src/main.rs
12
src/main.rs
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
atproto::Nsid,
|
||||
injester::start_injester,
|
||||
ingestor::start_ingestor,
|
||||
router::{
|
||||
Router,
|
||||
Endpoint,
|
||||
|
|
@ -15,19 +15,23 @@ use crate::{
|
|||
use http::status::StatusCode;
|
||||
|
||||
mod atproto;
|
||||
mod injester;
|
||||
mod ingestor;
|
||||
mod lexicons;
|
||||
mod router;
|
||||
mod db;
|
||||
// mod db;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let subscriber = tracing_subscriber::FmtSubscriber::new();
|
||||
tracing::subscriber::set_global_default(subscriber);
|
||||
|
||||
let mut router = Router::new();
|
||||
let get_nsid = Nsid::new(String::from("me.woach.get")).expect("me.woach.get is a valid nsid");
|
||||
let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid");
|
||||
router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test));
|
||||
router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2));
|
||||
tokio::spawn(async move {
|
||||
start_injester();
|
||||
start_ingestor();
|
||||
});
|
||||
router.serve().await;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue