From d4a3a71e2fefea88ac4465937d64e0ae11f8e79d Mon Sep 17 00:00:00 2001 From: Julia Lange Date: Thu, 24 Apr 2025 14:45:53 -0700 Subject: [PATCH] Router, add xrpc function interface, test in main Adds an interface for adding xrpc queries and procedures that is a bit simpler than directly interfacing with axum. Should provide good scaffolding for next steps. --- rust/Cargo.lock | 18 ++++- rust/Cargo.toml | 1 + rust/src/main.rs | 26 ++++++- rust/src/router.rs | 51 ++++++++++--- rust/src/router/xrpc.rs | 157 ++++++++++++++++++++++++++++++++++++---- 5 files changed, 221 insertions(+), 32 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 27c12b6..4d296f7 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -92,6 +92,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -281,7 +292,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1099,6 +1110,7 @@ name = "rust" version = "0.1.0" dependencies = [ "axum", + "axum-macros", "serde", "serde_json", "sqlx", @@ -1121,7 +1133,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1525,7 +1537,7 @@ dependencies = [ "getrandom 0.3.2", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index ee71a7c..06b40a3 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] axum = { version = "0.8.3", features = ["json"] } +axum-macros = "0.5.0" serde = "1.0.219" serde_json = "1.0.140" sqlx = { version = "0.8.5", features = ["runtime-tokio"] } diff --git a/rust/src/main.rs b/rust/src/main.rs index f95f790..769184d 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -1,9 +1,31 @@ -use crate::router::Router; +use crate::router::{ + Router, + Endpoint, + xrpc::{ + QueryInput, + ProcedureInput, + Response, + error, + }, +}; +use axum::http::StatusCode; + mod router; +mod db; #[tokio::main] async fn main() { - let router = Router::new(); + let mut router = Router::new(); + router = router.add_endpoint(Endpoint::new_xrpc_query(String::from("me.woach.get"), test)); + router = router.add_endpoint(Endpoint::new_xrpc_procedure(String::from("me.woach.post"), test2)); router.serve().await; } + +async fn test(_data: QueryInput) -> Response { + error(StatusCode::OK, "error", "message") +} + +async fn test2(_data: ProcedureInput) -> Response { + error(StatusCode::OK, "error", "message") +} diff --git a/rust/src/router.rs b/rust/src/router.rs index f6b65ba..79468db 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -1,31 +1,58 @@ -use crate::router::xrpc::XrpcEndpoint; -use axum::Router as axumRouter; +use crate::router::xrpc::{ + XrpcEndpoint, + XrpcHandler, + QueryInput, + ProcedureInput, +}; +use axum::Router as AxumRouter; use core::net::SocketAddr; use std::net::{IpAddr, Ipv4Addr}; use tokio::net::TcpListener; pub struct Router { addr: SocketAddr, - xrpc: Vec, + router: AxumRouter, } -mod xrpc; +// In case server ever needs to support more than just XRPC +pub enum Endpoint { + Xrpc(XrpcEndpoint), +} +impl Endpoint { + pub fn new_xrpc_query(nsid: String, query: Q) -> Self + where + Q: XrpcHandler + Clone + { + Endpoint::Xrpc(XrpcEndpoint::new_query(nsid,query)) + } + pub fn new_xrpc_procedure

(nsid: String, procedure: P) -> Self + where + P: XrpcHandler + Clone + { + Endpoint::Xrpc(XrpcEndpoint::new_procedure(nsid,procedure)) + } +} + +pub mod xrpc; impl Router { pub fn new() -> Self { - let xrpc = vec![XrpcEndpoint::not_implemented()]; + let mut router = AxumRouter::new(); + router = XrpcEndpoint::not_implemented().add_to_router(router); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127,0,0,1)), 6702); - Router { xrpc, addr } + Router { router, addr } + } + + pub fn add_endpoint(mut self, endpoint: Endpoint) -> Self { + match endpoint { + Endpoint::Xrpc(ep) => self.router = ep.add_to_router(self.router), + }; + self } pub async fn serve(self) { let listener = TcpListener::bind(self.addr).await.unwrap(); - let mut router = axumRouter::new(); - for endpoint in self.xrpc { - router = endpoint.add_to_router(router); - } - - axum::serve(listener, router).await.unwrap(); + axum::serve(listener, self.router).await.unwrap(); } } diff --git a/rust/src/router/xrpc.rs b/rust/src/router/xrpc.rs index dddb751..ab17148 100644 --- a/rust/src/router/xrpc.rs +++ b/rust/src/router/xrpc.rs @@ -1,11 +1,27 @@ +use std::{ + collections::HashMap, + pin::Pin, + future::Future, +}; use axum::{ - Json, - extract::Path, + extract::{ + Json, + Query, + Request, + FromRequest, + FromRequestParts, + rejection::QueryRejection, + }, + body::Bytes, routing::{ get, + post, method_routing::MethodRouter, }, - http::StatusCode, + http::{ + StatusCode, + request::Parts, + }, Router as axumRouter, }; use serde_json::{Value, json}; @@ -20,7 +36,122 @@ pub struct XrpcEndpoint { resolver: MethodRouter, } +pub type Response = (StatusCode, Json); +pub fn error(code: StatusCode, error: &str, message: &str) -> Response { + ( + code, + Json(json!({ + "error": error, + "message": message + })) + ) +} +pub fn response(code: StatusCode, message: &str) -> Response { + error(code, "", message) +} + +pub struct QueryInput { + parameters: HashMap, +} +impl FromRequestParts for QueryInput +where + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request_parts(parts: &mut Parts, _state: &S) + -> Result { + let query_params: Result>, QueryRejection> = Query::try_from_uri(&parts.uri); + match query_params { + Ok(p) => Ok(QueryInput { parameters: p.0 }), + Err(e) => Err(error(StatusCode::BAD_REQUEST, "Bad Parameters", &e.body_text())), + } + } +} +pub struct ProcedureInput { + parameters: HashMap, + input: Json, +} +impl FromRequest for ProcedureInput +where + Bytes: FromRequest, + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request(req: Request, state: &S) + -> Result { + let query_params: Result>, QueryRejection> = Query::try_from_uri(req.uri()); + let parameters = match query_params { + Ok(p) => p.0, + Err(e) => return Err(error(StatusCode::BAD_REQUEST, "Bad Parameters", &e.body_text())), + }; + + let json_value = Json::::from_request(req, state).await; + let input: Json = match json_value { + Ok(v) => v, + Err(e) => return Err(error(StatusCode::BAD_REQUEST, "Bad Parameters", &e.body_text())), + }; + + Ok(ProcedureInput { parameters, input }) + } +} + +pub trait XrpcHandler: Send + Sync + 'static { + fn call(&self, input: Input) + -> Pin + Send>>; +} +impl XrpcHandler for F +where + F: Fn(QueryInput) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, +{ + fn call(&self, input: QueryInput) + -> Pin+ Send>> { + Box::pin((self)(input)) + } +} +impl XrpcHandler for F +where + F: Fn(ProcedureInput) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, +{ + fn call(&self, input: ProcedureInput) + -> Pin+ Send>> { + Box::pin((self)(input)) + } +} + impl XrpcEndpoint { + pub fn new_query(nsid: String, query: Q) -> Self + where + Q: XrpcHandler + Clone + { + XrpcEndpoint { + nsid: Nsid::Nsid(nsid), + resolver: get(async move | mut parts: Parts | -> Response { + match QueryInput::from_request_parts(&mut parts, &()).await { + Ok(qi) => query.call(qi).await, + Err(e) => e + } + }) + } + } + + pub fn new_procedure

(nsid: String, procedure: P) -> Self + where + P: XrpcHandler + Clone + { + XrpcEndpoint { + nsid: Nsid::Nsid(nsid), + resolver: post(async move | req: Request | -> Response { + match ProcedureInput::from_request(req, &()).await { + Ok(pi) => procedure.call(pi).await, + Err(e) => e + } + }) + } + } pub fn add_to_router(self, router: axumRouter) -> axumRouter { let path = match self.nsid { @@ -31,23 +162,19 @@ impl XrpcEndpoint { router.route(path, self.resolver) } - pub fn not_implemented() -> XrpcEndpoint { - XrpcEndpoint { - nsid: Nsid::NotImplemented, - resolver: get(Self::not_implemented_resolver) - .post(Self::not_implemented_resolver), - } - } - - async fn not_implemented_resolver(Path(_nsid): Path) -> (StatusCode, Json) { - ( + pub fn not_implemented() -> Self { + let resolver = ( StatusCode::NOT_IMPLEMENTED, Json(json!({ "error": "MethodNotImplemented", "message": "Method Not Implemented" })) - ) + ); + + XrpcEndpoint { + nsid: Nsid::NotImplemented, + resolver: get(resolver.clone()).post(resolver), + } } - }