karyon_jsonrpc::server::builder

Struct ServerBuilder

Source
pub struct ServerBuilder<C> {
    config: ServerConfig,
    codec: C,
    executor: Option<Executor>,
}
Expand description

Builder for constructing an RPC Server.

Fields§

§config: ServerConfig§codec: C§executor: Option<Executor>

Implementations§

Source§

impl<C> ServerBuilder<C>
where C: ClonableJsonCodec + 'static,

Source

pub fn new_with_codec( endpoint: impl ToEndpoint, codec: C, ) -> Result<ServerBuilder<C>>

Creates a new ServerBuilder With a custom codec.

This function initializes a ServerBuilder with the specified endpoint and custom codec.

§Example

#[cfg(feature = "ws")]
use async_tungstenite::tungstenite::Message;
use serde_json::Value;
#[cfg(feature = "ws")]
use karyon_jsonrpc::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
use karyon_jsonrpc::{server::ServerBuilder, codec::{Codec, Decoder, Encoder, }, error::{Error, Result}};


#[derive(Clone)]
pub struct CustomJsonCodec {}

impl Codec for CustomJsonCodec {
    type Message = serde_json::Value;
    type Error = Error;
}

#[cfg(feature = "ws")]
impl WebSocketCodec for CustomJsonCodec {
    type Message = serde_json::Value;
    type Error = Error;
}

impl Encoder for CustomJsonCodec {
    type EnMessage = serde_json::Value;
    type EnError = Error;
    fn encode(&self, src: &Self::EnMessage, dst: &mut [u8]) -> Result<usize> {
        let msg = match serde_json::to_string(src) {
            Ok(m) => m,
            Err(err) => return Err(Error::Encode(err.to_string())),
        };
        let buf = msg.as_bytes();
        dst[..buf.len()].copy_from_slice(buf);
        Ok(buf.len())
    }
}

impl Decoder for CustomJsonCodec {
    type DeMessage = serde_json::Value;
    type DeError = Error;
    fn decode(&self, src: &mut [u8]) -> Result<Option<(usize, Self::DeMessage)>> {
        let de = serde_json::Deserializer::from_slice(src);
        let mut iter = de.into_iter::<serde_json::Value>();

        let item = match iter.next() {
            Some(Ok(item)) => item,
            Some(Err(ref e)) if e.is_eof() => return Ok(None),
            Some(Err(e)) => return Err(Error::Decode(e.to_string())),
            None => return Ok(None),
        };

        Ok(Some((iter.byte_offset(), item)))
    }
}


#[cfg(feature = "ws")]
impl WebSocketEncoder for CustomJsonCodec {
    type EnMessage = serde_json::Value;
    type EnError = Error;

    fn encode(&self, src: &Self::EnMessage) -> Result<Message> {
        let msg = match serde_json::to_string(src) {
            Ok(m) => m,
            Err(err) => return Err(Error::Encode(err.to_string())),
        };
        Ok(Message::Text(msg))
    }
}

#[cfg(feature = "ws")]
impl WebSocketDecoder for CustomJsonCodec {
    type DeMessage = serde_json::Value;
    type DeError = Error;
    fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>> {
         match src {
             Message::Text(s) => match serde_json::from_str(s) {
                 Ok(m) => Ok(Some(m)),
                 Err(err) => Err(Error::Decode(err.to_string())),
             },
             Message::Binary(s) => match serde_json::from_slice(s) {
                 Ok(m) => Ok(m),
                 Err(err) => Err(Error::Decode(err.to_string())),
             },
             Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
             m => Err(Error::Decode(format!(
                 "Receive unexpected message: {:?}",
                 m
             ))),
         }
     }
}

async {
    let server = ServerBuilder::new_with_codec("tcp://127.0.0.1:3000", CustomJsonCodec{})
        .expect("Create a new server builder")
        .build().await
        .expect("Build the server");
};
Source

pub fn service(self, service: Arc<dyn RPCService>) -> Self

Adds a new RPC service to the server.

§Example
use std::sync::Arc;

use serde_json::Value;

use karyon_jsonrpc::{rpc_impl, error::RPCError, server::ServerBuilder};

struct Ping {}

#[rpc_impl]
impl Ping {
    async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
        Ok(serde_json::json!("Pong"))
    }
}

async {
    let server = ServerBuilder::new("ws://127.0.0.1:3000")
        .expect("Create a new server builder")
        .service(Arc::new(Ping{}))
        .build().await
        .expect("Build the server");
};
Source

pub fn pubsub_service(self, service: Arc<dyn PubSubRPCService>) -> Self

Adds a new PubSub RPC service to the server.

§Example
use std::sync::Arc;

use serde_json::Value;

use karyon_jsonrpc::{
    rpc_impl, rpc_pubsub_impl, error::RPCError, message::SubscriptionID,
    server::{ServerBuilder, Channel},
};

struct Ping {}

#[rpc_impl]
impl Ping {
    async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
        Ok(serde_json::json!("Pong"))
    }
}

#[rpc_pubsub_impl]
impl Ping {
   async fn log_subscribe(
        &self,
        chan: Arc<Channel>,
        method: String,
        _params: Value,
    ) -> Result<Value, RPCError> {
        let sub = chan.new_subscription(&method).await;
        let sub_id = sub.id.clone();
        Ok(serde_json::json!(sub_id))
    }

    async fn log_unsubscribe(
        &self,
        chan: Arc<Channel>,
        _method: String,
        params: Value,
    ) -> Result<Value, RPCError> {
        let sub_id: SubscriptionID = serde_json::from_value(params)?;
        chan.remove_subscription(&sub_id).await;
        Ok(serde_json::json!(true))
    }
}

async {
    let ping_service = Arc::new(Ping{});
    let server = ServerBuilder::new("ws://127.0.0.1:3000")
        .expect("Create a new server builder")
        .service(ping_service.clone())
        .pubsub_service(ping_service)
        .build().await
        .expect("Build the server");
};
Source

pub fn tcp_config(self, config: TcpConfig) -> Result<ServerBuilder<C>>

Configure TCP settings for the server.

§Example
use karyon_jsonrpc::{server::ServerBuilder, net::TcpConfig};

async {
    let tcp_config = TcpConfig::default();
    let server = ServerBuilder::new("ws://127.0.0.1:3000")
        .expect("Create a new server builder")
        .tcp_config(tcp_config)
        .expect("Add tcp config")
        .build().await
        .expect("Build the server");
};

This function will return an error if the endpoint does not support TCP protocols.

Source

pub async fn with_executor(self, ex: Executor) -> Self

With an executor.

Source

pub async fn build(self) -> Result<Arc<Server>>

Builds the server with the configured options.

Source§

impl ServerBuilder<JsonCodec>

Source

pub fn new(endpoint: impl ToEndpoint) -> Result<ServerBuilder<JsonCodec>>

Creates a new ServerBuilder

This function initializes a ServerBuilder with the specified endpoint.

§Example
use karyon_jsonrpc::server::ServerBuilder;

async {
    let server = ServerBuilder::new("ws://127.0.0.1:3000")
        .expect("Create a new server builder")
        .build().await
        .expect("Build the server");
};

Auto Trait Implementations§

§

impl<C> Freeze for ServerBuilder<C>
where C: Freeze,

§

impl<C> !RefUnwindSafe for ServerBuilder<C>

§

impl<C> Send for ServerBuilder<C>
where C: Send,

§

impl<C> Sync for ServerBuilder<C>
where C: Sync,

§

impl<C> Unpin for ServerBuilder<C>
where C: Unpin,

§

impl<C> !UnwindSafe for ServerBuilder<C>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more