⚠️ Achtung!
Während ich diesen Artikel schreibe, ist Structured Concurrency erst als Preview-Feature verfügbar. Wir schauen uns hier also die Version aus Java 25, JEP 505 an, in der Zukunft kann es jedoch noch einige Änderungen geben!

Preview-Features aktiviert man, indem man --enable-preview als Compilerflag übergibt.

Was ist das?

Das Konzept von Structured Concurrency ist recht einfach. Wir sind in einem Thread, der möchte, dass ganz viele Aufgaben (am besten IO intensiv) asynchron in Virtual Threads durchgeführt werden.

Diese Aufgaben könnten auch zum Beispiel ein Endresultat haben; es zählt jedoch vorerst nur, dass sie am Ende nach einem bestimmten Kriterium “fertig” sind.

Beispiel

Wir wollen ganz viele Dateien herunterladen. Wenn jedoch nur ein Download fehlschlägt, brechen wir ab.

static void downloadFiles(SourceDataset sourceDataset) throws InterruptedException {
	try (var scope = StructuredTaskScope.open()) {
		// Gebe die asynchronen Downloads in Auftrag. Pro Download ein Virtual Thread
		for (URI fileLocation : sourceDataset.fileLocations())
			scope.fork(() -> downloadFile(fileLocation));
			
		// Warte im aufrufenden Thread darauf,
		// dass alle Downloads erfolgreich sind, oder einer fehlschlägt.
		scope.join();
	}
}

Okay, jetzt wäre es aber schon auch noch gut, mit den Ergebnissen zu arbeiten. Lass uns die heruntergeladenen Dateien weiterleiten. Dieses Mal warten wir jedoch auf alle, unabhängig davon ob einer fehlschlägt.

static void downloadFiles(SourceDataset sourceDataset) throws InterruptedException {
	// Der "Joiner" awaitAll() lässt uns auf alle Aufgaben warten
	try (var scope = StructuredTaskScope.open(StructuredTaskScope.Joiner.awaitAll())) {
		for (URI fileLocation : sourceDataset.fileLocations()) {
			scope.fork(() -> { 
				// Hier können wir bedenkenlos sequentielle Abläufe schreiben,
				// denn die Virtual Threads kümmern sich um die Nebenläufigkeit.
				var downloadedFile = downloadFile(fileLocation);
				transferFile(downloadedFile);
			});
		}
		scope.join(); // führt den Joiner aus
	}
}

Was gibt es für Joiner?

Joiner Beschreibung
awaitAllSuccessfulOrThrow() - Standard für open(). Warte darauf, dass alle Aufgaben erfolgreich sind, oder mindestens eine fehlschlägt.
- Produziert keinen Ergebniswert.
allSuccessfulOrThrow() - Warte darauf, dass alle Aufgaben erfolgreich sind.
- Gib die Ergebnisse dieser Aufgaben als Stream<Subtask> mit State SUCCESS zurück.
- Lass join() eine FailedException werfen, wenn mindestens eine Aufgabe fehlschlägt.
awaitAll() - Warte darauf, dass alle Aufgaben erfolgreich sind, unabhängig davon, ob sie erfolgreich sind oder fehlschlagen.
- Produziert keinen Ergebniswert.
anySuccessfulResultOrThrow() - Warte darauf, dass mindestens eine Aufgabe erfolgreich ist.
- Gib den Ergebniswert dieser Aufgabe zurück.
- Lass join() eine FailedException werfen, wenn alle Aufgaben fehlschlagen.
allUntil(Predicate<Subtask>) - Warte darauf, dass entweder alle Aufgaben beendet sind, oder darauf, dass das Predicate, beim Aufruf von join(), für genau einen abgeschlossenen Subtask true zurückgibt.
- Gib die Ergebnisse dieser Aufgaben als Stream<Subtask> zurück.
Eigenimplementierung Joiner ist ein public interface; man kann also entsprechende eigene Logik schreiben. Es müssen auch nur drei Funktionen onFork(Subtask), onComplete(Subtask) und result() implementiert werden.

Anwendungsbeispiel & Fazit

Wir haben in dieser Serie einige verschiedene Mechanismen kennengelernt, mit denen man asynchrone Aufgaben ausführen kann. Für unser Anwendungsbeispiel, den Mapper, können wir nun ein letztes Mal überlegen, was wohl die besten Werkzeuge für die parallele Umsetzung sind.

Gedanken

  • Der Mapper ist von Grund auf sehr IO-Lastig: Die Aufgaben sind hauptsächlich das Runter- und Hochladen von Dateien. Nur das Formatieren des Datensatzes ist ein CPU-Task.
  • Die Aufgaben bauen aufeinander auf: Wir können die Dateien erst auf der Zielplattform hochladen, nachdem wir sie von der Quellplattform heruntergeladen, und die Zielressource erstellt haben.
  • Wir können mehrere Mappingvorgänge gleichzeitig ausführen, da diese vollkommen unabhängig voneinander sind.
/// Für IO-Tasks
static final ExecutorService VIRTUAL_TASK = Executors.newVirtualThreadPerTaskExecutor();

/// Für CPU-Tasks
static final ExecutorService IN_THREAD_POOL = new ThreadPoolExecutor(
		1, Runtime.getRuntime().availableProcessors() * 3,
		6, TimeUnit.MINUTES,
		new LinkedBlockingQueue<>() // Unbeschränkte Warteschlange
);

static void main() {
	final var SCHEDULED_EXEC = Executors.newSingleThreadScheduledExecutor();
	// Führe alle 5 Minuten neue Mappingläufe durch
	SCHEDULED_EXEC.scheduleWithFixedDelay(
			() -> performNewestMappings(),
			0, 5, TimeUnit.MINUTES
	);
	// Logik zum Herunterfahren
	...
}

static void performNewestMappings() {
	// Mach erst mit der nächsten Iteration weiter,
	// wenn alle neuen Mappings abgeschlossen sind.
	try (var scope = StructuredTaskScope.open(StructuredTaskScope.Joiner.awaitAll())) {
		for (var newDataset : fetchNewestData())
			scope.fork(() -> mapDataset(newDataset));
		scope.join();
	} catch (InterruptedException e) {
		// Sollte nie auftreten, aber wenn doch,
		// fahre den SCHEDULED_EXEC vor nächster Mapping-Iteration herunter.
		Thread.currentThread().interrupt();
	}
}

static void mapDataset(SourceDataset sourceDataset) {
	var futureTargetResource = // Stoße das Hochladen der Ressource an
			CompletableFuture
					// Formatieren im regulären Platformthreadpool
					.supplyAsync(() -> reformatDataset(sourceDataset), IN_THREAD_POOL)
					// Hochladen mit Virtual Thread
					.thenApplyAsync((formattedDataset) -> uploadDataset(formattedDataset), VIRTUAL_TASK);

	var futureFileTransfers = Arrays.stream(sourceDataset.fileLocations())
			.map(uri -> CompletableFuture.supplyAsync(() -> downloadFile(uri))
					.thenAcceptBothAsync(
						futureTargetResource,
						// Warte auf durch Virtual Thread erstellte Zielressource
						(downloadedFile, targetResource) // und lade die Datei hoch 
							-> targetResource.uploadAndAttach(downloadedFile),
						VIRTUAL_TASK
					).exceptionally(ex -> {
						sourceDataset.addFailedTransfer(ex);
						return null;
					}))
			.toArray(CompletableFuture[]::new);

	// Warte darauf, dass alle Transfers fertig sind
	CompletableFuture.allOf(futureFileTransfers).join();
}

Das wars nun vorerst mit Multithreading, aber vielleicht gibt es ja später noch eine weitere Artikelserie über Thread-Safety. 🤷🏻
Wenn du Anmerkungen, Fragen oder Kritik hast, schreib mir gerne eine E-Mail oder auf LinkedIn.