diff --git a/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts b/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts index 5aae278..8226bc2 100644 --- a/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts +++ b/2024/08/chat_demo1/chat_demo1_web/build.gradle.kts @@ -12,6 +12,7 @@ repositories { dependencies { implementation("io.vertx:vertx-core:4.5.9") implementation("ch.qos.logback:logback-classic:1.5.6") + implementation("org.apache.kafka:kafka-clients:3.9.0") testImplementation(platform("org.junit:junit-bom:5.10.0")) testImplementation("org.junit.jupiter:junit-jupiter") } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java index 60c86a4..214bbad 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerHandler.java @@ -6,13 +6,45 @@ import io.vertx.core.http.ServerWebSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + public class WebsocketServerHandler implements Handler<ServerWebSocket> { private static final Logger logger = LoggerFactory.getLogger(WebsocketServerHandler.class); + private static final ConcurrentHashMap<String, Set<ServerWebSocket>> + channelSocketMapping = new ConcurrentHashMap<>(); + @Override public void handle(ServerWebSocket serverWebSocket) { serverWebSocket.writeBinaryMessage(Buffer.buffer("Hello")); serverWebSocket.handler(new WebsocketMessageHandler(serverWebSocket)); + String channel = serverWebSocket.path(); + Set<ServerWebSocket> sockets = channelSocketMapping.computeIfAbsent(channel, (k) -> { + Set<ServerWebSocket> s = new HashSet<>(); + s.add(serverWebSocket); + return s; + }); + + + serverWebSocket.closeHandler(aVoid -> { + int id = System.identityHashCode(serverWebSocket); + logger.info("[{}] Closing web socket with id {}", channel, id); + Set<ServerWebSocket> serverWebSockets = channelSocketMapping.get(channel); + if (serverWebSockets != null) { + boolean removed = serverWebSockets + .remove(serverWebSocket); + if (!removed) { + logger.warn("Server websocket channel {} was not found in the channel set of sockets." + + " It should never happen", channel); + } + } else { + logger.warn("Channel {} not found. It should never happen", channel); + } + }); + + sockets.add(serverWebSocket); } static class WebsocketMessageHandler implements Handler<Buffer> { @@ -24,9 +56,11 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> { this.serverWebSocket.setWriteQueueMaxSize(0); } + @Override public void handle(Buffer s) { - logger.info("[{}] Got message {}", serverWebSocket.path(), s); + String channel = serverWebSocket.path(); + logger.info("[{}] Got message {}", channel, s); serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka } } diff --git a/2024/08/chat_demo1/tools/docker-compose.yaml b/2024/08/chat_demo1/tools/docker-compose.yaml new file mode 100644 index 0000000..63e36a0 --- /dev/null +++ b/2024/08/chat_demo1/tools/docker-compose.yaml @@ -0,0 +1,17 @@ +services: + broker: + image: apache/kafka:latest + container_name: broker + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 3