DevioArchivesKafka
Introduction
DevioArchivesKafka est un module d'archive qui permet l'intégration de DevIO avec Apache Kafka, la plateforme de streaming distribuée. Ce module utilise un producteur Kafka Java via JNI (Java Native Interface) pour publier les données d'archivage DevIO vers des topics Kafka, avec support du format Apache Avro pour la sérialisation des données.
Le module convertit les données DevIO en messages Kafka structurés selon des schémas Avro configurables, permettant une intégration native avec l'écosystème Kafka et les outils de streaming temps réel.
Architecture et fonctionnement
Composants principaux
- DevioArchivesKafka : Module C++ principal avec interface JNI
- DevIOKafkaProducer : Producteur Kafka Java intégré
- KafkaManager : Gestionnaire de threads et de polling
- MachineGroup : Groupes de machines avec mapping Avro
- AvroSchema : Gestion des schémas Apache Avro
Intégration JNI
Le module utilise JNI (Java Native Interface) pour : - Instancier le producteur Kafka Java - Publier des messages vers les topics - Gérer les propriétés Kafka - Contrôler le cycle de vie des connexions
Processus d'archivage
- Collecte : Réception des données DevIO via l'API d'archivage
- Filtrage : Application des gates et conditions
- Mapping : Conversion selon les schémas Avro configurés
- Sérialisation : Encodage Avro (binaire, JSON, ou pretty)
- Publication : Envoi vers les topics Kafka
- Monitoring : Logs et métriques de performance
Configuration XML
Fichier DevioArchivesKafka.xml
La configuration s'effectue via un fichier XML DevioArchivesKafka.xml placé dans le répertoire DevIO.
Structure de base
<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
<KafkaProperties>
<Property name="bootstrap.servers" value="kafka1:9092,kafka2:9092"/>
<Property name="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<Property name="value.serializer" value="org.apache.kafka.common.serialization.ByteArraySerializer"/>
<Property name="acks" value="all"/>
<Property name="retries" value="3"/>
<Property name="batch.size" value="16384"/>
<Property name="linger.ms" value="5"/>
<Property name="buffer.memory" value="33554432"/>
</KafkaProperties>
<MachineGroup name="sensors" topic="devio-sensors" avroFormat="binary" useKey="true" avroSchema="sensor-schema.avsc">
<Gate DevIO="production_mode" polarity="true"/>
<Mapping DevIO="timestamp" Avro="0"/>
<Mapping DevIO="equipment_name" Avro="1"/>
<Mapping DevIO="variable_name" Avro="2"/>
<Mapping DevIO="value" Avro="3"/>
<Mapping DevIO="status" Avro="4"/>
<TopicProperties>
<Property name="compression.type" value="snappy"/>
</TopicProperties>
</MachineGroup>
</DevioArchivesKafka>
Éléments de configuration
Balise <KafkaProperties>
Configure les propriétés globales du producteur Kafka.
| Propriété | Description | Exemple |
|---|---|---|
bootstrap.servers |
Liste des brokers Kafka | "kafka1:9092,kafka2:9092" |
key.serializer |
Sérialiseur des clés | StringSerializer |
value.serializer |
Sérialiseur des valeurs | ByteArraySerializer |
acks |
Niveau d'acquittement | "all", "1", "0" |
retries |
Nombre de tentatives | 3 |
batch.size |
Taille des lots | 16384 |
linger.ms |
Délai de lot | 5 |
buffer.memory |
Mémoire tampon | 33554432 |
compression.type |
Type de compression | "snappy", "gzip", "lz4" |
Balise <MachineGroup>
Définit un groupe de machines avec leur configuration Kafka.
| Attribut | Description | Valeurs possibles |
|---|---|---|
name |
Nom du groupe | Chaîne unique |
topic |
Topic Kafka de destination | Nom du topic |
avroFormat |
Format de sérialisation Avro | "binary", "json", "pretty" |
useKey |
Utiliser une clé de message | "true", "false" |
avroSchema |
Fichier de schéma Avro | Chemin vers .avsc |
Balise <Gate>
Définit une condition d'activation pour le groupe.
| Attribut | Description | Exemple |
|---|---|---|
DevIO |
Variable DevIO à surveiller | "production_mode" |
polarity |
Polarité de la condition | "true", "false" |
Balise <Mapping>
Mappe les champs DevIO vers les champs Avro.
| Attribut | Description | Exemple |
|---|---|---|
DevIO |
Champ source DevIO | "timestamp", "equipment_name" |
Avro |
Index du champ Avro | 0, 1, 2, etc. |
Balise <TopicProperties>
Propriétés spécifiques au topic (hérite des propriétés globales).
<TopicProperties>
<Property name="compression.type" value="snappy"/>
<Property name="max.request.size" value="1048576"/>
</TopicProperties>
Schémas Apache Avro
Format des schémas
Les schémas Avro définissent la structure des messages Kafka.
Exemple de schéma simple
{
"type": "record",
"name": "DevIOSensorData",
"namespace": "com.technilog.devio",
"fields": [
{"name": "timestamp", "type": "long"},
{"name": "equipment", "type": "string"},
{"name": "variable", "type": "string"},
{"name": "value", "type": "double"},
{"name": "status", "type": "string"}
]
}
Exemple de schéma complexe
{
"type": "record",
"name": "DevIOComplexData",
"namespace": "com.technilog.devio",
"fields": [
{"name": "timestamp", "type": "long"},
{"name": "equipment", "type": {
"type": "record",
"name": "Equipment",
"fields": [
{"name": "name", "type": "string"},
{"name": "location", "type": "string"},
{"name": "type", "type": "string"}
]
}},
{"name": "measurements", "type": {
"type": "array",
"items": {
"type": "record",
"name": "Measurement",
"fields": [
{"name": "variable", "type": "string"},
{"name": "value", "type": "double"},
{"name": "unit", "type": "string"},
{"name": "quality", "type": "int"}
]
}
}}
]
}
Formats de sérialisation
Binary (binaire)
- Performance : Format le plus compact et rapide
- Usage : Production, haute performance
- Lisibilité : Non lisible directement
JSON
- Performance : Plus volumineux mais lisible
- Usage : Debug, intégration simple
- Lisibilité : Format JSON standard
Pretty
- Performance : Format JSON indenté
- Usage : Debug, développement
- Lisibilité : JSON formaté avec indentation
Configuration via DevIO Studio
1. Activation du module
- Ouvrir le projet avec DevIO Studio
- Aller dans Configuration → Archives
- Ajouter une nouvelle archive de type Kafka
- Configurer les paramètres de connexion
2. Paramètres de base
Nom de l'archive : Kafka_Archive
Type : DevioArchivesKafka
Brokers Kafka : kafka1:9092,kafka2:9092
Topic par défaut : devio-data
Format Avro : binary
3. Configuration avancée
Schéma Avro : sensor-data.avsc
Compression : snappy
Acquittements : all
Buffer mémoire : 32MB
Exemples de configuration
Configuration basique
<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
<KafkaProperties>
<Property name="bootstrap.servers" value="localhost:9092"/>
<Property name="acks" value="1"/>
<Property name="retries" value="1"/>
</KafkaProperties>
<MachineGroup name="basic" topic="devio-basic" avroFormat="json" useKey="false">
<Mapping DevIO="timestamp" Avro="0"/>
<Mapping DevIO="equipment_name" Avro="1"/>
<Mapping DevIO="value" Avro="2"/>
</MachineGroup>
</DevioArchivesKafka>
Configuration haute performance
<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
<KafkaProperties>
<Property name="bootstrap.servers" value="kafka1:9092,kafka2:9092,kafka3:9092"/>
<Property name="acks" value="all"/>
<Property name="retries" value="5"/>
<Property name="batch.size" value="65536"/>
<Property name="linger.ms" value="10"/>
<Property name="buffer.memory" value="67108864"/>
<Property name="compression.type" value="snappy"/>
<Property name="max.in.flight.requests.per.connection" value="1"/>
</KafkaProperties>
<MachineGroup name="highperf" topic="devio-highperf" avroFormat="binary" useKey="true" avroSchema="highperf-schema.avsc">
<Gate DevIO="high_frequency_mode" polarity="true"/>
<Mapping DevIO="timestamp" Avro="0"/>
<Mapping DevIO="equipment_name" Avro="1"/>
<Mapping DevIO="variable_name" Avro="2"/>
<Mapping DevIO="value" Avro="3"/>
<Mapping DevIO="quality" Avro="4"/>
<TopicProperties>
<Property name="compression.type" value="lz4"/>
<Property name="max.request.size" value="2097152"/>
</TopicProperties>
</MachineGroup>
</DevioArchivesKafka>
Configuration sécurisée (SSL/SASL)
<?xml version="1.0" encoding="utf-8"?>
<DevioArchivesKafka>
<KafkaProperties>
<Property name="bootstrap.servers" value="secure-kafka1:9093,secure-kafka2:9093"/>
<Property name="security.protocol" value="SASL_SSL"/>
<Property name="sasl.mechanism" value="PLAIN"/>
<Property name="sasl.jaas.config" value="org.apache.kafka.common.security.plain.PlainLoginModule required username='devio' password='password123';"/>
<Property name="ssl.truststore.location" value="/path/to/kafka.client.truststore.jks"/>
<Property name="ssl.truststore.password" value="truststore_password"/>
<Property name="acks" value="all"/>
<Property name="retries" value="3"/>
</KafkaProperties>
<MachineGroup name="secure" topic="devio-secure" avroFormat="binary" useKey="true" avroSchema="secure-schema.avsc">
<Mapping DevIO="timestamp" Avro="0"/>
<Mapping DevIO="equipment_name" Avro="1"/>
<Mapping DevIO="variable_name" Avro="2"/>
<Mapping DevIO="value" Avro="3"/>
<Mapping DevIO="status" Avro="4"/>
</MachineGroup>
</DevioArchivesKafka>
Intégration avec l'écosystème Kafka
Kafka Connect
Source Connector (lecture depuis Kafka)
{
"name": "devio-source-connector",
"config": {
"connector.class": "io.confluent.connect.avro.AvroConverter",
"tasks.max": "3",
"topics": "devio-sensors",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Sink Connector (écriture vers base de données)
{
"name": "devio-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "2",
"topics": "devio-sensors",
"connection.url": "jdbc:postgresql://localhost:5432/devio",
"auto.create": "true",
"auto.evolve": "true"
}
}
Kafka Streams
Traitement temps réel des données DevIO
StreamsBuilder builder = new StreamsBuilder();
KStream<String, DevIOSensorData> sensorStream = builder
.stream("devio-sensors", Consumed.with(Serdes.String(), avroSerde));
// Filtrage des valeurs anormales
KStream<String, DevIOSensorData> filteredStream = sensorStream
.filter((key, value) -> value.getValue() > 0 && value.getValue() < 1000);
// Agrégation par fenêtre temporelle
KTable<Windowed<String>, Double> averages = filteredStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(
() -> 0.0,
(key, value, aggregate) -> (aggregate + value.getValue()) / 2
);
averages.toStream().to("devio-averages");
Schema Registry
Gestion des schémas Avro
# Enregistrement d'un schéma
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"DevIOSensorData\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"}]}"}' \
http://schema-registry:8081/subjects/devio-sensors-value/versions
# Évolution de schéma
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"DevIOSensorData\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"location\",\"type\":\"string\",\"default\":\"\"}]}"}' \
http://schema-registry:8081/subjects/devio-sensors-value/versions
Monitoring et observabilité
Métriques Kafka
Le module expose les métriques Kafka standard :
- Throughput : Messages/seconde, bytes/seconde
- Latence : Temps de production, batch.size moyen
- Erreurs : Taux d'erreur, retries
- Buffer : Utilisation mémoire, records en attente
Logs d'archivage
Les logs sont générés avec les niveaux : - ERROR : Erreurs de connexion, sérialisation - WARN : Avertissements de performance - INFO : Opérations normales, succès de publication - DEBUG : Détails techniques, mapping Avro
Surveillance des topics
# Vérification des topics
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Consommation de test
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--topic devio-sensors \
--from-beginning \
--property schema.registry.url=http://localhost:8081
# Métriques des producteurs
kafka-run-class.sh kafka.tools.JmxTool \
--object-name kafka.producer:type=producer-metrics,client-id=devio-producer
Optimisation des performances
Configuration producteur
<!-- Optimisation pour débit élevé -->
<Property name="batch.size" value="131072"/> <!-- 128KB -->
<Property name="linger.ms" value="20"/> <!-- 20ms -->
<Property name="buffer.memory" value="134217728"/> <!-- 128MB -->
<Property name="compression.type" value="lz4"/>
<Property name="acks" value="1"/> <!-- Équilibre perf/fiabilité -->
Configuration réseau
<!-- Optimisation réseau -->
<Property name="send.buffer.bytes" value="262144"/> <!-- 256KB -->
<Property name="receive.buffer.bytes" value="262144"/> <!-- 256KB -->
<Property name="max.request.size" value="2097152"/> <!-- 2MB -->
<Property name="request.timeout.ms" value="30000"/> <!-- 30s -->
Partitioning
<!-- Partitioning par équipement -->
<Property name="partitioner.class" value="org.apache.kafka.clients.producer.internals.DefaultPartitioner"/>
Sécurité
SSL/TLS
<Property name="security.protocol" value="SSL"/>
<Property name="ssl.truststore.location" value="/path/to/truststore.jks"/>
<Property name="ssl.truststore.password" value="truststore_password"/>
<Property name="ssl.keystore.location" value="/path/to/keystore.jks"/>
<Property name="ssl.keystore.password" value="keystore_password"/>
<Property name="ssl.key.password" value="key_password"/>
SASL Authentication
<Property name="security.protocol" value="SASL_PLAINTEXT"/>
<Property name="sasl.mechanism" value="SCRAM-SHA-256"/>
<Property name="sasl.jaas.config" value="org.apache.kafka.common.security.scram.ScramLoginModule required username='devio' password='password123';"/>
FAQ et dépannage
Q : Erreur de connexion aux brokers Kafka ?
R : Vérifier bootstrap.servers, la connectivité réseau, et les ports (9092 par défaut).
Q : Messages perdus ?
R : Configurer acks=all et retries > 0 pour garantir la livraison.
Q : Performance dégradée ?
R : Ajuster batch.size, linger.ms, et buffer.memory selon la charge.
Q : Erreur de sérialisation Avro ?
R : Vérifier la compatibilité du schéma et les mappings DevIO → Avro.
Q : Problème JNI ?
R : Vérifier l'installation Java et les variables d'environnement JVM.
Annexes et références utiles
Documentation Apache Kafka
- Apache Kafka : Documentation officielle
- Kafka Producer API : Référence API producteur
- Apache Avro : Format de sérialisation
Outils de développement
- Kafka Tool : Interface graphique pour Kafka
- Confluent Control Center : Monitoring avancé
- Schema Registry UI : Gestion des schémas Avro
Écosystème Confluent
- Kafka Connect : Connecteurs pour intégration
- KSQL : SQL pour streams Kafka
- Kafka Streams : Traitement de flux