Aller au contenu

DevioArchivesKafka

Introduction

DevioArchivesKafka est un module d'archive qui permet l'intégration de DevIO avec Apache Kafka, la plateforme de streaming distribuée. Ce module utilise un producteur Kafka Java via JNI (Java Native Interface) pour publier les données d'archivage DevIO vers des topics Kafka, avec support du format Apache Avro pour la sérialisation des données.

Le module convertit les données DevIO en messages Kafka structurés selon des schémas Avro configurables, permettant une intégration native avec l'écosystème Kafka et les outils de streaming temps réel.

Architecture et fonctionnement

Composants principaux

  • DevioArchivesKafka : Module C++ principal avec interface JNI
  • DevIOKafkaProducer : Producteur Kafka Java intégré
  • KafkaManager : Gestionnaire de threads et de polling
  • MachineGroup : Groupes de machines avec mapping Avro
  • AvroSchema : Gestion des schémas Apache Avro

Intégration JNI

Le module utilise JNI (Java Native Interface) pour : - Instancier le producteur Kafka Java - Publier des messages vers les topics - Gérer les propriétés Kafka - Contrôler le cycle de vie des connexions

Processus d'archivage

  1. Collecte : Réception des données DevIO via l'API d'archivage
  2. Filtrage : Application des gates et conditions
  3. Mapping : Conversion selon les schémas Avro configurés
  4. Sérialisation : Encodage Avro (binaire, JSON, ou pretty)
  5. Publication : Envoi vers les topics Kafka
  6. Monitoring : Logs et métriques de performance

Configuration XML

Fichier DevioArchivesKafka.xml

La configuration s'effectue via un fichier XML DevioArchivesKafka.xml placé dans le répertoire DevIO.

Structure de base

<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
    <KafkaProperties>
        <Property name="bootstrap.servers" value="kafka1:9092,kafka2:9092"/>
        <Property name="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
        <Property name="value.serializer" value="org.apache.kafka.common.serialization.ByteArraySerializer"/>
        <Property name="acks" value="all"/>
        <Property name="retries" value="3"/>
        <Property name="batch.size" value="16384"/>
        <Property name="linger.ms" value="5"/>
        <Property name="buffer.memory" value="33554432"/>
    </KafkaProperties>

    <MachineGroup name="sensors" topic="devio-sensors" avroFormat="binary" useKey="true" avroSchema="sensor-schema.avsc">
        <Gate DevIO="production_mode" polarity="true"/>
        <Mapping DevIO="timestamp" Avro="0"/>
        <Mapping DevIO="equipment_name" Avro="1"/>
        <Mapping DevIO="variable_name" Avro="2"/>
        <Mapping DevIO="value" Avro="3"/>
        <Mapping DevIO="status" Avro="4"/>
        <TopicProperties>
            <Property name="compression.type" value="snappy"/>
        </TopicProperties>
    </MachineGroup>
</DevioArchivesKafka>

Éléments de configuration

Balise <KafkaProperties>

Configure les propriétés globales du producteur Kafka.

Propriété Description Exemple
bootstrap.servers Liste des brokers Kafka "kafka1:9092,kafka2:9092"
key.serializer Sérialiseur des clés StringSerializer
value.serializer Sérialiseur des valeurs ByteArraySerializer
acks Niveau d'acquittement "all", "1", "0"
retries Nombre de tentatives 3
batch.size Taille des lots 16384
linger.ms Délai de lot 5
buffer.memory Mémoire tampon 33554432
compression.type Type de compression "snappy", "gzip", "lz4"

Balise <MachineGroup>

Définit un groupe de machines avec leur configuration Kafka.

Attribut Description Valeurs possibles
name Nom du groupe Chaîne unique
topic Topic Kafka de destination Nom du topic
avroFormat Format de sérialisation Avro "binary", "json", "pretty"
useKey Utiliser une clé de message "true", "false"
avroSchema Fichier de schéma Avro Chemin vers .avsc

Balise <Gate>

Définit une condition d'activation pour le groupe.

Attribut Description Exemple
DevIO Variable DevIO à surveiller "production_mode"
polarity Polarité de la condition "true", "false"

Balise <Mapping>

Mappe les champs DevIO vers les champs Avro.

Attribut Description Exemple
DevIO Champ source DevIO "timestamp", "equipment_name"
Avro Index du champ Avro 0, 1, 2, etc.

Balise <TopicProperties>

Propriétés spécifiques au topic (hérite des propriétés globales).

<TopicProperties>
    <Property name="compression.type" value="snappy"/>
    <Property name="max.request.size" value="1048576"/>
</TopicProperties>

Schémas Apache Avro

Format des schémas

Les schémas Avro définissent la structure des messages Kafka.

Exemple de schéma simple

{
  "type": "record",
  "name": "DevIOSensorData",
  "namespace": "com.technilog.devio",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "equipment", "type": "string"},
    {"name": "variable", "type": "string"},
    {"name": "value", "type": "double"},
    {"name": "status", "type": "string"}
  ]
}

Exemple de schéma complexe

{
  "type": "record",
  "name": "DevIOComplexData",
  "namespace": "com.technilog.devio",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "equipment", "type": {
      "type": "record",
      "name": "Equipment",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "location", "type": "string"},
        {"name": "type", "type": "string"}
      ]
    }},
    {"name": "measurements", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "Measurement",
        "fields": [
          {"name": "variable", "type": "string"},
          {"name": "value", "type": "double"},
          {"name": "unit", "type": "string"},
          {"name": "quality", "type": "int"}
        ]
      }
    }}
  ]
}

Formats de sérialisation

Binary (binaire)

  • Performance : Format le plus compact et rapide
  • Usage : Production, haute performance
  • Lisibilité : Non lisible directement

JSON

  • Performance : Plus volumineux mais lisible
  • Usage : Debug, intégration simple
  • Lisibilité : Format JSON standard

Pretty

  • Performance : Format JSON indenté
  • Usage : Debug, développement
  • Lisibilité : JSON formaté avec indentation

Configuration via DevIO Studio

1. Activation du module

  1. Ouvrir le projet avec DevIO Studio
  2. Aller dans ConfigurationArchives
  3. Ajouter une nouvelle archive de type Kafka
  4. Configurer les paramètres de connexion

2. Paramètres de base

Nom de l'archive : Kafka_Archive
Type : DevioArchivesKafka
Brokers Kafka : kafka1:9092,kafka2:9092
Topic par défaut : devio-data
Format Avro : binary

3. Configuration avancée

Schéma Avro : sensor-data.avsc
Compression : snappy
Acquittements : all
Buffer mémoire : 32MB

Exemples de configuration

Configuration basique

<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
    <KafkaProperties>
        <Property name="bootstrap.servers" value="localhost:9092"/>
        <Property name="acks" value="1"/>
        <Property name="retries" value="1"/>
    </KafkaProperties>

    <MachineGroup name="basic" topic="devio-basic" avroFormat="json" useKey="false">
        <Mapping DevIO="timestamp" Avro="0"/>
        <Mapping DevIO="equipment_name" Avro="1"/>
        <Mapping DevIO="value" Avro="2"/>
    </MachineGroup>
</DevioArchivesKafka>

Configuration haute performance

<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
    <KafkaProperties>
        <Property name="bootstrap.servers" value="kafka1:9092,kafka2:9092,kafka3:9092"/>
        <Property name="acks" value="all"/>
        <Property name="retries" value="5"/>
        <Property name="batch.size" value="65536"/>
        <Property name="linger.ms" value="10"/>
        <Property name="buffer.memory" value="67108864"/>
        <Property name="compression.type" value="snappy"/>
        <Property name="max.in.flight.requests.per.connection" value="1"/>
    </KafkaProperties>

    <MachineGroup name="highperf" topic="devio-highperf" avroFormat="binary" useKey="true" avroSchema="highperf-schema.avsc">
        <Gate DevIO="high_frequency_mode" polarity="true"/>
        <Mapping DevIO="timestamp" Avro="0"/>
        <Mapping DevIO="equipment_name" Avro="1"/>
        <Mapping DevIO="variable_name" Avro="2"/>
        <Mapping DevIO="value" Avro="3"/>
        <Mapping DevIO="quality" Avro="4"/>
        <TopicProperties>
            <Property name="compression.type" value="lz4"/>
            <Property name="max.request.size" value="2097152"/>
        </TopicProperties>
    </MachineGroup>
</DevioArchivesKafka>

Configuration sécurisée (SSL/SASL)

<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
    <KafkaProperties>
        <Property name="bootstrap.servers" value="secure-kafka1:9093,secure-kafka2:9093"/>
        <Property name="security.protocol" value="SASL_SSL"/>
        <Property name="sasl.mechanism" value="PLAIN"/>
        <Property name="sasl.jaas.config" value="org.apache.kafka.common.security.plain.PlainLoginModule required username='devio' password='password123';"/>
        <Property name="ssl.truststore.location" value="/path/to/kafka.client.truststore.jks"/>
        <Property name="ssl.truststore.password" value="truststore_password"/>
        <Property name="acks" value="all"/>
        <Property name="retries" value="3"/>
    </KafkaProperties>

    <MachineGroup name="secure" topic="devio-secure" avroFormat="binary" useKey="true" avroSchema="secure-schema.avsc">
        <Mapping DevIO="timestamp" Avro="0"/>
        <Mapping DevIO="equipment_name" Avro="1"/>
        <Mapping DevIO="variable_name" Avro="2"/>
        <Mapping DevIO="value" Avro="3"/>
        <Mapping DevIO="status" Avro="4"/>
    </MachineGroup>
</DevioArchivesKafka>

Intégration avec l'écosystème Kafka

Kafka Connect

Source Connector (lecture depuis Kafka)

{
  "name": "devio-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.avro.AvroConverter",
    "tasks.max": "3",
    "topics": "devio-sensors",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Sink Connector (écriture vers base de données)

{
  "name": "devio-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "2",
    "topics": "devio-sensors",
    "connection.url": "jdbc:postgresql://localhost:5432/devio",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

Kafka Streams

Traitement temps réel des données DevIO

StreamsBuilder builder = new StreamsBuilder();

KStream<String, DevIOSensorData> sensorStream = builder
    .stream("devio-sensors", Consumed.with(Serdes.String(), avroSerde));

// Filtrage des valeurs anormales
KStream<String, DevIOSensorData> filteredStream = sensorStream
    .filter((key, value) -> value.getValue() > 0 && value.getValue() < 1000);

// Agrégation par fenêtre temporelle
KTable<Windowed<String>, Double> averages = filteredStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(
        () -> 0.0,
        (key, value, aggregate) -> (aggregate + value.getValue()) / 2
    );

averages.toStream().to("devio-averages");

Schema Registry

Gestion des schémas Avro

# Enregistrement d'un schéma
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"DevIOSensorData\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"}]}"}' \
  http://schema-registry:8081/subjects/devio-sensors-value/versions

# Évolution de schéma
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"DevIOSensorData\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"location\",\"type\":\"string\",\"default\":\"\"}]}"}' \
  http://schema-registry:8081/subjects/devio-sensors-value/versions

Monitoring et observabilité

Métriques Kafka

Le module expose les métriques Kafka standard :

  • Throughput : Messages/seconde, bytes/seconde
  • Latence : Temps de production, batch.size moyen
  • Erreurs : Taux d'erreur, retries
  • Buffer : Utilisation mémoire, records en attente

Logs d'archivage

Les logs sont générés avec les niveaux : - ERROR : Erreurs de connexion, sérialisation - WARN : Avertissements de performance - INFO : Opérations normales, succès de publication - DEBUG : Détails techniques, mapping Avro

Surveillance des topics

# Vérification des topics
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Consommation de test
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic devio-sensors \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081

# Métriques des producteurs
kafka-run-class.sh kafka.tools.JmxTool \
  --object-name kafka.producer:type=producer-metrics,client-id=devio-producer

Optimisation des performances

Configuration producteur

<!-- Optimisation pour débit élevé -->
<Property name="batch.size" value="131072"/>        <!-- 128KB -->
<Property name="linger.ms" value="20"/>             <!-- 20ms -->
<Property name="buffer.memory" value="134217728"/>  <!-- 128MB -->
<Property name="compression.type" value="lz4"/>
<Property name="acks" value="1"/>                   <!-- Équilibre perf/fiabilité -->

Configuration réseau

<!-- Optimisation réseau -->
<Property name="send.buffer.bytes" value="262144"/>      <!-- 256KB -->
<Property name="receive.buffer.bytes" value="262144"/>   <!-- 256KB -->
<Property name="max.request.size" value="2097152"/>      <!-- 2MB -->
<Property name="request.timeout.ms" value="30000"/>      <!-- 30s -->

Partitioning

<!-- Partitioning par équipement -->
<Property name="partitioner.class" value="org.apache.kafka.clients.producer.internals.DefaultPartitioner"/>

Sécurité

SSL/TLS

<Property name="security.protocol" value="SSL"/>
<Property name="ssl.truststore.location" value="/path/to/truststore.jks"/>
<Property name="ssl.truststore.password" value="truststore_password"/>
<Property name="ssl.keystore.location" value="/path/to/keystore.jks"/>
<Property name="ssl.keystore.password" value="keystore_password"/>
<Property name="ssl.key.password" value="key_password"/>

SASL Authentication

<Property name="security.protocol" value="SASL_PLAINTEXT"/>
<Property name="sasl.mechanism" value="SCRAM-SHA-256"/>
<Property name="sasl.jaas.config" value="org.apache.kafka.common.security.scram.ScramLoginModule required username='devio' password='password123';"/>

FAQ et dépannage

Q : Erreur de connexion aux brokers Kafka ?

R : Vérifier bootstrap.servers, la connectivité réseau, et les ports (9092 par défaut).

Q : Messages perdus ?

R : Configurer acks=all et retries > 0 pour garantir la livraison.

Q : Performance dégradée ?

R : Ajuster batch.size, linger.ms, et buffer.memory selon la charge.

Q : Erreur de sérialisation Avro ?

R : Vérifier la compatibilité du schéma et les mappings DevIO → Avro.

Q : Problème JNI ?

R : Vérifier l'installation Java et les variables d'environnement JVM.

Annexes et références utiles

Documentation Apache Kafka

  • Apache Kafka : Documentation officielle
  • Kafka Producer API : Référence API producteur
  • Apache Avro : Format de sérialisation

Outils de développement

  • Kafka Tool : Interface graphique pour Kafka
  • Confluent Control Center : Monitoring avancé
  • Schema Registry UI : Gestion des schémas Avro

Écosystème Confluent

  • Kafka Connect : Connecteurs pour intégration
  • KSQL : SQL pour streams Kafka
  • Kafka Streams : Traitement de flux

Documentation technique