Compare commits

..

No commits in common. "c2f1372652ad177eee3d2258937a9c37cc2a8fd3" and "baf413a063be84ac247fb7abde8fa919bed13f32" have entirely different histories.

3 changed files with 8 additions and 6 deletions

View File

@ -9,7 +9,7 @@ public class WebsocketClientApp {
public static void main(String[] args) { public static void main(String[] args) {
var vertx = Vertx.vertx(); var vertx = Vertx.vertx();
logger.info("Deploying websocket particle"); logger.info("Deploying websocket particle");
WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle("localhost", 8080, "/test"); WebsocketClientParticle websocketClientParticle = new WebsocketClientParticle("localhost", 8080, "/");
vertx.deployVerticle(websocketClientParticle); vertx.deployVerticle(websocketClientParticle);
logger.info("Deployed websocket particle"); logger.info("Deployed websocket particle");
websocketClientParticle websocketClientParticle

View File

@ -11,17 +11,18 @@ import io.vertx.core.impl.VertxInternal;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public class WebsocketClientParticle extends AbstractVerticle { public class WebsocketClientParticle extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class); private static final Logger logger = LoggerFactory.getLogger(WebsocketClientParticle.class);
private final String hostname; private final String hostname;
private final int port; private final int port;
// path maps to channel
private final String path; private final String path;
private HttpClient client; private HttpClient client;
private boolean isShuttingDownActivated = false; private boolean isShuttingDownActivated = false;
private final Object runningMonitor = new Object(); private final Object runningMonitor = new Object();
private ByteArrayOutputStream clientTextInput;
public WebsocketClientParticle(String hostname, int port, String path) { public WebsocketClientParticle(String hostname, int port, String path) {
this.hostname = hostname; this.hostname = hostname;
@ -30,7 +31,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
} }
@Override @Override
public void start() { public void start() throws Exception {
this.client = getVertx() this.client = getVertx()
.createHttpClient(); .createHttpClient();
new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture()) new WebSocketClientImpl((VertxInternal) getVertx(), new HttpClientOptions(), new CloseFuture())
@ -55,7 +56,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
)); ));
webSocket webSocket
.binaryMessageHandler(msg -> logger.info("[{}] Got message: {}", path, msg)) .binaryMessageHandler(msg -> logger.info("Got message: {}", msg))
.closeHandler(unused -> { .closeHandler(unused -> {
try { try {
stop(); stop();
@ -67,7 +68,7 @@ public class WebsocketClientParticle extends AbstractVerticle {
} }
@Override @Override
public void stop() { public void stop() throws Exception {
isShuttingDownActivated = true; isShuttingDownActivated = true;
client.close(); client.close();
synchronized (runningMonitor) { synchronized (runningMonitor) {

View File

@ -1,5 +1,6 @@
package ch.polgrabia.demos.server; package ch.polgrabia.demos.server;
import ch.polgrabia.demos.client.WebsocketClientApp;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket; import io.vertx.core.http.ServerWebSocket;
@ -26,7 +27,7 @@ public class WebsocketServerHandler implements Handler<ServerWebSocket> {
@Override @Override
public void handle(Buffer s) { public void handle(Buffer s) {
logger.info("[{}] Got message {}", serverWebSocket.path(), s); logger.info("Got message {}", s);
serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka serverWebSocket.writeBinaryMessage(Buffer.buffer("Pong: " + s.toString() + "\n")); // TBD send to kafka
} }
} }