diff --git a/spring-showcase-web/build.gradle b/spring-showcase-web/build.gradle index c5eda31..d594600 100644 --- a/spring-showcase-web/build.gradle +++ b/spring-showcase-web/build.gradle @@ -16,6 +16,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.postgresql:postgresql:42.2.18' implementation 'org.springframework.kafka:spring-kafka:2.6.4' + implementation 'org.apache.kafka:kafka-streams:2.6.0' providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat' testImplementation 'org.springframework.boot:spring-boot-starter-test' } diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaConsumerConfig.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaConsumerConfig.java index 25c1877..a1a8e21 100644 --- a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaConsumerConfig.java +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaConsumerConfig.java @@ -1,6 +1,7 @@ package ch.polgrabia.springshowcaseweb.configs; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -26,7 +27,7 @@ public class KafkaConsumerConfig { private String groupId; @Bean - public ConsumerFactory consumerFactory() { + public ConsumerFactory consumerStringFactory() { Map config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); @@ -36,9 +37,26 @@ public class KafkaConsumerConfig { } @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + public ConcurrentKafkaListenerContainerFactory kafkaListenerStringContainerFactory() { var factory = new ConcurrentKafkaListenerContainerFactory(); - factory.setConsumerFactory(consumerFactory()); + factory.setConsumerFactory(consumerStringFactory()); + return factory; + } + + @Bean + public ConsumerFactory consumerLongFactory() { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerLongContainerFactory() { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerLongFactory()); return factory; } } diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaStreamsConfig.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaStreamsConfig.java new file mode 100644 index 0000000..c146f5f --- /dev/null +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/configs/KafkaStreamsConfig.java @@ -0,0 +1,64 @@ +package ch.polgrabia.springshowcaseweb.configs; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; + +import java.util.Arrays; +import java.util.Properties; + +@Configuration +@PropertySource("classpath:application.properties") +public class KafkaStreamsConfig { + + private static final String APPLICATION_ID = "app_id"; + + @Value("${springshowcase.consumer.bootstrap.address}") + private String bootstrapAddress; + + @Bean + @Qualifier("streamsConfig") + public Properties kStreamsConfigs() { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + // props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir.toString()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + return props; + } + + @Bean + @Qualifier("streamsBuilder") + public StreamsBuilder streamsBuilder() { + StreamsBuilder sb = new StreamsBuilder(); + + KStream textLines = sb.stream("topic1"); + KTable wordCounts = textLines + .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+"))) + .groupBy((key, word) -> word) + .count(Materialized.as("counts-store")); + + wordCounts.toStream() + .to("counts-store-topic", Produced.with(Serdes.String(), Serdes.Long())); + return sb; + } + + @Bean + public KafkaStreams kafkaStreams( + @Autowired @Qualifier("streamsBuilder") StreamsBuilder sb, + @Autowired @Qualifier("streamsConfig") Properties props) { + return new KafkaStreams(sb.build(), props); + } +} diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/controllers/KafkaController.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/controllers/KafkaController.java index c1ab04a..abe57ad 100644 --- a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/controllers/KafkaController.java +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/controllers/KafkaController.java @@ -1,5 +1,6 @@ package ch.polgrabia.springshowcaseweb.controllers; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; @@ -29,8 +30,13 @@ public class KafkaController { kafkaTemplate.send(topic1, message); } - @KafkaListener(topics = "topic1", groupId = "group1") - public void handleMessage(String message) { - System.out.printf("Got message: %s", message); + @KafkaListener(topics = "topic1", groupId = "group1", containerFactory = "kafkaListenerStringContainerFactory") + public void handleMessage(ConsumerRecord message) { + System.out.printf("Got message: %s\n", message.value()); + } + + @KafkaListener(topics = "counts-store-topic", groupId = "group1", containerFactory = "kafkaListenerLongContainerFactory") + public void handleCountsStore(ConsumerRecord message) { + System.out.printf("Got counts store message: %s - with value %d\n", message.key(), message.value()); } } diff --git a/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/services/KafkaService.java b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/services/KafkaService.java new file mode 100644 index 0000000..0646fd7 --- /dev/null +++ b/spring-showcase-web/src/main/java/ch/polgrabia/springshowcaseweb/services/KafkaService.java @@ -0,0 +1,28 @@ +package ch.polgrabia.springshowcaseweb.services; + +import org.apache.kafka.streams.KafkaStreams; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +@Service +public class KafkaService { + private final KafkaStreams kafkaStreams; + + public KafkaService( + @Autowired KafkaStreams kafkaStreams) { + this.kafkaStreams = kafkaStreams; + } + + @PostConstruct + public void init() { + kafkaStreams.start(); + } + + @PreDestroy + public void stop() { + kafkaStreams.close(); + } +}