Part 7 of 16

Advanced Streams: flatMap, Collectors, Grouping, and Partitioning

The built-in Collectors cover most real-world aggregation needs, and when you need something beyond grouping, partitioning, or joining, you can write a custom Collector. Understanding flatMap and the full Collectors toolkit, including when to write your own collector, separates competent Java 8 developers from those who still reach for a for loop whenever a problem gets slightly unusual.

flatMap in Depth

flatMap is one of the most versatile transformations in the Streams API. It maps each element to a stream (which may hold zero, one, or many results), then flattens all those streams into one continuous stream.

// Without flatMap — stream of lists
Stream<List<String>> nested = Stream.of(
    Arrays.asList("a", "b"),
    Arrays.asList("c", "d"),
    Arrays.asList("e")
);

// Stream<String>: "a", "b", "c", "d", "e"
Stream<String> flat = nested.flatMap(Collection::stream);
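
To make the contrast with map concrete, here is a minimal runnable sketch. The class and method names (FlatMapVsMap, sizesWithMap, flatten) are illustrative and not part of the original example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; the names are illustrative only.
class FlatMapVsMap {

    // map keeps the nesting: one output element per inner list
    static List<Integer> sizesWithMap(List<List<String>> nested) {
        return nested.stream()
                .map(List::size)
                .collect(Collectors.toList());
    }

    // flatMap removes one level of nesting: one output element per inner string
    static List<String> flatten(List<List<String>> nested) {
        return nested.stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nested = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c", "d"),
                Arrays.asList("e"));
        System.out.println(sizesWithMap(nested)); // [2, 2, 1]
        System.out.println(flatten(nested));      // [a, b, c, d, e]
    }
}
```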

Practical flatMap Patterns

Flatten orders into line items:

List<LineItem> allItems = orders.stream()
    .flatMap(order -> order.getLineItems().stream())
    .collect(Collectors.toList());

Extract unique words from sentences:

List<String> uniqueWords = sentences.stream()
    .flatMap(s -> Arrays.stream(s.split("\\s+")))
    .map(String::toLowerCase)
    .distinct()
    .sorted()
    .collect(Collectors.toList());

Build permission sets from roles:

Set<Permission> permissions = user.getRoles().stream()
    .flatMap(role -> role.getPermissions().stream())
    .collect(Collectors.toSet());

flatMap with Optional (Java 8 style):

// Java 8 has no Optional.stream(), so filter + map stands in for flatMap here.
// Find the first present value in a list of Optionals
Optional<String> result = optionals.stream()
    .filter(Optional::isPresent)
    .map(Optional::get)
    .findFirst();

// In Java 9+: optionals.stream().flatMap(Optional::stream).findFirst()
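
If you want an actual flatMap on Java 8, one option is to convert each Optional into a stream of zero or one elements by hand. The sketch below assumes a hypothetical helper named toStream; it is a stand-in for Optional.stream(), not a JDK method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical demo class; toStream is an illustrative helper, not a JDK API.
class OptionalFlatMap {

    // A present value becomes a one-element stream, an empty Optional an empty stream
    static <T> Stream<T> toStream(Optional<T> opt) {
        return opt.map(Stream::of).orElseGet(Stream::empty);
    }

    // Same result as filter(isPresent).map(get).findFirst(), expressed with flatMap
    static Optional<String> firstPresent(List<Optional<String>> optionals) {
        return optionals.stream()
                .flatMap(OptionalFlatMap::toStream)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Optional<String>> optionals = Arrays.asList(
                Optional.<String>empty(),
                Optional.of("first"),
                Optional.of("second"));
        System.out.println(firstPresent(optionals).orElse("none")); // first
    }
}
```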

Collectors in Depth

Collectors is a factory class with static methods that produce Collector instances for the collect() terminal operation.

Collecting to a List or Set

// A stream can be consumed only once, so each line below assumes a fresh stream
List<String> list = stream.collect(Collectors.toList());
Set<String> set   = stream.collect(Collectors.toSet());

// Guaranteed mutable list
List<String> arrayList = stream.collect(Collectors.toCollection(ArrayList::new));

// Unmodifiable: Collectors.toUnmodifiableList() (Java 10+); in Java 8, wrap the result with Collections.unmodifiableList

Collecting to a Map (toMap)

// Simple key → value
Map<Long, String> idToName = users.stream()
    .collect(Collectors.toMap(User::getId, User::getName));

// Value is the whole object
Map<Long, User> idToUser = users.stream()
    .collect(Collectors.toMap(User::getId, Function.identity()));

// Handle duplicate keys with merge function
Map<String, Long> nameToId = users.stream()
    .collect(Collectors.toMap(
        User::getName,
        User::getId,
        (existing, replacement) -> existing  // keep first on duplicate
    ));

// Control the map implementation
Map<String, User> nameToUser = users.stream()
    .collect(Collectors.toMap(
        User::getName,
        Function.identity(),
        (a, b) -> a,
        LinkedHashMap::new  // preserve insertion order
    ));

Pitfall: toMap throws IllegalStateException on duplicate keys unless you provide a merge function. Always provide a merge function if duplicates are possible.
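
The pitfall is easy to reproduce. In this minimal sketch the class name, the method names, and the sample names are all made up for illustration; keys are the first letter of each name so that two names collide.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical demo class; the names are illustrative only.
class ToMapDuplicates {

    // No merge function: a repeated key makes toMap throw IllegalStateException
    static Map<Character, String> byInitialStrict(List<String> names) {
        return names.stream()
                .collect(Collectors.toMap(n -> n.charAt(0), Function.identity()));
    }

    // With a merge function: the first value seen for a key wins
    static Map<Character, String> byInitialKeepFirst(List<String> names) {
        return names.stream()
                .collect(Collectors.toMap(
                        n -> n.charAt(0),
                        Function.identity(),
                        (existing, replacement) -> existing));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Anna"); // two names start with 'A'
        try {
            byInitialStrict(names);
        } catch (IllegalStateException e) {
            System.out.println("duplicate key rejected");
        }
        System.out.println(byInitialKeepFirst(names).get('A')); // Alice
    }
}
```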

Joining Strings

String csv     = names.stream().collect(Collectors.joining(", "));
String wrapped = names.stream().collect(Collectors.joining(", ", "[", "]"));
// [Alice, Bob, Charlie]

Counting

long count = stream.collect(Collectors.counting());
// equivalent to stream.count() but usable as a downstream collector

Summing, Averaging, Summarising

int totalAge    = users.stream().collect(Collectors.summingInt(User::getAge));
double avgAge   = users.stream().collect(Collectors.averagingInt(User::getAge));

IntSummaryStatistics stats = users.stream()
    .collect(Collectors.summarizingInt(User::getAge));
// stats.getCount(), getMin(), getMax(), getSum(), getAverage()
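
One detail worth knowing about summarizingInt is what the statistics object reports for an empty stream. The sketch below uses a plain list of ages instead of the User type, and the class name AgeStats is illustrative.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; it uses plain integers in place of User objects.
class AgeStats {

    static IntSummaryStatistics summarize(List<Integer> ages) {
        return ages.stream().collect(Collectors.summarizingInt(Integer::intValue));
    }

    public static void main(String[] args) {
        IntSummaryStatistics stats = summarize(Arrays.asList(30, 25, 41));
        System.out.println(stats.getMin());     // 25
        System.out.println(stats.getMax());     // 41
        System.out.println(stats.getSum());     // 96
        System.out.println(stats.getAverage()); // 32.0

        // With no elements, min and max keep their sentinel values and the average is 0.0
        IntSummaryStatistics empty = summarize(Collections.<Integer>emptyList());
        System.out.println(empty.getCount() == 0);                // true
        System.out.println(empty.getMin() == Integer.MAX_VALUE);  // true
    }
}
```

Check getCount() before trusting getMin() or getMax() when the input may be empty.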

minBy / maxBy

Optional<User> oldest = users.stream()
    .collect(Collectors.maxBy(Comparator.comparingInt(User::getAge)));

groupingBy

groupingBy groups the elements of a stream into a Map<K, List<T>>, keyed by the result of a classifier function:

// Group users by city
Map<String, List<User>> byCity = users.stream()
    .collect(Collectors.groupingBy(User::getCity));

Downstream Collectors

The second argument to groupingBy is a downstream collector that post-processes each group:

// Count users per city
Map<String, Long> countByCity = users.stream()
    .collect(Collectors.groupingBy(User::getCity, Collectors.counting()));

// Average age per city
Map<String, Double> avgAgeByCity = users.stream()
    .collect(Collectors.groupingBy(
        User::getCity,
        Collectors.averagingInt(User::getAge)
    ));

// Collect names per city
Map<String, List<String>> namesByCity = users.stream()
    .collect(Collectors.groupingBy(
        User::getCity,
        Collectors.mapping(User::getName, Collectors.toList())
    ));

Multi-Level Grouping

// Group by country, then by city
Map<String, Map<String, List<User>>> byCountryThenCity = users.stream()
    .collect(Collectors.groupingBy(
        User::getCountry,
        Collectors.groupingBy(User::getCity)
    ));

// Access: byCountryThenCity.get("UK").get("London")

Controlling the Map Implementation

// TreeMap for sorted keys
TreeMap<String, List<User>> sorted = users.stream()
    .collect(Collectors.groupingBy(User::getCity, TreeMap::new, Collectors.toList()));

partitioningBy

partitioningBy divides a stream into exactly two groups — true and false — based on a predicate:

// Partition users: active (true) vs inactive (false)
Map<Boolean, List<User>> partitioned = users.stream()
    .collect(Collectors.partitioningBy(User::isActive));

List<User> active   = partitioned.get(true);
List<User> inactive = partitioned.get(false);

With downstream:

// Count active vs inactive
Map<Boolean, Long> counts = users.stream()
    .collect(Collectors.partitioningBy(User::isActive, Collectors.counting()));

Use partitioningBy over groupingBy with a boolean when you need both sides — it guarantees both true and false keys exist in the result, even if one side is empty.
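
The difference shows up as soon as one side has no elements. This sketch runs both collectors over a list containing only odd numbers; the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; the names are illustrative only.
class PartitionVsGroup {

    static Map<Boolean, List<Integer>> partitionEven(List<Integer> nums) {
        return nums.stream().collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    static Map<Boolean, List<Integer>> groupEven(List<Integer> nums) {
        return nums.stream().collect(Collectors.groupingBy(n -> n % 2 == 0));
    }

    public static void main(String[] args) {
        List<Integer> oddsOnly = Arrays.asList(1, 3, 5);

        // partitioningBy: the true key exists and maps to an empty list
        System.out.println(partitionEven(oddsOnly).get(true)); // []

        // groupingBy: the true key is absent, so get(true) returns null
        System.out.println(groupEven(oddsOnly).get(true));     // null
    }
}
```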


mapping and collectingAndThen

mapping — transform before collecting downstream

// Collect names (not User objects) grouped by city
Map<String, List<String>> namesByCity = users.stream()
    .collect(Collectors.groupingBy(
        User::getCity,
        Collectors.mapping(User::getName, Collectors.toList())
    ));

collectingAndThen — post-process the collected result

// Collect to an unmodifiable list
List<String> immutable = names.stream()
    .collect(Collectors.collectingAndThen(
        Collectors.toList(),
        Collections::unmodifiableList
    ));

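// Get the longest name (null if the list is empty).
// The finisher unwraps the Optional, so the result type is String, not Optional<String>.
String longest = names.stream()
    .collect(Collectors.collectingAndThen(
        Collectors.maxBy(Comparator.comparingInt(String::length)),
        opt -> opt.orElse(null)
    ));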

Custom Collectors

For complex aggregations not covered by built-in collectors, implement Collector<T, A, R>:

  • T: type of stream elements
  • A: mutable accumulation type (intermediate container)
  • R: result type

public class JoiningCollector implements Collector<String, StringJoiner, String> {

    private final String delimiter;

    public JoiningCollector(String delimiter) { this.delimiter = delimiter; }

    @Override
    public Supplier<StringJoiner> supplier() {
        return () -> new StringJoiner(delimiter);
    }

    @Override
    public BiConsumer<StringJoiner, String> accumulator() {
        return StringJoiner::add;
    }

    @Override
    public BinaryOperator<StringJoiner> combiner() {
        return StringJoiner::merge;  // for parallel streams
    }

    @Override
    public Function<StringJoiner, String> finisher() {
        return StringJoiner::toString;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

In practice, Collector.of is simpler:

Collector<String, StringJoiner, String> joiner =
    Collector.of(
        () -> new StringJoiner(", "),     // supplier
        StringJoiner::add,                // accumulator
        StringJoiner::merge,              // combiner
        StringJoiner::toString            // finisher
    );

String result = Stream.of("a", "b", "c").collect(joiner); // "a, b, c"

Real-World Custom Collector: Top-N per Group

A common analytics requirement is “for each category, give me the top 3 by revenue”. The built-in collectors cannot express this directly — a custom collector makes it reusable across the codebase.

/**
 * Collector that keeps only the top N elements by a given comparator.
 * Memory-efficient: never accumulates more than N + 1 elements per accumulation step.
 */
public class TopNCollector<T> implements Collector<T, PriorityQueue<T>, List<T>> {

    private final int n;
    private final Comparator<T> comparator;

    public TopNCollector(int n, Comparator<T> comparator) {
        this.n = n;
        this.comparator = comparator;
    }

    /** Factory method for cleaner call sites */
    public static <T> TopNCollector<T> top(int n, Comparator<T> comparator) {
        return new TopNCollector<>(n, comparator);
    }

    @Override
    public Supplier<PriorityQueue<T>> supplier() {
        // Min-heap: the smallest element is always at the head — easy to evict
        return () -> new PriorityQueue<>(n + 1, comparator);
    }

    @Override
    public BiConsumer<PriorityQueue<T>, T> accumulator() {
        return (heap, element) -> {
            heap.offer(element);
            if (heap.size() > n) heap.poll(); // evict the smallest
        };
    }

    @Override
    public BinaryOperator<PriorityQueue<T>> combiner() {
        return (heap1, heap2) -> {
            heap2.forEach(e -> {
                heap1.offer(e);
                if (heap1.size() > n) heap1.poll();
            });
            return heap1;
        };
    }

    @Override
    public Function<PriorityQueue<T>, List<T>> finisher() {
        return heap -> {
            List<T> result = new ArrayList<>(heap);
            result.sort(comparator.reversed()); // largest first in the final list
            return result;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet(); // not UNORDERED — order in result matters
    }
}

Usage — top 3 orders by revenue per region:

// First group by region, then apply the custom collector as downstream
Map<String, List<Order>> top3PerRegion = orders.stream()
    .collect(Collectors.groupingBy(
        Order::getRegion,
        TopNCollector.top(3, Comparator.comparingDouble(Order::getRevenue))
    ));

// Or directly across all orders: global top 5
List<Order> globalTop5 = orders.stream()
    .collect(TopNCollector.top(5, Comparator.comparingDouble(Order::getRevenue)));

For large streams and small N, this pattern uses far less memory than sorted(comparator.reversed()).limit(N): sorted() has to buffer every element before it can emit the first one, whereas the collector holds at most N + 1 elements in its heap at any time.
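
As a sanity check, the bounded-heap approach and the sort-then-limit approach should agree on the result. The sketch below is a compact Collector.of variant of the same idea, not the TopNCollector class itself; the names TopNSketch, topN, and viaSortedLimit are illustrative, and the sample values are distinct so that ties do not affect the comparison.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical demo class; a compact restatement of the bounded-heap idea.
class TopNSketch {

    // Add one element, then evict the smallest if the heap grew past n
    private static <T> void offerBounded(PriorityQueue<T> heap, T element, int n) {
        heap.offer(element);
        if (heap.size() > n) heap.poll();
    }

    static <T> Collector<T, PriorityQueue<T>, List<T>> topN(int n, Comparator<T> cmp) {
        return Collector.of(
                () -> new PriorityQueue<T>(n + 1, cmp),            // min-heap by cmp
                (heap, e) -> offerBounded(heap, e, n),             // accumulator
                (a, b) -> {                                        // combiner for parallel runs
                    for (T e : b) offerBounded(a, e, n);
                    return a;
                },
                heap -> {                                          // finisher: largest first
                    List<T> out = new ArrayList<>(heap);
                    out.sort(cmp.reversed());
                    return out;
                });
    }

    // Reference implementation: sorts everything, then keeps the first n
    static List<Integer> viaSortedLimit(List<Integer> xs, int n) {
        return xs.stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(5, 1, 9, 3, 7, 2);
        System.out.println(xs.stream().collect(topN(3, Comparator.<Integer>naturalOrder()))); // [9, 7, 5]
        System.out.println(viaSortedLimit(xs, 3));                                            // [9, 7, 5]
    }
}
```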

Characteristics Explained

Characteristic | Meaning
CONCURRENT | The accumulator can be called concurrently on one shared result container, so a parallel stream does not need a separate container per thread
UNORDERED | The result does not depend on encounter order, which enables parallelism optimisations
IDENTITY_FINISH | The finisher is the identity function, so the stream implementation can skip it and cast the container to the result type

Most custom collectors should return Collections.emptySet() from characteristics() until you specifically verify the others apply.
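
You can inspect what a collector reports by calling characteristics() on it. The sketch below relies on the documented behaviour of the two Collector.of overloads: the form without a finisher adds IDENTITY_FINISH automatically, while the form with a finisher reports only the flags you pass. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical demo class; the names are illustrative only.
class CharacteristicsDemo {

    // Three-function form: no finisher, so Collector.of marks it IDENTITY_FINISH
    static Collector<String, List<String>, List<String>> toArrayList() {
        return Collector.of(
                ArrayList::new,
                List::add,
                (left, right) -> { left.addAll(right); return left; });
    }

    // Four-function form with no flags: the characteristics set is empty
    static Collector<String, StringJoiner, String> commaJoiner() {
        return Collector.of(
                () -> new StringJoiner(", "),
                StringJoiner::add,
                StringJoiner::merge,
                StringJoiner::toString);
    }

    public static void main(String[] args) {
        System.out.println(toArrayList().characteristics()); // [IDENTITY_FINISH]
        System.out.println(commaJoiner().characteristics()); // []
        System.out.println(Stream.of("a", "b", "c").collect(commaJoiner())); // a, b, c
    }
}
```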


Real-World Examples

E-Commerce: Q1 Revenue from Active Orders

Filter by date range, flatten line items, and sum — all in one pipeline:

BigDecimal q1Revenue = orders.stream()
    .filter(o -> o.getOrderDate().getYear() == 2026
              && o.getOrderDate().getMonthValue() <= 3)   // Q1
    .filter(o -> !o.getStatus().equals("CANCELLED"))
    .flatMap(o -> o.getLineItems().stream())               // flatten items
    .map(LineItem::getSubtotal)
    .reduce(BigDecimal.ZERO, BigDecimal::add);

HR Analytics: High Earners by Department

// Map<department, List<name>>: names of employees earning above the threshold, grouped by department
Map<String, List<String>> highEarnersByDept = employees.stream()
    .filter(emp -> emp.getSalary() > 60_000)
    .collect(Collectors.groupingBy(
        Employee::getDepartment,
        Collectors.mapping(Employee::getName, Collectors.toList())
    ));

Log Analysis: Error Frequency from a Large File

// Parse an app.log and count occurrences of each error code
Map<String, Long> errorFrequency;
try (Stream<String> lines = Files.lines(Paths.get("app.log"))) {
    errorFrequency = lines
        .filter(line -> line.contains("ERROR"))
        .map(line -> line.replaceFirst(".*ERROR\\s+", "").split("\\s+")[0])
        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
}

// Top 5 most frequent errors
errorFrequency.entrySet().stream()
    .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
    .limit(5)
    .forEach(e -> System.out.printf("%-30s %d%n", e.getKey(), e.getValue()));

Customer Segmentation: Preserve All Groups Even When Empty

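Calling filter() before collect(Collectors.groupingBy(...)) drops every group that has no matches, because the classifier never sees the rejected elements. To keep all keys, apply Collectors.filtering (available from Java 9) as a downstream collector instead: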

// WRONG: regions with no high-value customers disappear from the result
Map<String, List<Customer>> wrong = customers.stream()
    .filter(c -> c.getTotalSpend() > 10_000)
    .collect(Collectors.groupingBy(Customer::getRegion));

// RIGHT: all regions appear; some may have empty lists
Map<String, List<Customer>> segmented = customers.stream()
    .collect(Collectors.groupingBy(
        Customer::getRegion,
        Collectors.filtering(
            c -> c.getTotalSpend() > 10_000,
            Collectors.toList()
        )
    ));
// Result: {USA=[cust1, cust2], EU=[], ASIA=[cust3]}
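
Collectors.filtering does not exist in Java 8. If you are on Java 8, a small stand-in can be built with Collector.of. The sketch below is one possible backport, not the JDK implementation; the class FilteringCompat, its filtering method, and the minimal Customer type are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical Java 8 stand-in for Collectors.filtering (added in Java 9).
class FilteringCompat {

    // Wraps a downstream collector so that it only accumulates matching elements
    static <T, A, R> Collector<T, A, R> filtering(
            Predicate<? super T> predicate, Collector<T, A, R> downstream) {
        BiConsumer<A, T> accumulator = downstream.accumulator();
        return Collector.of(
                downstream.supplier(),
                (A container, T element) -> {
                    if (predicate.test(element)) accumulator.accept(container, element);
                },
                downstream.combiner(),
                downstream.finisher(),
                downstream.characteristics().toArray(new Collector.Characteristics[0]));
    }

    // Minimal stand-in for the article's Customer type
    static final class Customer {
        private final String region;
        private final int totalSpend;
        Customer(String region, int totalSpend) { this.region = region; this.totalSpend = totalSpend; }
        String getRegion() { return region; }
        int getTotalSpend() { return totalSpend; }
    }

    // Filter first: regions without high-value customers vanish
    static Map<String, Long> filterFirst(List<Customer> customers) {
        return customers.stream()
                .filter(c -> c.getTotalSpend() > 10_000)
                .collect(Collectors.groupingBy(Customer::getRegion, Collectors.counting()));
    }

    // Filter downstream: every region keeps its key, possibly with a count of 0
    static Map<String, Long> filterDownstream(List<Customer> customers) {
        return customers.stream()
                .collect(Collectors.groupingBy(
                        Customer::getRegion,
                        filtering((Customer c) -> c.getTotalSpend() > 10_000, Collectors.counting())));
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("USA", 15_000), new Customer("USA", 20_000),
                new Customer("EU", 500), new Customer("ASIA", 12_000));
        System.out.println(filterFirst(customers).containsKey("EU")); // false
        System.out.println(filterDownstream(customers).get("EU"));    // 0
    }
}
```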

Report: Sales by Region and Product Category

// Map<region, Map<category, totalRevenue>>
Map<String, Map<String, Double>> salesReport = orders.stream()
    .collect(Collectors.groupingBy(
        Order::getRegion,
        Collectors.groupingBy(
            Order::getCategory,
            Collectors.summingDouble(Order::getRevenue)
        )
    ));

Histogram: Character Frequency

Map<Character, Long> frequency = "hello world".chars()
    .mapToObj(c -> (char) c)
    .filter(c -> c != ' ')
    .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

Index: Build a Search Index from Documents

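// Map<word, Set<documentId>>: inverted index
// Note: Map.entry requires Java 9+; on Java 8, use new AbstractMap.SimpleEntry<>(key, value) instead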
Map<String, Set<Long>> invertedIndex = documents.stream()
    .flatMap(doc ->
        Arrays.stream(doc.getContent().split("\\s+"))
              .map(word -> Map.entry(word.toLowerCase(), doc.getId()))
    )
    .collect(Collectors.groupingBy(
        Map.Entry::getKey,
        Collectors.mapping(Map.Entry::getValue, Collectors.toSet())
    ));
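
Here is a self-contained variant of the inverted index that also runs on Java 8, using AbstractMap.SimpleEntry for the word/document pairs. Representing the documents as a Map from id to content is an assumption made to keep the sketch short; the class name is illustrative.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical demo class; documents are modelled as id -> content for brevity.
class InvertedIndexDemo {

    static Map<String, Set<Long>> build(Map<Long, String> docs) {
        return docs.entrySet().stream()
                // One (word, documentId) pair per word occurrence
                .flatMap(doc -> Arrays.stream(doc.getValue().split("\\s+"))
                        .map(word -> new AbstractMap.SimpleEntry<String, Long>(
                                word.toLowerCase(), doc.getKey())))
                // Group the pairs by word and collect the ids into a set
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
    }

    public static void main(String[] args) {
        Map<Long, String> docs = new HashMap<>();
        docs.put(1L, "Java streams");
        docs.put(2L, "java Collectors");
        Map<String, Set<Long>> index = build(docs);
        System.out.println(index.get("java").size());  // 2
        System.out.println(index.get("streams"));      // [1]
    }
}
```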

Summary

Collector | Output type | Use case
toList() | List<T> | Ordered collection
toSet() | Set<T> | Dedup, unordered
toMap(k, v) | Map<K,V> | Key-value lookup
joining(delim) | String | Concatenate strings
groupingBy(f) | Map<K, List<T>> | Group by field
groupingBy(f, downstream) | Map<K, R> | Group + post-process
partitioningBy(p) | Map<Boolean, List<T>> | Split into two groups
counting() | Long | Count per group
summingInt/Long/Double | Integer/Long/Double | Sum per group
averagingInt/Long/Double | Double | Average per group
mapping(f, downstream) | varies | Transform before collecting
collectingAndThen(c, f) | varies | Post-process result

Next Step

Parallel Streams: ForkJoinPool, Spliterators, and When NOT to Parallelize →

Part of the DevOps Monk Java tutorial series: Java 8, Java 11, Java 17, Java 21