Essentially etcd - Part 2 - Coordination

In this part of our look around etcd, we'll turn to using etcd for coordination. In part one, we looked at how we could use etcd as a source of truth for configuration information shared between numerous servers at Examplco.

But now the engineers have realised they have a new problem: they don't know which servers are listening for that configuration data.

This may seem like a simple problem... but it isn't.

"We could ping the server?"
"Just pinging the server isn't enough, that just tells you the network layer is alive."
"We could run something on the server?"
"That isn't really a light weight check and may affect other processes. And It also doesn't tell you about whether the applications are actually running."
"We could check the applications..."
"So we'd have to modify them to be actively checked?"
"Well what if we could make the servers regularly check in with their status and get alerted when they aren't doing that..."

And that's where etcd comes in again. As we're using it for the configuration data, let's also use it to keep an eye on server status. The way we can do this is with key/values that have a time to live. Each server creates its own key and sets it to expire after a number of seconds; the server then has to come back and update the key before it expires. We touched on time to live in the previous part, when we had to allow for keys disappearing because they had been deleted or expired.

TTL, the quick way to say Time To Live

So, how do you set a time to live? Well, the Examplco plan is to have a path called /running/ and each server will put a key in there with a time-to-live of 60 seconds. This is done with the ttl parameter in cURL like so:

curl -L https://user:pass@aws-us-east-1-portal8.dblayer.com:10329/v2/keys/running/server-15?ttl=60 -XPUT -d value="running"  

or, with the Go API in our example, we could just loop, updating the key every 59 seconds like this:

    var key = runningbase + *serverbeatname

    for {
        time.Sleep(time.Second * 59)
        fmt.Println("Badum")
        // Rewrite the key with a fresh 60 second TTL before the old one lapses.
        _, err := kapi.Set(context.TODO(), key, "running", &client.SetOptions{TTL: time.Second * 60})
        if err != nil {
            log.Fatal(err)
        }
    }

And, using some code that's quite similar to our server's config watcher from part one:

    watcher := kapi.Watcher(runningbase, &client.WatcherOptions{Recursive: true})

    for {
        resp, err := watcher.Next(context.TODO())
        if err != nil {
            log.Fatal(err)
        }

        _, server := path.Split(resp.Node.Key)
        switch resp.Action {
        case "set":
            fmt.Println(server + " has heart beat")
        case "delete":
            fmt.Println(server + " has shut down correctly")
        case "expire":
            fmt.Println("*** " + server + " has missed heartbeat")
        }
    }

We can listen to /running/ for any changes and react to them. If we were building a real application, we could delete our own running key to inform listening applications that we had shut down correctly. If the key has expired, then for whatever reason the server has stopped updating it. And if we see a "set" taking place, we know that's the server's heartbeat.
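If we were wiring up that clean shutdown, a minimal sketch might look something like this. The signal handling is our own addition for illustration (it assumes the kapi and key variables from the heartbeat code above, plus the os, os/signal and log imports):

    // On Ctrl-C, delete our running key before exiting so that watchers
    // see a "delete" action rather than a later "expire".
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    go func() {
        <-interrupt
        if _, err := kapi.Delete(context.TODO(), key, nil); err != nil {
            log.Println(err)
        }
        os.Exit(0)
    }()

With that in place, a deliberate stop shows up to watchers as a "delete", and only a crash or a stall leaves the key to expire.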

PrevExist...

Or is it... Here's the first problem. If the server is delayed for whatever reason and the key expires, all it takes is for the server to update the key again and nobody is any the wiser. Except for the now confused watching process, which saw a server expire... then come back to life.

The first thing to make sure of when we update the key is that it hasn't expired since we last left it. Enter the PrevExist option for set. Using this option, we can ensure that no key exists when we start up and, after that, only update the key if it already exists. In the REST API, this option can be set to true or false (or left out entirely); in the Go client API, you have client.PrevNoExist, client.PrevExist and client.PrevIgnore as options. When we start our heartbeat, we'll go with PrevNoExist:

    var key = runningbase + *serverbeatname

    // Create the key, but only if no key with that name already exists.
    _, err := kapi.Set(context.TODO(), key, "running", &client.SetOptions{PrevExist: client.PrevNoExist, TTL: time.Second * 60})
    if err != nil {
        log.Fatal(err)
    }

Running this with an existing key would throw an error that looks something like this:

2015/10/19 17:00:29 105: Key already exists (/running/server-15) [9526]  

That will stop a server from starting up while the old key is still live and waiting out its time to live. We could, instead of erroring and exiting, explicitly delete the old key and establish a new one, but we'll leave that as an exercise for the reader.
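That said, if you want a head start on the exercise, here's one rough sketch, entirely our own, leaning on the fact that the 105 in the error above is the Go client's ErrorCodeNodeExist:

    _, err := kapi.Set(context.TODO(), key, "running", &client.SetOptions{PrevExist: client.PrevNoExist, TTL: time.Second * 60})
    if cerr, ok := err.(client.Error); ok && cerr.Code == client.ErrorCodeNodeExist {
        // An old key is squatting on our name; evict it and claim again.
        if _, err = kapi.Delete(context.TODO(), key, nil); err != nil {
            log.Fatal(err)
        }
        _, err = kapi.Set(context.TODO(), key, "running", &client.SetOptions{PrevExist: client.PrevNoExist, TTL: time.Second * 60})
    }
    if err != nil {
        log.Fatal(err)
    }

Bear in mind that evicting the old key is only safe if we're confident the previous owner really is gone.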

If we go into our heartbeat loop we can require that the key already exists with PrevExist set to true:

    for {
        time.Sleep(time.Second * 59)
        fmt.Println("Badum")
        // Refresh the TTL, but only if the key still exists.
        _, err := kapi.Set(context.TODO(), key, "running", &client.SetOptions{PrevExist: client.PrevExist, TTL: time.Second * 60})
        if err != nil {
            log.Fatal(err)
        }
    }

Now, if the process stalls for whatever reason and fails to heartbeat in time, it won't just rewrite the TTL-expired heartbeat key.
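We could also be less drastic than log.Fatal here. A sketch of one alternative, again our own, using the client's IsKeyNotFound helper: if the key has expired from under us, watchers have already seen the "expire", so we could re-register with a fresh create rather than exiting:

    for {
        time.Sleep(time.Second * 59)
        fmt.Println("Badum")
        _, err := kapi.Set(context.TODO(), key, "running", &client.SetOptions{PrevExist: client.PrevExist, TTL: time.Second * 60})
        if client.IsKeyNotFound(err) {
            // Our key expired while we were stalled; make a fresh claim.
            log.Println("missed a beat, re-registering")
            _, err = kapi.Set(context.TODO(), key, "running", &client.SetOptions{PrevExist: client.PrevNoExist, TTL: time.Second * 60})
        }
        if err != nil {
            log.Fatal(err)
        }
    }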

So that's PrevExist, helping us ensure that the life cycle of a server is more predictable. The thing that makes it possible to rely on PrevExist is that it's an atomic operation; it checks for existence and does the setting as one uninterruptible operation. That means there's no "grey area" where race conditions might creep in. But there's another issue waiting in the wings. Who actually owns that key?

If we were to start two processes both called server-15, say, they'd both try to claim ownership of the key, and there's always the possibility that both would manage to get into their heartbeat loops.

Is this your key?

What we really need to do, instead of just setting an arbitrary value in the heartbeat key, is set a value that is unique and means something. The simplest way to make that kind of value is to create a UUID string...

import (  
    ....
    "github.com/twinj/uuid"
)
...
    myuuid := uuid.NewV4()
    uuidstring := myuuid.String()

Now, when we create our key, we'll give it this value.

    _, err := kapi.Set(context.TODO(), key, uuidstring, &client.SetOptions{PrevExist: client.PrevNoExist, TTL: time.Second * 60})

When we go to heartbeat, we need to check that the UUID value is still the value we set it to... then we can set it again to register this heartbeat. If we got the value, checked it and then set it, that would leave a lot of time for another server to step in and change the value. Step up, etcd's atomic Compare-and-Swap, which lets you do all that in one call.

        _, err := kapi.Set(context.TODO(), key, uuidstring, &client.SetOptions{PrevExist: client.PrevExist, TTL: time.Second * 60, PrevValue: uuidstring})

Here, we are setting the PrevValue option (?prevValue= in a URL for cURL) to the UUID string and setting the new value to be, well, in this case exactly the same. We could, if we wanted, make a new UUID for each beat and check against that (there's a sketch of this towards the end), but for now we're sticking with the initial UUID. The next stop is listening to the heartbeats... This is somewhat like what we did earlier, creating a watcher to track changes under /running/, but now we have new actions to process:

    _, server := path.Split(resp.Node.Key)
    switch resp.Action {
    case "create":
        fmt.Println(server + " has started heart beat")
    case "compareAndSwap":
        fmt.Println(server + " heart beat")
    case "compareAndDelete":
        fmt.Println(server + " has shut down correctly")
    case "expire":
        fmt.Println("*** " + server + " has missed heartbeat")
    default:
        fmt.Println("Didn't handle " + resp.Action)
    }

Now we can see the key being created, with the "create" action coming from our newly heartbeating servers. We no longer see "set" actions when a heartbeat happens; we see a "compareAndSwap", because that's the action that setting prevValue on a PUT generates. We haven't implemented a server cleanly shutting down; the call would look something like this:

    _, err = kapi.Delete(context.TODO(), key, &client.DeleteOptions{PrevValue: uuidstring})

Here, rather than just deleting, we're making sure before we delete that we still own that heartbeat. If we do, the delete goes ahead and our watcher gets to see the atomic "compareAndDelete" action occur. We can now handle the "expire" action separately from the delete, knowing that if a key does expire, remedial action will have to be taken to restart that server.
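And if we did want to make a new UUID for every beat, as mentioned earlier, compare-and-swap handles that neatly too. A sketch, ours rather than the examplco2 code, assuming the key was created with uuidstring as its initial value:

    current := uuidstring
    for {
        time.Sleep(time.Second * 59)
        next := uuid.NewV4().String()
        // Swap in a fresh UUID, but only if our previous one is still in place.
        _, err := kapi.Set(context.TODO(), key, next, &client.SetOptions{PrevExist: client.PrevExist, PrevValue: current, TTL: time.Second * 60})
        if err != nil {
            log.Fatal(err)
        }
        current = next
    }

Rotating the value like this narrows the window in which a copied UUID could be reused, at the cost of a little more bookkeeping.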

This is just an example of using the prevExist and prevValue parameters and the atomic "Compare and Swap" and "Compare and Delete" actions they generate. They allow more precise handling of locks and other coordination mechanisms which, combined with etcd's policy of truth by consensus, gives us a more reliable way to orchestrate actions.

So that you can experiment with heartbeats and servers, there's an updated version of the examplco app, examplco2, which has a serverwatch command to listen to servers and a serverbeat command which takes a server name and optional --rate and --count flags to set how quickly to beat and how many beats to send (set the count to 0 to let it run forever).

In the next part of the Essentially etcd series, we'll look at what indexes mean in etcd and give a quick tour of some of the APIs, libraries and tools you can use with etcd.