Friday, February 4, 2011

Forks and joins in Scala

A short while ago Alex Miller (aka puredanger) wrote up a blog entry detailing how to use Clojure to interact with the new fork/join concurrency framework to be included with Java 7 (and already available from Doug Lea's site). The meat of Alex's solution is a Java wrapper for Clojure's IFn type which integrates nicely with the fork/join framework. At the time there was some discussion about whether a Scala implementation would require an equivalent amount of scaffolding. It turns out that supporting this functionality in Scala requires a bit more effort than one might expect, although the problems one faces are quite different from those Alex worked through in Clojure. Let's review the steps to a working Scala implementation and see what we can learn from them.

Before we move on, a few notes about fork/join itself. A full overview is outside the scope of this post; however, details can be found in the jsr166y section of Doug Lea's site. A solid (although slightly out-of-date) review can be found in a developerWorks article by Brian Goetz.

We begin as simply as possible: a straight port of Alex's code (in this case an implementation of Fibonacci using fork/join) to Scala. We create a named function fib that will be called recursively by our RecursiveTasks. We also use an implicit conversion to map no-arg anonymous functions onto RecursiveTask instances, using the technique for supporting SAM interfaces discussed in an earlier post. The resulting code looks pretty good, but an attempt to build with sbt gives an unexpected error:

[info] == test-compile ==
[info] Source analysis: 2 new/modified, 0 indirectly invalidated, 0 removed.
[info] Compiling test sources...
[error] .../src/test/scala/org/fencepost/forkjoin/ForkJoinTestFailOne.scala:23: method compute cannot be accessed in jsr166y.RecursiveTask[Int]
[error] (f2 compute) + (f1 join)
[error] ^
[error] one error found

A bit of investigation reveals the problem: RecursiveTask.compute() is a protected abstract method in the fork/join framework. The compute method in the class returned by our implicit conversion consists of nothing more than a call to fib.apply(), but the compiler has no way to know this when fib is defined; as the error message shows, all it sees is a jsr166y.RecursiveTask[Int]. It follows that an attempt to access RecursiveTask.compute() from within the body of fib is (correctly) understood as an attempt to access a protected method.

How does Alex get around this problem in Clojure? He doesn't; the problem is actually resolved via the Java wrapper. Note that in his Java code compute() has been "elevated" to be a public method. He's thus able to call this method without issue from his "fib-task" function. Without this modification his code fails with similar errors [1].

Strangely, there's no obvious way to resolve this issue within Scala itself. The language provides protected and private keywords to restrict access to certain methods, but there is no public keyword; methods are assumed to be public if not annotated otherwise. As a consequence there's no obvious way to "raise" the access level of a method in a subclass. We work around this constraint by way of an egregious hack: we define a new subclass of RecursiveTask containing a surrogate method that provides public access to compute. We then return this subclass from our implicit conversion.
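A minimal sketch of the surrogate idea follows. The names PublicTask and computePublic are hypothetical and chosen only for illustration, and the sketch assumes the java.util.concurrent version of RecursiveTask that shipped with Java 7 rather than the jsr166y package used at the time of writing.

```scala
import java.util.concurrent.RecursiveTask

object SurrogateSketch {
  // Hypothetical wrapper class; the name is illustrative only.
  class PublicTask(f: () => Int) extends RecursiveTask[Int] {
    // Remains protected, just as RecursiveTask declares it.
    override protected def compute(): Int = f()

    // Public surrogate: a class may always call its own protected method.
    def computePublic(): Int = compute()
  }

  def demo(): Int = {
    val task = new PublicTask(() => 21 * 2)
    // task.compute()      // would not compile here: compute is protected
    task.computePublic()   // fine: goes through the public surrogate
  }
}
```

Calling SurrogateSketch.demo() simply runs the wrapped function on the calling thread; no pool is involved yet.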

Our new implementation now compiles, but when we try to actually execute the test with sbt we see a curious failure: the test starts up and then simply hangs:

[info] == test-compile ==
[info] == copy-test-resources ==
[info] == copy-test-resources ==
[info] == test-start ==
[info] == test-start ==
[info] == org.fencepost.forkjoin.ForkJoinTestFailTwo ==
[info] Test Starting: testFibonacci

Well, this isn't good. What's going on here?

The key point to understand is that the implicit conversion is called every time an existing value needs to be converted to a different type. As written, the calls to f1.fork() and f1.join() both require conversion, and the implicit conversion function in play here will return a new instance on each invocation. This means that even though the input to the conversion function is identical in both cases (f1), the object that invokes fork() will be a different object from the one that invokes join(). For fork/join tasks this matters: join() is called on an object that was never forked. Voilà, an unterminated wait.
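The effect can be observed without forking anything. The sketch below is an illustration under assumptions, not the original test code: FnTask, fn2task and the conversions counter are hypothetical names, and java.util.concurrent stands in for jsr166y. It counts how often the conversion runs and checks whether two conversions of the same function yield the same object.

```scala
import java.util.concurrent.RecursiveTask
import scala.language.implicitConversions

object FreshInstanceSketch {
  // Hypothetical wrapper turning a no-arg function into a task.
  class FnTask(f: () => Int) extends RecursiveTask[Int] {
    override protected def compute(): Int = f()
  }

  // Counts invocations of the conversion (illustration only).
  private var conversions = 0

  implicit def fn2task(f: () => Int): FnTask = {
    conversions += 1
    new FnTask(f) // a brand new task on every call
  }

  def demo(): (Int, Boolean) = {
    conversions = 0
    val f1 = () => 1
    val first: FnTask = f1   // conversion 1
    val second: FnTask = f1  // conversion 2, same input
    f1.isDone()              // conversion 3: a method call on f1 converts again
    (conversions, first eq second)
  }
}
```

Here demo() should return (3, false): three conversions of the same function value, and the first two results are distinct objects. That is exactly the situation in which fork() and join() end up on different tasks.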

We can resolve this issue in one of two ways:

  • Caching previous values returned by the implicit conversion function and re-using them when appropriate

  • Explicitly specifying the type of f1 and f2 as our wrapper subclass. This has the effect of forcing implicit conversion at the time of assignment rather than when we call fork() and join()
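As a rough sketch of the second option (again with hypothetical names, and assuming java.util.concurrent in place of jsr166y), annotating f1 and f2 with the wrapper type means each function is converted exactly once, at assignment, so fork() and join() act on the same task:

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}
import scala.language.implicitConversions

object ExplicitTypeSketch {
  // Hypothetical wrapper with a public surrogate for compute().
  class PublicTask(f: () => Int) extends RecursiveTask[Int] {
    override protected def compute(): Int = f()
    def computePublic(): Int = compute()
  }

  implicit def fn2task(f: () => Int): PublicTask = new PublicTask(f)

  def fib(n: Int): Int =
    if (n <= 1) n
    else {
      // The type annotations force the conversion here, once per value.
      val f1: PublicTask = () => fib(n - 1)
      val f2: PublicTask = () => fib(n - 2)
      f1.fork()                       // schedule f1 asynchronously
      f2.computePublic() + f1.join()  // run f2 inline, then wait for f1
    }

  // Runs fib inside a fork/join pool so fork() and join() have workers to use.
  def run(n: Int): Int = {
    val pool = new ForkJoinPool()
    try pool.invoke(fn2task(() => fib(n)))
    finally pool.shutdown()
  }
}
```

With fib(0) = 0 and fib(1) = 1 as the base cases, ExplicitTypeSketch.run(10) should evaluate to 55.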

I chose the first approach, mainly because it seemed a bit cooler. Oh, and it's a bit cleaner logically as well. The final result runs without issue.
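One way the caching approach might look is sketched below. This is not the original implementation: the names are hypothetical, java.util.concurrent replaces jsr166y, and a TrieMap keyed on the function value is just one plausible choice of cache. Entries are never evicted, so a real version would need some form of cleanup.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}
import scala.collection.concurrent.TrieMap
import scala.language.implicitConversions

object CachedConversionSketch {
  // Hypothetical wrapper with a public surrogate for compute().
  class PublicTask(f: () => Int) extends RecursiveTask[Int] {
    override protected def compute(): Int = f()
    def computePublic(): Int = compute()
  }

  // Thread-safe cache keyed on the function value; function values do not
  // override equals, so lookups are by identity. Never evicted in this sketch.
  private val cache = TrieMap.empty[() => Int, PublicTask]

  // Returns the previously created task for f, or creates and stores one.
  implicit def fn2task(f: () => Int): PublicTask =
    cache.getOrElseUpdate(f, new PublicTask(f))

  def fib(n: Int): Int =
    if (n <= 1) n
    else {
      val f1 = () => fib(n - 1)
      val f2 = () => fib(n - 2)
      f1.fork()                       // converts f1 and caches the task
      f2.computePublic() + f1.join()  // f1 converts to the same cached task
    }

  // Runs fib inside a fork/join pool so fork() and join() have workers to use.
  def run(n: Int): Int = {
    val pool = new ForkJoinPool()
    try pool.invoke(fn2task(() => fib(n)))
    finally pool.shutdown()
  }
}
```

The body of fib stays untyped, as in the straight port; the fix lives entirely in the conversion function. CachedConversionSketch.run(10) should evaluate to 55.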

[1] Using Alex's code (as published on his blog):

bartok ~/git/puredanger_forkjoin $ clojure process.clj
(1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597 2584 4181)

After changing IFnTask.compute() to protected (with no other changes):

bartok ~/git/puredanger_forkjoin $ clojure process.clj
(Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at jline.ConsoleRunner.main(Unknown Source)
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: No matching field found: compute for class revelytix.federator.process.IFnTask (process.clj:0)
at clojure.lang.Compiler.eval(
at clojure.lang.Compiler.load(
at clojure.lang.Compiler.loadFile(
at clojure.main$load_script.invoke(main.clj:221)
at clojure.main$script_opt.invoke(main.clj:273)