Now possible to send text messages from client via simple rest and curl but kafka setup still is not working.

master
Tomasz Półgrabia 2024-12-28 00:10:13 +01:00
parent 091fdaa8b8
commit 9aac4914bd
3 changed files with 42 additions and 5 deletions

View File

@ -2,9 +2,8 @@ package ch.polgrabia.demos.client;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.WebSocketClientImpl;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.VertxInternal;
@ -22,6 +21,8 @@ public class WebsocketClientParticle extends AbstractVerticle {
private HttpClient client;
private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object();
private HttpServer server;
private WebSocket webSocket;
public WebsocketClientParticle(String hostname, int port, String path) {
this.hostname = hostname;
@ -33,10 +34,42 @@ public class WebsocketClientParticle extends AbstractVerticle {
public void start() {
this.client = getVertx()
.createHttpClient();
HttpServerOptions httpServerOptions = new HttpServerOptions();
this.server = getVertx().createHttpServer(httpServerOptions);
this.server.requestHandler(this::handleCommandRequests);
this.server.listen(8123, "localhost", this::handleListenCommand);
logger.info("Starting websocket client command server");
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
.webSocket(port, hostname, path, this::handleMessage);
}
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) {
if (httpServerAsyncResult.failed()) {
logger.warn("Couldn't initialized http command server");
getVertx().close();
System.exit(1);
}
logger.info("Successfully started websocket client command server");
HttpServer result = httpServerAsyncResult.result();
}
private void handleCommandRequests(HttpServerRequest httpServerRequest) {
logger.info("Received command request on 8123");
HttpServerResponse response = httpServerRequest.response();
String qParam = httpServerRequest.getParam("q");
if (qParam == null || qParam.isBlank()) {
response.setStatusCode(400);
response.end("bad request");
return;
}
logger.info("Sending binary message: {}", qParam);
webSocket.writeBinaryMessage(Buffer.buffer(qParam));
response.setStatusCode(202);
response.end("Ok");
}
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
logger.info("Handling websocket");
if (!webSocketAsyncResult.succeeded()) {
@ -48,7 +81,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
}
return;
}
WebSocket webSocket = webSocketAsyncResult.result();
this.webSocket = webSocketAsyncResult.result();
vertx.setPeriodic(100, new ClientPeriodicStdinScanner(
webSocket,
System.in
@ -70,6 +103,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
public void stop() {
isShuttingDownActivated = true;
client.close();
server.close();
synchronized (runningMonitor) {
runningMonitor.notifyAll();
}

View File

@ -71,7 +71,9 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
String channel = serverWebSocket.path();
logger.info("[{}] Got message {}", channel, s);
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
if (kafkaProducer != null) {
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
}
}
}

View File

@ -9,6 +9,7 @@ services:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1