A broken CompletableFuture invocation with an unexpected fix

During a recent training, I was demonstrating the class CompletableFuture and how it could be used to create non-blocking methods like this:


    public static void <T> persist(T entity, Consumer<T> andThen) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Persisting entity...");
            return entity;
        }).thenAccept(andThen);
    }

    public static void main(String[] args) {
        persist("toPersist", System.out::println);
        System.out.println("Done!");
    }

I wrote a method like the one above and ran the application. The output, unfortunately, was only:


    Done!

Where did the "Persisting entity..." output go?

(P.S.: The absolute worst feeling ever is when you create an example during a training, and it doesn't work! I definitely should have had a precreated example, but that is beside the point. =])

Since it can be hard to think when there are 30 people watching you, I did the first thing that came to mind. I created an ExecutorService and handed that reference into the thread pool. Now the code says:


    private static final ExecutorService pool = Executors.newCachedThreadPool();

    public void <T> persist(T entity, Consumer<T> andThen) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Persisting entity...");
            return entity;
        }, pool).thenAccept(andThen);
    }

And it worked! It was a bit of a shock, but I was pleased that I could fix it and move on to other topics. :)

After class, though, it gnawed at me, and I was excited when I finally got a minute to sit down and poke at it.

After peeling away a couple of layers, I found that I could reproduce the problem in the following way:


public static void main(String[] args) {
    ExecutorService pool = new ForkJoinPool(1); //Executors.newCachedThreadPool();

    pool.execute(() -> System.out.println("Done"));

    pool.shutdown();
}

This example will only print out "Done" occasionally. However, if I change it to:


public static void main(String[] args) {
    ExecutorService pool = Executors.newCachedThreadPool();

    pool.execute(() -> System.out.println("Done"));

    pool.shutdown();
}

It will work every time.

What's going on? Long story short, I posted my Fork/Join vs ThreadPoolExecutor question to Stack Overflow and John Vint gave the simple answer that all threads in the Fork Join pool are Daemon threads, which means that the VM will stop running without waiting for them to complete. The ThreadPoolExecutor creates non-deamon threads which causes the runtime to wait until they are done.

What does this mean for the in-class example I gave? If you need the runtime to hang on while your CompletableFuture is finishing, pass an Executors-obtained thread pool as a second parameter since by default it uses the Fork/Join common pool.

Josh Cummings

"I love to teach, as a painter loves to paint, as a singer loves to sing, as a musician loves to play" - William Lyon Phelps

0 comments: