Small refactorings for client & server.

master
Tomasz Polgrabia 2025-01-24 00:30:26 +01:00
parent 001fe270c1
commit 74c1f2d94e
8 changed files with 79 additions and 41 deletions

View File

@ -59,7 +59,7 @@ public class ClientPeriodicStdinScannerParticle extends AbstractVerticle impleme
logger.info("Sending binary message: {}", b);
getVertx()
.eventBus()
.send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b);
.publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b);
prevIdx = idx + eolLength;
}
} while (idx >= 0 && prevIdx < clientPayload.length);

View File

@ -3,4 +3,5 @@ package ch.polgrabia.demos.client;
public class Constants {
public static final String CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY = "%s#send"
.formatted(WebSocketSenderParticle.class.getCanonicalName());
public static final int CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS = 100;
}

View File

@ -3,6 +3,7 @@ package ch.polgrabia.demos.client;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
@ -37,7 +38,7 @@ public class WebApiParticle extends AbstractVerticle {
return;
}
logger.info("Sending binary message: {}", qParam);
getVertx().eventBus().send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam));
getVertx().eventBus().publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam));
response.setStatusCode(202);
response.end("Ok");
}

View File

@ -0,0 +1,35 @@
package ch.polgrabia.demos.client;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketReceiverParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebSocketReceiverParticle.class);
private final WebSocket webSocket;
private final String channel;
private final Runnable onClose;
public WebSocketReceiverParticle(WebSocket webSocket, String channel, Runnable onClose) {
this.webSocket = webSocket;
this.channel = channel;
this.onClose = onClose;
}
@Override
public void start() throws Exception {
logger.info("Starting WebSocketReceiverParticle");
webSocket
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
.closeHandler(unused -> {
logger.info("[{}] Web socket is being closed {}", channel, webSocket.closeReason());
try {
onClose.run();
} catch (Exception e) {
logger.error("Got an error while stopping", e);
}
});
}
}

View File

@ -23,7 +23,7 @@ public class WebSocketSenderParticle extends AbstractVerticle {
@Override
public void start() throws Exception {
EventBus eventBus = getVertx().eventBus();
this.consumer = eventBus.consumer(WebSocketSenderParticle.class.getCanonicalName(), event -> {
this.consumer = eventBus.consumer(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, event -> {
Buffer msg = event.body();
logger.info("Sending binary message: {}", msg);
webSocket.writeBinaryMessage(msg);

View File

@ -28,8 +28,8 @@ public class WebsocketClientParticle extends AbstractVerticle {
private HttpClient client;
private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object();
private WebSocket webSocket;
private WebSocketSenderParticle webSocketSenderParticle;
private WebSocketReceiverParticle webSocketReceiverParticle;
public WebsocketClientParticle(String hostname, int port, String path) {
this.hostname = hostname;
@ -60,21 +60,12 @@ public class WebsocketClientParticle extends AbstractVerticle {
}
return;
}
this.webSocket = webSocketAsyncResult.result();
WebSocket webSocket = webSocketAsyncResult.result();
webSocketReceiverParticle = new WebSocketReceiverParticle(webSocket, channel, this::stop);
getVertx().deployVerticle(webSocketReceiverParticle);
webSocketSenderParticle = new WebSocketSenderParticle(webSocket);
getVertx().deployVerticle(webSocketSenderParticle);
vertx.setPeriodic(100, clientPeriodicStdinScannerParticle);
webSocket
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
.closeHandler(unused -> {
try {
stop();
} catch (Exception e) {
logger.error("Got an error while stopping", e);
}
});
vertx.setPeriodic(Constants.CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS, clientPeriodicStdinScannerParticle);
}
@Override
@ -82,6 +73,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
isShuttingDownActivated = true;
getVertx().undeploy(webApiParticle.deploymentID());
getVertx().undeploy(webSocketSenderParticle.deploymentID());
getVertx().undeploy(webSocketReceiverParticle.deploymentID());
getVertx().undeploy(clientPeriodicStdinScannerParticle.deploymentID());
client.close();
synchronized (runningMonitor) {

View File

@ -0,0 +1,33 @@
package ch.polgrabia.demos.server;
import ch.polgrabia.demos.utils.TopicUtil;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebsocketMessageHandler implements Handler<Buffer> {
private static final Logger logger = LoggerFactory.getLogger(WebsocketMessageHandler.class);
private final ServerWebSocket serverWebSocket;
private final Producer<String, byte[]> kafkaProducer;
public WebsocketMessageHandler(Producer<String, byte[]> kafkaProducer, ServerWebSocket serverWebSocket) {
this.kafkaProducer = kafkaProducer;
this.serverWebSocket = serverWebSocket;
this.serverWebSocket.setWriteQueueMaxSize(0);
}
@Override
public void handle(Buffer s) {
String channel = TopicUtil.convertTopicName(serverWebSocket.path());
logger.info("[{}] Got message {}", channel, s);
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
if (kafkaProducer != null) {
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
}
}
}

View File

@ -5,7 +5,6 @@ import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,27 +54,4 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
sockets.add(serverWebSocket);
}
static class WebsocketMessageHandler implements Handler<Buffer> {
private final ServerWebSocket serverWebSocket;
private final Producer<String, byte[]> kafkaProducer;
public WebsocketMessageHandler(Producer<String, byte[]> kafkaProducer, ServerWebSocket serverWebSocket) {
this.kafkaProducer = kafkaProducer;
this.serverWebSocket = serverWebSocket;
this.serverWebSocket.setWriteQueueMaxSize(0);
}
@Override
public void handle(Buffer s) {
String channel = TopicUtil.convertTopicName(serverWebSocket.path());
logger.info("[{}] Got message {}", channel, s);
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
if (kafkaProducer != null) {
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
}
}
}
}