Sending (from rest data triggered) and receiving kafka messages.
parent
bfa751a51a
commit
ef96c091a6
|
@ -15,6 +15,7 @@ dependencies {
|
||||||
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
|
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
|
||||||
implementation 'org.springframework.boot:spring-boot-starter-web'
|
implementation 'org.springframework.boot:spring-boot-starter-web'
|
||||||
implementation 'org.postgresql:postgresql:42.2.18'
|
implementation 'org.postgresql:postgresql:42.2.18'
|
||||||
|
implementation 'org.springframework.kafka:spring-kafka:2.6.4'
|
||||||
providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'
|
providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'
|
||||||
testImplementation 'org.springframework.boot:spring-boot-starter-test'
|
testImplementation 'org.springframework.boot:spring-boot-starter-test'
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +0,0 @@
|
||||||
package ch.polgrabia.springshowcaseweb.configs;
|
|
||||||
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
public class ApplicationConfig {
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
package ch.polgrabia.springshowcaseweb.configs;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.PropertySource;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableKafka
|
||||||
|
@PropertySource("classpath:application.properties")
|
||||||
|
public class KafkaAdminConfig {
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.bootstrap.address}")
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.bootstrap.topic1.name}")
|
||||||
|
private String topic1Name;
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.bootstrap.topic1.replicas}")
|
||||||
|
private Integer replicas;
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.bootstrap.topic1.partitions}")
|
||||||
|
private Integer partitions;
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaAdmin produceKafkaAdmin() {
|
||||||
|
Map<String, Object> config = new HashMap<>();
|
||||||
|
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
return new KafkaAdmin(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic produceTopic() {
|
||||||
|
return new NewTopic(topic1Name, partitions, (short) ((int) replicas));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,44 @@
|
||||||
|
package ch.polgrabia.springshowcaseweb.configs;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.PropertySource;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableKafka
|
||||||
|
@PropertySource("classpath:application.properties")
|
||||||
|
public class KafkaConsumerConfig {
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.bootstrap.address}")
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.groupId}")
|
||||||
|
private String groupId;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerFactory<String, String> consumerFactory() {
|
||||||
|
Map<String, Object> config = new HashMap<>();
|
||||||
|
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||||
|
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
return new DefaultKafkaConsumerFactory<>(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
|
||||||
|
var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
|
||||||
|
factory.setConsumerFactory(consumerFactory());
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package ch.polgrabia.springshowcaseweb.configs;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.context.annotation.PropertySource;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@EnableKafka
|
||||||
|
@PropertySource("classpath:application.properties")
|
||||||
|
public class KafkaProducerConfig {
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.bootstrap.address}")
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, String> producerFactory() {
|
||||||
|
Map<String, Object> config = new HashMap<>();
|
||||||
|
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
return new DefaultKafkaProducerFactory<>(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(producerFactory());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
package ch.polgrabia.springshowcaseweb.controllers;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.PropertySource;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/messaging")
|
||||||
|
@PropertySource("classpath:application.properties")
|
||||||
|
public class KafkaController {
|
||||||
|
|
||||||
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||||
|
|
||||||
|
@Value("${springshowcase.consumer.bootstrap.topic1.name}")
|
||||||
|
private String topic1;
|
||||||
|
|
||||||
|
public KafkaController(
|
||||||
|
@Autowired KafkaTemplate<String, String> kafkaTemplate) {
|
||||||
|
this.kafkaTemplate = kafkaTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@RequestMapping(path = "/send/{message}")
|
||||||
|
public void sendMessage(@PathVariable("message") String message) {
|
||||||
|
kafkaTemplate.send(topic1, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaListener(topics = "topic1", groupId = "group1")
|
||||||
|
public void handleMessage(String message) {
|
||||||
|
System.out.printf("Got message: %s", message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,3 +4,9 @@ spring.datasource.username=test
|
||||||
spring.datasource.password=test
|
spring.datasource.password=test
|
||||||
|
|
||||||
springshowcase.users.controller.page.size=10
|
springshowcase.users.controller.page.size=10
|
||||||
|
springshowcase.consumer.bootstrap.address=localhost:9092
|
||||||
|
|
||||||
|
springshowcase.consumer.bootstrap.topic1.name=topic1
|
||||||
|
springshowcase.consumer.bootstrap.topic1.replicas=1
|
||||||
|
springshowcase.consumer.bootstrap.topic1.partitions=1
|
||||||
|
springshowcase.consumer.groupId=group1
|
||||||
|
|
Loading…
Reference in New Issue