Src, move out of ./rust, add missing wip files
Moves file contents out of ./rust since I'm planning to go with rust for the backend and the lexicons will be used for codegen soon anyways. I also added extra files that have been in the works that I have been accidentally sprinkling into main.rs already. God forbid I need to cherry-pick anything from so far back in the history. It will make git blame ugly though.
This commit is contained in:
parent
8363125108
commit
49e7340c19
11 changed files with 846 additions and 21 deletions
70
src/atproto.rs
Normal file
70
src/atproto.rs
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
use atrium_api::types::string::{
|
||||
AtIdentifier,
|
||||
RecordKey,
|
||||
};
|
||||
use regex::Regex;
|
||||
|
||||
pub use atrium_api::types::string::{
|
||||
Nsid,
|
||||
Did,
|
||||
Handle,
|
||||
};
|
||||
|
||||
enum Authority {
|
||||
Did(Did),
|
||||
Handle(Handle),
|
||||
}
|
||||
|
||||
impl Authority {
|
||||
pub fn new(authority: String) -> Result<Self, &'static str> {
|
||||
}
|
||||
}
|
||||
|
||||
// This implementation does not support Query or Fragments, and thus follows
|
||||
// the following schema: "at://" AUTHORITY [ "/" COLLECTION [ "/" RKEY ] ]
|
||||
pub struct Uri {
|
||||
authority: Authority,
|
||||
collection: Option<Nsid>,
|
||||
rkey: Option<RecordKey>,
|
||||
}
|
||||
|
||||
// TODO: Replace super basic URI regex with real uri parsing
|
||||
const URI_REGEX: Regex = Regex::new(
|
||||
r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i"
|
||||
).expect("valid regex");
|
||||
|
||||
impl Uri {
|
||||
pub fn new(uri: String) -> Result<Self, &'static str> {
|
||||
let Some(captures) = URI_REGEX.captures(&uri) else {
|
||||
return Err("Invalid Uri");
|
||||
};
|
||||
// TODO: Convert authority if its a did or a handle
|
||||
let Some(Ok(authority)) = captures.get(1).map(|mtch| {
|
||||
Authority::new(mtch.as_str().to_string())
|
||||
}) else {
|
||||
return Err("Invalid Authority")
|
||||
};
|
||||
let collection = captures.get(2).map(|mtch| {
|
||||
Nsid::new(mtch.as_str().to_string())
|
||||
});
|
||||
let rkey = captures.get(3).map(|mtch| {
|
||||
RecordKey::new(mtch.as_str().to_string())
|
||||
});
|
||||
Ok(Uri { authority, collection, rkey })
|
||||
}
|
||||
|
||||
pub fn as_string(&self) -> String {
|
||||
let mut uri = String::from("at://");
|
||||
uri.push_str(match &self.authority {
|
||||
Authority::Handle(h) => &*h,
|
||||
Authority::Did(d) => &*d,
|
||||
});
|
||||
if let Some(nsid) = &self.collection {
|
||||
uri.push_str(&*nsid);
|
||||
}
|
||||
if let Some(rkey) = &self.rkey {
|
||||
uri.push_str(&*rkey);
|
||||
}
|
||||
uri
|
||||
}
|
||||
}
|
||||
113
src/db.rs
Normal file
113
src/db.rs
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
use crate::atproto::{
|
||||
Did,
|
||||
Uri,
|
||||
}
|
||||
use sqlx::{
|
||||
query,
|
||||
Database,
|
||||
Pool,
|
||||
Postgres,
|
||||
pool::PoolOptions,
|
||||
postgres::{
|
||||
PgConnectOptions,
|
||||
PgSslMode,
|
||||
},
|
||||
Result,
|
||||
};
|
||||
use std::string::ToString;
|
||||
|
||||
pub struct Db<Dbimp: Database> {
|
||||
pool: Pool<Dbimp>
|
||||
}
|
||||
|
||||
pub struct User {
|
||||
userdid: Did,
|
||||
handle: Handle,
|
||||
}
|
||||
|
||||
#[non_exhaustive]
|
||||
enum Role {
|
||||
Owner,
|
||||
Participant
|
||||
}
|
||||
|
||||
impl ToString for Role {
|
||||
fn to_string(&self) -> String {
|
||||
match *self {
|
||||
Role::Owner => "owner".to_string(),
|
||||
Role::Participant => "participant".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Participant {
|
||||
participantdid: Did,
|
||||
role: Role,
|
||||
}
|
||||
|
||||
pub struct Session {
|
||||
sessionuri: Uri,
|
||||
label: Option<String>,
|
||||
participants: Vec<Participant>,
|
||||
}
|
||||
|
||||
impl Db<Postgres> {
|
||||
async fn connect() -> Result<Self> {
|
||||
let conn = PgConnectOptions::new()
|
||||
.host("localhost")
|
||||
.port(5432)
|
||||
.username("postgres")
|
||||
.password("062217")
|
||||
.database("anisky")
|
||||
.ssl_mode(PgSslMode::Disable);
|
||||
|
||||
let pool = match PoolOptions::new().connect_with(conn).await {
|
||||
Ok(p) => p,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
Ok(Db { pool })
|
||||
}
|
||||
//
|
||||
// pub async fn add_user(&self, user: &User) -> Result<()> {
|
||||
// query!(r#"
|
||||
// INSERT INTO users(userdid, handle) VALUES ($1, $2)
|
||||
// "#,
|
||||
// user.userdid, user.handle
|
||||
// ).execute(self.pool).await?;
|
||||
// Ok(())
|
||||
// }
|
||||
//
|
||||
// pub async fn add_session(&self, session: &Session) -> Result<()> {
|
||||
// let mut transaction = self.pool.begin().await?;
|
||||
//
|
||||
// query!(r#"
|
||||
// INSERT INTO sessions(sessionuri, label) VALUES ($1, $2)
|
||||
// "#,
|
||||
// session.sessionuri, session.label
|
||||
// ).execute(&mut *transaction).await?;
|
||||
//
|
||||
// for participant in session.participants {
|
||||
// query!(r#"
|
||||
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
|
||||
// "#,
|
||||
// session.sessionuri, participant.userdid, participant.role.to_string()
|
||||
// ).execute(&mut *transaction).await?;
|
||||
// }
|
||||
//
|
||||
// transaction.commit().await
|
||||
// }
|
||||
//
|
||||
// pub async fn add_participant(&self, session: Session,
|
||||
// participant: Participant) -> Result<Session> {
|
||||
// query!(r#"
|
||||
// INSERT INTO participants(sessionuri, userdid, role) VALUES ($1, $2, $3)
|
||||
// "#,
|
||||
// session.sessionuri, participant.userdid, participant.role.to_string()
|
||||
// ).execute(self.pool).await?;
|
||||
//
|
||||
// session.participants.push(participant);
|
||||
//
|
||||
// Ok(session)
|
||||
// }
|
||||
}
|
||||
30
src/injester.rs
Normal file
30
src/injester.rs
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
use crate::db::Db;
|
||||
use rocketman::{
|
||||
options::JetstreamOptions
|
||||
ingestion::LexiconIngestor
|
||||
};
|
||||
|
||||
enum Injester {
|
||||
Jetstream(Jetstream)
|
||||
}
|
||||
|
||||
struct SpoorJetstream;
|
||||
|
||||
impl LexiconIngestor for SpoorJetstream {
|
||||
async fn ingest(&self, message: Event<Value>) -> Result<()> {
|
||||
if let Some(commit) = &message.commit {
|
||||
match commit.operation {
|
||||
Operation::Create | Operation::Update => {
|
||||
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err("Message has no commit");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_ingester(db: Db) {
|
||||
let opts = JetstreamOptions::builder()
|
||||
}
|
||||
41
src/main.rs
Normal file
41
src/main.rs
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
use crate::{
|
||||
atproto::Nsid,
|
||||
injester::start_injester,
|
||||
router::{
|
||||
Router,
|
||||
Endpoint,
|
||||
xrpc::{
|
||||
QueryInput,
|
||||
ProcedureInput,
|
||||
Response,
|
||||
error,
|
||||
},
|
||||
},
|
||||
};
|
||||
use http::status::StatusCode;
|
||||
|
||||
mod atproto;
|
||||
mod injester;
|
||||
mod router;
|
||||
mod db;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let mut router = Router::new();
|
||||
let get_nsid = Nsid::new(String::from("me.woach.get")).expect("me.woach.get is a valid nsid");
|
||||
let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid");
|
||||
router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test));
|
||||
router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2));
|
||||
tokio::spawn(async move {
|
||||
start_injester();
|
||||
});
|
||||
router.serve().await;
|
||||
}
|
||||
|
||||
async fn test(_data: QueryInput) -> Response {
|
||||
error(StatusCode::OK, "error", "message")
|
||||
}
|
||||
|
||||
async fn test2(_data: ProcedureInput) -> Response {
|
||||
error(StatusCode::OK, "error", "message")
|
||||
}
|
||||
61
src/router.rs
Normal file
61
src/router.rs
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
use crate::{
|
||||
atproto::Nsid,
|
||||
router::xrpc::{
|
||||
XrpcEndpoint,
|
||||
XrpcHandler,
|
||||
QueryInput,
|
||||
ProcedureInput,
|
||||
}
|
||||
};
|
||||
use axum::Router as AxumRouter;
|
||||
use core::net::SocketAddr;
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
pub struct Router {
|
||||
addr: SocketAddr,
|
||||
router: AxumRouter,
|
||||
}
|
||||
|
||||
// In case server ever needs to support more than just XRPC
|
||||
pub enum Endpoint {
|
||||
Xrpc(XrpcEndpoint),
|
||||
}
|
||||
impl Endpoint {
|
||||
pub fn new_xrpc_query<Q>(nsid: Nsid, query: Q) -> Self
|
||||
where
|
||||
Q: XrpcHandler<QueryInput> + Clone
|
||||
{
|
||||
Endpoint::Xrpc(XrpcEndpoint::new_query(nsid,query))
|
||||
}
|
||||
pub fn new_xrpc_procedure<P>(nsid: Nsid, procedure: P) -> Self
|
||||
where
|
||||
P: XrpcHandler<ProcedureInput> + Clone
|
||||
{
|
||||
Endpoint::Xrpc(XrpcEndpoint::new_procedure(nsid,procedure))
|
||||
}
|
||||
}
|
||||
|
||||
pub mod xrpc;
|
||||
|
||||
impl Router {
|
||||
pub fn new() -> Self {
|
||||
let mut router = AxumRouter::new();
|
||||
router = XrpcEndpoint::not_implemented().add_to_router(router);
|
||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127,0,0,1)), 6702);
|
||||
Router { router, addr }
|
||||
}
|
||||
|
||||
pub fn add_endpoint(mut self, endpoint: Endpoint) -> Self {
|
||||
match endpoint {
|
||||
Endpoint::Xrpc(ep) => self.router = ep.add_to_router(self.router),
|
||||
};
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn serve(self) {
|
||||
let listener = TcpListener::bind(self.addr).await.unwrap();
|
||||
|
||||
axum::serve(listener, self.router).await.unwrap();
|
||||
}
|
||||
}
|
||||
181
src/router/xrpc.rs
Normal file
181
src/router/xrpc.rs
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
use crate::atproto::Nsid;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
pin::Pin,
|
||||
future::Future,
|
||||
};
|
||||
use axum::{
|
||||
extract::{
|
||||
Json,
|
||||
Query,
|
||||
Request,
|
||||
FromRequest,
|
||||
FromRequestParts,
|
||||
rejection::QueryRejection,
|
||||
},
|
||||
body::Bytes,
|
||||
routing::{
|
||||
get,
|
||||
post,
|
||||
method_routing::MethodRouter,
|
||||
},
|
||||
http::{
|
||||
StatusCode,
|
||||
request::Parts,
|
||||
},
|
||||
Router as axumRouter,
|
||||
};
|
||||
use serde_json::{Value, json};
|
||||
|
||||
enum Path {
|
||||
Nsid(Nsid),
|
||||
NotImplemented,
|
||||
}
|
||||
|
||||
pub struct XrpcEndpoint {
|
||||
path: Path,
|
||||
resolver: MethodRouter,
|
||||
}
|
||||
|
||||
pub type Response = (StatusCode, Json<Value>);
|
||||
pub fn error(code: StatusCode, error: &str, message: &str) -> Response {
|
||||
(
|
||||
code,
|
||||
Json(json!({
|
||||
"error": error,
|
||||
"message": message
|
||||
}))
|
||||
)
|
||||
}
|
||||
pub fn response(code: StatusCode, message: &str) -> Response {
|
||||
error(code, "", message)
|
||||
}
|
||||
|
||||
pub struct QueryInput {
|
||||
parameters: HashMap<String, String>,
|
||||
}
|
||||
impl<S> FromRequestParts<S> for QueryInput
|
||||
where
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = Response;
|
||||
|
||||
async fn from_request_parts(parts: &mut Parts, _state: &S)
|
||||
-> Result<Self, Self::Rejection> {
|
||||
let query_params: Result<Query<HashMap<String, String>>, QueryRejection> = Query::try_from_uri(&parts.uri);
|
||||
match query_params {
|
||||
Ok(p) => Ok(QueryInput { parameters: p.0 }),
|
||||
Err(e) => Err(error(StatusCode::BAD_REQUEST, "Bad Parameters", &e.body_text())),
|
||||
}
|
||||
}
|
||||
}
|
||||
pub struct ProcedureInput {
|
||||
parameters: HashMap<String, String>,
|
||||
input: Json<Value>,
|
||||
}
|
||||
impl<S> FromRequest<S> for ProcedureInput
|
||||
where
|
||||
Bytes: FromRequest<S>,
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = Response;
|
||||
|
||||
async fn from_request(req: Request, state: &S)
|
||||
-> Result<Self, Self::Rejection> {
|
||||
let query_params: Result<Query<HashMap<String, String>>, QueryRejection> = Query::try_from_uri(req.uri());
|
||||
let parameters = match query_params {
|
||||
Ok(p) => p.0,
|
||||
Err(e) => return Err(error(StatusCode::BAD_REQUEST, "Bad Parameters", &e.body_text())),
|
||||
};
|
||||
|
||||
let json_value = Json::<Value>::from_request(req, state).await;
|
||||
let input: Json<Value> = match json_value {
|
||||
Ok(v) => v,
|
||||
Err(e) => return Err(error(StatusCode::BAD_REQUEST, "Bad Parameters", &e.body_text())),
|
||||
};
|
||||
|
||||
Ok(ProcedureInput { parameters, input })
|
||||
}
|
||||
}
|
||||
|
||||
pub trait XrpcHandler<Input>: Send + Sync + 'static {
|
||||
fn call(&self, input: Input)
|
||||
-> Pin<Box<dyn Future<Output = Response> + Send>>;
|
||||
}
|
||||
impl<F, Fut> XrpcHandler<QueryInput> for F
|
||||
where
|
||||
F: Fn(QueryInput) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Response> + Send + 'static,
|
||||
{
|
||||
fn call(&self, input: QueryInput)
|
||||
-> Pin<Box<dyn Future<Output = Response>+ Send>> {
|
||||
Box::pin((self)(input))
|
||||
}
|
||||
}
|
||||
impl<F, Fut> XrpcHandler<ProcedureInput> for F
|
||||
where
|
||||
F: Fn(ProcedureInput) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = Response> + Send + 'static,
|
||||
{
|
||||
fn call(&self, input: ProcedureInput)
|
||||
-> Pin<Box<dyn Future<Output = Response>+ Send>> {
|
||||
Box::pin((self)(input))
|
||||
}
|
||||
}
|
||||
|
||||
impl XrpcEndpoint {
|
||||
pub fn new_query<Q>(nsid: Nsid, query: Q) -> Self
|
||||
where
|
||||
Q: XrpcHandler<QueryInput> + Clone
|
||||
{
|
||||
XrpcEndpoint {
|
||||
path: Path::Nsid(nsid),
|
||||
resolver: get(async move | mut parts: Parts | -> Response {
|
||||
match QueryInput::from_request_parts(&mut parts, &()).await {
|
||||
Ok(qi) => query.call(qi).await,
|
||||
Err(e) => e
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_procedure<P>(nsid: Nsid, procedure: P) -> Self
|
||||
where
|
||||
P: XrpcHandler<ProcedureInput> + Clone
|
||||
{
|
||||
XrpcEndpoint {
|
||||
path: Path::Nsid(nsid),
|
||||
resolver: post(async move | req: Request | -> Response {
|
||||
match ProcedureInput::from_request(req, &()).await {
|
||||
Ok(pi) => procedure.call(pi).await,
|
||||
Err(e) => e
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_to_router(self, router: axumRouter) -> axumRouter {
|
||||
let path = match self.path {
|
||||
Path::Nsid(nsid) => &("/xrpc/".to_owned() + nsid.as_str()),
|
||||
Path::NotImplemented => "/xrpc/{*nsid}",
|
||||
};
|
||||
|
||||
router.route(path, self.resolver)
|
||||
}
|
||||
|
||||
pub fn not_implemented() -> Self {
|
||||
let resolver = (
|
||||
StatusCode::NOT_IMPLEMENTED,
|
||||
Json(json!({
|
||||
"error": "MethodNotImplemented",
|
||||
"message": "Method Not Implemented"
|
||||
}))
|
||||
);
|
||||
|
||||
XrpcEndpoint {
|
||||
path: Path::NotImplemented,
|
||||
resolver: get(resolver.clone()).post(resolver),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue