From 001fe270c15d1aea13ab38369ac4eb4ba9d6eb1d Mon Sep 17 00:00:00 2001 From: Tomasz Polgrabia Date: Thu, 23 Jan 2025 23:16:25 +0100 Subject: [PATCH] Small refactorings for client. --- 2024/08/chat_demo1/.editorconfig | 10 + .../client/ClientPeriodicStdinScanner.java | 79 -------- .../ClientPeriodicStdinScannerParticle.java | 78 ++++++++ .../ch/polgrabia/demos/client/Constants.java | 6 + .../demos/client/WebApiParticle.java | 54 +++++ .../demos/client/WebSocketSenderParticle.java | 37 ++++ .../demos/client/WebsocketClientApp.java | 1 + .../demos/client/WebsocketClientParticle.java | 187 ++++++++---------- 8 files changed, 269 insertions(+), 183 deletions(-) create mode 100644 2024/08/chat_demo1/.editorconfig delete mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java diff --git a/2024/08/chat_demo1/.editorconfig b/2024/08/chat_demo1/.editorconfig new file mode 100644 index 0000000..546a2c8 --- /dev/null +++ b/2024/08/chat_demo1/.editorconfig @@ -0,0 +1,10 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +indent_size = 2 +indent_style = space +insert_final_newline = false +max_line_length = 120 +tab_width = 2 diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java deleted file mode 100644 index 6dd178f..0000000 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java +++ /dev/null @@ -1,79 +0,0 @@ -package ch.polgrabia.demos.client; - -import io.vertx.core.Handler; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.WebSocket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -public class ClientPeriodicStdinScanner implements Handler { - private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class); - private final InputStream inputStream; - private Buffer clientPayloadRemainderBuffer = Buffer.buffer(); - private final WebSocket webSocket; - - ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) { - this.webSocket = webSocket; - this.webSocket.setWriteQueueMaxSize(0); - this.inputStream = inputStream; - } - - private int newLineIndex(byte[] clientPayload, int prevIdx) { - for (int i = prevIdx; i < clientPayload.length; i++) { - if (clientPayload[i] == '\n' || clientPayload[i] == '\r' ) { - return i; - } - } - return -1; - } - - @Override - public void handle(Long event) { - { - // TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it - // fetches the input - try { - int available = inputStream.available(); - if (available > 0) { - logger.debug("Got available: {}", available); - clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available)); - } - - byte[] clientPayload = clientPayloadRemainderBuffer.getBytes(); - - int idx, prevIdx = 0; - do { - idx = newLineIndex(clientPayload, prevIdx); - if (idx >= 0) { - // TODO here don't assume we deal with linux eol - var eolLength = - idx < clientPayload.length - 1 - && clientPayload[idx] == '\r' - && clientPayload[idx + 1] == '\n' - ? 2 : 1; - int len = idx - prevIdx; - Buffer b = Buffer.buffer(len); - b = b.appendBytes(clientPayload, prevIdx, len); - b = b.appendByte((byte)'\n'); - logger.info("Sending binary message: {}", b);; - webSocket.writeBinaryMessage(b); - prevIdx = idx + eolLength; - } - } while (idx >= 0 && prevIdx < clientPayload.length); - int remainderLength = clientPayload.length - prevIdx + 1; - if (remainderLength > 0 && prevIdx < clientPayload.length) { - clientPayloadRemainderBuffer = Buffer.buffer(remainderLength) - .appendBytes(clientPayload, prevIdx, remainderLength); - } else { - clientPayloadRemainderBuffer = Buffer.buffer(); - } - } catch (IOException e) { - logger.error("Failed to read client input", e); - } - } - } -} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java new file mode 100644 index 0000000..80fc312 --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScannerParticle.java @@ -0,0 +1,78 @@ +package ch.polgrabia.demos.client; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; + +public class ClientPeriodicStdinScannerParticle extends AbstractVerticle implements Handler { + private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScannerParticle.class); + private final InputStream inputStream; + private Buffer clientPayloadRemainderBuffer = Buffer.buffer(); + + ClientPeriodicStdinScannerParticle(InputStream inputStream) { + this.inputStream = inputStream; + } + + private int newLineIndex(byte[] clientPayload, int prevIdx) { + for (int i = prevIdx; i < clientPayload.length; i++) { + if (clientPayload[i] == '\n' || clientPayload[i] == '\r') { + return i; + } + } + return -1; + } + + @Override + public void handle(Long event) { + { + // TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it + // fetches the input + try { + int available = inputStream.available(); + if (available > 0) { + logger.debug("Got available: {}", available); + clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available)); + } + + byte[] clientPayload = clientPayloadRemainderBuffer.getBytes(); + + int idx, prevIdx = 0; + do { + idx = newLineIndex(clientPayload, prevIdx); + if (idx >= 0) { + // TODO here don't assume we deal with linux eol + var eolLength = + idx < clientPayload.length - 1 + && clientPayload[idx] == '\r' + && clientPayload[idx + 1] == '\n' + ? 2 : 1; + int len = idx - prevIdx; + Buffer b = Buffer.buffer(len); + b = b.appendBytes(clientPayload, prevIdx, len); + b = b.appendByte((byte) '\n'); + logger.info("Sending binary message: {}", b); + getVertx() + .eventBus() + .send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b); + prevIdx = idx + eolLength; + } + } while (idx >= 0 && prevIdx < clientPayload.length); + int remainderLength = clientPayload.length - prevIdx + 1; + if (remainderLength > 0 && prevIdx < clientPayload.length) { + clientPayloadRemainderBuffer = Buffer.buffer(remainderLength) + .appendBytes(clientPayload, prevIdx, remainderLength); + } else { + clientPayloadRemainderBuffer = Buffer.buffer(); + } + } catch (IOException e) { + logger.error("Failed to read client input", e); + } + } + } +} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java new file mode 100644 index 0000000..fe17ade --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/Constants.java @@ -0,0 +1,6 @@ +package ch.polgrabia.demos.client; + +public class Constants { + public static final String CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY = "%s#send" + .formatted(WebSocketSenderParticle.class.getCanonicalName()); +} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java new file mode 100644 index 0000000..0500f7d --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebApiParticle.java @@ -0,0 +1,54 @@ +package ch.polgrabia.demos.client; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.AsyncResult; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebApiParticle extends AbstractVerticle { + private static final Logger logger = LoggerFactory.getLogger(WebApiParticle.class); + private HttpServer server; + + @Override + public void start() throws Exception { + HttpServerOptions httpServerOptions = new HttpServerOptions(); + this.server = getVertx().createHttpServer(httpServerOptions); + this.server.requestHandler(this::handleCommandRequests); + this.server.listen(8123, "localhost", this::handleListenCommand); + } + + @Override + public void stop() throws Exception { + server.close(); + } + + private void handleCommandRequests(HttpServerRequest httpServerRequest) { + logger.info("Received command request on 8123"); + HttpServerResponse response = httpServerRequest.response(); + String qParam = httpServerRequest.getParam("q"); + if (qParam == null || qParam.isBlank()) { + response.setStatusCode(400); + response.end("bad request"); + return; + } + logger.info("Sending binary message: {}", qParam); + getVertx().eventBus().send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam)); + response.setStatusCode(202); + response.end("Ok"); + } + + private void handleListenCommand(AsyncResult httpServerAsyncResult) { + if (httpServerAsyncResult.failed()) { + logger.warn("Couldn't initialized http command server"); + getVertx().close(); + System.exit(1); + } + + logger.info("Successfully started websocket client command server"); + } +} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java new file mode 100644 index 0000000..9d8bfcd --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebSocketSenderParticle.java @@ -0,0 +1,37 @@ +package ch.polgrabia.demos.client; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Handler; +import io.vertx.core.Verticle; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import io.vertx.core.http.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketSenderParticle extends AbstractVerticle { + private static final Logger logger = LoggerFactory.getLogger(WebSocketSenderParticle.class); + private final WebSocket webSocket; + private MessageConsumer consumer; + + public WebSocketSenderParticle(WebSocket webSocket) { + this.webSocket = webSocket; + } + + @Override + public void start() throws Exception { + EventBus eventBus = getVertx().eventBus(); + this.consumer = eventBus.consumer(WebSocketSenderParticle.class.getCanonicalName(), event -> { + Buffer msg = event.body(); + logger.info("Sending binary message: {}", msg); + webSocket.writeBinaryMessage(msg); + }); + } + + @Override + public void stop() throws Exception { + consumer.unregister().result(); + } +} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientApp.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientApp.java index fa5d766..c5eb93c 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientApp.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientApp.java @@ -15,6 +15,7 @@ public class WebsocketClientApp { websocketClientParticle .waitingUntilFinished() .join(); + vertx.undeploy(websocketClientParticle.deploymentID()); vertx.close(); } } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java index e698f19..16660b1 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java @@ -3,8 +3,9 @@ package ch.polgrabia.demos.client; import ch.polgrabia.demos.utils.TopicUtil; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.*; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.WebSocket; import io.vertx.core.http.impl.WebSocketClientImpl; import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.VertxInternal; @@ -14,116 +15,94 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; public class WebsocketClientParticle extends AbstractVerticle { - private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class); - private final String hostname; - private final int port; - // path maps to channel - private final String path; - private final String channel; - private HttpClient client; - private boolean isShuttingDownActivated = false; - private final Object runningMonitor = new Object(); - private HttpServer server; - private WebSocket webSocket; + private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class); + private final String hostname; + private final int port; + // path maps to channel + private final String path; + private final String channel; + private final ClientPeriodicStdinScannerParticle clientPeriodicStdinScannerParticle = new ClientPeriodicStdinScannerParticle( + System.in + ); + private final WebApiParticle webApiParticle = new WebApiParticle(); + private HttpClient client; + private boolean isShuttingDownActivated = false; + private final Object runningMonitor = new Object(); + private WebSocket webSocket; + private WebSocketSenderParticle webSocketSenderParticle; - public WebsocketClientParticle(String hostname, int port, String path) { - this.hostname = hostname; - this.port = port; - this.path = path; - this.channel = TopicUtil.convertTopicName(path); + public WebsocketClientParticle(String hostname, int port, String path) { + this.hostname = hostname; + this.port = port; + this.path = path; + this.channel = TopicUtil.convertTopicName(path); + } + + @Override + public void start() { + this.client = getVertx() + .createHttpClient(); + logger.info("Starting websocket client command server"); + new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture()) + .webSocket(port, hostname, path, this::handleMessage); + getVertx().deployVerticle(clientPeriodicStdinScannerParticle); + getVertx().deployVerticle(webApiParticle); + } + + private void handleMessage(AsyncResult webSocketAsyncResult) { + logger.info("Handling websocket"); + if (!webSocketAsyncResult.succeeded()) { + logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause()); + try { + stop(); + } catch (Exception e) { + logger.error("Failed to stop particle", e); + } + return; } + this.webSocket = webSocketAsyncResult.result(); + webSocketSenderParticle = new WebSocketSenderParticle(webSocket); + getVertx().deployVerticle(webSocketSenderParticle); + vertx.setPeriodic(100, clientPeriodicStdinScannerParticle); - @Override - public void start() { - this.client = getVertx() - .createHttpClient(); - HttpServerOptions httpServerOptions = new HttpServerOptions(); - this.server = getVertx().createHttpServer(httpServerOptions); - this.server.requestHandler(this::handleCommandRequests); - this.server.listen(8123, "localhost", this::handleListenCommand); - logger.info("Starting websocket client command server"); - new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture()) - .webSocket(port, hostname, path, this::handleMessage); - } - - private void handleListenCommand(AsyncResult httpServerAsyncResult) { - if (httpServerAsyncResult.failed()) { - logger.warn("Couldn't initialized http command server"); - getVertx().close(); - System.exit(1); + webSocket + .binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg)) + .closeHandler(unused -> { + try { + stop(); + } catch (Exception e) { + logger.error("Got an error while stopping", e); } + }); - logger.info("Successfully started websocket client command server"); + } + + @Override + public void stop() { + isShuttingDownActivated = true; + getVertx().undeploy(webApiParticle.deploymentID()); + getVertx().undeploy(webSocketSenderParticle.deploymentID()); + getVertx().undeploy(clientPeriodicStdinScannerParticle.deploymentID()); + client.close(); + synchronized (runningMonitor) { + runningMonitor.notifyAll(); } + } - private void handleCommandRequests(HttpServerRequest httpServerRequest) { - logger.info("Received command request on 8123"); - HttpServerResponse response = httpServerRequest.response(); - String qParam = httpServerRequest.getParam("q"); - if (qParam == null || qParam.isBlank()) { - response.setStatusCode(400); - response.end("bad request"); - return; - } - logger.info("Sending binary message: {}", qParam); - webSocket.writeBinaryMessage(Buffer.buffer(qParam)); - response.setStatusCode(202); - response.end("Ok"); - } - - private void handleMessage(AsyncResult webSocketAsyncResult) { - logger.info("Handling websocket"); - if (!webSocketAsyncResult.succeeded()) { - logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause()); - try { - stop(); - } catch (Exception e) { - logger.error("Failed to stop particle", e); - } - return; - } - this.webSocket = webSocketAsyncResult.result(); - vertx.setPeriodic(100, new ClientPeriodicStdinScanner( - webSocket, - System.in - )); - - webSocket - .binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg)) - .closeHandler(unused -> { - try { - stop(); - } catch (Exception e) { - logger.error("Got an error while stopping", e); - } - }); - - } - - @Override - public void stop() { - isShuttingDownActivated = true; - client.close(); - server.close(); + public CompletableFuture waitingUntilFinished() { + return CompletableFuture.supplyAsync(() -> { + while (!isShuttingDownActivated) { synchronized (runningMonitor) { - runningMonitor.notifyAll(); + try { + runningMonitor.wait(); + return true; + } catch (InterruptedException e) { + logger.warn("Got interrupted", e); + } } - } - - public CompletableFuture waitingUntilFinished() { - return CompletableFuture.supplyAsync(() -> { - while (!isShuttingDownActivated) { - synchronized (runningMonitor) { - try { - runningMonitor.wait(); - return true; - } catch (InterruptedException e) { - logger.warn("Got interrupted", e); - } - } - } - return false; - }); - } + } + return false; + }); + } }