Im vorherigen Artikel haben wir einfache Executors kennengelernt. Hier wurde, durch den ScheduledExecutorService, bereits subtil gespoilert, dass es auch noch ExecutorServices gibt.

Was ist das?

ExecutorServices sind Erweiterungen von Executors, welche neben der Fähigkeit, Arbeit aufzunehmen, auch die Möglichkeit haben, Arbeitsergebnisse zurückzugeben und ihre Ausführung zu beenden. Alle Executors, die von den Executors.new...()-Funktionen erzeugt werden, sind übrigens auch ExecutorServices. (Das müssen sie aber nicht immer sein.)

ExecutorServices terminieren

Wir nehmen einfach mal an, dass wir unser Beispielprogramm herunterfahren wollen, um ein Update einzuspielen.

ExecutorService#shutdown()

Bittet den ExecutorService freundlich, ab sofort keine weiteren Aufgaben mehr anzunehmen und nach Beendigung der aktiven Aufgabe(n) die verwendeten Threads zu schließen. Der aufrufende Thread läuft jedoch weiter.

ExecutorService#awaitTermination()

Diese Methode funktioniert in Kombination mit shutdown(), ist aber selbst kein Aufruf an den ExecutorService, herunterzufahren. Sie lässt lediglich den aufrufenden Thread warten, bis der ExecutorService entweder heruntergefahren, oder der spezifizierte Timeout abgelaufen ist. Je nachdem wird entweder true (erfolgreich heruntergefahren) oder false (Timeout abgelaufen, das Teil läuft noch) zurückgegeben. Ohne einen vorherigen Aufruf zu shutdown(), hat die Nutzung dieser Funktion keinen Sinn.

executorService.shutdown(); // ExecutorService soll herunterfahren
System.out.println("ExecutorService wurde angewiesen zu stoppen");
if (executorService.awaitTermination(1, TimeUnit.DAYS)) // Hauptthread wartet
	System.out.println("ExecutorService wurde gestoppt!");
else { // ExecutorService stoppt nicht durch shutdown()
	System.out.println("Ein Tag ist vergangen, aber der Service läuft immer noch!");
	...

Was machen wir, wenn er nicht stoppen will? Vielleicht hat sich ja eine Aufgabe aufgehängt?

ExecutorService#shutdownNow()

… ist das etwas aggressivere Äquivalent von shutdown(). Man kann sich das Ganze wie den Unterschied zwischen einer Bitte und einem Befehl vorstellen, wobei shutdownNow() den unterliegenden Thread anweist, seine Arbeit zu unterbrechen (mehr dazu gleich) und zusätzlich noch die Liste mit den Aufgaben zurückgibt, die geplant waren, aber definitiv nicht mehr abgearbeitet werden.

...
else { // ExecutorService stoppt nicht durch shutdown()
	System.out.println("Ein Tag ist vergangen, aber der Service läuft immer noch!");
	// Du hast 20 Sekunden um aufzuhören, sonst fliegt uns die Anwendung um die Ohren
	List<Runnable> remainingTasks = fixedThreadPool.shutdownNow(); 
	if (!fixedThreadPool.awaitTermination(20, TimeUnit.SECONDS))
		throw new AssertionError("Irgendwas ist kolossal schief gelaufen.");
    ...
}

Meistens funktioniert das. Wenn nicht, hat man eine falsch geschriebene Thread-Implementation genutzt, führt eine ewig blockierende IO-Operation ohne Timeout aus, oder ignoriert Interrupts (die Anweisung an einen Thread, seine Aufgabe zu unterbrechen).

Interrupts

… sind ein, anfangs übersprungener, Mechanismus um laufende Threads zu unterbrechen und dann herunterzufahren. Die Idee ist recht simpel:

Thread thread = new Thread(() -> infiniteTask());
thread.start(); // Starte die Aufgabe
...
thread.interrupt(); // Unterbreche die Aufgabe

Das, was es jedoch komplex macht, ist, dass der Code im Thread selbst entscheiden kann, wie, und ob er sich unterbrechen lässt.

Wie kriegt der Thread mit, dass er unterbrochen werden soll?

Etwas, was dir vermutlich schon sehr früh passiert ist, ist, dass du eine Funktion aufrufst, welche eine InterruptedException werfen kann. Ein einfaches Beispiel hierfür sind diverse Versionen von “sleep”.

Thread.sleep(1_000);        // throws InterruptedException
TimeUnit.SECONDS.sleep(1);  // throws InterruptedException
new Object().wait();        // throws InterruptedException

Diese drei Funktionen können im Thread ausgeführt werden, und sorgen dafür, dass er für eine bestimmte Zeit schläft, danach jedoch wieder von alleine seine Arbeit aufnimmt. Sollte der Thread nun also gerade schlafen, währenddessen jedoch von einem anderen Thread durch einen Interrupt dabei unterbrochen werden, muss klar kommuniziert werden: “Der Aufruf von sleep wurde unterbrochen, der Thread wurde manuell aufgeweckt und soll jetzt heruntergefahren werden”.
Genau das sagt die InterruptedException.

try {
	TimeUnit.SECONDS.sleep(1);
    // Der Thread hat ununterbrochen geschlafen
} catch (InterruptedException ie) {
    // Der Thread wurde beim Schlafen interrupted
}

Einige Funktionen aus der Standardbibliothek, die explizit auf etwas warten, können eine InterruptedException werfen. Andere Beispiele sind…

  • HttpClient#send() (…beim Warten auf die Antwort)
  • Thread#join() (…beim Warten auf einen anderen Thread)
  • Lock#tryLock(long, TimeUnit) (…beim Versuch, innerhalb der Zeit das Schloss zu sperren)

Was passiert, wenn man keine Exception bekommt?

Da man in seinem Thread nicht zwingend Funktionen aufruft, die InterruptedExceptions werfen, sollte man hin und wieder mal “manuell” überprüfen, ob der Thread zur Unterbrechung aufgefordert wurde.

Thread#isInterrupted()

…ist eine Instanzmethode (nicht-statisch) auf jedem Thread-Objekt. Sie gibt einen boolean zurück, welcher ausdrückt, ob der jeweilige Thread unterbrochen wurde.

Diese Methode sollte eher genutzt werden, wenn man von außen, also von einem observierenden, anderen Thread an der Information interessiert ist.

if (otherThread.isInterrupted()) {
    // Anderer Thread ist unterbrochen und fährt hoffentlich gleich herunter
}

Wenn man schauen möchte, ob der aktuelle Thread unterbrochen wurde, verwendet man:

Thread.interrupted()

Eine statische Funktion aus der Thread-Klasse. Sie gibt nicht nur zurück, ob der aktuelle Thread, also der, in dem der Code gerade läuft, unterbrochen wurde. Sie entfernt auch wieder diesen Status, denn es wird davon ausgegangen, dass der aktuelle Thread nach dem Aufruf selber explizit sein Herunterfahren einleitet.

Hier ein Codebeispiel über eine Warteschlange mit mehreren Dateien, die in einem separaten Thread abgearbeitet werden. Nach jeder einzelnen Datei schaut der Thread, ob eine Unterbrechung angefragt wurde; wenn ja, beendet er die Schleife.

Queue<File> queue = new ArrayDeque<>();
new Thread(() -> {
	while (!queue.isEmpty()) {
    	var file = queue.poll();
    	doSomethingWith(file);
        // Könnte auch in der Bedingung der Schleife stehen, ist so aber expliziter:
    	if (Thread.interrupted()) 
        	break;
    }
});

Kann man Interrupts ignorieren?

Ja, aber man sollte es nicht tun. Niemals!

Natürlich kann man eine InterruptedException fangen und so tun, als sei nichts gewesen. Man kann ebenfalls Thread.interrupted() aufrufen und nichts danach tun, das ist aber nicht der Sinn der Sache.

Angenommen, wir sind wieder beim obigen Beispiel, wollen einen ExecutorService schließen, um das Programm herunterzufahren, unser Thread ignoriert seine Interrupts und läuft vielleicht auch noch in einer Endlosschleife.

Jetzt kann man entweder damit leben, dass man die komplette Kontrolle über einen Codebereich und die damit assoziierten Ressourcen abgegeben hat, (schlechte Idee), oder den Thread auf anderem Wege gewaltsam abschießen (meistens noch schlechtere Idee).

Wenn der Thread gerade zum Beispiel dabei war, in ein Backup zu schreiben, kann er, wenn man ihn ziellos killt, die Datei korrumpieren. Man sollte also immer einen sog. “graceful shutdown” (ordnungsgemäßes Herunterfahren) implementieren.

Was bedeutet das für ExecutorServices?

  • Wenn man Code in einem ExecutorService ausführt, dann läuft er meistens in einem separaten Thread.
  • Der Thread kann interrupted werden; zum Beispiel, weil shutdownNow() auf dem ExecutorService ausgeführt wurde.
  • Code in ExecutorServices sollte, genau wie in Thread-Implementierungen, (regelmäßig) auf Interrupts hören und entsprechend die aktuelle Aufgabe beenden.

Die meisten Implementierungen von ExecutorServices haben jedoch eingebaute Mechanismen, welche schon beim shutdown() greifen. Es werden einfach keine neuen Aufgaben mehr angenommen, oder bei ScheduledExecutorServices, keine weitere Wiederholung angestoßen.

Wenn man überprüfen will, ob ein ExecutorService die Aufforderung erhalten hat, keine weiteren Aufgaben mehr anzunehmen, (aber vielleicht noch läuft), dann kann man dies über executorService.isShutdown() tun. Mit executorService.isTerminated() bekommt man versichert, dass ebenfalls keine Aufgaben mehr abgearbeitet werden.

Noch ein Zwischenfazit

Inzwischen haben wir uns schon etwa die Hälfte der Methoden in ExecutorService angeschaut und wissen jetzt, wie wir den Mapper guten Gewissens unterbrechen und herunterfahren können.

// Erstelle und starte den Mapper
ScheduledExecutorService scheduledMappingService = Executors.newSingleThreadScheduledExecutor();
scheduledMappingService.scheduleWithFixedDelay(...);

// Fahre den Mapper irgendwann herunter
... throws InterruptedException {
	scheduledMappingService.shutdown();
	try {
    	if (scheduledMappingService.awaitTermination(10, TimeUnit.MINUTES))
        	return; // ExecutorService wurde ordnungsgemäß heruntergefahren
    } catch (InterruptedException e) {
        // Das normale Herunterfahren wurde unterbrochen
        // Der aktuelle Thread soll sich merken, dass er später herunterfahren soll
    	Thread.currentThread().interrupt();
    }
    // Entweder der ExecutorService will nicht herunterfahren,
    // oder der aktuelle Thread wurde dabei unterbrochen
	scheduledMappingService.shutdownNow();
	if(scheduledMappingService.awaitTermination(10, TimeUnit.SECONDS))
    	return; // ExecutorService wurde erfolgreich abgebrochen
	throw new AssertionError("ExecutorService did not terminate");
}

Für die Ausführung der einzelnen Mappings brauchen wir jedoch noch etwas Besseres als den bekannten ScheduledExecutorService, denn der bildet unseren Anwendungsfall noch nicht ideal ab.

Ab dem nächsten Artikel wird sich dies jedoch ändern, denn wir schauen uns eine etwas mächtigere Art von ExecutorServices an.