Thursday, May 13, 2021

RSocket Java Example

Come across RSocket here

RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron.

It could be a good alternative to REST for microservice communications. 

Let's write some code and try it out. All you need is these libraries in your pom file. Then you are good to go.

        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-core</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-transport-netty</artifactId>
            <version>1.1.0</version>
        </dependency>

For the demo, we will write a Server that returns the message that is sent by the client.

Here is the Server code.


import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.publisher.Mono;

public class ServerApplication {

    public static final int TCP_PORT = 8910;
    
    public static void main(String[] args) {
        CloseableChannel channel = 
                RSocketServer.create(SocketAcceptor.with(new MyRSocket()))
                             // Enable Zero Copy
                             .payloadDecoder(PayloadDecoder.ZERO_COPY)
                             .bind(TcpServerTransport.create(TCP_PORT))
                             .block();
        if(channel != null) {
            channel.onClose().block();
        }
    }
    
    private static class MyRSocket implements RSocket {
        @Override
        public Mono<Payload%gt; requestResponse(Payload payload) {
            return Mono.just(payload);
        }
    }
    
}

Just start the ServerApplication and let it listen to port 8910.

Now we write the Client code.


import java.nio.ByteBuffer;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;

public class ReqResClient {

    private final RSocket socket;

    public ReqResClient() {
        this.socket = RSocketFactory.connect()
                .transport(TcpClientTransport.create(HOST, TCP_PORT))
                .start()
                .block();
    }

    public ByteBuffer callBlocking(byte[] byteArray) {
        return socket
                .requestResponse(DefaultPayload.create(byteArray))
                .map(Payload::getData)
                .block();
    }
    
    public void dispose() {
        this.socket.dispose();
    }

}

And the class for the object we want to send to the Server. 


import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Data;

/* ISO-99999999 standard user */
@Data
@AllArgsConstructor
public class User implements Serializable {

    private static final long serialVersionUID = -8731669541043173364L;
    private String username;
    private String password;
    
}

Now we run the client.


import java.nio.ByteBuffer;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Application {

    public static final String HOST = "localhost";
    public static final int TCP_PORT = 8910;

    public static void main(String[] args) {

        ReqResClient client = new ReqResClient();
        
        User user = new User("devil", "kazuya");
        try {
            // serializing object to byte array.
            byte[] bytes = SerializationUtils.serialize(user);
            
            log.info("Sending 'user' to server:   ----  {}", user);
            ByteBuffer response = client.callBlocking(bytes);
            
            // deserializing byte array back to object.
            User clone = (User) SerializationUtils.deserialize(response.array());
            log.info("Getting 'user' from server: ----  {}", clone);
            
        } catch (NullPointerException | SerializationException e) {
            log.error("something wrong", e);
        }
        
        client.dispose();
    }

}

For the result, we see some warnings messages. We ignore them for now. We can see your object went to the Server and back in one piece. How nice.

[main] WARN reactor.netty.tcp.TcpResources - [tcp] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-tcp, daemon=true, selectCount=4, workerCount=4}
[main] WARN reactor.netty.tcp.TcpResources - [tcp] resources will use the default ConnectionProvider: PooledConnectionProvider {name=tcp, poolFactory=reactor.netty.resources.ConnectionProvider$$Lambda$7/13138721@39f31e}
[main] INFO ong.ternchow.rsocketclient.Application - Sending 'user' to server:   ----  User(username=devil, password=kazuya)
[main] INFO ong.ternchow.rsocketclient.Application - Getting 'user' from server: ----  User(username=devil, password=kazuya)

Few thoughts come across my mind.

  • I don't have to parse my object to JSON in order to talk to other microservice. 
  • There will be a lot of Serializing/Deserializing (the good/bad old RMI days). 
  • Sending data in binaries will be faster than text(JSON). 

The Interaction Models of RSocket are very interesting as well. Read more here

Fire-and-Forget

Future<Void> completionSignalOfSend = socketClient.fireAndForget(message);

Request/Response

Future<Payload> response = socketClient.requestResponse(requestPayload);

Request/Stream

Publisher<Payload> response = socketClient.requestStream(requestPayload);

Channel

Publisher<Payload> output = socketClient.requestChannel(Publisher<Payload> input);


We will play with each model in the future. Stay tuned.

---

Source code:

https://github.com/devilkazuya99/rsocketserver 

https://github.com/devilkazuya99/rsocketclient


No comments: