Initial basic ping pong websocket app in vertx.

This commit is contained in:
Tomasz Półgrabia 2024-08-01 16:28:09 +02:00
parent 321c890178
commit b0c8930568
13 changed files with 654 additions and 0 deletions

View file

@ -0,0 +1,21 @@
plugins {
id("java")
}
group = "ch.polgrabia.demos"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("io.vertx:vertx-core:4.5.9")
implementation("ch.qos.logback:logback-classic:1.5.6")
testImplementation(platform("org.junit:junit-bom:5.10.0"))
testImplementation("org.junit.jupiter:junit-jupiter")
}
tasks.test {
useJUnitPlatform()
}

View file

@ -0,0 +1,18 @@
package ch.polgrabia.demos.client;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebsocketClientApp {
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientApp.class);
public static void main(String[] args) {
var vertx = Vertx.vertx();
logger.info("Deploying websocket particle");
WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle("localhost", 8080, "/");
vertx.deployVerticle(websocketClientParticle);
logger.info("Deployed websocket particle");
websocketClientParticle.waitingUntilFinished();
vertx.close();
}
}

View file

@ -0,0 +1,123 @@
package ch.polgrabia.demos.client;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class WebsocketClientParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
private final String hostname;
private final int port;
private final String path;
private HttpClient client;
private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object();
private ByteArrayOutputStream clientTextInput;
private WebSocket webSocket;
public WebsocketClientParticle(String hostname, int port, String path) {
this.hostname = hostname;
this.port = port;
this.path = path;
}
@Override
public void start() throws Exception {
this.client = getVertx()
.createHttpClient();
// TODO refactor it as it's deprecated
client.webSocket(port, hostname, path, this::handleMessage);
}
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
logger.info("Handling websocket");
if (!webSocketAsyncResult.succeeded()) {
logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause());
try {
stop();
} catch (Exception e) {
logger.error("Failed to stop particle", e);
}
return;
}
this.clientTextInput = new ByteArrayOutputStream();
vertx.setPeriodic(100, this::handleClientTextInput);
webSocket = webSocketAsyncResult.result();
webSocket
.textMessageHandler(this::handleText)
.closeHandler(this::stopHandler);
}
private void handleClientTextInput(Long ms) {
try {
int available = System.in.available();
if (available > 0) {
byte[] clientPayload = System.in.readNBytes(available);
this.clientTextInput.write(clientPayload);
int idx = newLineIndex(clientPayload);
if (idx > 0) {
String payload = clientTextInput.toString(StandardCharsets.UTF_8);
int nidx = payload.indexOf('\n');
String textMsgToBeSent = payload.substring(0, nidx);
webSocket.writeTextMessage(textMsgToBeSent);
clientTextInput = new ByteArrayOutputStream();
clientTextInput.write(payload.substring(idx).getBytes(StandardCharsets.UTF_8));
}
}
} catch (IOException e) {
logger.error("Failed to read client input", e);
}
}
private int newLineIndex(byte[] clientPayload) {
for (int i = 0; i < clientPayload.length; i++) {
if (clientPayload[i] == "\n".getBytes(StandardCharsets.UTF_8)[0]) {
return i;
}
}
return -1;
}
private void stopHandler(Void unused) {
try {
stop();
} catch (Exception e) {
logger.error("Got an error while stopping", e);
}
}
private void handleText(String s) {
logger.info("Got message: {}", s);
}
@Override
public void stop() throws Exception {
isShuttingDownActivated = true;
client.close();
synchronized (runningMonitor) {
runningMonitor.notifyAll();
}
}
public void waitingUntilFinished() {
while(!isShuttingDownActivated) {
synchronized (runningMonitor) {
try {
runningMonitor.wait();
return;
} catch (InterruptedException e) {
logger.warn("Got interrupted", e);
}
}
}
}
}

View file

@ -0,0 +1,16 @@
package ch.polgrabia.demos.server;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebsocketServerApp {
private static final Logger logger = LoggerFactory.getLogger(WebsocketServerApp.class);
public static void main(String[] args) {
var vertx = Vertx.vertx();
WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle(8080, "localhost");
vertx.deployVerticle(websocketServerChatVerticle);
websocketServerChatVerticle.waitUntilFinished();
vertx.close();
}
}

View file

@ -0,0 +1,51 @@
package ch.polgrabia.demos.server;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebsocketServerChatVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebsocketServerChatVerticle.class);
private HttpServer httpServer;
private final int port;
private final String hostname;
private final Object runningMonitor = new Object();
private boolean shutdownActivation = false;
public WebsocketServerChatVerticle(int port, String hostname) {
this.port = port;
this.hostname = hostname;
}
@Override
public void start() throws Exception {
httpServer = getVertx()
.createHttpServer();
httpServer.webSocketHandler(new WebsocketServerHandler()).listen(port, hostname);
shutdownActivation = true;
}
@Override
public void stop() throws Exception {
httpServer.close();
shutdownActivation = false;
synchronized (runningMonitor) {
runningMonitor.notifyAll();
}
}
public void waitUntilFinished() {
while (!shutdownActivation) {
synchronized (runningMonitor) {
try {
runningMonitor.wait();
return;
} catch (InterruptedException e) {
logger.warn("We got interrupted, retrying", e);;
}
}
}
}
}

View file

@ -0,0 +1,33 @@
package ch.polgrabia.demos.server;
import ch.polgrabia.demos.client.WebsocketClientApp;
import io.vertx.core.Handler;
import io.vertx.core.http.ServerWebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebsocketServerHandler implements Handler<ServerWebSocket> {
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientApp.class);
@Override
public void handle(ServerWebSocket serverWebSocket) {
serverWebSocket.writeTextMessage("Hello");
serverWebSocket.textMessageHandler(new WebsocketTextMessageHandler(serverWebSocket));
}
static class WebsocketTextMessageHandler implements Handler<String> {
private final ServerWebSocket serverWebSocket;
public WebsocketTextMessageHandler(ServerWebSocket serverWebSocket) {
this.serverWebSocket = serverWebSocket;
}
@Override
public void handle(String s) {
logger.info("Got message {}", s);
serverWebSocket.writeTextMessage("Pong: " + s); // TBD send to kafka
}
}
}