adesso Blog

In verteilten Anwendungen wie Microservice-Architekturen ist es oft erforderlich, dass sich Dienste gegenseitig über wichtige Geschäftsereignisse informieren. Ändert sich beispielsweise der Preis eines Artikels in einem zentralen Preisdienst, so ist dieses Änderungsereignis potenziell für einen Abrechnungsdienst relevant. Ein anderes Beispiel wäre ein Bestellereignis in einem Shop-Dienst: Dieses ist auch für den Abrechnungsdienst relevant, wenn dieser eine Rechnung dafür erstellen soll. Der Preis- und der Shop-Dienst sollten daher den Abrechnungsdienst per Nachricht über das Auftreten solcher technisch relevanten Ereignisse informieren. Solche Nachrichten werden oft als Integration Events bezeichnet.

Integration Events können technisch auf verschiedene Arten realisiert werden, unter anderem über Message Queues oder HTTP POST Requests. Dabei sollten jedoch bestimmte Konsistenzeigenschaften gewährleistet sein: Ein Integration Event sollte nur dann gesendet werden, wenn die Verarbeitung der Geschäftslogik im sendenden Service erfolgreich abgeschlossen wurde. Andernfalls, wenn beispielsweise das Speichern des neuen Preises aufgrund eines Datenbankausfalls fehlschlägt, darf kein entsprechendes Integration Event gesendet werden (sonst könnte es zum Beispiel passieren, dass eine Bestellung aus technischen Gründen scheitert, eine Rechnung aber trotzdem versendet wird). Andererseits darf das Integration Event nicht verloren gehen, wenn die Verarbeitung der Geschäftslogik erfolgreich war.

Wie von Vaughn Vernon in "Implementing Domain Driven Design", dem berühmten "roten Buch" des Domain Driven Designs, beschrieben, kann die konsistente Zustellung von Integration Events über einen "Event Store" realisiert werden. In meinem Blog-Beitrag beschriebe ich, wie dies in Java mit Hilfe von Spring realisiert werden kann. Dabei kommen neben Spring Data JPA insbesondere Spring Application Events zum Einsatz - wobei sich Application Events, wie im nächsten Abschnitt erläutert, von Integration Events unterscheiden.

Bevor wir in die Details gehen, eine Einordnung in das Spektrum der Push- und Pull-basierten Kommunikationsansätze: Die hier beschriebene Lösung ist immer dann anwendbar, wenn ein Dienst neu auftretende Events nach dem Push-Prinzip versendet. Dabei ist es unerheblich, ob der Dienst die Ereignisse direkt per HTTP-Request an einen anderen Dienst schickt oder ob er sie bei einem Message-Broker wie RabbitMQ publiziert, von wo aus sie erst indirekt an den oder die eigentlichen Empfänger gelangen.

Integration Events und Application Events

Zurück zu Integration Events und Application Events. Obwohl sie verwandt sind, gibt es einen grundlegenden Unterschied zwischen ihnen:

  • Integration Events dienen, wie bereits beschrieben, der Kommunikation zwischen verschiedenen Services.
  • Spring's Application Events sind ein Mittel, um Nachrichten innerhalb einer Java-Anwendung, also innerhalb eines einzelnen Services, auszutauschen. Hierfür stellt Spring Konstrukte wie die abstrakte Java-Klasse ApplicationEvent, das Interface ApplicationEventPublisher und die Annotation @EventListener zur Verfügung, die im weiteren Verlauf des Artikels näher beschrieben werden.

Application Events sind also konkrete Programmiersprachenkonstrukte, während Integration Events eher ein abstraktes Konzept sind, das (zumindest im Kontext dieses Blog-Beitrags) auf verschiedene Arten realisiert werden kann. Die folgende Abbildung zeigt den Unterschied zwischen Application Events und Integration Events:

Spring Application Events sind ein hervorragendes Mittel zur Entkopplung von Modulen innerhalb einer Java-Anwendung: Denn auch wenn zwei Module, Module1 und Module2, über Application Events miteinander kommunizieren, führt dies nicht zu starken Abhängigkeiten zwischen den beiden Modulen: Weder ruft ein Modul das andere auf, noch gibt es Import- oder Build-Abhängigkeiten. Das einzige, was beide Module kennen müssen, ist das Application Event und das Spring Framework.

Konsistente Integration Events auf Basis eines Event-Stores

Wie bereits in der Einleitung erwähnt, gibt es Herausforderungen bezüglich der Konsistenz beim Senden von Integration Events: Ein Integration Event darf nicht gesendet werden, wenn die zugrunde liegende Ausführung fehlgeschlagen ist. Im obigen Beispiel darf also kein Event für eine Preisänderung versendet werden, wenn der Commit für die Preisänderung im Price Service fehlschlägt. Wenn die Ausführung jedoch erfolgreich war, sollte ein Integration Event gesendet werden.

Prinzipiell könnten diese Konsistenzanforderungen durch die Verwendung verteilter Transaktionen, also über den XA-Standard auf Basis des 2-Phasen-Commit-Protokolls, sichergestellt werden. Verteilte Transaktionen haben jedoch einen zweifelhaften Ruf, unter anderem weil sie komplex zu implementieren sind und oft zu schlechter Performance führen. In vielen Fällen steht diese Option ohnehin nicht zur Verfügung, da die Rahmenbedingungen XA-Transaktionen nicht zulassen. In dem Projekt, das diesem Artikel zugrunde liegt, wurde beispielsweise REST als Technologie für die Implementierung von Integrationsereignissen definiert. REST beziehungsweise HTTP POST Requests unterstützen jedoch keine verteilten Transaktionen. Aber auch bei anderen Technologien wie Message Brokern kann nicht davon ausgegangen werden, dass sie XA-Transaktionen unterstützen - RabbitMQ unterstützt diese beispielsweise auch nicht.

Eine andere Möglichkeit, Konsistenz zu gewährleisten, ist der von Vernon in "Implementing Domain Driven Design" beschriebene Ansatz, der auf einem Event Store basiert. Er funktioniert wie folgt:

  • Alle Integrationsereignisse werden in dem Service, in dem sie auftreten, serialisiert und in der gleichen Datenbank wie die Anwendungsdaten gespeichert. Die Tabelle oder Tabellen, in denen die Integration Events gespeichert werden, bilden dabei den sogenannten Event Store. Dieser befindet sich im Beispiel also in der gleichen Datenbank wie die Preisdaten.
  • Die Integration Events werden in der gleichen Transaktion im Event Store persistiert, in der auch die fachlichen Anwendungsdaten persistiert werden.
  • Erst nach erfolgreichem Commit einer Transaktion werden die Integration Events tatsächlich versendet.

Diese Lösung stellt sicher, dass Integration Events nur dann versendet werden, wenn die Geschäftslogik im sendenden Service erfolgreich abgeschlossen wurde. Da die Integration Events sicher im Event Store persistiert werden, ist es zudem sehr einfach möglich, die Zustellung mehrfach zu versuchen und gegebenenfalls bei wiederholtem Scheitern in eine kontrollierte und protokollierte Fehlerbehandlung einzusteigen. Dies ist ein wesentlicher Vorteil gegenüber einer einfacheren Lösung ohne Event Store, bei der zunächst die Datenbanktransaktion abgeschlossen wird und dann die Integration Events aus den Kontextdaten des laufenden Requests oder Java-Threads versendet werden. Schlägt bei dieser Lösung (auch Best-Effort-1-Phase Commit genannt) das Senden fehl, geht das Integration Event einfach verloren.

Im Gegensatz zum 2-Phasen-Commit wird bei der beschriebenen Lösung übrigens kein Rollback der Geschäftslogik durchgeführt, wenn das Versenden des Integration Events fehlschlägt. Dies ist in vielen Fällen kein Nachteil - wenn beispielsweise eine Preisänderung oder eine Bestellung erst mit Verzögerung an den Abrechnungsdienst gemeldet wird, ist dies in der Regel akzeptabel bzw. besser, als wenn eine Bestellung abgelehnt werden muss, nur weil der Abrechnungsdienst gerade nicht erreichbar ist.

Umsetzung in Spring Data JPA und Application Events

Wir wollen nun den oben skizzierten Ansatz in Java umsetzen, basierend auf Spring Application Events und Data JPA. Dabei gehen wir davon aus, dass eine klassische relationale Datenbank verwendet wird und die Integration Events über HTTP POST Requests gesendet werden sollen.

Grundlage: Application Events in Spring

Spring bietet mit dem ApplicationEventPublisher ein Interface, mit dessen Methode publishEvent beliebige Objekte veröffentlicht werden können. Auf solche Events kann dann innerhalb der Spring-Anwendung auf verschiedene Arten reagiert werden.

Die einfachste Möglichkeit ist die Annotation @EventListener, mit der eine Methode als Event-Listener registriert werden kann. Über die Signatur der Methode, genauer über den Typ ihres Parameters, wird deklariert, auf welche Ereignisse sich der Event-Listener registriert. Der Aufruf eines so annotierten Event-Listeners erfolgt standardmäßig synchron, das heißt, der Service, der das Ereignis publiziert, wird erst dann weiter ausgeführt, wenn alle Listener abgearbeitet wurden.

Die Annotation @TransactionalEventListener erlaubt mehr Kontrolle: Hier kann die Verarbeitung des Events in den Kontext einer Transaktion gestellt werden und der Aufruf eines Listeners an eine bestimmte Phase der Transaktion gebunden werden. Beispielsweise legt @TransactionalEventListener(phase=TransactionPhase.AFTER_COMMIT) fest, dass der Listener erst nach einem erfolgreichen Commit aufgerufen werden soll. Alternativ kann die Ausführung auch in den Phasen BEFORE_COMMIT und AFTER_ROLLBACK erfolgen.

Spring verwendet diese Mechanismen übrigens intern selbst, der ApplicationContext publiziert beispielsweise die Framework-Events ContextStartedEvent und ContextRefreshedEvent.

Überblick über die Lösung

Unsere Implementierung nutzt die oben beschriebene Funktionalität von Spring wie folgt:

  • Das Fachmodul sendet das Integration Event mit den relevanten Informationen zur Preisänderung über einen ApplicationEventPublisher. Dies geschieht innerhalb der Transaktion, in der die Preisänderung per JPA persistiert wird.
  • Das Modul zur Behandlung von Integration Events, im Folgenden Integration Event Service genannt, hört mittels @EventListener auf das Event. Es serialisiert dieses und persistiert es über JPA in der gleichen Datenbank, in der auch die fachlichen Daten (im Beispiel also die Preise) persistiert werden. Da der @EventListener synchron arbeitet, geschieht dies innerhalb der gleichen Transaktion.

- Erst nach erfolgreichem Commit der Preisänderung (und des Events) wird das Event vom Integration Event Service an den (oder die) anderen Service(s) gesendet. Dies wird durch den @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) ausgelöst.

Die folgende Abbildung veranschaulicht dies:

Publikation von Integration Events im fachlichen Modul

Je nach Architekturstil kann das Ereignis im fachlichen Modul durch einen Service, eine DDD Entity oder ein anderes Konstrukt gesendet werden, die grundsätzliche Vorgehensweise ist jedoch die gleiche. Im Folgenden wird der Einfachheit halber von einem Service ausgegangen. Das Publizieren von Integration Events ist sehr einfach zu implementieren: Der ApplicationEventPublisher wird über Dependency Injection zur Verfügung gestellt, das Publizieren des Integration Events erfolgt einfach durch den Aufruf von publishEvent(). Der folgende Code zeigt dies exemplarisch im Kontext unseres Preisänderungsbeispiels:

	
		@Service
		public class PriceService {
		    @Autowired
		    ApplicationEventPublisher eventpublisher
		    @Transactional
		    public void setPrice(String articleId, BigInteger value) {
		        //Neuen Preis speichern
		        //...
		        //Event publizieren
		        publisher.publishEvent(
		          new IntegrationEvent("PriceUpdate " + articleId + "<-" + value));
		    }
	

Der Einfachheit halber besteht in diesem Code-Beispiel die Payload, also der Inhalt des Integration Events, nur aus einem String. In einer produktiven Anwendung wird sicherlich eine komplexere Struktur erforderlich sein. Übrigens besteht bei dieser Lösung keine Abhängigkeit des fachlichen Services vom IntegrationEventService - das fachliche Modul muss lediglich den Publisher und das IntegrationEvent kennen.

Persistieren der Events im Event-Store

Das im fachlichen Service publizierte Application Event kann im Integration Event Service wie erwähnt über den @EventListener abgefangen und persistiert werden. Da die Verarbeitung synchron erfolgt, geschieht dies in der gleichen Transaktion. Der Einfachheit halber wird angenommen, dass das Integration Event als @Entity annotiert wurde, so dass es wie folgt persistiert werden kann:

	
		@Service
		public class IntegrationEventService {
		    @EventListener
		    public void persistEvent(IntegrationEvent event) {
		        eventRepository.save(event);
		    }
		    //...
	

Die JPA Entity enthält neben der eigentlichen Payload (in unserem Beispiel nur ein String) sinnvollerweise noch einige weitere Informationen, insbesondere eine ID und einen Zeitstempel. Im folgenden Code gibt es noch ein boolsches Flag, das speichert, ob das Integration Event bereits ausgeliefert wurde.

	
		@Entity
		public class IntegrationEvent {
		    @Id
		    UID id;
		    @Column
		    String payload;
		    @Column
		    Instant timestamp;
		    @Column
		    boolean successfullyDispatched;
		    public IntegrationEvent(String payload) {        
		this(UUID.randomUUID(), payload, Instant.now(), false);    
		}
		}
	

Der Event Store ist damit schlicht ein JPA Repository für IntegrationEvents.

Dispatch der Integration Events

Erst nach erfolgreichem Commit der Transaktion wird das Event an andere Services weitergeleitet. Dazu wird im Integration Event Service die oben beschriebene @TransactionalEventListener-Annotation verwendet. Damit wird sichergestellt, dass das Senden nur dann erfolgt, wenn die Transaktion erfolgreich abgeschlossen wurde.

	
		    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
		    @Transactional(Transactional.TxType.REQUIRES_NEW)
		    public void sendEvent(IntegrationEvent event) {
		        //Event per HTTP POST verschicken
		        httpClient.send(...);
		        event.setSuccessfullyDispatched(true);
		        eventStore.save(event);        
		}    
		}
	

Im obenstehenden Code wird das Ereignis per HTTP-Request versendet, aber das Verfahren wäre analog, wenn es beispielsweise an einen RabbitMQ Message-Broker publiziert würde. Nach erfolgreichem Versand wird der Versandstatus aktualisiert und gespeichert.

Retry-Mechanismus

Wie bereits erwähnt, kann auf Basis des Event Stores ein Retry-Mechanismus implementiert werden, der im Falle eines fehlgeschlagenen Versands greift. Dies kann über die @Scheduled Annotation von Spring implementiert werden:

	
		    @Scheduled(cron = "0 0 3 * * *") //Retry every day at 3 am
		    public void watchdog() {
		        List<IntegrationEvent> undispatchedEvents = 
		          eventStore.findAllBySuccessfullyDispatched(false);
		        for (IntegrationEven event : undispatchedEvents) {
		            sendEvent(event);
		        }
		    }
	

Damit ist die exemplarische Umsetzung bereits abgeschlossen.

Ablauf in Spring

Trotz aller Vorteile von Spring macht es die „Magie“ von Spring (insbesondere die Verwendung von generierten Interceptoren zur Implementierung von Aspekten oder Annotationen) nicht immer einfach, den resultierenden Ablauf zu verstehen. Aus diesem Grund zeigt das folgende Diagramm, wie die wichtigsten Bausteine im „Happy Path“ interagieren, also in dem Szenario, in dem sowohl die Preisänderung als auch der Nachrichtenversand erfolgreich sind.

  • Die @Transactional-Annotation von setPrice() bewirkt, dass Spring einen Interceptor generiert, der die eigentliche Methode um die Transaktionsverarbeitung erweitert. Dieser Interceptor startet zunächst eine neue Datenbanktransaktion über den Spring TransactionManager.
  • Erst danach wird der eigentliche Methodenrumpf von setPrice() ausgeführt. Diese speichert den neuen Preis über JPA und veröffentlicht anschließend ein Event beim ApplicationEventPublisher.
  • Der ApplicationEventPublisher von Spring ruft daraufhin sofort die über @EventListener registrierte Methode persistEvent() auf. Diese speichert das Integration Event innerhalb derselben Datenbanktransaktion.
  • Zusätzlich sorgt der ApplicationEventPublisher beziehungsweise Spring noch dafür, dass das Event beim TransactionManager registriert wird, damit dieser später (nach erfolgreichem Commit) den @TransactionalEventListener auslösen kann.
  • Nun wird die Ausführung von setPrice() beendet und der Spring Interceptor löst beim TransactionManager einen Commit der Datenbanktransaktion aus.
  • Der TransactionManager führt den Commit über JDBC aus. Erst nach erfolgreichem Commit ruft er die mittels @TransactionalEventListener registrierte Methode sendEvent() auf.
  • Diese verpackt und sendet das IntegrationEvent als HTTP-Nachricht. Nach erfolgreichem Commit aktualisiert sie den Zustellstatus des IntegrationEvents und speichert diesen über JPA.

Diskussion

Eine Besonderheit der beschriebenen Lösung ist noch diskussionswürdig: Es kann vorkommen, dass ein Integration Event mehrfach versendet wird: Dies geschieht unter anderem dann, wenn nach dem Versand die Persistierung der Information über den Versand fehlschlägt. Auch wenn dies nicht ideal ist, können Duplikate anhand der ID des Integration Events im empfangenden Dienst leicht erkannt und ignoriert werden.

Eine produktive Implementierung wird sicherlich komplexer sein als die hier skizzierte Minimallösung. Die Integration Events werden mehr Struktur haben als im obigen Beispiel, wo sie nur aus einem String bestehen. Sie werden möglicherweise nicht direkt persistierbar sein, sondern müssen erst auf JPA Entities abgebildet werden. Es könnte ein Zähler für Retries gespeichert werden, anstatt die erfolgreiche Zustellung über einen Boolean abzubilden. Man könnte zwischen mehreren Empfängern unterscheiden wollen, etc. Dennoch bleibt die Implementierung von Integration Events auf Basis von Spring Application Events und einem Event Store ein konzeptionell einfacher und daher empfehlenswerter Ansatz für die konsistente, push-basierte Zustellung von Integration Events in verteilten Anwendungen.

Bild Henrik Grosskreutz

Autor Henrik Grosskreutz

Henrik Grosskreutz ist Softwarearchitekt bei adesso und verfügt über 20 Jahre Erfahrung im Bereich der agilen Softwareentwicklung. Seine Schwerpunkte liegen in den Bereichen Java und Softwarearchitektur - zuletzt meist im Kontext von verteilten Systemen, Containerisierung und Cloud-Technologien.

Diese Seite speichern. Diese Seite entfernen.