Compare commits
No commits in common. "093615aeabf87c54cb921f3df8f984718b737050" and "4e6a49b0f5b6c24fc5020dbd15977292287ee4b3" have entirely different histories.
093615aeab
...
4e6a49b0f5
|
@ -1,10 +0,0 @@
|
||||||
root = true
|
|
||||||
|
|
||||||
[*]
|
|
||||||
charset = utf-8
|
|
||||||
end_of_line = lf
|
|
||||||
indent_size = 2
|
|
||||||
indent_style = space
|
|
||||||
insert_final_newline = false
|
|
||||||
max_line_length = 120
|
|
||||||
tab_width = 2
|
|
|
@ -1,25 +1,23 @@
|
||||||
plugins {
|
plugins {
|
||||||
id("java")
|
id("java")
|
||||||
}
|
}
|
||||||
|
|
||||||
group = "ch.polgrabia.demos"
|
group = "ch.polgrabia.demos"
|
||||||
version = "1.0-SNAPSHOT"
|
version = "1.0-SNAPSHOT"
|
||||||
|
|
||||||
repositories {
|
repositories {
|
||||||
mavenCentral()
|
mavenCentral()
|
||||||
}
|
}
|
||||||
|
|
||||||
val vertxVersion = project.ext.get("vertx.version")
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation("commons-cli:commons-cli:1.9.0")
|
implementation("commons-cli:commons-cli:1.9.0")
|
||||||
implementation("io.vertx:vertx-core:$vertxVersion")
|
implementation("io.vertx:vertx-core:4.5.9")
|
||||||
implementation("io.vertx:vertx-web:$vertxVersion")
|
implementation("ch.qos.logback:logback-classic:1.5.6")
|
||||||
implementation("ch.qos.logback:logback-classic:1.5.6")
|
implementation("org.apache.kafka:kafka-clients:3.9.0")
|
||||||
implementation("org.apache.kafka:kafka-clients:3.9.0")
|
testImplementation(platform("org.junit:junit-bom:5.10.0"))
|
||||||
testImplementation(platform("org.junit:junit-bom:5.10.0"))
|
testImplementation("org.junit.jupiter:junit-jupiter")
|
||||||
testImplementation("org.junit.jupiter:junit-jupiter")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks.test {
|
tasks.test {
|
||||||
useJUnitPlatform()
|
useJUnitPlatform()
|
||||||
}
|
}
|
|
@ -0,0 +1,79 @@
|
||||||
|
package ch.polgrabia.demos.client;
|
||||||
|
|
||||||
|
import io.vertx.core.Handler;
|
||||||
|
import io.vertx.core.buffer.Buffer;
|
||||||
|
import io.vertx.core.http.WebSocket;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
public class ClientPeriodicStdinScanner implements Handler<Long> {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class);
|
||||||
|
private final InputStream inputStream;
|
||||||
|
private Buffer clientPayloadRemainderBuffer = Buffer.buffer();
|
||||||
|
private final WebSocket webSocket;
|
||||||
|
|
||||||
|
ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) {
|
||||||
|
this.webSocket = webSocket;
|
||||||
|
this.webSocket.setWriteQueueMaxSize(0);
|
||||||
|
this.inputStream = inputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int newLineIndex(byte[] clientPayload, int prevIdx) {
|
||||||
|
for (int i = prevIdx; i < clientPayload.length; i++) {
|
||||||
|
if (clientPayload[i] == '\n' || clientPayload[i] == '\r' ) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(Long event) {
|
||||||
|
{
|
||||||
|
// TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it
|
||||||
|
// fetches the input
|
||||||
|
try {
|
||||||
|
int available = inputStream.available();
|
||||||
|
if (available > 0) {
|
||||||
|
logger.debug("Got available: {}", available);
|
||||||
|
clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available));
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] clientPayload = clientPayloadRemainderBuffer.getBytes();
|
||||||
|
|
||||||
|
int idx, prevIdx = 0;
|
||||||
|
do {
|
||||||
|
idx = newLineIndex(clientPayload, prevIdx);
|
||||||
|
if (idx >= 0) {
|
||||||
|
// TODO here don't assume we deal with linux eol
|
||||||
|
var eolLength =
|
||||||
|
idx < clientPayload.length - 1
|
||||||
|
&& clientPayload[idx] == '\r'
|
||||||
|
&& clientPayload[idx + 1] == '\n'
|
||||||
|
? 2 : 1;
|
||||||
|
int len = idx - prevIdx;
|
||||||
|
Buffer b = Buffer.buffer(len);
|
||||||
|
b = b.appendBytes(clientPayload, prevIdx, len);
|
||||||
|
b = b.appendByte((byte)'\n');
|
||||||
|
logger.info("Sending binary message: {}", b);;
|
||||||
|
webSocket.writeBinaryMessage(b);
|
||||||
|
prevIdx = idx + eolLength;
|
||||||
|
}
|
||||||
|
} while (idx >= 0 && prevIdx < clientPayload.length);
|
||||||
|
int remainderLength = clientPayload.length - prevIdx + 1;
|
||||||
|
if (remainderLength > 0 && prevIdx < clientPayload.length) {
|
||||||
|
clientPayloadRemainderBuffer = Buffer.buffer(remainderLength)
|
||||||
|
.appendBytes(clientPayload, prevIdx, remainderLength);
|
||||||
|
} else {
|
||||||
|
clientPayloadRemainderBuffer = Buffer.buffer();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("Failed to read client input", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,78 +0,0 @@
|
||||||
package ch.polgrabia.demos.client;
|
|
||||||
|
|
||||||
import io.vertx.core.AbstractVerticle;
|
|
||||||
import io.vertx.core.Handler;
|
|
||||||
import io.vertx.core.buffer.Buffer;
|
|
||||||
import io.vertx.core.http.WebSocket;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
|
|
||||||
public class ClientPeriodicStdinScannerParticle extends AbstractVerticle implements Handler<Long> {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScannerParticle.class);
|
|
||||||
private final InputStream inputStream;
|
|
||||||
private Buffer clientPayloadRemainderBuffer = Buffer.buffer();
|
|
||||||
|
|
||||||
ClientPeriodicStdinScannerParticle(InputStream inputStream) {
|
|
||||||
this.inputStream = inputStream;
|
|
||||||
}
|
|
||||||
|
|
||||||
private int newLineIndex(byte[] clientPayload, int prevIdx) {
|
|
||||||
for (int i = prevIdx; i < clientPayload.length; i++) {
|
|
||||||
if (clientPayload[i] == '\n' || clientPayload[i] == '\r') {
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handle(Long event) {
|
|
||||||
{
|
|
||||||
// TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it
|
|
||||||
// fetches the input
|
|
||||||
try {
|
|
||||||
int available = inputStream.available();
|
|
||||||
if (available > 0) {
|
|
||||||
logger.debug("Got available: {}", available);
|
|
||||||
clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available));
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] clientPayload = clientPayloadRemainderBuffer.getBytes();
|
|
||||||
|
|
||||||
int idx, prevIdx = 0;
|
|
||||||
do {
|
|
||||||
idx = newLineIndex(clientPayload, prevIdx);
|
|
||||||
if (idx >= 0) {
|
|
||||||
// TODO here don't assume we deal with linux eol
|
|
||||||
var eolLength =
|
|
||||||
idx < clientPayload.length - 1
|
|
||||||
&& clientPayload[idx] == '\r'
|
|
||||||
&& clientPayload[idx + 1] == '\n'
|
|
||||||
? 2 : 1;
|
|
||||||
int len = idx - prevIdx;
|
|
||||||
Buffer b = Buffer.buffer(len);
|
|
||||||
b = b.appendBytes(clientPayload, prevIdx, len);
|
|
||||||
b = b.appendByte((byte) '\n');
|
|
||||||
logger.info("Sending binary message: {}", b);
|
|
||||||
getVertx()
|
|
||||||
.eventBus()
|
|
||||||
.publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b);
|
|
||||||
prevIdx = idx + eolLength;
|
|
||||||
}
|
|
||||||
} while (idx >= 0 && prevIdx < clientPayload.length);
|
|
||||||
int remainderLength = clientPayload.length - prevIdx + 1;
|
|
||||||
if (remainderLength > 0 && prevIdx < clientPayload.length) {
|
|
||||||
clientPayloadRemainderBuffer = Buffer.buffer(remainderLength)
|
|
||||||
.appendBytes(clientPayload, prevIdx, remainderLength);
|
|
||||||
} else {
|
|
||||||
clientPayloadRemainderBuffer = Buffer.buffer();
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("Failed to read client input", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,7 +0,0 @@
|
||||||
package ch.polgrabia.demos.client;
|
|
||||||
|
|
||||||
public class Constants {
|
|
||||||
public static final String CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY = "%s#send"
|
|
||||||
.formatted(WebSocketSenderParticle.class.getCanonicalName());
|
|
||||||
public static final int CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS = 100;
|
|
||||||
}
|
|
|
@ -1,63 +0,0 @@
|
||||||
package ch.polgrabia.demos.client;
|
|
||||||
|
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
|
||||||
import io.vertx.core.AbstractVerticle;
|
|
||||||
import io.vertx.core.AsyncResult;
|
|
||||||
import io.vertx.core.Future;
|
|
||||||
import io.vertx.core.buffer.Buffer;
|
|
||||||
import io.vertx.core.http.HttpServer;
|
|
||||||
import io.vertx.core.http.HttpServerOptions;
|
|
||||||
import io.vertx.core.http.HttpServerRequest;
|
|
||||||
import io.vertx.core.http.HttpServerResponse;
|
|
||||||
import io.vertx.ext.web.Router;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class WebApiParticle extends AbstractVerticle {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(WebApiParticle.class);
|
|
||||||
private final int clientPort;
|
|
||||||
private HttpServer server;
|
|
||||||
|
|
||||||
public WebApiParticle(int clientPort) {
|
|
||||||
this.clientPort = clientPort;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception {
|
|
||||||
HttpServerOptions httpServerOptions = new HttpServerOptions();
|
|
||||||
var router = Router.router(getVertx());
|
|
||||||
this.server = getVertx().createHttpServer(httpServerOptions);
|
|
||||||
router.get("/message").respond(routingContext -> handleCommandRequests(routingContext.request()));
|
|
||||||
this.server.requestHandler(router);
|
|
||||||
this.server.listen(clientPort, "localhost", this::handleListenCommand);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() throws Exception {
|
|
||||||
server.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Future<Void> handleCommandRequests(HttpServerRequest httpServerRequest) {
|
|
||||||
logger.info("Received command request on {}", clientPort);
|
|
||||||
HttpServerResponse response = httpServerRequest.response();
|
|
||||||
String qParam = httpServerRequest.getParam("q");
|
|
||||||
if (qParam == null || qParam.isBlank()) {
|
|
||||||
response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
|
|
||||||
return response.end("%s\n".formatted(HttpResponseStatus.BAD_REQUEST.reasonPhrase()));
|
|
||||||
}
|
|
||||||
logger.info("Sending binary message: {}", qParam);
|
|
||||||
getVertx().eventBus().publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam));
|
|
||||||
response.setStatusCode(HttpResponseStatus.ACCEPTED.code());
|
|
||||||
return response.end("%s\n".formatted(HttpResponseStatus.ACCEPTED.reasonPhrase()));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) {
|
|
||||||
if (httpServerAsyncResult.failed()) {
|
|
||||||
logger.warn("Couldn't initialized http command server");
|
|
||||||
getVertx().close();
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Successfully started websocket client command server");
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,35 +0,0 @@
|
||||||
package ch.polgrabia.demos.client;
|
|
||||||
|
|
||||||
import io.vertx.core.AbstractVerticle;
|
|
||||||
import io.vertx.core.http.WebSocket;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class WebSocketReceiverParticle extends AbstractVerticle {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketReceiverParticle.class);
|
|
||||||
private final WebSocket webSocket;
|
|
||||||
private final String channel;
|
|
||||||
private final Runnable onClose;
|
|
||||||
|
|
||||||
public WebSocketReceiverParticle(WebSocket webSocket, String channel, Runnable onClose) {
|
|
||||||
this.webSocket = webSocket;
|
|
||||||
this.channel = channel;
|
|
||||||
this.onClose = onClose;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception {
|
|
||||||
logger.info("Starting WebSocketReceiverParticle");
|
|
||||||
webSocket
|
|
||||||
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
|
|
||||||
.closeHandler(unused -> {
|
|
||||||
logger.info("[{}] Web socket is being closed {}", channel, webSocket.closeReason());
|
|
||||||
try {
|
|
||||||
onClose.run();
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Got an error while stopping", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,37 +0,0 @@
|
||||||
package ch.polgrabia.demos.client;
|
|
||||||
|
|
||||||
import io.vertx.core.AbstractVerticle;
|
|
||||||
import io.vertx.core.Handler;
|
|
||||||
import io.vertx.core.Verticle;
|
|
||||||
import io.vertx.core.buffer.Buffer;
|
|
||||||
import io.vertx.core.eventbus.EventBus;
|
|
||||||
import io.vertx.core.eventbus.Message;
|
|
||||||
import io.vertx.core.eventbus.MessageConsumer;
|
|
||||||
import io.vertx.core.http.WebSocket;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class WebSocketSenderParticle extends AbstractVerticle {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketSenderParticle.class);
|
|
||||||
private final WebSocket webSocket;
|
|
||||||
private MessageConsumer<Buffer> consumer;
|
|
||||||
|
|
||||||
public WebSocketSenderParticle(WebSocket webSocket) {
|
|
||||||
this.webSocket = webSocket;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception {
|
|
||||||
EventBus eventBus = getVertx().eventBus();
|
|
||||||
this.consumer = eventBus.consumer(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, event -> {
|
|
||||||
Buffer msg = event.body();
|
|
||||||
logger.info("Sending binary message: {}", msg);
|
|
||||||
webSocket.writeBinaryMessage(msg);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() throws Exception {
|
|
||||||
consumer.unregister().result();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -9,17 +9,12 @@ public class WebsocketClientApp {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
var vertx = Vertx.vertx();
|
var vertx = Vertx.vertx();
|
||||||
logger.info("Deploying websocket particle");
|
logger.info("Deploying websocket particle");
|
||||||
WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle(
|
WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle("localhost", 8080, "/test");
|
||||||
"localhost",
|
|
||||||
8080,
|
|
||||||
"/test",
|
|
||||||
8123);
|
|
||||||
vertx.deployVerticle(websocketClientParticle);
|
vertx.deployVerticle(websocketClientParticle);
|
||||||
logger.info("Deployed websocket particle");
|
logger.info("Deployed websocket particle");
|
||||||
websocketClientParticle
|
websocketClientParticle
|
||||||
.waitingUntilFinished()
|
.waitingUntilFinished()
|
||||||
.join();
|
.join();
|
||||||
vertx.undeploy(websocketClientParticle.deploymentID());
|
|
||||||
vertx.close();
|
vertx.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,9 +3,8 @@ package ch.polgrabia.demos.client;
|
||||||
import ch.polgrabia.demos.utils.TopicUtil;
|
import ch.polgrabia.demos.utils.TopicUtil;
|
||||||
import io.vertx.core.AbstractVerticle;
|
import io.vertx.core.AbstractVerticle;
|
||||||
import io.vertx.core.AsyncResult;
|
import io.vertx.core.AsyncResult;
|
||||||
import io.vertx.core.http.HttpClient;
|
import io.vertx.core.buffer.Buffer;
|
||||||
import io.vertx.core.http.HttpClientOptions;
|
import io.vertx.core.http.*;
|
||||||
import io.vertx.core.http.WebSocket;
|
|
||||||
import io.vertx.core.http.impl.WebSocketClientImpl;
|
import io.vertx.core.http.impl.WebSocketClientImpl;
|
||||||
import io.vertx.core.impl.CloseFuture;
|
import io.vertx.core.impl.CloseFuture;
|
||||||
import io.vertx.core.impl.VertxInternal;
|
import io.vertx.core.impl.VertxInternal;
|
||||||
|
@ -15,91 +14,116 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
public class WebsocketClientParticle extends AbstractVerticle {
|
public class WebsocketClientParticle extends AbstractVerticle {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
|
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
|
||||||
private final String webSocketHostname;
|
private final String hostname;
|
||||||
private final int webSocketPort;
|
private final int port;
|
||||||
// path maps to channel
|
// path maps to channel
|
||||||
private final String webSocketPath;
|
private final String path;
|
||||||
private final String channel;
|
private final String channel;
|
||||||
private final ClientPeriodicStdinScannerParticle clientPeriodicStdinScannerParticle = new ClientPeriodicStdinScannerParticle(
|
private HttpClient client;
|
||||||
System.in
|
private boolean isShuttingDownActivated = false;
|
||||||
);
|
private final Object runningMonitor = new Object();
|
||||||
private final WebApiParticle webApiParticle;
|
private HttpServer server;
|
||||||
private HttpClient client;
|
private WebSocket webSocket;
|
||||||
private boolean isShuttingDownActivated = false;
|
|
||||||
private final Object runningMonitor = new Object();
|
|
||||||
private WebSocketSenderParticle webSocketSenderParticle;
|
|
||||||
private WebSocketReceiverParticle webSocketReceiverParticle;
|
|
||||||
|
|
||||||
public WebsocketClientParticle(
|
public WebsocketClientParticle(String hostname, int port, String path) {
|
||||||
String webSocketHostname,
|
this.hostname = hostname;
|
||||||
int webSocketPort,
|
this.port = port;
|
||||||
String webSocketPath,
|
this.path = path;
|
||||||
int clientApiPort) {
|
this.channel = TopicUtil.convertTopicName(path);
|
||||||
this.webSocketHostname = webSocketHostname;
|
|
||||||
this.webSocketPort = webSocketPort;
|
|
||||||
this.webSocketPath = webSocketPath;
|
|
||||||
this.channel = TopicUtil.convertTopicName(webSocketPath);
|
|
||||||
this.webApiParticle = new WebApiParticle(clientApiPort);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
this.client = getVertx()
|
|
||||||
.createHttpClient();
|
|
||||||
logger.info("Starting websocket client command server");
|
|
||||||
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
|
|
||||||
.webSocket(webSocketPort, webSocketHostname, webSocketPath, this::handleMessage);
|
|
||||||
getVertx().deployVerticle(clientPeriodicStdinScannerParticle);
|
|
||||||
getVertx().deployVerticle(webApiParticle);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
|
|
||||||
logger.info("Handling websocket");
|
|
||||||
if (!webSocketAsyncResult.succeeded()) {
|
|
||||||
logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause());
|
|
||||||
try {
|
|
||||||
stop();
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Failed to stop particle", e);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
WebSocket webSocket = webSocketAsyncResult.result();
|
|
||||||
webSocketReceiverParticle = new WebSocketReceiverParticle(webSocket, channel, this::stop);
|
|
||||||
getVertx().deployVerticle(webSocketReceiverParticle);
|
|
||||||
webSocketSenderParticle = new WebSocketSenderParticle(webSocket);
|
|
||||||
getVertx().deployVerticle(webSocketSenderParticle);
|
|
||||||
vertx.setPeriodic(Constants.CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS, clientPeriodicStdinScannerParticle);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void start() {
|
||||||
isShuttingDownActivated = true;
|
this.client = getVertx()
|
||||||
getVertx().undeploy(webApiParticle.deploymentID());
|
.createHttpClient();
|
||||||
getVertx().undeploy(webSocketSenderParticle.deploymentID());
|
HttpServerOptions httpServerOptions = new HttpServerOptions();
|
||||||
getVertx().undeploy(webSocketReceiverParticle.deploymentID());
|
this.server = getVertx().createHttpServer(httpServerOptions);
|
||||||
getVertx().undeploy(clientPeriodicStdinScannerParticle.deploymentID());
|
this.server.requestHandler(this::handleCommandRequests);
|
||||||
client.close();
|
this.server.listen(8123, "localhost", this::handleListenCommand);
|
||||||
synchronized (runningMonitor) {
|
logger.info("Starting websocket client command server");
|
||||||
runningMonitor.notifyAll();
|
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
|
||||||
|
.webSocket(port, hostname, path, this::handleMessage);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
public CompletableFuture<Boolean> waitingUntilFinished() {
|
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) {
|
||||||
return CompletableFuture.supplyAsync(() -> {
|
if (httpServerAsyncResult.failed()) {
|
||||||
while (!isShuttingDownActivated) {
|
logger.warn("Couldn't initialized http command server");
|
||||||
synchronized (runningMonitor) {
|
getVertx().close();
|
||||||
try {
|
System.exit(1);
|
||||||
runningMonitor.wait();
|
|
||||||
return true;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
logger.warn("Got interrupted", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return false;
|
logger.info("Successfully started websocket client command server");
|
||||||
});
|
}
|
||||||
}
|
|
||||||
|
private void handleCommandRequests(HttpServerRequest httpServerRequest) {
|
||||||
|
logger.info("Received command request on 8123");
|
||||||
|
HttpServerResponse response = httpServerRequest.response();
|
||||||
|
String qParam = httpServerRequest.getParam("q");
|
||||||
|
if (qParam == null || qParam.isBlank()) {
|
||||||
|
response.setStatusCode(400);
|
||||||
|
response.end("bad request");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.info("Sending binary message: {}", qParam);
|
||||||
|
webSocket.writeBinaryMessage(Buffer.buffer(qParam));
|
||||||
|
response.setStatusCode(202);
|
||||||
|
response.end("Ok");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
|
||||||
|
logger.info("Handling websocket");
|
||||||
|
if (!webSocketAsyncResult.succeeded()) {
|
||||||
|
logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause());
|
||||||
|
try {
|
||||||
|
stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Failed to stop particle", e);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.webSocket = webSocketAsyncResult.result();
|
||||||
|
vertx.setPeriodic(100, new ClientPeriodicStdinScanner(
|
||||||
|
webSocket,
|
||||||
|
System.in
|
||||||
|
));
|
||||||
|
|
||||||
|
webSocket
|
||||||
|
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
|
||||||
|
.closeHandler(unused -> {
|
||||||
|
try {
|
||||||
|
stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Got an error while stopping", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
isShuttingDownActivated = true;
|
||||||
|
client.close();
|
||||||
|
server.close();
|
||||||
|
synchronized (runningMonitor) {
|
||||||
|
runningMonitor.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<Boolean> waitingUntilFinished() {
|
||||||
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
|
while (!isShuttingDownActivated) {
|
||||||
|
synchronized (runningMonitor) {
|
||||||
|
try {
|
||||||
|
runningMonitor.wait();
|
||||||
|
return true;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
logger.warn("Got interrupted", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,33 +0,0 @@
|
||||||
package ch.polgrabia.demos.server;
|
|
||||||
|
|
||||||
import ch.polgrabia.demos.utils.TopicUtil;
|
|
||||||
import io.vertx.core.Handler;
|
|
||||||
import io.vertx.core.buffer.Buffer;
|
|
||||||
import io.vertx.core.http.ServerWebSocket;
|
|
||||||
import org.apache.kafka.clients.producer.Producer;
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class WebsocketMessageHandler implements Handler<Buffer> {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(WebsocketMessageHandler.class);
|
|
||||||
private final ServerWebSocket serverWebSocket;
|
|
||||||
private final Producer<String, byte[]> kafkaProducer;
|
|
||||||
|
|
||||||
public WebsocketMessageHandler(Producer<String, byte[]> kafkaProducer, ServerWebSocket serverWebSocket) {
|
|
||||||
this.kafkaProducer = kafkaProducer;
|
|
||||||
this.serverWebSocket = serverWebSocket;
|
|
||||||
this.serverWebSocket.setWriteQueueMaxSize(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void handle(Buffer s) {
|
|
||||||
String channel = TopicUtil.convertTopicName(serverWebSocket.path());
|
|
||||||
logger.info("[{}] Got message {}", channel, s);
|
|
||||||
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
|
|
||||||
if (kafkaProducer != null) {
|
|
||||||
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -5,6 +5,7 @@ import io.vertx.core.Handler;
|
||||||
import io.vertx.core.buffer.Buffer;
|
import io.vertx.core.buffer.Buffer;
|
||||||
import io.vertx.core.http.ServerWebSocket;
|
import io.vertx.core.http.ServerWebSocket;
|
||||||
import org.apache.kafka.clients.producer.Producer;
|
import org.apache.kafka.clients.producer.Producer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -54,4 +55,27 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
|
||||||
sockets.add(serverWebSocket);
|
sockets.add(serverWebSocket);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class WebsocketMessageHandler implements Handler<Buffer> {
|
||||||
|
|
||||||
|
private final ServerWebSocket serverWebSocket;
|
||||||
|
private final Producer<String, byte[]> kafkaProducer;
|
||||||
|
|
||||||
|
public WebsocketMessageHandler(Producer<String, byte[]> kafkaProducer, ServerWebSocket serverWebSocket) {
|
||||||
|
this.kafkaProducer = kafkaProducer;
|
||||||
|
this.serverWebSocket = serverWebSocket;
|
||||||
|
this.serverWebSocket.setWriteQueueMaxSize(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(Buffer s) {
|
||||||
|
String channel = TopicUtil.convertTopicName(serverWebSocket.path());
|
||||||
|
logger.info("[{}] Got message {}", channel, s);
|
||||||
|
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
|
||||||
|
if (kafkaProducer != null) {
|
||||||
|
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
vertx.version=4.5.12
|
|
Loading…
Reference in New Issue