Teil 3) Multithreading in Java
Einfache Executors
Bisher haben wir etabliert, was OS-Threads sind und dass sie von Javas Platformthreads abgebildet werden. Außerdem wurde angeschnitten, dass es kostenaufwändig ist, Threads zu starten (braucht viel Speicher und einiges an Zeit und OS-Ressourcen) und es deshalb ineffizient ist, für viele kleine Aufgaben immer einen einzelnen Thread zu starten.
Was ist ein Executor?
Executor
ist ein Interface im Package java.util.concurrent
, welches nur eine einzige Methode definiert:
void execute(Runnable command);
Gut. So weit, so einfach. In der Praxis ist ein Executor also ein Objekt, welches über execute()
ausführbare Aufgaben entgegennimmt.
Diese müssen jedoch nicht unbedingt gleichzeitig ausgeführt werden.
Sequentieller Executor
Der langweiligste Executor ist der, der die übergebene Funktion sofort im selben Thread ausführt.
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
Executor executor = new DirectExecutor();
executor.execute(() -> meineFunktion());
Dies unterscheidet sich jedoch nicht wirklich davon, die Funktion einfach selbst aufzurufen; der wirkliche Nutzen von Executors wird also erst dann offensichtlich, wenn man sie dazu bringt, Aufgaben nebenläufig oder parallel auszuführen.
Concurrent Executors
Nun könnten wir selbst einen Executor schreiben, der nach bestimmten Regeln Threads erstellt und verwaltet, aber genau darum wollten wir uns ja nicht kümmern. Glücklicherweise waren die Freunde von Oracle so nett, und haben uns schon ein paar Implementierungen bereitgestellt, welche wir uns hier anschauen werden.
SingleThreadExecutor
In der Klasse Executors
(wie Executor, nur mit “s” am Ende) gibt es eine Reihe an statischen Funktionen, welche uns bereits vorkonfigurierte Executors bereitstellen. Der simpelste davon ist der SingleThreadExecutor
. Er betreibt genau einen Thread, dem man nacheinander verschiedene Aufgaben zuweisen kann.
public static void main(String[] args) {
// Erstelle einen Executor im Hauptthread...
System.out.println("Erzeuge Executor in " + Thread.currentThread().getName());
Executor singleThreadExecutor = Executors.newSingleThreadExecutor();
// ... und gib ihm drei Aufgaben, die er in einem anderen Thread ausführen soll.
singleThreadExecutor.execute(() -> aufgabe(1));
singleThreadExecutor.execute(() -> aufgabe(2));
singleThreadExecutor.execute(() -> aufgabe(3));
// Führe eine Aufgabe im Hauptthread "main" aus.
aufgabe(4);
}
static void aufgabe(int n) {
System.out.println("Aufgabe " + n + " in " + Thread.currentThread().getName());
}
Wie zu erwarten ist die Ausgabe dieses Programmes:
Erzeuge Executor in main
Aufgabe 4 in main
Aufgabe 1 in pool-1-thread-1
Aufgabe 2 in pool-1-thread-1
Aufgabe 3 in pool-1-thread-1
… wobei “pool-1-thread-1” der Name des Threads im Executor ist.
ThreadPerTaskExecutor
Wir haben bereits mehrfach besprochen, dass es ein Antipattern ist, für viele kleine Aufgaben jeweils einen Platformthread zu starten.
Deshalb kann es seltsam sein, wenn man plötzlich Executors.newThreadPerTaskExecutor()
entdeckt.
Diese Funktion nimmt eine sog. ThreadFactory
entgegen, also ein Objekt, welches Threads erzeugt, und nutzt sie für jede einzelne Aufgabe.
Ich möchte noch einmal ausdrucksstark betonen, dass es in den allermeisten Fällen eine schlechte Idee ist, hier reguläre Platformthreads zu erzeugen.
// ⚠️ Achtung: Antipattern!
Executors.newThreadPerTaskExecutor(Thread::new);
Executors.newThreadPerTaskExecutor(Thread.ofPlatform().factory());
Warum diese Funktion trotzdem eine Existenzberechtigung hat, schauen wir uns später noch einmal an. Tatsächlich sollte man sie in der Praxis jedoch, so oder so, nie direkt aufrufen.
SingleThreadScheduledExecutor
Die dritte und letzte “grundlegende” Factory-Methode der Executors-Klasse ist Executors.newSingleThreadScheduledExecutor()
und hier kommen gleich zwei Konzepte auf uns zu.
Angenommen, wir wollen eine Aufgabe verzögert und / oder in einem regelmäßigen Intervall ausführen, so können wir auf den ScheduledExecutorService
, eine Implementierung des Executors
zurückgreifen.
Witzigerweise passiert, wenn wir hier nur execute()
aufrufen, nichts anderes als beim SingleThreadExecutor
.
Unsere Aufgaben werden ohne Verzögerung nacheinander in einem anderen Thread abgearbeitet.
Die Funktionen, die diesen Executor interessant machen, sind: schedule
, scheduleWithFixedDelay
und scheduleAtFixedRate
.
schedule()
Führt eine einmalige Aufgabe mit einer bestimmten Verzögerung aus.
var scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
scheduledExecutor.schedule(() -> aufgabe(1), 5, TimeUnit.SECONDS);
aufgabe(2);
Der Hauptthread instanziiert hier den Executor und führt dann Aufgabe 2 aus, der Executor wartet aber (währenddessen) noch 5 Sekunden, und führt dann erst Aufgabe 1 aus.
Wichtig ist hier, dass aufgabe(1)
nach 5 Sekunden erst gestartet wird und nicht fertig ist.
Wie lange eine Aufgabe braucht, kann der Executor nicht garantieren.
scheduleWithFixedDelay()
Der Executor führt eine Aufgabe aus, wartet nach Abschluss für eine bestimmte Zeit und wiederholt dann das Ganze. Wie bei schedule()
kann hier ebenfalls eine initiale Pause eingebaut werden, die abgewartet wird,
bevor der Prozess gestartet wird.
// Warte 5 Sekunden, führe dann unendlich lange Aufgabe 1 aus, jeweils mit 10sek Pause
int initialDelay = 5, delayBetweenTasks = 10;
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
() -> aufgabe(1), initialDelay, delayBetweenTasks, TimeUnit.SECONDS
);
scheduleAtFixedRate()
Der Executor startet eine Aufgabe in einem gleichmäßigen Interval, zum Beispiel alle 10 Sekunden, unabhängig davon, ob sie vorher schon abgeschlossen wurde. Hier handelt es sich nicht um einen Neustart; wenn die Aufgaben also länger brauchen als das Intervall groß ist, kann das die Warteschlange verstopfen.
// Warte 5 Sekunden und starte dann alle 10sek Aufgabe 1
int initialDelay = 5, delayBetweenTasks = 10;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
() -> aufgabe(1), initialDelay, delayBetweenTasks, TimeUnit.SECONDS
);
Hinzuzufügen ist vielleicht noch, dass alle drei Funktionen ebenfalls vollständig asynchron zum aufrufenden Thread laufen. Während sie also bis in alle Ewigkeit Aufgabe 1 ausführen, läuft der Hauptthread weiter und kann machen, was er will.
📖 Asynchronität
Ist ein etwas unintuitiver Fachbegriff, denn man könnte denken, dass “synchron” auch “gleichzeitig” bedeutet und damit äquivalent zu “parallel” ist. Das ist nicht der Fall!Ein Programmabschnitt läuft “asynchron”, wenn er zeitlich unabhängig vom “Hauptprogramm” ausgeführt wird. Beispiele für dieses Verhalten sind Aufgaben, die irgendwann später ausgeführt werden können, wie das Senden von E-Mails.
Zurück zum Beispiel
In unserem Problem haben wir bisher den Hauptthread dazu verwendet, die neuen Datensätze anzufragen, einzeln über jeden davon zu iterieren und die Aufgaben an andere Threads zu verteilen. Was jedoch, wenn wir dies im Hintergrund erledigen wollen?
// Eine Reihe an Datensätzen wird, ohne initiale Verzögerung, in einem Nebenthread,
// und mit 10 Sekunden Pause zwischen zwei Durchläufen, gemapped.
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(() -> {
List<SourceDataset> newestData = fetchNewestData();
for (var sourceDataset : newestData) {
if (!sourceDataset.isMapped()) {
// Führe ein Mapping durch
}
}
}, 0, 10, TimeUnit.SECONDS);
// ... hier macht der Hauptthread sofort weiter
Nun können wir mit dem Hauptthread beliebige andere Aufgaben durchführen, denn er hängt nicht mehr in einer Endlosschleife fest. Dies könnte zum Beispiel die Verwaltung einer Oberfläche, etwa einer Konsolen-, Desktop- oder Web-Anwendung sein.
Zwischenfazit
In diesem Artikel haben wir Executors kennengelernt, also Objekte, die bestimmte Aufgaben übernehmen, und potenziell asynchron ausführen können.
Wir sind auf dem Weg über ScheduledExecutorServices
gestolpert, welche uns helfen, wiederkehrende Aufgaben mit Verzögerung und Pausen in einem Nebenthread auszuführen.
Im nächsten Teil der Serie geht es ganz allgemein um ExecutorServices und darum, Threads zu unterbrechen.