From b29a3128bdf8ffc108dc420384a5254adffa0d26 Mon Sep 17 00:00:00 2001 From: Tomasz Polgrabia Date: Fri, 27 Dec 2024 19:16:26 +0100 Subject: [PATCH] Improved scanning logic for newlines. --- .../client/ClientPeriodicStdinScanner.java | 48 +++++++++++++------ .../demos/client/WebsocketClientParticle.java | 6 ++- 2 files changed, 38 insertions(+), 16 deletions(-) diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java index 1e0c61f..aa19daa 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/ClientPeriodicStdinScanner.java @@ -1,11 +1,11 @@ package ch.polgrabia.demos.client; import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.WebSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; @@ -13,17 +13,18 @@ import java.nio.charset.StandardCharsets; public class ClientPeriodicStdinScanner implements Handler { private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class); private final InputStream inputStream; - private ByteArrayOutputStream clientTextInput = new ByteArrayOutputStream(); + private Buffer clientPayloadRemainderBuffer = Buffer.buffer(); private final WebSocket webSocket; ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) { this.webSocket = webSocket; + this.webSocket.setWriteQueueMaxSize(0); this.inputStream = inputStream; } - private static int newLineIndex(byte[] clientPayload) { - for (int i = 0; i < clientPayload.length; i++) { - if (clientPayload[i] == "\n".getBytes(StandardCharsets.UTF_8)[0]) { + private int newLineIndex(byte[] clientPayload, int prevIdx) { + for (int i = prevIdx; i < clientPayload.length; i++) { + if (clientPayload[i] == '\n' || clientPayload[i] == '\r' ) { return i; } } @@ -36,18 +37,35 @@ public class ClientPeriodicStdinScanner implements Handler { try { int available = inputStream.available(); if (available > 0) { - byte[] clientPayload = inputStream.readNBytes(available); - this.clientTextInput.write(clientPayload); - int idx = newLineIndex(clientPayload); - if (idx > 0) { - String payload = clientTextInput.toString(StandardCharsets.UTF_8); + clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available)); + } + + byte[] clientPayload = clientPayloadRemainderBuffer.getBytes(); + + int idx, prevIdx = 0; + do { + idx = newLineIndex(clientPayload, prevIdx); + if (idx >= 0) { // TODO here don't assume we deal with linux eol - int nidx = payload.indexOf('\n'); - String textMsgToBeSent = payload.substring(0, nidx); - webSocket.writeTextMessage(textMsgToBeSent); - clientTextInput = new ByteArrayOutputStream(); - clientTextInput.write(payload.substring(idx).getBytes(StandardCharsets.UTF_8)); + var eolLength = + idx < clientPayload.length - 1 + && clientPayload[idx] == '\r' + && clientPayload[idx + 1] == '\n' + ? 2 : 1; + int len = idx - prevIdx; + Buffer b = Buffer.buffer(len); + b = b.appendBytes(clientPayload, prevIdx, len); + b = b.appendByte((byte)'\n'); + webSocket.writeTextMessage(b.toString(StandardCharsets.UTF_8)); + prevIdx = idx + eolLength; } + } while (idx >= 0 && prevIdx < clientPayload.length); + int remainderLength = clientPayload.length - prevIdx + 1; + if (remainderLength > 0 && prevIdx < clientPayload.length) { + clientPayloadRemainderBuffer = Buffer.buffer(remainderLength) + .appendBytes(clientPayload, prevIdx, remainderLength); + } else { + clientPayloadRemainderBuffer = Buffer.buffer(); } } catch (IOException e) { logger.error("Failed to read client input", e); diff --git a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java index 0acf33a..1dd2583 100644 --- a/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java +++ b/2024/08/chat_demo1/chat_demo1_web/src/main/java/ch/polgrabia/demos/client/WebsocketClientParticle.java @@ -50,7 +50,11 @@ public class WebsocketClientParticle extends AbstractVerticle { return; } WebSocket webSocket = webSocketAsyncResult.result(); - vertx.setPeriodic(100, new ClientPeriodicStdinScanner(webSocket, System.in)); + vertx.setPeriodic(100, new ClientPeriodicStdinScanner( + webSocket, + System.in + )); + webSocket .textMessageHandler(msg -> logger.info("Got message: {}", msg)) .closeHandler(unused -> {