Refactoring for reading config options.
parent
ffc7e1a4f2
commit
091fdaa8b8
|
@ -10,6 +10,7 @@ repositories {
|
||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
|
implementation("commons-cli:commons-cli:1.9.0")
|
||||||
implementation("io.vertx:vertx-core:4.5.9")
|
implementation("io.vertx:vertx-core:4.5.9")
|
||||||
implementation("ch.qos.logback:logback-classic:1.5.6")
|
implementation("ch.qos.logback:logback-classic:1.5.6")
|
||||||
implementation("org.apache.kafka:kafka-clients:3.9.0")
|
implementation("org.apache.kafka:kafka-clients:3.9.0")
|
||||||
|
|
|
@ -1,32 +1,69 @@
|
||||||
package ch.polgrabia.demos.server;
|
package ch.polgrabia.demos.server;
|
||||||
|
|
||||||
import io.vertx.core.Vertx;
|
import io.vertx.core.Vertx;
|
||||||
|
import org.apache.commons.cli.CommandLine;
|
||||||
|
import org.apache.commons.cli.CommandLineParser;
|
||||||
|
import org.apache.commons.cli.DefaultParser;
|
||||||
|
import org.apache.commons.cli.Options;
|
||||||
|
import org.apache.commons.cli.ParseException;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.Producer;
|
import org.apache.kafka.clients.producer.Producer;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
public class WebsocketServerApp {
|
public class WebsocketServerApp {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(WebsocketServerApp.class);
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
Properties props = new Properties();
|
|
||||||
props.put("bootstrap.servers", "localhost:9092");
|
|
||||||
props.put("linger.ms", 1);
|
|
||||||
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
|
||||||
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
|
|
||||||
|
|
||||||
Producer<String, byte[]> kafkaProducer = new KafkaProducer<>(props);
|
|
||||||
|
|
||||||
|
|
||||||
var vertx = Vertx.vertx();
|
var vertx = Vertx.vertx();
|
||||||
WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle(
|
Producer<String, byte[]> kafkaProducer = null;
|
||||||
8080,
|
try {
|
||||||
"localhost",
|
CommandLineParser parser = new DefaultParser();
|
||||||
null); // kafka producer
|
var options = new Options()
|
||||||
vertx.deployVerticle(websocketServerChatVerticle);
|
.addOption("h", "host", true, "Host")
|
||||||
websocketServerChatVerticle
|
.addOption("p", "port", true, "Port")
|
||||||
.waitUntilFinished()
|
.addOption("ke", "kafka-enabled", true, "Enable Kafka server")
|
||||||
.join();
|
.addOption("bs", "bootstrap-servers", true, "Bootstrap server address");
|
||||||
vertx.close();
|
CommandLine result = parser.parse(options, args);
|
||||||
kafkaProducer.close();
|
logger.info("Got options: {}", result);
|
||||||
|
boolean isKafkaEnabled = "true".equals(result.getOptionValue("kafka-enabled"));
|
||||||
|
String bootstrapServers = result.getOptionValue("bootstrap-servers");
|
||||||
|
logger.info("Got options kafka.enabled: {}", isKafkaEnabled);
|
||||||
|
if (isKafkaEnabled) {
|
||||||
|
logger.info("Bootstrap servers {}", bootstrapServers);
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.put("bootstrap.servers", bootstrapServers);
|
||||||
|
props.put("linger.ms", 1);
|
||||||
|
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||||
|
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
|
||||||
|
|
||||||
|
kafkaProducer = new KafkaProducer<>(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
var config = new WebsocketServerConfig(
|
||||||
|
result.getOptionValue("host"),
|
||||||
|
result.getOptionValue("port"),
|
||||||
|
isKafkaEnabled,
|
||||||
|
bootstrapServers,
|
||||||
|
kafkaProducer);
|
||||||
|
|
||||||
|
logger.info("Configuration {}", config);
|
||||||
|
|
||||||
|
WebsocketServerChatVerticle websocketServerChatVerticle = new WebsocketServerChatVerticle(config); // kafka producer
|
||||||
|
vertx.deployVerticle(websocketServerChatVerticle);
|
||||||
|
websocketServerChatVerticle
|
||||||
|
.waitUntilFinished()
|
||||||
|
.join();
|
||||||
|
} catch (ParseException e) {
|
||||||
|
logger.error("Invalid use of configuration", e);
|
||||||
|
} finally {
|
||||||
|
vertx.close();
|
||||||
|
if (kafkaProducer != null) {
|
||||||
|
kafkaProducer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -18,10 +18,11 @@ public class WebsocketServerChatVerticle extends AbstractVerticle {
|
||||||
private final Object runningMonitor = new Object();
|
private final Object runningMonitor = new Object();
|
||||||
private boolean shutdownActivation = false;
|
private boolean shutdownActivation = false;
|
||||||
|
|
||||||
public WebsocketServerChatVerticle(int port, String hostname, Producer<String, byte[]> kafkaProducer) {
|
public WebsocketServerChatVerticle(WebsocketServerConfig websocketServerConfig) {
|
||||||
this.port = port;
|
this.port = websocketServerConfig.port() == null ? 8080 : Integer.parseInt(websocketServerConfig.port());
|
||||||
this.hostname = hostname;
|
this.hostname = websocketServerConfig.host() == null ? "localhost" : websocketServerConfig.host();
|
||||||
this.kafkaProducer = kafkaProducer;
|
this.kafkaProducer = websocketServerConfig.kafkaProducer();
|
||||||
|
logger.info("Host: {}, port {}", hostname, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
package ch.polgrabia.demos.server;
|
||||||
|
|
||||||
|
public record WebsocketServerConfig(
|
||||||
|
String host,
|
||||||
|
String port,
|
||||||
|
boolean isKafkaEnabled,
|
||||||
|
String bootstrapServers,
|
||||||
|
org.apache.kafka.clients.producer.Producer<String,byte[]> kafkaProducer) {
|
||||||
|
}
|
Loading…
Reference in New Issue