Généralités
-
Stream = vue paresseuse d'une collection (séquence d'éléments) ; les élements ne sont pas calculés tant qu'on en a pas besoin
- Stream<T> pour manipuler des objets de type T
- Versions existantes pour types primitifs: IntStream, LongStream, DoubleStream
- Possibilité d'avoir un nombre d'éléments fini ou infini dans un stream
-
Application possible d'opérations pour filtrer, transformer les éléments d'un stream
- Un stream est créé à partir d'une source
- Des traitements successifs peuvent être appliqués (pipeline) pour transformer le stream en un autre stream
- Enfin le résultat du stream transormé (ou non) est acheminé vers un puits pour calculer le résultat final souhaité
- Parallélisation possible des traitements sur un stream (exploitation de plusieurs cœurs de calcul)
Sources de stream
-
Une collection
- En appelant la méthode stream() sur la collection
- En appelant la méthode parallelStream() sur la collection pour avoir un stream avec traitement parallèle
-
Un String
- En appelant chars() pour obtenir un IntStream avec le code des caractères
-
Un range
- Avec la méthode statique IntStream.range(start, stop) pour obtenir un IntStream de nombres entre start et stop (exclus)
-
Un supplier
- Avec la méthode statique Stream.generate(Supplier<T> s) : le supplier retourne un élément à chaque appel
-
Une façon d'itérer
- Avec la méthode statique Stream.iterate(T seed, UnaryOperator<T> operator) qui démarre le stream avec seed (1er élément) et calcule l'élément suivant en appliquant operator sur l'élément courant
-
Un tableau d'éléments
- Avec la méthode statique Stream.of(T... values)
-
Plusieurs streams concaténées les uns aux autres
- Avec la méthode statique Stream.concat(Stream<T> stream1, Stream<T> stream2)
Un stream ne réalise que des opérations de lecture sur sa source (pas de modification).
Ordre des streams
-
Stream ordonné : l'ordre d'apparition des éléments est déterminé ; permet d'obtenir un résultat déterministe
- List, SortedSet produisent des streams ordonnés (selon l'ordre des indices pour List, l'ordre défini par le comparateur sur SortedSet)
-
Stream non ordonné : l'ordre des éléments n'a pas d'importance
- HashSet produit des streams non ordonnés
- Transformation d'un stream ordonné en stream non-ordonné en appelant stream.unordered()
Traitements de transformation
Quels sont les traitements de transformation applicables sur un stream (intermediate operation) ? Consultons la Javadoc de Stream :
- filter(Predicate<T> predicate) permet de filtrer un stream (ne conserve que les éléments validant le prédicat)
- limit(long maxSize) permet de limiter la taille d'un stream (élimine la fin du stream)
- skip(long n) permet d'ignorer n éléments à la tête d'un stream
- takeWhile(Predicate<T> predicate) permet d'arrêter un stream lorsque le prédicat n'est plus satisfait (introduit avec Java 9)
-
map(Function<? super T,? extends R> mapper) permet de convertir les éléments d'un stream à l'aide de la fonction de conversion fournie
- des variantes de map existent pour transformer les éléments en streams de type primitifs : mapToInt pour un IntStream, mapToLong pour un LongStream et mapToDouble pour un DoubleStream
- inversement on peut convertir un IntStream, LongStream, DoubleStream en Stream<T> avec mapToObj({Int,Long,Double}Function<? extends U> mapper)
-
flatMap(Function<? super T,? extends Stream<? extends R>> mapper) permet de convertir chaque élément du stream en un stream ; tous les streams étant concaténés entre-eux pour retourner un seul stream
- des variantes de flatMap existent pour des types primitifs flatMapToInt, flatMapToLong, flatMapToDouble
- peek(Consumer<? super T> action) ne change pas le stream mais appelle action à chaque fois qu'un élément du stream est consommé (permet d'espionner pour savoir quand un élément est réellement récupéré)
- distinct() qui élimine les doublons (au sens de la méthode equals()) ; l'implantation sous-jacente utilise LinkedHashSet (ce qui nécessite l'implantation cohérente de hashCode())
Certains traitements peuvent nécessiter de consommer tout le stream pour fonctionner (donc incompatibles avec un stream infini), c'est le cas notamment de :
-
sorted(Comparator<? super T> comparator) qui trie un stream en utilisant un comparator (la même méthode existe sans comparateur pour utiliser l'ordre naturel des classes implantant Comparable)
- L'appel de sorted() sur un stream non-ordonné le transforme en stream ordonné
Exemple : obtenir un stream de toutes les lettres apparaissant dans une List de String par ordre d'apparition (et sans doublon)
List<String> l = List.of("foo", "bar", "test"); Stream<Character> s = l.stream().flatMapToInt( s -> s.chars() ).mapToObj( x -> (char) x).distinct();
Puits de streams
Principaux puits
Un stream peut être acheminé vers un puits pour calculer quelque chose le concernant (terminal operation) :
- count() qui compte le nombre d'éléments du stream
-
sum() qui somme tous les éléments du stream (ne fonctionne qu'avec IntStream, LongStream et DoubleStream)
- Pour des informations plus avancées summaryStatistics() retourne un objet avec des méthodes getSum(), getMin(), getMax(), getAverage() et getCount()
- max(Comparator<? super T> comparator) permet d'obtenir l'élément maximum au sens du comparateur (existe aussi en version min)
- allMatch(Predicate<? super T> predicate) retourne un booléen indiquant si tous les éléments du stream valident le prédicat (existe en version anyMatch retournant true ssi un des éléments valide le prédicat, noneMatch ssi aucun élément ne valide le prédicat)
- reduce(T identity, BinaryOperator<T> accumulator) effectue une réduction en calculant accumulator(...(accumulator(accumulator(identifity, element_0), element_1))...), element_n-1)
-
reduce(BinaryOperator<T> accumulator) est une variante de l'opération précédente qui calcule accumulator(...(accumulator(accumulator(element_0, element_1), element_2))..), element_n-1)
- Retour d'un objet Optional<T> qui est une enveloppe pour l'objet calculé
- Si aucun objet n'a pu être calculé (stream ne comportant aucun élément), la méthode isPresent() d'Optional<T> retourne false
- forEach(Consumer<? super T> action) permet de réaliser une action pour chaque élément du stream (pareil que peek mais ne retourne pas de Stream)
- collect(Collector<T, A, R> collector) permet de réaliser une opération de réduction mise en œuvre par un collecteur
- findFirst() retourne un Optional<T> contenant le premier élément du Stream<T> (s'il en existe) ; la variante findAny() retourne n'importe quel élément (pas nécessairement le premier)
Le stream ne peut plus être utilisé une fois qu'il a été entraîné dans un puits.
Exemple 1 : quelle est la somme de toutes les puissances paires de 2 (2^0, 2^2, 2^4...) qui soient inférieures à Integer.MAX_INT ?
Stream.iterate(0, x -> x +1).filter( x -> x % 2 == 0).mapToLong( x -> 1L << x ).takeWhile( x -> x <= Integer.MAX_VALUE).sum()
Exemple 2 : combien reproduire le comportement de la méthode List.contains(Object obj) avec un Stream ?
List<String> l = List.of("foo", "bar", "test"); assert(l.contains("bar") == l.stream().filter( x -> x.equals("bar") ).findFirst().isPresent());
Modification de la source par le puits
Un puits ne doit pas modifier la source (si celle-ci est une structure de données) sur lequel il repose. Par exemple, le code suivant poserait problème (suppression des entiers pairs du set et affichage) :
Set<Integer> s = new HashSet<>(); // ... we populate the list s.stream().filter( x -> x % 2 == 0).peek( s::remove ).forEach(System.out::println);
En revanche, si nous remplaçons la dernière ligne par ceci, il n'y a plus de souci :
s.stream().filter( x -> x % 2 == 0).sorted().peek( s::remove ).forEach(System.out::println);
sorted() génère une nouvelle structure de données en mémoire pour stocker les éléments triés ; s::remove n'impacte pas cette structure et forEach peut s'exécuter sans encombre.
Les collecteurs
Un Collector représente une opération de réduction avancée implantée sous la forme d'une classe avec différentes méthodes qui permet de calculer une nouvelle structure de données à partir d'un stream.
Interface Collector<T, A, R> :
- T : type d'éléments du stream
- A : structure intermédiaire utilisée pour la réduction
- R : type du résultat final
Etapes de la réduction :
- collector.supplier() retourne un Supplier<A> permettant de créer une structure de données A d'accumulation : A accumulator = collector.supplier()
- collector.accumulator() retourne un BiConsumer<A, T> permettant d'ajouter un élément T du stream dans l'accumulateur mutable : (collector.accumulator())(accumulator, element)
- On utilise la fonction d'accumulation pour chaque élément du stream
- Finalement, on transforme la structure A d'accumulation en résultat final : R finalResult = (collector.finisher())(accumulator)
Un Collector implante également une méthode combiner() qui retourne un BinaryOperator<A> fusionnant deux accumulateurs (utile pour les traitements parallèles).
La classe Collectors fournit des collecteurs prêt à l'emploi (méthodes statiques) :
- Collectors.toList() : pour transformer le stream en liste
- Collectors.toSet() : pour transformer le stream en ensemble
- Collectors.joining(String separator) : pour concaténer un Stream<String> en une seule chaîne de caractères avec un séparateur
-
Collectors.groupingBy(Function<? super T,? extends K> classifier)
- classe tous les éléments du stream dans des groupes
- la clé de groupe d'un élément est obtenu avec classifier
- le résultat final est une Map dont les clés sont les clés de groupes et les valeurs les membres du groupe sous forme de liste
- une variante groupingBy(Function<? super T,? extends K> classifier, Collector<? super T,A,D> downstream) permet de fournir la liste de chaque clé à un collector retournant un résultat de type D
- Collectors.toMap(Function<? super T,? extends K> keyMapper, Function<? super T,? extends U> valueMapper, BinaryOperator<U> mergeFunction) permet de créer un Map avec les éléments du stream ; les clés étant calculées avec keyMapper, les valeurs avec valueMapper et si plusieurs éléments partagent la même clé, leurs valeurs sont fusionnées avec mergeFunction
Création d'un Collector personnalisé :
- Soit en implantant l'interface Collector (implantation de supplier(), accumulator(), combiner() et finisher() (ainsi que characteristics() pour indiquer les caractéristiques du collecteur)
- Soit en utilisant la méthode statique of(Supplier<A> supplier, BiConsumer<A,T> accumulator, BinaryOperator<A> combiner, Function<A,R> finisher, Collector.Characteristics... characteristics)
Exemples
Nombres premiers
Calculons le nombre de nombres premiers inférieurs à (exemple non optimisé mais proposé à titre illustratif pour la manipulation des streams)
package fr.upem.jacosa.collections; import java.util.function.Supplier; import java.util.stream.Stream; public class PrimeStream { public static Stream<Long> getCandidateDividerStream(final long n) { Supplier<Long> supplier = new Supplier<Long>() { long i = 2; public Long get() { return i++; } }; return Stream.generate(supplier) .filter( x -> x % 2 == 1 || x == 2 ) // select only odd elements (and also 2) .takeWhile(x -> x * x <= n); // takeWhile is available with Java 1.9 } public static boolean isPrime(final long n) { return getCandidateDividerStream(n).noneMatch( x -> n % x == 0 ); } /** Return the number of primes below a given limit */ public static long getPrimeNumber(final long limit) { return Stream.concat( Stream.of(2L), Stream.iterate(3L, x -> x + 2L)) .takeWhile( x -> x <= limit) .filter( x -> isPrime(x) ) .count(); } public static void main(String[] args) { long n = Long.parseLong(args[0]); System.out.println("Number of primes below " + n + ": " + getPrimeNumber(n)); } }
Concaténation de deux listes
Nous pouvons utiliser des streams et un collecteur avec de concaténer deux listes d'éléments :
package fr.upem.jacosa.collections; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; /** Example concatenating lists using streams */ public class ListConcatenation { public static void main(String[] args) { List<String> l1 = Arrays.asList(args); List<String> l2 = Arrays.asList(args); List<String> l3 = Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toList()); System.out.println(l3); } }
Un collecteur qui calcule la médiane d'un stream
Nous définissons la médiane d'un ensemble d'éléments comme l'élément de cet ensemble tel qu'autant d'éléments soient plus grands que plus petits que celui-ci (cette définition est valable pour un nombre impair d'éléments)
package fr.upem.jacosa.collections; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collector; import java.util.stream.Stream; /** Computation of the median of a stream */ public class StreamMedian { /** We create a collector that fetches the median of a stream */ public static <T> Collector<T, List<T>, T> medianCollector() { return Collector.of(() -> new ArrayList<T>(), // the supplier (list, element) -> { list.add(element); }, // the accumulator // the combiner (list1, list2) -> { List<T> list3 = new ArrayList<T>(list1); list3.addAll(list2); return list3; }, // the finisher (list) -> { list.sort(null); return list.get(list.size() / 2); }); } public static <T> T computeMedian(Stream<T> stream) { return stream.collect(medianCollector()); } /** We test in the main the computeMedian method */ public static void main(String[] args) { String median = computeMedian(Arrays.asList(args).stream()); System.out.println("Median of the arguments: " + median); } }
Stream parallèle
Les traitements sur un stream peuvent être parallélisables. Dans ce cas, les éléments peuvent être traités simultanément par plusieurs threads. Vérifions le avec la méthode forEach :
package fr.upem.jacosa.collections; import java.util.stream.Stream; public class ParallelStreamTest { public static Stream<Integer> createFirstIntegerStream(int bound) { return Stream.iterate(0, e -> e + 1).limit(bound); } public static void main(String[] args) { createFirstIntegerStream(10).parallel().forEach(x -> System.out.println(x)); } }
Voici un exemple d'exécution du code :
1 4 2 7 8 0 5 6 9 3
Regroupement d'éléments
Classons des chaînes de caractères par le nombre de voyelles qu'elles possèdent.
package fr.upem.jacosa.collections; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; /** Gather strings by the number of vowel they contain */ public class VowelGrouper { public static final String VOWELS = "aeiouy"; public static int getVowelNumber(String s) { return (int)s.chars().filter(x -> VOWELS.indexOf(x) >= 0 ).count(); } public static Map<Integer, List<String>> groupByVowelNumber(Stream<String> stream) { return stream.collect(Collectors.groupingBy(x -> getVowelNumber(x))); } public static void main(String[] args) { System.out.println(groupByVowelNumber(Arrays.asList(args).stream())); } }