Exemple d’implémentation d’un consumer et d’un producer Kafka réactifs (ReactiveKafkaConsumerTemplate / ReactiveKafkaProducerTemplate) avec Spring Boot.
Date de publication : 24/03/2021
Reactive Kafka Consumer Template
ReactiveKafkaConsumerConfig.java
package com.example.reactivekafkaconsumerandproducer.config;import com.example.reactivekafkaconsumerandproducer.dto.FakeConsumerDTO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
/**
 * Spring configuration declaring the beans needed to consume Kafka records reactively.
 */
@Configuration
public class ReactiveKafkaConsumerConfig {

    /**
     * Creates receiver options from the Spring Boot Kafka consumer properties and
     * subscribes them to a single topic.
     *
     * @param topic           topic name, resolved from the {@code FAKE_CONSUMER_DTO_TOPIC} property
     * @param kafkaProperties Spring Boot Kafka properties used to build the consumer configuration
     * @return receiver options subscribed to {@code topic}
     */
    @Bean
    public ReceiverOptions<String, FakeConsumerDTO> kafkaReceiverOptions(
            @Value("${FAKE_CONSUMER_DTO_TOPIC}") String topic,
            KafkaProperties kafkaProperties) {
        return ReceiverOptions
                .<String, FakeConsumerDTO>create(kafkaProperties.buildConsumerProperties())
                .subscription(Collections.singletonList(topic));
    }

    /**
     * Wraps the receiver options in a {@link ReactiveKafkaConsumerTemplate}.
     *
     * @param kafkaReceiverOptions receiver options bean declared above
     * @return the reactive consumer template
     */
    @Bean
    public ReactiveKafkaConsumerTemplate<String, FakeConsumerDTO> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, FakeConsumerDTO> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
}
ReactiveConsumerService.java
package com.example.reactivekafkaconsumerandproducer.service;
import com.example.reactivekafkaconsumerandproducer.dto.FakeConsumerDTO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
/**
 * Service that starts consuming {@link FakeConsumerDTO} records when the application boots.
 */
@Service
public class ReactiveConsumerService implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(ReactiveConsumerService.class);

    private final ReactiveKafkaConsumerTemplate<String, FakeConsumerDTO> reactiveKafkaConsumerTemplate;

    public ReactiveConsumerService(ReactiveKafkaConsumerTemplate<String, FakeConsumerDTO> reactiveKafkaConsumerTemplate) {
        this.reactiveKafkaConsumerTemplate = reactiveKafkaConsumerTemplate;
    }

    /**
     * Builds the consumption pipeline: receives records with auto-acknowledgement, logs each
     * record, and maps it to its value.
     *
     * @return a flux of the consumed record values; nothing is consumed until it is subscribed
     */
    private Flux<FakeConsumerDTO> consumeFakeConsumerDTO() {
        return reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                // .delayElements(Duration.ofSeconds(2L)) // BACKPRESSURE
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .doOnNext(fakeConsumerDTO -> log.info("successfully consumed {}={}", FakeConsumerDTO.class.getSimpleName(), fakeConsumerDTO))
                // Pass the throwable as the trailing argument so SLF4J also logs the stack trace.
                .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage(), throwable));
    }

    /**
     * Triggers consumption at startup by subscribing to the pipeline.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) {
        // we have to trigger consumption
        consumeFakeConsumerDTO().subscribe();
    }
}
Reactive Kafka Producer Template
ReactiveKafkaProducerConfig.java
package com.example.reactivekafkaconsumerandproducer.config;
import com.example.reactivekafkaconsumerandproducer.dto.FakeProducerDTO;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.kafka.sender.SenderOptions;
import java.util.Map;
/**
 * Spring configuration declaring the reactive Kafka producer template bean.
 */
@Configuration
public class ReactiveKafkaProducerConfig {

    /**
     * Creates the producer template from the Spring Boot Kafka producer properties.
     *
     * @param properties Spring Boot Kafka properties used to build the producer configuration
     * @return the reactive producer template
     */
    @Bean
    public ReactiveKafkaProducerTemplate<String, FakeProducerDTO> reactiveKafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> producerConfig = properties.buildProducerProperties();
        SenderOptions<String, FakeProducerDTO> senderOptions = SenderOptions.create(producerConfig);
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }
}
ReactiveProducerService.java
package com.example.reactivekafkaconsumerandproducer.service;
import com.example.reactivekafkaconsumerandproducer.dto.FakeProducerDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Service;
/**
 * Service that publishes {@link FakeProducerDTO} messages to the configured topic.
 */
@Service
public class ReactiveProducerService {

    private static final Logger log = LoggerFactory.getLogger(ReactiveProducerService.class);

    private final ReactiveKafkaProducerTemplate<String, FakeProducerDTO> reactiveKafkaProducerTemplate;

    // Topic name resolved from the FAKE_PRODUCER_DTO_TOPIC property.
    @Value(value = "${FAKE_PRODUCER_DTO_TOPIC}")
    private String topic;

    public ReactiveProducerService(ReactiveKafkaProducerTemplate<String, FakeProducerDTO> reactiveKafkaProducerTemplate) {
        this.reactiveKafkaProducerTemplate = reactiveKafkaProducerTemplate;
    }

    /**
     * Sends the given DTO to the configured topic without blocking the caller.
     * The outcome (offset on success, exception on failure) is only logged.
     *
     * @param fakeProducerDTO payload to publish
     */
    public void send(FakeProducerDTO fakeProducerDTO) {
        log.info("send to topic={}, {}={},", topic, FakeProducerDTO.class.getSimpleName(), fakeProducerDTO);
        reactiveKafkaProducerTemplate.send(topic, fakeProducerDTO)
                .subscribe(
                        senderResult -> log.info("sent {} offset : {}", fakeProducerDTO, senderResult.recordMetadata().offset()),
                        // Explicit error callback: without it a failed send only reaches
                        // Reactor's default dropped-error hook.
                        throwable -> log.error("failed to send {} to topic={}", fakeProducerDTO, topic, throwable));
    }
}
application.properties
Cette configuration peut aussi être écrite en java.
#application.properties
# NOTE(review): Kafka brokers listen on 9092 by default; confirm that 9200 matches your broker.
spring.kafka.bootstrap-servers=localhost:9200
# producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# The producer needs a Serializer implementation: JsonSerializer, not JsonDeserializer.
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# consumer
spring.kafka.consumer.group-id=reactivekafkaconsumerandproducer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# json deserializer config
# NOTE(review): "*" trusts every package; consider restricting it to the DTO package.
spring.kafka.properties.spring.json.trusted.packages=*
# Ignore type headers and always deserialize values into the default type below.
spring.kafka.consumer.properties.spring.json.use.type.headers=false
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.reactivekafkaconsumerandproducer.dto.FakeConsumerDTO
# topic
FAKE_PRODUCER_DTO_TOPIC=fake_producer_dto_topic
FAKE_CONSUMER_DTO_TOPIC=fake_consumer_dto_topic
pom.xml
La dépendance importante à rajouter en plus de spring-kafka c’est reactor-kafka.
<?xml version="1.0" encoding="UTF-8"?>
<!--pom.xml-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>reactivekafkaconsumerandproducer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>reactivekafkaconsumerandproducer</name>
    <description>Reactive kafka consumer and producer example with tests</description>
    <properties>
        <java.version>11</java.version>
        <reactor.kafka.version>1.2.2.RELEASE</reactor.kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- reactor-kafka is required in addition to spring-kafka for the reactive templates -->
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
            <version>${reactor.kafka.version}</version>
        </dependency>
        <!--test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Avantages
Backpressure avec l’opérateur .delayElements()
sur le reactiveKafkaConsumerTemplate
On peut choisir la cadence à laquelle consommer chaque message. Il ne faudra pas oublier de positionner la configuration spring.kafka.consumer.max-poll-records=1
pour avoir l’effet escompté.
.delayElements(Duration.ofSeconds(2L))
Voir la version non réactive : gitbook.deddy.me/test-dintegration-avec-spring-boot-et-kafka
Sources : github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer