Separate mono-binary into libraries and binaries #2
31 changed files with 582 additions and 636 deletions
729  Cargo.lock  generated
File diff suppressed because it is too large.
27  Cargo.toml
@@ -1,26 +1,11 @@
-[package]
-name = "rust"
-version = "0.1.0"
-edition = "2021"
+[workspace]
+resolver = "3"
+members = [ "api", "atproto","db", "ingestor"]
 
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-anyhow = "1.0.98"
-async-trait = "0.1.88"
-atrium-api = { version = "0.25.2", default-features = false }
-axum = { version = "0.8.3", features = ["json"] }
-axum-macros = "0.5.0"
-http = "1.3.1"
-lazy-regex = "3.4.1"
-regex-macro = "0.3.0"
-rocketman = "0.2.0"
+[workspace.dependencies]
+atproto = { path = "./atproto" }
 serde = "1.0.219"
 serde_json = "1.0.140"
-sqlx = { version = "0.8.5", features = ["postgres", "runtime-tokio"] }
-tokio = { version = "1.44.2", features = ["macros", "rt-multi-thread"] }
+tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] }
 tracing = "0.1.41"
 tracing-subscriber = "0.3.19"
-
-[build-dependencies]
-esquema-codegen = { git = "https://github.com/fatfingers23/esquema.git", branch = "main" }
14  api/Cargo.toml  Normal file
@@ -0,0 +1,14 @@
[package]
name = "api"
version = "0.1.0"
edition = "2024"

[dependencies]
atproto.workspace = true
axum = { version = "0.8.3", features = ["json"] }
http = "1.3.1"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
@@ -1,24 +1,17 @@
-use crate::{
-    atproto::Nsid,
-    ingestor::start_ingestor,
-    router::{
-        Router,
-        Endpoint,
-        xrpc::{
-            QueryInput,
-            ProcedureInput,
-            Response,
-            error,
-        },
+use crate::router::{
+    Router,
+    Endpoint,
+    xrpc::{
+        QueryInput,
+        ProcedureInput,
+        Response,
+        error,
     },
 };
+use atproto::Nsid;
 use http::status::StatusCode;
 
-mod atproto;
-mod ingestor;
-mod lexicons;
 mod router;
-// mod db;
 
 #[tokio::main]
 async fn main() {
@@ -30,9 +23,6 @@ async fn main() {
     let post_nsid = Nsid::new(String::from("me.woach.post")).expect("me.woach.post is a valid nsid");
     router = router.add_endpoint(Endpoint::new_xrpc_query(get_nsid, test));
     router = router.add_endpoint(Endpoint::new_xrpc_procedure(post_nsid, test2));
-    tokio::spawn(async move {
-        start_ingestor().await;
-    });
     router.serve().await;
 }
 
@@ -1,12 +1,10 @@
-use crate::{
-    atproto::Nsid,
-    router::xrpc::{
-        XrpcEndpoint,
-        XrpcHandler,
-        QueryInput,
-        ProcedureInput,
-    }
+use crate::router::xrpc::{
+    XrpcEndpoint,
+    XrpcHandler,
+    QueryInput,
+    ProcedureInput,
 };
+use atproto::Nsid;
 use axum::Router as AxumRouter;
 use core::net::SocketAddr;
 use std::net::{IpAddr, Ipv4Addr};
@@ -1,9 +1,9 @@
-use crate::atproto::Nsid;
 use std::{
     collections::HashMap,
     pin::Pin,
     future::Future,
 };
+use atproto::Nsid;
 use axum::{
     extract::{
         Json,
12  atproto/Cargo.toml  Normal file
@@ -0,0 +1,12 @@
[package]
name = "atproto"
version = "0.1.0"
edition = "2024"

[dependencies]
atrium-api = { version = "0.25.3", default-features = false }
lazy-regex = "3.4.1"
serde.workspace = true
serde_json.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
@@ -10,6 +10,8 @@ pub use atrium_api::types::{
     }
 };
 
+pub mod lexicons;
+
 pub struct Uri {
     whole: String,
     // These fields could be useful in the future,
@@ -19,12 +21,9 @@ pub struct Uri {
     // rkey: Option<RecordKey>,
 }
 
-impl Uri {
-    pub fn as_str(&self) -> &str {
-        self.whole.as_str()
-    }
-
-    pub fn from_str(uri: String) -> Result<Self, &'static str> {
+impl FromStr for Uri {
+    type Err = &'static str;
+    fn from_str(uri: &str) -> Result<Self, Self::Err> {
         if uri.len() > 8000 {
             return Err("Uri too long")
         }
@@ -33,7 +32,7 @@ impl Uri {
             whole, unchecked_authority, unchecked_collection, unchecked_rkey
         )) = regex_captures!(
            r"/^at:\/\/([\w\.\-_~:]+)(?:\/([\w\.\-_~:]+)(?:)\/([\w\.\-_~:]+))?$/i",
-            &uri,
+            uri,
         ) else {
             return Err("Invalid Uri");
         };
@@ -53,3 +52,9 @@ impl Uri {
     }
 }
+
+impl Uri {
+    pub fn as_str(&self) -> &str {
+        self.whole.as_str()
+    }
+}
 
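A side note on the change above: because Uri now implements the standard FromStr trait instead of an inherent from_str(String), call sites can go through str::parse and pass a borrowed &str. A minimal sketch, not part of this diff, assuming the crate exports Uri from its root and that tracing is available to the caller:

use atproto::Uri; // assumed export; the library file is not named in this diff

// Hypothetical helper: parses a borrowed &str via <Uri as FromStr>::from_str.
fn parse_session_uri(raw: &str) -> Result<Uri, &'static str> {
    let uri: Uri = raw.parse()?; // no owned String needed any more
    tracing::info!("parsed {}", uri.as_str());
    Ok(uri)
}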
8  db/Cargo.toml  Normal file
@@ -0,0 +1,8 @@
[package]
name = "db"
version = "0.1.0"
edition = "2024"

[dependencies]
sqlx = { version = "0.8.6", features = ["postgres", "runtime-tokio"] }
tokio = "1.45.0"
@@ -1,7 +1,3 @@
-use crate::atproto::{
-    Did,
-    Uri,
-}
 use sqlx::{
     query,
     Database,
@@ -20,11 +16,6 @@ pub struct Db<Dbimp: Database> {
     pool: Pool<Dbimp>
 }
 
-pub struct User {
-    userdid: Did,
-    handle: Handle,
-}
-
 #[non_exhaustive]
 enum Role {
     Owner,
@@ -40,11 +31,6 @@ impl ToString for Role {
     }
 }
 
-struct Participant {
-    participantdid: Did,
-    role: Role,
-}
-
 pub struct Session {
     sessionuri: Uri,
     label: Option<String>,
15  db/src/interfaces.rs  Normal file
@@ -0,0 +1,15 @@
use atproto::{
    Did,
    Uri,
};

pub struct User {
    userdid: Did,
    handle: Handle,
}

struct Participant {
    participantdid: Did,
    role: Role,
}
1  db/src/lib.rs  Normal file
@@ -0,0 +1 @@
pub struct db;
15  ingestor/Cargo.toml  Normal file
@@ -0,0 +1,15 @@
[package]
name = "ingestor"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.88"
atproto.workspace = true
rocketman = "0.2.0"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
85  ingestor/src/collections.rs  Normal file
@@ -0,0 +1,85 @@
use rocketman::ingestion::LexiconIngestor;

macro_rules! create_commit_collection {
    ($collection:ident, $nsid:expr,
        change => $change_fn:expr,
        delete => $delete_fn:expr $(,)?
    ) => (
        create_commit_collection!(
            $collection, $nsid,
            create => $change_fn,
            update => $change_fn,
            delete => $delete_fn,
        );
    );
    ($collection:ident, $nsid:expr,
        create => $create_fn:expr,
        update => $update_fn:expr,
        delete => $delete_fn:expr $(,)?
    ) => (
        pub struct $collection;
        struct Ingestor;

        impl $crate::collections::Collection for $collection {
            fn new() -> Self { Self }

            fn get_nsid(&self) -> String {
                $nsid
            }

            fn get_ingestor(
                &self
            ) -> Box<dyn rocketman::ingestion::LexiconIngestor + Send + Sync> {
                Box::new(Ingestor::new())
            }
        }

        impl Ingestor {
            pub fn new() -> Self { Self }

            pub async fn handle_commit(
                &self,
                message: rocketman::types::event::Event<serde_json::Value>
            ) -> anyhow::Result<()> {
                use rocketman::types::event::Operation;

                let state = $crate::collections::CommitIngestorState { };

                if let Some(commit) = &message.commit {
                    match commit.operation {
                        Operation::Create => ($create_fn)(state, message).await?,
                        Operation::Update => ($update_fn)(state, message).await?,
                        Operation::Delete => ($delete_fn)(state, message).await?,
                    }
                } else {
                    return Err(anyhow::anyhow!("Message has no commit"));
                }
                Ok(())
            }
        }

        #[async_trait::async_trait]
        impl rocketman::ingestion::LexiconIngestor for Ingestor {
            async fn ingest(
                &self,
                message: rocketman::types::event::Event<serde_json::Value>
            ) -> anyhow::Result<()> {
                self.handle_commit(message).await
            }
        }
    );
}

pub mod my_spoor_log_session;
pub use my_spoor_log_session::MySpoorLogSession;
pub mod my_spoor_log_activity;
pub use my_spoor_log_activity::MySpoorLogActivity;

struct CommitIngestorState;

pub trait Collection {
    fn new() -> Self;
    fn get_nsid(&self) -> String;
    fn get_ingestor(&self) -> Box<dyn LexiconIngestor + Send + Sync>;
}
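For orientation before the collection modules that follow: each create_commit_collection! invocation expands to a public unit struct named after the collection plus a module-private Ingestor that implements rocketman's LexiconIngestor and routes commit operations to the supplied handlers; the shorter change => arm reuses one handler for both Create and Update. A simplified sketch of that shape with a hypothetical collection and NSID, not the literal macro output:

// Rough shape of what a create_commit_collection! invocation emits; the real
// expansion also generates the handle_commit dispatch shown in the macro above.
pub struct ExampleCollection;
struct Ingestor;

#[async_trait::async_trait]
impl rocketman::ingestion::LexiconIngestor for Ingestor {
    async fn ingest(
        &self,
        message: rocketman::types::event::Event<serde_json::Value>
    ) -> anyhow::Result<()> {
        // In the generated code this forwards to handle_commit, which matches
        // Operation::Create / Update / Delete and calls the handler expressions
        // passed to the macro.
        let _ = message;
        Ok(())
    }
}

impl crate::collections::Collection for ExampleCollection {
    fn new() -> Self { Self }
    fn get_nsid(&self) -> String { "com.example.record".to_string() }
    fn get_ingestor(&self) -> Box<dyn rocketman::ingestion::LexiconIngestor + Send + Sync> {
        Box::new(Ingestor)
    }
}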
26  ingestor/src/collections/app_bsky_feed_post.rs  Normal file
@@ -0,0 +1,26 @@
use crate::collections::CommitIngestorState;
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;

async fn handle_change(
    _state: CommitIngestorState,
    message: Event<Value>
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}

async fn handle_delete(
    _state: CommitIngestorState,
    message: Event<Value>
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}

create_commit_collection!(AppBskyFeedPost, "app.bsky.feed.post".to_string(),
    change => handle_change,
    delete => handle_delete,
);
30  ingestor/src/collections/my_spoor_log_activity.rs  Normal file
@@ -0,0 +1,30 @@
use crate::collections::CommitIngestorState;
use atproto::{
    Collection as AtprotoCollection,
    lexicons::my::spoor::log::Activity,
};
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;

async fn handle_change(
    _state: CommitIngestorState,
    message: Event<Value>
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}

async fn handle_delete(
    _state: CommitIngestorState,
    message: Event<Value>
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}

create_commit_collection!(MySpoorLogActivity, Activity::NSID.to_string(),
    change => handle_change,
    delete => handle_delete,
);
30  ingestor/src/collections/my_spoor_log_session.rs  Normal file
@@ -0,0 +1,30 @@
use crate::collections::CommitIngestorState;
use atproto::{
    Collection as AtprotoCollection,
    lexicons::my::spoor::log::Session,
};
use rocketman::types::event::Event;
use serde_json::Value;
use anyhow::Result;
use tracing::info;

async fn handle_change(
    _state: CommitIngestorState,
    message: Event<Value>
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}

async fn handle_delete(
    _state: CommitIngestorState,
    message: Event<Value>
) -> Result<()> {
    info!("{:?}", message);
    Ok(())
}

create_commit_collection!(MySpoorLogSession, Session::NSID.to_string(),
    change => handle_change,
    delete => handle_delete,
);
53  ingestor/src/ingestor.rs  Normal file
@@ -0,0 +1,53 @@
use crate::collections::Collection;
use rocketman::{
    options::JetstreamOptions,
    ingestion::LexiconIngestor,
    connection::JetstreamConnection,
    handler,
};
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};
use tracing::{info, error};

pub struct Ingestor {
    ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>>,
}

impl Ingestor {
    pub fn new() -> Self {
        Self { ingesters: HashMap::new() }
    }

    pub fn add_collection<C: Collection>(&mut self, collection: C) {
        self.ingesters.insert(collection.get_nsid(), collection.get_ingestor());
    }

    pub async fn start(self) {
        info!("Starting ingestor with the following collections: {:?}",
            self.ingesters.keys());
        let opts = JetstreamOptions::builder()
            .wanted_collections(self.ingesters.keys().cloned().collect())
            .build();
        let jetstream = JetstreamConnection::new(opts);
        let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));
        let msg_rx = jetstream.get_msg_rx();
        let reconnect_tx = jetstream.get_reconnect_tx();

        let cursor_clone = cursor.clone();
        tokio::spawn(async move {
            while let Ok(message) = msg_rx.recv_async().await {
                if let Err(e) = handler::handle_message(message, &self.ingesters,
                    reconnect_tx.clone(), cursor_clone.clone()).await {
                    error!("Error processing message: {}", e);
                }
            }
        });

        if let Err(e) = jetstream.connect(cursor.clone()).await {
            error!("Failed to connect to Jetstream: {}", e);
            std::process::exit(1);
        }
    }
}
21  ingestor/src/main.rs  Normal file
@@ -0,0 +1,21 @@
use crate::{
    collections::{
        Collection,
        MySpoorLogActivity, MySpoorLogSession,
    },
    ingestor::Ingestor,
};

mod ingestor;
mod collections;

#[tokio::main]
async fn main() {
    let subscriber = tracing_subscriber::FmtSubscriber::new();
    let _ = tracing::subscriber::set_global_default(subscriber);

    let mut ingestor = Ingestor::new();
    ingestor.add_collection(MySpoorLogActivity::new());
    ingestor.add_collection(MySpoorLogSession::new());
    ingestor.start().await;
}
@@ -1,75 +0,0 @@
use crate::lexicons::my::spoor::log::{Activity, Session};
use async_trait::async_trait;
use atrium_api::types::Collection;
use rocketman::{
    options::JetstreamOptions,
    ingestion::LexiconIngestor,
    // types::event::{Event, Operation},
    types::event::Event,
    connection::JetstreamConnection,
    handler,
};
use serde_json::value::Value;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};
use tracing::{error, info};
use anyhow::Result;

enum Ingestor {
    Jetstream(SpoorJetstream)
}

struct SpoorJetstream;

#[async_trait]
impl LexiconIngestor for SpoorJetstream {
    async fn ingest(&self, message: Event<Value>) -> Result<()> {
        info!("{:?}", message);
        // if let Some(commit) = &message.commit {
        //     match commit.operation {
        //         Operation::Create | Operation::Update => {}
        //         Operation::Delete => {}
        //     }
        // } else {
        //     return Err("Message has no commit");
        // }
        Ok(())
    }
}

pub async fn start_ingestor() {
    info!("Starting ingestor");
    let nsids = vec![
        Activity::NSID.to_string(),
        Session::NSID.to_string(),
    ];
    let opts = JetstreamOptions::builder().wanted_collections(nsids.clone()).build();
    let jetstream = JetstreamConnection::new(opts);

    let mut ingesters: HashMap<String, Box<dyn LexiconIngestor + Send + Sync>> = HashMap::new();
    for nsid in nsids {
        ingesters.insert(nsid, Box::new(SpoorJetstream));
    }

    let cursor: Arc<Mutex<Option<u64>>> = Arc::new(Mutex::new(None));

    let msg_rx = jetstream.get_msg_rx();
    let reconnect_tx = jetstream.get_reconnect_tx();

    let cursor_clone = cursor.clone();
    tokio::spawn(async move {
        while let Ok(message) = msg_rx.recv_async().await {
            if let Err(e) = handler::handle_message(message, &ingesters,
                reconnect_tx.clone(), cursor_clone.clone()).await {
                error!("Error processing message: {}", e);
            }
        }
    });

    if let Err(e) = jetstream.connect(cursor.clone()).await {
        error!("Failed to connect to Jetstream: {}", e);
        std::process::exit(1);
    }
}