Non Regression Test on Spring Boot microservices and Kafka

Présentation d’outils pour implémenter une solution de test de non régression dans une architecture microservices avec les technologies, Spring Boot, Kafka, PostgreSQL, MongoDB, et ElasticSearch.

Schéma de l’architecture
Schéma de l’architecture

Certains de ces microservices communiquent avec une base de données. Parfois, un microservice est responsable de publier un message sur un topic, un autre est responsable d’écouter et réagir face au message reçu sur un topic.

Les problématiques soulevées avant de pouvoir tester le bon fonctionnement d’une fonctionnalité au sein de cette architecture :

  • Comment tester l’existence d’un objet sur un topic Kafka ?
  • Comment tester l’existence d’un enregistrement dans une base de données PostgresSQL, MongoDB, Elastic Search ?

Les outils pour tester cette architecture

Postman Newman : Exécuter le scénario de recette

PostgREST : Interagir avec la base de données PostgreSQL via API REST

Kafka REST Proxy : Interagir avec un topic via API REST

Kafdrop : Interagir avec un topic via API REST et interface graphique

  • Fonctionnalités
    • Publier des messages sur un topic via API REST.
    • Récupérer des messages sur un topic via API REST.
    • Interface graphique pour consulter les messages présents sur un topic.
  • Lien

RestHeart MongoDB : Interagir avec une base de données via API REST

Fujifilm transfer Raw pictures from the Fujifilm Camera Remote App is tedious ! Do this instead.

Xiaomi Mi 9T RAW & JPEG import via USB C to USB C connection OTG Android

Xiaomi Mi 9T RAW & JPEG import via USB C to USB C connection OTG Android

How to import RAW photos from Fujifilm to your Android phone ?

Use the USB OTG option on Android !

Example with the Xiaomi Mi 9T and Fujifilm X-S10.

Connect a USB C to USB C between your phone and your Fujifilm Camera.

android raw & jpeg import options via usb otg
Android raw & jpeg import options via usb otg
Xiaomi Mi 9T RAW & JPEG import via USB C to USB C connection OTG Android
Xiaomi Mi 9T RAW & JPEG import via USB C to USB C connection OTG Android

Adobe Lightroom

If you have the Adobe Lightroom App installed on your phone, you can scan all JPEG, RAW files and Videos files in the memory card of your Camera. You can select which one to import directly into a Lightroom Album.

adobe lightroom android filter view import via usb otg 1 - Fujifilm transfer Raw pictures from the Fujifilm Camera Remote App is tedious ! Do this instead.
adobe lightroom android filter view import via usb otg
adobe lightroom android raw jpeg import camera via usb otg - Fujifilm transfer Raw pictures from the Fujifilm Camera Remote App is tedious ! Do this instead.
adobe lightroom android raw & jpeg import camera via usb otg

Google Photos

Google Photos can be used to navigate through your files, and import them.

google photo android jpeg import via usb otg - Fujifilm transfer Raw pictures from the Fujifilm Camera Remote App is tedious ! Do this instead.
google photo android jpeg import via usb otg
google photo android jpeg import via usb otg select - Fujifilm transfer Raw pictures from the Fujifilm Camera Remote App is tedious ! Do this instead.
google photo android jpeg import via usb otg select

Android File Manager

Lastly, with the Android file Manager the default file manager installed on your phone, example here with the file manager of the Xiaomi MI 9 T, import into a folder in the phone memory.

file manager android open preview jpeg import via usb otg
file manager android open preview jpeg import via usb otg
file manager android jpeg & raw import via usb otg
file manager android jpeg & raw import via usb otg
file manager android filter jpeg import via usb otg
file manager android filter jpeg import via usb otg

The advantage of directly connect the camera to the phone with a USB cable, it allows you to import the RAW files. Then you can edit RAW files directly into your Adobe Lightroom App.

Astuces de productivité du DEV (the 10x developper rules)

photo 1511376777868 611b54f68947 1 scaled - Astuces de productivité du DEV (the 10x developper rules)

10x developer rules 😅

Ce guide liste des astuces pour améliorer votre productivité en tant que développeur.

  1. Avoir une définition de ce qui est terminée akka Definition of Done et éventuellement un Definition of Ready (méthode agile). Cela t’obligera à être rigoureux. À partir de quel moment on peut considérer que ta tâche est terminée. Lorsque le client est satisfait ? Lorsque les critères d’acceptation sont validés. Lorsque le testeur à validé la recette ?
  2. Connaitre son rôle en tant que développeur Partons du principe que le rôle du développeur est d’apporter de la valeur à l’entreprise, c’est sa priorité numéro 1. Poses toi la question suivante dans chaque tâche que tu entames “Est-ce que je vais apporter de la valeur à mon entreprise en faisant cela ?” Si tu as un doute pose la question à ton collègue ou un supérieur.
  3. Résister au réfactoring quand ce n’est pas nécessaire On peut distinguer deux situations réfactoring :
    • Tu as besoin de refactorer du code pour faciliter l’implémentation d’une fonctionnalité.
    • Tu souhaites refactorer, car une partie du code n’est pas lisible (selon toi), ou n’est pas au goût du jour.
    C’est aussi une façon de te différencier en ne tombant pas dans le piège du refactoring. En effet beaucoup de développeurs auront pour réflexe de critiquer ce qui a été fait a telle endroit du code. Alors comment se différencier de ces développeurs de façon intelligente. Déjà avant de critiquer ouvertement ou même avec un collègue assures toi d’avoir déjà une solution, de comment améliorer la chose. Où sinon tournes ta phrase. En partant du principe que tu ne critiques pas, mais plutôt que tu demandes de l’aide à améliorer une partie du code.
  4. Tester son application
    • En la démarrant sur mon poste.
    • Je teste l’application une fois déployée sur l’environnement. (développement|recette) Cela me permet de :
      • Voir si je n’aurai pas oublié certains cas de test.
      • Vérifier mes logs, voir s’ils sont bien contextualisés. Même si mes tests s’exécutent correctement. Démarrer l’application sur mon poste
    • Je demande à une personne de tester mon application.
  5. Vérifier les logs une fois l’application déployée. Ok, j’ai pensé à mettre des logs dans mon implémentation, mais quand est-il vraiment lorsque j’exécute mon application ? Est-ce qu’ils sont suffisamment contextualisés ?
  6. Relire ma propre Merge|Pull Request À la recherche de faute d’orthographe. De TODO oublié dans le code source. S’il y a des parties du code dont je ne suis pas confiant, j’ajoute un commentaire pour attirer l’attention du relecteur et lui demander un avis.
  7. Éviter de faire du copier/coller de code source Ça peut paraître contre productif. Mais prendre le temps de réécrire un morceau de code trouvé sur Internet. Surtout si c’est du code qu’on ne connaît pas. D’une part permet de comprendre ligne après ligne ce que le code fait. Mais surtout ça me permet de le mémoriser.
  8. Utiliser les raccourcis clavier
  9. Quand je code, j’utilise régulièrement l’autocomplétion. C’est un moyen d’apprendre de nouvelles méthodes ou fonctionnalités. Également lire la documentation du code. Parfois, tu trouveras des informations pertinentes uniquement dans la documentation du code source et pas dans la documentation officielle du site web. Apprends à utiliser d’autres fonctionnalités comme “call hiérarchie”. https://jetbrains.developpez.com/tutoriel/top-dix-plugins-intellij-idea/
  10. Réfléchir à haute voix (think Loud) Le principe est simple lorsque tu es bloqué pendant un certain temps, exprimes ton problème à haute voix sans pour autant demander de l’aide à un collègue. Peut-être un collègue aura l’oreille attentive et t’apportera une solution.
  11. Prendre le temps de connaître son IDE, navigateur, système d’exploitation
  12. Utiliser des générateurs en ligne Je dois générer un mot de passe, pour un énième site ou je souhaite m’inscrire. Vient le moment de s’inscrire et de choisir un mot passe, inutile de perdre du temps en trouvant la bonne combinaison, je cherche un générateur de mot de passe sur Google. Il en existe même pour générer sa signature de mail.
  13. Ne pas taper plus de 2 touches sur le clavier sans regarder son écran. L’autocomplétion est partout, je ne peux pas me permettre d’écrire un mot complet en regardant uniquement le clavier, Alors que l’autocomplétion suggère peut-être le mot que je souhaite taper.
  14. Cultiver sa passion pour son métier. Où apprendre ce qui nous passionnes dans les tâches du quotidien, si je n’éprouve pas encore de passion. Si tu n’es pas passionné, tu ne comprendras pas pourquoi un collègue sur le même poste est plus productif que toi. Tu n’essayeras même pas d’observer sa façon de travail, pour copier de bonnes idées. Aussi, tu n’auras pas recours à ton imagination pour essayer de t’améliorer.
  15. Développer son expertise technique. Je peux approfondir mes connaissances sur l’une des technos que j’utilises.
    • Travaille sur un Side Project C’est le premier élément de cette liste. C’est la façon la plus amusante d’apprendre.
    • Faire de la veille ciblée Pourquoi ciblé ? Rappel toi, tu dois apporter de la valeur dans le milieu dans lequel tu évolues. Ta veille doit donc cibler les technos/domaines avec lesquels tu interagis lors de ton travail.
    • Acheter un livre sur une technologie Les livres techniques, peuvent être difficiles à terminer. L’objectif ne sera pas forcément de lire ce livre de A à Z. En revanches, tu t’attarderas sur le sommaire, repère les sujets que tu ne maîtrises pas. Et concentre ta lecture sur ces parties du livre. En étant conscient de tous les sujets abordés dans ton livre. Le jour où tu auras besoin d’une solution à un problème technique, tu sauras où chercher dans ce livre.
    • Suivre une formation N’attends pas que ton entreprise te propose une formation. Suis une formation gratuite ou payante. Les formations ont aussi des sommaires. Cible lorsque cela est possible les parties de la formation les plus pertinentes. Tu peux suivre la formation en entier. Mais tu en tireras plus si tu commences tout de suite à mettre en pratique ce que tu apprends dans un mini projet POC ou en prenant des notes sur une feuille blanche.
    • Écouter des podcasts
  16. Regarder les vidéos ou écouter les podcasts en vitesse accélérée 1.75x ou 2x.
  17. Je ne serai pas productif sans une bonne nuit de sommeil.
  18. Ne pas suivre à la lettre tout ce que tu lis.

Send multiple QueryParam with same name POST

The current behavior :

/users?hobbies=handball&hobbies=volley

A Solution :

<form action="/users" method="POST">
	<select class="form-control" name="hobbies[]">
    <option value="" selected disabled>Select Users</option>
    <option value="bread">Best bread available</option>
	</select>
</form>

Will generate a URL /users?hobbies[]=bread&name

In Laravel PHP, it will automatically create an array variable.

Héberger un site WordPress gratuitement, sans publicité

Hébergement Web Gratuit planethoster

Comment héberger un site WordPress gratuitement, sans publicité ? Avec la possibilité d’installer des extensions et le thème de son choix.

Avantages :

  • Certificat SSL gratuit
  • Nom de domaine personnalisé

Inconvénients :

  • Lenteur du site, dû aux faibles ressources allouées pour le site gratuit.

Tout cela est disponible dans l’offre World Lite de Planethoster gratuitement. Ce n’est pas l’offre qui est la plus mise en avant, mais en cherchant bien elle est bien présente sur le site.

Créer votre site WordPress sur PlanetHoster gratuitement

Aller sur le site avec ce lien https://www.planethoster.com/goph-deddy-saint-val

Comment utiliser WordPress. Créer un article.

https://openclassrooms.com/fr/courses/5489551-creez-un-site-moderne-et-professionnel-avec-wordpress-5

En 2021-2022 les outils no code et d’automatisation sont en vogues, Planethoster est l’un des seuls fournisseur d’hébergement qui propose des APIs pour gérer :

  • Domaine
  • FTP
  • Mail
  • Base de données
  • Gestion des comptes
  • Hébergement

How to run softinstigate/restheart on Heroku

We will create a custom Dockerfile and assign the Heroku $ PORT environment variable to the restheart HTTP_PORT environment variable.

Restheart : github.com/SoftInstigate/restheart

Running insctructions with Docker : hub.docker.com/r/softinstigate/restheart

The original Dockerfile :

FROM adoptopenjdk:16-jre-hotspotLABEL maintainer="SoftInstigate <info@softinstigate.com>"
WORKDIR /opt/restheart
COPY etc/restheart.yml etc/default.properties etc/acl.yml etc/users.yml etc/
COPY target/restheart.jar /opt/restheart/
COPY target/plugins/* /opt/restheart/plugins/
ENTRYPOINT [ "java", "-Dfile.encoding=UTF-8", "-server", "-jar", "restheart.jar", "etc/restheart.yml"]
CMD ["--envFile", "etc/default.properties"]
EXPOSE 8009 8080 4443

Dockerfile updated :

The important line here is :

ENTRYPOINT [“java”,"-DHTTP_PORT=$PORT", “-Dfile.encoding=UTF-8”, “-server”, “-jar”, “restheart.jar”, “etc/restheart.yml”].

# <https://github.com/SoftInstigate/restheart>
FROM softinstigate/restheart:6.1.2
ENTRYPOINT ["java","-DHTTP_PORT=$PORT", "-Dfile.encoding=UTF-8", "-server", "-jar", "restheart.jar", "etc/restheart.yml"]
CMD [ "--envFile", "etc/default.properties"]
COPY /restheart.yml /opt/restheart/etc/restheart.yml
COPY /default.properties /opt/restheart/etc/default.properties
COPY /acl.yml /opt/restheart/etc/acl.yml
COPY /users.yml /opt/restheart/etc/users.yml

Other environment variables like MONGO_URI can be configured in Dashboard> Settings> Config Vars.

Now you can follow the heroku tutorial to deploy with Docker : devcenter.heroku.com/categories/deploying-with-docker

Coding session 5 – un web crawler avec Puppeteer

Date de la session : 21/08/2021 15h00-18h00

Date de publication : 22/03/2021

Intro

Analyser comment crawler digicamdb.com.

Description

Un fichier JSON contenant la liste des noms des caméras est facilement récupérable depuis la console de développement.

D’ailleurs, on peut remarquer que le nombre d’éléments dans le JSON ne correspond pas au nombre de camera répertoriés sur le site. Le Fujifilm X-S10 est sorti en fin 2020 et est 4028e élément.

cs5 number-cameras-in-digicamdb-edited.gif

Au début, je pensais qu’avec cette liste, j’aurais pu facilement déterminer les URLs.

En partant du principe que le premier espace rencontré serait à remplacer par un _. Et les espaces suivant sont à remplacer par un -.

Example :

Fujifilm X-S10 ⇒ fujifilm_x-s10

Bien que cette règle fonctionne dans la plupart des cas, j’ai quand même trouvé certains cas où elle ne fonctionne pas.

cs5 inconsistence-url-edited.gif

Deuxième solution plus fiable, utiliser le formulaire qui sert de navigation en haut de la page.

Il suffira de copier chaque valeur du tableau dans le champ du formulaire, et de cliquer sur le bouton de recherche. Attendre le chargement de la page. Récupérer les informations. Recommencer avec la valeur suivante ceci plus de 4 000 fois.

cs5 form navigation-edited.gif

Objectifs

  • Crawler digicamdb.com
  • Récupérer la liste des URLs des pages
  • Définir une méthodologie pour naviguer sur les 4 000 + pages

Code

Le script crawler digicamdb sera créé en s’inspirant des scripts précédant.

Difficultés rencontrées

Rien à voir, mais j’ai décidé de faire des enregistrements d’écrans. Et les transformer en GIF pour les inclure dans les articles coding sessions. Ça rajoute une perte de temps dans la progression vers la finalisation de l’application.

Il faut enregistrer l’écran, éditer la vidéo retirer les moments ennuyeux. Parfois transformer en zoomant, croppant des parties de la vidéo. Après l’export convertir en GIF. Une étape supplémentaire en plus de rédiger l’article 😆.

Points positifs

Pas de point de blocage pour l’instant, c’est encourageant.

Nouvelles problématiques

Coder, écrire un article coding session, et dans un même temps capturer, éditer ma progression sous forme de vidéo/GIF.

Prochaines étapes

Exécuter le script crawler digicamdb.

Coding session 4 – un web crawler avec Puppeteer

Date de la session : 19/08/2021 18h30-00h00++

Date de publication : 22/09/2021

Intro

Exécution du deuxième script.

deuxième script crawler allphotolenses
deuxième script crawler allphotolenses

Description

Parfois, ça plante.

Mais grâce aux logs, il suffit de récupérer le nombre de l’itération qui a planté, et replacer la boucle for à l’endroit ou ça à planté.

script faillure-edited.gif
script failure

Lors de ma recherche de base de données, j’ai enfin trouvé le bon terme de recherche pour Google. “digital camera database”.

Je suis tombé sur ce site. teoalida.com/database/digitalcameras

C’est apparemment quelqu’un qui c’est spécialisé dans le scraping de site web. Il propose les données du site digicamdb.com. Contre un paiement de $75.32, marrant, c’est ma prochaine cible.

teoalida-digital-camera-database.PNG

Site très intéressant, je m’arrête là pour ce soir, il faut que visite plus en profondeur ce fameux site WordPress teoalida.com

Objectifs

  • Faire fonctionner le 2ième Crawler

Code

2ième script.

Récupère les informations sur l’objectif.

  • ‘Name’, ‘Focal length’, ‘Max. aperture’, ‘Min. aperture’, ‘Blades’, ‘Min. Focus (m.)’, ‘Filter Ø (mm.)’, ‘Weight (gr/oz)’, ‘Length (mm/in)’, pictures
import { chromium } from 'playwright';import * as fs from "fs";
const pages = require("./paths.json");
(async () => {
    const browser = await chromium.launch();
    const page = await browser.newPage();
    // START
    let results: any[][] = [
        [
            'Name',
            'Focal length',
            'Max. aperture',
            'Min. aperture',
            'Blades',
            'Min. Focus (m.)',
            'Filter Ø (mm.)',
            'Weight (gr/oz)',
            'Length (mm/in)'
        ]
    ];
    let fileNumber = 0;
    console.time('time_elapsed')
    for (let i = 526; i < pages.length; i++) {
        await page.goto(pages[i], { waitUntil: 'networkidle', timeout:40000});
        console.log("iteration ", i, pages[i])
        let infos: any[] = await page.$eval('strong', elementStrong => {
            let values: string[] = [];
            //let parentElement = elementStrong.find(el => el.textContent === 'Specifications:')
            // let elementTh = elementStrong.filter(el => el.textContent === 'Specifications:')[0].parentElement.querySelectorAll('th');
            // headers = Array.from(elementTh).map(th => th.textContent)
            let elementTd = elementStrong.filter(el => el.textContent === 'Specifications:')[0].parentElement.querySelectorAll('td');
            values = Array.from(elementTd).map(r => r.textContent)
            return values;
        })
        let name: string = await page.$eval('h1', h1 => {
            return h1.textContent;
        })
        let pictures: string[] = await page.$eval('strong', elementStrong => {
            let values: string[] = [];
            let elementTd = elementStrong.filter(el => el.textContent === 'Pictures')[0].parentElement.querySelectorAll('a');
            values = Array.from(elementTd).map(r => '<http://allphotolenses.com>' + r.getAttribute('href'))
            return values;
        })
        let pics = {
            pictures: pictures
        };
        infos.push(pics);
        // rotate file
        let r = await page.evaluate(({ results, name, infos, fileNumber }) => {
            if (results.length == 100) {
                fileNumber = fileNumber + 1;
                results = [];
            }
            results.push([name].concat(infos));
            return { fileNumber, results }
        }, { results, name, infos, fileNumber });
        results = r.results;
        fileNumber = r.fileNumber
        console.log("write to file ", r.fileNumber, results.length)
        fs.writeFileSync(`./results/lenses-${r.fileNumber}.json`, JSON.stringify(r.results));
    };
    console.log("results length", results.length);
    console.log("results :", results);
    // END
    console.timeEnd('time_elapsed');
    await browser.close();
})();

Difficultés rencontrées

  • Trouver la bonne technique pour traverser le DOM
  • Le script plante parfois pour accéder à une page. Lié à une erreur de chargement de la page. Cela induit une opération manuelle pour relancer le crawl. 😑

Points positifs

  • 2ième script exécution ok partielle.

Nouvelles problématiques

Fixer les erreurs de chargement de la page lors de l’exécution du script.

Prochaines étapes

Crawler digicamdb.com.

Coding session 3 – un web crawler avec Puppeteer

Date de la session : 19/08/2021 19h30-02h00++

Date de publication : 22/09/2021

Intro

Idée d’app. Pouvoir calculer le poids de son équipement appareil photo et voir quel stabilisateur serait approprié en fonction du poids supporté. (En écrivant ces lignes me vient une autre idée. Cette app pourrais aussi calculer le poids des équipements style trépieds, lumière, micro, batterie externe, disque dur, mais bon là ça devient fastidieux. Faut d’abord se concentrer sur une feature. Et cette feature doit être bien faite donc je disais…) Pour cette appli il me faut des données. À savoir les appareils photo noms et poids. Et les objectifs noms et poids.

Aucune base de données libre d’accès, n’est disponible ni d’API. En revanche, il existe des sites qui recenses des appareils photos et des objectifs.

Changeons de perspective internet est une base de données et les pages sont des tables.

Je veux récupérer des informations dans ces tables. Je vais devoir les crawler 😎.

Voici quelques sites qui semblent contenir les informations que je recherche.

Description

Pour récupérer les données de dxomark il suffit d’ouvrir la console. Et regarder les appels HTTP.

retrieve data from dxomark
Retrieve data from dxomark

Après analyse, il semblerait que les données de dxomark ne correspondent pas à ce que je recherche. Le poids de l’appareil photo n’est pas présent.

allphotolenses.com/lenses contient des informations sur les objectifs. Le nom et le poids de l’objectif sont présents. Même quelques images de l’objectif parfois 👌🏾.

En revanche, pour récupérer ces données ce n’est pas aussi simple que sur dxomark.

Les données ne sont pas requêtées depuis le client. Les pages sont soit statiques côté serveur ou généré côté serveur.

Du coup, il va falloir récupérer les informations directement dans la page HTML.

La première étape va consister à récupérer toutes les URLs des pages qui m’intéressent.

À savoir 3800+ URLs.

Seulement après avoir écrit et exécuté le 1er script je rendit compte que les URLs aurait pu facilement être deviné.

["Canon/Canon-EF-35mm-F14L-II-USM",
"Canon/Canon-EF-100mm-F2-USM",
"Fujifilm/Fujifilm-FUJINON-XF-200mm-F2-R-LM-OIS-WR"
]

J’aurais pu générer ces URLs avec une boucle sur le nom des objectifs contenu dans le JSON récupéré précédemment. Passer plus de temps sur l’analyse m’aurait évité du temps perdu a développer ce premier script. Mais ce n’est pas plus mal. J’ai pu monter en compétences sur l’utilisation de Puppetteer et de valider mon choix de d’outil.

Objectifs

  • Base de données cameras.
  • Base de données objectifs.
  • Base de données sur Google Sheets
  • API de recherche GET cameras/ & GET lenses/ +  Swagger
  • Crawler Nodejs

Code

Premier script permet de récupérer toutes les pages.

import { chromium } from 'playwright';
import * as fs from "fs";
(async () => {
    const browser = await chromium.launch();
    const page = await browser.newPage();
    // START
    const results = [];
    const pageLenesSize = Array.from({ length: 239 }, (_, index) => index + 1);;
    for (let i = 0; i < pageLenesSize.length; i++) {
        await page.goto('<https://allphotolenses.com/lenses/>' + `p_${i + 1}.html`,
            { waitUntil: 'networkidle' });
        console.log("iteration ", i)
        const trLight = await page.$eval('.light_tr', e => {
            const as = e.map(r => r.querySelector('a').href);
            return as;
        })
        const trDark = await page.$eval('.dark_tr', e => {
            const as = e.map(r => r.querySelector('a').href);
            return as;
        })
        console.log('found', trLight.length, trDark.length)
        console.log(trLight, trDark);
        results.push(...trLight);
        results.push(...trDark);
    };
    console.log("results length", results.length);
    fs.writeFileSync('./paths.json', JSON.stringify(results));
    // END
    await browser.close();
})();

Difficultés rencontrées

Quel Framework choisir ?

Le choix ira pour Apfy Playwright.

Points positifs

JSON to Google Sheets api via Pipedream.

Réalisation d’un premier crawler avec Puppetteer.

Un premier script permet de récupérer les pages à crawler.

Le deuxième script va récupérer les informations sur la page.

Import du JSON généré dans notre API JSON To Google Sheets.

Nouvelles problématiques

Est-ce légal de réutiliser ces données accessibles publiquement dans une autre application ?

Est-ce légal d’exposer ces données via une API REST publique ?

Reactive Kafka consumer/producer Spring Boot

Exemple implémentation reactive Kafka consumer et producer template avec spring-boot.


Date de publication : 24/03/2021

Reactive Kafka Consumer Template

ReactiveKafkaConsumerConfig.java

package com.example.reactivekafkaconsumerandproducer.config;import com.example.reactivekafkaconsumerandproducer.dto.FakeConsumerDTO;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
@Configuration
public class ReactiveKafkaConsumerConfig {
    @Bean
    public ReceiverOptions<String, FakeConsumerDTO> kafkaReceiverOptions(@Value(value = "${FAKE_CONSUMER_DTO_TOPIC}") String topic, KafkaProperties kafkaProperties) {
        ReceiverOptions<String, FakeConsumerDTO> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
        return basicReceiverOptions.subscription(Collections.singletonList(topic));
    }
    @Bean
    public ReactiveKafkaConsumerTemplate<String, FakeConsumerDTO> reactiveKafkaConsumerTemplate(ReceiverOptions<String, FakeConsumerDTO> kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<String, FakeConsumerDTO>(kafkaReceiverOptions);
    }
}

ReactiveConsumerService.java

package com.example.reactivekafkaconsumerandproducer.service;
import com.example.reactivekafkaconsumerandproducer.dto.FakeConsumerDTO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
@Service
public class ReactiveConsumerService implements CommandLineRunner {
    Logger log = LoggerFactory.getLogger(ReactiveConsumerService.class);
    private final ReactiveKafkaConsumerTemplate<String, FakeConsumerDTO> reactiveKafkaConsumerTemplate;
    public ReactiveConsumerService(ReactiveKafkaConsumerTemplate<String, FakeConsumerDTO> reactiveKafkaConsumerTemplate) {
        this.reactiveKafkaConsumerTemplate = reactiveKafkaConsumerTemplate;
    }
    private Flux<FakeConsumerDTO> consumeFakeConsumerDTO() {
        return reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                // .delayElements(Duration.ofSeconds(2L)) // BACKPRESSURE
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                .doOnNext(fakeConsumerDTO -> log.info("successfully consumed {}={}", FakeConsumerDTO.class.getSimpleName(), fakeConsumerDTO))
                .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
    }
    @Override
    public void run(String... args) {
        // we have to trigger consumption
        consumeFakeConsumerDTO().subscribe();
    }
}

Reactive Kafka Producer Template

ReactiveKafkaProducerConfig.java

package com.example.reactivekafkaconsumerandproducer.config;
import com.example.reactivekafkaconsumerandproducer.dto.FakeProducerDTO;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.kafka.sender.SenderOptions;
import java.util.Map;
@Configuration
public class ReactiveKafkaProducerConfig {
    @Bean
    public ReactiveKafkaProducerTemplate<String, FakeProducerDTO> reactiveKafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        return new ReactiveKafkaProducerTemplate<String, FakeProducerDTO>(SenderOptions.create(props));
    }
}

ReactiveProducerService.java

package com.example.reactivekafkaconsumerandproducer.service;
import com.example.reactivekafkaconsumerandproducer.dto.FakeProducerDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Service;
@Service
public class ReactiveProducerService {
    private final Logger log = LoggerFactory.getLogger(ReactiveProducerService.class);
    private final ReactiveKafkaProducerTemplate<String, FakeProducerDTO> reactiveKafkaProducerTemplate;
    @Value(value = "${FAKE_PRODUCER_DTO_TOPIC}")
    private String topic;
    public ReactiveProducerService(ReactiveKafkaProducerTemplate<String, FakeProducerDTO> reactiveKafkaProducerTemplate) {
        this.reactiveKafkaProducerTemplate = reactiveKafkaProducerTemplate;
    }
    public void send(FakeProducerDTO fakeProducerDTO) {
        log.info("send to topic={}, {}={},", topic, FakeProducerDTO.class.getSimpleName(), fakeProducerDTO);
        reactiveKafkaProducerTemplate.send(topic, fakeProducerDTO)
                .doOnSuccess(senderResult -> log.info("sent {} offset : {}", fakeProducerDTO, senderResult.recordMetadata().offset()))
                .subscribe();
    }
}

application.properties

Cette configuration peut aussi être écrite en java.

#application.properties
spring.kafka.bootstrap-servers=localhost:9200
# producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonDeserializer
# consumer
spring.kafka.consumer.group-id=reactivekafkaconsumerandproducer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# json deserializer config
spring.kafka.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.use.type.headers=false
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.reactivekafkaconsumerandproducer.dto.FakeConsumerDTO
# topic
FAKE_PRODUCER_DTO_TOPIC=fake_producer_dto_topic
FAKE_CONSUMER_DTO_TOPIC=fake_consumer_dto_topic

pom.xml

La dépendance importante à rajouter en plus de spring-kafka c’est reactor-kafka.

<!--pom.xml-->
				<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="<http://maven.apache.org/POM/4.0.0>" xmlns:xsi="<http://www.w3.org/2001/XMLSchema-instance>"
         xsi:schemaLocation="<http://maven.apache.org/POM/4.0.0> <https://maven.apache.org/xsd/maven-4.0.0.xsd>">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>reactivekafkaconsumerandproducer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>reactivekafkaconsumerandproducer</name>
    <description>Reactive kafka consumer and producer example with tests</description>
    <properties>
        <java.version>11</java.version>
        <reactor.kafka.version>1.2.2.RELEASE</reactor.kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
            <version>${reactor.kafka.version}</version>
        </dependency>
        <!--test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Avantages

Backpressure avec l’opérateur .delaysElements() sur le reactiveKafkaConsumerTemplate

On peut choisir la cadence à laquelle consommer chaque message. Il ne faudra pas oublier de positionner la configuration spring.kafka.consumer.max.poll.records=1 pour avoir l’effet escompté.

.delayElements(Duration.ofSeconds(2L))

Voir la version non réactive : gitbook.deddy.me/test-dintegration-avec-spring-boot-et-kafka

Sources : github.com/Kevded/example-reactive-spring-kafka-consumer-and-producer