diff --git a/Cargo.lock b/Cargo.lock index 9032f39..3283355 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1879,6 +1879,8 @@ dependencies = [ name = "rust" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "atrium-api", "axum", "axum-macros", @@ -1890,6 +1892,8 @@ dependencies = [ "serde_json", "sqlx", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4f4b2c9..4e7eb84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.98" +async-trait = "0.1.88" atrium-api = { version = "0.25.2", default-features = false } axum = { version = "0.8.3", features = ["json"] } axum-macros = "0.5.0" @@ -16,6 +18,8 @@ serde = "1.0.219" serde_json = "1.0.140" sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] } tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } +tracing = "0.1.41" +tracing-subscriber = "0.3.19" [build-dependencies] esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" } diff --git a/lexicons b/lexicons index 0c7e671..2fdba4f 160000 --- a/lexicons +++ b/lexicons @@ -1 +1 @@ -Subproject commit 0c7e67126c2eb46bd1901c2c329d1139747d7e89 +Subproject commit 2fdba4f2178a106418f48b7fd26d48b91aff58b1 diff --git a/src/atproto.rs b/src/atproto.rs index 0fdc930..10bf5d4 100644 --- a/src/atproto.rs +++ b/src/atproto.rs @@ -1,8 +1,5 @@ -use atrium_api::types::string::{ - AtIdentifier, - RecordKey, -}; -use regex::Regex; +use atrium_api::types::string::RecordKey; +// use regex::Regex; pub use atrium_api::types::string::{ Nsid, @@ -15,10 +12,10 @@ enum Authority { Handle(Handle), } -impl Authority { - pub fn new(authority: String) -> Result { - } -} +// impl Authority { +// pub fn new(authority: String) -> Result { +// } +// } // This implementation does not support Query or Fragments, and thus follows // 
the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ] @@ -29,42 +26,42 @@ pub struct Uri { } // TODO: Replace super basic URI regex with real uri parsing -const URI_REGEX: Regex = Regex::new( - r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i" - ).expect("valid regex"); - -impl Uri { - pub fn new(uri: String) -> Result { - let Some(captures) = URI_REGEX.captures(&uri) else { - return Err("Invalid Uri"); - }; - // TODO: Convert authority if its a did or a handle - let Some(Ok(authority)) = captures.get(1).map(|mtch| { - Authority::new(mtch.as_str().to_string()) - }) else { - return Err("Invalid Authority") - }; - let collection = captures.get(2).map(|mtch| { - Nsid::new(mtch.as_str().to_string()) - }); - let rkey = captures.get(3).map(|mtch| { - RecordKey::new(mtch.as_str().to_string()) - }); - Ok(Uri { authority, collection, rkey }) - } - - pub fn as_string(&self) -> String { - let mut uri = String::from("at://"); - uri.push_str(match &self.authority { - Authority::Handle(h) => &*h, - Authority::Did(d) => &*d, - }); - if let Some(nsid) = &self.collection { - uri.push_str(&*nsid); - } - if let Some(rkey) = &self.rkey { - uri.push_str(&*rkey); - } - uri - } -} +// const URI_REGEX: Regex = Regex::new( +// r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i" +// ).expect("valid regex"); +// +// impl Uri { +// pub fn new(uri: String) -> Result { +// let Some(captures) = URI_REGEX.captures(&uri) else { +// return Err("Invalid Uri"); +// }; +// // TODO: Convert authority if its a did or a handle +// let Some(Ok(authority)) = captures.get(1).map(|mtch| { +// Authority::new(mtch.as_str().to_string()) +// }) else { +// return Err("Invalid Authority") +// }; +// let collection = captures.get(2).map(|mtch| { +// Nsid::new(mtch.as_str().to_string()) +// }); +// let rkey = captures.get(3).map(|mtch| { +// RecordKey::new(mtch.as_str().to_string()) +// }); +// Ok(Uri { authority, collection, rkey }) +// } +// +// pub fn 
as_string(&self) -> String { +// let mut uri = String::from("at://"); +// uri.push_str(match &self.authority { +// Authority::Handle(h) => &*h, +// Authority::Did(d) => &*d, +// }); +// if let Some(nsid) = &self.collection { +// uri.push_str(&*nsid); +// } +// if let Some(rkey) = &self.rkey { +// uri.push_str(&*rkey); +// } +// uri +// } +// } diff --git a/src/ingestor.rs b/src/ingestor.rs new file mode 100644 index 0000000..72c6311 --- /dev/null +++ b/src/ingestor.rs @@ -0,0 +1,74 @@ +use crate::lexicons::my::spoor::log::{Activity, Session}; +use async_trait::async_trait; +use atrium_api::types::Collection; +use rocketman::{ + options::JetstreamOptions, + ingestion::LexiconIngestor, + // types::event::{Event, Operation}, + types::event::Event, + connection::JetstreamConnection, + handler, +}; +use serde_json::value::Value; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; +use tracing::{error, info}; +use anyhow::Result; + +enum Ingestor { + Jetstream(SpoorJetstream) +} + +struct SpoorJetstream; + +#[async_trait] +impl LexiconIngestor for SpoorJetstream { + async fn ingest(&self, message: Event) -> Result<()> { + info!("{:?}", message); + // if let Some(commit) = &message.commit { + // match commit.operation { + // Operation::Create | Operation::Update => {} + // Operation::Delete => {} + // } + // } else { + // return Err("Message has no commit"); + // } + Ok(()) + } +} + +pub async fn start_ingestor() { + let nsids = vec![ + Activity::NSID.to_string(), + Session::NSID.to_string() + ]; + let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build(); + let jetstream = JetstreamConnection::new(opts); + + let mut ingesters: HashMap> = HashMap::new(); + for nsid in nsids { + ingesters.insert(nsid, Box::new(SpoorJetstream)); + } + + let cursor: Arc>> = Arc::new(Mutex::new(None)); + + let msg_rx = jetstream.get_msg_rx(); + let reconnect_tx = jetstream.get_reconnect_tx(); + + let cursor_clone = cursor.clone(); + tokio::spawn(async move { + 
while let Ok(message) = msg_rx.recv_async().await { + if let Err(e) = handler::handle_message(message, &ingesters, + reconnect_tx.clone(), cursor_clone.clone()).await { + error!("Error processing message: {}", e); + } + } + }); + + if let Err(e) = jetstream.connect(cursor.clone()).await { + error!("Failed to connect to Jetstream: {}", e); + std::process::exit(1); + } +} diff --git a/src/injester.rs b/src/injester.rs deleted file mode 100644 index 722390e..0000000 --- a/src/injester.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::db::Db; -use rocketman::{ - options::JetstreamOptions - ingestion::LexiconIngestor -}; - -enum Injester { - Jetstream(Jetstream) -} - -struct SpoorJetstream; - -impl LexiconIngestor for SpoorJetstream { - async fn ingest(&self, message: Event) -> Result<()> { - if let Some(commit) = &message.commit { - match commit.operation { - Operation::Create | Operation::Update => { - - } - } - } else { - return Err("Message has no commit"); - } - Ok(()) - } -} - -pub async fn start_ingester(db: Db) { - let opts = JetstreamOptions::builder() -} diff --git a/src/lexicons/my/spoor/content.rs b/src/lexicons/my/spoor/content.rs index 5d9b80b..08a442c 100644 --- a/src/lexicons/my/spoor/content.rs +++ b/src/lexicons/my/spoor/content.rs @@ -1,8 +1,8 @@ // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. //!Definitions for the `my.spoor.content` namespace. 
-pub mod defs; pub mod external; pub mod media; +pub mod title; #[derive(Debug)] pub struct External; impl atrium_api::types::Collection for External { diff --git a/src/lexicons/my/spoor/content/external.rs b/src/lexicons/my/spoor/content/external.rs index 5a916ee..505af91 100644 --- a/src/lexicons/my/spoor/content/external.rs +++ b/src/lexicons/my/spoor/content/external.rs @@ -9,10 +9,8 @@ pub struct RecordData { pub overrides: core::option::Option>, ///All the data needed to query the content from the source pub queryable: atrium_api::types::Union, - ///An nsid for a specific data source. The domain authority governs how to process the queryable - pub source: atrium_api::types::string::Nsid, #[serde(skip_serializing_if = "core::option::Option::is_none")] - pub titles: core::option::Option>, + pub titles: core::option::Option>, } pub type Record = atrium_api::types::Object; impl From for RecordData { @@ -21,8 +19,18 @@ impl From for RecordData { } } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct Tvdbv4Data { + pub id: i64, + pub r#type: String, +} +pub type Tvdbv4 = atrium_api::types::Object; +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(tag = "$type")] pub enum RecordOverridesRefs {} #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(tag = "$type")] -pub enum RecordQueryableRefs {} +pub enum RecordQueryableRefs { + #[serde(rename = "my.spoor.content.external#tvdbv4")] + Tvdbv4(Box), +} diff --git a/src/lexicons/my/spoor/content/media.rs b/src/lexicons/my/spoor/content/media.rs index aa1cd3b..8b008f4 100644 --- a/src/lexicons/my/spoor/content/media.rs +++ b/src/lexicons/my/spoor/content/media.rs @@ -12,7 +12,7 @@ pub struct RecordData { pub last_updated: atrium_api::types::string::Datetime, #[serde(skip_serializing_if = "core::option::Option::is_none")] pub poster_image: core::option::Option, - pub titles: Vec, + 
pub titles: Vec, } pub type Record = atrium_api::types::Object; impl From for RecordData { @@ -38,7 +38,7 @@ pub type Television = atrium_api::types::Object; #[serde(tag = "$type")] pub enum RecordDurationDataRefs { #[serde(rename = "my.spoor.content.media#television")] - MySpoorContentMediaTelevision(Box), + MySpoorContentMediaTelevision(Box), #[serde(rename = "my.spoor.content.media#book")] - MySpoorContentMediaBook(Box), + MySpoorContentMediaBook(Box), } diff --git a/src/lexicons/my/spoor/content/defs.rs b/src/lexicons/my/spoor/content/title.rs similarity index 68% rename from src/lexicons/my/spoor/content/defs.rs rename to src/lexicons/my/spoor/content/title.rs index 69d3607..2fb3ea6 100644 --- a/src/lexicons/my/spoor/content/defs.rs +++ b/src/lexicons/my/spoor/content/title.rs @@ -1,8 +1,8 @@ // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. -//!Definitions for the `my.spoor.content.defs` namespace. +//!Definitions for the `my.spoor.content.title` namespace. #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] -pub struct TitleData { +pub struct MainData { #[serde(skip_serializing_if = "core::option::Option::is_none")] pub lang: core::option::Option, #[serde(skip_serializing_if = "core::option::Option::is_none")] @@ -10,10 +10,10 @@ pub struct TitleData { #[serde(skip_serializing_if = "core::option::Option::is_none")] pub value: core::option::Option, } -pub type Title = atrium_api::types::Object; +pub type Main = atrium_api::types::Object; ///A phonetic transcription of the native title such that when read it will sound like the native title. 
-pub const TITLE_HOMOPHONIC: &str = "my.spoor.content.defs#titleHomophonic"; +pub const TITLE_HOMOPHONIC: &str = "my.spoor.content.title#titleHomophonic"; ///The title in its native script -pub const TITLE_NATIVE: &str = "my.spoor.content.defs#titleNative"; +pub const TITLE_NATIVE: &str = "my.spoor.content.title#titleNative"; ///A translation of the title -pub const TITLE_TRANSLATION: &str = "my.spoor.content.defs#titleTranslation"; +pub const TITLE_TRANSLATION: &str = "my.spoor.content.title#titleTranslation"; diff --git a/src/lexicons/my/spoor/log/activity.rs b/src/lexicons/my/spoor/log/activity.rs index cbf8882..0f6f2ee 100644 --- a/src/lexicons/my/spoor/log/activity.rs +++ b/src/lexicons/my/spoor/log/activity.rs @@ -10,7 +10,7 @@ pub struct RecordData { #[serde(skip_serializing_if = "core::option::Option::is_none")] pub performed_at: core::option::Option, pub progress: atrium_api::types::Union, - pub session: crate::com::atproto::repo::strong_ref::Main, + pub session: atrium_api::com::atproto::repo::strong_ref::Main, } pub type Record = atrium_api::types::Object; impl From for RecordData { @@ -18,9 +18,16 @@ impl From for RecordData { Self::try_from_unknown(value).unwrap() } } +///The index of the content consumed. 
Content must be indexable +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ProgressIndexData { + pub index: i64, +} +pub type ProgressIndex = atrium_api::types::Object; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(tag = "$type")] pub enum RecordProgressRefs { - #[serde(rename = "my.spoor.log.activity#indexProgress")] - MySpoorLogActivityIndexProgress(Box), + #[serde(rename = "my.spoor.log.activity#progressIndex")] + ProgressIndex(Box), } diff --git a/src/lexicons/my/spoor/log/session.rs b/src/lexicons/my/spoor/log/session.rs index cf5d0d5..ce61cb2 100644 --- a/src/lexicons/my/spoor/log/session.rs +++ b/src/lexicons/my/spoor/log/session.rs @@ -4,7 +4,7 @@ use atrium_api::types::TryFromUnknown; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct RecordData { - pub content: crate::com::atproto::repo::strong_ref::Main, + pub content: atrium_api::com::atproto::repo::strong_ref::Main, ///Client-declared timestamp for when this activity was created pub created_at: atrium_api::types::string::Datetime, #[serde(skip_serializing_if = "core::option::Option::is_none")] diff --git a/src/main.rs b/src/main.rs index 993e55f..dd97114 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use crate::{ atproto::Nsid, - injester::start_injester, + ingestor::start_ingestor, router::{ Router, Endpoint, @@ -15,19 +15,23 @@ use crate::{ use http::status::StatusCode; mod atproto; -mod injester; +mod ingestor; +mod lexicons; mod router; -mod db; +// mod db; #[tokio::main] async fn main() { + let subscriber = tracing_subscriber::FmtSubscriber::new(); + /* set_global_default returns a Result; fail loudly at startup instead of silently running without logging. */ tracing::subscriber::set_global_default(subscriber).expect("global tracing subscriber is installed exactly once, at startup"); + let mut router = Router::new(); let get_nsid = Nsid::new(String::from("me.woach.get")).expect("me.woach.get is a valid nsid"); let post_nsid = 
Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid"); router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test)); router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2)); tokio::spawn(async move { - start_injester(); + start_ingestor().await; /* async fns are lazy: without .await the returned future was dropped and the ingestor never ran */ }); router.serve().await; }