image/svg+xml $ $ ing$ ing$ ces$ ces$ Res Res ea ea Res->ea ou ou Res->ou r r ea->r ch ch ea->ch r->ces$ r->ch ch->$ ch->ing$ T T T->ea ou->r

Généralités

Graphe d'objet sérialisé

Sources de stream

Un stream ne réalise que des opérations de lecture sur sa source (pas de modification).

Ordre des streams

Traitements de transformation

Quels sont les traitements de transformation applicables sur un stream (intermediate operation) ? Consultons la Javadoc de Stream :

Certains traitements peuvent nécessiter de consommer tout le stream pour fonctionner (donc incompatibles avec un stream infini), c'est le cas notamment de :

Exemple : obtenir un stream de toutes les lettres apparaissant dans une List de String par ordre d'apparition (et sans doublon)

List<String> l = List.of("foo", "bar", "test");
Stream<Character> s = l.stream().flatMapToInt( s -> s.chars() ).mapToObj( x -> (char) x).distinct();

Puits de streams

Principaux puits

Un stream peut être acheminé vers un puits pour calculer quelque chose le concernant (terminal operation) :

Le stream ne peut plus être utilisé une fois qu'il a été entraîné dans un puits.

Exemple 1 : quelle est la somme de toutes les puissances paires de 2 (2^0, 2^2, 2^4...) qui soient inférieures à Integer.MAX_INT ?

Stream.iterate(0, x -> x +1).filter( x -> x % 2 == 0).mapToLong( x -> 1L << x ).takeWhile( x -> x <= Integer.MAX_VALUE).sum()

Exemple 2 : combien reproduire le comportement de la méthode List.contains(Object obj) avec un Stream ?

List<String> l = List.of("foo", "bar", "test");
assert(l.contains("bar") == l.stream().filter( x -> x.equals("bar") ).findFirst().isPresent());

Modification de la source par le puits

Un puits ne doit pas modifier la source (si celle-ci est une structure de données) sur lequel il repose. Par exemple, le code suivant poserait problème (suppression des entiers pairs du set et affichage) :

Set<Integer> s = new HashSet<>();
// ... we populate the list
s.stream().filter( x -> x % 2 == 0).peek( s::remove ).forEach(System.out::println);

En revanche, si nous remplaçons la dernière ligne par ceci, il n'y a plus de souci :

s.stream().filter( x -> x % 2 == 0).sorted().peek( s::remove ).forEach(System.out::println);

sorted() génère une nouvelle structure de données en mémoire pour stocker les éléments triés ; s::remove n'impacte pas cette structure et forEach peut s'exécuter sans encombre.

Les collecteurs

Un Collector représente une opération de réduction avancée implantée sous la forme d'une classe avec différentes méthodes qui permet de calculer une nouvelle structure de données à partir d'un stream.

Interface Collector<T, A, R> :

Etapes de la réduction :

  1. collector.supplier() retourne un Supplier<A> permettant de créer une structure de données A d'accumulation : A accumulator = collector.supplier()
  2. collector.accumulator() retourne un BiConsumer<A, T> permettant d'ajouter un élément T du stream dans l'accumulateur mutable : (collector.accumulator())(accumulator, element)
  3. On utilise la fonction d'accumulation pour chaque élément du stream
  4. Finalement, on transforme la structure A d'accumulation en résultat final : R finalResult = (collector.finisher())(accumulator)

Un Collector implante également une méthode combiner() qui retourne un BinaryOperator<A> fusionnant deux accumulateurs (utile pour les traitements parallèles).

La classe Collectors fournit des collecteurs prêt à l'emploi (méthodes statiques) :

Création d'un Collector personnalisé :

Exemples

Nombres premiers

Calculons le nombre de nombres premiers inférieurs à n (exemple non optimisé mais proposé à titre illustratif pour la manipulation des streams)

package fr.upem.jacosa.collections;

import java.util.function.Supplier;
import java.util.stream.Stream;

public class PrimeStream
{
	public static Stream<Long> getCandidateDividerStream(final long n)
	{
		Supplier<Long> supplier = new Supplier<Long>() {
			long i = 2;
			
			public Long get() { return i++; }
		};
		return Stream.generate(supplier)
			.filter( x -> x % 2 == 1 || x == 2 ) // select only odd elements (and also 2)
			.takeWhile(x -> x * x <= n); // takeWhile is available with Java 1.9
	}
	
	public static boolean isPrime(final long n)
	{
		return getCandidateDividerStream(n).noneMatch( x -> n % x == 0 );
	}
	
	/** Return the number of primes below a given limit */
	public static long getPrimeNumber(final long limit)
	{
		return Stream.concat(
			Stream.of(2L), 
			Stream.iterate(3L, x -> x + 2L))
				.takeWhile( x -> x <= limit)
				.filter( x -> isPrime(x) )
				.count();
	}
	
	public static void main(String[] args)
	{
		long n = Long.parseLong(args[0]);
		System.out.println("Number of primes below " + n + ": " + getPrimeNumber(n));
	}
}

Concaténation de deux listes

Nous pouvons utiliser des streams et un collecteur avec de concaténer deux listes d'éléments :

package fr.upem.jacosa.collections;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Example concatenating lists using streams */
public class ListConcatenation 
{
	public static void main(String[] args)
	{
		List<String> l1 = Arrays.asList(args);
		List<String> l2 = Arrays.asList(args);
		List<String> l3 = Stream.concat(l1.stream(),  l2.stream()).collect(Collectors.toList());
		System.out.println(l3);
	}
}

Un collecteur qui calcule la médiane d'un stream

Nous définissons la médiane d'un ensemble d'éléments comme l'élément de cet ensemble tel qu'autant d'éléments soient plus grands que plus petits que celui-ci (cette définition est valable pour un nombre impair d'éléments)

package fr.upem.jacosa.collections;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

/** Computation of the median of a stream */
public class StreamMedian 
{
	/** We create a collector that fetches the median of a stream */
	public static <T> Collector<T, List<T>, T> medianCollector()
	{
		return Collector.of(() -> new ArrayList<T>(), // the supplier
			(list, element) -> { list.add(element); }, // the accumulator
			
			// the combiner
			(list1, list2) -> { 
				List<T> list3 = new ArrayList<T>(list1); 
				list3.addAll(list2); 
				return list3; },
			
			// the finisher
			(list) -> {
				list.sort(null);
				return list.get(list.size() / 2);
			});
	}
	
	public static <T> T computeMedian(Stream<T> stream)
	{
		return stream.collect(medianCollector());
	}
	
	/** We test in the main the computeMedian method */
	public static void main(String[] args)
	{
		String median = computeMedian(Arrays.asList(args).stream());
		System.out.println("Median of the arguments: " + median);
	}
}

Stream parallèle

Les traitements sur un stream peuvent être parallélisables. Dans ce cas, les éléments peuvent être traités simultanément par plusieurs threads. Vérifions le avec la méthode forEach :

package fr.upem.jacosa.collections;

import java.util.stream.Stream;

public class ParallelStreamTest 
{
	public static Stream<Integer> createFirstIntegerStream(int bound)
	{
		return Stream.iterate(0, e -> e + 1).limit(bound);
	}
	
	public static void main(String[] args) 
	{
		createFirstIntegerStream(10).parallel().forEach(x -> System.out.println(x));
	}
}

Voici un exemple d'exécution du code :

1
4
2
7
8
0
5
6
9
3

Regroupement d'éléments

Classons des chaînes de caractères par le nombre de voyelles qu'elles possèdent.

package fr.upem.jacosa.collections;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Gather strings by the number of vowel they contain */
public class VowelGrouper 
{
	public static final String VOWELS = "aeiouy";
	
	public static int getVowelNumber(String s)
	{
		return (int)s.chars().filter(x -> VOWELS.indexOf(x) >= 0 ).count();
	}
	
	public static Map<Integer, List<String>> groupByVowelNumber(Stream<String> stream)
	{
		return stream.collect(Collectors.groupingBy(x -> getVowelNumber(x)));
	}
	
	public static void main(String[] args)
	{
		System.out.println(groupByVowelNumber(Arrays.asList(args).stream()));
	}
}