Compare commits

..

No commits in common. "028219ded1aeaf38712711c8d61b6f4f728427c4" and "ec141315d10d56df9276ebeb8f1900425e4ab49e" have entirely different histories.

13 changed files with 101 additions and 169 deletions

4
Cargo.lock generated
View file

@ -1879,8 +1879,6 @@ dependencies = [
name = "rust" name = "rust"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"async-trait",
"atrium-api", "atrium-api",
"axum", "axum",
"axum-macros", "axum-macros",
@ -1892,8 +1890,6 @@ dependencies = [
"serde_json", "serde_json",
"sqlx", "sqlx",
"tokio", "tokio",
"tracing",
"tracing-subscriber",
] ]
[[package]] [[package]]

View file

@ -6,8 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
atrium-api = { version = "0.25.2", default-features = false } atrium-api = { version = "0.25.2", default-features = false }
axum = { version = "0.8.3", features = ["json"] } axum = { version = "0.8.3", features = ["json"] }
axum-macros = "0.5.0" axum-macros = "0.5.0"
@ -18,8 +16,6 @@ serde = "1.0.219"
serde_json = "1.0.140" serde_json = "1.0.140"
sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] } sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] }
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
[build-dependencies] [build-dependencies]
esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" } esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" }

@ -1 +1 @@
Subproject commit 2fdba4f2178a106418f48b7fd26d48b91aff58b1 Subproject commit 0c7e67126c2eb46bd1901c2c329d1139747d7e89

View file

@ -1,5 +1,8 @@
use atrium_api::types::string::RecordKey; use atrium_api::types::string::{
// use regex::Regex; AtIdentifier,
RecordKey,
};
use regex::Regex;
pub use atrium_api::types::string::{ pub use atrium_api::types::string::{
Nsid, Nsid,
@ -12,10 +15,10 @@ enum Authority {
Handle(Handle), Handle(Handle),
} }
// impl Authority { impl Authority {
// pub fn new(authority: String) -> Result<Self, &'static str> { pub fn new(authority: String) -> Result<Self, &'static str> {
// } }
// } }
// This implementation does not support Query or Fragments, and thus follows // This implementation does not support Query or Fragments, and thus follows
// the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ] // the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ]
@ -26,42 +29,42 @@ pub struct Uri {
} }
// TODO: Replace super basic URI regex with real uri parsing // TODO: Replace super basic URI regex with real uri parsing
// const URI_REGEX: Regex = Regex::new( const URI_REGEX: Regex = Regex::new(
// r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i" r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i"
// ).expect("valid regex"); ).expect("valid regex");
//
// impl Uri { impl Uri {
// pub fn new(uri: String) -> Result<Self, &'static str> { pub fn new(uri: String) -> Result<Self, &'static str> {
// let Some(captures) = URI_REGEX.captures(&uri) else { let Some(captures) = URI_REGEX.captures(&uri) else {
// return Err("Invalid Uri"); return Err("Invalid Uri");
// }; };
// // TODO: Convert authority if its a did or a handle // TODO: Convert authority if its a did or a handle
// let Some(Ok(authority)) = captures.get(1).map(|mtch| { let Some(Ok(authority)) = captures.get(1).map(|mtch| {
// Authority::new(mtch.as_str().to_string()) Authority::new(mtch.as_str().to_string())
// }) else { }) else {
// return Err("Invalid Authority") return Err("Invalid Authority")
// }; };
// let collection = captures.get(2).map(|mtch| { let collection = captures.get(2).map(|mtch| {
// Nsid::new(mtch.as_str().to_string()) Nsid::new(mtch.as_str().to_string())
// }); });
// let rkey = captures.get(3).map(|mtch| { let rkey = captures.get(3).map(|mtch| {
// RecordKey::new(mtch.as_str().to_string()) RecordKey::new(mtch.as_str().to_string())
// }); });
// Ok(Uri { authority, collection, rkey }) Ok(Uri { authority, collection, rkey })
// } }
//
// pub fn as_string(&self) -> String { pub fn as_string(&self) -> String {
// let mut uri = String::from("at://"); let mut uri = String::from("at://");
// uri.push_str(match &self.authority { uri.push_str(match &self.authority {
// Authority::Handle(h) => &*h, Authority::Handle(h) => &*h,
// Authority::Did(d) => &*d, Authority::Did(d) => &*d,
// }); });
// if let Some(nsid) = &self.collection { if let Some(nsid) = &self.collection {
// uri.push_str(&*nsid); uri.push_str(&*nsid);
// } }
// if let Some(rkey) = &self.rkey { if let Some(rkey) = &self.rkey {
// uri.push_str(&*rkey); uri.push_str(&*rkey);
// } }
// uri uri
// } }
// } }

View file

@ -1,74 +0,0 @@
use crate::lexicons::my::spoor::log::{Activity, Session};
use async_trait::async_trait;
use atrium_api::types::Collection;
use rocketman::{
options::JetstreamOptions,
ingestion::LexiconIngestor,
// types::event::{Event, Operation},
types::event::Event,
connection::JetstreamConnection,
handler,
};
use serde_json::value::Value;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use tracing::{error, info};
use anyhow::Result;
enum Ingestor {
Jetstream(SpoorJetstream)
}
struct SpoorJetstream;
#[async_trait]
impl LexiconIngestor for SpoorJetstream {
async fn ingest(&self, message: Event<Value>) -> Result<()> {
info!("{:?}", message);
// if let Some(commit) = &message.commit {
// match commit.operation {
// Operation::Create | Operation::Update => {}
// Operation::Delete => {}
// }
// } else {
// return Err("Message has no commit");
// }
Ok(())
}
}
pub async fn start_ingestor() {
let nsids = vec![
Activity::NSID.to_string(),
Session::NSID.to_string()
];
let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build();
let jetstream = JetstreamConnection::new(opts);
let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
for nsid in nsids {
ingesters.insert(nsid, Box::new(SpoorJetstream));
}
let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
let msg_rx = jetstream.get_msg_rx();
let reconnect_tx = jetstream.get_reconnect_tx();
let cursor_clone = cursor.clone();
tokio::spawn(async move {
while let Ok(message) = msg_rx.recv_async().await {
if let Err(e) = handler::handle_message(message, &ingesters,
reconnect_tx.clone(), cursor_clone.clone()).await {
error!("Error processing message: {}", e);
}
}
});
if let Err(e) = jetstream.connect(cursor.clone()).await {
error!("Failed to connect to Jetstream: {}", e);
std::process::exit(1);
}
}

30
src/injester.rs Normal file
View file

@ -0,0 +1,30 @@
use crate::db::Db;
use rocketman::{
options::JetstreamOptions
ingestion::LexiconIngestor
};
enum Injester {
Jetstream(Jetstream)
}
struct SpoorJetstream;
impl LexiconIngestor for SpoorJetstream {
async fn ingest(&self, message: Event<Value>) -> Result<()> {
if let Some(commit) = &message.commit {
match commit.operation {
Operation::Create | Operation::Update => {
}
}
} else {
return Err("Message has no commit");
}
Ok(())
}
}
pub async fn start_ingester(db: Db) {
let opts = JetstreamOptions::builder()
}

View file

@ -1,8 +1,8 @@
// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
//!Definitions for the `my.spoor.content` namespace. //!Definitions for the `my.spoor.content` namespace.
pub mod defs;
pub mod external; pub mod external;
pub mod media; pub mod media;
pub mod title;
#[derive(Debug)] #[derive(Debug)]
pub struct External; pub struct External;
impl atrium_api::types::Collection for External { impl atrium_api::types::Collection for External {

View file

@ -1,8 +1,8 @@
// @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT. // @generated - This file is generated by esquema-codegen (forked from atrium-codegen). DO NOT EDIT.
//!Definitions for the `my.spoor.content.title` namespace. //!Definitions for the `my.spoor.content.defs` namespace.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct MainData { pub struct TitleData {
#[serde(skip_serializing_if = "core::option::Option::is_none")] #[serde(skip_serializing_if = "core::option::Option::is_none")]
pub lang: core::option::Option<atrium_api::types::string::Language>, pub lang: core::option::Option<atrium_api::types::string::Language>,
#[serde(skip_serializing_if = "core::option::Option::is_none")] #[serde(skip_serializing_if = "core::option::Option::is_none")]
@ -10,10 +10,10 @@ pub struct MainData {
#[serde(skip_serializing_if = "core::option::Option::is_none")] #[serde(skip_serializing_if = "core::option::Option::is_none")]
pub value: core::option::Option<String>, pub value: core::option::Option<String>,
} }
pub type Main = atrium_api::types::Object<MainData>; pub type Title = atrium_api::types::Object<TitleData>;
///A phonetic transcription of the native title such that when read it will sound like the native title. ///A phonetic transcription of the native title such that when read it will sound like the native title.
pub const TITLE_HOMOPHONIC: &str = "my.spoor.content.title#titleHomophonic"; pub const TITLE_HOMOPHONIC: &str = "my.spoor.content.defs#titleHomophonic";
///The title in its native script ///The title in its native script
pub const TITLE_NATIVE: &str = "my.spoor.content.title#titleNative"; pub const TITLE_NATIVE: &str = "my.spoor.content.defs#titleNative";
///A translation of the title ///A translation of the title
pub const TITLE_TRANSLATION: &str = "my.spoor.content.title#titleTranslation"; pub const TITLE_TRANSLATION: &str = "my.spoor.content.defs#titleTranslation";

View file

@ -9,8 +9,10 @@ pub struct RecordData {
pub overrides: core::option::Option<atrium_api::types::Union<RecordOverridesRefs>>, pub overrides: core::option::Option<atrium_api::types::Union<RecordOverridesRefs>>,
///All the data needed to query the content from the source ///All the data needed to query the content from the source
pub queryable: atrium_api::types::Union<RecordQueryableRefs>, pub queryable: atrium_api::types::Union<RecordQueryableRefs>,
///An nsid for a specific data source. The domain authority governs how to process the queryable
pub source: atrium_api::types::string::Nsid,
#[serde(skip_serializing_if = "core::option::Option::is_none")] #[serde(skip_serializing_if = "core::option::Option::is_none")]
pub titles: core::option::Option<Vec<crate::lexicons::my::spoor::content::title::Main>>, pub titles: core::option::Option<Vec<crate::my::spoor::content::defs::Title>>,
} }
pub type Record = atrium_api::types::Object<RecordData>; pub type Record = atrium_api::types::Object<RecordData>;
impl From<atrium_api::types::Unknown> for RecordData { impl From<atrium_api::types::Unknown> for RecordData {
@ -19,18 +21,8 @@ impl From<atrium_api::types::Unknown> for RecordData {
} }
} }
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Tvdbv4Data {
pub id: i64,
pub r#type: String,
}
pub type Tvdbv4 = atrium_api::types::Object<Tvdbv4Data>;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(tag = "$type")] #[serde(tag = "$type")]
pub enum RecordOverridesRefs {} pub enum RecordOverridesRefs {}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(tag = "$type")] #[serde(tag = "$type")]
pub enum RecordQueryableRefs { pub enum RecordQueryableRefs {}
#[serde(rename = "my.spoor.content.external#tvdbv4")]
Tvdbv4(Box<Tvdbv4>),
}

View file

@ -12,7 +12,7 @@ pub struct RecordData {
pub last_updated: atrium_api::types::string::Datetime, pub last_updated: atrium_api::types::string::Datetime,
#[serde(skip_serializing_if = "core::option::Option::is_none")] #[serde(skip_serializing_if = "core::option::Option::is_none")]
pub poster_image: core::option::Option<atrium_api::types::BlobRef>, pub poster_image: core::option::Option<atrium_api::types::BlobRef>,
pub titles: Vec<crate::lexicons::my::spoor::content::title::Main>, pub titles: Vec<crate::my::spoor::content::defs::Title>,
} }
pub type Record = atrium_api::types::Object<RecordData>; pub type Record = atrium_api::types::Object<RecordData>;
impl From<atrium_api::types::Unknown> for RecordData { impl From<atrium_api::types::Unknown> for RecordData {
@ -38,7 +38,7 @@ pub type Television = atrium_api::types::Object<TelevisionData>;
#[serde(tag = "$type")] #[serde(tag = "$type")]
pub enum RecordDurationDataRefs { pub enum RecordDurationDataRefs {
#[serde(rename = "my.spoor.content.media#television")] #[serde(rename = "my.spoor.content.media#television")]
MySpoorContentMediaTelevision(Box<crate::lexicons::my::spoor::content::media::Television>), MySpoorContentMediaTelevision(Box<crate::my::spoor::content::media::Television>),
#[serde(rename = "my.spoor.content.media#book")] #[serde(rename = "my.spoor.content.media#book")]
MySpoorContentMediaBook(Box<crate::lexicons::my::spoor::content::media::Book>), MySpoorContentMediaBook(Box<crate::my::spoor::content::media::Book>),
} }

View file

@ -10,7 +10,7 @@ pub struct RecordData {
#[serde(skip_serializing_if = "core::option::Option::is_none")] #[serde(skip_serializing_if = "core::option::Option::is_none")]
pub performed_at: core::option::Option<atrium_api::types::string::Datetime>, pub performed_at: core::option::Option<atrium_api::types::string::Datetime>,
pub progress: atrium_api::types::Union<RecordProgressRefs>, pub progress: atrium_api::types::Union<RecordProgressRefs>,
pub session: atrium_api::com::atproto::repo::strong_ref::Main, pub session: crate::com::atproto::repo::strong_ref::Main,
} }
pub type Record = atrium_api::types::Object<RecordData>; pub type Record = atrium_api::types::Object<RecordData>;
impl From<atrium_api::types::Unknown> for RecordData { impl From<atrium_api::types::Unknown> for RecordData {
@ -18,16 +18,9 @@ impl From<atrium_api::types::Unknown> for RecordData {
Self::try_from_unknown(value).unwrap() Self::try_from_unknown(value).unwrap()
} }
} }
///The index of the content consumed. Content must be indexable
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct ProgressIndexData {
pub index: i64,
}
pub type ProgressIndex = atrium_api::types::Object<ProgressIndexData>;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(tag = "$type")] #[serde(tag = "$type")]
pub enum RecordProgressRefs { pub enum RecordProgressRefs {
#[serde(rename = "my.spoor.log.activity#progressIndex")] #[serde(rename = "my.spoor.log.activity#indexProgress")]
ProgressIndex(Box<ProgressIndex>), MySpoorLogActivityIndexProgress(Box<crate::my::spoor::log::activity::IndexProgress>),
} }

View file

@ -4,7 +4,7 @@ use atrium_api::types::TryFromUnknown;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct RecordData { pub struct RecordData {
pub content: atrium_api::com::atproto::repo::strong_ref::Main, pub content: crate::com::atproto::repo::strong_ref::Main,
///Client-declared timestamp for when this activity was created ///Client-declared timestamp for when this activity was created
pub created_at: atrium_api::types::string::Datetime, pub created_at: atrium_api::types::string::Datetime,
#[serde(skip_serializing_if = "core::option::Option::is_none")] #[serde(skip_serializing_if = "core::option::Option::is_none")]

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
atproto::Nsid, atproto::Nsid,
ingestor::start_ingestor, injester::start_injester,
router::{ router::{
Router, Router,
Endpoint, Endpoint,
@ -15,23 +15,19 @@ use crate::{
use http::status::StatusCode; use http::status::StatusCode;
mod atproto; mod atproto;
mod ingestor; mod injester;
mod lexicons;
mod router; mod router;
// mod db; mod db;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber);
let mut router = Router::new(); let mut router = Router::new();
let get_nsid = Nsid::new(String::from("me.woach.get")).expect("me.woach.get is a valid nsid"); let get_nsid = Nsid::new(String::from("me.woach.get")).expect("me.woach.get is a valid nsid");
let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid"); let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid");
router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test)); router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test));
router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2)); router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2));
tokio::spawn(async move { tokio::spawn(async move {
start_ingestor(); start_injester();
}); });
router.serve().await; router.serve().await;
} }