Compare commits

..

3 Commits

13 changed files with 363 additions and 219 deletions

View File

@ -0,0 +1,10 @@
root = true
[*]
charset = utf-8
end_of_line = lf
indent_size = 2
indent_style = space
insert_final_newline = false
max_line_length = 120
tab_width = 2

View File

@ -1,23 +1,25 @@
plugins { plugins {
id("java") id("java")
} }
group = "ch.polgrabia.demos" group = "ch.polgrabia.demos"
version = "1.0-SNAPSHOT" version = "1.0-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()
} }
val vertxVersion = project.ext.get("vertx.version")
dependencies { dependencies {
implementation("commons-cli:commons-cli:1.9.0") implementation("commons-cli:commons-cli:1.9.0")
implementation("io.vertx:vertx-core:4.5.9") implementation("io.vertx:vertx-core:$vertxVersion")
implementation("ch.qos.logback:logback-classic:1.5.6") implementation("io.vertx:vertx-web:$vertxVersion")
implementation("org.apache.kafka:kafka-clients:3.9.0") implementation("ch.qos.logback:logback-classic:1.5.6")
testImplementation(platform("org.junit:junit-bom:5.10.0")) implementation("org.apache.kafka:kafka-clients:3.9.0")
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation(platform("org.junit:junit-bom:5.10.0"))
testImplementation("org.junit.jupiter:junit-jupiter")
} }
tasks.test { tasks.test {
useJUnitPlatform() useJUnitPlatform()
} }

View File

@ -1,79 +0,0 @@
package ch.polgrabia.demos.client;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
public class ClientPeriodicStdinScanner implements Handler<Long> {
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScanner.class);
private final InputStream inputStream;
private Buffer clientPayloadRemainderBuffer = Buffer.buffer();
private final WebSocket webSocket;
ClientPeriodicStdinScanner(WebSocket webSocket, InputStream inputStream) {
this.webSocket = webSocket;
this.webSocket.setWriteQueueMaxSize(0);
this.inputStream = inputStream;
}
private int newLineIndex(byte[] clientPayload, int prevIdx) {
for (int i = prevIdx; i < clientPayload.length; i++) {
if (clientPayload[i] == '\n' || clientPayload[i] == '\r' ) {
return i;
}
}
return -1;
}
@Override
public void handle(Long event) {
{
// TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it
// fetches the input
try {
int available = inputStream.available();
if (available > 0) {
logger.debug("Got available: {}", available);
clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available));
}
byte[] clientPayload = clientPayloadRemainderBuffer.getBytes();
int idx, prevIdx = 0;
do {
idx = newLineIndex(clientPayload, prevIdx);
if (idx >= 0) {
// TODO here don't assume we deal with linux eol
var eolLength =
idx < clientPayload.length - 1
&& clientPayload[idx] == '\r'
&& clientPayload[idx + 1] == '\n'
? 2 : 1;
int len = idx - prevIdx;
Buffer b = Buffer.buffer(len);
b = b.appendBytes(clientPayload, prevIdx, len);
b = b.appendByte((byte)'\n');
logger.info("Sending binary message: {}", b);;
webSocket.writeBinaryMessage(b);
prevIdx = idx + eolLength;
}
} while (idx >= 0 && prevIdx < clientPayload.length);
int remainderLength = clientPayload.length - prevIdx + 1;
if (remainderLength > 0 && prevIdx < clientPayload.length) {
clientPayloadRemainderBuffer = Buffer.buffer(remainderLength)
.appendBytes(clientPayload, prevIdx, remainderLength);
} else {
clientPayloadRemainderBuffer = Buffer.buffer();
}
} catch (IOException e) {
logger.error("Failed to read client input", e);
}
}
}
}

View File

@ -0,0 +1,78 @@
package ch.polgrabia.demos.client;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
public class ClientPeriodicStdinScannerParticle extends AbstractVerticle implements Handler<Long> {
private static final Logger logger = LoggerFactory.getLogger(ClientPeriodicStdinScannerParticle.class);
private final InputStream inputStream;
private Buffer clientPayloadRemainderBuffer = Buffer.buffer();
ClientPeriodicStdinScannerParticle(InputStream inputStream) {
this.inputStream = inputStream;
}
private int newLineIndex(byte[] clientPayload, int prevIdx) {
for (int i = prevIdx; i < clientPayload.length; i++) {
if (clientPayload[i] == '\n' || clientPayload[i] == '\r') {
return i;
}
}
return -1;
}
@Override
public void handle(Long event) {
{
// TODO this doesn't seem to be working on wsl environment. Needs to be resolved or changed how it
// fetches the input
try {
int available = inputStream.available();
if (available > 0) {
logger.debug("Got available: {}", available);
clientPayloadRemainderBuffer.appendBytes(inputStream.readNBytes(available));
}
byte[] clientPayload = clientPayloadRemainderBuffer.getBytes();
int idx, prevIdx = 0;
do {
idx = newLineIndex(clientPayload, prevIdx);
if (idx >= 0) {
// TODO here don't assume we deal with linux eol
var eolLength =
idx < clientPayload.length - 1
&& clientPayload[idx] == '\r'
&& clientPayload[idx + 1] == '\n'
? 2 : 1;
int len = idx - prevIdx;
Buffer b = Buffer.buffer(len);
b = b.appendBytes(clientPayload, prevIdx, len);
b = b.appendByte((byte) '\n');
logger.info("Sending binary message: {}", b);
getVertx()
.eventBus()
.publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, b);
prevIdx = idx + eolLength;
}
} while (idx >= 0 && prevIdx < clientPayload.length);
int remainderLength = clientPayload.length - prevIdx + 1;
if (remainderLength > 0 && prevIdx < clientPayload.length) {
clientPayloadRemainderBuffer = Buffer.buffer(remainderLength)
.appendBytes(clientPayload, prevIdx, remainderLength);
} else {
clientPayloadRemainderBuffer = Buffer.buffer();
}
} catch (IOException e) {
logger.error("Failed to read client input", e);
}
}
}
}

View File

@ -0,0 +1,7 @@
package ch.polgrabia.demos.client;
public class Constants {
public static final String CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY = "%s#send"
.formatted(WebSocketSenderParticle.class.getCanonicalName());
public static final int CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS = 100;
}

View File

@ -0,0 +1,63 @@
package ch.polgrabia.demos.client;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebApiParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebApiParticle.class);
private final int clientPort;
private HttpServer server;
public WebApiParticle(int clientPort) {
this.clientPort = clientPort;
}
@Override
public void start() throws Exception {
HttpServerOptions httpServerOptions = new HttpServerOptions();
var router = Router.router(getVertx());
this.server = getVertx().createHttpServer(httpServerOptions);
router.get("/message").respond(routingContext -> handleCommandRequests(routingContext.request()));
this.server.requestHandler(router);
this.server.listen(clientPort, "localhost", this::handleListenCommand);
}
@Override
public void stop() throws Exception {
server.close();
}
private Future<Void> handleCommandRequests(HttpServerRequest httpServerRequest) {
logger.info("Received command request on {}", clientPort);
HttpServerResponse response = httpServerRequest.response();
String qParam = httpServerRequest.getParam("q");
if (qParam == null || qParam.isBlank()) {
response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
return response.end("%s\n".formatted(HttpResponseStatus.BAD_REQUEST.reasonPhrase()));
}
logger.info("Sending binary message: {}", qParam);
getVertx().eventBus().publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam));
response.setStatusCode(HttpResponseStatus.ACCEPTED.code());
return response.end("%s\n".formatted(HttpResponseStatus.ACCEPTED.reasonPhrase()));
}
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) {
if (httpServerAsyncResult.failed()) {
logger.warn("Couldn't initialized http command server");
getVertx().close();
System.exit(1);
}
logger.info("Successfully started websocket client command server");
}
}

View File

@ -0,0 +1,35 @@
package ch.polgrabia.demos.client;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketReceiverParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebSocketReceiverParticle.class);
private final WebSocket webSocket;
private final String channel;
private final Runnable onClose;
public WebSocketReceiverParticle(WebSocket webSocket, String channel, Runnable onClose) {
this.webSocket = webSocket;
this.channel = channel;
this.onClose = onClose;
}
@Override
public void start() throws Exception {
logger.info("Starting WebSocketReceiverParticle");
webSocket
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
.closeHandler(unused -> {
logger.info("[{}] Web socket is being closed {}", channel, webSocket.closeReason());
try {
onClose.run();
} catch (Exception e) {
logger.error("Got an error while stopping", e);
}
});
}
}

View File

@ -0,0 +1,37 @@
package ch.polgrabia.demos.client;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebSocketSenderParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebSocketSenderParticle.class);
private final WebSocket webSocket;
private MessageConsumer<Buffer> consumer;
public WebSocketSenderParticle(WebSocket webSocket) {
this.webSocket = webSocket;
}
@Override
public void start() throws Exception {
EventBus eventBus = getVertx().eventBus();
this.consumer = eventBus.consumer(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, event -> {
Buffer msg = event.body();
logger.info("Sending binary message: {}", msg);
webSocket.writeBinaryMessage(msg);
});
}
@Override
public void stop() throws Exception {
consumer.unregister().result();
}
}

View File

@ -9,12 +9,17 @@ public class WebsocketClientApp {
public static void main(String[] args) { public static void main(String[] args) {
var vertx = Vertx.vertx(); var vertx = Vertx.vertx();
logger.info("Deploying websocket particle"); logger.info("Deploying websocket particle");
WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle("localhost", 8080, "/test"); WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle(
"localhost",
8080,
"/test",
8123);
vertx.deployVerticle(websocketClientParticle); vertx.deployVerticle(websocketClientParticle);
logger.info("Deployed websocket particle"); logger.info("Deployed websocket particle");
websocketClientParticle websocketClientParticle
.waitingUntilFinished() .waitingUntilFinished()
.join(); .join();
vertx.undeploy(websocketClientParticle.deploymentID());
vertx.close(); vertx.close();
} }
} }

View File

@ -3,8 +3,9 @@ package ch.polgrabia.demos.client;
import ch.polgrabia.demos.utils.TopicUtil; import ch.polgrabia.demos.utils.TopicUtil;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient;
import io.vertx.core.http.*; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.impl.WebSocketClientImpl; import io.vertx.core.http.impl.WebSocketClientImpl;
import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
@ -14,116 +15,91 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public class WebsocketClientParticle extends AbstractVerticle { public class WebsocketClientParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class); private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
private final String hostname; private final String webSocketHostname;
private final int port; private final int webSocketPort;
// path maps to channel // path maps to channel
private final String path; private final String webSocketPath;
private final String channel; private final String channel;
private HttpClient client; private final ClientPeriodicStdinScannerParticle clientPeriodicStdinScannerParticle = new ClientPeriodicStdinScannerParticle(
private boolean isShuttingDownActivated = false; System.in
private final Object runningMonitor = new Object(); );
private HttpServer server; private final WebApiParticle webApiParticle;
private WebSocket webSocket; private HttpClient client;
private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object();
private WebSocketSenderParticle webSocketSenderParticle;
private WebSocketReceiverParticle webSocketReceiverParticle;
public WebsocketClientParticle(String hostname, int port, String path) { public WebsocketClientParticle(
this.hostname = hostname; String webSocketHostname,
this.port = port; int webSocketPort,
this.path = path; String webSocketPath,
this.channel = TopicUtil.convertTopicName(path); int clientApiPort) {
this.webSocketHostname = webSocketHostname;
this.webSocketPort = webSocketPort;
this.webSocketPath = webSocketPath;
this.channel = TopicUtil.convertTopicName(webSocketPath);
this.webApiParticle = new WebApiParticle(clientApiPort);
}
@Override
public void start() {
this.client = getVertx()
.createHttpClient();
logger.info("Starting websocket client command server");
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
.webSocket(webSocketPort, webSocketHostname, webSocketPath, this::handleMessage);
getVertx().deployVerticle(clientPeriodicStdinScannerParticle);
getVertx().deployVerticle(webApiParticle);
}
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
logger.info("Handling websocket");
if (!webSocketAsyncResult.succeeded()) {
logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause());
try {
stop();
} catch (Exception e) {
logger.error("Failed to stop particle", e);
}
return;
} }
WebSocket webSocket = webSocketAsyncResult.result();
webSocketReceiverParticle = new WebSocketReceiverParticle(webSocket, channel, this::stop);
getVertx().deployVerticle(webSocketReceiverParticle);
webSocketSenderParticle = new WebSocketSenderParticle(webSocket);
getVertx().deployVerticle(webSocketSenderParticle);
vertx.setPeriodic(Constants.CLIENT_CONSOLE_INPUT_SCANNING_PERIOD_MS, clientPeriodicStdinScannerParticle);
}
@Override @Override
public void start() { public void stop() {
this.client = getVertx() isShuttingDownActivated = true;
.createHttpClient(); getVertx().undeploy(webApiParticle.deploymentID());
HttpServerOptions httpServerOptions = new HttpServerOptions(); getVertx().undeploy(webSocketSenderParticle.deploymentID());
this.server = getVertx().createHttpServer(httpServerOptions); getVertx().undeploy(webSocketReceiverParticle.deploymentID());
this.server.requestHandler(this::handleCommandRequests); getVertx().undeploy(clientPeriodicStdinScannerParticle.deploymentID());
this.server.listen(8123, "localhost", this::handleListenCommand); client.close();
logger.info("Starting websocket client command server"); synchronized (runningMonitor) {
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture()) runningMonitor.notifyAll();
.webSocket(port, hostname, path, this::handleMessage);
} }
}
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) { public CompletableFuture<Boolean> waitingUntilFinished() {
if (httpServerAsyncResult.failed()) { return CompletableFuture.supplyAsync(() -> {
logger.warn("Couldn't initialized http command server"); while (!isShuttingDownActivated) {
getVertx().close();
System.exit(1);
}
logger.info("Successfully started websocket client command server");
}
private void handleCommandRequests(HttpServerRequest httpServerRequest) {
logger.info("Received command request on 8123");
HttpServerResponse response = httpServerRequest.response();
String qParam = httpServerRequest.getParam("q");
if (qParam == null || qParam.isBlank()) {
response.setStatusCode(400);
response.end("bad request");
return;
}
logger.info("Sending binary message: {}", qParam);
webSocket.writeBinaryMessage(Buffer.buffer(qParam));
response.setStatusCode(202);
response.end("Ok");
}
private void handleMessage(AsyncResult<WebSocket> webSocketAsyncResult) {
logger.info("Handling websocket");
if (!webSocketAsyncResult.succeeded()) {
logger.warn("Websocket handler connection failed", webSocketAsyncResult.cause());
try {
stop();
} catch (Exception e) {
logger.error("Failed to stop particle", e);
}
return;
}
this.webSocket = webSocketAsyncResult.result();
vertx.setPeriodic(100, new ClientPeriodicStdinScanner(
webSocket,
System.in
));
webSocket
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", channel, msg))
.closeHandler(unused -> {
try {
stop();
} catch (Exception e) {
logger.error("Got an error while stopping", e);
}
});
}
@Override
public void stop() {
isShuttingDownActivated = true;
client.close();
server.close();
synchronized (runningMonitor) { synchronized (runningMonitor) {
runningMonitor.notifyAll(); try {
runningMonitor.wait();
return true;
} catch (InterruptedException e) {
logger.warn("Got interrupted", e);
}
} }
} }
return false;
public CompletableFuture<Boolean> waitingUntilFinished() { });
return CompletableFuture.supplyAsync(() -> { }
while (!isShuttingDownActivated) {
synchronized (runningMonitor) {
try {
runningMonitor.wait();
return true;
} catch (InterruptedException e) {
logger.warn("Got interrupted", e);
}
}
}
return false;
});
}
} }

View File

@ -0,0 +1,33 @@
package ch.polgrabia.demos.server;
import ch.polgrabia.demos.utils.TopicUtil;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WebsocketMessageHandler implements Handler<Buffer> {
private static final Logger logger = LoggerFactory.getLogger(WebsocketMessageHandler.class);
private final ServerWebSocket serverWebSocket;
private final Producer<String, byte[]> kafkaProducer;
public WebsocketMessageHandler(Producer<String, byte[]> kafkaProducer, ServerWebSocket serverWebSocket) {
this.kafkaProducer = kafkaProducer;
this.serverWebSocket = serverWebSocket;
this.serverWebSocket.setWriteQueueMaxSize(0);
}
@Override
public void handle(Buffer s) {
String channel = TopicUtil.convertTopicName(serverWebSocket.path());
logger.info("[{}] Got message {}", channel, s);
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
if (kafkaProducer != null) {
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
}
}
}

View File

@ -5,7 +5,6 @@ import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket; import io.vertx.core.http.ServerWebSocket;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -55,27 +54,4 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
sockets.add(serverWebSocket); sockets.add(serverWebSocket);
} }
static class WebsocketMessageHandler implements Handler<Buffer> {
private final ServerWebSocket serverWebSocket;
private final Producer<String, byte[]> kafkaProducer;
public WebsocketMessageHandler(Producer<String, byte[]> kafkaProducer, ServerWebSocket serverWebSocket) {
this.kafkaProducer = kafkaProducer;
this.serverWebSocket = serverWebSocket;
this.serverWebSocket.setWriteQueueMaxSize(0);
}
@Override
public void handle(Buffer s) {
String channel = TopicUtil.convertTopicName(serverWebSocket.path());
logger.info("[{}] Got message {}", channel, s);
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
if (kafkaProducer != null) {
kafkaProducer.send(new ProducerRecord<>(channel, channel, s.getBytes())); // kafka setup needs to be checked
}
}
}
} }

View File

@ -0,0 +1 @@
vertx.version=4.5.12