Monday, September 12, 2011

Ruby and Concurrency: The Mechanics of Akka and JRuby

I'm interested in concurrency. I'm also interested in Ruby. There doesn't seem to be much reason to keep these two guys apart anymore.

This post marks the beginning of an occasional series on using Ruby to write concurrent code. Ruby doesn't yet have a big reputation in the world of concurrent and/or parallel applications, but there is some interesting work being done in this space. And since problems in concurrency are notoriously difficult to reason about, we could do a lot worse than to attack those problems in a language designed to make life easier for the developer.

We begin with Akka, an excellent library designed to bring actors, actor coordination and STM to Scala and, to a slightly lesser degree, the JVM generally. Our initial task is simple: we wish to be able to define an Akka actor by providing a message handler as a code block. We'll start by attempting to implement the actor as a standalone class and then abstract that solution into code which requires the input block and nothing else.

A simple initial implementation looks something like this:

require 'java'
require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.Actors'

# Start with something simple. Implement the actor as a distinct
# class and start it up within the Akka runtime.
# Look, I've implemented the actor API!
class SomeActor < UntypedActor
  def onReceive(msg)
    puts "Received message: #{msg.to_s}"
  end
end

# So we should be fine then, right?
ref = Actors.actorOf SomeActor.new
ref.start
ref.tell "foo"


[@varese ruby]$ ruby --version
jruby 1.6.2 (ruby-1.8.7-p330) (2011-05-23 e2ea975) (OpenJDK Client VM 1.6.0_22) [linux-i386-java]
[@varese ruby]$ ruby akka_sample1.rb
ArgumentError: Constructor invocation failed: ActorRef for instance of actor [org.jruby.proxy.akka.actor.UntypedActor$Proxy0] is not in scope.
You can not create an instance of an actor explicitly using 'new MyActor'.
You have to use one of the factory methods in the 'Actor' object to create a new actor.
Either use:
'val actor = Actor.actorOf[MyActor]', or
'val actor = Actor.actorOf(new MyActor(..))'
(root) at akka_sample1.rb:20


Well, that didn't work very well. Apparently the class we defined in JRuby is exposed to the Java library as a proxy object, and that proxy's class is unknown to the Akka runtime. No problem; Akka supports a factory model for actor creation, and with that approach the underlying class of our actor should become a non-issue. With a few simple changes we're ready to try again:

require 'java'
require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.Actors'

# A second attempt. Define our actor in a standalone class again,
# but this time use an UntypedActorFactory (via closure coercion) to
# interact with Akka.

# Define our actor in a class again...
class SomeActor < UntypedActor
  def onReceive(msg)
    puts "Received message: #{msg.to_s}"
  end
end

# ... and then provide an UntypedActorFactory to instantiate
# the actor. The factory interface qualifies as a SAM so
# we only need to provide a closure to generate the actor and
# let JRuby's closure coercion handle the rest.
ref = Actors.actorOf do
  SomeActor.new
end
ref.start
ref.tell "foo"


[@varese ruby]$ ruby akka_sample2.rb
Received message: foo


We now have a working actor, but we still have some work to do; remember, we want to be able to define arbitrary actors by supplying just a code block. We need a few additional pieces to make this work:


  • A generic actor implementation whose state includes a block or Proc instance. The onReceive method of this actor could then simply call the contained block/Proc, passing the input message as an arg.

  • An ActorFactory implementation which takes a code block as an arg, stores it in internal state and then uses that block to build an instance of the generic actor described above on demand.



A first cut at this concept might look something like this:

require 'java'
require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.actor.Actors'

# Shift to working with code blocks via a generic actor and a simple
# factory for creating them.
class SomeActor < UntypedActor
  # Look, I've got a constructor... but it'll get ignored!
  def initialize(proc)
    @proc = proc
  end

  def onReceive(msg)
    @proc.call msg
  end
end

class SomeActorFactory
  include UntypedActorFactory

  def initialize(&b)
    @proc = b
  end

  def create
    SomeActor.new @proc
  end
end

# Create a factory for an actor that uses the input block to handle
# incoming messages.
factory = SomeActorFactory.new do |msg|
  puts "Message received: #{msg.to_s}"
end

ref = Actors.actorOf factory
ref.start
ref.tell "foo"


[@varese ruby]$ ruby akka_sample3.rb
ArgumentError: wrong number of arguments for constructor
create at akka_sample3.rb:29
(root) at akka_sample3.rb:37


What went wrong here? UntypedActor is a concrete class with a defined no-arg constructor. That no-arg constructor is being called in preference to the one we've defined, and as a consequence our block never gets into the mix. There's almost certainly a cleaner way to solve this in JRuby, but for the moment we can get around the problem (in an admittedly ugly way) by providing a setter on our generic actor class:

require 'java'
require 'akka-actor-1.2-RC6.jar'
require 'scala-library.jar'

java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.actor.Actors'

# Continue with the generic actor implementation, but shift to using a setter
# for passing in the Proc to which we'll delegate message handling.
class SomeActor < UntypedActor
  def proc=(b)
    @proc = b
  end

  def onReceive(msg)
    @proc.call msg
  end
end

class SomeActorFactory
  include UntypedActorFactory

  def initialize(&b)
    @proc = b
  end

  def create
    rv = SomeActor.new
    rv.proc = @proc
    rv
  end
end

# Create a factory for an actor that uses the input block to handle
# incoming messages.
factory = SomeActorFactory.new do |msg|
  puts "Message received: #{msg.to_s}"
end

ref = Actors.actorOf factory
ref.start
ref.tell "foo"


[@varese ruby]$ ruby akka_sample4.rb
Message received: foo


We now have what we wanted.
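
To round things out, we can fold the factory into a small helper so that defining and starting an actor really does require nothing but a code block. The sketch below builds on the SomeActor and SomeActorFactory classes defined above; the helper name actor_from_block is our own invention, not part of the Akka API.

# A minimal sketch, reusing the SomeActor/SomeActorFactory classes from
# the previous sample. The helper name (actor_from_block) is ours alone;
# the Actors.actorOf/start/tell calls are the same Akka 1.2 API used above.
def actor_from_block(&b)
  ref = Actors.actorOf SomeActorFactory.new(&b)
  ref.start
  ref
end

echo = actor_from_block { |msg| puts "Echoing: #{msg.to_s}" }
echo.tell "foo"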

If you're interested in this topic, note that Nick Sieger has covered similar ground (including the interaction between JRuby and Akka) here. Nick's article draws on some very good work done by Daniel Ribeiro late last year; the code referenced in Daniel's article is available on GitHub. I didn't come across Daniel's post until my own was nearly done, but there is quite a bit of overlap between his code and mine. That said, I recommend taking a look at both articles, if for no other reason than the fact that both authors are much better at writing Ruby code than I am.

Friday, September 2, 2011

Cassandra and Clojure: Things To Bytes And Back Again

In a previous post we briefly discussed the idea of using idiomatic Clojure to access data contained in a Cassandra instance, including transparent conversion to and from Clojure data. Here we'll explore an implementation of this idea, although we won't address the question of laziness, in part because there are sizable trade-offs to consider. For example, any solution that provides lazy evaluation must do so while also attempting to minimize the number of trips to the underlying data store. This question may be picked up in future work, but for now we'll continue on. We've upgraded to Cassandra 0.8.2 and Clojure 1.2, and we're using a new data model (see below), but for the most part we'll try to pick up where we left off.

At its core the Cassandra data model is built around columns. Each column contains a name and a value, both represented as binary data. This model is very simple, and while it may appear limiting it is in reality quite flexible. The lack of pre-defined data types avoids the "impedance mismatch" that results from shoehorning structured data into types that don't really fit. We're free to represent column values in any way we see fit; if we can convert it into bytes, it's fair game. Our problem thus reduces to a question of serialization, and suddenly there are many suitors vying for our attention. Among the set of well-known serialization packages we find Google's Protocol Buffers, Thrift and Avro. And since we're working in a language with good Java interop, we also have Java's built-in serialization (or something similar, like Kryo) available. Finally, we're always free to roll our own.

Let's begin by ruling out that last idea. There are already well-tested third-party serialization libraries, so unless we believe all of them suffer from some fatal flaw it's difficult to justify the risk and expense of creating something new. We'd also like our approach to have some level of cross-platform support, so native Java serialization is excluded (along with Kryo). We also need to be able to encode and decode arbitrary data without defining message types or their payload(s) in advance, a limitation that rules out Protocol Buffers and Thrift. The last man standing is Avro, and fortunately for us he's a good candidate. Avro is schema-based, but the library includes facilities for generating schemas on the fly by inspecting objects via Java reflection. Also included is a notion of storing schemas alongside data, allowing the original object to be easily reconstructed as needed. And the Avro data model includes a rich set of basic types as well as support for "complex" types such as arrays and maps.
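
To get a feel for that reflection support, here's a rough JRuby sketch against the Avro Java API. The jar name and version are illustrative assumptions, and this is presumably the same machinery our Clojure library leans on under the hood.

require 'java'
require 'avro-1.5.4.jar' # jar name/version are illustrative

java_import 'org.apache.avro.reflect.ReflectData'

# Ask Avro to derive a schema by reflecting over a Java type.
schema = ReflectData.get.getSchema(java.lang.Integer.java_class)
puts schema.to_s # prints the JSON schema: "int"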

We'll need to implement a Clojure interface to the Avro functionality; this can be as simple as a pair of methods to encode and decode data and schemas. At least some implementations of Avro data files use a pre-defined "meta-schema" (consisting of the schema for the embedded data plus that data itself) for storing both items. Consumers of these files first decode the meta-schema and then use the discovered schema to decode the underlying data. We'll follow a similar path for our library. We'll also extend our Cassandra support a bit in order to allow the insertion of new columns for a given key and column family.
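
To make the meta-schema concrete, here's a minimal Ruby sketch of the encoding side using the avro gem, mirroring the decoder in the Ruby reporting example near the end of this post. The name encode_with_meta_schema is a hypothetical stand-in for what our Clojure encode_with_schema does; it's not part of any library.

require 'rubygems'
require 'avro'
require 'stringio'

# Hypothetical illustration of the meta-schema layout: a map of bytes
# holding the value's schema (as JSON) alongside the encoded value.
def encode_with_meta_schema(value, schema_json)
  # First encode the value itself against its own schema...
  schema = Avro::Schema.parse(schema_json)
  data_io = StringIO.new
  Avro::IO::DatumWriter.new(schema).write(value, Avro::IO::BinaryEncoder.new(data_io))
  # ...then wrap schema and data together under the meta-schema.
  meta_schema = Avro::Schema.parse('{"type": "map", "values": "bytes"}')
  meta_io = StringIO.new
  meta = { 'schema' => schema_json, 'data' => data_io.string }
  Avro::IO::DatumWriter.new(meta_schema).write(meta, Avro::IO::BinaryEncoder.new(meta_io))
  meta_io.string
end

# A user ID column value, ready to be inserted into Cassandra
bytes = encode_with_meta_schema(459, '"int"')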

Our core example will be built around a collection of employee records. We want to create a report on these employees using attributes defined in various well-known columns. We'd like to access the values in these columns as simple data types (booleans, integers, perhaps even an array or a map) but we'd like to do so through a uniform interface. We don't want to access certain columns in certain ways, as if we "knew" that a specific column contained data of a specific type. Any such approach is by definition brittle if the underlying data model should shift in flight (as data models are known to do).

After finishing up with our Clojure libraries we implement a simple app for populating our Cassandra instance with randomly-generated employee information:

; Populate data for a set of random users to a Cassandra instance.
;
; Users consist of the following set of data:
; - a username [String]
; - a user ID [integer]
; - a flag indicating whether the user is "active" [boolean]
; - a list of location IDs for each user [list of integer]
;
; User records are keyed by username rather than user IDs, mainly because at the moment
; we only support strings for key values. The Cassandra API exposes keys as byte arrays
; so we could extend our Cassandra support to include other datatypes.
(use '[fencepost.avro])
(use '[fencepost.cassandra])

(import '(org.apache.commons.lang3 RandomStringUtils)
        '(java.util Random))

; Utility function to combine our Avro lib with our Cassandra lib
(defn add_user [client username userid active locationids]
  (let [userid_data (encode_with_schema userid)
        active_data (encode_with_schema active)
        locationids_data (encode_with_schema locationids)]
    (insert client username "employee" "userid" userid_data)
    (insert client username "employee" "active" active_data)
    (insert client username "employee" "locationids" locationids_data)))

; Generate a list of random usernames
(let [client (connect "localhost" 9160 "employees")]
  (dotimes [n 10]
    (let [username (RandomStringUtils/randomAlphanumeric 16)
          random (Random.)
          userid (.nextInt random 1000)
          active (.nextBoolean random)
          locationids (into [] (repeatedly 10 #(.nextInt random 100)))]
      (add_user client username userid active locationids)
      (println (format "Added user %s: [%s %s %s]" username userid active locationids)))))


[varese clojure]$ ~/local/bin/clojure add_employee_data.clj
Added user gFc0pVnLKPnjrrLx: [158 true [77 73 99 58 31 64 1 37 70 69]]
Added user 5gGh9anHwFINpr5t: [459 true [34 71 28 1 2 84 11 33 37 28]]
Added user pGRMeXBTFoBIWhud: [945 true [45 83 51 45 11 4 80 68 73 27]]
...


We're now ready to create our reporting application. Using Avro and a consistent meta-schema this comes off fairly easily:

; Retrieve information from the Cassandra database about one of our employees
(use '[fencepost.avro])
(use '[fencepost.cassandra])

(defn evaluate_user
  "Gather information for the specified user and display a minimal report about them"
  [slices username]
  ; Note that the code below says nothing about types. We specify the column names we
  ; wish to access but whatever Cassandra + Avro supplies for the value of that column
  ; is what we get.
  (let [user_data (range_slices_columns slices username)
        userid (decode_from_schema (user_data :userid))
        active (decode_from_schema (user_data :active))
        locationids (decode_from_schema (user_data :locationids))]
    (println (format "Username: %s" username))
    (println (format "Userid: %s" userid))
    (println (if (> userid 0) "Userid is greater than zero" "Userid is not greater than zero"))
    (println (format "Active: %s" active))
    (println (if active "User is active" "User is not active"))
    ; Every user should have at least one location ID.
    ;
    ; Well, they would if we were able to successfully handle an Avro record.
    ;(assert (> (count locationids) 0))
    ))

(let [client (connect "localhost" 9160 "employees")
      key_slices (get_range_slices client "employee" "!" "~")
      keys (range_slices_keys key_slices)]
  (println (format "Found %d users" (count keys)))
  (dotimes [n (count keys)]
    (evaluate_user key_slices (nth keys n))))


[varese clojure]$ ~/local/bin/clojure get_employee_data.clj
Found 10 users
Username: 5gGh9anHwFINpr5t
Userid: 459
Userid is greater than zero
Active: true
User is active
...


And in order to verify our assumption of simple cross-platform support, we create a Ruby version of something very much like our reporting application:

require 'rubygems'
require 'avro'
require 'cassandra'
require 'stringio'

def evaluate_avro_data(bytes)
  # Define the meta-schema
  meta_schema = Avro::Schema.parse("{\"type\": \"map\", \"values\": \"bytes\"}")
  # Read the meta source and extract the contained data and schema
  meta_datum_reader = Avro::IO::DatumReader.new(meta_schema)
  meta_val = meta_datum_reader.read(Avro::IO::BinaryDecoder.new(StringIO.new(bytes)))
  # Build a new reader which can handle the indicated schema
  schema = Avro::Schema.parse(meta_val["schema"])
  datum_reader = Avro::IO::DatumReader.new(schema)
  datum_reader.read(Avro::IO::BinaryDecoder.new(StringIO.new(meta_val["data"])))
end

client = Cassandra.new('employees', '127.0.0.1:9160')
client.get_range(:employee, {:start_key => "!", :finish_key => "~"}).each do |k,v|
  userid = evaluate_avro_data v["userid"]
  active = evaluate_avro_data v["active"]
  locationids = evaluate_avro_data v["locationids"]
  puts "Username: #{k}, user ID: #{userid}, active: #{active}"
  puts "User ID #{(userid > 0) ? "is" : "is not"} greater than zero"
  puts "User #{active ? "is" : "is not"} active"
  # Ruby's much more flexible notion of truthiness makes the tests above somewhat less
  # compelling. For extra validation we raise unless the user ID is actually a number.
  raise "Oops, it's not a number" unless userid.is_a? Fixnum
end


[varese 1.8]$ ruby get_employee_data.rb
Username: 5gGh9anHwFINpr5t, user ID: 459, active: true
User ID is greater than zero
User is active
Username: 76v8iEJcc79Huj9L, user ID: 469, active: false
User ID is greater than zero
User is not active
...


This code meets our basic requirements, but as always there were a few stumbling blocks along the way. Avro includes strings as a primitive type, but the Java API (which we leverage for our Clojure code) returns string instances as a Utf8 type. We can get a java.lang.String from these objects, but doing so requires an extra toString() call that is (logically) completely unnecessary. We also don't fully support complex types. The Avro code maps the Clojure classes representing arrays and maps onto a "record" type that exposes the various fields via getters; supporting these types would require reliably reconstructing the underlying object from those fields, which is beyond the scope of this work. Finally, we were forced to use Ruby 1.8.x for the Ruby examples since the Avro gem apparently doesn't yet support 1.9.


Full code can be found on GitHub.