Improved scanning logic for newlines.
parent
4b34b31928
commit
b29a3128bd
|
@ -1,11 +1,11 @@
|
||||||
package ch.polgrabia.demos.client;
|
package ch.polgrabia.demos.client;
|
||||||
|
|
||||||
import io.vertx.core.Handler;
|
import io.vertx.core.Handler;
|
||||||
|
import io.vertx.core.buffer.Buffer;
|
||||||
import io.vertx.core.http.WebSocket;
|
import io.vertx.core.http.WebSocket;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
@ -13,17 +13,18 @@ import java.nio.charset.StandardCharsets;
|
||||||
public class ClientPeriodicStdinScanner implements Handler<Long> {
|
public class ClientPeriodicStdinScanner implements Handler<Long> {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class);
|
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class);
|
||||||
private final InputStream inputStream;
|
private final InputStream inputStream;
|
||||||
private ByteArrayOutputStream clientTextInput = new ByteArrayOutputStream();
|
private Buffer clientPayloadRemainderBuffer = Buffer.buffer();
|
||||||
private final WebSocket webSocket;
|
private final WebSocket webSocket;
|
||||||
|
|
||||||
ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) {
|
ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) {
|
||||||
this.webSocket = webSocket;
|
this.webSocket = webSocket;
|
||||||
|
this.webSocket.setWriteQueueMaxSize(0);
|
||||||
this.inputStream = inputStream;
|
this.inputStream = inputStream;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int newLineIndex(byte[] clientPayload) {
|
private int newLineIndex(byte[] clientPayload, int prevIdx) {
|
||||||
for (int i = 0; i < clientPayload.length; i++) {
|
for (int i = prevIdx; i < clientPayload.length; i++) {
|
||||||
if (clientPayload[i] == "\n".getBytes(StandardCharsets.UTF_8)[0]) {
|
if (clientPayload[i] == '\n' || clientPayload[i] == '\r' ) {
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,18 +37,35 @@ public class ClientPeriodicStdinScanner implements Handler<Long> {
|
||||||
try {
|
try {
|
||||||
int available = inputStream.available();
|
int available = inputStream.available();
|
||||||
if (available > 0) {
|
if (available > 0) {
|
||||||
byte[] clientPayload = inputStream.readNBytes(available);
|
clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available));
|
||||||
this.clientTextInput.write(clientPayload);
|
|
||||||
int idx = newLineIndex(clientPayload);
|
|
||||||
if (idx > 0) {
|
|
||||||
String payload = clientTextInput.toString(StandardCharsets.UTF_8);
|
|
||||||
// TODO here don't assume we deal with linux eol
|
|
||||||
int nidx = payload.indexOf('\n');
|
|
||||||
String textMsgToBeSent = payload.substring(0, nidx);
|
|
||||||
webSocket.writeTextMessage(textMsgToBeSent);
|
|
||||||
clientTextInput = new ByteArrayOutputStream();
|
|
||||||
clientTextInput.write(payload.substring(idx).getBytes(StandardCharsets.UTF_8));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
byte[] clientPayload = clientPayloadRemainderBuffer.getBytes();
|
||||||
|
|
||||||
|
int idx, prevIdx = 0;
|
||||||
|
do {
|
||||||
|
idx = newLineIndex(clientPayload, prevIdx);
|
||||||
|
if (idx >= 0) {
|
||||||
|
// TODO here don't assume we deal with linux eol
|
||||||
|
var eolLength =
|
||||||
|
idx < clientPayload.length - 1
|
||||||
|
&& clientPayload[idx] == '\r'
|
||||||
|
&& clientPayload[idx + 1] == '\n'
|
||||||
|
? 2 : 1;
|
||||||
|
int len = idx - prevIdx;
|
||||||
|
Buffer b = Buffer.buffer(len);
|
||||||
|
b = b.appendBytes(clientPayload, prevIdx, len);
|
||||||
|
b = b.appendByte((byte)'\n');
|
||||||
|
webSocket.writeTextMessage(b.toString(StandardCharsets.UTF_8));
|
||||||
|
prevIdx = idx + eolLength;
|
||||||
|
}
|
||||||
|
} while (idx >= 0 && prevIdx < clientPayload.length);
|
||||||
|
int remainderLength = clientPayload.length - prevIdx + 1;
|
||||||
|
if (remainderLength > 0 && prevIdx < clientPayload.length) {
|
||||||
|
clientPayloadRemainderBuffer = Buffer.buffer(remainderLength)
|
||||||
|
.appendBytes(clientPayload, prevIdx, remainderLength);
|
||||||
|
} else {
|
||||||
|
clientPayloadRemainderBuffer = Buffer.buffer();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Failed to read client input", e);
|
logger.error("Failed to read client input", e);
|
||||||
|
|
|
@ -50,7 +50,11 @@ public class WebsocketClientParticle extends AbstractVerticle {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
WebSocket webSocket = webSocketAsyncResult.result();
|
WebSocket webSocket = webSocketAsyncResult.result();
|
||||||
vertx.setPeriodic(100, new ClientPeriodicStdinScanner(webSocket, System.in));
|
vertx.setPeriodic(100, new ClientPeriodicStdinScanner(
|
||||||
|
webSocket,
|
||||||
|
System.in
|
||||||
|
));
|
||||||
|
|
||||||
webSocket
|
webSocket
|
||||||
.textMessageHandler(msg -> logger.info("Got message: {}", msg))
|
.textMessageHandler(msg -> logger.info("Got message: {}", msg))
|
||||||
.closeHandler(unused -> {
|
.closeHandler(unused -> {
|
||||||
|
|
Loading…
Reference in New Issue