Further refactorings of the client, vertx libs version upgraded.

master
Tomasz Polgrabia 2025-01-24 01:17:38 +01:00
parent 74c1f2d94e
commit 093615aeab
5 changed files with 50 additions and 30 deletions

View File

@ -9,9 +9,11 @@ repositories {
mavenCentral() mavenCentral()
} }
val vertxVersion = project.ext.get("vertx.version")
dependencies { dependencies {
implementation("commons-cli:commons-cli:1.9.0") implementation("commons-cli:commons-cli:1.9.0")
implementation("io.vertx:vertx-core:4.5.9") implementation("io.vertx:vertx-core:$vertxVersion")
implementation("io.vertx:vertx-web:$vertxVersion")
implementation("ch.qos.logback:logback-classic:1.5.6") implementation("ch.qos.logback:logback-classic:1.5.6")
implementation("org.apache.kafka:kafka-clients:3.9.0") implementation("org.apache.kafka:kafka-clients:3.9.0")
testImplementation(platform("org.junit:junit-bom:5.10.0")) testImplementation(platform("org.junit:junit-bom:5.10.0"))

View File

@ -1,26 +1,35 @@
package ch.polgrabia.demos.client; package ch.polgrabia.demos.client;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AbstractVerticle; import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse; import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Router;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class WebApiParticle extends AbstractVerticle { public class WebApiParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebApiParticle.class); private static final Logger logger = LoggerFactory.getLogger(WebApiParticle.class);
private final int clientPort;
private HttpServer server; private HttpServer server;
public WebApiParticle(int clientPort) {
this.clientPort = clientPort;
}
@Override @Override
public void start() throws Exception { public void start() throws Exception {
HttpServerOptions httpServerOptions = new HttpServerOptions(); HttpServerOptions httpServerOptions = new HttpServerOptions();
var router = Router.router(getVertx());
this.server = getVertx().createHttpServer(httpServerOptions); this.server = getVertx().createHttpServer(httpServerOptions);
this.server.requestHandler(this::handleCommandRequests); router.get("/message").respond(routingContext -> handleCommandRequests(routingContext.request()));
this.server.listen(8123, "localhost", this::handleListenCommand); this.server.requestHandler(router);
this.server.listen(clientPort, "localhost", this::handleListenCommand);
} }
@Override @Override
@ -28,19 +37,18 @@ public class WebApiParticle extends AbstractVerticle {
server.close(); server.close();
} }
private void handleCommandRequests(HttpServerRequest httpServerRequest) { private Future<Void> handleCommandRequests(HttpServerRequest httpServerRequest) {
logger.info("Received command request on 8123"); logger.info("Received command request on {}", clientPort);
HttpServerResponse response = httpServerRequest.response(); HttpServerResponse response = httpServerRequest.response();
String qParam = httpServerRequest.getParam("q"); String qParam = httpServerRequest.getParam("q");
if (qParam == null || qParam.isBlank()) { if (qParam == null || qParam.isBlank()) {
response.setStatusCode(400); response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
response.end("bad request"); return response.end("%s\n".formatted(HttpResponseStatus.BAD_REQUEST.reasonPhrase()));
return;
} }
logger.info("Sending binary message: {}", qParam); logger.info("Sending binary message: {}", qParam);
getVertx().eventBus().publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam)); getVertx().eventBus().publish(Constants.CLIENT_WEBSOCKET_SENDER_EVENTBUS_KEY, Buffer.buffer(qParam));
response.setStatusCode(202); response.setStatusCode(HttpResponseStatus.ACCEPTED.code());
response.end("Ok"); return response.end("%s\n".formatted(HttpResponseStatus.ACCEPTED.reasonPhrase()));
} }
private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) { private void handleListenCommand(AsyncResult<HttpServer> httpServerAsyncResult) {

View File

@ -9,7 +9,11 @@ public class WebsocketClientApp {
public static void main(String[] args) { public static void main(String[] args) {
var vertx = Vertx.vertx(); var vertx = Vertx.vertx();
logger.info("Deploying websocket particle"); logger.info("Deploying websocket particle");
WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle("localhost", 8080, "/test"); WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle(
"localhost",
8080,
"/test",
8123);
vertx.deployVerticle(websocketClientParticle); vertx.deployVerticle(websocketClientParticle);
logger.info("Deployed websocket particle"); logger.info("Deployed websocket particle");
websocketClientParticle websocketClientParticle

View File

@ -16,26 +16,31 @@ import java.util.concurrent.CompletableFuture;
public class WebsocketClientParticle extends AbstractVerticle { public class WebsocketClientParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class); private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
private final String hostname; private final String webSocketHostname;
private final int port; private final int webSocketPort;
// path maps to channel // path maps to channel
private final String path; private final String webSocketPath;
private final String channel; private final String channel;
private final ClientPeriodicStdinScannerParticle clientPeriodicStdinScannerParticle = new ClientPeriodicStdinScannerParticle( private final ClientPeriodicStdinScannerParticle clientPeriodicStdinScannerParticle = new ClientPeriodicStdinScannerParticle(
System.in System.in
); );
private final WebApiParticle webApiParticle = new WebApiParticle(); private final WebApiParticle webApiParticle;
private HttpClient client; private HttpClient client;
private boolean isShuttingDownActivated = false; private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object(); private final Object runningMonitor = new Object();
private WebSocketSenderParticle webSocketSenderParticle; private WebSocketSenderParticle webSocketSenderParticle;
private WebSocketReceiverParticle webSocketReceiverParticle; private WebSocketReceiverParticle webSocketReceiverParticle;
public WebsocketClientParticle(String hostname, int port, String path) { public WebsocketClientParticle(
this.hostname = hostname; String webSocketHostname,
this.port = port; int webSocketPort,
this.path = path; String webSocketPath,
this.channel = TopicUtil.convertTopicName(path); int clientApiPort) {
this.webSocketHostname = webSocketHostname;
this.webSocketPort = webSocketPort;
this.webSocketPath = webSocketPath;
this.channel = TopicUtil.convertTopicName(webSocketPath);
this.webApiParticle = new WebApiParticle(clientApiPort);
} }
@Override @Override
@ -44,7 +49,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
.createHttpClient(); .createHttpClient();
logger.info("Starting websocket client command server"); logger.info("Starting websocket client command server");
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture()) new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
.webSocket(port, hostname, path, this::handleMessage); .webSocket(webSocketPort, webSocketHostname, webSocketPath, this::handleMessage);
getVertx().deployVerticle(clientPeriodicStdinScannerParticle); getVertx().deployVerticle(clientPeriodicStdinScannerParticle);
getVertx().deployVerticle(webApiParticle); getVertx().deployVerticle(webApiParticle);
} }

View File

@ -0,0 +1 @@
vertx.version=4.5.12