Sunday, August 31, 2014

Overcoming Java 8 streams

Java 8 streams provide functional and LINQ-like features in a fluent API. But streams are not without drawbacks:

  • Referenced methods and lambdas cannot throw checked exceptions
  • Controlling the threads used, especially for parallel streams, is awkward
  • Streams are not designed for extension

Overcoming these drawbacks requires a "look-a-like" API. For example, implementing java.util.stream.Stream does not help: none of the existing methods throw checked exceptions, and none of the existing stream factory helpers would return your implementation with new methods.

So I wrote my own, copying the existing stream API and updating its methods to throw checked exceptions:

hm.binkley.util.stream.CheckedStream ('develop' branch for now)
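At the core of such a look-a-like is a set of functional interfaces whose single method declares a checked exception. Here is a minimal sketch of the idea. ThrowingFunction, ThrowingConsumer and ThrowingInterfacesDemo are hypothetical names for illustration, not necessarily what CheckedStream uses. Making the exception a type parameter lets a method reference to a throwing method compile, and keeps the exact checked type visible to callers:

```java
import java.io.IOException;

// Hypothetical throwing counterparts of java.util.function.Function and Consumer.
// E is a type parameter, so callers see the precise checked exception type.
@FunctionalInterface
interface ThrowingFunction<T, R, E extends Exception> {
    R apply(T t) throws E;
}

@FunctionalInterface
interface ThrowingConsumer<T, E extends Exception> {
    void accept(T t) throws E;
}

class ThrowingInterfacesDemo {
    // An ordinary method declaring a checked exception (hypothetical example).
    static int parsePositive(String s) throws IOException {
        int n = Integer.parseInt(s);
        if (n <= 0)
            throw new IOException("not positive: " + s);
        return n;
    }

    // Compiles: apply(T) throws E matches, with E = IOException.
    // Assigning the same reference to java.util.function.Function would not.
    static final ThrowingFunction<String, Integer, IOException> PARSE =
            ThrowingInterfacesDemo::parsePositive;

    // The consumer flavor: the lambda may throw IOException directly.
    static final ThrowingConsumer<String, IOException> CHECK = s -> parsePositive(s);

    public static void main(String[] args) throws IOException {
        CHECK.accept("7");
        System.out.println(PARSE.apply("42"));
    }
}
```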

From the javadoc:

CheckedStream is a throwing Stream look-a-like with control over thread pool. It cannot be a Stream as it takes throwing versions of suppliers, functions and consumers. Otherwise it is a faithful reproduction.

Write this:

   long beanCount() throws SomeException, OtherException {
       return checked(Stream.of(1, 2, 3)).
           map(this::someThrowingFunction).
           peek(That::oldBean).
           count();
   }

not this:

   long beanCount() throws SomeException, OtherException {
       try {
           return Stream.of(1, 2, 3).
               map(i -> {
                   try {
                       return someThrowingFunction(i);
                   } catch (final SomeException e) {
                       throw new RuntimeException(e);
                   }
               }).
               peek(i -> {
                   try {
                       That.oldBean(i);
                   } catch (final OtherException e) {
                       throw new RuntimeException(e);
                   }
               }).
               count();
       } catch (final RuntimeException e) {
           final Throwable x = e.getCause();
           if (x instanceof SomeException)
               throw (SomeException) x;
           if (x instanceof OtherException)
               throw (OtherException) x;
           throw e;
       }
   }
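The "not this" version is exception tunneling done by hand: wrap the checked exception in an unchecked carrier inside each lambda, then unwrap it after the terminal operation. A look-a-like can do that once, generically. Below is a minimal sketch of the technique over a plain java.util.stream.Stream. Tunneling, mapAll and the private Tunnel carrier are hypothetical names, and this is not CheckedStream's actual code:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class Tunneling {
    // Hypothetical throwing function; E is the checked exception callers see.
    @FunctionalInterface
    interface ThrowingFunction<T, R, E extends Exception> {
        R apply(T t) throws E;
    }

    // Private unchecked carrier. Only mapAll creates or catches it, so any
    // Tunnel caught below wraps an exception thrown by the supplied function.
    private static final class Tunnel extends RuntimeException {
        Tunnel(Exception cause) {
            super(cause);
        }
    }

    // Maps every element and collects the results. A checked exception from f
    // travels through the stream as a Tunnel, then is rethrown as itself.
    static <T, R, E extends Exception> List<R> mapAll(
            Stream<T> stream, ThrowingFunction<T, R, E> f) throws E {
        try {
            return stream.map(t -> {
                try {
                    return f.apply(t);
                } catch (RuntimeException e) {
                    throw e; // unchecked exceptions pass through untouched
                } catch (Exception e) {
                    throw new Tunnel(e); // only E can reach here
                }
            }).collect(Collectors.toList());
        } catch (Tunnel tunnel) {
            @SuppressWarnings("unchecked") // safe: Tunnel only ever wraps an E
            E cause = (E) tunnel.getCause();
            throw cause;
        }
    }
}
```

Because E is inferred from the function, a call such as mapAll(stream, this::someThrowingFunction) declares throws SomeException and nothing broader. When the function throws no checked exception, Java infers E as RuntimeException, so no try block is needed at the call site.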

"Intentional" exceptions (checked exceptions plus CancellationException) have "scrubbed" stacktraces: frames from framework and glue packages are removed before the exception is rethrown to the calling code. With that noise gone, scrubbed stacktraces are much easier to understand.

To see the unscrubbed stacktrace, set the system property "hm.binkley.util.stream.CheckedStream.debug" to "true".
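Scrubbing is easy to picture as a filter over the exception's stack frames. Here is a minimal sketch, assuming hypothetical names (Scrubber, scrub) and assuming the framework and glue packages are given as class-name prefixes. Which packages CheckedStream actually strips is not spelled out here. The debug property name is the one above, read with Boolean.getBoolean:

```java
import java.util.Arrays;

final class Scrubber {
    // The debug property named in the post. Boolean.getBoolean returns true only
    // when the system property exists and equals "true" (ignoring case).
    static final String DEBUG = "hm.binkley.util.stream.CheckedStream.debug";

    // Drops stack frames whose class name starts with any of the given
    // (hypothetical) framework/glue package prefixes, unless debugging is on.
    // Returns the same throwable so it can be rethrown directly.
    static <X extends Throwable> X scrub(X x, String... prefixes) {
        if (Boolean.getBoolean(DEBUG))
            return x; // keep the full, unscrubbed stacktrace
        StackTraceElement[] kept = Arrays.stream(x.getStackTrace())
                .filter(frame -> Arrays.stream(prefixes)
                        .noneMatch(frame.getClassName()::startsWith))
                .toArray(StackTraceElement[]::new);
        x.setStackTrace(kept);
        return x;
    }
}
```

Because scrub returns its argument with the same static type, a catch block can write throw Scrubber.scrub(e, "java.util.stream.") and still rethrow e's precise checked type.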

Controlling the thread pool used by a Stream is a challenge. Deep in the implementation, a parallel stream checks whether it is already running in a ForkJoinPool worker thread (as it is inside a ForkJoinTask) and, if so, forks its work into that thread's pool; otherwise it uses the common pool. So with CheckedStream, write this:

       checked(stream, new ForkJoinPool()).
           map(i -> currentThread()).
           forEach(System.out::println);

not this:

       try {
           new ForkJoinPool().submit(() -> stream.
                   map(i -> currentThread()).
                   forEach(System.out::println)).
               get();
       } catch (final InterruptedException e) {
           Thread.currentThread().interrupt(); // Restore the interrupt flag
           throw new RuntimeException(e);
       } catch (final ExecutionException e) {
           final Throwable x = e.getCause();
           if (x instanceof Error)
               throw (Error) x;
           if (x instanceof RuntimeException)
               // Much trickier when stream functions throw runtime exceptions
               throw (RuntimeException) x;
           throw new Error(e); // We have no checked exceptions in this example
       }
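Both the pool behavior and the "not this" boilerplate can be seen with plain streams. Here is a minimal sketch, assuming hypothetical names (InPool, call, ThrowingSupplier, isWorkerOf, threadsUsed). The call method runs work as a task in the given pool and rethrows the work's own exception instead of an ExecutionException. The demo then records which threads a parallel stream used. Seeing only that pool's workers relies on the unspecified implementation detail described above, not on a documented guarantee:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

final class InPool {
    // Hypothetical throwing supplier: E is the checked exception the work may throw.
    @FunctionalInterface
    interface ThrowingSupplier<T, E extends Exception> {
        T get() throws E;
    }

    // Runs `work` as a task in `pool`, so parallel streams started inside it fork
    // into that pool. The outcome is captured inside the task, so the work's own
    // exception is rethrown here rather than wrapped by the pool.
    static <T, E extends Exception> T call(ForkJoinPool pool, ThrowingSupplier<T, E> work)
            throws E {
        AtomicReference<T> result = new AtomicReference<>();
        AtomicReference<Exception> failure = new AtomicReference<>();
        Runnable task = () -> {
            try {
                result.set(work.get());
            } catch (Exception e) { // E, or an unchecked exception
                failure.set(e);
            }
        };
        try {
            pool.submit(task).get(); // get() also makes the task's writes visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt for callers
            throw new IllegalStateException("Interrupted while waiting for pool", e);
        } catch (ExecutionException e) {
            // The task catches every Exception, so only an Error can land here.
            Throwable cause = e.getCause();
            if (cause instanceof Error)
                throw (Error) cause;
            throw new IllegalStateException(cause);
        }
        Exception x = failure.get();
        if (x == null)
            return result.get();
        if (x instanceof RuntimeException)
            throw (RuntimeException) x;
        @SuppressWarnings("unchecked") // work can only throw E or unchecked exceptions
        E checked = (E) x;
        throw checked;
    }

    // True if `t` is a worker of `pool`: neither the caller nor a common-pool worker.
    static boolean isWorkerOf(Thread t, ForkJoinPool pool) {
        return t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == pool;
    }

    // Records every thread that ran part of a parallel stream started via call().
    static Set<Thread> threadsUsed(ForkJoinPool pool) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        return call(pool, () -> {
            IntStream.range(0, 10_000).parallel().forEach(i -> seen.add(Thread.currentThread()));
            return seen;
        });
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            for (Thread t : threadsUsed(pool))
                System.out.println(t.getName() + " in pool: " + isWorkerOf(t, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Capturing the outcome inside the task, rather than letting the exception escape it, sidesteps whatever wrapping the pool applies to failed tasks.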

Care is taken to respect lazy and terminal operations when using thread pools. Changing the thread pool or thread mode mid-stream is supported; such changes are "immediate" operations: they terminate the existing stream and start a new one with the changes:

       stream.sequential().
           filter(this::someFilter).
           parallel(threads). // Existing lazy operations terminated
           map(this::someMapper).
           forEach(System.out::println);

Immediate operations ensure stream methods are run in the correct threading context.
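To make the "immediate" idea concrete with plain streams: the pending pipeline is terminated on the spot, and a fresh parallel stream over its results runs the remaining operations, terminal one included, inside the target pool. ImmediateSwitch and parallelIn are hypothetical names, and collecting into a List is just one assumed way to terminate the old stream:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ImmediateSwitch {
    // Mimics an "immediate" parallel(pool) step: the pending stream is terminated
    // here, on the calling thread, in whatever mode it was already in. A new
    // parallel stream over the results then runs `rest` (its lazy operations and
    // its terminal operation) as a task inside `pool`.
    static <T, R> R parallelIn(Stream<T> pending, ForkJoinPool pool,
            Function<Stream<T>, R> rest) throws InterruptedException, ExecutionException {
        List<T> materialized = pending.collect(Collectors.toList()); // ends the old stream
        Callable<R> task = () -> rest.apply(materialized.parallelStream());
        return pool.submit(task).get();
    }
}
```

In this sketch, terminating early costs memory for the intermediate results; in exchange, the filter stage provably runs on the caller and the later stages run in the pool.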

I hope you'll agree: CheckedStream is nicer to use, especially with existing code using checked exceptions.

Suggestions, bug fixes, improvements welcome!