Compare commits

...

6 commits

28 changed files with 445 additions and 635 deletions

742
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,25 +1,11 @@
[package]
name = "rust"
version = "0.1.0"
edition = "2021"
[workspace]
resolver = "3"
members = [ "api", "atproto","db", "ingestor"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
atrium-api = { version = "0.25.2", default-features = false }
axum = { version = "0.8.3", features = ["json"] }
axum-macros = "0.5.0"
http = "1.3.1"
regex = "1.11.1"
rocketman = "0.2.0"
[workspace.dependencies]
atproto = { path = "./atproto" }
serde = "1.0.219"
serde_json = "1.0.140"
sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] }
tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
[build-dependencies]
esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" }

14
api/Cargo.toml Normal file
View file

@ -0,0 +1,14 @@
[package]
name = "api"
version = "0.1.0"
edition = "2024"
[dependencies]
atproto.workspace = true
axum = { version = "0.8.3", features = ["json"] }
http = "1.3.1"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

View file

@ -1,24 +1,17 @@
use crate::{
atproto::Nsid,
ingestor::start_ingestor,
router::{
Router,
Endpoint,
xrpc::{
QueryInput,
ProcedureInput,
Response,
error,
},
use crate::router::{
Router,
Endpoint,
xrpc::{
QueryInput,
ProcedureInput,
Response,
error,
},
};
use atproto::Nsid;
use http::status::StatusCode;
mod atproto;
mod ingestor;
mod lexicons;
mod router;
// mod db;
#[tokio::main]
async fn main() {
@ -30,9 +23,6 @@ async fn main() {
let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid");
router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test));
router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2));
tokio::spawn(async move {
start_ingestor().await;
});
router.serve().await;
}

View file

@ -1,12 +1,10 @@
use crate::{
atproto::Nsid,
router::xrpc::{
XrpcEndpoint,
XrpcHandler,
QueryInput,
ProcedureInput,
}
use crate::router::xrpc::{
XrpcEndpoint,
XrpcHandler,
QueryInput,
ProcedureInput,
};
use atproto::Nsid;
use axum::Router as AxumRouter;
use core::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr};

View file

@ -1,9 +1,9 @@
use crate::atproto::Nsid;
use std::{
collections::HashMap,
pin::Pin,
future::Future,
};
use atproto::Nsid;
use axum::{
extract::{
Json,

12
atproto/Cargo.toml Normal file
View file

@ -0,0 +1,12 @@
[package]
name = "atproto"
version = "0.1.0"
edition = "2024"
[dependencies]
atrium-api = { version = "0.25.3", default-features = false }
lazy-regex = "3.4.1"
serde.workspace = true
serde_json.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

60
atproto/src/lib.rs Normal file
View file

@ -0,0 +1,60 @@
use lazy_regex::regex_captures;
use core::str::FromStr;
pub use atrium_api::types::{
Collection,
string::{
Nsid,
RecordKey,
AtIdentifier as Authority,
}
};
pub mod lexicons;
pub struct Uri {
whole: String,
// These fields could be useful in the future,
// so I'm leaving the code for them.
// authority: Authority,
// collection: Option<Nsid>,
// rkey: Option<RecordKey>,
}
impl FromStr for Uri {
type Err = &'static str;
fn from_str(uri: &str) -> Result<Self, Self::Err> {
if uri.len() > 8000 {
return Err("Uri too long")
}
let Some((
whole, unchecked_authority, unchecked_collection, unchecked_rkey
)) = regex_captures!(
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i",
uri,
) else {
return Err("Invalid Uri");
};
// This parsing is required, but the values don't need to be used yet.
// No compute cost to use them, just storage cost
let _authority = Authority::from_str(unchecked_authority)?;
let _collection = if unchecked_collection.is_empty() { None }
else { Some(Nsid::new(unchecked_collection.to_string())?) };
let _rkey = if unchecked_rkey.is_empty() { None }
else { Some(RecordKey::new(unchecked_rkey.to_string())?) };
// Ok(Uri{ whole: whole.to_string(), authority, collection, rkey })
Ok(Uri { whole: whole.to_string() })
}
}
impl Uri {
pub fn as_str(&self) -> &str {
self.whole.as_str()
}
}

8
db/Cargo.toml Normal file
View file

@ -0,0 +1,8 @@
[package]
name = "db"
version = "0.1.0"
edition = "2024"
[dependencies]
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }
tokio = "1.45.0"

View file

@ -1,7 +1,3 @@
use crate::atproto::{
Did,
Uri,
}
use sqlx::{
query,
Database,
@ -20,11 +16,6 @@ pub struct Db<Dbimp: Database> {
pool: Pool<Dbimp>
}
pub struct User {
userdid: Did,
handle: Handle,
}
#[non_exhaustive]
enum Role {
Owner,
@ -40,11 +31,6 @@ impl ToString for Role {
}
}
struct Participant {
participantdid: Did,
role: Role,
}
pub struct Session {
sessionuri: Uri,
label: Option<String>,

15
db/src/interfaces.rs Normal file
View file

@ -0,0 +1,15 @@
use atproto::{
Did,
Uri,
};
pub struct User {
userdid: Did,
handle: Handle,
}
struct Participant {
participantdid: Did,
role: Role,
}

1
db/src/lib.rs Normal file
View file

@ -0,0 +1 @@
pub struct db;

15
ingestor/Cargo.toml Normal file
View file

@ -0,0 +1,15 @@
[package]
name = "ingestor"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
atproto.workspace = true
rocketman = "0.2.0"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true

View file

@ -0,0 +1,25 @@
use rocketman::types::event::Event;
use anyhow::Result;
enum Ingestor {
Jetstream(SpoorJetstream)
}
struct SpoorJetstream;
#[async_trait]
impl LexiconIngestor for SpoorJetstream {
async fn ingest(&self, message: Event<Value>) -> Result<()> {
info!("{:?}", message);
// if let Some(commit) = &message.commit {
// match commit.operation {
// Operation::Create | Operation::Update => {}
// Operation::Delete => {}
// }
// } else {
// return Err("Message has no commit");
// }
Ok(())
}
}

View file

@ -1,6 +1,8 @@
use crate::lexicons::my::spoor::log::{Activity, Session};
use atproto::{
Collection,
lexicons::my::spoor::log::{Activity, Session},
};
use async_trait::async_trait;
use atrium_api::types::Collection;
use rocketman::{
options::JetstreamOptions,
ingestion::LexiconIngestor,
@ -15,29 +17,6 @@ use std::{
sync::{Arc, Mutex},
};
use tracing::{error, info};
use anyhow::Result;
enum Ingestor {
Jetstream(SpoorJetstream)
}
struct SpoorJetstream;
#[async_trait]
impl LexiconIngestor for SpoorJetstream {
async fn ingest(&self, message: Event<Value>) -> Result<()> {
info!("{:?}", message);
// if let Some(commit) = &message.commit {
// match commit.operation {
// Operation::Create | Operation::Update => {}
// Operation::Delete => {}
// }
// } else {
// return Err("Message has no commit");
// }
Ok(())
}
}
pub async fn start_ingestor() {
info!("Starting ingestor");

8
ingestor/src/main.rs Normal file
View file

@ -0,0 +1,8 @@
#[tokio::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::new();
let _ = tracing::subscriber::set_global_default(subscriber);
}

View file

@ -1,67 +0,0 @@
use atrium_api::types::string::RecordKey;
// use regex::Regex;
pub use atrium_api::types::string::{
Nsid,
Did,
Handle,
};
enum Authority {
Did(Did),
Handle(Handle),
}
// impl Authority {
// pub fn new(authority: String) -> Result<Self, &'static str> {
// }
// }
// This implementation does not support Query or Fragments, and thus follows
// the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ]
pub struct Uri {
authority: Authority,
collection: Option<Nsid>,
rkey: Option<RecordKey>,
}
// TODO: Replace super basic URI regex with real uri parsing
// const URI_REGEX: Regex = Regex::new(
// r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i"
// ).expect("valid regex");
//
// impl Uri {
// pub fn new(uri: String) -> Result<Self, &'static str> {
// let Some(captures) = URI_REGEX.captures(&uri) else {
// return Err("Invalid Uri");
// };
// // TODO: Convert authority if its a did or a handle
// let Some(Ok(authority)) = captures.get(1).map(|mtch| {
// Authority::new(mtch.as_str().to_string())
// }) else {
// return Err("Invalid Authority")
// };
// let collection = captures.get(2).map(|mtch| {
// Nsid::new(mtch.as_str().to_string())
// });
// let rkey = captures.get(3).map(|mtch| {
// RecordKey::new(mtch.as_str().to_string())
// });
// Ok(Uri { authority, collection, rkey })
// }
//
// pub fn as_string(&self) -> String {
// let mut uri = String::from("at://");
// uri.push_str(match &self.authority {
// Authority::Handle(h) => &*h,
// Authority::Did(d) => &*d,
// });
// if let Some(nsid) = &self.collection {
// uri.push_str(&*nsid);
// }
// if let Some(rkey) = &self.rkey {
// uri.push_str(&*rkey);
// }
// uri
// }
// }