diff --git a/Cargo.lock b/Cargo.lock index 3283355..9032f39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1879,8 +1879,6 @@ dependencies = [ name = "rust" version = "0.1.0" dependencies = [ - "anyhow", - "async-trait", "atrium-api", "axum", "axum-macros", @@ -1892,8 +1890,6 @@ dependencies = [ "serde_json", "sqlx", "tokio", - "tracing", - "tracing-subscriber", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4e7eb84..4f4b2c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0.98" -async-trait = "0.1.88" atrium-api = { version = "0.25.2", default-features = false } axum = { version = "0.8.3", features = ["json"] } axum-macros = "0.5.0" @@ -18,8 +16,6 @@ serde = "1.0.219" serde_json = "1.0.140" sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] } tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } -tracing = "0.1.41" -tracing-subscriber = "0.3.19" [build-dependencies] esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" } diff --git a/lexicons b/lexicons index 2fdba4f..0c7e671 160000 --- a/lexicons +++ b/lexicons @@ -1 +1 @@ -Subproject commit 2fdba4f2178a106418f48b7fd26d48b91aff58b1 +Subproject commit 0c7e67126c2eb46bd1901c2c329d1139747d7e89 diff --git a/src/atproto.rs b/src/atproto.rs index 10bf5d4..0fdc930 100644 --- a/src/atproto.rs +++ b/src/atproto.rs @@ -1,5 +1,8 @@ -use atrium_api::types::string::RecordKey; -// use regex::Regex; +use atrium_api::types::string::{ + AtIdentifier, + RecordKey, +}; +use regex::Regex; pub use atrium_api::types::string::{ Nsid, @@ -12,10 +15,10 @@ enum Authority { Handle(Handle), } -// impl Authority { -// pub fn new(authority: String) -> Result { -// } -// } +impl Authority { + pub fn new(authority: String) -> Result { + } +} // This implementation does not support Query or Fragments, and thus follows // the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ] @@ -26,42 +29,42 @@ pub struct Uri { } // TODO: Replace super basic URI regex with real uri parsing -// const URI_REGEX: Regex = Regex::new( -// r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i" -// ).expect("valid regex"); -// -// impl Uri { -// pub fn new(uri: String) -> Result { -// let Some(captures) = URI_REGEX.captures(&uri) else { -// return Err("Invalid Uri"); -// }; -// // TODO: Convert authority if its a did or a handle -// let Some(Ok(authority)) = captures.get(1).map(|mtch| { -// Authority::new(mtch.as_str().to_string()) -// }) else { -// return Err("Invalid Authority") -// }; -// let collection = captures.get(2).map(|mtch| { -// Nsid::new(mtch.as_str().to_string()) -// }); -// let rkey = captures.get(3).map(|mtch| { -// RecordKey::new(mtch.as_str().to_string()) -// }); -// Ok(Uri { authority, collection, rkey }) -// } -// -// pub fn as_string(&self) -> String { -// let mut uri = String::from("at://"); -// uri.push_str(match &self.authority { -// Authority::Handle(h) => &*h, -// Authority::Did(d) => &*d, -// }); -// if let Some(nsid) = &self.collection { -// uri.push_str(&*nsid); -// } -// if let Some(rkey) = &self.rkey { -// uri.push_str(&*rkey); -// } -// uri -// } -// } +const URI_REGEX: Regex = Regex::new( + r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i" + ).expect("valid regex"); + +impl Uri { + pub fn new(uri: String) -> Result { + let Some(captures) = URI_REGEX.captures(&uri) else { + return Err("Invalid Uri"); + }; + // TODO: Convert authority if its a did or a handle + let Some(Ok(authority)) = captures.get(1).map(|mtch| { + Authority::new(mtch.as_str().to_string()) + }) else { + return Err("Invalid Authority") + }; + let collection = captures.get(2).map(|mtch| { + Nsid::new(mtch.as_str().to_string()) + }); + let rkey = captures.get(3).map(|mtch| { + RecordKey::new(mtch.as_str().to_string()) + }); + Ok(Uri { authority, collection, rkey }) + } + + pub fn as_string(&self) -> String { + let mut uri = String::from("at://"); + uri.push_str(match &self.authority { + Authority::Handle(h) => &*h, + Authority::Did(d) => &*d, + }); + if let Some(nsid) = &self.collection { + uri.push_str(&*nsid); + } + if let Some(rkey) = &self.rkey { + uri.push_str(&*rkey); + } + uri + } +} diff --git a/src/ingestor.rs b/src/ingestor.rs deleted file mode 100644 index 72c6311..0000000 --- a/src/ingestor.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::lexicons::my::spoor::log::{Activity, Session}; -use async_trait::async_trait; -use atrium_api::types::Collection; -use rocketman::{ - options::JetstreamOptions, - ingestion::LexiconIngestor, - // types::event::{Event, Operation}, - types::event::Event, - connection::JetstreamConnection, - handler, -}; -use serde_json::value::Value; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; -use tracing::{error, info}; -use anyhow::Result; - -enum Ingestor { - Jetstream(SpoorJetstream) -} - -struct SpoorJetstream; - -#[async_trait] -impl LexiconIngestor for SpoorJetstream { - async fn ingest(&self, message: Event) -> Result<()> { - info!("{:?}", message); - // if let Some(commit) = &message.commit { - // match commit.operation { - // Operation::Create | Operation::Update => {} - // Operation::Delete => {} - // } - // } else { - // return Err("Message has no commit"); - // } - Ok(()) - } -} - -pub async fn start_ingestor() { - let nsids = vec![ - Activity::NSID.to_string(), - Session::NSID.to_string() - ]; - let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build(); - let jetstream = JetstreamConnection::new(opts); - - let mut ingesters: HashMap> = HashMap::new(); - for nsid in nsids { - ingesters.insert(nsid, Box::new(SpoorJetstream)); - } - - let cursor: Arc>> = Arc::new(Mutex::new(None)); - - let msg_rx = jetstream.get_msg_rx(); - let reconnect_tx = jetstream.get_reconnect_tx(); - - let cursor_clone = cursor.clone(); - tokio::spawn(async move { - while let Ok(message) = msg_rx.recv_async().await { - if let Err(e) = handler::handle_message(message, &ingesters, - reconnect_tx.clone(), cursor_clone.clone()).await { - error!("Error processing message: {}", e); - } - } - }); - - if let Err(e) = jetstream.connect(cursor.clone()).await { - error!("Failed to connect to Jetstream: {}", e); - std::process::exit(1); - } -} diff --git a/src/injester.rs b/src/injester.rs new file mode 100644 index 0000000..722390e --- /dev/null +++ b/src/injester.rs @@ -0,0 +1,30 @@ +use crate::db::Db; +use rocketman::{ + options::JetstreamOptions + ingestion::LexiconIngestor +}; + +enum Injester { + Jetstream(Jetstream) +} + +struct SpoorJetstream; + +impl LexiconIngestor for SpoorJetstream { + async fn ingest(&self, message: Event) -> Result<()> { + if let Some(commit) = &message.commit { + match commit.operation { + Operation::Create | Operation::Update => { + + } + } + } else { + return Err("Message has no commit"); + } + Ok(()) + } +} + +pub async fn start_ingester(db: Db) { + let opts = JetstreamOptions::builder() +} diff --git a/src/lexicons/my/spoor/content.rs b/src/lexicons/my/spoor/content.rs index 08a442c..5d9b80b 100644 --- a/src/lexicons/my/spoor/content.rs +++ b/src/lexicons/my/spoor/content.rs @@ -1,8 +1,8 @@ // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. //!Definitions for the `my.spoor.content` namespace. +pub mod defs; pub mod external; pub mod media; -pub mod title; #[derive(Debug)] pub struct External; impl atrium_api::types::Collection for External { diff --git a/src/lexicons/my/spoor/content/title.rs b/src/lexicons/my/spoor/content/defs.rs similarity index 68% rename from src/lexicons/my/spoor/content/title.rs rename to src/lexicons/my/spoor/content/defs.rs index 2fb3ea6..69d3607 100644 --- a/src/lexicons/my/spoor/content/title.rs +++ b/src/lexicons/my/spoor/content/defs.rs @@ -1,8 +1,8 @@ // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. -//!Definitions for the `my.spoor.content.title` namespace. +//!Definitions for the `my.spoor.content.defs` namespace. #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] -pub struct MainData { +pub struct TitleData { #[serde(skip_serializing_if = "core::option::Option::is_none")] pub lang: core::option::Option, #[serde(skip_serializing_if = "core::option::Option::is_none")] @@ -10,10 +10,10 @@ pub struct MainData { #[serde(skip_serializing_if = "core::option::Option::is_none")] pub value: core::option::Option, } -pub type Main = atrium_api::types::Object; +pub type Title = atrium_api::types::Object; ///A phonetic transcription of the native title such that when read it will sound like the native title. -pub const TITLE_HOMOPHONIC: &str = "my.spoor.content.title#titleHomophonic"; +pub const TITLE_HOMOPHONIC: &str = "my.spoor.content.defs#titleHomophonic"; ///The title in its native script -pub const TITLE_NATIVE: &str = "my.spoor.content.title#titleNative"; +pub const TITLE_NATIVE: &str = "my.spoor.content.defs#titleNative"; ///A translation of the title -pub const TITLE_TRANSLATION: &str = "my.spoor.content.title#titleTranslation"; +pub const TITLE_TRANSLATION: &str = "my.spoor.content.defs#titleTranslation"; diff --git a/src/lexicons/my/spoor/content/external.rs b/src/lexicons/my/spoor/content/external.rs index 505af91..5a916ee 100644 --- a/src/lexicons/my/spoor/content/external.rs +++ b/src/lexicons/my/spoor/content/external.rs @@ -9,8 +9,10 @@ pub struct RecordData { pub overrides: core::option::Option>, ///All the data needed to query the content from the source pub queryable: atrium_api::types::Union, + ///An nsid for a specific data source. The domain authority governs how to process the queryable + pub source: atrium_api::types::string::Nsid, #[serde(skip_serializing_if = "core::option::Option::is_none")] - pub titles: core::option::Option>, + pub titles: core::option::Option>, } pub type Record = atrium_api::types::Object; impl From for RecordData { @@ -19,18 +21,8 @@ impl From for RecordData { } } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub struct Tvdbv4Data { - pub id: i64, - pub r#type: String, -} -pub type Tvdbv4 = atrium_api::types::Object; -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(tag = "$type")] pub enum RecordOverridesRefs {} #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(tag = "$type")] -pub enum RecordQueryableRefs { - #[serde(rename = "my.spoor.content.external#tvdbv4")] - Tvdbv4(Box), -} +pub enum RecordQueryableRefs {} diff --git a/src/lexicons/my/spoor/content/media.rs b/src/lexicons/my/spoor/content/media.rs index 8b008f4..aa1cd3b 100644 --- a/src/lexicons/my/spoor/content/media.rs +++ b/src/lexicons/my/spoor/content/media.rs @@ -12,7 +12,7 @@ pub struct RecordData { pub last_updated: atrium_api::types::string::Datetime, #[serde(skip_serializing_if = "core::option::Option::is_none")] pub poster_image: core::option::Option, - pub titles: Vec, + pub titles: Vec, } pub type Record = atrium_api::types::Object; impl From for RecordData { @@ -38,7 +38,7 @@ pub type Television = atrium_api::types::Object; #[serde(tag = "$type")] pub enum RecordDurationDataRefs { #[serde(rename = "my.spoor.content.media#television")] - MySpoorContentMediaTelevision(Box), + MySpoorContentMediaTelevision(Box), #[serde(rename = "my.spoor.content.media#book")] - MySpoorContentMediaBook(Box), + MySpoorContentMediaBook(Box), } diff --git a/src/lexicons/my/spoor/log/activity.rs b/src/lexicons/my/spoor/log/activity.rs index 0f6f2ee..cbf8882 100644 --- a/src/lexicons/my/spoor/log/activity.rs +++ b/src/lexicons/my/spoor/log/activity.rs @@ -10,7 +10,7 @@ pub struct RecordData { #[serde(skip_serializing_if = "core::option::Option::is_none")] pub performed_at: core::option::Option, pub progress: atrium_api::types::Union, - pub session: atrium_api::com::atproto::repo::strong_ref::Main, + pub session: crate::com::atproto::repo::strong_ref::Main, } pub type Record = atrium_api::types::Object; impl From for RecordData { @@ -18,16 +18,9 @@ impl From for RecordData { Self::try_from_unknown(value).unwrap() } } -///The index of the content consumed. Content must be indexable -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub struct ProgressIndexData { - pub index: i64, -} -pub type ProgressIndex = atrium_api::types::Object; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(tag = "$type")] pub enum RecordProgressRefs { - #[serde(rename = "my.spoor.log.activity#progressIndex")] - ProgressIndex(Box), + #[serde(rename = "my.spoor.log.activity#indexProgress")] + MySpoorLogActivityIndexProgress(Box), } diff --git a/src/lexicons/my/spoor/log/session.rs b/src/lexicons/my/spoor/log/session.rs index ce61cb2..cf5d0d5 100644 --- a/src/lexicons/my/spoor/log/session.rs +++ b/src/lexicons/my/spoor/log/session.rs @@ -4,7 +4,7 @@ use atrium_api::types::TryFromUnknown; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct RecordData { - pub content: atrium_api::com::atproto::repo::strong_ref::Main, + pub content: crate::com::atproto::repo::strong_ref::Main, ///Client-declared timestamp for when this activity was created pub created_at: atrium_api::types::string::Datetime, #[serde(skip_serializing_if = "core::option::Option::is_none")] diff --git a/src/main.rs b/src/main.rs index dd97114..993e55f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use crate::{ atproto::Nsid, - ingestor::start_ingestor, + injester::start_injester, router::{ Router, Endpoint, @@ -15,23 +15,19 @@ use crate::{ use http::status::StatusCode; mod atproto; -mod ingestor; -mod lexicons; +mod injester; mod router; -// mod db; +mod db; #[tokio::main] async fn main() { - let subscriber = tracing_subscriber::FmtSubscriber::new(); - tracing::subscriber::set_global_default(subscriber); - let mut router = Router::new(); let get_nsid = Nsid::new(String::from("me.woach.get")).expect("me.woach.get is a valid nsid"); let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid"); router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test)); router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2)); tokio::spawn(async move { - start_ingestor(); + start_injester(); }); router.serve().await; }