31. Januar 2024 von Henrik Grosskreutz
Konsistente Zustellung von Integration Events mittels Event Store und Spring Application Events
In verteilten Anwendungen wie Microservice-Architekturen ist es oft erforderlich, dass sich Dienste gegenseitig über wichtige Geschäftsereignisse informieren. Ändert sich beispielsweise der Preis eines Artikels in einem zentralen Preisdienst, so ist dieses Änderungsereignis potenziell für einen Abrechnungsdienst relevant. Ein anderes Beispiel wäre ein Bestellereignis in einem Shop-Dienst: Dieses ist auch für den Abrechnungsdienst relevant, wenn dieser eine Rechnung dafür erstellen soll. Der Preis- und der Shop-Dienst sollten daher den Abrechnungsdienst per Nachricht über das Auftreten solcher technisch relevanten Ereignisse informieren. Solche Nachrichten werden oft als Integration Events bezeichnet.
Integration Events können technisch auf verschiedene Arten realisiert werden, unter anderem über Message Queues oder HTTP POST Requests. Dabei sollten jedoch bestimmte Konsistenzeigenschaften gewährleistet sein: Ein Integration Event sollte nur dann gesendet werden, wenn die Verarbeitung der Geschäftslogik im sendenden Service erfolgreich abgeschlossen wurde. Andernfalls, wenn beispielsweise das Speichern des neuen Preises aufgrund eines Datenbankausfalls fehlschlägt, darf kein entsprechendes Integration Event gesendet werden (sonst könnte es zum Beispiel passieren, dass eine Bestellung aus technischen Gründen scheitert, eine Rechnung aber trotzdem versendet wird). Andererseits darf das Integration Event nicht verloren gehen, wenn die Verarbeitung der Geschäftslogik erfolgreich war.
Wie von Vaughn Vernon in "Implementing Domain Driven Design", dem berühmten "roten Buch" des Domain Driven Designs, beschrieben, kann die konsistente Zustellung von Integration Events über einen "Event Store" realisiert werden. In meinem Blog-Beitrag beschriebe ich, wie dies in Java mit Hilfe von Spring realisiert werden kann. Dabei kommen neben Spring Data JPA insbesondere Spring Application Events zum Einsatz - wobei sich Application Events, wie im nächsten Abschnitt erläutert, von Integration Events unterscheiden.
Bevor wir in die Details gehen, eine Einordnung in das Spektrum der Push- und Pull-basierten Kommunikationsansätze: Die hier beschriebene Lösung ist immer dann anwendbar, wenn ein Dienst neu auftretende Events nach dem Push-Prinzip versendet. Dabei ist es unerheblich, ob der Dienst die Ereignisse direkt per HTTP-Request an einen anderen Dienst schickt oder ob er sie bei einem Message-Broker wie RabbitMQ publiziert, von wo aus sie erst indirekt an den oder die eigentlichen Empfänger gelangen.
Integration Events und Application Events
Zurück zu Integration Events und Application Events. Obwohl sie verwandt sind, gibt es einen grundlegenden Unterschied zwischen ihnen:
- Integration Events dienen, wie bereits beschrieben, der Kommunikation zwischen verschiedenen Services.
- Spring's Application Events sind ein Mittel, um Nachrichten innerhalb einer Java-Anwendung, also innerhalb eines einzelnen Services, auszutauschen. Hierfür stellt Spring Konstrukte wie die abstrakte Java-Klasse
ApplicationEvent
, das InterfaceApplicationEventPublisher
und die Annotation@EventListener
zur Verfügung, die im weiteren Verlauf des Artikels näher beschrieben werden.
Application Events sind also konkrete Programmiersprachenkonstrukte, während Integration Events eher ein abstraktes Konzept sind, das (zumindest im Kontext dieses Blog-Beitrags) auf verschiedene Arten realisiert werden kann. Die folgende Abbildung zeigt den Unterschied zwischen Application Events und Integration Events:
Spring Application Events sind ein hervorragendes Mittel zur Entkopplung von Modulen innerhalb einer Java-Anwendung: Denn auch wenn zwei Module, Module1 und Module2, über Application Events miteinander kommunizieren, führt dies nicht zu starken Abhängigkeiten zwischen den beiden Modulen: Weder ruft ein Modul das andere auf, noch gibt es Import- oder Build-Abhängigkeiten. Das einzige, was beide Module kennen müssen, ist das Application Event und das Spring Framework.
Konsistente Integration Events auf Basis eines Event-Stores
Wie bereits in der Einleitung erwähnt, gibt es Herausforderungen bezüglich der Konsistenz beim Senden von Integration Events: Ein Integration Event darf nicht gesendet werden, wenn die zugrunde liegende Ausführung fehlgeschlagen ist. Im obigen Beispiel darf also kein Event für eine Preisänderung versendet werden, wenn der Commit für die Preisänderung im Price Service fehlschlägt. Wenn die Ausführung jedoch erfolgreich war, sollte ein Integration Event gesendet werden.
Prinzipiell könnten diese Konsistenzanforderungen durch die Verwendung verteilter Transaktionen, also über den XA-Standard auf Basis des 2-Phasen-Commit-Protokolls, sichergestellt werden. Verteilte Transaktionen haben jedoch einen zweifelhaften Ruf, unter anderem weil sie komplex zu implementieren sind und oft zu schlechter Performance führen. In vielen Fällen steht diese Option ohnehin nicht zur Verfügung, da die Rahmenbedingungen XA-Transaktionen nicht zulassen. In dem Projekt, das diesem Artikel zugrunde liegt, wurde beispielsweise REST als Technologie für die Implementierung von Integrationsereignissen definiert. REST beziehungsweise HTTP POST Requests unterstützen jedoch keine verteilten Transaktionen. Aber auch bei anderen Technologien wie Message Brokern kann nicht davon ausgegangen werden, dass sie XA-Transaktionen unterstützen - RabbitMQ unterstützt diese beispielsweise auch nicht.
Eine andere Möglichkeit, Konsistenz zu gewährleisten, ist der von Vernon in "Implementing Domain Driven Design" beschriebene Ansatz, der auf einem Event Store basiert. Er funktioniert wie folgt:
- Alle Integrationsereignisse werden in dem Service, in dem sie auftreten, serialisiert und in der gleichen Datenbank wie die Anwendungsdaten gespeichert. Die Tabelle oder Tabellen, in denen die Integration Events gespeichert werden, bilden dabei den sogenannten Event Store. Dieser befindet sich im Beispiel also in der gleichen Datenbank wie die Preisdaten.
- Die Integration Events werden in der gleichen Transaktion im Event Store persistiert, in der auch die fachlichen Anwendungsdaten persistiert werden.
- Erst nach erfolgreichem Commit einer Transaktion werden die Integration Events tatsächlich versendet.
Diese Lösung stellt sicher, dass Integration Events nur dann versendet werden, wenn die Geschäftslogik im sendenden Service erfolgreich abgeschlossen wurde. Da die Integration Events sicher im Event Store persistiert werden, ist es zudem sehr einfach möglich, die Zustellung mehrfach zu versuchen und gegebenenfalls bei wiederholtem Scheitern in eine kontrollierte und protokollierte Fehlerbehandlung einzusteigen. Dies ist ein wesentlicher Vorteil gegenüber einer einfacheren Lösung ohne Event Store, bei der zunächst die Datenbanktransaktion abgeschlossen wird und dann die Integration Events aus den Kontextdaten des laufenden Requests oder Java-Threads versendet werden. Schlägt bei dieser Lösung (auch Best-Effort-1-Phase Commit genannt) das Senden fehl, geht das Integration Event einfach verloren.
Im Gegensatz zum 2-Phasen-Commit wird bei der beschriebenen Lösung übrigens kein Rollback der Geschäftslogik durchgeführt, wenn das Versenden des Integration Events fehlschlägt. Dies ist in vielen Fällen kein Nachteil - wenn beispielsweise eine Preisänderung oder eine Bestellung erst mit Verzögerung an den Abrechnungsdienst gemeldet wird, ist dies in der Regel akzeptabel bzw. besser, als wenn eine Bestellung abgelehnt werden muss, nur weil der Abrechnungsdienst gerade nicht erreichbar ist.
Umsetzung in Spring Data JPA und Application Events
Wir wollen nun den oben skizzierten Ansatz in Java umsetzen, basierend auf Spring Application Events und Data JPA. Dabei gehen wir davon aus, dass eine klassische relationale Datenbank verwendet wird und die Integration Events über HTTP POST Requests gesendet werden sollen.
Grundlage: Application Events in Spring
Spring bietet mit dem ApplicationEventPublisher
ein Interface, mit dessen Methode publishEvent
beliebige Objekte veröffentlicht werden können. Auf solche Events kann dann innerhalb der Spring-Anwendung auf verschiedene Arten reagiert werden.
Die einfachste Möglichkeit ist die Annotation @EventListener
, mit der eine Methode als Event-Listener registriert werden kann. Über die Signatur der Methode, genauer über den Typ ihres Parameters, wird deklariert, auf welche Ereignisse sich der Event-Listener registriert. Der Aufruf eines so annotierten Event-Listeners erfolgt standardmäßig synchron, das heißt, der Service, der das Ereignis publiziert, wird erst dann weiter ausgeführt, wenn alle Listener abgearbeitet wurden.
Die Annotation @TransactionalEventListener
erlaubt mehr Kontrolle: Hier kann die Verarbeitung des Events in den Kontext einer Transaktion gestellt werden und der Aufruf eines Listeners an eine bestimmte Phase der Transaktion gebunden werden. Beispielsweise legt @TransactionalEventListener(phase=TransactionPhase.AFTER_COMMIT)
fest, dass der Listener erst nach einem erfolgreichen Commit aufgerufen werden soll. Alternativ kann die Ausführung auch in den Phasen BEFORE_COMMIT
und AFTER_ROLLBACK
erfolgen.
Spring verwendet diese Mechanismen übrigens intern selbst, der ApplicationContext publiziert beispielsweise die Framework-Events ContextStartedEvent
und ContextRefreshedEvent
.
Überblick über die Lösung
Unsere Implementierung nutzt die oben beschriebene Funktionalität von Spring wie folgt:
- Das Fachmodul sendet das Integration Event mit den relevanten Informationen zur Preisänderung über einen ApplicationEventPublisher. Dies geschieht innerhalb der Transaktion, in der die Preisänderung per JPA persistiert wird.
- Das Modul zur Behandlung von Integration Events, im Folgenden Integration Event Service genannt, hört mittels
@EventListener
auf das Event. Es serialisiert dieses und persistiert es über JPA in der gleichen Datenbank, in der auch die fachlichen Daten (im Beispiel also die Preise) persistiert werden. Da der@EventListener
synchron arbeitet, geschieht dies innerhalb der gleichen Transaktion.
- Erst nach erfolgreichem Commit der Preisänderung (und des Events) wird das Event vom Integration Event Service an den (oder die) anderen Service(s) gesendet. Dies wird durch den @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
ausgelöst.
Die folgende Abbildung veranschaulicht dies:
Publikation von Integration Events im fachlichen Modul
Je nach Architekturstil kann das Ereignis im fachlichen Modul durch einen Service, eine DDD Entity oder ein anderes Konstrukt gesendet werden, die grundsätzliche Vorgehensweise ist jedoch die gleiche. Im Folgenden wird der Einfachheit halber von einem Service ausgegangen. Das Publizieren von Integration Events ist sehr einfach zu implementieren: Der ApplicationEventPublisher
wird über Dependency Injection zur Verfügung gestellt, das Publizieren des Integration Events erfolgt einfach durch den Aufruf von publishEvent()
. Der folgende Code zeigt dies exemplarisch im Kontext unseres Preisänderungsbeispiels:
@Service
public class PriceService {
@Autowired
ApplicationEventPublisher eventpublisher
@Transactional
public void setPrice(String articleId, BigInteger value) {
//Neuen Preis speichern
//...
//Event publizieren
publisher.publishEvent(
new IntegrationEvent("PriceUpdate " + articleId + "<-" + value));
}
Der Einfachheit halber besteht in diesem Code-Beispiel die Payload, also der Inhalt des Integration Events, nur aus einem String. In einer produktiven Anwendung wird sicherlich eine komplexere Struktur erforderlich sein. Übrigens besteht bei dieser Lösung keine Abhängigkeit des fachlichen Services vom IntegrationEventService - das fachliche Modul muss lediglich den Publisher und das IntegrationEvent kennen.
Persistieren der Events im Event-Store
Das im fachlichen Service publizierte Application Event kann im Integration Event Service wie erwähnt über den @EventListener
abgefangen und persistiert werden. Da die Verarbeitung synchron erfolgt, geschieht dies in der gleichen Transaktion. Der Einfachheit halber wird angenommen, dass das Integration Event als @Entity annotiert wurde, so dass es wie folgt persistiert werden kann:
@Service
public class IntegrationEventService {
@EventListener
public void persistEvent(IntegrationEvent event) {
eventRepository.save(event);
}
//...
Die JPA Entity enthält neben der eigentlichen Payload (in unserem Beispiel nur ein String) sinnvollerweise noch einige weitere Informationen, insbesondere eine ID und einen Zeitstempel. Im folgenden Code gibt es noch ein boolsches Flag, das speichert, ob das Integration Event bereits ausgeliefert wurde.
@Entity
public class IntegrationEvent {
@Id
UID id;
@Column
String payload;
@Column
Instant timestamp;
@Column
boolean successfullyDispatched;
public IntegrationEvent(String payload) {
this(UUID.randomUUID(), payload, Instant.now(), false);
}
}
Der Event Store ist damit schlicht ein JPA Repository für IntegrationEvents.
Dispatch der Integration Events
Erst nach erfolgreichem Commit der Transaktion wird das Event an andere Services weitergeleitet. Dazu wird im Integration Event Service die oben beschriebene @TransactionalEventListener-Annotation verwendet. Damit wird sichergestellt, dass das Senden nur dann erfolgt, wenn die Transaktion erfolgreich abgeschlossen wurde.
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional(Transactional.TxType.REQUIRES_NEW)
public void sendEvent(IntegrationEvent event) {
//Event per HTTP POST verschicken
httpClient.send(...);
event.setSuccessfullyDispatched(true);
eventStore.save(event);
}
}
Im obenstehenden Code wird das Ereignis per HTTP-Request versendet, aber das Verfahren wäre analog, wenn es beispielsweise an einen RabbitMQ Message-Broker publiziert würde. Nach erfolgreichem Versand wird der Versandstatus aktualisiert und gespeichert.
Retry-Mechanismus
Wie bereits erwähnt, kann auf Basis des Event Stores ein Retry-Mechanismus implementiert werden, der im Falle eines fehlgeschlagenen Versands greift. Dies kann über die @Scheduled Annotation von Spring implementiert werden:
@Scheduled(cron = "0 0 3 * * *") //Retry every day at 3 am
public void watchdog() {
List<IntegrationEvent> undispatchedEvents =
eventStore.findAllBySuccessfullyDispatched(false);
for (IntegrationEven event : undispatchedEvents) {
sendEvent(event);
}
}
Damit ist die exemplarische Umsetzung bereits abgeschlossen.
Ablauf in Spring
Trotz aller Vorteile von Spring macht es die „Magie“ von Spring (insbesondere die Verwendung von generierten Interceptoren zur Implementierung von Aspekten oder Annotationen) nicht immer einfach, den resultierenden Ablauf zu verstehen. Aus diesem Grund zeigt das folgende Diagramm, wie die wichtigsten Bausteine im „Happy Path“ interagieren, also in dem Szenario, in dem sowohl die Preisänderung als auch der Nachrichtenversand erfolgreich sind.
- Die
@Transactional-Annotation
vonsetPrice()
bewirkt, dass Spring einen Interceptor generiert, der die eigentliche Methode um die Transaktionsverarbeitung erweitert. Dieser Interceptor startet zunächst eine neue Datenbanktransaktion über den SpringTransactionManager
. - Erst danach wird der eigentliche Methodenrumpf von
setPrice()
ausgeführt. Diese speichert den neuen Preis über JPA und veröffentlicht anschließend ein Event beimApplicationEventPublisher
. - Der
ApplicationEventPublisher
von Spring ruft daraufhin sofort die über@EventListener
registrierte MethodepersistEvent()
auf. Diese speichert das Integration Event innerhalb derselben Datenbanktransaktion. - Zusätzlich sorgt der
ApplicationEventPublisher
beziehungsweise Spring noch dafür, dass das Event beimTransactionManager
registriert wird, damit dieser später (nach erfolgreichem Commit) den@TransactionalEventListener
auslösen kann. - Nun wird die Ausführung von
setPrice()
beendet und der Spring Interceptor löst beimTransactionManager
einen Commit der Datenbanktransaktion aus. - Der TransactionManager führt den Commit über JDBC aus. Erst nach erfolgreichem Commit ruft er die mittels
@TransactionalEventListener
registrierte MethodesendEvent()
auf. - Diese verpackt und sendet das
IntegrationEvent
als HTTP-Nachricht. Nach erfolgreichem Commit aktualisiert sie den Zustellstatus desIntegrationEvents
und speichert diesen über JPA.
Diskussion
Eine Besonderheit der beschriebenen Lösung ist noch diskussionswürdig: Es kann vorkommen, dass ein Integration Event mehrfach versendet wird: Dies geschieht unter anderem dann, wenn nach dem Versand die Persistierung der Information über den Versand fehlschlägt. Auch wenn dies nicht ideal ist, können Duplikate anhand der ID des Integration Events im empfangenden Dienst leicht erkannt und ignoriert werden.
Eine produktive Implementierung wird sicherlich komplexer sein als die hier skizzierte Minimallösung. Die Integration Events werden mehr Struktur haben als im obigen Beispiel, wo sie nur aus einem String bestehen. Sie werden möglicherweise nicht direkt persistierbar sein, sondern müssen erst auf JPA Entities abgebildet werden. Es könnte ein Zähler für Retries gespeichert werden, anstatt die erfolgreiche Zustellung über einen Boolean abzubilden. Man könnte zwischen mehreren Empfängern unterscheiden wollen, etc. Dennoch bleibt die Implementierung von Integration Events auf Basis von Spring Application Events und einem Event Store ein konzeptionell einfacher und daher empfehlenswerter Ansatz für die konsistente, push-basierte Zustellung von Integration Events in verteilten Anwendungen.