Python and Compose - Redis, RethinkDB, and RabbitMQ

Hello, and welcome back to the Python leg of the Compose Grand Tour. The purpose of the tour is to provide the best examples of connecting to all of the databases that are available on Compose, in a wide range of languages. In this second part, we cover Pythonic connections to Redis, RethinkDB, and RabbitMQ.

The common feature of all the pieces of the Grand Tour is an example web application whose frontend is written in JavaScript and whose backend is written in the featured language (here, Python). What varies between the examples is the data persistence, which is handled by a different Compose database each time. To get a feel for the JavaScript and Python code, check out the example GitHub repository, which includes a standalone version of the application so that the database connections in the following examples stand out. Where possible, the examples incorporate good practices for secure connections, such as TLS/SSL, certificates, and server verification. A more detailed overview of the application is in part 1 of the Tour.

As this is the second of three articles, here is a short version of what is going on. The example is a web application where users can enter word/definition pairs and see previously entered pairs. The one exception is the RabbitMQ example, where the web application passes messages instead of storing words; we'll cover it a little further on. The Python server is built with Flask. All of the other information needed to connect to the Compose databases is provided in the Connection Info section of a deployment's overview page.

It's time to move on to the databases!

Redis

The Code: The full example code is in the example-redis folder of the Compose Grand Tour/Python repository.

The Driver: The recommended driver for Redis in Python is redis-py. It supports both TLS/SSL connections and unencrypted connections, so it is a good fit for working with Redis deployments on Compose. Speaking of which...

The Certificate: By default, Redis deployments do not come with a TLS/SSL-secured connection. Since using TLS/SSL is considered best practice, you should provision a TLS connection on your deployment through its Security panel. The TLS/SSL connections are backed with a Let's Encrypt certificate.

The Connection: Set the connection string from your deployment as the environment variable COMPOSE_REDIS_URL. The example does not hand the string to the driver directly; instead, urlparse from Python's urllib splits it into pieces, and the driver parameters are set to the parsed values. The ssl parameter is determined from the prefix of your connection string: "redis:" for non-secured connections and "rediss:" for secured ones. The last parameter, decode_responses set to True, takes the bytes returned from a Redis query and decodes them into strings.

# imports needed by the connection code
import os
from urllib.parse import urlparse

import redis

# connection string and initialization
compose_redis_url = os.environ['COMPOSE_REDIS_URL']
parsed = urlparse(compose_redis_url)
ssl_wanted = compose_redis_url.startswith("rediss:")
r = redis.StrictRedis(
    host=parsed.hostname,
    port=parsed.port,
    password=parsed.password,
    ssl=ssl_wanted,
    decode_responses=True)
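
To see what the driver ends up receiving, here is a minimal sketch of the parsing step on its own. It uses only the standard library, so nothing in it talks to Redis. The URL is a made-up placeholder rather than a real deployment, and redis_connection_kwargs is a hypothetical helper name that does not appear in the example repository.

```python
from urllib.parse import urlparse


def redis_connection_kwargs(url):
    """Split a Redis connection string into keyword arguments for the driver."""
    parsed = urlparse(url)
    return {
        "host": parsed.hostname,
        "port": parsed.port,
        "password": parsed.password,
        # the "rediss" scheme (two s's) marks a TLS-secured endpoint
        "ssl": parsed.scheme == "rediss",
        "decode_responses": True,
    }


# placeholder connection string, for illustration only
example_url = "rediss://admin:secret@example.composedb.com:12345"
kwargs = redis_connection_kwargs(example_url)
print(kwargs["host"], kwargs["port"], kwargs["ssl"])
```

The resulting dictionary has the same keys as the parameters used above, so it could be unpacked into the driver call as redis.StrictRedis(**kwargs).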

The Read: The word/definition pairs are stored in a hash named "words". The hash maps a key field to a value field, so the "word" is the key and the "definition" is the value. Getting all the pairs back is achieved with hgetall. The rest of the steps make an object that can be represented in JSON for display in the web app. First, the keys and values are separated into two lists. Then, those two lists are zipped together. Each pair in the zipped list is then made into an object with the field labels, so that json.dumps can return a well-formatted JSON object.

# queries and formats results for display on page
def display_find():  
    # query for all the words in the hash
    cursor_obj = r.hgetall('words')

    #makes two lists, one of keys and one of values
    keys_list = list(cursor_obj.keys())
    values_list = list(cursor_obj.values())

    # zips the lists of keys/values together, and makes an object of all word/definition pairs
    results_list = [{'word': word, 'definition': definition} 
        for word, definition in zip(keys_list, values_list)]

    # returns a json object from the object of word/definition pairs
    return json.dumps(results_list)
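
The formatting step can be tried without a Redis server. The sketch below substitutes a plain dictionary for the result of hgetall (with decode_responses=True the driver returns a dict of strings) and iterates over items() instead of zipping two lists, which gives the same pairs in one step. The sample words and the helper name format_words are made up for illustration.

```python
import json

# stand-in for what r.hgetall('words') returns; the entries are invented
words_hash = {
    "hello": "a greeting",
    "tour": "a journey with several stops",
}


def format_words(pairs):
    """Turn a word -> definition mapping into a JSON list of labeled objects."""
    results = [{"word": word, "definition": definition}
               for word, definition in pairs.items()]
    return json.dumps(results)


formatted = format_words(words_hash)
print(formatted)
```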

The Write: To add a new word/definition pair to the "words" hash, hset just needs the values in the text fields in the page, and the hash to add them to.

# triggers on hitting the 'Add' button; inserts word/definition into a hash
def handle_words():  
    r.hset("words", request.form['word'], request.form['definition'])
    return ('', 204)

RethinkDB

The Code: The full example code is in the example-rethinkdb folder of the Compose Grand Tour/Python repository.

The Driver: RethinkDB has an official Python driver.

The Certificate: On Compose, RethinkDB SSL/TLS is backed with a self-signed certificate. To use certificate verification, save a copy of the certificate locally, and keep its path handy. The certificate is on the Overview page of the deployment, and the path to the local copy should go in an environment variable PATH_TO_RETHINKDB_CERT.

The Connection: Set the connection string provided by your RethinkDB deployment as the environment variable COMPOSE_RETHINKDB_URL. Again, urllib is used to parse the pieces of the connection string for the driver to consume, as in the Redis example. The SSL settings are given the path to the local copy of the certificate from the environment variable set in the previous section.

RethinkDB structures data by storing documents in databases and tables, so also included here is the code to create the database and table to store our words. On the first run of the app they do not yet exist, so they are created. On subsequent runs the creation call raises an error, which the code catches and ignores before continuing with the 'grand_tour' database.

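# imports needed by the connection code
import os
from urllib.parse import urlparse

import rethinkdb as r

# connection string and initialization
compose_rethinkdb_url = os.environ['COMPOSE_RETHINKDB_URL']
path_to_rethinkdb_cert = os.environ['PATH_TO_RETHINKDB_CERT']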

parsed = urlparse(compose_rethinkdb_url)  
conn = r.connect(  
    host=parsed.hostname,
    port=parsed.port,
    user=parsed.username,
    password=parsed.password,
    ssl={'ca_certs': path_to_rethinkdb_cert}
)

# database initialization and table creation 
try:  
    r.db_create('grand_tour').run(conn)
    conn.use("grand_tour")
    r.table_create('words', shards=1, replicas=3).run(conn)
except r.ReqlRuntimeError:  
    # assume database or table already exists
    pass

conn.use("grand_tour")  
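
The try/except above skips table creation whenever the database already exists, which is fine for the first-run and later-run cases described here. As an alternative, the sketch below checks for the database and the table separately using db_list and table_list, so a half-initialized deployment would also be completed. This is one possible approach, not the code from the example repository; ensure_words_table is a hypothetical helper, and it takes the driver module and the open connection as arguments.

```python
def ensure_words_table(r, conn, db_name="grand_tour", table_name="words"):
    """Create the database and the table only if they are missing.

    `r` is the imported RethinkDB driver module and `conn` is an open
    connection, both set up as in the connection code above.
    """
    # db_list() returns the names of all databases on the server
    if db_name not in r.db_list().run(conn):
        r.db_create(db_name).run(conn)

    # table_list() returns the names of the tables in one database
    if table_name not in r.db(db_name).table_list().run(conn):
        r.db(db_name).table_create(table_name, shards=1, replicas=3).run(conn)

    # make db_name the default database for later queries on this connection
    conn.use(db_name)
```

With the connection from above, this would be called once at startup as ensure_words_table(r, conn).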

The Read: We run a query for all the currently stored words, but narrow each result to only the "word" and "definition" fields we need with pluck. This builds a query that yields a document for each word/definition pair. Calling run(conn) sends the query over the connection opened earlier and returns a cursor over the results. Since each word is already a document, turning the cursor into a list gives us a list of documents. The list, in turn, can be JSON-ified with json.dumps, and the JSON is then returned to the page for display.

# queries and formats results for display on page
def display_find():  
    # query for all the words in the table
    cursor_obj = r.table("words").pluck("word", "definition").run(conn)

    # makes a list from the query results
    word_list = list(cursor_obj)

    # returns list as a JSON object for display
    return json.dumps(word_list)
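
Why bother with pluck? RethinkDB adds an auto-generated "id" field to every document inserted without one, and the page only needs the word and its definition. The effect can be illustrated in plain Python. Note that the pluck function below is a local stand-in that imitates the ReQL command on a list of dictionaries; it is not part of the driver, and the stored documents are invented.

```python
import json


def pluck(documents, *fields):
    """Keep only the named fields of each document, as ReQL's pluck does."""
    return [{field: doc[field] for field in fields if field in doc}
            for doc in documents]


# invented documents, shaped like rows in the "words" table
stored = [
    {"id": "made-up-id-1", "word": "hello", "definition": "a greeting"},
    {"id": "made-up-id-2", "word": "tour", "definition": "a journey"},
]

word_list = pluck(stored, "word", "definition")
print(json.dumps(word_list))
```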

The Write: To add a word to the table, we make a document from the text fields on the web page. To insert that document, we construct the insert specifying the "words" table and the "new_word" document, then execute it over the open connection with run(conn).

# triggers on hitting the 'Add' button
def handle_words():  
    #creates new word object
    new_word = {"word":request.form['word'], "definition":request.form['definition']}

    #inserts object into the table
    r.table("words").insert(new_word).run(conn)
    return ('', 204)

RabbitMQ

The Code: The full example code is in the example-rabbitmq folder of the Compose Grand Tour/Python GitHub repository. Since RabbitMQ is a message queue and not a storage system, the example application is a little different. The web application provides a text field for a user's message and a send button. When the button is clicked, the text is sent to the example Python server, which then creates a message and sends it to a RabbitMQ exchange. Inside the messaging server, that message is transferred to a queue. A separate button in the browser calls the Python server and asks it to read from that queue, then displays the results.

The Driver: The Python driver for RabbitMQ is pika. This is a generic AMQP 0-9-1 messaging driver and is well suited to RabbitMQ, which uses AMQP 0-9-1 as its native protocol.

The Certificate: Compose RabbitMQ deployments use TLS/SSL-enabled connections that are backed with a Let's Encrypt certificate. You won't need to use a local copy of the certificate for this example.

The Connection: Find the connection strings provided with your RabbitMQ deployment and set one of them as the environment variable COMPOSE_RABBITMQ_URL. Again, Python's urllib does the parsing and the parsed pieces are passed to the driver. The vhost has to be set as a connection parameter; it is included in the provided connection string as the path, and only needs its slash stripped. ssl should be set to True to ensure an encrypted connection. Once the parameters are defined, we use them to establish a connection and open an AMQP channel within that connection.

The rest of the code sets up how RabbitMQ routes each message. Since this is the grand tour, we declare the exchange "grand_tour" to receive the messages. Then, we set up a queue named "words" to receive and hold the messages. To ensure that the queue gets the correct messages, we bind the "words" queue to the "grand_tour" exchange and tell it to look for the routing key "python-msg".

# imports needed by the connection code
import os
from urllib.parse import urlparse

import pika

# connection string and initialization
compose_rabbitmq_url = os.environ['COMPOSE_RABBITMQ_URL']
parsed = urlparse(compose_rabbitmq_url)

credentials = pika.PlainCredentials(parsed.username, parsed.password)  
vhost = parsed.path.replace("/", "")

parameters = pika.ConnectionParameters(  
    host=parsed.hostname,
    port=parsed.port,
    credentials=credentials,
    virtual_host=vhost,
    ssl=True
)

# establishes a connection
connection = pika.BlockingConnection(parameters)

# opens a channel on that connection
channel = connection.channel()

# creates an exchange to deliver messages to
channel.exchange_declare(exchange='grand_tour',  
                         exchange_type='direct',
                         durable=True)

# creates the message queue that holds the messages routed from the exchange
channel.queue_declare(queue='words')

# attaches the queue to the exchange and says any messages with
# a particular routing key should end up in this queue
channel.queue_bind(exchange='grand_tour',  
                   queue='words',
                   routing_key='python-msg')
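
The "little bit of formatting" for the vhost is easiest to see in isolation. The sketch below parses a made-up AMQPS URL with the standard library only and pulls out the same pieces the connection code uses. It makes two assumptions that differ slightly from the code above: it strips only the leading slash of the path rather than every slash, and it decodes percent-escapes in the vhost with unquote. The helper name rabbitmq_connection_parts and the URL are hypothetical.

```python
from urllib.parse import unquote, urlparse


def rabbitmq_connection_parts(url):
    """Split an AMQP(S) connection string into the pieces the driver needs."""
    parsed = urlparse(url)
    return {
        "host": parsed.hostname,
        "port": parsed.port,
        "username": parsed.username,
        "password": parsed.password,
        # the path is "/<vhost>": drop the leading slash, decode %-escapes
        "virtual_host": unquote(parsed.path[1:]),
        # the "amqps" scheme marks a TLS-secured endpoint
        "ssl": parsed.scheme == "amqps",
    }


# placeholder connection string, for illustration only
example_url = "amqps://user:secret@example.composedb.com:12345/my-vhost"
parts = rabbitmq_connection_parts(example_url)
print(parts["virtual_host"], parts["ssl"])
```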

The Write: The "Write" part of this example takes the message from the text field and pushes it to RabbitMQ. Using the AMQP channel that we opened, basic_publish will use the "grand_tour" exchange, give our message a routing key (the very creative "python-msg"), and send the message as the body parameter. Once the message is sent, the web page will update by displaying "Message sent:" and the body of our message.

# triggers on hitting the 'send' button
def send_message():  
    # makes a message from the field in the page
    msg = request.form['message']

    # pushes the message into the exchange
    channel.basic_publish(
        exchange='grand_tour',
        routing_key='python-msg',
        body=msg)
    return msg

The Read: The "Read" part retrieves the message. basic_get pulls a single message from the "words" queue and separates its parts so that we can more easily parse the result. method_frame tells us whether there was a message in the queue to receive. If there was not, the app will display "{--No Messages in Queue--}". If there is a message, the app will display the message body on the page.

# Triggers on hitting the 'receive' button; retrieves a message from the queue
def receive_message():  
    # retrieves a message from the queue, stores its parts
    method_frame, props, body = channel.basic_get(queue='words', no_ack=True)
    # returns message body or 'no message' for display on page
    if method_frame:
        return body
    else:
        return "{--No Messages in Queue--}"
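
To make the path of a message concrete, here is a toy, in-memory model of the routing set up above: a direct exchange, one queue, and one binding. It is only a sketch of the behavior described in this section, written in plain Python with no broker involved. The class DirectExchangeModel and its methods are invented for illustration and merely borrow their names from the pika calls.

```python
from collections import deque


class DirectExchangeModel:
    """Toy model of a direct exchange: routes on an exact routing-key match."""

    def __init__(self):
        self.bindings = {}  # routing key -> names of the queues bound to it
        self.queues = {}    # queue name -> messages waiting in that queue

    def queue_declare(self, queue):
        self.queues.setdefault(queue, deque())

    def queue_bind(self, queue, routing_key):
        self.bindings.setdefault(routing_key, []).append(queue)

    def basic_publish(self, routing_key, body):
        # a message whose key matches no binding is simply dropped
        for queue in self.bindings.get(routing_key, []):
            self.queues[queue].append(body)

    def basic_get(self, queue):
        # hands back the oldest message, or None when the queue is empty
        waiting = self.queues[queue]
        return waiting.popleft() if waiting else None


broker = DirectExchangeModel()
broker.queue_declare("words")
broker.queue_bind("words", "python-msg")

broker.basic_publish("python-msg", "hello")
broker.basic_publish("other-key", "never delivered")

first = broker.basic_get("words")
second = broker.basic_get("words")
print(first, second)
```

The second basic_get finds the queue empty, which corresponds to the case where the app shows "{--No Messages in Queue--}".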

Part 2 Wrap-up

That concludes the second part of the Grand Tour in Python. This installment covered connecting to Redis, as well as making JSON out of key/value pairs. We connected to and queried RethinkDB. And finally, we took a look at RabbitMQ connections, exchanges, queues and messages. If you missed it, Part 1 covered connecting to MongoDB, Elasticsearch, and PostgreSQL. Watch for Part 3 covering connecting to Scylla and MySQL, coming soon.

attribution MabelAmber
