Un peu de théorie sur la programmation concurrente
Le serveur, son flux d'exécution et l'API bloquante
Le serveur bloque sur la réception de données d'un client, mais comment faire :
- si le client n'envoie rien ?
- si le serveur veut envoyer en même temps des données vers le client ?
- si un nouveau client arrive ?
- → 1 seul fil d'exécution = 1 seul flot gérable
Intérêt de la programmation concurrente
- Gestion de multiples flots d'E/S
- → permet par exemple pour un serveur de gérer plusieurs clients
- Répartition de calculs sur plusieurs CPUs/coeurs
- → utile pour les algorithmes exploitant des architectures parallèles
Gestion de multiples flux d'entrée/sortie
Première approche inefficace : utiliser un fil avec des méthodes d'E/S bloquantes avec timeout faible : traitement séquentiel des flots nécessaire.
Deux grandes familles d'approches praticables :
- Le multithreading avec méthodes bloquantes : chaque fil s'occupe d'un seul flot. Lorsqu'un fil patiente sur une opération bloquante, l'ordonnanceur redonne la main à un autre fil → optimisation de l'utilisation du temps processeur.
- L'utilisation d'un seul fil avec des méthodes non-bloquantes. Le programmeur gère lui-même l'ordonnancement et peut utiliser un mécanisme de sélecteur afin d'être informé des flots d'entrée avec données en attente et des flots de sortie pouvant accueillir de nouvelles données.
Réception de données de deux clients
Processus et threads
- Chaque processus dispose de son propre espace de mémoire (tas) non accessible par les autres processus.
- Si p1 tente d'accéder au tas de p2 → erreur de segmentation
- Un processus peut contenir plusieurs threads (ou processus légers), chacun associé à un environnement d'exécution (registres) et une pile d'appel contenant des variables locales. Un fil peut accéder aux données du tas du processus
- → plusieurs threads peuvent toucher au tas en même temps ; nécessité de synchronisation pour que les données restent cohérentes
Les peintres
Verrouillage de ressources du tas
Plusieurs threads pouvant lire/écrire sur un objet du tas :
- → nécessité de donner l'accès à une seule thread à la fois
- Les autres threads souhaitant l'accès doivent patienter sagement dans une file d'attente
Ça se complique : les philosophes souhaitent manger
Cinq philosophes pensent et mangent (mais pas les deux à la fois car équipés d'un cerveau monothread).
Pour manger, deux ressources du tas nécessaire : une fourchette gauche et une fourchette droite (pour saisir des spagghetis).
Workflow pour manger :
- penser et saisir la fourchette gauche quand elle est disponible
- penser et saisir la fourchette droite quand elle est disponible
- manger
- reposer la fourchette droite
- reposer la fourchette gauche

Source : Benjamin D. Esham / Wikimedia Commons
Interblocage, bilan : 5 philosophes morts de faim
- t1 saisit sa fourchette droite, t2 sa fourchette droite, ..., t5 sa fourchette droite
- plus personne ne peut continuer son workflow en saisissant sa fourchette gauche
- l'interblocage survient lorsque des threads ont besoin de deux mêmes ressources et les verrouillent en ordre inverse
-
solution : il faut les verrouiller dans le même ordre
- on numérote les fourchettes et chaque philosophe saisit sa plus petite fourchette puis sa plus grande fourchette
Famine
- Compétition pour la saisie de fourchettes : les philosophes lents mangent moins
- Solution : un ordonnanceur doit gérer des files d'attente pour les fourchettes et les distribuer équitablement aux philosophes
-
Exemple d'un supermarché avec deux caisses avec chacune une stratégie de passage différente :
- Caisse 1 : file d'attente classique pour les clients en attente, le 1er arrivé est le 1er qui passe en caisse
- Caisse 2 : sélection aléatoire du prochain client à passer en caisse parmi l'ensemble des clients en attente
-
Faut-il aller attendre en caisse 1 ou 2 ?
- Caisse 1 : attente prévisible dès l'arrivée → nombre de clients déjà dans la file devant nous
- Caisse 2 : dans le meilleur des cas, passage immédiat ; en moyenne, attente égale à la taille de la file ; dans le pire des cas, attente... éternelle !
La classe Thread
Construction d'une instance de Thread
La classe Thread représente un fil que l'on peut créer de deux manières :
- En créant une classe dérivant de Thread implantant la méthode abstraite void run() contenant le code exécuté par le fil. Le fil est créé en instantiant la classe.
- En créant une classe implantant l'interface Runnable avec une méthode void run(), cette classe étant passée au constructeur de Thread.
Une fois la classe (ou un descendant de) Thread instantiée, le fil commence son exécution par un appel à void start().
La méthode run ne doit pas lever d'exception vérifiée (telle qu'une IOException) ; il faut donc capturer ce type d'exceptions dans un bloc try-catch.
Exemple : thread d'incrémentation de compteur
Classe dérivée de Thread
public class IncrementingThread extends Thread { public long counter = 0; public boolean finished = false; public IncrementingThread() { super(); } public void run() { while (! finished) counter++; System.out.println("Counter value: " + counter); } } ... IncrementingThread t = new IncrementingThread(); t.start(); Thread.sleep(WAIT_TIME); t.finished = true; ...
La thread interminable ou l'utilité de volatile
Horreur, la thread d'incrémentation précédente ne s'arrête jamais ! Pourquoi ?
- Maintien par chaque thread d'un cache (registres)
- Utilisation de la version en cache de finished par IncrementingThread
- Mais t.finished = true ne provoque pas la mise à jour de la version en cache d'IncrementingThread.
-
Solution : ne pas cacher finished en le déclarant volatile
- Déclaration : public volatile boolean finished
Vers une solution propre pour arrêter une thread : interrupt()
Méthode interrupt conçue pour interrompre une thread.
Utilisation :
- Depuis la thread contrôlante : appel de auxiliaire.interrupt()
-
Dans la méthode run() de la thread auxiliaire : vérifier régulièrement si une interruption a été émise
- avec Thread.interrupted() : retourne vrai si une interruption a été demandée ; après l'appel l'état d'interruption est réinitialisé à faux
- ne pas utiliser Thread.currentThread().isInterrupted() : buggé sous certaines JVM !
-
ne JAMAIS utiliser la méthode stop() de Thread sous peine d'inculpation pour assassinat barbare de thread...
- ... car cela détruit brutalement la thread alors aue celle-ci était peut-être en train d'exécuter du code important qui ne devrait pas être interrompu (comme mettre à jour des enregistrements dans une base de données).
Thread d'incrémentation avec une expression lambda
final int[] counter = new int[]{0}; Runnable r = () -> { while (! Thread.interrupted()) counter[0]++; System.out.println("Counter value: " + counter[0]); }; Thread t = new Thread(r); t.start(); Thread.sleep(WAIT_TIME); t.interrupt(); ...
Astuce : utilisation d'un tableau d'int à 1 cellule pour passage d'un entier par référence (une variable locale de la méthode englobante n'est pas modifiable par une expression lambda ou une classe anonyme).
Mesure du temps dans une thread
Comment mesurer le temps écoulé pour l'exécution de code dans une thread ?
- Avec une clepsydre ou un sablier : pas très précis...
- Par différence de timestamp currentTimeMillis() :
- long start = System.currentTimeMillis(); ...; long duration = System.currentTimeMillis() - start;
- À ne pas utiliser car il s'agit du temps horloge (peut être modifié à tout moment)
-
Par différence de timestamp nanoTime() :
- long start = System.nanoTime(); ...; long duration = System.nanoTime() - start;
- Permet de mesurer le temps d'exécution réel (wall-clock time)
- Temps d'exécution influencé par des phénomènes externes (E/S, autres processus sur la machine...)
-
Utiliser ManagementFactory.getThreadMXBean() pour avoir des infos sur les threads :
- Obtenir le temps CPU en ns : long getThreadCpuTime(long id)
- Obtenir le temps utilisateur en ns : long getThreadUserTime(long id)
- Et plein d'autres choses intéressantes : nombre de threads, stack traces des threads, threads interbloquées... lire la Javadoc pour plus d'infos.
Mise en sommeil d'une thread
- Mise en sommeil pendant un temps déterminé :
- static void Thread.sleep(long millis) throws InterruptedException
- Attente passive jusqu'à la terminaison d'une autre thread t :
- t.join([long millis]) throws InterruptedException
- InterruptedException est lancée si une interruption est demandée sur la thread
- Mise en pause temporaire (pour ne pas affamer les autres threads) :
- static void Thread.yield()
Exercice :
soient 3 threads current, t1 et t2. Nous sommes dans current et souhaitons attendre :
- au minimum min millisecondes
- au maximum max millisecondes
- tout au plus jusqu'à la fin d'exécution de t1 et t2 sauf si cette fin intervient avant min millisecondes
Solution :
try { int start = System.nanoTime(); // en nanosecondes Thread.sleep(min); // attendons au moins min millisecondes int remaining = max - min; t1.join(remaining); // devrait retourner immédiatement si t1 est déjà terminée. remaining = Math.max(0, (int)(max - (System.nanoTime() - start) / 1_000_000L)); t2.join(remaining); } catch (InterruptedException e) { // an interruption has been made on the thread }
À propos d'InterruptedException
- Exception levable par une méthode réalisant une attente passive (sans solliciter le CPU) lorsqu'elle est interrompue par un appel à interrupt()
- Gestion : capture nécessaire avec sortie de la méthode run (utilisation d'un finally recommandé pour libérer les ressources)
- Malheureusement les IOs "classiques" ne lèvent pas cette exception... mais les InterruptibleChannel le peuvent !
Propriétés des threads Java
- Les threads peuvent être organisés en arbre de groupes de threads (ThreadGroup). Chaque thread peut rejoindre un groupe à son initialisation (premier argument du constructeur).
- Chaque thread peut être nommé par un String ({get,set}Name())).
- Chaque thread possède une priorité (int getPriority()) modifiable (void setPriority(int p)) dans l'intervalle [Thread.MIN_PRIORITY..Thread.MAX_PRIORITY].
- Un thread peut être en mode démon (boolean isDaemon()).
- La JVM se termine uniquement lorsqu'il ne reste plus que des threads démons en cours d'exécution (par exemple les threads du ramasse-miettes de la JVM sont en mode démon).
État d'une thread : Thread.State
Cinq états possibles pour une thread Java :
- NEW : Thread créée mais run() non démarré
- RUNNABLE : run() en cours d'exécution (mais la thread peut ne pas utiliser le CPU à l'instant t)
- BLOCKED : attente pour l'acquisition d'un moniteur/verrou (ressource en exclusion mutuelle)
- WAITING : attente de la réalisation d'une action par une autre thread (après un appel à wait() sur un objet), appel à une méthode bloquante d'E/S.
- Sortie de cet état par un appel à notify() sur l'objet, un appel à interrupt() sur le fil (provoque une InterruptedException) ou par une fermeture du flux d'E/S (lance une exception).
- TIMED_WAITING : idem que précédemment mais avec un délai limite
- TERMINATED : fin de l'exécution de la méthode run().
- L'instance de Thread continue d'exister tant qu'une référence vers elle subsiste.
Obtention des fils en cours d'exécutions
- Ctrl-\ permet d'afficher sur la sortie standard toutes les threads et leur état (utile en cas de blocage).
- jconsole propose une GUI d'affichage de threads (permet de diagnostiquer des interblocages).
- La méthode statique Thread.getCurrentThread() permet d'obtenir la thread courante (appelant la méthode).
- Thread.getCurrentThread().getThreadGroup() permet l'obtention du groupe d'appartenance de la thread courante.
- La méthode d'instance ThreadGroup.getParent() permet de connaître le groupe parent d'un groupe : en répétant l'opération, on obtient le groupe racine (dont le parent est null).
- int ThreadGroup.enumerate(Thread[] t) place tous les fils actifs du groupe (et de ses sous-groupes) dans le tableau en argument et retourne le nombre de threads actives.
Exemple d'affichage de fils actifs sur une JVM
Full thread dump OpenJDK 64-Bit Server VM (14.0-b08 mixed mode): "Low Memory Detector" daemon prio=10 tid=0x00000000016be000 nid=0x5751 runnable [0x0000000000000000..0x0000000000000000] java.lang.Thread.State: RUNNABLE "CompilerThread1" daemon prio=10 tid=0x00000000016bb000 nid=0x5750 waiting on condition [0x0000000000000000..0x00007fa24963f510] java.lang.Thread.State: RUNNABLE "CompilerThread0" daemon prio=10 tid=0x00000000016b7000 nid=0x574f waiting on condition [0x0000000000000000..0x00007fa249740480] java.lang.Thread.State: RUNNABLE "Signal Dispatcher" daemon prio=10 tid=0x00000000016b4800 nid=0x574e waiting on condition [0x0000000000000000..0x0000000000000000] java.lang.Thread.State: RUNNABLE "Finalizer" daemon prio=10 tid=0x0000000001694000 nid=0x574d in Object.wait() [0x00007fa249982000..0x00007fa249982be0] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00007fa27df41210> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:133) - locked <0x00007fa27df41210> (a java.lang.ref.ReferenceQueue$Lock) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:149) at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:177) "Reference Handler" daemon prio=10 tid=0x000000000168c800 nid=0x574c in Object.wait() [0x00007fa249a83000..0x00007fa249a83b60] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00007fa27df41078> (a java.lang.ref.Reference$Lock) at java.lang.Object.wait(Object.java:502) at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133) - locked <0x00007fa27df41078> (a java.lang.ref.Reference$Lock) "main" prio=10 tid=0x0000000001623000 nid=0x5748 runnable [0x00007fa298610000..0x00007fa298610e60] java.lang.Thread.State: RUNNABLE at Test.main(Test.java:5) "VM Thread" prio=10 tid=0x0000000001687800 nid=0x574b runnable "GC task thread#0 (ParallelGC)" prio=10 tid=0x000000000162d800 nid=0x5749 runnable "GC task thread#1 (ParallelGC)" prio=10 tid=0x000000000162f800 nid=0x574a runnable "VM Periodic Task Thread" prio=10 tid=0x0000000001735800 nid=0x5752 waiting on condition JNI global references: 601 Heap PSYoungGen total 18432K, used 1581K [0x00007fa27df40000, 0x00007fa27f3d0000, 0x00007fa292940000) eden space 15808K, 10% used [0x00007fa27df40000,0x00007fa27e0cb560,0x00007fa27eeb0000) from space 2624K, 0% used [0x00007fa27f140000,0x00007fa27f140000,0x00007fa27f3d0000) to space 2624K, 0% used [0x00007fa27eeb0000,0x00007fa27eeb0000,0x00007fa27f140000) PSOldGen total 42240K, used 0K [0x00007fa254b40000, 0x00007fa257480000, 0x00007fa27df40000) object space 42240K, 0% used [0x00007fa254b40000,0x00007fa254b40000,0x00007fa257480000) PSPermGen total 21248K, used 2753K [0x00007fa24a340000, 0x00007fa24b800000, 0x00007fa254b40000) object space 21248K, 12% used [0x00007fa24a340000,0x00007fa24a5f06e0,0x00007fa24b800000)
Affichage JConsole
Applications multithreads
Serveur TCP en multithread
- Accepter les connexions entrantes de client dans un fil principal.
- Création d'au moins un fil par socket connectée à un client. Ce fil lit et écrit sur la socket. Si les opérations de lecture et écriture sont simultanées et non séquentielles, nécessité d'un fil pour l'InputStream et d'un autre fil pour l'OutputStream.
Squelette de code de serveur TCP concurrent
public class ConcurrentTCPServer { public class ClientCommunicator implements Runnable { private final Socket s; public ClientCommunicator(Socket s) { this.s = s; } @Override public void run() { try { InputStream is = s.getInputStream(); OutputStream os = s.getOutputStream(); ... } catch (IOException e) { e.printStackTrace(); } finally { s.close(); // do not forget to close the socket } } } private final int port; public ConcurrentTPCServer(int port) { this.port = port; } public void launch() throws IOException { try (ServerSocket ss = new ServerSocket(port);) // do not forget to close the ServerSocket { while (! Thread.interrupted()) { Socket s = ss.accept(); // Fetch the socket of the next client Runnable r = new ClientCommunicator(s); Thread t = new Thread(r); t.start(); } } } }
Crochet d'arrêt pour gérer la fermeture de l'application
Lorsqu'une exception est remontée par la méthode main ou qu'un signal de terminaison est intercepté par la JVM → arrêt brutal du programme.
Problème : certains fichiers ou communications socket peuvent rester dans un état inconsistant.
Un fichier ou une communication peut être terminée par du code spécifié par un crochet d'arrêt.
Gestion des crochets d'arrêt
- Ajout d'un crochet par spécification d'un fil à exécuter lors de la fermeture : Runtime.getRuntime().addShutdownHook(Thread hook).
- Suppression d'un crochet : Runtime.getRuntime().removeShutdownHook(Thread hook).
- Les fils des crochets sont démarrés dans un ordre quelconque et sont en concurrence lors de la fermeture demandée de l'application.
- Après l'exécution de tous les crochets d'arrêt...
- Le code d'un fil mis en crochet doit être exécuté rapidement sinon la terminaison forcée par le système de la JVM peut être nécessaire (signal Unix SIGKILL).
Gestion de l'exclusion mutuelle
Exclusion mutuelle
Certains objets doivent pouvoir être accessibles que par un unique fil lors de l'atteinte de certaines instructions.
Sinon risque d'état incohérent (exemple des peintres).
Instructions atomiques en Java
- Certaines instructions sont garanties atomiques (lecture ou écriture d'un int, d'une référence d'objet) mais d'autres pas (lecture ou écriture de long).
- Dans le doute, toujours supposer qu'une instruction n'est pas atomique (perte de performance préférable à un état incohérent)
- Dans tous les cas, une séquence d'instructions n'est jamais atomique.
- → Pour l'exclusion mutuelle, moniteurs (ou verrous) sont alors indispensables.
Moniteurs implicites (synchronized)
- Tout objet peut être moniteur.
- Lorsqu'un moniteur est acquis sur un objet par un fil, aucun autre fil ne pourra utiliser le moniteur de l'objet.
- Construction syntaxique :
- synchronized (obj) { code à protéger }.
- Le moniteur est acquis lors de l'entrée dans le bloc synchronized et est libéré à la sortie du bloc (que ce soit par sortie classique ou par levée d'exception).
- Une méthode peut être totalement synchronisée sur l'instance de l'objet par l'usage du mot-clé en tête de sa déclaration (exemple : public synchronized String toString() { ... }} équivalent à public String toString() { synchronized(this) { ... }}).
Moniteurs explicites : Lock
- Les classes implantant Lock peuvent servir de moniteurs plus évolués.
- Utilisation classique :
Lock l = new ReentrantLock(); // Creation du verrou ... l.lock(); // Verrouillage try { ... // Code a executer } finally { l.unlock(); // Dans tous les cas, deverrouillage }
Malheureusement pas de try-with-resources avec un Lock :(
Possibilités avancées de Lock
- boolean tryLock() : méthode non-bloquante essayant d'acquérir le verrou (si ce n'est pas possible retourne immédiatement faux). Peut permettre à un fil de réaliser des opérations alternatives en attendant avant de retenter le verrouillage.
- boolean tryLock(long time, TimeUnit unit) : variante autorisant un blocage pendant un temps indiqué (sauf interruption).
- Condition newCondition() : retourne une nouvelle condition associée au verrou. Le verrou peut ainsi avoir plusieurs conditions sur lesquelles on peut attendre un changement (Condition.await()) ou signaler un changement (Condition.signal() et Condition.signalAll()).
Verrou lecture/écriture : ReadWriteLock
- Classe implantant ReadWriteLock : ReentrantReadWriteLock
- Lock writeLock() fournit un verrou pour l'écriture.
- Lock readLock() fournit un verrou pour la lecture.
-
Les deux verrous sont liés :
- writeLock déjà acquis → ni writeLock ni readLock ne peut être acquis par un autre fil
- readLock déjà acquis → writeLock ne peut être acquis par un autre fil mais readLock peut être acquis.
- Utile pour protéger une structure dont la lecture est plus fréquente que l'écriture.
Attente et signalisation
Attente
- Possibilité de libérer temporairement un moniteur ou verrou en attente de satisfaction d'une condition par while (! condition) object.wait();
- Un wait() est interruptible par un appel à interrupt() sur la thread : lancement de InterruptedException.
Signalisation
- object.notify() réveille un fil en attente sur le moniteur qui peut reprendre le moniteur.
- object.notifyAll() réveille tous les fils en attente sur le moniteur et les place en compétition pour le réacquérir.
- La classe Condition permet un mécanisme d'attente/signalisation avec plusieurs conditions par verrou : évite un réveil inutile de certaines threads.
- Possibilité de sortir d'une attente intempestivement : toujours vérifier la satisfaction de la condition à la sortie d'attente (boucle conditionnelle)
Comparaison attente/signalisation sur Object et Condition
Object | Condition | |
Attente temps indéterminé | void wait() | void await() |
Attente temps borné | void wait(long timeoutMillis) | void await(long time, TimeUnit unit) |
Attente sans interruption | Relancement de void wait() à l'interruption | waitUninterruptibly() |
Signalisation unitaire | void notify() | void signal() |
Signalisation multiple | void notifyAll() | void signalAll() |
Plusieurs files d'attente | Impossible | Possible |
Récapitulatif des avantages des verrous (vs. moniteurs)
- Méthode non-bloquante d'acquisition de verrou échouant immédiatement si le verrou n'est pas disponible (Lock.tryLock())
- Moniteur : une seule condition / verrou : plusieurs conditions possibles
- → possibilité de cibler les notifications vers une condition
- Équitabilité possible (fairness) pour l'acquisition d'un verrou : premier arrivé, premier servi (file d'attente)
- Existence de verrous couplés lecture/écriture (ReadWriteLock)
Une petite pause
- Q : Comment créer une thread réalisant des calculs qui puisse être mise temporairement en pause ?
- R : En utilisant un moniteur
Thread contrôleur :
ComputationThread t = new ComputationThread(); t.start(); while (t.isAlive()) { t.escapePause(); Thread.sleep(WORKING_TIME); t.enterPause(); Thread.sleep(PAUSE_TIME) }
Thread de calcul :
public void run() { while (! Thread.interrupted()) { try { executePauseIfRequested(); } catch (InterruptedException e) { break; } doComputation(); } } private boolean pauseRequested = false; private synchronized void executePauseIfRequested() throws InterruptedException { while (pauseRequested) this.wait(); // Wait until notification, be cautious of spurious wake-ups! } public synchronized void enterPause() { this.pauseRequested = true; } public synchronized void exitPause() { pauseRequested = false; this.notify(); // We notify the thread to exit from the pause }
Le problème du parking (ou sémaphore)
Énoncé du problème
Soit un parking abritant des voitures (en fait des threads) avec :
- c places classiques
- h places réservées à des handicapés
Écrire une classe ParkingLot disposant des méthodes suivantes :
- takePlace() appelée par un conducteur non handicapé pour prendre une place
- takeHandicappedPlace() appelée pour le stationnement d'un conducteur handicapé
Ces deux méthodes retournent un objet ParkingPlace implantant l'interface AutoCloseable : on appelle close() pour libérer la place
Exigences :
- takePlace() bloque jusqu'à ce qu'une place classique soit libre
- takeHandicappedpPlace() bloque jusqu'à la l'existence d'une place classique (en priorité) OU handicapé libre
ParkingLot
package fr.upem.jacosa.threads; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ParkingLot { int remainingClassical; int remainingHandicapped; Lock lock; Condition classicalFree; Condition anyFree; public ParkingLot(int c, int h) { this.lock = new ReentrantLock(); this.classicalFree = lock.newCondition(); this.anyFree = lock.newCondition(); this.remainingClassical = c; this.remainingHandicapped = h; } public ParkingPlace takePlace() throws InterruptedException { lock.lock(); try { while (remainingClassical == 0) classicalFree.await(); remainingClassical--; return new ParkingPlace(this, false); } finally { lock.unlock(); } } public ParkingPlace takeHandicapedPlace() throws InterruptedException { lock.lock(); try { while (remainingClassical == 0 && remainingHandicapped == 0) anyFree.await(); boolean handicapped = remainingClassical == 0; if (handicapped) remainingHandicapped--; else remainingClassical--; return new ParkingPlace(this, handicapped); } finally { lock.unlock(); } } }
ParkingPlace
package fr.upem.jacosa.threads; public class ParkingPlace implements AutoCloseable { private boolean handicapped; private ParkingLot lot; public ParkingPlace(ParkingLot lot, boolean handicapped) { this.handicapped = handicapped; this.lot = lot; } public void close() { lot.lock.lock(); try { if (handicapped) lot.remainingHandicapped++; else lot.remainingClassical++; lot.anyFree.signal(); if (! handicapped) lot.classicalFree.signal(); } finally { lot.lock.unlock(); } } }
Le parking du sémaphore IRL

Gestion des threads avec les ExecutorService
Des Runnable et Callable plutôt que des threads
- Notion de thread trop bas niveau : on souhaite seulement soumettre des tâches sans se préoccuper de threads
-
Deux types de tâches :
- Runnable (que l'on connaît déjà...enfin normalement)
- Callable<V> : interface avec méthode call() à implanter retournant un objet de type V
-
On soumet un Runnable ou Callable à un ExecutorService :
- Future<V> submit(Callable<T> callable)
- Future<?> submit(Runnable runnable)
- submit retourne immédiatement un Future<V> pour suivre l'exécution de la tâche, interagir avec elle et récupérer son résultat
Création d'ExecutorService avec Executors
- newCachedThreadPool() : création de threads au fil des besoins avec conservation en cache des threads créées
- newFixedThreadPool(int n) : création d'une piscine de $n$ threads (toutes les threads occupées → tâche mise en file d'attente)
- newSingleThreadExecutor() ⇔ newFixedThreadPool(1)
Ces méthodes existent avec un argument supplémentaire ThreadFactory permettant de personnaliser le constructeur de threads.
Un Grep récursif
package fr.upem.jacosa.threads; import java.util.*; import java.io.*; import java.util.concurrent.*; import java.util.regex.*; public class Grep { final private List<File> files; final private Pattern pattern; final private ExecutorService executorService; public Grep(File directory, Pattern pattern, ExecutorService executorService) { this.files = new ArrayList<File>(); getFiles(directory, files); this.pattern = pattern; this.executorService = executorService; } public static void getFiles(File file, List<File> files) { if (file.isDirectory()) for (File f: file.listFiles()) getFiles(f, files); else if (file.isFile()) files.add(file); } public Callable<List<MatchResult>> getGrepTask(final File f) { return new Callable<List<MatchResult>>() { public List<MatchResult> call() throws IOException { StringBuilder sb = new StringBuilder(); Reader r = new FileReader(f); char[] buffer = new char[1024]; int read = 0; while ((read = r.read(buffer)) > 0) sb.append(buffer, 0, read); // Append in the buffer Matcher m = pattern.matcher(sb.toString()); List<MatchResult> result = new ArrayList<MatchResult>(); while (m.find()) result.add(m.toMatchResult()); return result; } }; } public static <K> void increment(Map<K,Integer> map, K element) { Integer c = map.get(element); int c2 = (c == null)?0:c; map.put(element, c2 + 1); } public void execute() throws Exception { List<Future<List<MatchResult>>> results = new ArrayList<Future<List<MatchResult>>>(); for (File file: files) results.add(executorService.submit(getGrepTask(file))); executorService.shutdown(); long start = System.nanoTime(); while (! executorService.awaitTermination(1L, TimeUnit.SECONDS)) System.out.println((System.nanoTime() - start) + " ns elapsed"); SortedMap<String, Integer> counterMap = new TreeMap<String, Integer>(); for (Future<List<MatchResult>> result: results) for (MatchResult mr: result.get()) increment(counterMap, mr.group(0)); for (Map.Entry<String, Integer> entry: counterMap.entrySet()) System.out.println(entry.getKey() + "\t" + entry.getValue()); } public static void main(String[] args) throws Exception { Pattern pattern = Pattern.compile(args[0]); File rootFile = new File(args[1]); ExecutorService exs = null; String kind = args[2]; if (kind.equals("cached")) exs = Executors.newCachedThreadPool(); else exs = Executors.newFixedThreadPool(Integer.parseInt(kind)); new Grep(rootFile, pattern, exs).execute(); } }
Content not available
ScheduledExecutorService
Permet de soumettre une tâche à déclencher sous un certain délai une fois ou périodiquement :
- ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
- ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit unit)
- ScheduledFuture<?> scheduleAtFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit unit)
Il n'y a pas de garantie de respect de délais (en particulier si le ScheduledExecutorService est saturé)
Création :
- Executors.newScheduledThreadPool(int n) : utilisation d'une piscine de n threads
Un multi compte à rebours
package fr.upem.jacosa.threads; import java.util.concurrent.*; import java.util.concurrent.atomic.*; public class Countdown { public static void main(String[] args) throws InterruptedException { final long start = System.nanoTime(); final ScheduledExecutorService ses = Executors.newScheduledThreadPool(1); int i = 0; final AtomicInteger counter = new AtomicInteger(0); for (String arg: args) { final int id = i++; final int deadline = Integer.parseInt(arg); // in seconds final ScheduledFuture<?> future = ses.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println( String.format("Remaining time for timer #%d: %d s.", id, deadline - (System.nanoTime() - start) / 1000000000)); } }, 0, 1, TimeUnit.SECONDS); counter.incrementAndGet(); ses.schedule(new Callable<Object>() { public Object call() { System.out.println( String.format("End of timer #%d after %d seconds (late offset: %d ns.)", id, deadline, System.nanoTime() - start - deadline * 1000000000L )); future.cancel(true); if (counter.decrementAndGet() == 0) ses.shutdown(); // End of executor service return null; } }, deadline, TimeUnit.SECONDS); } ses.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); // We wait for the end of execution } }
Processus externes
Processus externes
-
On peut créer un ProcessBuilder : ProcessBuilder pb = ProcessBuilder(String... command)
- command est l'exécutable avec ses arguments (ex: ["tar", "-c", "-z", "-f", "mytar.tar.gz", "directory"])
-
On peut le configurer :
- pb.environment() retourne une Map que l'on peut modifier avec les variables d'environnement
- pb.directory("workingDir") pour fixer le répertoire courant de travail
- pb.redirect{Input, Output, Error(f)} pour rediriger stdin, stdout ou stderr vers un fichier (redirection par défaut sur un pipe)
- Et enfin démarrer le processus avec Process p = pb.start()
-
Le processus externe peut ensuite être contrôlé depuis la thread courante :
- p.waitFor() attend la fin d'exécution du processus (retour : entier avec code de résultat, 0 si OK).
- p.getInputStream() retourne une InputStream pour lire sur stdout.
- p.getOutputStream() retourne une OutputStream pour écrire sur stdin.
- p.getErrorStream() retourne une InputStream pour lire sur stderr.
- p.destroy() permet de détruire brutalement le processus.
- p.exitValue() retourne le code du processus ou lève une IllegalThreadStateException si la thread n'est pas terminée.
Exemple : le tuyauteur
On souhaite écrire un programme Java lançant plusieurs processus et connectant la sortie (stdout) du processus précédent avec l'entrée (stdin) du processus suivant, à la manière d'un pipe de shell.
Exemple d'utilisation : pour afficher tous les mots triés d'un texte :java Piper cat monfichier.txt "|" egrep -o "[a-zA-Z]+" "|" sort "|" uniq``
Solution possible
import java.util.*; import java.io.*; public class Piper { public static final int BUFF_LEN = 1024; public static Runnable getTransferer(final InputStream in, final OutputStream out) { return new Runnable(){ @Override public void run() { try { byte[] buff = new byte[BUFF_LEN]; for (int i = in.read(buff); i >= 0; i = in.read(buff)) { out.write(buff, 0, i); } } catch (IOException e) { e.printStackTrace(); } finally { try { in.close(); out.close(); } catch (IOException e) { e.printStackTrace(); } } } }; } public static int execute(List<List<String>> commands) throws IOException, InterruptedException { final List<Process> processes = new ArrayList<Process>(); for (List<String> command: commands) processes.add(new ProcessBuilder(command).start()); for (int i = 0; i < processes.size(); i++) { final InputStream currentOut = processes.get(i).getInputStream(); final OutputStream followingIn = (i+1 < processes.size())? processes.get(i+1).getOutputStream():System.out; new Thread(getTransferer(currentOut, followingIn)).start(); } return processes.get(processes.size() - 1).waitFor(); } public static void main(String[] args) throws Exception { List<List<String>> l = new ArrayList<List<String>>(); List<String> command = new ArrayList<String>(); for (String arg: args) { if (arg.equals("|")) { l.add(command); command = new ArrayList<String>(); } else command.add(arg); } l.add(command); System.exit(execute(l)); } }
Un bon tuyau pour conclure
Un tube pour les flots
- Communication possible entre threads par un tube acheminant un flot d'octets
-
Mise en place :
- Initialisation de l'entrée du tube :
- OutputStream inPipe = new PipedOutputStream()
- Initialisation de la sortie du tube :
- PipedInputStream outPipe = new PipedInputStream(bufferSizeInBytes)
- Connexion de deux extrémités du tube :
- inPipe.connect(outPipe)
- On écrit ensuite des octets sur inPipe depuis une thread...
- ...et on les lit depuis une autre sur outPipe
- Existe également pour les chars : PipedWriter et PipedReader
À voir... si l'on a du temps
- ForkJoinTask pour les tâches subdivisibles
- Les collections supportant la concurrence (BlockingQueue, ConcurrentMap,...)
- Les variables atomiques de java.util.concurrent.atomic
- ...et bien d'autres choses très intéressantes !