From 74c1f2d94eef64be6c398101ab1dc0bb97c41692 Mon Sep 17 00:00:00 2001 From: Tomasz Polgrabia Date: Fri, 24 Jan 2025 00:30:26 +0100 Subject: [PATCH] Small refactorings for client & server. --- .../ClientPeriodicStdinScannerParticle.java | 2 +- .../ch/polgrabia/demos/client/Constants.java | 1 + .../demos/client/WebApiParticle.java | 3 +- .../client/WebSocketReceiverParticle.java | 35 +++++++++++++++++++ .../demos/client/WebSocketSenderParticle.java | 2 +- .../demos/client/WebsocketClientParticle.java | 20 ++++------- .../demos/server/WebsocketMessageHandler.java | 33 +++++++++++++++++ .../demos/server/WebsocketServerHandler.java | 24 ------------- 8 files changed, 79 insertions(+), 41 deletions(-) create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketReceiverParticle.java create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketMessageHandler.java diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java index 80fc312..257da62 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java @@ -59,7 +59,7 @@ public class ClientPeriodicStdinScannerParticle extends AbstractVerticle impleme logger.info("Sending binary message: {}", b); getVertx() .eventBus() - .send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b); + .publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b); prevIdx = idx + eolLength; } } while (idx >= 0 && prevIdx < clientPayload.length); diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java index fe17ade..68105b9 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java @@ -3,4 +3,5 @@ package ch.polgrabia.demos.client; public class Constants { public static final String CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY = "%s#send" .formatted(WebSocketSenderParticle.class.getCanonicalName()); + public static final int CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS = 100; } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java index 0500f7d..02ce433 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java @@ -3,6 +3,7 @@ package ch.polgrabia.demos.client; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.EventBus; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerRequest; @@ -37,7 +38,7 @@ public class WebApiParticle extends AbstractVerticle { return; } logger.info("Sending binary message: {}", qParam); - getVertx().eventBus().send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam)); + getVertx().eventBus().publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam)); response.setStatusCode(202); response.end("Ok"); } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketReceiverParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketReceiverParticle.java new file mode 100644 index 0000000..bf5cc0e --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketReceiverParticle.java @@ -0,0 +1,35 @@ +package ch.polgrabia.demos.client; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.http.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketReceiverParticle extends AbstractVerticle { + private static final Logger logger = LoggerFactory.getLogger(WebSocketReceiverParticle.class); + private final WebSocket webSocket; + private final String channel; + private final Runnable onClose; + + public WebSocketReceiverParticle(WebSocket webSocket, String channel, Runnable onClose) { + this.webSocket = webSocket; + this.channel = channel; + this.onClose = onClose; + } + + @Override + public void start() throws Exception { + logger.info("Starting WebSocketReceiverParticle"); + webSocket + .binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg)) + .closeHandler(unused -> { + logger.info("[{}] Web socket is being closed {}", channel, webSocket.closeReason()); + try { + onClose.run(); + } catch (Exception e) { + logger.error("Got an error while stopping", e); + } + }); + } + +} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java index 9d8bfcd..3b7e7f3 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java @@ -23,7 +23,7 @@ public class WebSocketSenderParticle extends AbstractVerticle { @Override public void start() throws Exception { EventBus eventBus = getVertx().eventBus(); - this.consumer = eventBus.consumer(WebSocketSenderParticle.class.getCanonicalName(), event -> { + this.consumer = eventBus.consumer(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, event -> { Buffer msg = event.body(); logger.info("Sending binary message: {}", msg); webSocket.writeBinaryMessage(msg); diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java index 16660b1..4e560f9 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java @@ -28,8 +28,8 @@ public class WebsocketClientParticle extends AbstractVerticle { private HttpClient client; private boolean isShuttingDownActivated = false; private final Object runningMonitor = new Object(); - private WebSocket webSocket; private WebSocketSenderParticle webSocketSenderParticle; + private WebSocketReceiverParticle webSocketReceiverParticle; public WebsocketClientParticle(String hostname, int port, String path) { this.hostname = hostname; @@ -60,21 +60,12 @@ public class WebsocketClientParticle extends AbstractVerticle { } return; } - this.webSocket = webSocketAsyncResult.result(); + WebSocket webSocket = webSocketAsyncResult.result(); + webSocketReceiverParticle = new WebSocketReceiverParticle(webSocket, channel, this::stop); + getVertx().deployVerticle(webSocketReceiverParticle); webSocketSenderParticle = new WebSocketSenderParticle(webSocket); getVertx().deployVerticle(webSocketSenderParticle); - vertx.setPeriodic(100, clientPeriodicStdinScannerParticle); - - webSocket - .binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg)) - .closeHandler(unused -> { - try { - stop(); - } catch (Exception e) { - logger.error("Got an error while stopping", e); - } - }); - + vertx.setPeriodic(Constants.CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS, clientPeriodicStdinScannerParticle); } @Override @@ -82,6 +73,7 @@ public class WebsocketClientParticle extends AbstractVerticle { isShuttingDownActivated = true; getVertx().undeploy(webApiParticle.deploymentID()); getVertx().undeploy(webSocketSenderParticle.deploymentID()); + getVertx().undeploy(webSocketReceiverParticle.deploymentID()); getVertx().undeploy(clientPeriodicStdinScannerParticle.deploymentID()); client.close(); synchronized (runningMonitor) { diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketMessageHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketMessageHandler.java new file mode 100644 index 0000000..4767d6f --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketMessageHandler.java @@ -0,0 +1,33 @@ +package ch.polgrabia.demos.server; + +import ch.polgrabia.demos.utils.TopicUtil; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.ServerWebSocket; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebsocketMessageHandler implements Handler { + private static final Logger logger = LoggerFactory.getLogger(WebsocketMessageHandler.class); + private final ServerWebSocket serverWebSocket; + private final Producer kafkaProducer; + + public WebsocketMessageHandler(Producer kafkaProducer, ServerWebSocket serverWebSocket) { + this.kafkaProducer = kafkaProducer; + this.serverWebSocket = serverWebSocket; + this.serverWebSocket.setWriteQueueMaxSize(0); + } + + + @Override + public void handle(Buffer s) { + String channel = TopicUtil.convertTopicName(serverWebSocket.path()); + logger.info("[{}] Got message {}", channel, s); + serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka + if (kafkaProducer != null) { + kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked + } + } +} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java index 093ff51..7e74374 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java @@ -5,7 +5,6 @@ import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.ServerWebSocket; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,27 +54,4 @@ public class WebsocketServerHandler implements Handler { sockets.add(serverWebSocket); } - static class WebsocketMessageHandler implements Handler { - - private final ServerWebSocket serverWebSocket; - private final Producer kafkaProducer; - - public WebsocketMessageHandler(Producer kafkaProducer, ServerWebSocket serverWebSocket) { - this.kafkaProducer = kafkaProducer; - this.serverWebSocket = serverWebSocket; - this.serverWebSocket.setWriteQueueMaxSize(0); - } - - - @Override - public void handle(Buffer s) { - String channel = TopicUtil.convertTopicName(serverWebSocket.path()); - logger.info("[{}] Got message {}", channel, s); - serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka - if (kafkaProducer != null) { - kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked - } - } - } - }