Nugget Friday - Stream Gatherers: A New Way to Enhance Your Java Streams


Welcome to another Nugget Friday! Today, we're excited to share a recent development in the Java ecosystem that promises to make your stream processing more flexible, efficient, and expressive.

The Problem

Since Java 8, the Stream API has been a powerful tool for processing collection data. However, it has its limitations: sometimes the built-in operations just don't cut it. You might need a specific transformation that doesn't exist, or you might find yourself resorting to clunky workarounds like intermediate lists or complex custom collectors.

Let's consider a scenario where you want to group elements into fixed-size groups of three, keeping only the first two groups:

public static ArrayList<ArrayList<Integer>> groupInThrees(int size, int numGroups) {
    return Stream.iterate(0, i -> i + 1)
            .limit((long) size * numGroups)
            .collect(Collector.of(
                    ArrayList::new,
                    (groups, element) -> {
                        // Start a new group when none exists yet or the last one is full
                        if (groups.isEmpty() || groups.getLast().size() == size) {
                            groups.add(new ArrayList<>(List.of(element)));
                        } else {
                            groups.getLast().add(element);
                        }
                    },
                    (_, _) -> {
                        throw new UnsupportedOperationException("Cannot be parallelized");
                    }
            ));
}

This code achieves the goal, but it's quite verbose and hard to read. The custom collector within collect() adds unnecessary complexity.

The Solution: Stream Gatherers

Introduced as a preview feature in JDK 22, Java Stream Gatherers (JEP 461) provide a way to define custom intermediate operations, giving you the flexibility to create exactly what you need. Think of them as the "missing link" in the Stream API, allowing you to build pipelines that are more expressive and efficient.

How It Works

Gatherers work by consuming elements from an input stream and producing elements for an output stream. They have four key functions:

  1. Initializer: Creates a state object to track information during processing.
  2. Integrator: Consumes each input element, updates the state, and optionally pushes elements to the output stream.
  3. Combiner: Merges state objects from parallel streams, if applicable.
  4. Finisher: Performs any final actions and optionally pushes more elements to the output stream.
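To make these four pieces concrete, here is a sketch of a sequential gatherer that groups elements into fixed-size windows, built from an initializer, an integrator, and a finisher (a sequential gatherer needs no combiner). The class name `GathererAnatomyDemo` and the method name `windowed` are illustrative; for real code you would simply use the JDK's built-in `Gatherers.windowFixed`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class GathererAnatomyDemo {

    // Illustrative fixed-window gatherer assembled from the pieces above.
    public static <T> Gatherer<T, ?, List<T>> windowed(int size) {
        return Gatherer.ofSequential(
                // Initializer: a mutable buffer holding the current window
                () -> new ArrayList<T>(size),
                // Integrator: buffer each element; when full, push a copy downstream
                (buffer, element, downstream) -> {
                    buffer.add(element);
                    if (buffer.size() < size) {
                        return true; // keep consuming input
                    }
                    var window = List.copyOf(buffer);
                    buffer.clear();
                    return downstream.push(window);
                },
                // Finisher: emit any leftover partial window at end of stream
                (buffer, downstream) -> {
                    if (!buffer.isEmpty()) {
                        downstream.push(List.copyOf(buffer));
                    }
                }
        );
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(windowed(2)).toList());
        // [[1, 2], [3, 4], [5]]
    }
}
```

Note that the integrator returns a boolean: returning `false` (or propagating a `false` from `downstream.push`) lets a gatherer participate in short-circuiting.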

Built-in Gatherers

To get you started, JDK 22 ships several useful built-in gatherers in the java.util.stream.Gatherers class:

  • fold: Creates an aggregate incrementally and emits it at the end.
  • mapConcurrent: Executes a function concurrently for each input element.
  • scan: Produces a new element based on the current state and input element.
  • windowFixed: Groups elements into fixed-size lists.
  • windowSliding: Creates sliding windows over the input stream.
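Two of these are quick to demonstrate. The sketch below (class name `BuiltInGatherersDemo` is illustrative) uses `windowSliding` to produce overlapping windows and `scan` to emit a running sum:

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class BuiltInGatherersDemo {

    public static void main(String[] args) {
        // windowSliding: overlapping windows of size 3
        List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.windowSliding(3))
                .toList();
        System.out.println(windows);
        // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

        // scan: running sum, emitting each intermediate result
        List<Integer> runningSums = Stream.of(1, 2, 3, 4)
                .gather(Gatherers.scan(() -> 0, Integer::sum))
                .toList();
        System.out.println(runningSums);
        // [1, 3, 6, 10]
    }
}
```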

Why You Should Care

  • Flexibility: Customize your stream pipelines to meet specific needs.
  • Efficiency: Parallel-ready gatherers can boost performance for suitable tasks.
  • Expressiveness: Write cleaner, more intuitive code for complex transformations.
  • Future-Proofing: Prepare your codebase for the evolving Stream API.

With Stream Gatherers, we can rewrite our example above as follows:

public static List<List<Integer>> findGroupsOfThreeWithGatherer(int size, int numGroups) {
    return Stream.iterate(0, i -> i + 1)
            .gather(Gatherers.windowFixed(size)) // group elements into fixed-size windows
            .limit(numGroups)                    // keep only the first numGroups windows
            .collect(Collectors.toList());
}

This code is more concise and much easier to read.

Caveats

  • Gatherers are a preview feature, so they might change in future releases.
  • Not all gatherers can be parallelized effectively.
  • While powerful, gatherers can add complexity to your code. Use them judiciously!

Conclusions

Stream Gatherers are a great addition to the Java language, especially given the pervasiveness of collections in Java applications. They open up a whole new world of possibilities, allowing you to tackle tasks that were previously difficult or needlessly verbose. So why not give them a try? Your code (and your future self) will thank you!

By integrating gatherers into your stream pipelines, you can achieve a level of customization and efficiency that was not possible with the traditional Stream API. This is particularly beneficial for complex data processing tasks that require unique transformations or aggregations. To learn more, you can read our detailed blog post on Stream Gatherers here.

Happy Friday and Happy Coding!
