From 9e8a692eb4c20c2b239a518331dff463d56f91dd Mon Sep 17 00:00:00 2001 From: Tomasz Polgrabia Date: Fri, 27 Dec 2024 20:55:21 +0100 Subject: [PATCH] Handling map of channel to sockets + added kafka deps and docker compose. --- .../chat_demo1_web/build.gradle.kts | 1 + .../demos/server/WebsocketServerHandler.java | 36 ++++++++++++++++++- 2024/08/chat_demo1/tools/docker-compose.yaml | 17 +++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 2024/08/chat_demo1/tools/docker-compose.yaml diff --git a/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts b/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts index 5aae278..8226bc2 100644 --- a/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts +++ b/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts @@ -12,6 +12,7 @@ repositories { dependencies { implementation("io.vertx:vertx-core:4.5.9") implementation("ch.qos.logback:logback-classic:1.5.6") + implementation("org.apache.kafka:kafka-clients:3.9.0") testImplementation(platform("org.junit:junit-bom:5.10.0")) testImplementation("org.junit.jupiter:junit-jupiter") } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java index 60c86a4..214bbad 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java @@ -6,13 +6,45 @@ import io.vertx.core.http.ServerWebSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + public class WebsocketServerHandler implements Handler { private static final Logger logger = LoggerFactory.getLogger(WebsocketServerHandler.class); + private static final ConcurrentHashMap> + channelSocketMapping = new ConcurrentHashMap<>(); + @Override public void handle(ServerWebSocket serverWebSocket) { serverWebSocket.writeBinaryMessage(Buffer.buffer("Hello")); serverWebSocket.handler(new WebsocketMessageHandler(serverWebSocket)); + String channel = serverWebSocket.path(); + Set sockets = channelSocketMapping.computeIfAbsent(channel, (k) -> { + Set s = new HashSet<>(); + s.add(serverWebSocket); + return s; + }); + + + serverWebSocket.closeHandler(aVoid -> { + int id = System.identityHashCode(serverWebSocket); + logger.info("[{}] Closing web socket with id {}", channel, id); + Set serverWebSockets = channelSocketMapping.get(channel); + if (serverWebSockets != null) { + boolean removed = serverWebSockets + .remove(serverWebSocket); + if (!removed) { + logger.warn("Server websocket channel {} was not found in the channel set of sockets." + + " It should never happen", channel); + } + } else { + logger.warn("Channel {} not found. It should never happen", channel); + } + }); + + sockets.add(serverWebSocket); } static class WebsocketMessageHandler implements Handler { @@ -24,9 +56,11 @@ public class WebsocketServerHandler implements Handler { this.serverWebSocket.setWriteQueueMaxSize(0); } + @Override public void handle(Buffer s) { - logger.info("[{}] Got message {}", serverWebSocket.path(), s); + String channel = serverWebSocket.path(); + logger.info("[{}] Got message {}", channel, s); serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka } } diff --git a/2024/08/chat_demo1/tools/docker-compose.yaml b/2024/08/chat_demo1/tools/docker-compose.yaml new file mode 100644 index 0000000..63e36a0 --- /dev/null +++ b/2024/08/chat_demo1/tools/docker-compose.yaml @@ -0,0 +1,17 @@ +services: + broker: + image: apache/kafka:latest + container_name: broker + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 3