Cassandra itself provides an API that uses the Thrift protocol for manipulating data. We'll use this interface to implement a simple proof-of-concept application that might serve as a testbed for manipulating data managed by Cassandra in idiomatic Clojure. Note that the Clojure ecosystem already includes several open-source projects that connect Clojure to Cassandra: these include clj-cassandra and clj-hector, the latter leveraging the Hector Java client to do it's work. In order to keep things simple we choose to avoid any of these third-party libraries; it's not as if the Thrift interface imposes a heavy burden on us. Let's see how far we can get with what's already in the packaging.
That sounds great... so what exactly are we trying to do? Beginning with the database generated during our previous work with Cassandra we should be able to access sets of keys within a keyspace and a set of columns for any specific key. These structures should be available for manipulation in idiomatic Clojure as sequences. Ideally these sequences would be at least somewhat lazy and transparently support multiple datatypes. [2]
Using the Thrift interface requires working with a fair number of Java objects representing return types and/or arguments to the various exposed functions. My Clojure isn't yet solid enough to hash out Java interop code without flailing a bit so we kick things off with a Scala implementation. This approach allows us to simplify the interop problem without sacrificing the functional approach, all within a language that is by now fairly familiar.
The Scala code includes a fair number of intermediate objects but is otherwise fairly clean:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.fencepost.cassandra | |
import scala.collection.JavaConversions | |
import org.apache.thrift.protocol.TBinaryProtocol | |
import org.apache.thrift.transport._ | |
import org.apache.cassandra.service._ | |
import org.apache.cassandra.thrift._ | |
object ThriftCassandraClient { | |
def connect(host:String,port:Int,keyspace:String):Cassandra.Client = { | |
val transport = new TFramedTransport(new TSocket(host, port)) | |
val protocol = new TBinaryProtocol(transport) | |
val client = new Cassandra.Client(protocol) | |
transport.open() | |
client set_keyspace keyspace | |
client | |
} | |
// Execute a range slice query against the specified Cassandra instance. Method returns | |
// an object suitable for later interrogation by range_slices_keys() or range_slices_columns() | |
def get_range_slices(client:Cassandra.Client,cf:String,start:String,end:String):Iterable[KeySlice] = { | |
val sliceRange = new SliceRange() | |
sliceRange setStart new Array[Byte](0) | |
sliceRange setFinish new Array[Byte](0) | |
sliceRange setReversed false | |
sliceRange setCount 100 | |
val slicePredicate = new SlicePredicate() | |
slicePredicate setSlice_range sliceRange | |
val columnParent = new ColumnParent(cf) | |
val keyRange = new KeyRange() | |
keyRange setStart_key (start getBytes) | |
keyRange setEnd_key (end getBytes) | |
val javakeys = client.get_range_slices(columnParent,slicePredicate,keyRange,ConsistencyLevel.ONE) | |
// Return from Thrift Java client is List<KeySlice> so we have to explicitly convert it here | |
JavaConversions asScalaIterable javakeys | |
} | |
// Return an Iterable for all keys in an input query state object | |
def range_slices_keys(slices:Iterable[KeySlice]) = slices map { c => new String(c.getKey) } | |
// Return an Option containing column information for the specified key in the input query | |
// state object. If the key isn't found None is returned, otherwise the Option contains a | |
// map of column names to column values. | |
def range_slices_columns(slices:Iterable[KeySlice], key:String):Option[Map[String,String]] = { | |
slices find { c => new String(c.getKey()) == key } match { | |
case None => None | |
case Some(keyval) => | |
val urcols = JavaConversions asScalaIterable (keyval getColumns) | |
val cols:Seq[Column] = (urcols map (_ getColumn)).toSeq | |
Some(Map(cols map { c => (new String(c.getName())) -> (new String(c.getValue())) }:_*)) | |
} | |
} | |
def main(args: Array[String]): Unit = { | |
val client = connect("localhost", 9160,"twitter") | |
val slices = get_range_slices(client,"authors","!","~") | |
val fivekeys = range_slices_keys(slices) take 5 | |
println("fivekeys: " + fivekeys) | |
for (key <- fivekeys) { | |
range_slices_columns(slices,key) match { | |
case None => | |
case Some(cols) => | |
println( | |
"Key " + key + | |
": name => " + (cols getOrElse ("name","")) + | |
", following => " + (cols getOrElse ("following","")) | |
) | |
} | |
} | |
} | |
} |
Translating this code into Clojure is more straightforward than expected:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(import '(org.apache.thrift.transport TFramedTransport TSocket) | |
'(org.apache.thrift.protocol TBinaryProtocol) | |
'(org.apache.cassandra.thrift Cassandra$Client SliceRange SlicePredicate ColumnParent KeyRange ConsistencyLevel) | |
) | |
(defn connect [host port keyspace] | |
"Connect to a Cassandra instance on the specified host and port. Set things up to use the specified key space." | |
(let [transport (TFramedTransport. (TSocket. host port)) | |
protocol (TBinaryProtocol. transport) | |
client (Cassandra$Client. protocol)] | |
(.open transport) | |
(.set_keyspace client keyspace) | |
client | |
) | |
) | |
(defn get_range_slices [client cf start end] | |
"Simple front end into the get_range_slices function exposed via Thrift" | |
(let [slice_range | |
(doto (SliceRange.) | |
(.setStart (byte-array 0)) | |
(.setFinish (byte-array 0)) | |
(.setReversed false) | |
(.setCount 100) | |
) | |
slice_predicate | |
(doto (SlicePredicate.) | |
(.setSlice_range slice_range)) | |
column_parent (ColumnParent. cf) | |
key_range | |
(doto (KeyRange.) | |
(.setStart_key (.getBytes start)) | |
(.setEnd_key (.getBytes end))) | |
] | |
(.get_range_slices client column_parent slice_predicate key_range ConsistencyLevel/ONE) | |
) | |
) | |
(defn range_slices_keys [slices] | |
"Retrieve the set of keys in a get_range_slices result" | |
(map #(String. (.getKey %)) slices) | |
) | |
(defn range_slices_columns [slices key] | |
"Retrieve a map of the columns associated with the specified key in a get_range_slices result" | |
(let [match (first (filter #(= key (String. (.getKey %))) slices))] | |
(cond (nil? match) nil | |
(true? true) | |
(let [urcols (.getColumns match) | |
cols (map #(.getColumn %) urcols)] | |
(zipmap (map #(keyword (String. (.getName %))) cols) | |
(map #(String. (.getValue %)) cols)) | |
) | |
) | |
) | |
) | |
(let [client (connect "localhost" 9160 "twitter") | |
key_slices (get_range_slices client "authors" "!" "~") | |
five_keys (take 5 (range_slices_keys key_slices))] | |
(print five_keys) | |
(let [formatfn | |
(fn [key] | |
(let [cols (range_slices_columns key_slices key)] | |
(format "Key %s: name => %s, following => %s\n" key (cols :name) (cols :following)) | |
) | |
)] | |
(print (reduce str (map formatfn five_keys))) | |
) | |
) |
These results seem fairly promising, although we're nowhere near done. This code assumes that all column names and values are strings, a perfectly ridiculous assumption. We also don't offer any support for nested data types, although in fairness this was a failing of our earlier work as well. Finally we haven't built in much support for lazy evaluation; we lazily convert column names to Clojure keywords but that's about it. But fear not, gentle reader; we'll revisit some or all of these points in future work.
[1] At least until CQL arrives in Cassandra 0.8
[2] We won't be able to meet these last two goals in our initial implementation, but with any luck we'll be able to revisit them in future work
No comments:
Post a Comment