Small refactoring in code.

master
Tomasz Polgrabia 2024-08-02 00:43:24 +02:00
parent b0c8930568
commit 22e0a6ffbd
4 changed files with 86 additions and 61 deletions

View File

@ -0,0 +1,57 @@
package ch.polgrabia.demos.client;
import io.vertx.core.Handler;
import io.vertx.core.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
public class ClientPeriodicStdinScanner implements Handler<Long> {
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class);
private final InputStream inputStream;
private ByteArrayOutputStream clientTextInput = new ByteArrayOutputStream();
private final WebSocket webSocket;
ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) {
this.webSocket = webSocket;
this.inputStream = inputStream;
}
private static int newLineIndex(byte[] clientPayload) {
for (int i = 0; i < clientPayload.length; i++) {
if (clientPayload[i] == "\n".getBytes(StandardCharsets.UTF_8)[0]) {
return i;
}
}
return -1;
}
@Override
public void handle(Long event) {
{
try {
int available = inputStream.available();
if (available > 0) {
byte[] clientPayload = inputStream.readNBytes(available);
this.clientTextInput.write(clientPayload);
int idx = newLineIndex(clientPayload);
if (idx > 0) {
String payload = clientTextInput.toString(StandardCharsets.UTF_8);
// TODO here don't assume we deal with linux eol
int nidx = payload.indexOf('\n');
String textMsgToBeSent = payload.substring(0, nidx);
webSocket.writeTextMessage(textMsgToBeSent);
clientTextInput = new ByteArrayOutputStream();
clientTextInput.write(payload.substring(idx).getBytes(StandardCharsets.UTF_8));
}
}
} catch (IOException e) {
logger.error("Failed to read client input", e);
}
}
}
}

View File

@ -8,8 +8,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class WebsocketClientParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
@ -20,7 +18,6 @@ public class WebsocketClientParticle extends AbstractVerticle {
private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object();
private ByteArrayOutputStream clientTextInput;
private WebSocket webSocket;
public WebsocketClientParticle(String hostname, int port, String path) {
this.hostname = hostname;
@ -47,58 +44,20 @@ public class WebsocketClientParticle extends AbstractVerticle {
}
return;
}
this.clientTextInput = new ByteArrayOutputStream();
vertx.setPeriodic(100, this::handleClientTextInput);
webSocket = webSocketAsyncResult.result();
WebSocket webSocket = webSocketAsyncResult.result();
vertx.setPeriodic(100, new ClientPeriodicStdinScanner(webSocket, System.in));
webSocket
.textMessageHandler(this::handleText)
.closeHandler(this::stopHandler);
.textMessageHandler(msg -> logger.info("Got message: {}", msg))
.closeHandler(unused -> {
try {
stop();
} catch (Exception e) {
logger.error("Got an error while stopping", e);
}
});
}
private void handleClientTextInput(Long ms) {
try {
int available = System.in.available();
if (available > 0) {
byte[] clientPayload = System.in.readNBytes(available);
this.clientTextInput.write(clientPayload);
int idx = newLineIndex(clientPayload);
if (idx > 0) {
String payload = clientTextInput.toString(StandardCharsets.UTF_8);
int nidx = payload.indexOf('\n');
String textMsgToBeSent = payload.substring(0, nidx);
webSocket.writeTextMessage(textMsgToBeSent);
clientTextInput = new ByteArrayOutputStream();
clientTextInput.write(payload.substring(idx).getBytes(StandardCharsets.UTF_8));
}
}
} catch (IOException e) {
logger.error("Failed to read client input", e);
}
}
private int newLineIndex(byte[] clientPayload) {
for (int i = 0; i < clientPayload.length; i++) {
if (clientPayload[i] == "\n".getBytes(StandardCharsets.UTF_8)[0]) {
return i;
}
}
return -1;
}
private void stopHandler(Void unused) {
try {
stop();
} catch (Exception e) {
logger.error("Got an error while stopping", e);
}
}
private void handleText(String s) {
logger.info("Got message: {}", s);
}
@Override
public void stop() throws Exception {
isShuttingDownActivated = true;
@ -120,4 +79,5 @@ public class WebsocketClientParticle extends AbstractVerticle {
}
}
}
}

View File

@ -6,11 +6,14 @@ import org.slf4j.LoggerFactory;
public class WebsocketServerApp {
private static final Logger logger = LoggerFactory.getLogger(WebsocketServerApp.class);
public static void main(String[] args) {
var vertx = Vertx.vertx();
WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle(8080, "localhost");
vertx.deployVerticle(websocketServerChatVerticle);
websocketServerChatVerticle.waitUntilFinished();
websocketServerChatVerticle
.waitUntilFinished()
.join();
vertx.close();
}
}

View File

@ -5,6 +5,8 @@ import io.vertx.core.http.HttpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
public class WebsocketServerChatVerticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebsocketServerChatVerticle.class);
private HttpServer httpServer;
@ -36,16 +38,19 @@ public class WebsocketServerChatVerticle extends AbstractVerticle {
}
}
public void waitUntilFinished() {
while (!shutdownActivation) {
synchronized (runningMonitor) {
try {
runningMonitor.wait();
return;
} catch (InterruptedException e) {
logger.warn("We got interrupted, retrying", e);;
public CompletableFuture<Boolean> waitUntilFinished() {
return CompletableFuture.supplyAsync(() -> {
while (!shutdownActivation) {
synchronized (runningMonitor) {
try {
runningMonitor.wait();
return true;
} catch (InterruptedException e) {
logger.warn("We got interrupted, retrying", e);
}
}
}
}
return false;
});
}
}