Advanced Streams: flatMap, Collectors, Grouping, and Partitioning
The built-in Collectors cover most real-world aggregation needs, and when you need something beyond grouping, partitioning, or joining, you can write a custom Collector. Understanding flatMap and the full Collectors toolkit, including when to write your own collector, separates competent Java 8 developers from those who still reach for a for loop whenever a problem gets slightly unusual.
flatMap in Depth
flatMap is one of the most versatile transformations in the Streams API. It maps each element to a stream, then flattens all of those streams into one continuous stream.
// Without flatMap — stream of lists
Stream<List<String>> nested = Stream.of(
Arrays.asList("a", "b"),
Arrays.asList("c", "d"),
Arrays.asList("e")
);
// Stream<String>: "a", "b", "c", "d", "e"
Stream<String> flat = nested.flatMap(Collection::stream);
Practical flatMap Patterns
Flatten orders into line items:
List<LineItem> allItems = orders.stream()
.flatMap(order -> order.getLineItems().stream())
.collect(Collectors.toList());
Extract unique words from sentences:
List<String> uniqueWords = sentences.stream()
.flatMap(s -> Arrays.stream(s.split("\\s+")))
.map(String::toLowerCase)
.distinct()
.sorted()
.collect(Collectors.toList());
Build permission sets from roles:
Set<Permission> permissions = user.getRoles().stream()
.flatMap(role -> role.getPermissions().stream())
.collect(Collectors.toSet());
Flattening a stream of Optionals (Java 8 style). Java 8 has no Optional.stream(), so filter out the empty Optionals and unwrap the rest:
// Value of the first non-empty Optional in the list
Optional<String> result = optionals.stream()
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();
// Java 9+: optionals.stream().flatMap(Optional::stream).findFirst()
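Java 8 can still express this with a genuine flatMap: turn each Optional into a zero- or one-element Stream and let flatMap splice the results together. The sketch below uses only the JDK. The class and method names (OptionalFlatten, toStream, presentValues) are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OptionalFlatten {

    // Java 8 stand-in for Optional.stream(): empty -> Stream.empty(), present -> Stream.of(value)
    public static <T> Stream<T> toStream(Optional<T> opt) {
        return opt.map(Stream::of).orElseGet(Stream::empty);
    }

    // Keep only the values that are present, in encounter order
    public static <T> List<T> presentValues(List<Optional<T>> optionals) {
        return optionals.stream()
                .flatMap(OptionalFlatten::toStream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Optional<String>> optionals = Arrays.asList(
                Optional.<String>empty(), Optional.of("x"), Optional.<String>empty(), Optional.of("y"));
        System.out.println(presentValues(optionals)); // [x, y]
    }
}
```

Appending .findFirst() instead of collecting gives the same answer as the filter/map version above.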
Collectors in Depth
Collectors is a factory class with static methods that produce Collector instances for the collect() terminal operation.
Collecting to a List or Set
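// Each line assumes a fresh stream: a stream can be consumed only once
List<String> list = stream.collect(Collectors.toList()); // no guarantees on the List type or its mutability
Set<String> set = stream.collect(Collectors.toSet());
// Guaranteed mutable ArrayList
List<String> arrayList = stream.collect(Collectors.toCollection(ArrayList::new));
// Unmodifiable: Collectors.toUnmodifiableList() is Java 10+; in Java 8, wrap the result with Collections.unmodifiableList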
Collecting to a Map (toMap)
// Simple key → value
Map<Long, String> idToName = users.stream()
.collect(Collectors.toMap(User::getId, User::getName));
// Value is the whole object
Map<Long, User> idToUser = users.stream()
.collect(Collectors.toMap(User::getId, Function.identity()));
// Handle duplicate keys with merge function
Map<String, Long> nameToId = users.stream()
.collect(Collectors.toMap(
User::getName,
User::getId,
(existing, replacement) -> existing // keep first on duplicate
));
// Control the map implementation
Map<String, User> nameToUser = users.stream()
.collect(Collectors.toMap(
User::getName,
Function.identity(),
(a, b) -> a,
LinkedHashMap::new // preserve insertion order
));
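Pitfall: toMap throws IllegalStateException on duplicate keys unless you provide a merge function, so always provide one if duplicates are possible. toMap also throws NullPointerException if the value mapper returns null, even when the chosen map implementation would accept null values.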
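The following sketch demonstrates the pitfall using plain strings keyed by their first letter. The class and method names are hypothetical. Without a merge function, the second name starting with "A" makes collect() throw. With (first, second) -> first, the earlier value is kept.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapDuplicates {

    // No merge function: a repeated key makes collect() throw IllegalStateException
    public static Map<String, String> byInitialStrict(List<String> names) {
        return names.stream()
                .collect(Collectors.toMap(s -> s.substring(0, 1), s -> s));
    }

    // Merge function (first, second) -> first: the first value seen for a key wins
    public static Map<String, String> byInitialKeepFirst(List<String> names) {
        return names.stream()
                .collect(Collectors.toMap(s -> s.substring(0, 1), s -> s, (first, second) -> first));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Anna");
        try {
            byInitialStrict(names);
        } catch (IllegalStateException e) {
            System.out.println("duplicate key rejected");
        }
        System.out.println(byInitialKeepFirst(names).get("A")); // Alice
    }
}
```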
Joining Strings
String csv = names.stream().collect(Collectors.joining(", "));
String wrapped = names.stream().collect(Collectors.joining(", ", "[", "]"));
// [Alice, Bob, Charlie]
Counting
long count = stream.collect(Collectors.counting());
// equivalent to stream.count() but usable as a downstream collector
Summing, Averaging, Summarising
int totalAge = users.stream().collect(Collectors.summingInt(User::getAge));
double avgAge = users.stream().collect(Collectors.averagingInt(User::getAge));
IntSummaryStatistics stats = users.stream()
.collect(Collectors.summarizingInt(User::getAge));
// stats.getCount(), getMin(), getMax(), getSum(), getAverage()
minBy / maxBy
Optional<User> oldest = users.stream()
.collect(Collectors.maxBy(Comparator.comparingInt(User::getAge)));
groupingBy
groupingBy groups the elements of a stream into a Map<K, List<T>>, using a classifier function to compute each element's key:
// Group users by city
Map<String, List<User>> byCity = users.stream()
.collect(Collectors.groupingBy(User::getCity));
Downstream Collectors
The second argument to groupingBy is a downstream collector that post-processes each group:
// Count users per city
Map<String, Long> countByCity = users.stream()
.collect(Collectors.groupingBy(User::getCity, Collectors.counting()));
// Average age per city
Map<String, Double> avgAgeByCity = users.stream()
.collect(Collectors.groupingBy(
User::getCity,
Collectors.averagingInt(User::getAge)
));
// Collect names per city
Map<String, List<String>> namesByCity = users.stream()
.collect(Collectors.groupingBy(
User::getCity,
Collectors.mapping(User::getName, Collectors.toList())
));
Multi-Level Grouping
// Group by country, then by city
Map<String, Map<String, List<User>>> byCountryThenCity = users.stream()
.collect(Collectors.groupingBy(
User::getCountry,
Collectors.groupingBy(User::getCity)
));
// Access: byCountryThenCity.get("UK").get("London")
Controlling the Map Implementation
// TreeMap for sorted keys
TreeMap<String, List<User>> sorted = users.stream()
.collect(Collectors.groupingBy(User::getCity, TreeMap::new, Collectors.toList()));
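By default, groupingBy collects into a HashMap, which makes no ordering guarantee. The following is a minimal sketch that uses word length as the classifier. SortedGrouping and byLength are illustrative names. It shows that TreeMap::new produces ascending keys, while each group's list keeps encounter order:

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SortedGrouping {

    // Group words by their length; TreeMap::new keeps the keys in ascending order
    public static TreeMap<Integer, List<String>> byLength(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(byLength(Arrays.asList("pear", "fig", "banana", "kiwi")));
        // {3=[fig], 4=[pear, kiwi], 6=[banana]}
    }
}
```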
partitioningBy
partitioningBy divides a stream into exactly two groups — true and false — based on a predicate:
// Partition users: active (true) vs inactive (false)
Map<Boolean, List<User>> partitioned = users.stream()
.collect(Collectors.partitioningBy(User::isActive));
List<User> active = partitioned.get(true);
List<User> inactive = partitioned.get(false);
With downstream:
// Count active vs inactive
Map<Boolean, Long> counts = users.stream()
.collect(Collectors.partitioningBy(User::isActive, Collectors.counting()));
Use partitioningBy over groupingBy with a boolean when you need both sides — it guarantees both true and false keys exist in the result, even if one side is empty.
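To see the difference, the following sketch classifies a list containing only even numbers both ways. PartitionVsGroup is a hypothetical class name. groupingBy produces a single true key, whereas partitioningBy also reports an empty false side:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionVsGroup {

    // groupingBy only creates keys for classifier values that actually occur
    public static Map<Boolean, List<Integer>> groupEven(List<Integer> nums) {
        return nums.stream().collect(Collectors.groupingBy(n -> n % 2 == 0));
    }

    // partitioningBy always has both a true and a false key
    public static Map<Boolean, List<Integer>> partitionEven(List<Integer> nums) {
        return nums.stream().collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    public static void main(String[] args) {
        List<Integer> evens = Arrays.asList(2, 4, 6);
        System.out.println(groupEven(evens));     // {true=[2, 4, 6]}
        System.out.println(partitionEven(evens)); // {false=[], true=[2, 4, 6]}
    }
}
```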
mapping and collectingAndThen
mapping — transform before collecting downstream
// Collect names (not User objects) grouped by city
Map<String, List<String>> namesByCity = users.stream()
.collect(Collectors.groupingBy(
User::getCity,
Collectors.mapping(User::getName, Collectors.toList())
));
collectingAndThen — post-process the collected result
// Collect to an unmodifiable list
List<String> immutable = names.stream()
.collect(Collectors.collectingAndThen(
Collectors.toList(),
Collections::unmodifiableList
));
// Get the longest name, unwrapping the Optional (null if names is empty)
String longest = names.stream()
.collect(Collectors.collectingAndThen(
Collectors.maxBy(Comparator.comparingInt(String::length)),
opt -> opt.orElse(null)
));
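collectingAndThen is especially handy inside groupingBy, where the Optional from maxBy can never be empty. groupingBy only creates a group when at least one element maps to it. The following sketch relies on that reasoning and groups words by their first letter. LongestPerGroup is an illustrative name:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LongestPerGroup {

    // Longest word per initial letter. groupingBy never creates an empty group,
    // so the Optional produced by maxBy is always present and Optional::get is safe.
    public static TreeMap<String, String> longestByInitial(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(
                w -> w.substring(0, 1),
                TreeMap::new,
                Collectors.collectingAndThen(
                        Collectors.maxBy(Comparator.comparingInt(String::length)),
                        Optional::get)));
    }

    public static void main(String[] args) {
        System.out.println(longestByInitial(
                Arrays.asList("apple", "avocado", "banana", "blueberry", "cherry")));
        // {a=avocado, b=blueberry, c=cherry}
    }
}
```

When two words in a group have the same length, maxBy keeps the one encountered first.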
Custom Collectors
For complex aggregations not covered by built-in collectors, implement Collector<T, A, R>:
T: type of the stream elements
A: mutable accumulation type (the intermediate container)
R: result type
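import java.util.Collections;
import java.util.Set;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
public class JoiningCollector implements Collector<String, StringJoiner, String> {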
private final String delimiter;
public JoiningCollector(String delimiter) { this.delimiter = delimiter; }
@Override
public Supplier<StringJoiner> supplier() {
return () -> new StringJoiner(delimiter);
}
@Override
public BiConsumer<StringJoiner, String> accumulator() {
return StringJoiner::add;
}
@Override
public BinaryOperator<StringJoiner> combiner() {
return StringJoiner::merge; // for parallel streams
}
@Override
public Function<StringJoiner, String> finisher() {
return StringJoiner::toString;
}
@Override
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
In practice, Collector.of is simpler:
Collector<String, StringJoiner, String> joiner =
Collector.of(
() -> new StringJoiner(", "), // supplier
StringJoiner::add, // accumulator
StringJoiner::merge, // combiner
StringJoiner::toString // finisher
);
String result = Stream.of("a", "b", "c").collect(joiner); // "a, b, c"
Real-World Custom Collector: Top-N per Group
A common analytics requirement is “for each category, give me the top 3 by revenue”. The built-in collectors cannot express this directly — a custom collector makes it reusable across the codebase.
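import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
/**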
* Collector that keeps only the top N elements by a given comparator.
* Memory-efficient: never accumulates more than N + 1 elements per accumulation step.
*/
public class TopNCollector<T> implements Collector<T, PriorityQueue<T>, List<T>> {
private final int n;
private final Comparator<T> comparator;
public TopNCollector(int n, Comparator<T> comparator) {
this.n = n;
this.comparator = comparator;
}
/** Factory method for cleaner call sites */
public static <T> TopNCollector<T> top(int n, Comparator<T> comparator) {
return new TopNCollector<>(n, comparator);
}
@Override
public Supplier<PriorityQueue<T>> supplier() {
// Min-heap: the smallest element is always at the head — easy to evict
return () -> new PriorityQueue<>(n + 1, comparator);
}
@Override
public BiConsumer<PriorityQueue<T>, T> accumulator() {
return (heap, element) -> {
heap.offer(element);
if (heap.size() > n) heap.poll(); // evict the smallest
};
}
@Override
public BinaryOperator<PriorityQueue<T>> combiner() {
return (heap1, heap2) -> {
heap2.forEach(e -> {
heap1.offer(e);
if (heap1.size() > n) heap1.poll();
});
return heap1;
};
}
@Override
public Function<PriorityQueue<T>, List<T>> finisher() {
return heap -> {
List<T> result = new ArrayList<>(heap);
result.sort(comparator.reversed()); // largest first in the final list
return result;
};
}
@Override
public Set<Characteristics> characteristics() {
return Collections.emptySet(); // conservative default; result order comes from the finisher's sort, not encounter order
}
}
Usage — top 3 orders by revenue per region:
// First group by region, then apply the custom collector as downstream
Map<String, List<Order>> top3PerRegion = orders.stream()
.collect(Collectors.groupingBy(
Order::getRegion,
TopNCollector.top(3, Comparator.comparingDouble(Order::getRevenue))
));
// Or directly across all orders: global top 5
List<Order> globalTop5 = orders.stream()
.collect(TopNCollector.top(5, Comparator.comparingDouble(Order::getRevenue)));
This pattern is far more memory-efficient than sorted().limit(N) on large streams because it never sorts, or even buffers, the entire stream. Each container is a bounded heap of at most N elements (N + 1 momentarily, before an eviction), and the work is O(n log N) rather than O(n log n).
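The same bounded-heap idea can also be written with Collector.of, without a dedicated class. The following sketch is an alternative formulation, not a replacement for TopNCollector. TopN and topN are illustrative names, and the comparator is taken as a plain Comparator<T> for simplicity:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collector;

public class TopN {

    // Bounded min-heap top-N, written with Collector.of instead of a full class
    public static <T> Collector<T, ?, List<T>> topN(int n, Comparator<T> cmp) {
        return Collector.<T, PriorityQueue<T>, List<T>>of(
                () -> new PriorityQueue<>(cmp),          // head = smallest element kept so far
                (heap, e) -> {
                    heap.offer(e);
                    if (heap.size() > n) heap.poll();    // evict the smallest once over n
                },
                (left, right) -> {                       // merge partial heaps (parallel streams)
                    for (T e : right) {
                        left.offer(e);
                        if (left.size() > n) left.poll();
                    }
                    return left;
                },
                heap -> {
                    List<T> result = new ArrayList<>(heap);
                    result.sort(cmp.reversed());         // largest first
                    return result;
                });
    }

    public static void main(String[] args) {
        List<Integer> top3 = Arrays.asList(5, 1, 9, 3, 7).stream()
                .collect(topN(3, Comparator.<Integer>naturalOrder()));
        System.out.println(top3); // [9, 7, 5]
    }
}
```

The combiner replays one heap into the other using the same eviction rule. As a result, a parallel stream keeps the same top N as a sequential one, except for how ties are broken.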
Characteristics Explained
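| Characteristic | Meaning |
|---|---|
| CONCURRENT | The accumulator may be called concurrently from multiple threads on the same result container, so a parallel stream can share one container instead of merging partial results. If the collector is not also UNORDERED, this only happens on an unordered source. |
| UNORDERED | The collection does not commit to preserving the encounter order of the input, which frees parallel execution from ordering constraints. |
| IDENTITY_FINISH | The finisher is the identity function and may be skipped by the stream framework; an unchecked cast from the container type A to the result type R must succeed. |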
Most custom collectors should return Collections.emptySet() from characteristics() until you specifically verify the others apply.
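Some collectors return their container unchanged. For those, the three-function overload Collector.of(supplier, accumulator, combiner, characteristics...) already reports IDENTITY_FINISH, so collect() can hand back the container without calling a finisher. The following is a minimal sketch. The toArrayList helper is hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

public class IdentityFinishDemo {

    // The finisher-less Collector.of overload yields a collector that reports
    // IDENTITY_FINISH: the ArrayList container itself is the result.
    public static <T> Collector<T, ArrayList<T>, ArrayList<T>> toArrayList() {
        return Collector.<T, ArrayList<T>>of(
                ArrayList::new,                                          // supplier
                ArrayList::add,                                          // accumulator
                (left, right) -> { left.addAll(right); return left; }); // combiner
    }

    public static void main(String[] args) {
        Collector<String, ArrayList<String>, ArrayList<String>> c = toArrayList();
        System.out.println(c.characteristics()); // [IDENTITY_FINISH]
        List<String> out = Arrays.asList("a", "b").stream().collect(c);
        System.out.println(out); // [a, b]
    }
}
```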
Real-World Examples
E-Commerce: Q1 Revenue from Active Orders
Filter by date range, flatten line items, and sum — all in one pipeline:
BigDecimal q1Revenue = orders.stream()
.filter(o -> o.getOrderDate().getYear() == 2026
&& o.getOrderDate().getMonthValue() <= 3) // Q1
.filter(o -> !o.getStatus().equals("CANCELLED"))
.flatMap(o -> o.getLineItems().stream()) // flatten items
.map(LineItem::getSubtotal)
.reduce(BigDecimal.ZERO, BigDecimal::add);
HR Analytics: High Earners by Department
// Map<department, List<employee name>>: names of employees earning above the threshold
Map<String, List<String>> highEarnersByDept = employees.stream()
.filter(emp -> emp.getSalary() > 60_000)
.collect(Collectors.groupingBy(
Employee::getDepartment,
Collectors.mapping(Employee::getName, Collectors.toList())
));
Log Analysis: Error Frequency from a Large File
// Parse app.log and count occurrences of each error code, assuming lines like "<timestamp> ERROR <CODE> <message>" (Files.lines throws IOException)
Map<String, Long> errorFrequency;
try (Stream<String> lines = Files.lines(Paths.get("app.log"))) {
errorFrequency = lines
.filter(line -> line.contains("ERROR"))
.map(line -> line.replaceFirst(".*?ERROR\\s+", "").split("\\s+")[0]) // reluctant .*? stops at the first ERROR
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
}
// Top 5 most frequent errors
errorFrequency.entrySet().stream()
.sorted(Map.Entry.<String, Long>comparingByValue().reversed())
.limit(5)
.forEach(e -> System.out.printf("%-30s %d%n", e.getKey(), e.getValue()));
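The parsing step is easier to check against a few in-memory lines than against a file. This sketch assumes a log shape of `<timestamp> ERROR <CODE> <message>` and uses invented sample lines. It uses the reluctant pattern `.*?ERROR\\s+`, which stops at the first ERROR token. A greedy `.*` would run on to the last one when a message repeats the word. ErrorFrequency is an illustrative name:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ErrorFrequency {

    // Assumed log shape: "<timestamp> ERROR <CODE> <message>".
    // The reluctant ".*?" stops at the FIRST "ERROR" in the line.
    public static String errorCode(String line) {
        return line.replaceFirst(".*?ERROR\\s+", "").split("\\s+")[0];
    }

    public static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .filter(line -> line.contains("ERROR"))
                .map(ErrorFrequency::errorCode)
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2024-05-01 10:00:00 ERROR DB_TIMEOUT query took too long",
                "2024-05-01 10:00:01 INFO service started",
                "2024-05-01 10:00:02 ERROR AUTH_FAILED user=bob",
                "2024-05-01 10:00:03 ERROR DB_TIMEOUT retry failed, ERROR again");
        // Expect DB_TIMEOUT=2 and AUTH_FAILED=1 (HashMap iteration order is unspecified)
        System.out.println(count(lines));
    }
}
```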
Customer Segmentation: Preserve All Groups Even When Empty
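Filtering before groupingBy drops every group that has no matching elements. To keep those keys, move the predicate into a downstream collector with Collectors.filtering. Note that Collectors.filtering was added in Java 9 and is not available in Java 8: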
// WRONG: regions with no high-value customers disappear from the result
Map<String, List<Customer>> wrong = customers.stream()
.filter(c -> c.getTotalSpend() > 10_000)
.collect(Collectors.groupingBy(Customer::getRegion));
// RIGHT (Java 9+): every region that has at least one customer appears; some lists may be empty
Map<String, List<Customer>> segmented = customers.stream()
.collect(Collectors.groupingBy(
Customer::getRegion,
Collectors.filtering(
c -> c.getTotalSpend() > 10_000,
Collectors.toList()
)
));
// Example result (HashMap key order is unspecified): {USA=[cust1, cust2], EU=[], ASIA=[cust3]}
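On Java 8, where Collectors.filtering does not exist, one option is a small downstream collector that applies the predicate while accumulating. The following sketch shows a hypothetical helper (filteringToList), not a JDK API. It is demonstrated on words instead of customers:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class FilteringJava8 {

    // Hypothetical Java 8 stand-in for Collectors.filtering(pred, Collectors.toList()):
    // rejected elements are skipped, but the group's (possibly empty) list is still created.
    public static <T> Collector<T, ?, List<T>> filteringToList(Predicate<? super T> pred) {
        return Collector.<T, List<T>>of(
                ArrayList::new,                                          // one list per group
                (list, t) -> { if (pred.test(t)) list.add(t); },         // add only matching elements
                (left, right) -> { left.addAll(right); return left; }); // merge parallel partials
    }

    // Words grouped by first letter; only words longer than 3 characters are kept
    public static TreeMap<String, List<String>> longWordsByInitial(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(
                w -> w.substring(0, 1),
                TreeMap::new,
                FilteringJava8.<String>filteringToList(w -> w.length() > 3)));
    }

    public static void main(String[] args) {
        System.out.println(longWordsByInitial(Arrays.asList("ant", "apple", "bee", "banana", "cat")));
        // {a=[apple], b=[banana], c=[]}
    }
}
```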
Report: Sales by Region and Product Category
// Map<region, Map<category, totalRevenue>>
Map<String, Map<String, Double>> salesReport = orders.stream()
.collect(Collectors.groupingBy(
Order::getRegion,
Collectors.groupingBy(
Order::getCategory,
Collectors.summingDouble(Order::getRevenue)
)
));
Histogram: Character Frequency
Map<Character, Long> frequency = "hello world".chars()
.mapToObj(c -> (char) c)
.filter(c -> c != ' ')
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
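The histogram above is collected into groupingBy's default HashMap, so its printed order is unspecified. Passing TreeMap::new as the map factory gives a sorted, repeatable result. The following is a minimal sketch. CharHistogram is an illustrative name:

```java
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CharHistogram {

    // Count non-space characters; TreeMap::new keeps the keys in character order
    public static TreeMap<Character, Long> histogram(String text) {
        return text.chars()                       // IntStream of UTF-16 code units
                .mapToObj(c -> (char) c)          // box each one as a Character
                .filter(c -> c != ' ')
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(histogram("hello world")); // {d=1, e=1, h=1, l=3, o=2, r=1, w=1}
    }
}
```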
Index: Build a Search Index from Documents
// Map<word, Set<documentId>> — inverted index
Map<String, Set<Long>> invertedIndex = documents.stream()
.flatMap(doc ->
Arrays.stream(doc.getContent().split("\\s+"))
.map(word -> new AbstractMap.SimpleImmutableEntry<>(word.toLowerCase(), doc.getId())) // Java 8; Map.entry(...) requires Java 9+
)
.collect(Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(Map.Entry::getValue, Collectors.toSet())
));
Summary
| Collector | Output type | Use case |
|---|---|---|
| toList() | List<T> | Ordered collection |
| toSet() | Set<T> | Dedup, unordered |
| toMap(k, v) | Map<K,V> | Key-value lookup |
| joining(delim) | String | Concatenate strings |
| groupingBy(f) | Map<K, List<T>> | Group by field |
| groupingBy(f, downstream) | Map<K, R> | Group + post-process |
| partitioningBy(p) | Map<Boolean, List<T>> | Split into two groups |
| counting() | Long | Count per group |
| summingInt/Long/Double | Integer/Long/Double | Sum per group |
| averagingInt/Long/Double | Double | Average per group |
| mapping(f, downstream) | varies | Transform before collecting |
| collectingAndThen(c, f) | varies | Post-process result |
Part of the DevOps Monk Java tutorial series: Java 8 → Java 11 → Java 17 → Java 21