From e492b6df606f17a312c2869b57f419e210b8c9f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20P=C3=B3=C5=82grabia?= Date: Sat, 28 Dec 2024 13:59:13 +0100 Subject: [PATCH] Upgraded kafka and zookeeper to 7.4.8 (last working before 7.5.0) + successfully sending to kafka topic. --- .../demos/client/WebsocketClientParticle.java | 7 ++++--- .../demos/server/WebsocketServerHandler.java | 7 ++++--- .../java/ch/polgrabia/demos/utils/TopicUtil.java | 13 +++++++++++++ 2024/08/chat_demo1/tools/README.md | 3 +++ 2024/08/chat_demo1/tools/docker-compose.yaml | 6 ++++-- 5 files changed, 28 insertions(+), 8 deletions(-) create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/utils/TopicUtil.java diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java index cafe94d..e698f19 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java @@ -1,5 +1,6 @@ package ch.polgrabia.demos.client; +import ch.polgrabia.demos.utils.TopicUtil; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.buffer.Buffer; @@ -18,6 +19,7 @@ public class WebsocketClientParticle extends AbstractVerticle { private final int port; // path maps to channel private final String path; + private final String channel; private HttpClient client; private boolean isShuttingDownActivated = false; private final Object runningMonitor = new Object(); @@ -28,6 +30,7 @@ public class WebsocketClientParticle extends AbstractVerticle { this.hostname = hostname; this.port = port; this.path = path; + this.channel = TopicUtil.convertTopicName(path); } @Override @@ -51,8 +54,6 @@ public class WebsocketClientParticle extends AbstractVerticle { } logger.info("Successfully started websocket client command server"); - - HttpServer result = httpServerAsyncResult.result(); } private void handleCommandRequests(HttpServerRequest httpServerRequest) { @@ -88,7 +89,7 @@ public class WebsocketClientParticle extends AbstractVerticle { )); webSocket - .binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", path, msg)) + .binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg)) .closeHandler(unused -> { try { stop(); diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java index fae7a77..093ff51 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java @@ -1,5 +1,6 @@ package ch.polgrabia.demos.server; +import ch.polgrabia.demos.utils.TopicUtil; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.ServerWebSocket; @@ -27,7 +28,7 @@ public class WebsocketServerHandler implements Handler { public void handle(ServerWebSocket serverWebSocket) { serverWebSocket.writeBinaryMessage(Buffer.buffer("Hello")); serverWebSocket.handler(new WebsocketMessageHandler(kafkaProducer, serverWebSocket)); - String channel = serverWebSocket.path(); + String channel = TopicUtil.convertTopicName(serverWebSocket.path()); Set sockets = channelSocketMapping.computeIfAbsent(channel, (k) -> { Set s = new HashSet<>(); s.add(serverWebSocket); @@ -47,7 +48,7 @@ public class WebsocketServerHandler implements Handler { " It should never happen", channel); } } else { - logger.warn("Channel {} not found. It should never happen", channel); + logger.warn("Channel {} not found. It should never happen",channel); } }); @@ -68,7 +69,7 @@ public class WebsocketServerHandler implements Handler { @Override public void handle(Buffer s) { - String channel = serverWebSocket.path(); + String channel = TopicUtil.convertTopicName(serverWebSocket.path()); logger.info("[{}] Got message {}", channel, s); serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka if (kafkaProducer != null) { diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/utils/TopicUtil.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/utils/TopicUtil.java new file mode 100644 index 0000000..d6e1c5f --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/utils/TopicUtil.java @@ -0,0 +1,13 @@ +package ch.polgrabia.demos.utils; + +import org.apache.kafka.common.internals.Topic; + +public class TopicUtil { + private TopicUtil() { + + } + + public static String convertTopicName(final String path) { + return path.replaceAll("[^a-zA-Z0-9]", ""); + } +} diff --git a/2024/08/chat_demo1/tools/README.md b/2024/08/chat_demo1/tools/README.md index 772d799..2e42fe5 100644 --- a/2024/08/chat_demo1/tools/README.md +++ b/2024/08/chat_demo1/tools/README.md @@ -1,3 +1,6 @@ # Kafka docker setup Seems like there is a behavior change between 7.4.4 and latest (7.8.0, too) images. The same config works for 7.4.4 but not for the latest (it tries to connect broker and it fails). + +28.12.2024 - last 7.4.x tag 7.4.8 version seems to be working well. +Needs to be checked what's the difference between 7.4.8 and 7.5.0 diff --git a/2024/08/chat_demo1/tools/docker-compose.yaml b/2024/08/chat_demo1/tools/docker-compose.yaml index 1ab0253..e7c3a3f 100644 --- a/2024/08/chat_demo1/tools/docker-compose.yaml +++ b/2024/08/chat_demo1/tools/docker-compose.yaml @@ -1,22 +1,24 @@ services: zookeeper: - image: confluentinc/cp-zookeeper:7.4.4 + image: confluentinc/cp-zookeeper:7.4.8 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "22181:2181" broker: - image: confluentinc/cp-kafka:7.4.4 + image: confluentinc/cp-kafka:7.4.8 depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + TOPIC_AUTO_CREATE: true ports: - "29092:29092"