The previous post on designing actors with Akka and JRuby took a bit of a shortcut. Not a big one, mind you, but a shortcut nonetheless. Fortunately the use of this shortcut provides a window into a few interesting aspects of actor design, so despite my embarrassment it's worth spending a bit of time here.
Let's start with the constraints imposed when designing with actors. We've already seen a few of these constraints: all mutable state must be contained within an actor; the outside world (including other actors) can only interact with an actor by sending messages; and these messages are handled one at a time, so there's no contention on access to mutable state. All this talk of message handling might lead one to wonder: exactly what can an actor do when it receives those messages?
To no one's great surprise there are some constraints here as well. Generally speaking the message handler in an actor consists of some combination of the following actions:
- Creating other actors
- Sending messages to other actors
- Retrieving or updating managed state
There's certainly nothing horrible in that list either. None of these constraints seem terribly onerous. So where did we go wrong?
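To make the list concrete, here is a rough sketch of the three kinds of action in plain Ruby. It uses a hand-rolled mailbox rather than Akka, and `ToyActor`, `Tally` and `Parent` are made-up names for illustration only. The readers on each class exist purely so the state can be inspected afterwards; a real actor would not expose them.

```ruby
# Toy illustration only: a hand-rolled mailbox, not Akka's UntypedActor.
class ToyActor
  def initialize
    @mailbox = []
  end

  # The only way the outside world interacts with the actor.
  def tell(msg)
    @mailbox << msg
    self
  end

  # Handle queued messages strictly one at a time.
  def drain
    on_receive(@mailbox.shift) until @mailbox.empty?
  end
end

# Hypothetical child actor that keeps a running total.
class Tally < ToyActor
  attr_reader :total # exposed for inspection only

  def initialize
    super
    @total = 0
  end

  def on_receive(msg)
    @total += msg[1] if msg[0] == :add # updating managed state
  end
end

# Hypothetical parent actor that exercises all three kinds of action.
class Parent < ToyActor
  attr_reader :children, :forwarded # exposed for inspection only

  def initialize
    super
    @children = []
    @forwarded = 0
  end

  def on_receive(msg)
    case msg[0]
    when :spawn
      @children << Tally.new             # creating another actor
    when :add
      @children.each { |c| c.tell(msg) } # sending messages to other actors
      @forwarded += 1                    # updating managed state
    end
  end
end

parent = Parent.new
parent.tell([:spawn]).tell([:add, 2]).tell([:add, 3])
parent.drain
parent.children.each(&:drain)
```

Nothing in either handler waits on anything; each one does a little work and returns.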
The problem starts with the Controller's handling of :next messages. Each candidate value is sent off in a message to every model for evaluation, in each case returning a future that can be used to access the model's response when it's provided. The first candidate that gets a positive response from all the models is our next prime. The implementation returns the expected value, but there's a big problem here: how did we observe the response from any of the models if we're still in the message handler for a :next message? Remember that messages are processed one at a time and we're not done with the current message yet. The model is an actor and actors communicate with messages; it seems reasonable to expect the response from the model to be delivered by a message sent to the controller. So how did we see that message if we're not done with the current message? If we can generalize a bit: how do futures (or a request-response messaging pattern in general) fit into an actor-based design?
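The one-at-a-time constraint behind that question can be shown with a toy example. This is plain Ruby with a synchronous, hand-rolled mailbox, and `ToyMailboxActor`, `EchoModel` and `NaiveController` are invented names. It says nothing about how Akka's futures actually deliver their results; it only shows that a reply sent as an ordinary message cannot be handled while the current handler is still running.

```ruby
# Toy illustration only: a synchronous, hand-rolled mailbox rather than Akka.
class ToyMailboxActor
  def initialize
    @mailbox = []
  end

  def tell(msg)
    @mailbox << msg
  end

  # Process exactly one queued message; returns false when the mailbox is empty.
  def step
    return false if @mailbox.empty?
    on_receive(@mailbox.shift)
    true
  end
end

# Hypothetical model: answers every :prime? query by sending a message back.
class EchoModel < ToyMailboxActor
  def on_receive(msg)
    kind, candidate, sender = msg
    sender.tell([:prime, candidate]) if kind == :prime?
  end
end

# Hypothetical controller that looks for the reply while still handling :next.
class NaiveController < ToyMailboxActor
  attr_reader :log # exposed for inspection only

  def initialize(model)
    super()
    @model = model
    @log = []
    @answer = nil
  end

  def on_receive(msg)
    case msg[0]
    when :next
      @model.tell([:prime?, 11, self])
      @model.step # let the model run and send its reply
      # The reply is now sitting in our mailbox, but we are still inside the
      # :next handler, so it has not been handled yet.
      @log << [:during_next, @answer]
    when :prime
      @answer = msg[1]
      @log << [:handled_reply, @answer]
    end
  end
end

controller = NaiveController.new(EchoModel.new)
controller.tell([:next])
controller.step # handles :next; the reply is queued behind it
controller.step # only now is the model's reply handled
p controller.log
# prints [[:during_next, nil], [:handled_reply, 11]]
```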
The short answer appears to be that they don't really fit at all. A better approach would be for the controller to send messages to the models for the current candidate and then return. The models would then compute their answer and send a message containing that answer back to the controller. The message handler at the controller would then determine whether all responses have been received. If every model has answered, the controller computes an overall answer and takes action: if the candidate is prime it's returned to the caller, otherwise a new candidate is generated and the process starts all over again. [1]
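The bookkeeping in that last step can be sketched in isolation. `ReplyCollector` is a hypothetical helper written in plain Ruby, not something Akka provides; it only assumes that each reply names the candidate it refers to.

```ruby
# Hypothetical helper, not part of Akka: tracks the votes for one candidate
# and reports a verdict once every model has answered.
class ReplyCollector
  def initialize(model_count)
    @model_count = model_count
    @candidate = nil
    @votes = []
  end

  # Begin a new round for the given candidate, discarding old votes.
  def start(candidate)
    @candidate = candidate
    @votes = []
  end

  # Record one model's reply. Returns :pending until all models have answered,
  # then :prime or :not_prime. Replies about a stale candidate are ignored.
  def record(kind, candidate)
    return :pending unless candidate == @candidate
    @votes << (kind == :prime)
    return :pending if @votes.length < @model_count
    @votes.all? ? :prime : :not_prime
  end
end

collector = ReplyCollector.new(2)
collector.start(9)
collector.record(:prime, 9)     # => :pending (one model still outstanding)
collector.record(:not_prime, 9) # => :not_prime
```

Each call to `record` corresponds to one pass through the message handler: note the reply, return, and act only once the last reply arrives.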
So what does this new approach look like in Ruby? [2] Our controller is now a bit longer but after a bit of cleanup the code reads reasonably well:
    class Controller < UntypedActor

      def initialize
        @models = 0.upto(3).map do |idx|
          model = Actors.actorOf { SieveNonblocking::Model.new }
          model.start
          model
        end

        # Seed models with a few initial values... just to get things going
        @seeds = [2,3,5,7]
        0.upto(3).each { |idx| @models[idx].tell [:add,@seeds[idx]] }

        @candidates = Candidates.new

        # Introduce member list to keep track of the responses we've received from each
        # actor
        @replies = []
      end

      # Part of the lifecycle for an Akka actor. When this actor is shut down
      # we'll want to shut down all the models we're aware of as well
      def postStop
        @models.each { |m| m.stop }
      end

      def onReceive(msg)
        case msg[0]
        when :next
          # If we still have seeds to return do so up front
          seed = @seeds.shift
          if seed
            self.getContext.replySafe seed
            return
          end

          # Preserve a channel to the original sender so that we can deliver answers when we
          # get them
          @reply_channel = self.getContext.channel
          update_and_send_candidate
        when :prime
          if !validate_model_reply(msg)
            return
          end
          @replies << true
          check_replies
        when :not_prime
          if !validate_model_reply(msg)
            return
          end
          @replies << false
          check_replies
        end
      end

      # Some basic validation; make sure the message is a list of the correct size and
      # that the second element (the value the model is reporting on) matches the current
      # candidate.
      def validate_model_reply(msg)
        return msg.length == 2 && msg[1] == @candidate
      end

      # Update the candidate state and send to all models
      def update_and_send_candidate
        @candidate = @candidates.first
        @models.each { |m| m.tell([:prime?,@candidate],self.getContext) }
      end

      # Check to see if we've received responses from all models. If we have then check to
      # see if everybody said the candidate was prime. If that's the case we're okay to send
      # the candidate to the caller, otherwise we get to start all over with the next candidate.
      def check_replies
        if @replies.length < @models.length
          return
        end

        # We've found the next prime. Add this value to one of the models, reset as much
        # of our state as possible and send the result to the caller
        if @replies.all?
          nextprime = @candidate
          @models[(rand @models.size)].tell [:add,nextprime]
          @replies.clear
          @candidate = nil
          @reply_channel.tell nextprime
        # We don't have a prime, so get a new candidate, send it off to the models and
        # reset what state we can
        else
          @replies.clear
          update_and_send_candidate
        end
      end
    end
The model doesn't really change much:
    # A model represents a fragment of the state of our sieve, specifically some subset
    # of the primes discovered so far.
    class Model < UntypedActor

      def initialize
        @primes = []
      end

      def onReceive(msg)
        # It's times like this that one really does miss Scala's pattern matching
        # but case fills in nicely enough
        case msg[0]
        when :add
          if msg.length != 2
            return
          end
          @primes << msg[1]
        when :prime?
          # Upfront validation; make sure we have some primes and that the message is of the appropriate size
          if msg.length != 2 || @primes.empty?
            self.getContext.replySafe nil
            return
          end

          # The model considers a value prime only if no previously observed prime
          # (other than the value itself) divides it evenly.
          candidate = msg[1]
          resp = @primes.none? do |prime|
            candidate != prime and candidate % prime == 0
          end
          self.getContext.replySafe [resp ? :prime : :not_prime,candidate]
        else
          # Report the message tag we failed to match on
          puts "Unknown message type #{msg[0]}"
        end
      end
    end
Complete code (including tests) can be found on GitHub.
[1] Viktor Klang mentioned a non-blocking solution in comments on the original post. Any non-blocking implementation seemed to be constrained by a variation on the theme mentioned above: how do we preserve a communication channel to the original caller (a caller who is not an actor) if the message handler for the :next message doesn't return a response directly? It wasn't until I came across Akka's notion of a reply channel (and the ability to preserve it as an instance variable in our actor) that this problem was resolved.
[2] I briefly considered explicitly modelling the actors as state machines using the intriguing statemachine gem but decided against it; formally defining the state machines involved didn't add much value to actors that are as lightweight as ours. Larger systems with more complex actors would very likely benefit from the use of this gem.