Small refactorings for client.
parent
4e6a49b0f5
commit
001fe270c1
2024/08/chat_demo1
chat_demo1_web/src/main/java/ch/polgrabia/demos/client
|
@ -0,0 +1,10 @@
|
|||
root = true
|
||||
|
||||
[*]
|
||||
charset = utf-8
|
||||
end_of_line = lf
|
||||
indent_size = 2
|
||||
indent_style = space
|
||||
insert_final_newline = false
|
||||
max_line_length = 120
|
||||
tab_width = 2
|
|
@ -1,79 +0,0 @@
|
|||
package ch.polgrabia.demos.client;
|
||||
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.WebSocket;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class ClientPeriodicStdinScanner implements Handler<Long> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class);
|
||||
private final InputStream inputStream;
|
||||
private Buffer clientPayloadRemainderBuffer = Buffer.buffer();
|
||||
private final WebSocket webSocket;
|
||||
|
||||
ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) {
|
||||
this.webSocket = webSocket;
|
||||
this.webSocket.setWriteQueueMaxSize(0);
|
||||
this.inputStream = inputStream;
|
||||
}
|
||||
|
||||
private int newLineIndex(byte[] clientPayload, int prevIdx) {
|
||||
for (int i = prevIdx; i < clientPayload.length; i++) {
|
||||
if (clientPayload[i] == '\n' || clientPayload[i] == '\r' ) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(Long event) {
|
||||
{
|
||||
// TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it
|
||||
// fetches the input
|
||||
try {
|
||||
int available = inputStream.available();
|
||||
if (available > 0) {
|
||||
logger.debug("Got available: {}", available);
|
||||
clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available));
|
||||
}
|
||||
|
||||
byte[] clientPayload = clientPayloadRemainderBuffer.getBytes();
|
||||
|
||||
int idx, prevIdx = 0;
|
||||
do {
|
||||
idx = newLineIndex(clientPayload, prevIdx);
|
||||
if (idx >= 0) {
|
||||
// TODO here don't assume we deal with linux eol
|
||||
var eolLength =
|
||||
idx < clientPayload.length - 1
|
||||
&& clientPayload[idx] == '\r'
|
||||
&& clientPayload[idx + 1] == '\n'
|
||||
? 2 : 1;
|
||||
int len = idx - prevIdx;
|
||||
Buffer b = Buffer.buffer(len);
|
||||
b = b.appendBytes(clientPayload, prevIdx, len);
|
||||
b = b.appendByte((byte)'\n');
|
||||
logger.info("Sending binary message: {}", b);;
|
||||
webSocket.writeBinaryMessage(b);
|
||||
prevIdx = idx + eolLength;
|
||||
}
|
||||
} while (idx >= 0 && prevIdx < clientPayload.length);
|
||||
int remainderLength = clientPayload.length - prevIdx + 1;
|
||||
if (remainderLength > 0 && prevIdx < clientPayload.length) {
|
||||
clientPayloadRemainderBuffer = Buffer.buffer(remainderLength)
|
||||
.appendBytes(clientPayload, prevIdx, remainderLength);
|
||||
} else {
|
||||
clientPayloadRemainderBuffer = Buffer.buffer();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to read client input", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package ch.polgrabia.demos.client;
|
||||
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.WebSocket;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class ClientPeriodicStdinScannerParticle extends AbstractVerticle implements Handler<Long> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScannerParticle.class);
|
||||
private final InputStream inputStream;
|
||||
private Buffer clientPayloadRemainderBuffer = Buffer.buffer();
|
||||
|
||||
ClientPeriodicStdinScannerParticle(InputStream inputStream) {
|
||||
this.inputStream = inputStream;
|
||||
}
|
||||
|
||||
private int newLineIndex(byte[] clientPayload, int prevIdx) {
|
||||
for (int i = prevIdx; i < clientPayload.length; i++) {
|
||||
if (clientPayload[i] == '\n' || clientPayload[i] == '\r') {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(Long event) {
|
||||
{
|
||||
// TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it
|
||||
// fetches the input
|
||||
try {
|
||||
int available = inputStream.available();
|
||||
if (available > 0) {
|
||||
logger.debug("Got available: {}", available);
|
||||
clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available));
|
||||
}
|
||||
|
||||
byte[] clientPayload = clientPayloadRemainderBuffer.getBytes();
|
||||
|
||||
int idx, prevIdx = 0;
|
||||
do {
|
||||
idx = newLineIndex(clientPayload, prevIdx);
|
||||
if (idx >= 0) {
|
||||
// TODO here don't assume we deal with linux eol
|
||||
var eolLength =
|
||||
idx < clientPayload.length - 1
|
||||
&& clientPayload[idx] == '\r'
|
||||
&& clientPayload[idx + 1] == '\n'
|
||||
? 2 : 1;
|
||||
int len = idx - prevIdx;
|
||||
Buffer b = Buffer.buffer(len);
|
||||
b = b.appendBytes(clientPayload, prevIdx, len);
|
||||
b = b.appendByte((byte) '\n');
|
||||
logger.info("Sending binary message: {}", b);
|
||||
getVertx()
|
||||
.eventBus()
|
||||
.send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b);
|
||||
prevIdx = idx + eolLength;
|
||||
}
|
||||
} while (idx >= 0 && prevIdx < clientPayload.length);
|
||||
int remainderLength = clientPayload.length - prevIdx + 1;
|
||||
if (remainderLength > 0 && prevIdx < clientPayload.length) {
|
||||
clientPayloadRemainderBuffer = Buffer.buffer(remainderLength)
|
||||
.appendBytes(clientPayload, prevIdx, remainderLength);
|
||||
} else {
|
||||
clientPayloadRemainderBuffer = Buffer.buffer();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to read client input", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package ch.polgrabia.demos.client;
|
||||
|
||||
public class Constants {
|
||||
public static final String CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY = "%s#send"
|
||||
.formatted(WebSocketSenderParticle.class.getCanonicalName());
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
package ch.polgrabia.demos.client;
|
||||
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.core.http.HttpServerRequest;
|
||||
import io.vertx.core.http.HttpServerResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class WebApiParticle extends AbstractVerticle {
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebApiParticle.class);
|
||||
private HttpServer server;
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
HttpServerOptions httpServerOptions = new HttpServerOptions();
|
||||
this.server = getVertx().createHttpServer(httpServerOptions);
|
||||
this.server.requestHandler(this::handleCommandRequests);
|
||||
this.server.listen(8123, "localhost", this::handleListenCommand);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
server.close();
|
||||
}
|
||||
|
||||
private void handleCommandRequests(HttpServerRequest httpServerRequest) {
|
||||
logger.info("Received command request on 8123");
|
||||
HttpServerResponse response = httpServerRequest.response();
|
||||
String qParam = httpServerRequest.getParam("q");
|
||||
if (qParam == null || qParam.isBlank()) {
|
||||
response.setStatusCode(400);
|
||||
response.end("bad request");
|
||||
return;
|
||||
}
|
||||
logger.info("Sending binary message: {}", qParam);
|
||||
getVertx().eventBus().send(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam));
|
||||
response.setStatusCode(202);
|
||||
response.end("Ok");
|
||||
}
|
||||
|
||||
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) {
|
||||
if (httpServerAsyncResult.failed()) {
|
||||
logger.warn("Couldn't initialized http command server");
|
||||
getVertx().close();
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
logger.info("Successfully started websocket client command server");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package ch.polgrabia.demos.client;
|
||||
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.Verticle;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.eventbus.EventBus;
|
||||
import io.vertx.core.eventbus.Message;
|
||||
import io.vertx.core.eventbus.MessageConsumer;
|
||||
import io.vertx.core.http.WebSocket;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class WebSocketSenderParticle extends AbstractVerticle {
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketSenderParticle.class);
|
||||
private final WebSocket webSocket;
|
||||
private MessageConsumer<Buffer> consumer;
|
||||
|
||||
public WebSocketSenderParticle(WebSocket webSocket) {
|
||||
this.webSocket = webSocket;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
EventBus eventBus = getVertx().eventBus();
|
||||
this.consumer = eventBus.consumer(WebSocketSenderParticle.class.getCanonicalName(), event -> {
|
||||
Buffer msg = event.body();
|
||||
logger.info("Sending binary message: {}", msg);
|
||||
webSocket.writeBinaryMessage(msg);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
consumer.unregister().result();
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@ public class WebsocketClientApp {
|
|||
websocketClientParticle
|
||||
.waitingUntilFinished()
|
||||
.join();
|
||||
vertx.undeploy(websocketClientParticle.deploymentID());
|
||||
vertx.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,8 +3,9 @@ package ch.polgrabia.demos.client;
|
|||
import ch.polgrabia.demos.utils.TopicUtil;
|
||||
import io.vertx.core.AbstractVerticle;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.*;
|
||||
import io.vertx.core.http.HttpClient;
|
||||
import io.vertx.core.http.HttpClientOptions;
|
||||
import io.vertx.core.http.WebSocket;
|
||||
import io.vertx.core.http.impl.WebSocketClientImpl;
|
||||
import io.vertx.core.impl.CloseFuture;
|
||||
import io.vertx.core.impl.VertxInternal;
|
||||
|
@ -14,116 +15,94 @@ import org.slf4j.LoggerFactory;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class WebsocketClientParticle extends AbstractVerticle {
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
|
||||
private final String hostname;
|
||||
private final int port;
|
||||
// path maps to channel
|
||||
private final String path;
|
||||
private final String channel;
|
||||
private HttpClient client;
|
||||
private boolean isShuttingDownActivated = false;
|
||||
private final Object runningMonitor = new Object();
|
||||
private HttpServer server;
|
||||
private WebSocket webSocket;
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
|
||||
private final String hostname;
|
||||
private final int port;
|
||||
// path maps to channel
|
||||
private final String path;
|
||||
private final String channel;
|
||||
private final ClientPeriodicStdinScannerParticle clientPeriodicStdinScannerParticle = new ClientPeriodicStdinScannerParticle(
|
||||
System.in
|
||||
);
|
||||
private final WebApiParticle webApiParticle = new WebApiParticle();
|
||||
private HttpClient client;
|
||||
private boolean isShuttingDownActivated = false;
|
||||
private final Object runningMonitor = new Object();
|
||||
private WebSocket webSocket;
|
||||
private WebSocketSenderParticle webSocketSenderParticle;
|
||||
|
||||
public WebsocketClientParticle(String hostname, int port, String path) {
|
||||
this.hostname = hostname;
|
||||
this.port = port;
|
||||
this.path = path;
|
||||
this.channel = TopicUtil.convertTopicName(path);
|
||||
public WebsocketClientParticle(String hostname, int port, String path) {
|
||||
this.hostname = hostname;
|
||||
this.port = port;
|
||||
this.path = path;
|
||||
this.channel = TopicUtil.convertTopicName(path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
this.client = getVertx()
|
||||
.createHttpClient();
|
||||
logger.info("Starting websocket client command server");
|
||||
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
|
||||
.webSocket(port, hostname, path, this::handleMessage);
|
||||
getVertx().deployVerticle(clientPeriodicStdinScannerParticle);
|
||||
getVertx().deployVerticle(webApiParticle);
|
||||
}
|
||||
|
||||
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
|
||||
logger.info("Handling websocket");
|
||||
if (!webSocketAsyncResult.succeeded()) {
|
||||
logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause());
|
||||
try {
|
||||
stop();
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to stop particle", e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.webSocket = webSocketAsyncResult.result();
|
||||
webSocketSenderParticle = new WebSocketSenderParticle(webSocket);
|
||||
getVertx().deployVerticle(webSocketSenderParticle);
|
||||
vertx.setPeriodic(100, clientPeriodicStdinScannerParticle);
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
this.client = getVertx()
|
||||
.createHttpClient();
|
||||
HttpServerOptions httpServerOptions = new HttpServerOptions();
|
||||
this.server = getVertx().createHttpServer(httpServerOptions);
|
||||
this.server.requestHandler(this::handleCommandRequests);
|
||||
this.server.listen(8123, "localhost", this::handleListenCommand);
|
||||
logger.info("Starting websocket client command server");
|
||||
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
|
||||
.webSocket(port, hostname, path, this::handleMessage);
|
||||
}
|
||||
|
||||
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) {
|
||||
if (httpServerAsyncResult.failed()) {
|
||||
logger.warn("Couldn't initialized http command server");
|
||||
getVertx().close();
|
||||
System.exit(1);
|
||||
webSocket
|
||||
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
|
||||
.closeHandler(unused -> {
|
||||
try {
|
||||
stop();
|
||||
} catch (Exception e) {
|
||||
logger.error("Got an error while stopping", e);
|
||||
}
|
||||
});
|
||||
|
||||
logger.info("Successfully started websocket client command server");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
isShuttingDownActivated = true;
|
||||
getVertx().undeploy(webApiParticle.deploymentID());
|
||||
getVertx().undeploy(webSocketSenderParticle.deploymentID());
|
||||
getVertx().undeploy(clientPeriodicStdinScannerParticle.deploymentID());
|
||||
client.close();
|
||||
synchronized (runningMonitor) {
|
||||
runningMonitor.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
private void handleCommandRequests(HttpServerRequest httpServerRequest) {
|
||||
logger.info("Received command request on 8123");
|
||||
HttpServerResponse response = httpServerRequest.response();
|
||||
String qParam = httpServerRequest.getParam("q");
|
||||
if (qParam == null || qParam.isBlank()) {
|
||||
response.setStatusCode(400);
|
||||
response.end("bad request");
|
||||
return;
|
||||
}
|
||||
logger.info("Sending binary message: {}", qParam);
|
||||
webSocket.writeBinaryMessage(Buffer.buffer(qParam));
|
||||
response.setStatusCode(202);
|
||||
response.end("Ok");
|
||||
}
|
||||
|
||||
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
|
||||
logger.info("Handling websocket");
|
||||
if (!webSocketAsyncResult.succeeded()) {
|
||||
logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause());
|
||||
try {
|
||||
stop();
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to stop particle", e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.webSocket = webSocketAsyncResult.result();
|
||||
vertx.setPeriodic(100, new ClientPeriodicStdinScanner(
|
||||
webSocket,
|
||||
System.in
|
||||
));
|
||||
|
||||
webSocket
|
||||
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
|
||||
.closeHandler(unused -> {
|
||||
try {
|
||||
stop();
|
||||
} catch (Exception e) {
|
||||
logger.error("Got an error while stopping", e);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
isShuttingDownActivated = true;
|
||||
client.close();
|
||||
server.close();
|
||||
public CompletableFuture<Boolean> waitingUntilFinished() {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
while (!isShuttingDownActivated) {
|
||||
synchronized (runningMonitor) {
|
||||
runningMonitor.notifyAll();
|
||||
try {
|
||||
runningMonitor.wait();
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("Got interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> waitingUntilFinished() {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
while (!isShuttingDownActivated) {
|
||||
synchronized (runningMonitor) {
|
||||
try {
|
||||
runningMonitor.wait();
|
||||
return true;
|
||||
} catch (InterruptedException e) {
|
||||
logger.warn("Got interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue