Kafka@Docker - eine erste Verwendung mit Spring Cloud Streaming
In Zeiten der Microservice-Architekturen wird es immer schwieriger den Überblick über die Informationsflüsse zu behalten.
Online Verarbeitungen können über REST Schnittstellen zu Kopplungen von Systemen verschiedener Verantwortlicher Teams führen, was zu fragilen Prozessen führt.
lose Kopplung, Orchestrierung, Event-getriebene Prozesse
Diese Schlagwörter können das beschriebene Problem lösen und ganz neue Möglichkeiten eröffnen.
lose Kopplung
von Services kann bspw. mit einer Service Discovery wie Netflix Eureka erreicht werden.
Orchestrierung
von Services mit Mesos und DC/OS, oder Kubernetes und OpenShift, sowie DockerSwarm sind bekannte und vielfach verwendete Plattformen.
Event-getriebene Prozesse
mit Messaging Systemen wie RabbitMQ, ActiveMQ, SonicMQ oder Apache Kafka.
Vor lauter Möglichkeiten verliert man dabei schnell das eigentliche Ziel aus den Augen und verliert sich womöglich in architektonischen Schönheiten und Dilemma Fragestellungen.
In diesem Beitrag soll es um eine konkrete Problemstellung gehen, die nachfolgend in einer Gherkin Notation beschrieben ist. Dabei dienen mehrere Szenarien zur Darstellung der Anforderung
Die Anforderung
Wenn der Webaufruf http://localhost:8080/source/forward?message=HelloWorld
gesendet wird
Dann empfängt ein Prozess diese Nachricht
Und leitet sie an einen Prozessor weiter
Und dieser gibt die Nachricht ausGegeben sei der Zustand des Prozessors soll keinen Einfluss auf das Empfangen von Nachrichten haben
Wenn der Webaufruf http://localhost:8080/source/forward?message=HelloWorld
gesendet wird
Dann empfängt ein Prozess diese Nachricht
Und leitet sie an einen Prozessor weiter
Und dieser gibt nichts aus, da er abgeschaltet istDie Architektur
Aus der ersten Anforderung leitet sich ein kleiner REST Service mit einer Request Mathode ab.
Die zweite Anforderung relativiert dieses direkt und splittet den Prozess implizit in zwei entkoppelte Stufen.
Wir müssen also in
Empfangen von Nachrichten und
Verarbeiten von Nachrichten unterscheiden.
Um diese Unterscheidung deutlicher darzustellen, separieren wir direkt in zwei Services.
Damit nun die Datenquelle mit dem Prozessor Informationen austauschen kann, bedarf es einem Zwischenspeicher.
Gemäß der Anforderung würde eine einfache Datenbank denkbar sein, aber auch Queue Systeme sind im Lösungsraum enthalten und deutlich passender als eine Datenbank. Dieser Bericht dreht sich um Kafka, also entscheiden wir uns auch für Kafka als Messaging Service. Art und Frequenz der Datenüberaging sind dabei vorerst irrelevante Parameter.
Was ist Kafka?
Kafka ist ein hoch performanter Message Streaming Dienst von Apache, welcher neben einfachem Messaging ebenfalls komplexes Streaming ermöglicht.
Da man mit der Kafka Architektur und Konfiguration ganze Bücher füllen kann, verweise ich für den Beginn auf
Kafka in a nutshell .
Konkrete Implementierungsdetails können der am Ende angegebenen Code Referenz entnommen werden.
Source, Process, Sink
Kafka bildet Nachrichten / Events in Prozessketten ab.
- Source ist der Daten-erzeugende Prozessschritt. Also der Schritt, der das Event in Kafka auslöst.
- Process(es) kann es beliebig viele geben. Sie reagieren auf Events im Stream und verändern / erweitern diese.
- Sink nimmt final das Event an und sichert den Status. Dieses kann ein Persistieren, als auch gleichzeitig die Source einer neuen Prozesskette sein.
Let's code...
startup kafka
Als Messaging-Dienst sollte Kafka (benötigt immer Zookeeper) immer mind. als 3er Gespann aufgebaut werden. So können über die Instanzen Repliaktionen ausfallsicher verteilt werden.
Für unseren Test verzichten wir auf Ausfallsicherheit sowie Sicherheitsmaßnahmen wie Authentifizierung und Verschlüsselung.
In der
Code Referenz begeben wir uns in das Verzeichnis
kafka
.
#zuerst modifiziere die docker-compose.yml und für deine HOST IP als KAFKA_ADVERTISED_HOST_NAME ein
docker-compose up -d
Durch den Befehl
docker ps -a
sehen wir nun zwei gestartete Container (kafka und seinen Zookeeper)
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
726f03575ff3 sredelin/kafka:scala-2.12-kafka-1.0.0 "start-kafka.sh" 9 hours ago Up 9 hours 0.0.0.0:9092->9092/tcp kafka_kafka_1
d4e0554c04ce sredelin/zookeeper:3.4.11 "/usr/bin/start.sh" 9 hours ago Up 9 hours 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp kafka_zookeeper_1
Mittels
docker logs -f kafka_kafka_1
sollte in den letzten 10-15 Zeilen ersichtlich sein, dass die beiden Testtopics angelegt wurden.
Die Datenquelle (Source)
Eine Datenquelle erzeugt Daten. In unserem Fall ist dieses die SpringBoot Applikation im Verzeichnis
SpringDataFlowSource
.
Das Gradle Projekt kann bspw. für Eclipse mit dem vorhandenen Wrapper in Unix einfach mit
./gradlew eclipse
aufgebaut werden.
Gestartet wird die Application mit
./gradlew bootRun
und ist danach über
http://localhost:8080/source/forward?message=HelloWorld erreichbar.
Durch den Aufruf des genannten Pfades werden im
de.redelin.Controller
Nachrichten für zwei Prozessketten 10-fach dupliziert erzeugt.
Zwei Producer
de.redelin.DataProducer
und
de.redelin.AnotherDataProducer
senden dabei Nachrichten für zwei verschiedene Topics.
Die Producer selbst sind fast identisch, nur ihre Basis unterscheidet sich entscheidend.
@EnableBinding(Source.class)
public class DataProducer {
final static Logger LOG = LoggerFactory.getLogger(DataProducer.class);
@Autowired
Source source;
public void sendMessage(HolyMessage message) {
LOG.info("Log message: " + message.getText());
source.output().send(MessageBuilder.withPayload(message).build());
}
}
Das Binding an das Interface
org.springframework.cloud.stream.messaging.Source
zeigt uns den Weg zur Konfiguration.
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
Das Interface
Source
definiert also nur, dass es sich um eine Datenausgabe (Annotation
@Output
) mit einem konstanten String "output" handelt.
Vergleichen wir dazu die
application.yml
des Projektes sehen wir diese Konfiguration wieder.
Die Anwendung des Interfaces zeigt Spring Cloud Data Flow also welches Kafka Topic mit welchen Einstellungen für welchen Zweck verwendet werden soll.
spring:
application:
name: source # source is at the beginning of a process (source -> processor -> sink || source -> sink)
cloud:
stream:
bindings:
output: # topic-binding one based on Source.class
binder: kafka
destination: testOne # name of topic to sent with
anotheroutput: #topic-binding two based on AnotherSource.class
binder: kafka
destination: testTwo # name of topic to sent with
Die Datensenke (Sink)
Um die Funktionsweise kürzer zu beschreiben, verzichten wir auf die Ausprägung
Process
und bilden nur eine Prozesskette in Minimalkonfiguration mit
Source
und
Sink
ab.
Im Gradle Projekt
SpringDataFlowProcess
sollen die Daten angenommen und abschließend verarbeitet werden.
Die Klasse
de.redelin.DataConsumer
zeigt, wie der Datenempfang konfiguriert wird. Wie bei der Source sehen wir hier eine Verwendung von Binding-Annotationen.
@EnableBinding(Sink.class)
public class DataConsumer {
final static Logger LOG = LoggerFactory.getLogger(DataConsumer.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void receiveMessage(final HolyMessage message) {
LOG.info(String.format("Message is: %s with %d for direction %s", message.getText(), message.getId(),
message.getDirectionKey()));
/*
* message with id 2 should not be processable, so we can see an error
* message in processing and how kafka and spring cloud data flow react.
* Exception-Handling is bad in this implementation, because normally as
* example an error-topic should be used.
*/
if (message.getId() == 2) {
LOG.error("ID 2 not legal!");
throw new IllegalArgumentException("My Exception");
}
}
}
Schaut man sich das Interface
org.springframework.cloud.stream.messaging.Sink
an, sieht man, dass dieses mit
@Input
annotiert und einem fixen String "input" versehen ist.
Auch diese Referenz finden wir in der
application.yml
wieder.
spring:
application:
name: sink # sink is at the end of a process (source -> processor -> sink || source -> sink)
cloud:
stream:
bindings:
input: # topic-binding one based on Sink.class
binder: kafka
destination: testOne # name of topic to receive from
group: groupA # group has its own offset in kafka
anotherinput: #topic-binding two based on AnotherSink.class
binder: kafka
destination: testTwo # name of topic to receive from
group: groupB # group has its own offset in kafka
Wir sehen hier, dass Kafka pro Topic und Gruppe Offsets definiert, wodurch wir sehr einfach einstellen können, ob mehrere Prozesse parallel Daten verarbeiten oder verschiedene Prozesse die gleichen Daten interpretieren sollen. Fällt in dieser Kette der Prozess "Datenempfänger" aus und startet später wieder, erhält er alle Daten, die seit der letzten Übertragung neu hinzugekommen sind.
Ausprobieren und Testen hilft dem Verständnis
Probiere diesen kleinen Einblick in die Verarbeitung von Datenströmen und asynchronen Kommunikationen gerne aus. Es gibt unzählige Anwendungsszenarien und sicherlich nicht immer den EINEN Weg das Ziel zu erreichen.
Wir haben während unseres Ausfluges in diese Welt jedenfalls noch folgende wichtige Dinge festgestellt.
- Ohne Gruppennamen in den Datenempfängern ist der Offset immer
latest
beim Starten
- Es muss zwingend ein "Retry" Prozess implementiert werden, da Kafka im Gegensatz zu Queues ein Stream ist und fehlerhafte Nachrichten nach mehrmaligem Versuch einfach überlesen werden. (siehe Retry operations
- Alte Nachrichten mit Objekten, die nicht zu neuen Consumern passen, führen zu extremen Problemen für neue UserGroups.
- Doppelverarbeitung von Nachrichten bei mehreren Consumern der gleichen userGroup ist durch Kafka ausgeschlossen
- Mounte das Docker-Verzeichnis
/kafka
in ein docker volume
und du wirst keine Datenverluste beim Neustart deiner Instanzen haben
Code Referenz
https://bitbucket.org/firefighter-97/kafka/src