The Potency of Idempotence with RabbitMQ and MongoDB Upsert

Designing with possible failure in mind is a good strategy when building on distributed systems. The cloud is a distributed system. Latency, network failures, changing service endpoints, and the like are to be expected. If you don't expect them, be prepared for the unhappy Twitter responses or the pager alert in the middle of the night when your application goes down.

One of the easiest strategies for mitigating potential failures is to design your data model for idempotent functions, which can be called correctly over and over again, and to rely on a messaging system to deliver data at least once. It's easy to do this with RabbitMQ and a ready-made operation for idempotent writes like Mongo's upsert.

You Use Idempotent Data Models All the Time

When you make a deposit at your bank or transact with some large e-commerce companies, the data contained in those events is modeled for idempotent functions. This type of data model tends to be nothing more than a virtual ledger, as opposed to a non-idempotent model that relies on update-in-place semantics. In the ledger model a history is kept and a tally can be computed from the individual entries. In the non-ledger model there is no history, just a balance field. Bank accounts, and accounting in general, are great examples of this:

[Image: accountLedger]

In the ledger model the same piece of data can be processed more than once without that being an error. In the above example, if the Exxon withdrawal were inserted over and over again into the ledger, there wouldn't be any difference in the state, assuming the entry had some kind of key identifying it as the same withdrawal. Update the entry with the same data and rerun the tally to get the same balance as before. No harm other than some wasted compute cycles.

In the non-ledger model, though, every update other than the first would be an error. It would rerun the withdrawal transaction steps and over-debit the account.
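
To make the distinction concrete, here is a minimal, hypothetical sketch in Python. The entries, the keying on id, and the balance function are illustrative only, not from the article; the point is that replaying the same keyed entry leaves the ledger's balance unchanged, while replaying an update-in-place debit does not:

# Hypothetical ledger model: entries keyed by id, balance computed from history
ledger = {}

def record(entry):
    ledger[entry["id"]] = entry      # same id, same entry: a replay is a no-op

def balance():
    return sum(e["value"] for e in ledger.values())

withdrawal = {"id": "txn-1", "action": "WITHDRAW", "value": -45.0}
record(withdrawal)
record(withdrawal)                   # processed twice, balance unchanged
print(balance())                     # -45.0

# Hypothetical non-ledger model: update in place, a replay double-debits
account = {"balance": 100.0}

def withdraw(amount):
    account["balance"] -= amount

withdraw(45.0)
withdraw(45.0)                       # the same message processed twice
print(account["balance"])            # 10.0 instead of 55.0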

At Least Once

RabbitMQ is one of many messaging systems that provide at-least-once semantics if configured to do so. By requiring the consumer of a message from a queue to acknowledge when it has finished processing the message, one can guarantee at-least-once processing. If there is a failure after the consumer finishes processing but before it acknowledges, the message will be delivered and processed again, which is the more-than-once scenario. Systems that are not designed to handle this can create errors. The non-ledger style account above would be a case in point.

Using pika, a Python RabbitMQ driver, we can review configuring the consumer to ack when done. The code below connects to RabbitMQ, consumes from the transaction queue, and then acknowledges each message:

import json
import pika

# Connect to RabbitMQ over SSL
conn = pika.BlockingConnection(pika.ConnectionParameters(
              host='aws-us-east-1-portal12.dblayer.com',
              port=15518,
              virtual_host='tangible-rabbitmq-66',
              ssl=True,
              credentials=pika.credentials.PlainCredentials("hays", "thumper", True)))

channel = conn.channel()

# Consume messages, acknowledging each one only after it has been processed
for frame, props, body in channel.consume('transaction'):
    msg = json.loads(body)
    # Do Mongo Upsert Here! (See Below)
    channel.basic_ack(frame.delivery_tag)

In the above you can see the basic_ack. We ack when message processing is complete. What's also handled here is that if processing doesn't finish, the original message will still be in the transaction queue without having been acked. That means in certain scenarios this entire block of code would run again. In the non-ledger system this could be a problem with double processing. Idempotence solves this.
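
One way to picture that failure window is to ack only after processing succeeds. This is a hedged sketch, not from the original article; it assumes the same channel and queue as above and a hypothetical process() function. If process() raises, the message is never acked and RabbitMQ will redeliver it:

for frame, props, body in channel.consume('transaction'):
    try:
        msg = json.loads(body)
        process(msg)                           # hypothetical idempotent handler
        channel.basic_ack(frame.delivery_tag)  # ack only after success
    except Exception:
        # No ack: the message stays unacknowledged and will be redelivered,
        # so process() must tolerate seeing the same message again.
        channel.basic_nack(frame.delivery_tag, requeue=True)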

Mongo's upsert as an Idempotent Function

So, one could insert data and let the datastore generate a key, such as Mongo's ObjectId or a SERIAL/SEQUENCE in PostgreSQL. This can be a big problem in more-than-once scenarios like the one above, where you could end up with the following:

[
  { 
    id: 1,
    action: "WITHDRAW",
    account: 123,
    value: 45.0,
    at: "2016-10-11T17:04:00-06:00"
  },
  { 
    id: 2,
    action: "WITHDRAW",
    account: 123,
    value: 45.0,
    at: "2016-10-11T17:04:00-06:00"
  }
]

Obviously, this would be bad for our account since it would be withdrawing an extra $45.
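
For illustration only, here's roughly how that state comes about with a plain insert, assuming a pymongo collection handle like the accounts one used later on. Each retry gets a fresh datastore-generated key, so nothing stops the duplicate:

# Hypothetical sketch of the more-than-once failure: redelivery re-runs the insert
# and Mongo happily generates a fresh ObjectId for the "new" document.
doc = {"action": "WITHDRAW", "account": 123, "value": 45.0,
       "at": "2016-10-11T17:04:00-06:00"}

accounts.insert_one(dict(doc))   # first delivery
accounts.insert_one(dict(doc))   # redelivery: a second, duplicate withdrawal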

The easiest solution is to treat the WITHDRAW as an entity before it ever makes it to the datastore or to any code where a failure could create multiple copies of the data. Keying the data with a UUID before publishing to a queue is a simple solution:

{ 
  id: "37320056-9d42-492a-a216-03bc5beea0ce",
  action: "WITHDRAW",
  account: 123,
  value: 45.0,
  at: "2016-10-11T17:04:00-06:00"
}

Since the id is part of the original record, we can assert it as the unique key and just upsert it into our store. We could do that more than once too, as long as the processing doesn't create any other side effects.

Extending the Python example from above, we use the pymongo MongoDB driver to upsert just such data:

...
import pymongo
import ssl

# Connect to MongoDB over SSL using the deployment's self-signed certificate
mongo = pymongo.MongoClient("mongodb://idem:potent@aws-us-east-1-portal.19.dblayer.com:15513/idempotent",
                            ssl=True,
                            ssl_ca_certs='i_am_a_ca.pem')
accounts = mongo.idempotent.accounts

...

for frame, props, body in channel.consume('transaction'):
    msg = json.loads(body)
    # Upsert: replace the document with this _id, or insert it if it isn't there
    account = accounts.replace_one({'_id': msg['_id']}, msg, True)
    channel.basic_ack(frame.delivery_tag)

...

In the above we've connected via SSL to a Mongo database named idempotent, with a collection named accounts, using our self-signed certificate copied from the Compose deployment's Overview page. The new addition here is the accounts.replace_one function call. With the True parameter it will insert the msg if no matching document is found; otherwise it replaces it. Together, these make an upsert. That's all that is needed to safely process the occasional retry from our queue-consuming code. This is idempotence.
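
As a quick check that this really is idempotent, the same replace_one can be run repeatedly and the collection still holds a single copy of the transaction. This is a hedged example using a made-up message and the accounts handle from above:

msg = {
    "_id": "37320056-9d42-492a-a216-03bc5beea0ce",
    "action": "WITHDRAW",
    "account": 123,
    "value": 45.0,
    "at": "2016-10-11T17:04:00-06:00"
}

accounts.replace_one({'_id': msg['_id']}, msg, True)   # first run inserts
accounts.replace_one({'_id': msg['_id']}, msg, True)   # rerun replaces in place

print(accounts.count_documents({'_id': msg['_id']}))   # 1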

An Added Bonus

Embracing this publisher, queue, and consumer processing allows for splitting an application at a good boundary. Front end responses can proceed quickly and back end processing can be pushed to an asynchronous worker which relies on a queue:

[Image: queue]

Lower latency and more speed up front, with acceptable speed and buffered processing on the back end, equals fewer total resources, which is good from a dollars perspective. Put another way, it makes sense to handle front end HTTP requests optimistically and then asynchronously handle the heavy lifting in a back end process that can even be buffered during peak usage.

Insert Queue, Respond Immediately

There are many scenarios where having an asynchronous worker handle the heavy, stateful lifting makes good sense. Trading a transactional-style commit for a simple queue insert can be a really good tradeoff for some workloads since your web-serving code can respond without waiting. Let's see what it takes, using Python's Flask web framework to create a simple API and the pika driver to publish from the front end to RabbitMQ:

from flask import Flask
from flask import request
from flask import jsonify
import pika
import json
import uuid

app = Flask(__name__)

# Connect to RabbitMQ over SSL
conn = pika.BlockingConnection(pika.ConnectionParameters(
              host='aws-us-east-1-portal12.dblayer.com',
              port=15518,
              virtual_host='tangible-rabbitmq-66',
              ssl=True,
              credentials=pika.credentials.PlainCredentials("hays", "thumper", True)))

channel = conn.channel()
# Declare a durable queue so persistent messages survive a broker restart
channel.queue_declare(queue='transaction', durable=True)

@app.route("/transaction", methods=['POST'])
def transaction():
    msg = request.json
    # Key the entity here, before it ever reaches the datastore
    msg["_id"] = str(uuid.uuid4())
    channel.basic_publish(exchange='',
                          routing_key='transaction',
                          body=json.dumps(msg),
                          properties=pika.BasicProperties(
                              delivery_mode=2  # mark the message as persistent
                          ))
    return jsonify(msg)

if __name__ == "__main__":
    app.run()

The above connects to RabbitMQ, declares a queue, and creates an HTTP endpoint that accepts a JSON POST request. For each POST, it parses the request and generates a unique key before publishing the data to a persistent, on-disk queue. By keying the entity here instead of at the database layer, it establishes the identity of the data and allows the later upsert to be idempotent.
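
To round it out, here's a hypothetical client call against that endpoint using the requests library, assuming the Flask app is running locally on its default port 5000; the path and payload mirror the examples above:

import requests

resp = requests.post("http://localhost:5000/transaction",
                     json={"action": "WITHDRAW",
                           "account": 123,
                           "value": 45.0,
                           "at": "2016-10-11T17:04:00-06:00"})

# The response echoes the message, now keyed with the generated UUID
print(resp.json()["_id"])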

Tradeoffs

There is a lot to be gained by building your apps with messaging: separation of concerns, asynchronous and possibly parallel processing on the back end, the ability to use multiple languages, and on the list goes. There are some tradeoffs in such an approach, such as added complexity and running a new server to handle messaging. The beauty of Compose is that we can handle running that RabbitMQ server for you so you can go about the business of building scalable applications.


If you have any feedback about this or any other Compose article, drop the Compose Articles team a line at articles@compose.com. We're happy to hear from you.

Image by: Chris Pastrick