Teil 6) Multithreading in Java
(Completable) Futures
Im letzten Artikel haben wir uns Threadpools, als einen ExecutorService, der mehrere verschiedene Aufgaben gleichzeitig abarbeitet, angesehen. Anwendungsbeispiel war ein Upload von mehreren Dateien; in diesem Fall eine sog. “fire and forget” Aktion.
📖 Fire and forget
Programmabschnitte von denen man sich keine Rückmeldung erwartet, z.B. das Senden einer E-Mail. Diese Aufgaben können meistens, sehr einfach, asynchron abgearbeitet werden.
Wenn man jedoch wissen möchte, ob die Aufgaben erfolgreich durchgeführt wurden, oder sie sogar eine explizite Antwort zurückgeben, muss man das Gegenteil verwenden. Futures.
Ein Thread mit einer Future: KI generierte Grafiken von OpenAI, August 2025
Beispiel: Dateidownload
In unserem Anwendungsbeispiel, dem Mapper, wollen wir für einen Datensatz eine Reihe von Dateien herunterladen. Hierfür wäre es schön, wenn wir wüssten, wie man Arbeitsergebnisse aus einem ExecutorService weiterverarbeitet, da wir in diesem Beispiel unsere heruntergeladenen Dateien danach woanders hochladen wollen.
Arbeitsergebnisse
… von ExecutorServices
werden über “Futures” abgebildet.
java.util.concurrent.Future
ist hierbei ein Interface, welches eine laufende, unterbrochene, gescheiterte oder erfolgreich abgeschlossene Aufgabe repräsentiert.
Eine Future stellt unter anderem die folgenden Methoden bereit:
Methode | Beschreibung |
---|---|
state() |
Gibt den Status der Aufgabe zurück: - RUNNING : Noch kein Ergebnis- CANCELLED : Siehe #cancel() - SUCCESS : Ergebnis existiert- FAILED : Exception in Aufgabe |
isDone() |
Gibt true zurück, wenn die Aufgabe erfolgreich abgeschlossen wurde (SUCCESS ), oder einen Fehler geworfen hat (FAILED ). |
cancel() |
Versucht, die Aufgabe abzubrechen. Es kann angegeben werden, ob hierfür der unterliegende Thread unterbrochen werden soll. |
isCancelled() |
Gibt true zurück, wenn die laufende Aufgabe abgebrochen wurde. (CANCELLED ) |
resultNow() |
Gibt das Resultat einer erfolgreich abgeschlossenen Aufgabe zurück. |
exceptionNow() |
Gibt die Exception einer fehlgeschlagenen Aufgabe zurück. |
get() |
Blockiere den aufrufenden Thread, bis die Aufgabe abgeschlossen ist. - Gib entweder das Ergebnis zurück, - wirf eine ExecutionException mit dem aufgetretenen Fehler,- wirf eine InterruptedException , wenn der aufrufende Thread beim Warten unterbrochen wurde,- oder wirf eine TimeoutException , wenn der (optionale) Timeout abgelaufen ist. |
Wie bekommt man ein Arbeitsergebnis?
Bisher haben wir ja bei Executors nur die execute()
-Methode behandelt und uns bei ScheduledExecutorServices verschiedene Intervallausführungen angesehen.
Jeder ExecutorService unterstützt jedoch die folgenden Methoden, welche alle entweder sofort ein Arbeitsergebnis, eine oder mehrere Futures zurückgeben:
submit()
: Führt eine asynchrone Aufgabe auf, die einen potenziellen Rückgabewert hat. Gibt sofort eine Future zurück.invokeAll()
: Führt eine Reihe an Aufgaben aus und blockiert den aufrufenden Thread, bis alle fertig sind. (Erfolgreich oder durch Exception). Gibt eine Liste an fertigen Futures zurück.invokeAny()
: Startet alle Aufgaben und gibt die zurück, die als erstes erfolgreich beendet wird. Alle anderen Aufgaben werden abgebrochen. Gibt das Arbeitsergebnis der Aufgabe zurück, oder wirft eine Exception.
Diese drei Methoden haben jeweils verschiedene Ausführungen, invoke...()
ermöglicht z.B. auch das Arbeiten mit Timeouts.
Zurück zum Mapper
Wir wollen nun also (möglichst gleichzeitig) eine Reihe an Dateien herunterladen. Sollten wir eine davon herunterladen, können wir sie gleich wieder an die zweite Plattform weiterleiten. Wenn das nicht klappt, markieren wir die Dateitransfers einfach als fehlerhaft.
static void mapFiles(SourceDataset sourceDataset) throws InterruptedException {
// Starte die asynchronen Downloads
var futureFileDownloads = new ArrayList<Future<File>>();
for (URI fileLocation : sourceDataset.fileLocations()) {
Future<File> futureFileDownload = // Plane den Download
THREAD_POOL.submit(() -> downloadFile(fileLocation));
futureFileDownloads.add(futureFileDownload);
}
// Warte darauf, dass der Datensatz neu formatiert, und hochgeladen wird
TargetDataset formattedDataset = reformatDataset(sourceDataset);
TargetResource targetResource = uploadDataset(formattedDataset);
// Sende die heruntergeladenen Dateien an die erstellte Ressource
var futureFileUploads = new ArrayList<Future<?>>();
for (var futureFileDownload : futureFileDownloads) {
// Warte auf das Ergebnis vom Download ...
try {
var file = futureFileDownload.get();
futureFileUploads.add(
// ... und erstelle dann die nächste Aufgabe zum Upload.
THREAD_POOL.submit(() -> targetResource.uploadAndAttach(file))
);
} catch (ExecutionException ee) {
sourceDataset.addFailedTransfer(ee);
}
}
// Überprüfe, dass alle Dateien hochgeladen wurden
for (var fileUpload : futureFileUploads) {
// Warte auf das Ergebnis der Future
try {
fileUpload.get();
} catch (ExecutionException ee) {
sourceDataset.addFailedTransfer(ee);
}
}
}
Abgesehen davon, dass die Funktion langsam etwas groß und unübersichtlich wird, haben wir hier noch ein kleines Bottleneck, bei dem Futures leider an ihre Grenzen kommen.
Findest du es alleine?
Verknüpfte Aufgaben
Im vorherigen Beispiel haben wir erst die Downloads gestartet,
dann die notwendige Zielressource erstellt und als Nächstes für jeden erfolgreichen Download einen Upload gestartet.
In der zweiten for
-Schleife haben wir jedoch einzeln und nacheinander auf erfolgreiche Downloads gewartet.
Was wäre, wenn die erste Datei extrem lange braucht, und wir noch auf den Download warten, während die anderen schon lange (zum Upload) verfügbar sind?
CompletableFuture
… ist eine Implementierung von Future
, welche zusätzlich noch eine ewige Menge an Methoden aus dem Interface CompletionStage
bereitstellt, mit denen man Aufgaben verketten kann.
Die Gute Nachricht ist jedoch, dass all diese Aufgaben der gleichen Namenskonvention folgen:
Affix | Beschreibung | Beispiele |
---|---|---|
apply |
Nutzt das Ergebnis des vorherigen Schrittes und gibt das daraus erzeugte, eigene Arbeitsergebnis zurück. | thenApply , applyToEither |
accept |
Verbraucht das Ergebnis des vorherigen Schritts ohne ein eigenes Arbeitsergebnis zu erzeugen. | thenAccept , thenAcceptBoth , acceptEither |
runAfter |
Führt eine unabhängige Aufgabe nach Abschluss des letzten Schrittes aus, ohne mit dessen Ergebnis zu arbeiten. | runAfterBoth , runAfterEither |
compose |
Führt zwei CompletableFutures zu einer neuen zusammen. | thenCompose , exceptionallyCompose |
complete |
Beendet die aktuell laufende Aufgabe manuell. (Mit Ergebnis / Fehler / einem Timeout) | complete , completeExceptionally |
exceptionally |
Behandelt vorherige Exceptions und gibt alternative Ergebniswerte zurück. | exceptionally , exceptionallyCompose |
either |
Führt die Aktion aus, nachdem einer von zwei Schritten abgeschlossen ist. | applyToEither , acceptEither |
both |
Führt die Aktion aus, nachdem beide Schritte abgeschlossen sind. | thenAcceptBoth , runAfterBoth |
async |
Delegiert den aktuellen Verarbeitungsschritt an einen ExecutorService, der ihn asynchron ausführt. | thenApplyAsync , completeAsync |
handle |
Erzeugt einen Wert, egal ob die vorherige CompletionStage erfolgreich oder fehlerhaft war. | handle , handleAsync |
whenComplete |
Führt Aktionen basierend auf dem letzten Ergebnis (Erfolg oder Fehler) aus und gibt dieses unverändert weiter. | whenComplete , whenCompleteAsync |
combine |
Kombiniert Ergebnisse zweier CompletableFutures mit einer Funktion. Theoretisch äquivalent zu thenApplyBoth . |
thenCombine , thenCombineAsync |
Zeit, also, den Dateitransferprozess einmal mit CompletableFutures abzubilden:
static void mapFiles(SourceDataset sourceDataset) throws InterruptedException {
var futureTargetResource = CompletableFuture // Stoße das Hochladen der Ressource an
.supplyAsync(() -> reformatDataset(sourceDataset), THREAD_POOL)
.thenApplyAsync((formattedDataset) -> uploadDataset(formattedDataset));
// Zu diesem Zeitpunkt existiert die Ressource noch nicht unbedingt,
// es wurde bisher nur die asynchrone Aufgabe abgegeben.
var futureFileTransfers = new ArrayList<CompletableFuture<?>>();
for (URI fileLocation : sourceDataset.fileLocations()) {
var futureFileTransfer = CompletableFuture // Lade jede Datei herunter
.supplyAsync(() -> downloadFile(fileLocation), THREAD_POOL)
.thenAcceptBothAsync(
futureTargetResource, // Warte auf die Ressource und hänge Datei an
(file, resource) -> resource.uploadAndAttach(file),
);
// Merke dir, die laufenden Dateitransfers
futureFileTransfers.add(futureFileTransfer);
}
// Warte, bis die Ressource erstellt, und alle Dateitransfers abgeschlossen wurden
for (var futureFileTransfer : futureFileTransfers) {
try {
// Blockiere den aktuellen Thread und erwarte Ergebnis (Erfolg oder Fehler)
futureFileTransfer.get();
} catch (ExecutionException ee) {
sourceDataset.addFailedTransfer(ee.getCause());
}
}
}
Gut zu wissen:
Wenn wir in der ersten Stage einen ExecutorService angeben, wird dieser in folgenden Stages übernommen.
Fazit
Wir wissen nun, wie man asynchrone Aufgaben erstellt, zusammenhängt und auswertet.
Durch das Übergeben eines beliebigen ExecutorServices können wir hier auch recht feingranular unseren Ressourcenkonsum steuern.
Im nächsten Artikel lernen wir den ForkJoinPool
bzw den “commonPool” kennen.
Dieser wird häufig in der Standardbibliothek (z.B. auch von CompletableFutures) verwendet, wenn wir selber keinen Threadpool angeben.