From a6ea81505c7fb74ad95dafd4b8dba39c6b45a8c8 Mon Sep 17 00:00:00 2001 From: Tomasz Polgrabia Date: Fri, 27 Dec 2024 22:00:03 +0100 Subject: [PATCH] Restored working but it needs to be run on the same wsl box. --- .../ch/polgrabia/demos/server/WebsocketServerApp.java | 9 ++++++--- .../polgrabia/demos/server/WebsocketServerHandler.java | 2 +- 2024/08/chat_demo1/tools/docker-compose.yaml | 6 +++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java index 0e62c15..d118f23 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java @@ -1,10 +1,13 @@ package ch.polgrabia.demos.server; import io.vertx.core.Vertx; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; + +import java.util.Properties; public class WebsocketServerApp { public static void main(String[] args) { - /** Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("linger.ms", 1); @@ -12,7 +15,7 @@ public class WebsocketServerApp { props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); Producer kafkaProducer = new KafkaProducer<>(props); - **/ + var vertx = Vertx.vertx(); WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle( @@ -24,6 +27,6 @@ public class WebsocketServerApp { .waitUntilFinished() .join(); vertx.close(); - // kafkaProducer.close(); + kafkaProducer.close(); } } \ No newline at end of file diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java index 2851d3a..138c179 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java @@ -71,7 +71,7 @@ public class WebsocketServerHandler implements Handler { String channel = serverWebSocket.path(); logger.info("[{}] Got message {}", channel, s); serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka - // kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked + kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked } } diff --git a/2024/08/chat_demo1/tools/docker-compose.yaml b/2024/08/chat_demo1/tools/docker-compose.yaml index 4bfdbcb..3165d68 100644 --- a/2024/08/chat_demo1/tools/docker-compose.yaml +++ b/2024/08/chat_demo1/tools/docker-compose.yaml @@ -5,11 +5,11 @@ services: environment: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: broker,controller - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092 KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@0.0.0.0:9093 + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1