Oplog: Tools & Libraries for All

As the final post in this short series on the oplog and what your applications can do with it now that MongoHQ's Elastic Deployments give you access to it, we'll look at the libraries (in various languages) and tools that use the oplog. Although the steps for tailing the oplog are simple enough, making that code reliable in all cases is harder, so it's often easier to use an existing library.

Ruby

We start with Mongoriver, a Ruby gem created and released as open source (MIT licence) by the developers at Stripe. Mongoriver has some interesting features. In our examples we've tended to start reading from the latest point in the oplog, but it is possible to work through the oplog from a particular point in time: Mongoriver can start from the beginning of the oplog, or be passed a particular time from which to work. For more complex tailing work there's also a persistent tailer, which tracks the timestamp of the last operation it read from the oplog and saves it in a collection; when it restarts, it refers to that saved timestamp so it can carry on without missing anything. The driver for this functionality can be found in another Stripe project, MoSQL, which replicates the content of MongoDB collections into tables within a PostgreSQL database. Stripe uses the replicated data as part of their offline analytics and uses MoSQL's oplog tailing to build that replica and keep it up-to-date.

As well as separating out the insert, delete and update operations, Mongoriver goes slightly further than other libraries in attempting to decode the various commands, like creating and dropping collections. All this is passed to an implementation of an AbstractOutlet:

module Mongoriver  
  class OplogWatcher < AbstractOutlet
    include Mongoriver::Logging

    def insert(db_name, collection_name, document)
      puts("got an insert for #{db_name}.#{collection_name}! #{document.inspect}")
    end

    def remove(db_name, collection_name, document)
      puts("got a remove for #{db_name}.#{collection_name}! #{document.inspect}")
    end

    def update(db_name, collection_name, selector, updates)
      puts("got an update for #{db_name}.#{collection_name}! #{selector}, #{updates}")
    end
  end
end  

To connect this to the MongoDB database, we've found it easiest to create our own connection with a MongoDB URI and pass that to Mongoriver.

client = Mongo::MongoClient.from_uri(ENV["MONGOHQ_URL"])  
tailer = Mongoriver::Tailer.new([client], :existing)  
outlet = Mongoriver::OplogWatcher.new  
stream = Mongoriver::Stream.new(tailer, outlet)  

This code uses the MongoClient from_uri method so it can use MongoDB URIs as generated by the MongoHQ administration dashboard. In previous posts (/articles/oplog-tailing-in-ruby-and-go-examples/) we mentioned how the URI needs to be modified so that it connects to the "local" database but authenticates against the named database. In this case we don't have to make that change and can use the mongodb://username:password@host1:port1,host2:port2/database format as generated.

If you are looking for other Mongoriver examples, check out mongo_delta, which uses Mongoriver to consume the incoming oplog stream and replicate selected parts of it to separate database instances.

Go

For Go developers, tailing the oplog is simple thanks to a number of methods in the Labix mgo driver for MongoDB. A Find() query can be postfixed with .LogReplay() and .Tail() to give a constantly updating stream of events. Of course, anything can be improved with a little wrapping, and the mgotail library is just that: a little wrapping that takes care of reading the oplog in a goroutine, decanting its contents into an Oplog struct, and delivering it through a Go channel. There's also a function for getting the highest/latest timestamp from the oplog. That reduces the code needed to read the oplog to:

logs := make(chan mgotail.Oplog)  
done := make(chan bool)  
last := mgotail.LastTime(session)

q := mgotail.OplogQuery{session, bson.M{"ts": bson.M{"$gt": last}, "ns": "wiktory.userinfo"}, time.Second * 3}  
go q.Tail(logs, done)  
go printlog(&results, logs)  

Here, session is the database session and printlog is a routine, found in the full listing of the Go example, which prints out the results as they appear.
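If you would rather skip the wrapper, the same loop can be written against mgo directly. The following is a minimal sketch, not production code, and it assumes a MONGOHQ_URL connection string that can read the local database (as discussed earlier in this series). It tails local.oplog.rs with LogReplay() and Tail(), and requeries from the last timestamp it saw whenever the cursor is invalidated:

package main

import (
    "log"
    "os"
    "time"

    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

func main() {
    // Assumes MONGOHQ_URL grants access to the local database.
    session, err := mgo.Dial(os.Getenv("MONGOHQ_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    oplog := session.DB("local").C("oplog.rs")

    // Find the newest entry so we only see operations from now on;
    // a persistent tailer would load this checkpoint from its own collection instead.
    var last struct {
        Ts bson.MongoTimestamp `bson:"ts"`
    }
    if err := oplog.Find(nil).Sort("-$natural").One(&last); err != nil {
        log.Fatal(err)
    }

    var entry struct {
        Ts bson.MongoTimestamp `bson:"ts"`
        Op string              `bson:"op"`
        Ns string              `bson:"ns"`
        O  bson.M              `bson:"o"`
    }

    iter := oplog.Find(bson.M{"ts": bson.M{"$gt": last.Ts}}).LogReplay().Tail(5 * time.Second)
    for {
        for iter.Next(&entry) {
            log.Printf("%s on %s: %v", entry.Op, entry.Ns, entry.O)
            last.Ts = entry.Ts // remember how far we have read
        }
        if err := iter.Err(); err != nil {
            log.Fatal(err)
        }
        if iter.Timeout() {
            continue // no new operations yet; keep waiting
        }
        // The cursor was invalidated; requery from the last timestamp we saw.
        iter = oplog.Find(bson.M{"ts": bson.M{"$gt": last.Ts}}).LogReplay().Tail(5 * time.Second)
    }
}

The requery step matters: a tailable cursor can die if the entry it last returned is overwritten in the capped collection, so holding on to the last timestamp lets you pick up where you left off.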

For a more extensive use of the oplog in Go, you should look at MongoHQ's own Seed application. Seed is a Go program for zero-downtime replication of MongoDB replica sets, with the ability to rename databases during the migration process. In the BSD-licensed open source code you'll find definitions for an OplogDoc and supporting functions, including a last-timestamp extractor and an operation decoder, while the logReplayer shows another way to consume the oplog.
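To give a sense of what an operation decoder works with, here's a sketch of the general shape of an oplog entry and a dispatch on its op field. The type and function names are purely illustrative, not Seed's actual definitions:

package main

import (
    "fmt"

    "labix.org/v2/mgo/bson"
)

// OplogEntry mirrors the fields found in a local.oplog.rs document.
// (Illustrative only; Seed defines its own OplogDoc type.)
type OplogEntry struct {
    Timestamp bson.MongoTimestamp `bson:"ts"` // when the operation happened
    HistoryID int64               `bson:"h"`  // unique id for this entry
    Version   int                 `bson:"v"`  // oplog format version
    Operation string              `bson:"op"` // i=insert, u=update, d=delete, c=command, n=no-op
    Namespace string              `bson:"ns"` // "database.collection"
    Object    bson.M              `bson:"o"`  // the document, update spec, or command
    QueryObj  bson.M              `bson:"o2"` // for updates: selects the document to change
}

// dispatch routes an entry to a handler based on the operation type.
func dispatch(entry OplogEntry) {
    switch entry.Operation {
    case "i":
        fmt.Printf("insert into %s: %v\n", entry.Namespace, entry.Object)
    case "u":
        fmt.Printf("update in %s: %v -> %v\n", entry.Namespace, entry.QueryObj, entry.Object)
    case "d":
        fmt.Printf("delete from %s: %v\n", entry.Namespace, entry.Object)
    case "c":
        fmt.Printf("command on %s: %v\n", entry.Namespace, entry.Object)
    default:
        // "n" entries are no-ops used as keep-alives; ignore them.
    }
}

func main() {
    // A hypothetical insert entry, as it might appear in the oplog.
    dispatch(OplogEntry{
        Operation: "i",
        Namespace: "wiktory.userinfo",
        Object:    bson.M{"_id": 1, "name": "example"},
    })
}

Updates carry both an o2 selector identifying the document and an o object describing the change, which is why libraries like Mongoriver hand their update handlers a selector and an update separately.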

Java

For Java, there's actually example code for reading the oplog in the MongoDB driver, though it's far from production code: it eats through any oplog entries available and then sleeps the thread for a second. Given the range of frameworks that Java applications are built with, it's unlikely that any one enhanced example would suit everyone.

If you want to see a more comprehensive use of the oplog, check out the MongoDB River Plugin for ElasticSearch. As its name suggests, the application takes data from MongoDB and passes selected content over to ElasticSearch for deeper indexing and search.

Node.js

Finally, we return to Node.js and a mention for an oplog tailing library, mongo-watch. This library was originally developed with some extensive caching and event-triggering functionality, which has since been modularized out into Particle. Particle is being developed as a library for distributed state synchronization, keeping local data models in sync with a server. Although it's a work in progress, with local data models that are currently read-only, the plan is to enhance it to support not only writing back but potentially other data sources as well.

Although the oplog was built for replication, watching it can unlock all kinds of uses for MongoDB, from the way Meteor taps it to scale its real-time updates, to gathering metrics or triggering actions in remote systems. The oplog gives you a view into the beating heart of the data inside your database.