From 22e0a6ffbd97e75368649170e39cd5372ae39bf9 Mon Sep 17 00:00:00 2001 From: Tomasz Polgrabia Date: Fri, 2 Aug 2024 00:43:24 +0200 Subject: [PATCH] Small refactoring in code. --- .../client/ClientPeriodicStdinScanner.java | 57 +++++++++++++++++ .../demos/client/WebsocketClientParticle.java | 62 ++++--------------- .../demos/server/WebsocketServerApp.java | 5 +- .../server/WebsocketServerChatVerticle.java | 23 ++++--- 4 files changed, 86 insertions(+), 61 deletions(-) create mode 100644 2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java new file mode 100644 index 0000000..1e0c61f --- /dev/null +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java @@ -0,0 +1,57 @@ +package ch.polgrabia.demos.client; + +import io.vertx.core.Handler; +import io.vertx.core.http.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class ClientPeriodicStdinScanner implements Handler { + private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class); + private final InputStream inputStream; + private ByteArrayOutputStream clientTextInput = new ByteArrayOutputStream(); + private final WebSocket webSocket; + + ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) { + this.webSocket = webSocket; + this.inputStream = inputStream; + } + + private static int newLineIndex(byte[] clientPayload) { + for (int i = 0; i < clientPayload.length; i++) { + if (clientPayload[i] == "\n".getBytes(StandardCharsets.UTF_8)[0]) { + return i; + } + } + return -1; + } + + @Override + public void handle(Long event) { + { + try { + int available = inputStream.available(); + if (available > 0) { + byte[] clientPayload = inputStream.readNBytes(available); + this.clientTextInput.write(clientPayload); + int idx = newLineIndex(clientPayload); + if (idx > 0) { + String payload = clientTextInput.toString(StandardCharsets.UTF_8); + // TODO here don't assume we deal with linux eol + int nidx = payload.indexOf('\n'); + String textMsgToBeSent = payload.substring(0, nidx); + webSocket.writeTextMessage(textMsgToBeSent); + clientTextInput = new ByteArrayOutputStream(); + clientTextInput.write(payload.substring(idx).getBytes(StandardCharsets.UTF_8)); + } + } + } catch (IOException e) { + logger.error("Failed to read client input", e); + } + } + } +} diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java index 5a70648..970f359 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java @@ -8,8 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; public class WebsocketClientParticle extends AbstractVerticle { private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class); @@ -20,7 +18,6 @@ public class WebsocketClientParticle extends AbstractVerticle { private boolean isShuttingDownActivated = false; private final Object runningMonitor = new Object(); private ByteArrayOutputStream clientTextInput; - private WebSocket webSocket; public WebsocketClientParticle(String hostname, int port, String path) { this.hostname = hostname; @@ -47,58 +44,20 @@ public class WebsocketClientParticle extends AbstractVerticle { } return; } - - this.clientTextInput = new ByteArrayOutputStream(); - vertx.setPeriodic(100, this::handleClientTextInput); - webSocket = webSocketAsyncResult.result(); + WebSocket webSocket = webSocketAsyncResult.result(); + vertx.setPeriodic(100, new ClientPeriodicStdinScanner(webSocket, System.in)); webSocket - .textMessageHandler(this::handleText) - .closeHandler(this::stopHandler); + .textMessageHandler(msg -> logger.info("Got message: {}", msg)) + .closeHandler(unused -> { + try { + stop(); + } catch (Exception e) { + logger.error("Got an error while stopping", e); + } + }); } - private void handleClientTextInput(Long ms) { - try { - int available = System.in.available(); - if (available > 0) { - byte[] clientPayload = System.in.readNBytes(available); - this.clientTextInput.write(clientPayload); - int idx = newLineIndex(clientPayload); - if (idx > 0) { - String payload = clientTextInput.toString(StandardCharsets.UTF_8); - int nidx = payload.indexOf('\n'); - String textMsgToBeSent = payload.substring(0, nidx); - webSocket.writeTextMessage(textMsgToBeSent); - clientTextInput = new ByteArrayOutputStream(); - clientTextInput.write(payload.substring(idx).getBytes(StandardCharsets.UTF_8)); - } - } - } catch (IOException e) { - logger.error("Failed to read client input", e); - } - } - - private int newLineIndex(byte[] clientPayload) { - for (int i = 0; i < clientPayload.length; i++) { - if (clientPayload[i] == "\n".getBytes(StandardCharsets.UTF_8)[0]) { - return i; - } - } - return -1; - } - - private void stopHandler(Void unused) { - try { - stop(); - } catch (Exception e) { - logger.error("Got an error while stopping", e); - } - } - - private void handleText(String s) { - logger.info("Got message: {}", s); - } - @Override public void stop() throws Exception { isShuttingDownActivated = true; @@ -120,4 +79,5 @@ public class WebsocketClientParticle extends AbstractVerticle { } } } + } diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java index 6c029c6..7993344 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerApp.java @@ -6,11 +6,14 @@ import org.slf4j.LoggerFactory; public class WebsocketServerApp { private static final Logger logger = LoggerFactory.getLogger(WebsocketServerApp.class); + public static void main(String[] args) { var vertx = Vertx.vertx(); WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle(8080, "localhost"); vertx.deployVerticle(websocketServerChatVerticle); - websocketServerChatVerticle.waitUntilFinished(); + websocketServerChatVerticle + .waitUntilFinished() + .join(); vertx.close(); } } \ No newline at end of file diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java index 6d26501..3636c22 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/server/WebsocketServerChatVerticle.java @@ -5,6 +5,8 @@ import io.vertx.core.http.HttpServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; + public class WebsocketServerChatVerticle extends AbstractVerticle { private static final Logger logger = LoggerFactory.getLogger(WebsocketServerChatVerticle.class); private HttpServer httpServer; @@ -36,16 +38,19 @@ public class WebsocketServerChatVerticle extends AbstractVerticle { } } - public void waitUntilFinished() { - while (!shutdownActivation) { - synchronized (runningMonitor) { - try { - runningMonitor.wait(); - return; - } catch (InterruptedException e) { - logger.warn("We got interrupted, retrying", e);; + public CompletableFuture waitUntilFinished() { + return CompletableFuture.supplyAsync(() -> { + while (!shutdownActivation) { + synchronized (runningMonitor) { + try { + runningMonitor.wait(); + return true; + } catch (InterruptedException e) { + logger.warn("We got interrupted, retrying", e); + } } } - } + return false; + }); } }