Compare commits

..

No commits in common. "9adc67cf4d87f3b07b526e7eb3d13460b899a576" and "b2618ab8b66de5f238c9d09cee9e4ea1f1624663" have entirely different histories.

28 changed files with 633 additions and 443 deletions

738
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,11 +1,25 @@
[workspace] [package]
resolver = "3" name = "rust"
members = [ "api", "atproto","db", "ingestor"] version = "0.1.0"
edition = "2021"
[workspace.dependencies] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
atproto = { path = "./atproto" }
[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
atrium-api = { version = "0.25.2", default-features = false }
axum = { version = "0.8.3", features = ["json"] }
axum-macros = "0.5.0"
http = "1.3.1"
regex = "1.11.1"
rocketman = "0.2.0"
serde = "1.0.219" serde = "1.0.219"
serde_json = "1.0.140" serde_json = "1.0.140"
tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] } sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] }
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.41" tracing = "0.1.41"
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"
[build-dependencies]
esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" }

View file

@ -1,14 +0,0 @@
[package]
name = "api"
version = "0.1.0"
edition = "2024"
[dependencies]
atproto.workspace = true
axum = { version = "0.8.3", features = ["json"] }
http = "1.3.1"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

View file

@ -1,12 +0,0 @@
[package]
name = "atproto"
version = "0.1.0"
edition = "2024"
[dependencies]
atrium-api = { version = "0.25.3", default-features = false }
lazy-regex = "3.4.1"
serde.workspace = true
serde_json.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

View file

@ -1,60 +0,0 @@
use lazy_regex::regex_captures;
use core::str::FromStr;
pub use atrium_api::types::{
Collection,
string::{
Nsid,
RecordKey,
AtIdentifier as Authority,
}
};
pub mod lexicons;
pub struct Uri {
whole: String,
// These fields could be useful in the future,
// so I'm leaving the code for them.
// authority: Authority,
// collection: Option<Nsid>,
// rkey: Option<RecordKey>,
}
impl FromStr for Uri {
type Err = &'static str;
fn from_str(uri: &str) -> Result<Self, Self::Err> {
if uri.len() > 8000 {
return Err("Uri too long")
}
let Some((
whole, unchecked_authority, unchecked_collection, unchecked_rkey
)) = regex_captures!(
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i",
uri,
) else {
return Err("Invalid Uri");
};
// This parsing is required, but the values don't need to be used yet.
// No compute cost to use them, just storage cost
let _authority = Authority::from_str(unchecked_authority)?;
let _collection = if unchecked_collection.is_empty() { None }
else { Some(Nsid::new(unchecked_collection.to_string())?) };
let _rkey = if unchecked_rkey.is_empty() { None }
else { Some(RecordKey::new(unchecked_rkey.to_string())?) };
// Ok(Uri{ whole: whole.to_string(), authority, collection, rkey })
Ok(Uri { whole: whole.to_string() })
}
}
impl Uri {
pub fn as_str(&self) -> &str {
self.whole.as_str()
}
}

View file

@ -1,8 +0,0 @@
[package]
name = "db"
version = "0.1.0"
edition = "2024"
[dependencies]
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }
tokio = "1.45.0"

View file

@ -1,15 +0,0 @@
use atproto::{
Did,
Uri,
};
pub struct User {
userdid: Did,
handle: Handle,
}
struct Participant {
participantdid: Did,
role: Role,
}

View file

@ -1 +0,0 @@
pub struct db;

View file

@ -1,15 +0,0 @@
[package]
name = "ingestor"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
atproto.workspace = true
rocketman = "0.2.0"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

View file

@ -1,25 +0,0 @@
use rocketman::types::event::Event;
use anyhow::Result;
enum Ingestor {
Jetstream(SpoorJetstream)
}
struct SpoorJetstream;
#[async_trait]
impl LexiconIngestor for SpoorJetstream {
async fn ingest(&self, message: Event<Value>) -> Result<()> {
info!("{:?}", message);
// if let Some(commit) = &message.commit {
// match commit.operation {
// Operation::Create | Operation::Update => {}
// Operation::Delete => {}
// }
// } else {
// return Err("Message has no commit");
// }
Ok(())
}
}

View file

@ -1,8 +0,0 @@
#[tokio::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::new();
let _ = tracing::subscriber::set_global_default(subscriber);
}

67
src/atproto.rs Normal file
View file

@ -0,0 +1,67 @@
use atrium_api::types::string::RecordKey;
// use regex::Regex;
pub use atrium_api::types::string::{
Nsid,
Did,
Handle,
};
enum Authority {
Did(Did),
Handle(Handle),
}
// impl Authority {
// pub fn new(authority: String) -> Result<Self, &'static str> {
// }
// }
// This implementation does not support Query or Fragments, and thus follows
// the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ]
pub struct Uri {
authority: Authority,
collection: Option<Nsid>,
rkey: Option<RecordKey>,
}
// TODO: Replace super basic URI regex with real uri parsing
// const URI_REGEX: Regex = Regex::new(
// r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i"
// ).expect("valid regex");
//
// impl Uri {
// pub fn new(uri: String) -> Result<Self, &'static str> {
// let Some(captures) = URI_REGEX.captures(&uri) else {
// return Err("Invalid Uri");
// };
// // TODO: Convert authority if its a did or a handle
// let Some(Ok(authority)) = captures.get(1).map(|mtch| {
// Authority::new(mtch.as_str().to_string())
// }) else {
// return Err("Invalid Authority")
// };
// let collection = captures.get(2).map(|mtch| {
// Nsid::new(mtch.as_str().to_string())
// });
// let rkey = captures.get(3).map(|mtch| {
// RecordKey::new(mtch.as_str().to_string())
// });
// Ok(Uri { authority, collection, rkey })
// }
//
// pub fn as_string(&self) -> String {
// let mut uri = String::from("at://");
// uri.push_str(match &self.authority {
// Authority::Handle(h) => &*h,
// Authority::Did(d) => &*d,
// });
// if let Some(nsid) = &self.collection {
// uri.push_str(&*nsid);
// }
// if let Some(rkey) = &self.rkey {
// uri.push_str(&*rkey);
// }
// uri
// }
// }

View file

@ -1,3 +1,7 @@
use crate::atproto::{
Did,
Uri,
}
use sqlx::{ use sqlx::{
query, query,
Database, Database,
@ -16,6 +20,11 @@ pub struct Db<Dbimp: Database> {
pool: Pool<Dbimp> pool: Pool<Dbimp>
} }
pub struct User {
userdid: Did,
handle: Handle,
}
#[non_exhaustive] #[non_exhaustive]
enum Role { enum Role {
Owner, Owner,
@ -31,6 +40,11 @@ impl ToString for Role {
} }
} }
struct Participant {
participantdid: Did,
role: Role,
}
pub struct Session { pub struct Session {
sessionuri: Uri, sessionuri: Uri,
label: Option<String>, label: Option<String>,

View file

@ -1,8 +1,6 @@
use atproto::{ use crate::lexicons::my::spoor::log::{Activity, Session};
Collection,
lexicons::my::spoor::log::{Activity, Session},
};
use async_trait::async_trait; use async_trait::async_trait;
use atrium_api::types::Collection;
use rocketman::{ use rocketman::{
options::JetstreamOptions, options::JetstreamOptions,
ingestion::LexiconIngestor, ingestion::LexiconIngestor,
@ -17,6 +15,29 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use tracing::{error, info}; use tracing::{error, info};
use anyhow::Result;
enum Ingestor {
Jetstream(SpoorJetstream)
}
struct SpoorJetstream;
#[async_trait]
impl LexiconIngestor for SpoorJetstream {
async fn ingest(&self, message: Event<Value>) -> Result<()> {
info!("{:?}", message);
// if let Some(commit) = &message.commit {
// match commit.operation {
// Operation::Create | Operation::Update => {}
// Operation::Delete => {}
// }
// } else {
// return Err("Message has no commit");
// }
Ok(())
}
}
pub async fn start_ingestor() { pub async fn start_ingestor() {
info!("Starting ingestor"); info!("Starting ingestor");

View file

@ -1,17 +1,24 @@
use crate::router::{ use crate::{
Router, atproto::Nsid,
Endpoint, ingestor::start_ingestor,
xrpc::{ router::{
QueryInput, Router,
ProcedureInput, Endpoint,
Response, xrpc::{
error, QueryInput,
ProcedureInput,
Response,
error,
},
}, },
}; };
use atproto::Nsid;
use http::status::StatusCode; use http::status::StatusCode;
mod atproto;
mod ingestor;
mod lexicons;
mod router; mod router;
// mod db;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -23,6 +30,9 @@ async fn main() {
let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid"); let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid");
router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test)); router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test));
router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2)); router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2));
tokio::spawn(async move {
start_ingestor().await;
});
router.serve().await; router.serve().await;
} }

View file

@ -1,10 +1,12 @@
use crate::router::xrpc::{ use crate::{
XrpcEndpoint, atproto::Nsid,
XrpcHandler, router::xrpc::{
QueryInput, XrpcEndpoint,
ProcedureInput, XrpcHandler,
QueryInput,
ProcedureInput,
}
}; };
use atproto::Nsid;
use axum::Router as AxumRouter; use axum::Router as AxumRouter;
use core::net::SocketAddr; use core::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};

View file

@ -1,9 +1,9 @@
use crate::atproto::Nsid;
use std::{ use std::{
collections::HashMap, collections::HashMap,
pin::Pin, pin::Pin,
future::Future, future::Future,
}; };
use atproto::Nsid;
use axum::{ use axum::{
extract::{ extract::{
Json, Json,