:: Enseignements :: ESIPE :: E4INFO :: 2022-2023 :: Collections Concurrentes ::
[LOGO]

Fork/Join


Fork/Join pool et Décomposition Récursive d'un problème en vue de sa parallèlisation.

Exercice 1 - Fork Join et calcul du minimum

Le but de cet exercice est de calculer le minimum des valeurs d'un tableau en répartissant le calcul sur plusieurs threads.

Les tests unitaires MinForkJoinTest.java

  1. On souhaite créer une classe fr.umlv.conc.MinForkJoin ainsi qu'une méthode minLoop ayant la signature suivante
           public static int minLoop(int[] array, int start, int end)
          
    qui calcul le minimum des valeurs du tableau array entre start et end (start inclu, end exclu).
    On peut noter que le minimum n'existe pas si la partie de tableau définie entre start et end est vide, pour cela, on lèvera l'exception IllegalArgumentException si start == end. Il faut aussi vérifier que 0 <= start <= end < length, il existe pour cela la méthode Objects.checkFromToIndex(...).
    Créer la classe MinForkJoin et implanter la méthode minLoop en utilisant une boucle et vérifier que les tests marqués Q1 passent.
  2. On souhaite maintenant écrire une méthode minStream ayant la même signature que minLoop et qui utilise un stream en term d'implantation.
    Écrire la méthode minStream et vérifier que les tests marqués Q2 passent.
    Note: la méthode Arrays.stream a une version qui prend des index en paramètre !
  3. On souhaite maintenant effectuer le calcul sur plusieurs threads en utilisant des objets RecursiveAction que l'on va appeler sur un ForkJoinPool. Dans un premier temps, on va afficher pour chaque thread, le résultat du calcul effectué par celle-ci, on verra comment combiner les résultats de ces calculs dans un second temps.
    On se propose d'utiliser le code suivant
      public static void minForkJoinPrint(int[] array, int start, int end) {
        // get the common fork join pool
        var commonPool = ...
        // create the for join action
        var action = ...
        // gently asks the pool to call the action
        ...
      }
    
      private static final class MinRecursiveAction extends RecursiveAction {
        private final int[] array;
        private final int start;
        private final int end;
    
        // TOOO
    
        @Override
        protected void compute() {
          // TODO
        }
      }
    
      public static void main(String[] args) {
        var array = new Random(0).ints(1_000_000, 0, Integer.MAX_VALUE).toArray();
        minForkJoinPrint(array, 0, array.length);
      }
         
    Avec la méthode minForkJoinPrint qui créer une RecursiveAction correspondant au calcul et demande gentiment au ForkJoinPool par défaut d'appeler celle-ci. Et la méthode MinRecursiveAction.compute() qui fait une décomposition récursive s'il y a au moins 1024 valeurs à calculer et qui sinon utilise la méthode minLoop écrite précédemment.
    Pour l'affichage, on vous demande d'afficher le thread courant ainsi que le résultat du minimum sur la partie de tableau correspondante.
    Par exemple
    ...
    Thread[ForkJoinPool.commonPool-worker-5,5,main] 4566894
    Thread[ForkJoinPool.commonPool-worker-7,5,main] 755964
    Thread[main,5,main] 2527216
    Thread[ForkJoinPool.commonPool-worker-7,5,main] 5763493
    Thread[main,5,main] 2384880
    Thread[ForkJoinPool.commonPool-worker-5,5,main] 4880931
    ...
         

    Compléter l'implantation et vérifier que le main fonctionne.
    Note: attention a bien vérifier que tous les calculs ne sont pas fait par la même thread !
  4. Enfin, on souhaite écrire une méthode minForkJoin avec la même signature que les méthodes minLoop et minStream effectuant le calcul en parallèle sur plusieurs threads en paralèlle.
    Expliquer, dans un premier temps, pourquoi il faut utiliser une classe RecursiveTask au lieu d'une classe RecursiveAction.
    Puis écrire une classe interne MinRecursiveTask effectuant le calcul et la décomposition récursive et combinant les résultats puis implanter la méthode minForkJoin pour propager le résultat du calcul en tant que valeur de retour.
    Vérifier que les tests marqués Q4 passent.

Exercice 2 - ForkJoin pour n'importe qu'elle opération associative

En fait, il est possible de généraliser le code que nous avons écrit précédemment dans un premier temps à n'importe quelle opéation sur un IntStream (donc pas seulement sur les tableaux) puis dans un second temps à n'importe quelle opération sur un Stream d'objet.
pour cela nous devons être capable de couper un Stream en deux, c'est justement ce que propose un Spliterator et sa version spécifique pour les entiers Spliterator.OfInt en utilisant la méthode trySplit.
Contrairement à l'exécice précédent où l'on plantait si l'on demandait le minimum d'un tableau vide, on va utiliser ici la classe Optional et sa spéciflisation pour int OptionalInt pour indiquer que le résultat n'existe pas.
Note: il est possible de créer un Stream à partir d'un Spliterator en utilisant StreamSupport

Les tests unitaires sont là AnyOpForkJoinTest.java

On se propose d'utiliser le code suivant
import static java.util.Objects.requireNonNull;

public class AnyOpForkJoin {
  public static OptionalInt anyOpForkJoinInt(IntStream stream, IntBinaryOperator combiner) {
    requireNonNull(stream);
    requireNonNull(combiner);
    return anyOpForkJoinInt(stream.spliterator(), combiner);
  }

  private static OptionalInt anyOpForkJoinInt(Spliterator.OfInt spliterator, IntBinaryOperator combiner) { 
    var pool = ForkJoinPool.commonPool(); 
    ... 
    // TODO
  }
 }
   

  1. Implanter la méthode anyOpForkJoinInt de telle sorte que l'opération spécifiée (combiner) soit exécutée en parallèle.
    Vérifier que les tests marqués "Q1" passent.
    Note: le but est soit même paralléliser le calcul, pas d'utiliser des Streams parallèle.
  2. Ecrire la méthode anyOpForkJoinObj qui marche comme anyOpForkJoinInt mais pour un Stream d'objet.
    Vérifier que les tests marqués "Q2" passent.