Tuesday, March 29, 2011

Data Models and Cassandra

In a pair of previous posts we considered how we might use CouchDB to store and query a collection of tweets and tweet authors. The goal was to explore the document-oriented data model by constructing queries across multiple types of documents in a loose approximation of a SQL join. Cassandra implements a data model built on column families, one which differs from both SQL and document-oriented databases such as CouchDB. Since we already have a problem space defined, we begin our exploration of Cassandra by attempting to answer the same questions using this new data model.

The examples below are implemented in Python with a little help from a few excellent libraries:


  • Pycassa for interacting with Cassandra.

  • Twitter tools for interacting with Twitter via their REST API.



We'll begin by looking at how we might query a Cassandra instance populated with data in order to find the answers to the questions in our problem space. We'll close by briefly discussing how to get that data into Cassandra.

We should be all set; bring on the questions!

For each author: what language do they write their tweets in and how many followers do they have?



The organizing structure of the Cassandra data model is the column family defined within a keyspace. The keyspace is exactly what it sounds like: a collection of keys, each identifying a distinct entity in your data model. Each column family represents a logical grouping of data about these keys. This data is represented by one or more columns, which are really not much more than tuples containing a name, a value and a timestamp. A column family can contain one or more columns for a key in the keyspace, and as it turns out you have a great deal of flexibility here; columns in a column family don't have to be defined in advance and do not have to be the same for all keys.
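To make this flexibility concrete, here is a tiny Pycassa sketch; the keyspace and column family names anticipate the ones created later in this post, and the rows and columns themselves are invented purely for illustration:

import pycassa

# Assumes a Cassandra node on localhost plus the 'twitter' keyspace and
# 'authors' column family created in the "Populating the data" section.
pool = pycassa.ConnectionPool('twitter')
authors = pycassa.ColumnFamily(pool, 'authors')

# Two rows in the same column family with different sets of columns;
# neither set was declared anywhere in advance.
authors.insert('alice', {'lang': 'en', 'followers_count': '42'})
authors.insert('bob', {'lang': 'fr', 'location': 'Paris'})

print authors.get('alice')    # {'followers_count': '42', 'lang': 'en'}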

We begin by imagining a column family named "authors" whose keys are the users' Twitter handles or screen names. Assume that for each of these keys the "authors" column family contains a set of columns, one for each property returned by the "users/show" resource found in the Twitter REST API. Let's further assume that included in this data are fields named "lang" and "followers_count" and that these fields correspond to exactly the data we're looking for. We can satisfy our requirement with a range query, which identifies all keys that fall within a specified range. In our case we want to include all alphanumeric screen names, so we query across the range of printable ASCII characters. The Pycassa API makes this very easy [1]:
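Something like the following sketch does the job (it assumes the 'twitter' keyspace and 'authors' column family created in the "Populating the data" section below, with start and finish keys spanning the printable ASCII range; it is an illustration of Query1.py rather than the exact script):

import pycassa

pool = pycassa.ConnectionPool('twitter')
authors = pycassa.ColumnFamily(pool, 'authors')

# Walk every key between '!' and '~' (the printable ASCII range),
# fetching only the two columns we care about for each author.
for key, columns in authors.get_range(start='!', finish='~',
                                      columns=['lang', 'followers_count']):
    print 'Key: %s, language: %s, followers: %s' % (
        key, columns.get('lang'), columns.get('followers_count'))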



The result is exactly what we wanted:

[@varese src]$ python Query1.py
Key: Alanfbrum, language: en, followers: 73
Key: AloisBelaska, language: en, followers: 8
Key: DASHmiami3, language: en, followers: 11
Key: DFW_BigData, language: en, followers: 21


How many tweets did each author write?



Okay, so we've got the idea of a column family; we can use column families to define sets of information for the keys in our keyspace. Clearly this is a useful organizing principle, but in some cases we need a hierarchy that goes a bit deeper. The set of tweets written by an author illustrates just such a case: tweets are written by a specific author, but each tweet has data of its own (the actual content of the tweet, a timestamp indicating when it was written, perhaps some geo-location info, etc.). How can we represent this additional level of hierarchy?

We could build row keys from some combination of the author's screen name and the tweet ID, but this doesn't seem terribly efficient; identifying all tweets written by an author becomes unnecessarily complicated. Fortunately Cassandra provides the super column family, which meets our needs exactly. The value of each column in a super column family (a "super column") is itself a collection of regular columns.

Let's apply this structure to the problem at hand. Assume that we also have a super column family named "tweets" within our keyspace. For each key we define one or more super columns, one for each tweet written by the author identified by the key. The value of any given super column is a collection of columns, one for each field contained in the results we get when we search for tweets using Twitter's search API. Once again we utilize a range query to access the relevant keys:
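In sketch form (under the same assumptions as before) Query2.py simply counts the super columns in each row:

import pycassa

pool = pycassa.ConnectionPool('twitter')
tweets = pycassa.ColumnFamily(pool, 'tweets')

# Each row comes back as (author, {tweet_id: {column: value, ...}, ...}),
# so the number of super columns is the number of tweets we stored for
# that author.
for author, supercolumns in tweets.get_range(start='!', finish='~'):
    print 'Authors: %s, tweets written: %d' % (author, len(supercolumns))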



Running this script gives us the following:

[@varese src]$ python Query2.py
Authors: Alanfbrum, tweets written: 1
Authors: AloisBelaska, tweets written: 1
Authors: DASHmiami3, tweets written: 1
Authors: DFW_BigData, tweets written: 1
...
Authors: LaariPimenteel, tweets written: 2
Authors: MeqqSmile, tweets written: 1
Authors: Mesoziinha, tweets written: 2


How many tweet authors are also followers of @spyced?



This question presented the largest challenge when we modeled this problem space with CouchDB. Somehow we have to relate one type of resource (the set of followers for a Twitter user) to the set of authors defined in our "authors" column family. The Twitter REST API exposes the set of user IDs that follow a given user, so one approach might be to obtain these IDs and, for each of them, query the "authors" column family to see if we have an author with a matching ID. As for the user to search for... well, when we were working with CouchDB we used @damienkatz, so it only seems logical that we use Jonathan Ellis (@spyced) in this case.

Newer versions of Cassandra provide support for indexed slices on a column family. The database maintains an index for a named column within a column family, enabling very efficient lookups for rows in which the column matches a simple query. Query terms can test for equality or whether a column value is greater than or less than an input value [2]. Multiple search clauses can be combined within a single query, but in our case we're interested in strict equality only. Our solution to this problem looks something like the following:
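Here is a sketch of that solution; the Twitter call below uses the Twitter tools library against the 2011-era v1 REST API, so treat the endpoint, its arguments and the shape of its response as assumptions:

import pycassa
from pycassa.index import create_index_clause, create_index_expression
from twitter import Twitter

pool = pycassa.ConnectionPool('twitter')
authors = pycassa.ColumnFamily(pool, 'authors')

# Ask Twitter for the numeric IDs of everyone following @spyced; the
# response is assumed to be a flat list of IDs.
follower_ids = Twitter(domain='api.twitter.com').followers.ids(screen_name='spyced')

# For each follower ID, use the secondary index on 'id_str' to see
# whether one of our authors has a matching ID.
matches = 0
for follower_id in follower_ids:
    clause = create_index_clause(
        [create_index_expression('id_str', str(follower_id))], count=1)
    for key, columns in authors.get_indexed_slices(clause):
        print key
        matches += 1
print matches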



The result looks pretty promising:

[@varese src]$ python Query3.py
tlberglund
evidentsoftware
sreeix
...
joealex
cassandra
21


Some spot checking against the Twitter Web interface seems to confirm these results. [3]

Populating the data



So far we've talked about accessing data from a Cassandra instance that has already been populated with the data we need. But how do we get it in there in the first place? The answer to this question is a two-step process; first we create the relevant structures within Cassandra and then we use our Python tools to gather and add the actual data.

My preferred tool for managing my Cassandra instance is the pycassaShell that comes with Pycassa. This tool makes it easy to create the column families and indexes we've been working with:

# Run inside pycassaShell, which pre-defines SYSTEM_MANAGER and the type constants.
SYSTEM_MANAGER.create_keyspace('twitter', replication_factor=1)
# A standard column family for author profiles...
SYSTEM_MANAGER.create_column_family('twitter', 'authors', comparator_type=UTF8_TYPE)
# ...and a super column family for tweets, keyed by author screen name.
SYSTEM_MANAGER.create_column_family('twitter', 'tweets', super=True, comparator_type=UTF8_TYPE)
# A secondary index on id_str to support the indexed slice query used above.
SYSTEM_MANAGER.create_index('twitter', 'authors', 'id_str', UTF8_TYPE)


There are plenty of similar tools around; your mileage may vary.

When it comes to the heavy lifting, we combine Pycassa and Twitter tools into a simple script that does everything we need:
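A rough sketch of such a loader appears below. It rests on the same assumptions as the earlier snippets, and the search and users/show calls reflect the v1 Twitter API of the time; field names such as 'from_user' are assumptions about that API's response format:

import pycassa
from twitter import Twitter

pool = pycassa.ConnectionPool('twitter')
authors = pycassa.ColumnFamily(pool, 'authors')
tweets = pycassa.ColumnFamily(pool, 'tweets')

search_api = Twitter(domain='search.twitter.com')
rest_api = Twitter(domain='api.twitter.com')

# Grab a page of tweets mentioning "cassandra" from the search API.
results = search_api.search(q='cassandra', rpp=100)['results']

seen_authors = set()
for tweet in results:
    author = tweet['from_user']

    # One super column per tweet, named by the tweet ID and holding every
    # field the search API returned (stringified for simplicity).
    columns = dict((k, unicode(v).encode('utf-8')) for k, v in tweet.items())
    tweets.insert(author, {str(tweet['id']): columns})

    # One row per author with a column for each users/show property.
    if author not in seen_authors:
        profile = rest_api.users.show(screen_name=author)
        authors.insert(author, dict((k, unicode(v).encode('utf-8'))
                                    for k, v in profile.items()))
        seen_authors.add(author)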



[1] For the sake of brevity this discussion omits a few details, including how keys are ordered and the partitioner and token configuration needed to use range queries effectively. Consult the project documentation and wiki for additional details.

[2] Here "greater than" and "less than" are defined in terms of the type provided at index creation time.

[3] You could complain that we're cheating a bit here. When we were working with CouchDB we were tasked with joining two distinct "types" of resources using a map-reduce operation applied to documents within the database; that was the whole point of the exercise. Here we're just querying the DB in response to some external data source. This is true to a point, but in my defense it's worth noting that we could easily create a "followers" column family containing the followers of @spyced and then execute the same logic against this column family rather than against the Twitter REST API directly. This isn't a very satisfying answer, however; the issue could very well be taken up in a future post.

Monday, March 7, 2011

A Few More Words on Laziness

A few updates to the recent post on lazy evaluation. While most of the discussion below is directed at the Ruby implementation, the same principles could also be applied to the Scala and/or Clojure versions. The implementations in those two languages are already fairly speedy, however, so for now we'll focus on bringing the Ruby version into line.

So, without further ado...

Don't blame JRuby


Of the three implementations discussed in the previous post, the Ruby version was by far the worst in terms of performance. This fact should in no way be interpreted as a criticism of JRuby itself; the fault rests entirely with the code being executed (and by extension the abilities of the programmer who wrote it), not the platform it runs on. To prove the point, consider the following:


[@varese ruby]$ more Euler10Primes.rb
require 'prime.rb'

puts Prime.each(2000000).reduce(0) { |i,total| total + i }
[@varese ruby]$ time jruby --1.9 Euler10Primes.rb
142913828922

real 0m27.737s
user 0m20.807s
sys 0m6.181s


Clearly we have work to do.

Remember that part about testing fewer candidates?


Our initial algorithm tested a candidate set of all odd numbers beginning with three. The underlying logic is two-fold:


  • Any even number greater than two is by definition divisible by two (a prime) and therefore not prime

  • We can exclude these numbers by generating candidates in a clever way (starting from an odd value and incrementing by two) rather than testing every candidate to see if it's even or odd



As a first optimization we consider this process for generating candidates. Is there any room for improvement here? As it turns out there is; approximately 20% of the time we're testing a candidate that could be excluded with zero computation. Permit me to explain.

Multiples of five (a prime) always end in zero or five. Anything ending in zero must be even (and therefore already excluded), but so far we haven't excluded anything ending in five. And since one out of every five candidates in our current pool ends in five, roughly 20% of our effort is wasted. Okay, so how do we keep these values out of our set of candidates? We know that odd numbers must end with one of a fixed set of digits (1, 3, 5, 7 or 9). If we exclude five from that set and then add the remaining digits to successive multiples of ten we'll generate the full set of potential candidates.
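To make the generation scheme concrete, here is a minimal sketch of such a candidate generator (written in Python purely for illustration; the code actually being timed below is the Ruby implementation):

def candidates():
    # 2 is the only even prime; after the first decade every candidate is
    # some multiple of ten plus 1, 3, 7 or 9.
    yield 2
    yield 3
    yield 7
    yield 9          # composite, but the divisibility test rejects it cheaply
    base = 10
    while True:
        for last_digit in (1, 3, 7, 9):
            yield base + last_digit
        base += 10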

This optimization alone yields some benefit:


[@varese ruby]$ time jruby Euler10.rb
142913828922

real 2m42.012s
user 0m14.394s
sys 2m23.462s


There's still plenty of room for improvement.

Laziness can be fleeting


Languages that fully support lazy evaluation defer computing values until absolutely necessary. Suppose you want to return the first three items from a lazily defined sequence in Clojure. You might use "(take 3 coll)", but note that this doesn't return an array containing three values; it returns another lazy sequence. You could thus chain operations together without completely evaluating (or "realizing", as the Clojure folks are fond of saying) anything. Given a predicate "pred", for example, you could identify which of these values satisfy pred using "(filter pred (take 3 coll))". This expression also returns a lazy sequence; if you want the actual values you need something like "(doall (filter pred (take 3 coll)))".

Ruby does not support lazy evaluation. We've been using "yield" to approximate lazy evaluation but it's only an approximation. In Ruby "coll.take 5" will return an array containing the first five items in the collection rather than another Enumerable that will generate these values as needed. As a result we're once again doing more work than we need to. And unfortunately this is exactly what we find in our initial Ruby implementation:


if @primearr.take_while {|i| i <= root }.all? {|i| @nat % i != 0}
  @primearr << @nat
  yield @nat
end


On the plus side we're only testing against primes less than or equal to the square root of our candidate. On the negative side, "take_while" builds that entire array up front, which undercuts the fail-fast behavior of the "all?" method. Wouldn't it be better if we could exit this process as soon as we find a prime that divides evenly into the candidate?

This problem is actually a bit trickier than it looks. We can exit our test by finding a prime that divides evenly into our candidate (indicating that the candidate isn't prime) or by finding a prime greater than the square root of the candidate (indicating that the candidate is prime). How can we distinguish between these two cases? One way to implement this test is to combine both conditions into a single predicate and find the first prime that satisfies it. If that prime is greater than the square root of the candidate we have a prime, otherwise we don't. We know this find operation will always succeed since eventually we'll encounter a prime greater than the square root of the candidate.
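To illustrate the idea (again in Python rather than Ruby, and not the code being timed here), the combined test might look something like this:

import math

def is_prime(candidate, primes):
    # 'primes' holds every prime found so far, in increasing order, and is
    # assumed to already extend beyond the square root of the candidate.
    root = math.sqrt(candidate)

    # Find the first prime that either exceeds the square root of the
    # candidate or divides the candidate evenly; one of the two must happen.
    first = next(p for p in primes if p > root or candidate % p == 0)

    # If we stopped because we passed the square root then nothing at or
    # below it divided the candidate, so the candidate is prime.
    return first > root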

The resulting code (including the candidate generation fix mentioned above) looks like this:



So, how'd we do?


[@varese ruby]$ time jruby Euler10.rb
142913828922

real 0m43.730s
user 0m33.408s
sys 0m9.040s


That's more like it! This code overtakes the original Clojure implementation to move into second place. We're still slower than the implementation using the prime number generator that comes with JRuby but we've made up quite a bit of ground.

You don't need a JVM to be lazy


If you're interested in lazy evaluation you could do a lot worse than spending some time with Haskell. The language fully embraces lazy evaluation, which, according to some people, matters a great deal to the utility of functional programming.

An initial implementation of our algorithm in Haskell looks something like this:



Unfortunately the performance is dreadful:


[@varese haskell]$ ghc -o euler10 euler10.hs
[@varese haskell]$ time ./euler10
142913828922

real 4m50.253s
user 0m8.976s
sys 4m35.131s


Once again it's important to realize that these numbers don't reflect on Haskell or GHC in any way. They do reflect on my utter lack of experience and/or skill with this language. There's almost certainly a better (and probably more idiomatic) way to implement this algorithm in Haskell, although in my defense even this performance is orders of magnitude better than what we get using the Sieve of Eratosthenes implementation from Hutton's text:


[@varese haskell]$ more euler10sieve.hs
module Main where

primes = sieve [2..]
sieve(p:xs) = p:sieve[x|x <- xs, x `mod` p /= 0]

main = print (foldr (+) 0 (takeWhile (<2000000) primes))
[@varese haskell]$ ghc -o euler10sieve euler10sieve.hs
[@varese haskell]$ time ./euler10sieve
142913828922

real 401m25.043s
user 0m0.069s
sys 394m13.061s


In the end, of course, this probably indicates only that the Sieve of Eratosthenes is not an ideal solution to this particular problem.

Thursday, March 3, 2011

In Defense of Laziness and Being Self-Centered

On a whim I decided to dig into a few more Project Euler problems. Problem 10 was up next, and on the face of things the issue seemed fairly straightforward: compute the sum of all prime numbers less than two million. Unfortunately the simple implementation of the Sieve of Eratosthenes referenced in the Scala documentation (and used in a solution to a previous problem) promptly blows the stack when trying to perform the necessary computations. Clearly we need a more efficient way to compute large collections of primes.

It may seem counter-intuitive, but the brute-force method of trial division works very well in this situation. In addition to being relatively easy to implement, it offers several avenues for improving performance. We might try to select fewer candidates for testing. Or perhaps we can minimize the number of tests for any specific candidate. Ideally we could do both.

As an initial optimization we observe that 2 is the only even prime; if our candidate sequence returns 2 as a fixed initial value and then generates only odd values, we avoid testing numbers that can't possibly be prime. We keep the number of comparisons for each candidate low by using the principle that to determine whether an integer N is prime we need only test primes less than or equal to the square root of N. The square root of two million is a little more than 1414, so we only need to test a relatively small set of primes for each candidate.
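Before turning to any particular language, the shape of this algorithm is easy to sketch in Python (a throwaway illustration, not one of the implementations being timed in this post):

import math
from itertools import takewhile

def primes():
    # A generator that consults the primes it has already produced: each
    # new candidate is tested only against primes up to its square root.
    found = [2]
    yield 2
    candidate = 3
    while True:
        root = math.sqrt(candidate)
        if all(candidate % p != 0
               for p in takewhile(lambda q: q <= root, found)):
            found.append(candidate)
            yield candidate
        candidate += 2

# Project Euler 10 then amounts to summing the values this generator
# produces until they reach two million.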

An implementation of this algorithm in Scala looks something like this:



This code is quite performant on my commodity laptop:


[@varese src]$ time scala org.fencepost.Euler10
Result: 142913828922

real 0m18.876s
user 0m15.149s
sys 0m3.042s


Note that in this code the stream "primes" is a self-referential data type. An initial value of 2 is specified, but all subsequent values are determined by filtering the candidate pool using the primes defined (and returned) so far. Don't let the indirection of the "pred" function fool you; it's there only to make the code a bit easier to read. We could just as easily have inlined the logic defined in "pred" into the filter call within "primes".

The definition of self-referential data types is entirely dependent upon the idea of lazy evaluation. If a self-referential type were evaluated immediately we'd quickly find ourselves in a recursive loop with no termination condition. Using lazy evaluation, however, values are only computed as they are needed. We provide a function to generate new values from previous values and enough initial values to provide the necessary input(s) to the first invocation of this function; lazy evaluation handles the rest.

Clojure also includes extensive support for lazy evaluation within sequences; we should be able to use them to implement something similar. A bit of work is required to get the scoping right but eventually we come up with the following:



The Clojure implementation isn't as fast as our Scala version but it is still respectable; understand that we are computing just under 150,000 primes here:


[@varese clojure]$ time clojure euler10.clj
142913828922

real 1m2.860s
user 0m51.556s
sys 0m6.327s


As it happens I've been working through the Ruby chapter of Bruce Tate's Seven Languages in Seven Weeks. Bruce strongly encourages experimentation with the various languages described in this text (and I could use the practice) so let's make an attempt to implement the same algorithm in Ruby. The language doesn't offer support for self-referential data types or lazy evaluation but we can get close using an implementation of Enumerable that uses this algorithm to create the values our "each" method will yield. The code I came up with is the following:



I would ask the experienced Ruby hackers in the back to please keep the snickering down.

We test this implementation using JRuby, specifically RC2 of the upcoming 1.6 release. Why JRuby over the alternatives? The other languages we've considered here are JVM languages so it seemed sensible to remain on the JVM unless there was a compelling reason not to. And since JRuby is generally regarded as the fastest Ruby implementation going there was little incentive to leave.


[@varese ruby]$ time jruby Euler10.rb
142913828922

real 3m4.725s
user 0m17.733s
sys 2m41.426s


We've definitely violated Project Euler's "one minute rule" but we haven't strayed into the realm of completely unacceptable performance. That said, there's no doubt that we've taken a step backwards.

Self-referential data types can be a very concise way to represent a stream of data, even a stream of infinite length. They're just one example of how lazy evaluation can help you think about problems in different ways. If your language of choice supports this feature it's worth your time to experiment with it.