Oplog Tailing in Ruby & Go - Examples

Ruby

#!/usr/bin/env ruby
require 'logger'  
require 'optparse'

require 'rubygems'  
require 'bundler/setup'  
require 'mongoriver'


module Mongoriver  
  # Outlet that echoes each insert, remove and update it receives to
  # standard output, one line per event.
  class OplogWatcher < AbstractOutlet
    include Mongoriver::Logging

    # Prints the inserted document (via #inspect) together with the
    # "db.collection" namespace it was inserted into.
    def insert(db_name, collection_name, document)
      namespace = "#{db_name}.#{collection_name}"
      puts "got an insert for #{namespace}! #{document.inspect}"
    end

    # Prints the removed document (via #inspect) together with the
    # "db.collection" namespace it was removed from.
    def remove(db_name, collection_name, document)
      namespace = "#{db_name}.#{collection_name}"
      puts "got a remove for #{namespace}! #{document.inspect}"
    end

    # Prints the selector and the updates (both via #to_s) together with
    # the "db.collection" namespace the update applies to.
    def update(db_name, collection_name, selector, updates)
      namespace = "#{db_name}.#{collection_name}"
      puts "got an update for #{namespace}! #{selector}, #{updates}"
    end
  end

  class Stream
    # Handles a "create collection" command.
    #
    # The 'create' entry is removed from +data+ (the hash is mutated in
    # place) and used as the collection name. Every remaining entry is
    # copied into a new hash with its key converted to a symbol, and the
    # :create_collection trigger is fired with the database name, the
    # collection name and that options hash.
    def handle_create_collection(db_name, data)
      name = data.delete('create')
      symbolized = data.each_with_object({}) do |(key, value), acc|
        acc[key.to_sym] = value
      end

      trigger(:create_collection, db_name, name, symbolized)
    end
  end
end

# Command-line entry point.
#
# Parses the command line, connects to the MongoDB deployment named by the
# MONGOHQ_URL environment variable and streams oplog events to an
# OplogWatcher. TERM, INT and USR2 ask the stream to stop.
#
# Options:
#   --help              print the usage text and exit with status 1
#   -s, --start OPTIME  integer optime handed to Stream#run_forever
#                       (nil when the option is not given)
#
# Returns 0 once Stream#run_forever returns, or 1 when positional arguments
# are supplied or MONGOHQ_URL is missing/empty.
def main
  options = {:optime => nil}
  optparse = OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} [options]"

    opts.on('--help', 'Display this message') do
      puts opts
      exit(1)
    end

    opts.on('-s OPTIME', '--start', 'Starting optime') do |optime|
      options[:optime] = Integer(optime)
    end
  end
  optparse.parse!

  # Anything left in ARGV after option parsing is an unsupported argument.
  unless ARGV.empty?
    puts optparse
    return 1
  end

  # Fail with a clear message instead of handing nil to the driver.
  mongo_url = ENV["MONGOHQ_URL"]
  if mongo_url.nil? || mongo_url.empty?
    $stderr.puts("MONGOHQ_URL must be set to a MongoDB connection URI")
    return 1
  end

  client = Mongo::MongoClient.from_uri(mongo_url)
  tailer = Mongoriver::Tailer.new([client], :existing)
  outlet = Mongoriver::OplogWatcher.new
  stream = Mongoriver::Stream.new(tailer, outlet)

  %w[TERM INT USR2].each do |sig|
    Signal.trap(sig) do
      # No `log` helper is defined at this scope (calling it raised
      # NameError and skipped stream.stop), so write straight to stderr.
      $stderr.puts("Got SIG#{sig}. Preparing to exit...")
      stream.stop
    end
  end

  stream.run_forever(options[:optime])
  0
end

# Run main only when this file is executed directly, and use its return
# value as the process exit status.
if __FILE__ == $0
  status = main
  begin
    exit(status)
  rescue TypeError
    # exit rejected the value main returned; exit with status 0 instead.
    exit(0)
  end
end

Go

package main

import (  
  "bytes"
  "fmt"
  "io"
  "labix.org/v2/mgo"
  "labix.org/v2/mgo/bson"
  "os"
  "time"
  "mgotail"
)

// printlog receives entries from logs until the channel is closed and
// prints one line to standard output for every insert ("i"), update ("u")
// and delete ("d") entry. Entries with any other operation are ignored.
//
// NOTE: buffer is accepted but never written to; all output goes to
// stdout through fmt.Printf.
func printlog(buffer io.Writer, logs chan mgotail.Oplog) {
  for entry := range logs {
    ns := entry.Namespace
    switch entry.Operation {
    case "d":
      fmt.Printf("Delete %s from %s\n", entry.Object, ns)
    case "u":
      fmt.Printf("Update %s with %s in %s\n", entry.QueryObject, entry.Object, ns)
    case "i":
      fmt.Printf("Insert %s into %s\n", entry.Object, ns)
    }
  }
}

// main connects to the MongoDB deployment named by the MONGOHQ_URL
// environment variable, tails oplog entries newer than the last recorded
// timestamp for the "wiktory.userinfo" namespace, and prints them via
// printlog until a value arrives on the done channel.
func main() {
  fmt.Println("Going to start tailing")

  session, err := mgo.Dial(os.Getenv("MONGOHQ_URL"))
  if err != nil {
    // Errors go to stderr, and the message is newline-terminated.
    // NOTE(review): this echoes the full URL, which may embed credentials.
    fmt.Fprintf(os.Stderr, "Cannot connect to Mongodb: %s.\n %s\n", os.Getenv("MONGOHQ_URL"), err)
    os.Exit(1)
  }
  // Release the connection when main returns.
  defer session.Close()

  session.EnsureSafe(&mgo.Safe{WMode: "majority"})

  var results bytes.Buffer

  logs := make(chan mgotail.Oplog)
  done := make(chan bool)
  last := mgotail.LastTime(session)

  q := mgotail.OplogQuery{session, bson.M{"ts": bson.M{"$gt": last}, "ns": "wiktory.userinfo"}, time.Second * 3}
  go q.Tail(logs, done)
  go printlog(&results, logs)

  // Wait for the tailing goroutine to finish. A bare `select{}` used to sit
  // here; it blocks forever, so this receive and the final message were
  // unreachable. Presumably q.Tail sends on done when it stops — confirm
  // against mgotail.
  <-done

  fmt.Println("..done.")
}