diff --git a/spring-showcase-web/build.gradle b/spring-showcase-web/build.gradle index ae2640f..c5eda31 100644 --- a/spring-showcase-web/build.gradle +++ b/spring-showcase-web/build.gradle @@ -15,6 +15,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-jpa' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.postgresql:postgresql:42.2.18' + implementation 'org.springframework.kafka:spring-kafka:2.6.4' providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat' testImplementation 'org.springframework.boot:spring-boot-starter-test' } diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/ApplicationConfig.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/ApplicationConfig.java deleted file mode 100644 index a53c9b3..0000000 --- a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/ApplicationConfig.java +++ /dev/null @@ -1,8 +0,0 @@ -package ch.polgrabia.springshowcaseweb.configs; - -import org.springframework.context.annotation.Configuration; - -@Configuration -public class ApplicationConfig { - -} diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaAdminConfig.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaAdminConfig.java new file mode 100644 index 0000000..f3a424b --- /dev/null +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaAdminConfig.java @@ -0,0 +1,45 @@ +package ch.polgrabia.springshowcaseweb.configs; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.KafkaAdmin; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@EnableKafka +@PropertySource("classpath:application.properties") +public class KafkaAdminConfig { + + @Value("${springshowcase.consumer.bootstrap.address}") + private String bootstrapAddress; + + @Value("${springshowcase.consumer.bootstrap.topic1.name}") + private String topic1Name; + + @Value("${springshowcase.consumer.bootstrap.topic1.replicas}") + private Integer replicas; + + @Value("${springshowcase.consumer.bootstrap.topic1.partitions}") + private Integer partitions; + + + @Bean + public KafkaAdmin produceKafkaAdmin() { + Map config = new HashMap<>(); + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + return new KafkaAdmin(config); + } + + @Bean + public NewTopic produceTopic() { + return new NewTopic(topic1Name, partitions, (short) ((int) replicas)); + } + +} diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaConsumerConfig.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaConsumerConfig.java new file mode 100644 index 0000000..25c1877 --- /dev/null +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaConsumerConfig.java @@ -0,0 +1,44 @@ +package ch.polgrabia.springshowcaseweb.configs; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@EnableKafka +@PropertySource("classpath:application.properties") +public class KafkaConsumerConfig { + + @Value("${springshowcase.consumer.bootstrap.address}") + private String bootstrapAddress; + + @Value("${springshowcase.consumer.groupId}") + private String groupId; + + @Bean + public ConsumerFactory consumerFactory() { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaProducerConfig.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaProducerConfig.java new file mode 100644 index 0000000..241a710 --- /dev/null +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaProducerConfig.java @@ -0,0 +1,38 @@ +package ch.polgrabia.springshowcaseweb.configs; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@EnableKafka +@PropertySource("classpath:application.properties") +public class KafkaProducerConfig { + + @Value("${springshowcase.consumer.bootstrap.address}") + private String bootstrapAddress; + + @Bean + public ProducerFactory producerFactory() { + Map config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(config); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/controllers/KafkaController.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/controllers/KafkaController.java new file mode 100644 index 0000000..c1ab04a --- /dev/null +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/controllers/KafkaController.java @@ -0,0 +1,36 @@ +package ch.polgrabia.springshowcaseweb.controllers; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.PropertySource; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/messaging") +@PropertySource("classpath:application.properties") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + @Value("${springshowcase.consumer.bootstrap.topic1.name}") + private String topic1; + + public KafkaController( + @Autowired KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @RequestMapping(path = "/send/{message}") + public void sendMessage(@PathVariable("message") String message) { + kafkaTemplate.send(topic1, message); + } + + @KafkaListener(topics = "topic1", groupId = "group1") + public void handleMessage(String message) { + System.out.printf("Got message: %s", message); + } +} diff --git a/spring-showcase-web/src/main/resources/application.properties b/spring-showcase-web/src/main/resources/application.properties index 7ea764c..95b1f11 100644 --- a/spring-showcase-web/src/main/resources/application.properties +++ b/spring-showcase-web/src/main/resources/application.properties @@ -4,3 +4,9 @@ spring.datasource.username=test spring.datasource.password=test springshowcase.users.controller.page.size=10 +springshowcase.consumer.bootstrap.address=localhost:9092 + +springshowcase.consumer.bootstrap.topic1.name=topic1 +springshowcase.consumer.bootstrap.topic1.replicas=1 +springshowcase.consumer.bootstrap.topic1.partitions=1 +springshowcase.consumer.groupId=group1