image/svg+xml $ $ ing$ ing$ ces$ ces$ Res Res ea ea Res->ea ou ou Res->ou r r ea->r ch ch ea->ch r->ces$ r->ch ch->$ ch->ing$ T T T->ea ou->r

Introduction à la programmation concurrente

  1. Un peu de théorie sur la programmation concurrente
    1. Le serveur, son flux d'exécution et l'API bloquante
    2. Intérêt de la programmation concurrente
    3. Gestion de multiples flux d'entrée/sortie
    4. Réception de données de deux clients
    5. Processus et threads
    6. Les peintres
    7. Verrouillage de ressources du tas
    8. Ça se complique : les philosophes souhaitent manger
      1. Interblocage, bilan : 5 philosophes morts de faim
      2. Famine
  2. La classe Thread
    1. Construction d'une instance de Thread
    2. Exemple : thread d'incrémentation de compteur
    3. La thread interminable ou l'utilité de volatile
    4. Vers une solution propre pour arrêter une thread : interrupt()
    5. Thread d'incrémentation avec une expression lambda
    6. Mesure du temps dans une thread
    7. Mise en sommeil d'une thread
    8. À propos d'InterruptedException
    9. Propriétés des threads Java
    10. État d'une thread : Thread.State
    11. Obtention des fils en cours d'exécutions
    12. Exemple d'affichage de fils actifs sur une JVM
    13. Affichage JConsole
  3. Applications multithreads
    1. Serveur TCP en multithread
    2. Squelette de code de serveur TCP concurrent
    3. Crochet d'arrêt pour gérer la fermeture de l'application
    4. Gestion des crochets d'arrêt
  4. Gestion de l'exclusion mutuelle
    1. Exclusion mutuelle
      1. Instructions atomiques en Java
    2. Moniteurs implicites (synchronized)
    3. Moniteurs explicites : Lock
    4. Possibilités avancées de Lock
    5. Verrou lecture/écriture : ReadWriteLock
  5. Attente et signalisation
    1. Attente
    2. Signalisation
    3. Comparaison attente/signalisation sur Object et Condition
    4. Récapitulatif des avantages des verrous (vs. moniteurs)
    5. Une petite pause
    6. Le problème du parking (ou sémaphore)
      1. Énoncé du problème
      2. ParkingLot
      3. ParkingPlace
      4. Le parking du sémaphore IRL
  6. Gestion des threads avec les ExecutorService
    1. Des Runnable et Callable plutôt que des threads
    2. Création d'ExecutorService avec Executors
    3. Un Grep récursif
    4. ScheduledExecutorService
    5. Un multi compte à rebours
  7. Processus externes
    1. Processus externes
    2. Exemple : le tuyauteur
      1. Solution possible
  8. Un bon tuyau pour conclure
    1. Un tube pour les flots
    2. À voir... si l'on a du temps

Un peu de théorie sur la programmation concurrente

Le serveur, son flux d'exécution et l'API bloquante

Le serveur bloque sur la réception de données d'un client, mais comment faire :

Intérêt de la programmation concurrente

Gestion de multiples flux d'entrée/sortie

Première approche inefficace : utiliser un fil avec des méthodes d'E/S bloquantes avec timeout faible : traitement séquentiel des flots nécessaire.
Deux grandes familles d'approches praticables :

Réception de données de deux clients

Ordonnancement pour la réception de données de plusieurs clients

Processus et threads

Les peintres

Exemple des threads peintres

Verrouillage de ressources du tas

Plusieurs threads pouvant lire/écrire sur un objet du tas :

Ça se complique : les philosophes souhaitent manger

Cinq philosophes pensent et mangent (mais pas les deux à la fois car équipés d'un cerveau monothread).
Pour manger, deux ressources du tas nécessaire : une fourchette gauche et une fourchette droite (pour saisir des spagghetis).
Workflow pour manger :

Philosophes à table

Source : Benjamin D. Esham / Wikimedia Commons

Interblocage, bilan : 5 philosophes morts de faim

Famine

La classe Thread

Construction d'une instance de Thread

La classe Thread représente un fil que l'on peut créer de deux manières :

Une fois la classe (ou un descendant de) Thread instantiée, le fil commence son exécution par un appel à void start().

La méthode run ne doit pas lever d'exception vérifiée (telle qu'une IOException) ; il faut donc capturer ce type d'exceptions dans un bloc try-catch.

Exemple : thread d'incrémentation de compteur

Classe dérivée de Thread

public class IncrementingThread extends Thread
{
	public long counter = 0;
	public boolean finished = false;
	
	public IncrementingThread()
	{
		super();
	}

	public void run()
	{
		while (! finished)
			counter++;
		System.out.println("Counter value: " + counter);
	}
}
...
IncrementingThread t = new IncrementingThread();
t.start();
Thread.sleep(WAIT_TIME);
t.finished = true;
...

La thread interminable ou l'utilité de volatile

Horreur, la thread d'incrémentation précédente ne s'arrête jamais ! Pourquoi ?

Vers une solution propre pour arrêter une thread : interrupt()

Méthode interrupt conçue pour interrompre une thread.
Utilisation :

Thread d'incrémentation avec une expression lambda

final int[] counter = new int[]{0};
Runnable r = () -> {
	while (! Thread.interrupted())
		counter[0]++;
	System.out.println("Counter value: " + counter[0]);
};
Thread t = new Thread(r);
t.start();
Thread.sleep(WAIT_TIME);
t.interrupt();
...

Astuce : utilisation d'un tableau d'int à 1 cellule pour passage d'un entier par référence (une variable locale de la méthode englobante n'est pas modifiable par une expression lambda ou une classe anonyme).

Mesure du temps dans une thread

Comment mesurer le temps écoulé pour l'exécution de code dans une thread ?

Mise en sommeil d'une thread

Exercice :

soient 3 threads current, t1 et t2. Nous sommes dans current et souhaitons attendre :

Solution :

try {
	int start = System.nanoTime(); // en nanosecondes
	Thread.sleep(min); // attendons au moins min millisecondes
	int remaining = max - min;
	t1.join(remaining); // devrait retourner immédiatement si t1 est déjà terminée.
	remaining = Math.max(0, (int)(max - (System.nanoTime() - start) / 1_000_000L));
	t2.join(remaining);
} catch (InterruptedException e)
{
	// an interruption has been made on the thread
}

À propos d'InterruptedException

Propriétés des threads Java

État d'une thread : Thread.State

Cinq états possibles pour une thread Java :

Obtention des fils en cours d'exécutions

Exemple d'affichage de fils actifs sur une JVM

Full thread dump OpenJDK 64-Bit Server VM (14.0-b08 mixed mode):

"Low Memory Detector" daemon prio=10 tid=0x00000000016be000 nid=0x5751 runnable [0x0000000000000000..0x0000000000000000]
   java.lang.Thread.State: RUNNABLE                                                                                     

"CompilerThread1" daemon prio=10 tid=0x00000000016bb000 nid=0x5750 waiting on condition [0x0000000000000000..0x00007fa24963f510]
   java.lang.Thread.State: RUNNABLE                                                                                             

"CompilerThread0" daemon prio=10 tid=0x00000000016b7000 nid=0x574f waiting on condition [0x0000000000000000..0x00007fa249740480]
   java.lang.Thread.State: RUNNABLE                                                                                             

"Signal Dispatcher" daemon prio=10 tid=0x00000000016b4800 nid=0x574e waiting on condition [0x0000000000000000..0x0000000000000000]
   java.lang.Thread.State: RUNNABLE                                                                                               

"Finalizer" daemon prio=10 tid=0x0000000001694000 nid=0x574d in Object.wait() [0x00007fa249982000..0x00007fa249982be0]
   java.lang.Thread.State: WAITING (on object monitor)                                                                
        at java.lang.Object.wait(Native Method)                                                                       
        - waiting on <0x00007fa27df41210> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:133)
        - locked <0x00007fa27df41210> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:149)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:177)

"Reference Handler" daemon prio=10 tid=0x000000000168c800 nid=0x574c in Object.wait() [0x00007fa249a83000..0x00007fa249a83b60]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00007fa27df41078> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
        - locked <0x00007fa27df41078> (a java.lang.ref.Reference$Lock)

"main" prio=10 tid=0x0000000001623000 nid=0x5748 runnable [0x00007fa298610000..0x00007fa298610e60]
   java.lang.Thread.State: RUNNABLE
        at Test.main(Test.java:5)

"VM Thread" prio=10 tid=0x0000000001687800 nid=0x574b runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x000000000162d800 nid=0x5749 runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x000000000162f800 nid=0x574a runnable

"VM Periodic Task Thread" prio=10 tid=0x0000000001735800 nid=0x5752 waiting on condition

JNI global references: 601

Heap
 PSYoungGen      total 18432K, used 1581K [0x00007fa27df40000, 0x00007fa27f3d0000, 0x00007fa292940000)
  eden space 15808K, 10% used [0x00007fa27df40000,0x00007fa27e0cb560,0x00007fa27eeb0000)
  from space 2624K, 0% used [0x00007fa27f140000,0x00007fa27f140000,0x00007fa27f3d0000)
  to   space 2624K, 0% used [0x00007fa27eeb0000,0x00007fa27eeb0000,0x00007fa27f140000)
 PSOldGen        total 42240K, used 0K [0x00007fa254b40000, 0x00007fa257480000, 0x00007fa27df40000)
  object space 42240K, 0% used [0x00007fa254b40000,0x00007fa254b40000,0x00007fa257480000)
 PSPermGen       total 21248K, used 2753K [0x00007fa24a340000, 0x00007fa24b800000, 0x00007fa254b40000)
  object space 21248K, 12% used [0x00007fa24a340000,0x00007fa24a5f06e0,0x00007fa24b800000)

Affichage JConsole

JConsole

Applications multithreads

Serveur TCP en multithread

Squelette de code de serveur TCP concurrent

public class ConcurrentTCPServer
{
	public class ClientCommunicator implements Runnable
	{
		private final Socket s;

		public ClientCommunicator(Socket s)
		{
			this.s = s;
		}

		@Override
		public void run()
		{
			try {
				InputStream is = s.getInputStream();
				OutputStream os = s.getOutputStream();
				...
			} catch (IOException e)
			{
				e.printStackTrace();
			} finally
			{
				s.close(); // do not forget to close the socket
			}
		}
	}

	private final int port;

	public ConcurrentTPCServer(int port)
	{
		this.port = port;
	}

	public void launch() throws IOException
	{
		try (ServerSocket ss = new ServerSocket(port);) // do not forget to close the ServerSocket
		{
			while (! Thread.interrupted())
			{
				Socket s = ss.accept(); // Fetch the socket of the next client
				Runnable r = new ClientCommunicator(s);
				Thread t = new Thread(r);
				t.start();
			}
		}
	}
}

Crochet d'arrêt pour gérer la fermeture de l'application

Lorsqu'une exception est remontée par la méthode main ou qu'un signal de terminaison est intercepté par la JVM → arrêt brutal du programme.
Problème : certains fichiers ou communications socket peuvent rester dans un état inconsistant.
Un fichier ou une communication peut être terminée par du code spécifié par un crochet d'arrêt.

Gestion des crochets d'arrêt

Gestion de l'exclusion mutuelle

Exclusion mutuelle

Certains objets doivent pouvoir être accessibles que par un unique fil lors de l'atteinte de certaines instructions.
Sinon risque d'état incohérent (exemple des peintres).

Instructions atomiques en Java

Moniteurs implicites (synchronized)

Moniteurs explicites : Lock

Lock l = new ReentrantLock(); // Creation du verrou
...
l.lock(); // Verrouillage
try {
	... // Code a executer
} finally {
	l.unlock(); // Dans tous les cas, deverrouillage
}

Malheureusement pas de try-with-resources avec un Lock :(

Possibilités avancées de Lock

Verrou lecture/écriture : ReadWriteLock

Attente et signalisation

Attente

Signalisation

Comparaison attente/signalisation sur Object et Condition

Object Condition
Attente temps indéterminé void wait() void await()
Attente temps borné void wait(long timeoutMillis) void await(long time, TimeUnit unit)
Attente sans interruption Relancement de void wait() à l'interruption waitUninterruptibly()
Signalisation unitaire void notify() void signal()
Signalisation multiple void notifyAll() void signalAll()
Plusieurs files d'attente Impossible Possible

Récapitulatif des avantages des verrous (vs. moniteurs)

Une petite pause

Thread contrôleur :

ComputationThread t = new ComputationThread();
t.start();
while (t.isAlive())
{
	t.escapePause();			
	Thread.sleep(WORKING_TIME);
	t.enterPause();
	Thread.sleep(PAUSE_TIME)
}

Thread de calcul :

public void run()
{
	while (! Thread.interrupted())
	{
		try {
			executePauseIfRequested();
		} catch (InterruptedException e)
		{
			break;
		}
		doComputation();
	}
}

private boolean pauseRequested = false;

private synchronized void executePauseIfRequested() throws InterruptedException
{
	while (pauseRequested)
		this.wait(); // Wait until notification, be cautious of spurious wake-ups!
}

public synchronized void enterPause()
{
	this.pauseRequested = true;
}

public synchronized void exitPause()
{
	pauseRequested = false;
	this.notify(); // We notify the thread to exit from the pause
}

Le problème du parking (ou sémaphore)

Énoncé du problème

Soit un parking abritant des voitures (en fait des threads) avec :

Écrire une classe ParkingLot disposant des méthodes suivantes :

Ces deux méthodes retournent un objet ParkingPlace implantant l'interface AutoCloseable : on appelle close() pour libérer la place

Exigences :

ParkingLot

package fr.upem.jacosa.threads;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ParkingLot
{
	int remainingClassical;
	int remainingHandicapped;
	
	Lock lock;
	Condition classicalFree;
	Condition anyFree;
	
	public ParkingLot(int c, int h)
	{
		this.lock = new ReentrantLock();
		this.classicalFree = lock.newCondition();
		this.anyFree = lock.newCondition();
		this.remainingClassical = c;
		this.remainingHandicapped = h;
	}
	
	public ParkingPlace takePlace() throws InterruptedException
	{
		lock.lock();
		try {
			while (remainingClassical == 0)
				classicalFree.await();
			remainingClassical--;
			return new ParkingPlace(this, false);
		} finally
		{
			lock.unlock();
		}
	}
	
	public ParkingPlace takeHandicapedPlace() throws InterruptedException
	{
		lock.lock();
		try {
			while (remainingClassical == 0 && remainingHandicapped == 0)
				anyFree.await();
			boolean handicapped = remainingClassical == 0;
			if (handicapped) remainingHandicapped--; else remainingClassical--;
			return new ParkingPlace(this, handicapped);
		} finally
		{
			lock.unlock();
		}
	}
}

ParkingPlace

package fr.upem.jacosa.threads;

public class ParkingPlace implements AutoCloseable
{
	private boolean handicapped;
	private ParkingLot lot;
	
	public ParkingPlace(ParkingLot lot, boolean handicapped) {
		this.handicapped = handicapped;
		this.lot = lot;
	}
	
	public void close() {
		lot.lock.lock();
		try {
			if (handicapped) lot.remainingHandicapped++;
			else lot.remainingClassical++;
			lot.anyFree.signal();
			if (! handicapped) lot.classicalFree.signal();
		} finally
		{
			lot.lock.unlock();
		}
	}
}

Le parking du sémaphore IRL

Le parking du sémaphore

Source

Gestion des threads avec les ExecutorService

Des Runnable et Callable plutôt que des threads

Création d'ExecutorService avec Executors

Ces méthodes existent avec un argument supplémentaire ThreadFactory permettant de personnaliser le constructeur de threads.

Un Grep récursif

package fr.upem.jacosa.threads;

import java.util.*;
import java.io.*;
import java.util.concurrent.*;
import java.util.regex.*;

public class Grep
{
	final private List<File> files;
	final private Pattern pattern;
	final private ExecutorService executorService;

	public Grep(File directory, Pattern pattern, ExecutorService executorService)
	{
		this.files = new ArrayList<File>();
		getFiles(directory, files);
		this.pattern = pattern;
		this.executorService = executorService;
	}

	public static void getFiles(File file, List<File> files)
	{
		if (file.isDirectory())
			for (File f: file.listFiles())
				getFiles(f, files);
		else if (file.isFile())
			files.add(file);
	}

	public Callable<List<MatchResult>> getGrepTask(final File f)
	{
		return new Callable<List<MatchResult>>()
		{
			public List<MatchResult> call() throws IOException
			{
				StringBuilder sb = new StringBuilder();
				Reader r = new FileReader(f);
				char[] buffer = new char[1024];
				int read = 0;
				while ((read = r.read(buffer)) > 0)
					sb.append(buffer, 0, read); // Append in the buffer
				Matcher m = pattern.matcher(sb.toString());
				List<MatchResult> result = new ArrayList<MatchResult>();
				while (m.find()) result.add(m.toMatchResult());
				return result;
			}
		};
	}

	public static <K> void increment(Map<K,Integer> map, K element)
	{
		Integer c = map.get(element);
		int c2 = (c == null)?0:c;
		map.put(element, c2 + 1);
	}

	public void execute() throws Exception
	{
		List<Future<List<MatchResult>>> results = new ArrayList<Future<List<MatchResult>>>();
		for (File file: files)
			results.add(executorService.submit(getGrepTask(file)));
		executorService.shutdown();
		long start = System.nanoTime();
		while (! executorService.awaitTermination(1L, TimeUnit.SECONDS))
			System.out.println((System.nanoTime() - start) + " ns elapsed");
		SortedMap<String, Integer> counterMap = new TreeMap<String, Integer>();
		for (Future<List<MatchResult>> result: results)
			for (MatchResult mr: result.get())
				increment(counterMap, mr.group(0));
		for (Map.Entry<String, Integer> entry: counterMap.entrySet())
			System.out.println(entry.getKey() + "\t" + entry.getValue());
	}

	public static void main(String[] args) throws Exception
	{
		Pattern pattern = Pattern.compile(args[0]);
		File rootFile = new File(args[1]);
		ExecutorService exs = null;
		String kind = args[2];
		if (kind.equals("cached"))
			exs = Executors.newCachedThreadPool();
		else
			exs = Executors.newFixedThreadPool(Integer.parseInt(kind));
		new Grep(rootFile, pattern, exs).execute();
	}
}

Content not available

ScheduledExecutorService

Permet de soumettre une tâche à déclencher sous un certain délai une fois ou périodiquement :

Il n'y a pas de garantie de respect de délais (en particulier si le ScheduledExecutorService est saturé)
Création :

Un multi compte à rebours

package fr.upem.jacosa.threads;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class Countdown
{
	public static void main(String[] args) throws InterruptedException
	{
		final long start = System.nanoTime();
		final ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
		int i = 0;
		final AtomicInteger counter = new AtomicInteger(0);
		for (String arg: args)
		{
			final int id = i++;
			final int deadline = Integer.parseInt(arg); // in seconds
			final ScheduledFuture<?> future = ses.scheduleAtFixedRate(new Runnable()
			{
				public void run()
				{
					System.out.println(
						String.format("Remaining time for timer #%d: %d s.", id, 
							deadline - (System.nanoTime() - start) / 1000000000));
				}
			}, 0, 1, TimeUnit.SECONDS);
			counter.incrementAndGet();
			ses.schedule(new Callable<Object>()
			{
				public Object call()
				{
					System.out.println(
						String.format("End of timer #%d after %d seconds (late offset: %d ns.)", id, 
							deadline, System.nanoTime() - start - deadline * 1000000000L ));
					future.cancel(true);
					if (counter.decrementAndGet() == 0) ses.shutdown(); // End of executor service
					return null;
				}
			}, deadline, TimeUnit.SECONDS);
		}
		ses.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); // We wait for the end of execution
	}
}

Processus externes

Processus externes

Exemple : le tuyauteur

On souhaite écrire un programme Java lançant plusieurs processus et connectant la sortie (stdout) du processus précédent avec l'entrée (stdin) du processus suivant, à la manière d'un pipe de shell.

Exemple d'utilisation : pour afficher tous les mots triés d'un texte :
java Piper cat monfichier.txt "|" egrep -o "[a-zA-Z]+" "|" sort "|" uniq``

Solution possible

import java.util.*;
import java.io.*;

public class Piper
{
	public static final int BUFF_LEN = 1024;
	
	public static Runnable getTransferer(final InputStream in, final OutputStream out)
	{
		return new Runnable(){
			@Override
			public void run()
			{
				try {
					byte[] buff = new byte[BUFF_LEN];
					for (int i = in.read(buff); i >= 0; i = in.read(buff))
					{
						out.write(buff, 0, i);
					}
				} catch (IOException e)
				{
					e.printStackTrace();
				} finally
				{
					try {
						in.close();
						out.close();
					} catch (IOException e)
					{
						e.printStackTrace();
					}
				}
			}
		};
	}

	public static int execute(List<List<String>> commands) throws IOException, InterruptedException
	{
		final List<Process> processes = new ArrayList<Process>();
		for (List<String> command: commands)
			processes.add(new ProcessBuilder(command).start());
		for (int i = 0; i < processes.size(); i++)
		{
			final InputStream currentOut = processes.get(i).getInputStream();
			final OutputStream followingIn = (i+1 < processes.size())?
				processes.get(i+1).getOutputStream():System.out;
			new Thread(getTransferer(currentOut, followingIn)).start();
		}
		return processes.get(processes.size() - 1).waitFor();
	}
	
	public static void main(String[] args) throws Exception
	{
		List<List<String>> l = new ArrayList<List<String>>();
		List<String> command = new ArrayList<String>();
		for (String arg: args)
		{
			if (arg.equals("|"))
			{
				l.add(command);
				command = new ArrayList<String>();
			} else
				command.add(arg);
		}
		l.add(command);
		System.exit(execute(l));
	}
}

Un bon tuyau pour conclure

Un tube pour les flots

À voir... si l'on a du temps