diff --git a/xxdk-rs/Cargo.lock b/xxdk-rs/Cargo.lock index 934f68d7b917353c81d0e1796fb464e9b65e6b43..1bd264f5e6479af932b287b88e8bbab2dd5ce9a7 100644 --- a/xxdk-rs/Cargo.lock +++ b/xxdk-rs/Cargo.lock @@ -566,6 +566,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + [[package]] name = "js-sys" version = "0.3.69" @@ -898,6 +904,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + [[package]] name = "serde" version = "1.0.202" @@ -918,6 +930,17 @@ dependencies = [ "syn 2.0.53", ] +[[package]] +name = "serde_json" +version = "1.0.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1431,6 +1454,7 @@ dependencies = [ "lazy_static", "libc", "serde", + "serde_json", "tokio", "tower", "tracing", diff --git a/xxdk-rs/rpc-example/src/lib.rs b/xxdk-rs/rpc-example/src/lib.rs index 0e09ad52f73482bacdc13a25ad3035489757cd4a..d77eb574f3c30b20659259543629b8d64e28a4b1 100644 --- a/xxdk-rs/rpc-example/src/lib.rs +++ b/xxdk-rs/rpc-example/src/lib.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use base64::prelude::*; use structopt::StructOpt; use xxdk::base::*; -use xxdk::service::*; +use xxdk::service::{CMixServerConfig, SenderId, Utf8Lossy}; const SECRET: &str = "Hello"; const REGISTRATION_CODE: &str = ""; @@ -56,17 +56,16 @@ pub async fn run() -> Result<(), String> { private_key: String::from(""), }; - let xx_router = xxdk::service::Router::new(Arc::new(cmix)).route("demo", xx_rpc_handler); - CMixServer::serve(xx_router, cmix_config).await + let xx_router = xxdk::service::Router::with_state(Arc::new(cmix)).route("demo", xx_rpc_handler); + xxdk::service::serve(xx_router, cmix_config).await } -pub async fn xx_rpc_handler(_: Arc<CMix>, request: IncomingRequest) -> Result<Vec<u8>, String> { - let sender: String = request.sender_id.iter().fold(String::new(), |mut s, b| { +pub async fn xx_rpc_handler(id: SenderId, req: Utf8Lossy) -> String { + let sender: String = id.0.iter().fold(String::new(), |mut s, b| { write!(s, "{b:02x}").unwrap(); s }); tracing::info!(sender, "Received message via cMix",); - let text = String::from_utf8_lossy(&request.request); - - Ok(format!("Hi from rust rpc example! Echoed message: {text}").into_bytes()) + let text = req.0; + format!("Hi from rust rpc example! Echoed message: {text}") } diff --git a/xxdk-rs/xxdk/Cargo.toml b/xxdk-rs/xxdk/Cargo.toml index 0e90483aa46d4ea0cd7ec26a65d94c858fb39677..6062a6623a347a32de867746b690f01bb0bf1051 100644 --- a/xxdk-rs/xxdk/Cargo.toml +++ b/xxdk-rs/xxdk/Cargo.toml @@ -10,6 +10,7 @@ base64 = "0.22.1" lazy_static = "1.4.0" libc = "0.2.153" serde = { version = "1.0.202", features = ["derive"] } +serde_json = "1.0.120" tokio = { version = "1.37.0", features = ["rt", "fs", "sync", "time"] } tower = "0.4.13" tracing = "0.1.40" diff --git a/xxdk-rs/xxdk/src/service.rs b/xxdk-rs/xxdk/src/service.rs index 8e34d459d8cc03f4b1d4b974ae06efecc50c1ef1..b5e068de0a296969135a4e40ed96e50a01e21de4 100644 --- a/xxdk-rs/xxdk/src/service.rs +++ b/xxdk-rs/xxdk/src/service.rs @@ -1,131 +1,367 @@ -use base64::prelude::*; -use serde::Deserialize; +use std::borrow::Cow; use std::collections::HashMap; -use std::future::{self, poll_fn, Future}; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; +use std::task::{Context, Poll}; use std::time::Duration; -use tokio::sync::mpsc; + +use base64::prelude::*; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use serde_json as json; use tower::Service; -use crate::base; -use crate::rpc; +use crate::{base, rpc}; + +pub type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>; #[derive(Debug, Clone)] pub struct IncomingRequest { - pub sender_id: Vec<u8>, - pub request: Vec<u8>, + sender_id: Vec<u8>, + request: Vec<u8>, + separator_idx: usize, } -#[derive(Debug, Clone)] -struct Response { - pub text: Vec<u8>, + +impl IncomingRequest { + fn new(sender_id: Vec<u8>, request: Vec<u8>) -> Result<Self, String> { + let separator_idx = request + .iter() + .position(|b| *b == b',') + .ok_or_else(|| "no endpoint in request".to_string())?; + + std::str::from_utf8(&request[..separator_idx]) + .map_err(|e| format!("non-UTF-8 endpoint: {e}"))?; + + Ok(Self { + sender_id, + request, + separator_idx, + }) + } + + pub fn sender_id(&self) -> &[u8] { + &self.sender_id + } + + pub fn endpoint(&self) -> &str { + unsafe { std::str::from_utf8_unchecked(&self.request[..self.separator_idx]) } + } + + pub fn request(&self) -> &[u8] { + &self.request[self.separator_idx + 1..] + } } -type HandlerFnInner<T> = dyn Fn( - Arc<T>, - IncomingRequest, - ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, String>> + Send + 'static>> - + Send - + Sync - + 'static; +// TODO If we're a bit more careful about it, we can probably get rid of the Sync bound here +pub trait Handler<T, S, Res>: Clone + Send + Sync + Sized + 'static { + fn call(self, req: IncomingRequest, state: S) -> PinnedFuture<Result<Vec<u8>, String>>; +} -pub trait HandlerFn<T>: - Fn(Arc<T>, IncomingRequest) -> Self::Future + Send + Sync + 'static -{ - type Future: Future<Output = Result<Vec<u8>, String>> + Send + 'static; +macro_rules! impl_handler { + ($($ty:ident),*) => { + impl<F, Fut, S, Res, $($ty),*> Handler<($($ty,)*), S, Res> for F + where + F: FnOnce($($ty),*) -> Fut + Clone + Send + Sync + Sized + 'static, + Fut: Future<Output = Res> + Send + 'static, + S: Send + 'static, + Res: IntoResponse, + $( + $ty: FromRequest<S>, + )* + { + #[allow(non_snake_case, unused_variables)] + fn call(self, req: IncomingRequest, state: S) -> PinnedFuture<Result<Vec<u8>, String>> { + Box::pin(async move { + $( + let $ty = $ty::extract(&req, &state)?; + )* + self($($ty),*).await.into_response() + }) + } + } + }; +} + +macro_rules! tuples { + ($name:ident) => { + $name!(); + $name!(T1); + $name!(T1, T2); + $name!(T1, T2, T3); + $name!(T1, T2, T3, T4); + $name!(T1, T2, T3, T4, T5); + $name!(T1, T2, T3, T4, T5, T6); + $name!(T1, T2, T3, T4, T5, T6, T7); + $name!(T1, T2, T3, T4, T5, T6, T7, T8); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15); + $name!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16); + }; } -impl<T, F, Fut> HandlerFn<T> for F +tuples!(impl_handler); + +trait ErasedHandler<S>: Send + Sync + 'static { + fn call(&self, req: IncomingRequest, state: S) -> PinnedFuture<Result<Vec<u8>, String>>; +} + +struct MakeErasedHandler<H, S> { + handler: H, + #[allow(clippy::type_complexity)] + call: fn(H, IncomingRequest, S) -> PinnedFuture<Result<Vec<u8>, String>>, +} + +impl<H, S> ErasedHandler<S> for MakeErasedHandler<H, S> where - F: Fn(Arc<T>, IncomingRequest) -> Fut + Send + Sync + 'static, - Fut: Future<Output = Result<Vec<u8>, String>> + Send + 'static, + H: Clone + Send + Sync + 'static, + S: 'static, { - type Future = Fut; + fn call(&self, req: IncomingRequest, state: S) -> PinnedFuture<Result<Vec<u8>, String>> { + let h = self.handler.clone(); + (self.call)(h, req, state) + } } -pub struct Router<T> { - handlers: HashMap<String, Arc<HandlerFnInner<T>>>, - state: Arc<T>, +impl<H, S> MakeErasedHandler<H, S> { + fn make<T, Res>(handler: H) -> Self + where + H: Handler<T, S, Res>, + { + let call = |h: H, req, state| h.call(req, state); + Self { handler, call } + } } -// Manual implementation to avoid derive putting a Clone bound on T -impl<T> Clone for Router<T> { - fn clone(&self) -> Self { - Self { - handlers: self.handlers.clone(), - state: self.state.clone(), - } +type BoxedErasedHandler<S> = Arc<dyn ErasedHandler<S>>; + +#[derive(Clone)] +pub struct Router<S> { + inner: Arc<RouterInner<S>>, +} + +#[derive(Clone)] +struct RouterInner<S> { + handlers: HashMap<String, BoxedErasedHandler<S>>, + state: S, +} + +impl Router<()> { + pub fn without_state() -> Self { + Self::with_state(()) } } -impl<T> Router<T> { - pub fn new(state: Arc<T>) -> Self { - Self { - handlers: HashMap::new(), - state, - } +impl<S> Router<S> +where + S: Send + Clone + 'static, +{ + pub fn with_state(state: S) -> Self { + let handlers = HashMap::new(); + let inner = Arc::new(RouterInner { handlers, state }); + Self { inner } } - pub fn route<F>(mut self, endpoint: &str, handler: F) -> Self + pub fn route<H, T, Res>(self, endpoint: &str, handler: H) -> Self where - F: HandlerFn<T>, + H: Handler<T, S, Res>, { - self.handlers.insert( - endpoint.to_string(), - Arc::new(move |state, req| Box::pin(handler(state, req))), - ); - self + let handler = Arc::new(MakeErasedHandler::make(handler)); + self.with_inner(|inner| { + inner.handlers.insert(String::from(endpoint), handler); + }) + } + + fn with_inner<F>(self, f: F) -> Self + where + F: FnOnce(&mut RouterInner<S>), + { + let mut inner = self.into_inner(); + f(&mut inner); + Self { + inner: Arc::new(inner), + } + } + + fn into_inner(self) -> RouterInner<S> { + match Arc::try_unwrap(self.inner) { + Ok(inner) => inner, + Err(arc) => RouterInner::clone(&*arc), + } } } -impl<T> Service<IncomingRequest> for Router<T> +impl<S> Service<IncomingRequest> for Router<S> where - T: Send + Sync + 'static, + S: Clone + Send + 'static, { type Response = Vec<u8>; type Error = String; - type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; + type Future = PinnedFuture<Result<Vec<u8>, String>>; - fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } fn call(&mut self, req: IncomingRequest) -> Self::Future { - // TODO All this manual matching on Results is gross, see if we can find a cleaner way to - // do this - let separator_idx = match req - .request - .iter() - .position(|b| *b == b',') - .ok_or_else(|| "no endpoint in request".to_string()) - { - Ok(idx) => idx, - Err(e) => return Box::pin(future::ready(Err(e))), + let endpoint = req.endpoint(); + let handler = match self.inner.handlers.get(req.endpoint()) { + Some(h) => h, + None => { + return Box::pin(std::future::ready(Err(format!( + "unrecognized endpoint `{endpoint}`" + )))) + } }; - let (endpoint, request) = req.request.split_at(separator_idx); - - let endpoint = - match std::str::from_utf8(endpoint).map_err(|e| format!("non-UTF-8 endpoint ({e})")) { - Ok(endpoint) => endpoint, - Err(e) => return Box::pin(future::ready(Err(e))), - }; - - if let Some(handler) = self.handlers.get(endpoint) { - let (_, request) = request.split_first().unwrap(); - let req = IncomingRequest { - request: request.into(), - sender_id: req.sender_id, - }; - - handler(self.state.clone(), req) - } else { - Box::pin(future::ready(Err(format!( - "unrecognized endpoint `{endpoint}`" - )))) - } + + let state = self.inner.state.clone(); + handler.call(req, state) + } +} + +// TODO We can put a lifetime parameter on this to allow borrowing directly from the request buffer +pub trait FromRequest<S>: Sized { + fn extract(req: &IncomingRequest, state: &S) -> Result<Self, String>; +} + +pub trait IntoResponse { + fn into_response(self) -> Result<Vec<u8>, String>; +} + +impl<R> IntoResponse for Result<R, String> +where + R: IntoResponse, +{ + fn into_response(self) -> Result<Vec<u8>, String> { + self.and_then(|r| r.into_response()) + } +} + +impl IntoResponse for () { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(Vec::new()) + } +} + +impl IntoResponse for Vec<u8> { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(self) + } +} + +impl IntoResponse for &[u8] { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(Vec::from(self)) + } +} + +impl<const N: usize> IntoResponse for [u8; N] { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(Vec::from(&self)) + } +} + +impl IntoResponse for Cow<'_, [u8]> { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(self.into_owned()) + } +} + +impl IntoResponse for String { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(self.into_bytes()) + } +} + +impl IntoResponse for &str { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(Vec::from(self.as_bytes())) + } +} + +impl IntoResponse for Cow<'_, str> { + fn into_response(self) -> Result<Vec<u8>, String> { + Ok(self.into_owned().into_bytes()) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct Json<T>(pub T); + +impl<T, S> FromRequest<S> for Json<T> +where + T: DeserializeOwned, +{ + fn extract(req: &IncomingRequest, _state: &S) -> Result<Self, String> { + Ok(Self( + json::from_slice(req.request()).map_err(|e| e.to_string())?, + )) + } +} + +impl<T> IntoResponse for Json<T> +where + T: Serialize, +{ + fn into_response(self) -> Result<Vec<u8>, String> { + json::to_vec(&self.0).map_err(|e| e.to_string()) + } +} + +#[derive(Debug, Clone)] +pub struct SenderId(pub Vec<u8>); + +impl<S> FromRequest<S> for SenderId { + fn extract(req: &IncomingRequest, _state: &S) -> Result<Self, String> { + Ok(Self(req.sender_id.clone())) + } +} + +#[derive(Debug, Clone)] +pub struct RawRequest(pub Vec<u8>); + +impl<S> FromRequest<S> for RawRequest { + fn extract(req: &IncomingRequest, _state: &S) -> Result<Self, String> { + Ok(Self(Vec::from(req.request()))) + } +} + +#[derive(Debug, Clone)] +pub struct Utf8(pub String); + +impl<S> FromRequest<S> for Utf8 { + fn extract(req: &IncomingRequest, _state: &S) -> Result<Self, String> { + Ok(Self(String::from( + std::str::from_utf8(req.request()).map_err(|e| e.to_string())?, + ))) + } +} + +#[derive(Debug, Clone)] +pub struct Utf8Lossy(pub String); + +impl<S> FromRequest<S> for Utf8Lossy { + fn extract(req: &IncomingRequest, _state: &S) -> Result<Self, String> { + Ok(Self(String::from_utf8_lossy(req.request()).into_owned())) + } +} + +#[derive(Debug, Clone)] +pub struct State<S>(pub S); + +impl<S> FromRequest<S> for State<S> +where + S: Clone, +{ + fn extract(_req: &IncomingRequest, state: &S) -> Result<Self, String> { + Ok(Self(state.clone())) } } @@ -138,195 +374,173 @@ pub struct CMixServerConfig { pub private_key: String, } -#[derive(Debug)] -pub struct CMixServer; - -impl CMixServer { - pub async fn serve<T>(router: Router<T>, config: CMixServerConfig) -> Result<(), String> - where - T: Send + Sync + 'static, - { - tracing::info!("Starting cMix server"); - let ndf_contents = tokio::fs::read_to_string(&config.ndf_path) - .await - .map_err(|e| e.to_string())?; - - if tokio::fs::read_dir(&config.storage_dir).await.is_err() { - let storage_dir = config.storage_dir.clone(); - let secret = config.secret.clone(); - tokio::task::spawn_blocking(move || { - tracing::info!("Creating storage directory"); - base::CMix::create(&ndf_contents, &storage_dir, secret.as_bytes(), "") - }) - .await - .map_err(|e| e.to_string())??; - } +pub async fn serve<S>(service: S, config: CMixServerConfig) -> Result<(), String> +where + S: Service<IncomingRequest, Response = Vec<u8>, Error = String> + Clone + Send + 'static, +{ + tracing::info!("Starting cMix server"); + let ndf_contents = tokio::fs::read_to_string(&config.ndf_path) + .await + .map_err(|e| e.to_string())?; + if tokio::fs::read_dir(&config.storage_dir).await.is_err() { let storage_dir = config.storage_dir.clone(); let secret = config.secret.clone(); - let cmix = tokio::task::spawn_blocking(move || { - tracing::info!("Loading storage directory"); - base::CMix::load(&storage_dir, secret.as_bytes(), &[]) + tokio::task::spawn_blocking(move || { + tracing::info!("Creating storage directory"); + base::CMix::create(&ndf_contents, &storage_dir, secret.as_bytes(), "") }) .await .map_err(|e| e.to_string())??; + } - // Reception ID Load or Generate - let mut reception_id_b64 = config.reception_id.clone(); - let reception_id: Vec<u8>; - if reception_id_b64.is_empty() { - match cmix.ekv_get("rpc_server_reception_id") { - Ok(r) => { - tracing::info!("Loaded Reception ID From EKV..."); - reception_id = r; - } - Err(_) => { - tracing::info!("Generating Random Reception ID..."); - reception_id = rpc::generate_reception_id(&cmix)?; - } + let storage_dir = config.storage_dir.clone(); + let secret = config.secret.clone(); + let cmix = tokio::task::spawn_blocking(move || { + tracing::info!("Loading storage directory"); + base::CMix::load(&storage_dir, secret.as_bytes(), &[]) + }) + .await + .map_err(|e| e.to_string())??; + + // Reception ID Load or Generate + let mut reception_id_b64 = config.reception_id.clone(); + let reception_id: Vec<u8>; + if reception_id_b64.is_empty() { + match cmix.ekv_get("rpc_server_reception_id") { + Ok(r) => { + tracing::info!("Loaded Reception ID From EKV..."); + reception_id = r; } - } else { - match BASE64_STANDARD_NO_PAD.decode(reception_id_b64) { - Ok(r) => { - tracing::info!("Loaded Reception ID From config..."); - reception_id = r; - } - Err(e) => { - panic!("{}", e); - } + Err(_) => { + tracing::info!("Generating Random Reception ID..."); + reception_id = rpc::generate_reception_id(&cmix)?; } } - reception_id_b64 = BASE64_STANDARD_NO_PAD.encode(&reception_id); - tracing::info!("RPC Reception ID: {reception_id_b64}"); - cmix.ekv_set("rpc_server_reception_id", &reception_id)?; - - // Private Key Load or Generate - let private_key_b64 = config.private_key.clone(); - let private_key: Vec<u8>; - if private_key_b64.is_empty() { - match cmix.ekv_get("rpc_server_private_key") { - Ok(r) => { - private_key = r; - tracing::info!("Loaded Private Key From EKV..."); - } - Err(_) => { - private_key = rpc::generate_random_key(&cmix)?; - tracing::info!("Generating Random Private Key..."); - } + } else { + match BASE64_STANDARD_NO_PAD.decode(reception_id_b64) { + Ok(r) => { + tracing::info!("Loaded Reception ID From config..."); + reception_id = r; } - } else { - match BASE64_STANDARD_NO_PAD.decode(private_key_b64) { - Ok(r) => { - tracing::info!("Loaded Private Key From config..."); - private_key = r; - } - Err(e) => { - panic!("{}", e); - } + Err(e) => { + panic!("{}", e); } } - let public_key = rpc::derive_public_key(&private_key)?; - let public_key_b64 = BASE64_STANDARD_NO_PAD.encode(public_key); - tracing::info!("RPC Public Key: {public_key_b64}"); - cmix.ekv_set("rpc_server_private_key", &private_key)?; - - let runtime = tokio::runtime::Handle::current(); - let (sender, mut response_queue) = mpsc::channel(256); - let cbs = CMixServerCallbacks { - router, - runtime, - response_queue: sender, - }; - - tracing::info!("Spawning RPC server"); - base::callbacks::set_rpc_callbacks(); - let rpc_server = rpc::new_server(&cmix, cbs, reception_id, private_key)?; - - let cmix = Arc::new(cmix); - tokio::task::spawn_blocking({ - let cmix = cmix.clone(); - move || { - tracing::info!("Starting network follower"); - cmix.start_network_follower(5000)?; - while let Err(e) = cmix.wait_for_network(20000) { - tracing::info!("Waiting to connect to network: {e}"); - } - Ok::<_, String>(()) + } + reception_id_b64 = BASE64_STANDARD_NO_PAD.encode(&reception_id); + tracing::info!("RPC Reception ID: {reception_id_b64}"); + cmix.ekv_set("rpc_server_reception_id", &reception_id)?; + + // Private Key Load or Generate + let private_key_b64 = config.private_key.clone(); + let private_key: Vec<u8>; + if private_key_b64.is_empty() { + match cmix.ekv_get("rpc_server_private_key") { + Ok(r) => { + private_key = r; + tracing::info!("Loaded Private Key From EKV..."); + } + Err(_) => { + private_key = rpc::generate_random_key(&cmix)?; + tracing::info!("Generating Random Private Key..."); } - }) - .await - .map_err(|e| e.to_string())??; - - tracing::info!("Waiting until ready to send"); - while !cmix.ready_to_send() { - tokio::time::sleep(Duration::from_secs(1)).await; } - - rpc_server.start(); - - tracing::info!( - "RPC Server CB PTR: {:#x}", - rpc_server.cb as *const _ as *const libc::c_void as usize - ); - tracing::info!("RPC Server Started"); - tracing::info!("RPC Public Key: {public_key_b64}"); - tracing::info!("RPC Reception ID: {reception_id_b64}"); - - while let Some(resp) = response_queue.recv().await { - tokio::spawn(async move { - tracing::debug!("request received, sending response"); - tracing::debug!("{}", String::from_utf8_lossy(&resp.text)); - }); + } else { + match BASE64_STANDARD_NO_PAD.decode(private_key_b64) { + Ok(r) => { + tracing::info!("Loaded Private Key From config..."); + private_key = r; + } + Err(e) => { + panic!("{}", e); + } + } + } + let public_key = rpc::derive_public_key(&private_key)?; + let public_key_b64 = BASE64_STANDARD_NO_PAD.encode(public_key); + tracing::info!("RPC Public Key: {public_key_b64}"); + cmix.ekv_set("rpc_server_private_key", &private_key)?; + + let runtime = tokio::runtime::Handle::current(); + let cbs = CMixServerCallback { service, runtime }; + + let cmix = Arc::new(cmix); + tokio::task::spawn_blocking({ + let cmix = cmix.clone(); + move || { + tracing::info!("Starting network follower"); + cmix.start_network_follower(5000)?; + while let Err(e) = cmix.wait_for_network(20000) { + tracing::info!("Waiting to connect to network: {e}"); + } + Ok::<_, String>(()) } + }) + .await + .map_err(|e| e.to_string())??; - // rpc_server.stop(); - cmix.stop_network_follower() + tracing::info!("Waiting until ready to send"); + while !cmix.ready_to_send() { + tokio::time::sleep(Duration::from_secs(1)).await; } -} -unsafe impl Send for CMixServer {} + tracing::info!("Spawning RPC server"); + base::callbacks::set_rpc_callbacks(); + let rpc_server = rpc::new_server(&cmix, cbs, reception_id, private_key)?; + rpc_server.start(); + tracing::info!( + "RPC Server CB PTR: {:#x}", + rpc_server.cb as *const _ as *const libc::c_void as usize + ); + tracing::info!("RPC Server Started"); + tracing::info!("RPC Public Key: {public_key_b64}"); + tracing::info!("RPC Reception ID: {reception_id_b64}"); + + // TODO We need a better way to shut down the server. This never actually completes or gets + // past this line, it just runs until the process gets a kill signal. + std::future::pending::<()>().await; + + rpc_server.stop(); + cmix.stop_network_follower() +} -struct CMixServerCallbacks<T> { - router: Router<T>, +struct CMixServerCallback<S> { + service: S, runtime: tokio::runtime::Handle, - response_queue: mpsc::Sender<Response>, } -impl<T> rpc::ServerCallback for CMixServerCallbacks<T> +impl<S> rpc::ServerCallback for CMixServerCallback<S> where - T: Send + Sync + 'static, + S: Service<IncomingRequest, Response = Vec<u8>, Error = String> + Clone + Send + 'static, { - #[tracing::instrument(level = "debug", skip(self))] fn serve_req(&self, sender_id: Vec<u8>, request: Vec<u8>) -> Vec<u8> { - let mut router = self.router.clone(); - let response_queue = self.response_queue.clone(); - let req = IncomingRequest { sender_id, request }; - let r = self.runtime.block_on(async { - let ret: Vec<u8>; - tracing::debug!("Evaluating router on request"); - if poll_fn(|cx| router.poll_ready(cx)).await.is_ok() { - match router.call(req).await { - Err(e) => { - tracing::warn!(error = e, "Error in servicing request"); - ret = e.into_bytes(); - } - Ok(resp) => { - if response_queue - .send(Response { text: resp.clone() }) - .await - .is_err() - { - tracing::warn!("couldn't send to queue"); - }; - ret = resp; - } - } + let mut service = self.service.clone(); + let res: Result<Vec<u8>, String> = self.runtime.block_on(async move { + tracing::debug!("evaluating service on request"); + if std::future::poll_fn(|cx| service.poll_ready(cx)) + .await + .is_ok() + { + let req = IncomingRequest::new(sender_id, request)?; + service.call(req).await } else { - ret = String::from("error unable to service request").into_bytes(); + Err("unable to service request".to_string()) } - ret }); - tracing::warn!("sending response: {}", String::from_utf8_lossy(&r)); - return r; + + let res = match res { + Ok(bytes) => bytes, + Err(text) => { + tracing::warn!(error = text, "error servicing request"); + text.into_bytes() + } + }; + + tracing::info!( + res = String::from_utf8_lossy(&res).as_ref(), + "sending response" + ); + res } }