Introducing Stream Gatherers (JEP-461) for Enhanced Java Stream Operations

Photo of Mike Redlich by Mike Redlich

The upcoming JDK 22 release is set to introduce a promising new feature, JEP 461: Stream Gatherers (Preview), aimed at enriching the Java Stream API with capabilities for custom intermediate operations. This addition will enable you to manipulate data streams in ways that were previously complex or not directly supported by the existing API. 

 

Stream API Quick Recap

 

The Stream API, introduced in JDK 8, defines a few terms:

 

  • A stream is a lazily computed, potentially unbounded sequence of values that supports the ability to process a stream either sequentially or in parallel.
  • A stream pipeline consists of three parts:
    • a source of elements
    • any number of intermediate operations (like filtering or sorting)
    • a terminal operation

 

The Stream API, anchored by the Stream interface, provides built-in intermediate operations such as mapping, filtering, reduction and sorting. A typical stream pipeline involves a source, any number of intermediate operations (like filtering or sorting), and concludes with a terminal operation that often collects or sums up the results.

Simple Stream Example

 

 Let’s look at a simple example that counts the number of non-zero length words in a given stream:

 

public static long getNumberOfNonZeroWords() {

    return Stream.of("Jakarta", "", "EE", "will", "be", "released", "", "soon") ➊

            .filter(Predicate.not(String::isEmpty)) ➋

            .collect(Collectors.counting()); ➌

    }


 

➊ creates a new, unprocessed stream containing eight elements.

➋ initiates an filter operation to count the number of non-zero length words. It returns a new stream based on the result of the filtering.

➌ initiates the terminal operation that evaluates the final stream.

 

The value returned upon successful execution of the getNumberOfNonZeroWords() method defined above would therefore be 6.

 

The Stream API has, of course, become very popular and is ideal for many tasks using its built-in set of intermediate operations. However, complex tasks may not be so easily expressed due to the limitations of the current set of built-in intermediate operations.

 

Stream Operations Without Gatherers

 

To demonstrate this, let’s take a look at an example of a complex task that groups elements into fixed-size groups of three, but retain only the first two groups. Therefore, the stream [0, 1, 2, 3, 4, 5, 6, ...] should produce [[0, 1, 2], [3, 4, 5]]. Since there is no intermediate operation for this task, one way to accomplish it would be through this method:

 

public static ArrayList<ArrayList<Integer>> findGroupsOfThree(long fixed_size, int grouping) {

    return Stream.iterate(0, i -> i + 1)

            .limit(fixed_size * grouping)

            .collect(Collector.of(

                    () -> new ArrayList<ArrayList<Integer>>(),

                    (groups, element) -> {

                        if(groups.isEmpty() || groups.getLast().size() == fixed_size) {

                            var current = new ArrayList<Integer>();

                            current.add(element);

                            groups.addLast(current);

                            }

                        else {

                            groups.getLast().add(element);

                            }

                        },

                    (left, right) -> {

                        throw new UnsupportedOperationException("Cannot be parallelized");

                        }

            ));

    }

 

 

Please note the values for the variables, fixed_size and grouping, are 3 and 2, respectively. The result returned upon successful execution of the findGroupsOfThree(long, int) method is: [[0, 1, 2], [3, 4, 5]], but as you can easily see, this method is overly complex and would be difficult to maintain. Wouldn’t it be nice to have an intermediate operation to make this method more easily readable and easy to maintain?

 

Stream Gatherers to the Rescue

 

The Stream Gatherers API, anchored by the Gatherer interface and Gatherers class, provides additional built-in intermediate operations such as:

 

  • Folding: a generalization of reduction where the result type is the same as the element type, the combiner-function is associative, and the initial value is an identity for the combiner-function.
  • Unfolding: takes an aggregate and decomposes it into elements. An unfold is essentially the reverse operation of a fold.
  • Barriers: requires that the entirety of one operation is completed before providing any elements to the next operation.
  • Windowing: constructs a stream which groups the original elements, either in overlapping or disjoint groups.
  • Prefix Scans: a stream of incremental reductions.
  • Duplicated Elements: finds those elements that are duplicated as opposed to removing duplicates as performed by Stream::distinct operation.

 

Stream Operations With Gatherers

 

So, given these new set of intermediate operations, let’s improve upon the findGroupsOfThree() method:

 

public static List<List<Integer>> findGroupsOfThreeWithGatherer(long fixed_size, int grouping) {

    return Stream.iterate(0, i -> i + 1)

            .gather(Gatherers.windowFixed((int)fixed_size)) ➊➋

            .limit(grouping)

            .collect(Collectors.toList());

    }


 

➊ a new method, gather(), will be added to the Stream API in JDK 22.

➋ the windowFixed() method returns a Gatherer that gathers elements into windows, i.e., encounter-ordered groups of elements, of a fixed size.

 

The result returned upon successful execution of the findGroupsOfThreeWithGatherer(long, int) method is the same: [[0, 1, 2], [3, 4, 5]]. This is, of course, much easier to read and maintain.

 

Further details on the Stream Gatherers API may be found in the original design document written by Viktor Klang, Software Architect, Java Platform Group at Oracle.

 

As you can see, the new Stream Gatherers API does indeed enhance the Stream API with new built-in intermediate operations to solve more complex tasks. It is important to note that JEP 461 is a preview feature. We don’t know how many preview JEPs will evolve with this feature before it is decided to declare it final.

 

And since JDK 22 is still in early access until its formal release on March 19, 2024, experimenting with Stream Gatherers requires the use of the --enable-preview command line parameter. And it is also important to note that as this feature evolves through JDK 23 and beyond, things can drastically change. Stay tuned!

 

Several demo applications that complement this blog post, including the ones that have been highlighted here, are available at this GitHub repository. The README.md file provides details on how to use --enable-preview on the command line or configure with Maven.

 

Conclusion

Stream Gatherers represents a significant advancement in Java's ability to handle complex data processing tasks more elegantly. As JDK 22 nears its release, the Java community eagerly anticipates the opportunities this feature will unlock for stream-based data manipulation and analysis.

 

WAIT!

Have you tried out all new Payara as a Service for deploying your Jakarta EE applications? No? Don't take our word for it, try Payara Cloud for yourself and see how easy it is to have your application running on a fully hosted Payara runtime in seconds. Go on, try it

Comments