From 4ad8473aaa4c16aa5aa6345fdad642e0c4b1924f Mon Sep 17 00:00:00 2001 From: Tomasz Polgrabia Date: Fri, 27 Dec 2024 21:19:58 +0100 Subject: [PATCH] Added stubs for kafka connection but configuration (wsl + windows) needs to be checked. --- .../demos/server/WebsocketServerApp.java | 20 ++++++++++++++----- .../server/WebsocketServerChatVerticle.java | 7 +++++-- .../demos/server/WebsocketServerHandler.java | 14 +++++++++++-- 2024/08/chat_demo1/tools/docker-compose.yaml | 9 ++++++--- 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java index 7993344..0e62c15 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java @@ -1,19 +1,29 @@ package ch.polgrabia.demos.server; import io.vertx.core.Vertx; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class WebsocketServerApp { - private static final Logger logger = LoggerFactory.getLogger(WebsocketServerApp.class); - public static void main(String[] args) { + /** + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("linger.ms", 1); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + + Producer kafkaProducer = new KafkaProducer<>(props); + **/ + var vertx = Vertx.vertx(); - WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle(8080, "localhost"); + WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle( + 8080, + "localhost", + null); // kafka producer vertx.deployVerticle(websocketServerChatVerticle); websocketServerChatVerticle .waitUntilFinished() .join(); vertx.close(); + // kafkaProducer.close(); } } \ No newline at end of file diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java index 3636c22..452c2ab 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java @@ -2,6 +2,7 @@ package ch.polgrabia.demos.server; import io.vertx.core.AbstractVerticle; import io.vertx.core.http.HttpServer; +import org.apache.kafka.clients.producer.Producer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,6 +10,7 @@ import java.util.concurrent.CompletableFuture; public class WebsocketServerChatVerticle extends AbstractVerticle { private static final Logger logger = LoggerFactory.getLogger(WebsocketServerChatVerticle.class); + private final Producer kafkaProducer; private HttpServer httpServer; private final int port; private final String hostname; @@ -16,16 +18,17 @@ public class WebsocketServerChatVerticle extends AbstractVerticle { private final Object runningMonitor = new Object(); private boolean shutdownActivation = false; - public WebsocketServerChatVerticle(int port, String hostname) { + public WebsocketServerChatVerticle(int port, String hostname, Producer kafkaProducer) { this.port = port; this.hostname = hostname; + this.kafkaProducer = kafkaProducer; } @Override public void start() throws Exception { httpServer = getVertx() .createHttpServer(); - httpServer.webSocketHandler(new WebsocketServerHandler()).listen(port, hostname); + httpServer.webSocketHandler(new WebsocketServerHandler(kafkaProducer)).listen(port, hostname); shutdownActivation = true; } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java index 214bbad..2851d3a 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java @@ -3,6 +3,8 @@ package ch.polgrabia.demos.server; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.ServerWebSocket; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,11 +17,16 @@ public class WebsocketServerHandler implements Handler { private static final ConcurrentHashMap> channelSocketMapping = new ConcurrentHashMap<>(); + private final Producer kafkaProducer; + + public WebsocketServerHandler(Producer kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } @Override public void handle(ServerWebSocket serverWebSocket) { serverWebSocket.writeBinaryMessage(Buffer.buffer("Hello")); - serverWebSocket.handler(new WebsocketMessageHandler(serverWebSocket)); + serverWebSocket.handler(new WebsocketMessageHandler(kafkaProducer, serverWebSocket)); String channel = serverWebSocket.path(); Set sockets = channelSocketMapping.computeIfAbsent(channel, (k) -> { Set s = new HashSet<>(); @@ -50,8 +57,10 @@ public class WebsocketServerHandler implements Handler { static class WebsocketMessageHandler implements Handler { private final ServerWebSocket serverWebSocket; + private final Producer kafkaProducer; - public WebsocketMessageHandler(ServerWebSocket serverWebSocket) { + public WebsocketMessageHandler(Producer kafkaProducer, ServerWebSocket serverWebSocket) { + this.kafkaProducer = kafkaProducer; this.serverWebSocket = serverWebSocket; this.serverWebSocket.setWriteQueueMaxSize(0); } @@ -62,6 +71,7 @@ public class WebsocketServerHandler implements Handler { String channel = serverWebSocket.path(); logger.info("[{}] Got message {}", channel, s); serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka + // kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked } } diff --git a/2024/08/chat_demo1/tools/docker-compose.yaml b/2024/08/chat_demo1/tools/docker-compose.yaml index 63e36a0..4bfdbcb 100644 --- a/2024/08/chat_demo1/tools/docker-compose.yaml +++ b/2024/08/chat_demo1/tools/docker-compose.yaml @@ -5,13 +5,16 @@ services: environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller - KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@0.0.0.0:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_NUM_PARTITIONS: 3 + ports: + - "9092:9092" + - "9093:9093" \ No newline at end of file