diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java index 1852f44..cafe94d 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java @@ -2,9 +2,8 @@ package ch.polgrabia.demos.client; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; -import io.vertx.core.http.HttpClient; -import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.http.WebSocket; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.*; import io.vertx.core.http.impl.WebSocketClientImpl; import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.VertxInternal; @@ -22,6 +21,8 @@ public class WebsocketClientParticle extends AbstractVerticle { private HttpClient client; private boolean isShuttingDownActivated = false; private final Object runningMonitor = new Object(); + private HttpServer server; + private WebSocket webSocket; public WebsocketClientParticle(String hostname, int port, String path) { this.hostname = hostname; @@ -33,10 +34,42 @@ public class WebsocketClientParticle extends AbstractVerticle { public void start() { this.client = getVertx() .createHttpClient(); + HttpServerOptions httpServerOptions = new HttpServerOptions(); + this.server = getVertx().createHttpServer(httpServerOptions); + this.server.requestHandler(this::handleCommandRequests); + this.server.listen(8123, "localhost", this::handleListenCommand); + logger.info("Starting websocket client command server"); new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture()) .webSocket(port, hostname, path, this::handleMessage); } + private void handleListenCommand(AsyncResult httpServerAsyncResult) { + if (httpServerAsyncResult.failed()) { + logger.warn("Couldn't initialized http command server"); + getVertx().close(); + System.exit(1); + } + + logger.info("Successfully started websocket client command server"); + + HttpServer result = httpServerAsyncResult.result(); + } + + private void handleCommandRequests(HttpServerRequest httpServerRequest) { + logger.info("Received command request on 8123"); + HttpServerResponse response = httpServerRequest.response(); + String qParam = httpServerRequest.getParam("q"); + if (qParam == null || qParam.isBlank()) { + response.setStatusCode(400); + response.end("bad request"); + return; + } + logger.info("Sending binary message: {}", qParam); + webSocket.writeBinaryMessage(Buffer.buffer(qParam)); + response.setStatusCode(202); + response.end("Ok"); + } + private void handleMessage(AsyncResult webSocketAsyncResult) { logger.info("Handling websocket"); if (!webSocketAsyncResult.succeeded()) { @@ -48,7 +81,7 @@ public class WebsocketClientParticle extends AbstractVerticle { } return; } - WebSocket webSocket = webSocketAsyncResult.result(); + this.webSocket = webSocketAsyncResult.result(); vertx.setPeriodic(100, new ClientPeriodicStdinScanner( webSocket, System.in @@ -70,6 +103,7 @@ public class WebsocketClientParticle extends AbstractVerticle { public void stop() { isShuttingDownActivated = true; client.close(); + server.close(); synchronized (runningMonitor) { runningMonitor.notifyAll(); } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java index 138c179..fae7a77 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java @@ -71,7 +71,9 @@ public class WebsocketServerHandler implements Handler { String channel = serverWebSocket.path(); logger.info("[{}] Got message {}", channel, s); serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka - kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked + if (kafkaProducer != null) { + kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked + } } } diff --git a/2024/08/chat_demo1/tools/docker-compose.yaml b/2024/08/chat_demo1/tools/docker-compose.yaml index eb2afae..9b2f3c4 100644 --- a/2024/08/chat_demo1/tools/docker-compose.yaml +++ b/2024/08/chat_demo1/tools/docker-compose.yaml @@ -9,6 +9,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1