Upgraded kafka and zookeeper to 7.4.8 (last working before 7.5.0) + successfully sending to kafka topic.

master
Tomasz Półgrabia 2024-12-28 13:59:13 +01:00
parent affff394be
commit e492b6df60
5 changed files with 28 additions and 8 deletions

View File

@ -1,5 +1,6 @@
package ch.polgrabia.demos.client; package ch.polgrabia.demos.client;
import ch.polgrabia.demos.utils.TopicUtil;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
@ -18,6 +19,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
private final int port; private final int port;
// path maps to channel // path maps to channel
private final String path; private final String path;
private final String channel;
private HttpClient client; private HttpClient client;
private boolean isShuttingDownActivated = false; private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object(); private final Object runningMonitor = new Object();
@ -28,6 +30,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
this.hostname = hostname; this.hostname = hostname;
this.port = port; this.port = port;
this.path = path; this.path = path;
this.channel = TopicUtil.convertTopicName(path);
} }
@Override @Override
@ -51,8 +54,6 @@ public class WebsocketClientParticle extends AbstractVerticle {
} }
logger.info("Successfully started websocket client command server"); logger.info("Successfully started websocket client command server");
HttpServer result = httpServerAsyncResult.result();
} }
private void handleCommandRequests(HttpServerRequest httpServerRequest) { private void handleCommandRequests(HttpServerRequest httpServerRequest) {
@ -88,7 +89,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
)); ));
webSocket webSocket
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", path, msg)) .binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
.closeHandler(unused -> { .closeHandler(unused -> {
try { try {
stop(); stop();

View File

@ -1,5 +1,6 @@
package ch.polgrabia.demos.server; package ch.polgrabia.demos.server;
import ch.polgrabia.demos.utils.TopicUtil;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket; import io.vertx.core.http.ServerWebSocket;
@ -27,7 +28,7 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
public void handle(ServerWebSocket serverWebSocket) { public void handle(ServerWebSocket serverWebSocket) {
serverWebSocket.writeBinaryMessage(Buffer.buffer("Hello")); serverWebSocket.writeBinaryMessage(Buffer.buffer("Hello"));
serverWebSocket.handler(new WebsocketMessageHandler(kafkaProducer, serverWebSocket)); serverWebSocket.handler(new WebsocketMessageHandler(kafkaProducer, serverWebSocket));
String channel = serverWebSocket.path(); String channel = TopicUtil.convertTopicName(serverWebSocket.path());
Set<ServerWebSocket> sockets = channelSocketMapping.computeIfAbsent(channel, (k) -> { Set<ServerWebSocket> sockets = channelSocketMapping.computeIfAbsent(channel, (k) -> {
Set<ServerWebSocket> s = new HashSet<>(); Set<ServerWebSocket> s = new HashSet<>();
s.add(serverWebSocket); s.add(serverWebSocket);
@ -47,7 +48,7 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
" It should never happen", channel); " It should never happen", channel);
} }
} else { } else {
logger.warn("Channel {} not found. It should never happen", channel); logger.warn("Channel {} not found. It should never happen",channel);
} }
}); });
@ -68,7 +69,7 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
@Override @Override
public void handle(Buffer s) { public void handle(Buffer s) {
String channel = serverWebSocket.path(); String channel = TopicUtil.convertTopicName(serverWebSocket.path());
logger.info("[{}] Got message {}", channel, s); logger.info("[{}] Got message {}", channel, s);
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
if (kafkaProducer != null) { if (kafkaProducer != null) {

View File

@ -0,0 +1,13 @@
package ch.polgrabia.demos.utils;
import org.apache.kafka.common.internals.Topic;
public class TopicUtil {
private TopicUtil() {
}
public static String convertTopicName(final String path) {
return path.replaceAll("[^a-zA-Z0-9]", "");
}
}

View File

@ -1,3 +1,6 @@
# Kafka docker setup # Kafka docker setup
Seems like there is a behavior change between 7.4.4 and latest (7.8.0, too) images. The same config works for 7.4.4 but not for the latest (it tries to connect broker and it fails). Seems like there is a behavior change between 7.4.4 and latest (7.8.0, too) images. The same config works for 7.4.4 but not for the latest (it tries to connect broker and it fails).
28.12.2024 - last 7.4.x tag 7.4.8 version seems to be working well.
Needs to be checked what's the difference between 7.4.8 and 7.5.0

View File

@ -1,22 +1,24 @@
services: services:
zookeeper: zookeeper:
image: confluentinc/cp-zookeeper:7.4.4 image: confluentinc/cp-zookeeper:7.4.8
environment: environment:
ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_TICK_TIME: 2000
ports: ports:
- "22181:2181" - "22181:2181"
broker: broker:
image: confluentinc/cp-kafka:7.4.4 image: confluentinc/cp-kafka:7.4.8
depends_on: depends_on:
- zookeeper - zookeeper
environment: environment:
KAFKA_BROKER_ID: 1 KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
TOPIC_AUTO_CREATE: true
ports: ports:
- "29092:29092" - "29092:29092"