:: Enseignements :: ESIPE :: E4INFO :: 2019-2020 :: Collections Concurrentes ::
[LOGO]

Fork/Join


Fork/Join pool et Décomposition Récursive d'un problème en vue de sa parallèlisation.

Exercice 1 - Rappel sur les fonctions d'ordre supérieur

En supposant les deux méthodes suivantes,
    public class Reducer {
      public static void sum(int[] array) {
        var sum = 0;
        for(var value: array) {
          sum += value;
        }
        return sum;
      }
      
      public static void max(int[] array) {
        var max = Integer.MIN_VALUE;
        for(var value: array) {
          max = Math.max(max, value);
        }
        return max;
      }
    }
    
  1. Quelles est la signature d'une méthode reduce qui prend plusieurs paramètres dont le tableau d'entier et un paramètre qui aura au choix soit la valeur Integer::sum soit la valeur Math::max de telle façon à pouvoir ré-écrire les méthodes sum et max comme des appels à la méthode reduce ?
  2. Ecrire la méthode publique reduce et changer le code des méthodes sum et max pour utiliser la méthode reduce
  3. Ecrire un main de test qui tire au sort un million d'élément dont les valeurs sont comprises en 0 et 1_000 et stoque ces éléments dans un tableau d'entier puis appels les méthodes sum et max et affiche les valeurs retrounées.
    Note: On utilisera pour cela la classe java.util.Random et sa méthode ints().
    Note2: Comment fait on si les valeurs sont random pour toujours tirer le même tableau de valeurs ?

Exercice 2 - Grosse réduction

Toujours avec la classe Reducer de l'exercice précédent, on souhaite changer le code pour utiliser les différents coeurs de votre machine pour effectuer le calcul en parallèle.
  1. Ecrire une méthode reduceWithStream qui prend exactement les mêmes paramètres que reduce mais utilise l'API des Stream.
  2. Ecrire une méthode parallelReduceWithStream qui prend exactement les mêmes paramètres que reduce mais utilise un stream parallèle.
    Note: il y a bien marqué qui prend les mêmes paramètres !
  3. Pour avoir plus de contrôle, on veut maintenant écrire une nouvelle version appelée parallelReduceWithForkJoin qui utilise la technique divide and conquer Fork/Join fourni par l'API Java.
    La technique Fork/Join consiste à utiliser l'algorithme suivant
           solve(problem):
           if problem is small enough:
              solve problem directly (sequential algorithm)
           else:
              divide the problem in two parts (part1, part2)
              fork solve(part1)
              solve(part2)
              join part1
              return combined results
         


    Java depuis la version 7 fourni les classes ForkJoinPool et RecursiveTask qui correspondent respectivement à un pool de thread ayant des tâches sachant se subdiviser et une tâche en elle-même.
    1. Pourquoi on ne peut pas utiliser un ThreadPoolExecutor classique comme piscine à threads mais un ForkJoinPool dans notre cas ?
    2. Comment obtenir les ForkJoinPool par défaut ?
    3. Comment doit on envoyer notre tâche à exécuter (notre RecursiveTask) sachant que l'on veut récupérer la valeur de retour ?
      Note: c'est la même méthode que pour le ThreadPoolExecutor classique !
    4. Ecrire la recursive task sous forme d'une classe interne.
    5. Ecrire le code de parallelReduceWithForkJoin qui crée la recursive task, l'envoie au fork/join pool et renvoie le résultat.
      Note: vérifier que vous obtenez le même résultat qu'avec les méthodes précédentes !

Exercice 3 - ForkJoinCollections

On souhaite implanter une classe ForkJoinCollections permettant de faire un reduce sur n'importe qu'elle collection. Pour parcourir et découper la collection en deux, on utilisera l'interface Spliterator.

On partira du code suivant
public class ForkJoinCollections {
  public static <V, T> V forkJoinReduce(Collection<T> collection,  int threshold, V initialValue,
                                        XXX accumulator, YYY combiner) {
    
    return forkJoinReduce(collection.spliterator(), threshold, initialValue, accumulator, combiner);
  }
  
  private static <V, T> V forkJoinReduce(Spliterator<T> spliterator, int threshold, V initialValue,
                                         XXX accumulator, YYY combiner) {
    // TODO
  }
  
  public static void main(String[] args) {
    // sequential
    System.out.println(IntStream.range(0, 10_000).sum());
    
    // fork/join
    var list = IntStream.range(0, 10_000).boxed().collect(Collectors.toList());
    var result = forkJoinReduce(list, 1_000, 0, (acc, value) -> acc + value, (acc1, acc2) -> acc1 + acc2);
    System.out.println(result); 
  }
}
   

  1. Quel sont les types XXX et YYY ou dit différemment pourquoi on a besoin de deux lambdas en paramètre de forkJoinReduce ?
  2. Implanter la méthode forkJoinReduce en utilisant le common ForkJoinPool ainsi qu'une classe interne héritant de la classe RecursiveTask.
  3. Que peut-on en conclure sur la façon dont les Stream parallèles sont implantés ?