Im letzten Artikel haben wir uns Threadpools, als einen ExecutorService, der mehrere verschiedene Aufgaben gleichzeitig abarbeitet, angesehen. Anwendungsbeispiel war ein Upload von mehreren Dateien; in diesem Fall eine sog. “fire and forget” Aktion.

📖 Fire and forget
Programmabschnitte von denen man sich keine Rückmeldung erwartet, z.B. das Senden einer E-Mail. Diese Aufgaben können meistens, sehr einfach, asynchron abgearbeitet werden.

Wenn man jedoch wissen möchte, ob die Aufgaben erfolgreich durchgeführt wurden, oder sie sogar eine explizite Antwort zurückgeben, muss man das Gegenteil verwenden. Futures.

Ein Thread mit einer Future: KI generierte Grafiken von OpenAI, August 2025

Beispiel: Dateidownload

In unserem Anwendungsbeispiel, dem Mapper, wollen wir für einen Datensatz eine Reihe von Dateien herunterladen. Hierfür wäre es schön, wenn wir wüssten, wie man Arbeitsergebnisse aus einem ExecutorService weiterverarbeitet, da wir in diesem Beispiel unsere heruntergeladenen Dateien danach woanders hochladen wollen.

Arbeitsergebnisse

… von ExecutorServices werden über “Futures” abgebildet. java.util.concurrent.Future ist hierbei ein Interface, welches eine laufende, unterbrochene, gescheiterte oder erfolgreich abgeschlossene Aufgabe repräsentiert. Eine Future stellt unter anderem die folgenden Methoden bereit:

Methode Beschreibung
state() Gibt den Status der Aufgabe zurück:
- RUNNING: Noch kein Ergebnis
- CANCELLED: Siehe #cancel()
- SUCCESS: Ergebnis existiert
- FAILED: Exception in Aufgabe
isDone() Gibt true zurück, wenn die Aufgabe erfolgreich abgeschlossen wurde (SUCCESS), oder einen Fehler geworfen hat (FAILED).
cancel() Versucht, die Aufgabe abzubrechen. Es kann angegeben werden, ob hierfür der unterliegende Thread unterbrochen werden soll.
isCancelled() Gibt true zurück, wenn die laufende Aufgabe abgebrochen wurde. (CANCELLED)
resultNow() Gibt das Resultat einer erfolgreich abgeschlossenen Aufgabe zurück.
exceptionNow() Gibt die Exception einer fehlgeschlagenen Aufgabe zurück.
get() Blockiere den aufrufenden Thread, bis die Aufgabe abgeschlossen ist.
- Gib entweder das Ergebnis zurück,
- wirf eine ExecutionException mit dem aufgetretenen Fehler,
- wirf eine InterruptedException, wenn der aufrufende Thread beim Warten unterbrochen wurde,
- oder wirf eine TimeoutException, wenn der (optionale) Timeout abgelaufen ist.

Wie bekommt man ein Arbeitsergebnis?

Bisher haben wir ja bei Executors nur die execute()-Methode behandelt und uns bei ScheduledExecutorServices verschiedene Intervallausführungen angesehen. Jeder ExecutorService unterstützt jedoch die folgenden Methoden, welche alle entweder sofort ein Arbeitsergebnis, eine oder mehrere Futures zurückgeben:

  • submit(): Führt eine asynchrone Aufgabe auf, die einen potenziellen Rückgabewert hat. Gibt sofort eine Future zurück.
  • invokeAll(): Führt eine Reihe an Aufgaben aus und blockiert den aufrufenden Thread, bis alle fertig sind. (Erfolgreich oder durch Exception). Gibt eine Liste an fertigen Futures zurück.
  • invokeAny(): Startet alle Aufgaben und gibt die zurück, die als erstes erfolgreich beendet wird. Alle anderen Aufgaben werden abgebrochen. Gibt das Arbeitsergebnis der Aufgabe zurück, oder wirft eine Exception.

Diese drei Methoden haben jeweils verschiedene Ausführungen, invoke...() ermöglicht z.B. auch das Arbeiten mit Timeouts.

Zurück zum Mapper

Wir wollen nun also (möglichst gleichzeitig) eine Reihe an Dateien herunterladen. Sollten wir eine davon herunterladen, können wir sie gleich wieder an die zweite Plattform weiterleiten. Wenn das nicht klappt, markieren wir die Dateitransfers einfach als fehlerhaft.

static void mapFiles(SourceDataset sourceDataset) throws InterruptedException {
    // Starte die asynchronen Downloads
    var futureFileDownloads = new ArrayList<Future<File>>();
    for (URI fileLocation : sourceDataset.fileLocations()) {
        Future<File> futureFileDownload = // Plane den Download
          THREAD_POOL.submit(() -> downloadFile(fileLocation));
        futureFileDownloads.add(futureFileDownload);
    }

    // Warte darauf, dass der Datensatz neu formatiert, und hochgeladen wird
    TargetDataset formattedDataset = reformatDataset(sourceDataset);
    TargetResource targetResource = uploadDataset(formattedDataset);

    // Sende die heruntergeladenen Dateien an die erstellte Ressource
    var futureFileUploads = new ArrayList<Future<?>>();
    for (var futureFileDownload : futureFileDownloads) {
        // Warte auf das Ergebnis vom Download ...
        try {
            var file = futureFileDownload.get();
            futureFileUploads.add(
                // ... und erstelle dann die nächste Aufgabe zum Upload.
                THREAD_POOL.submit(() -> targetResource.uploadAndAttach(file))
            );
        } catch (ExecutionException ee) {
            sourceDataset.addFailedTransfer(ee);
        }
    }

    // Überprüfe, dass alle Dateien hochgeladen wurden
    for (var fileUpload : futureFileUploads) {
        // Warte auf das Ergebnis der Future
        try {
            fileUpload.get();
        } catch (ExecutionException ee) {
            sourceDataset.addFailedTransfer(ee);
        }
	}
}

Abgesehen davon, dass die Funktion langsam etwas groß und unübersichtlich wird, haben wir hier noch ein kleines Bottleneck, bei dem Futures leider an ihre Grenzen kommen.

Findest du es alleine?

Verknüpfte Aufgaben

Im vorherigen Beispiel haben wir erst die Downloads gestartet, dann die notwendige Zielressource erstellt und als Nächstes für jeden erfolgreichen Download einen Upload gestartet. In der zweiten for-Schleife haben wir jedoch einzeln und nacheinander auf erfolgreiche Downloads gewartet. Was wäre, wenn die erste Datei extrem lange braucht, und wir noch auf den Download warten, während die anderen schon lange (zum Upload) verfügbar sind?

CompletableFuture

… ist eine Implementierung von Future, welche zusätzlich noch eine ewige Menge an Methoden aus dem Interface CompletionStage bereitstellt, mit denen man Aufgaben verketten kann. Die Gute Nachricht ist jedoch, dass all diese Aufgaben der gleichen Namenskonvention folgen:

Affix Beschreibung Beispiele
apply Nutzt das Ergebnis des vorherigen Schrittes und gibt das daraus erzeugte, eigene Arbeitsergebnis zurück. thenApply, applyToEither
accept Verbraucht das Ergebnis des vorherigen Schritts ohne ein eigenes Arbeitsergebnis zu erzeugen. thenAccept, thenAcceptBoth, acceptEither
runAfter Führt eine unabhängige Aufgabe nach Abschluss des letzten Schrittes aus, ohne mit dessen Ergebnis zu arbeiten. runAfterBoth, runAfterEither
compose Führt zwei CompletableFutures zu einer neuen zusammen. thenCompose, exceptionallyCompose
complete Beendet die aktuell laufende Aufgabe manuell. (Mit Ergebnis / Fehler / einem Timeout) complete, completeExceptionally
exceptionally Behandelt vorherige Exceptions und gibt alternative Ergebniswerte zurück. exceptionally, exceptionallyCompose
either Führt die Aktion aus, nachdem einer von zwei Schritten abgeschlossen ist. applyToEither, acceptEither
both Führt die Aktion aus, nachdem beide Schritte abgeschlossen sind. thenAcceptBoth, runAfterBoth
async Delegiert den aktuellen Verarbeitungsschritt an einen ExecutorService, der ihn asynchron ausführt. thenApplyAsync, completeAsync
handle Erzeugt einen Wert, egal ob die vorherige CompletionStage erfolgreich oder fehlerhaft war. handle, handleAsync
whenComplete Führt Aktionen basierend auf dem letzten Ergebnis (Erfolg oder Fehler) aus und gibt dieses unverändert weiter. whenComplete, whenCompleteAsync
combine Kombiniert Ergebnisse zweier CompletableFutures mit einer Funktion. Theoretisch äquivalent zu thenApplyBoth. thenCombine, thenCombineAsync

Zeit, also, den Dateitransferprozess einmal mit CompletableFutures abzubilden:

static void mapFiles(SourceDataset sourceDataset) throws InterruptedException {
    var futureTargetResource = CompletableFuture // Stoße das Hochladen der Ressource an
      .supplyAsync(() -> reformatDataset(sourceDataset), THREAD_POOL)
      .thenApplyAsync((formattedDataset) -> uploadDataset(formattedDataset));

    // Zu diesem Zeitpunkt existiert die Ressource noch nicht unbedingt,
    // es wurde bisher nur die asynchrone Aufgabe abgegeben.
    var futureFileTransfers = new ArrayList<CompletableFuture<?>>();
    for (URI fileLocation : sourceDataset.fileLocations()) {
        var futureFileTransfer = CompletableFuture // Lade jede Datei herunter
            .supplyAsync(() -> downloadFile(fileLocation), THREAD_POOL)
            .thenAcceptBothAsync(
                futureTargetResource, // Warte auf die Ressource und hänge Datei an
                (file, resource) -> resource.uploadAndAttach(file),
            );
        // Merke dir, die laufenden Dateitransfers
        futureFileTransfers.add(futureFileTransfer);
    }
    // Warte, bis die Ressource erstellt, und alle Dateitransfers abgeschlossen wurden
    for (var futureFileTransfer : futureFileTransfers) {
        try {
            // Blockiere den aktuellen Thread und erwarte Ergebnis (Erfolg oder Fehler)
            futureFileTransfer.get(); 
        } catch (ExecutionException ee) {
            sourceDataset.addFailedTransfer(ee.getCause());
        }
    }
}

Gut zu wissen:
Wenn wir in der ersten Stage einen ExecutorService angeben, wird dieser in folgenden Stages übernommen.

Fazit

Wir wissen nun, wie man asynchrone Aufgaben erstellt, zusammenhängt und auswertet.
Durch das Übergeben eines beliebigen ExecutorServices können wir hier auch recht feingranular unseren Ressourcenkonsum steuern.

Im nächsten Artikel lernen wir den ForkJoinPool bzw den “commonPool” kennen. Dieser wird häufig in der Standardbibliothek (z.B. auch von CompletableFutures) verwendet, wenn wir selber keinen Threadpool angeben.