Pynamo: Exploring the Dynamo Paper in Python

The original paper [PDF] that describes Amazon's Dynamo system is extremely interesting, and clearly explained. This page documents my attempt to create a bare-bones Python implementation of the key ideas from the Dynamo paper.

My aim here is to use the process of writing code as a way of understanding the relevant ideas better – in other words the code here is pedagogical rather than practical. For example, I deliberately use inefficent implementations in places – I'd rather have a slow implementation that just uses the Python standard library, than a fast version with distractions (by either requiring additional imports, or by including implementations of data structure that aren't specific to Dynamo).

I've also built the system on top of a simulated network, rather than on top of a real network or web server. This means I can more easily test different scenarios and track what's going on (more on the framework at the end).

For my final proviso, I should point out that I'm not going to explain much about the original ideas in the Dynamo paper, which are generally explained perfectly clearly. In other words, this page assumes you're read the original paper and that you have it close to hand when you're reading this.

Structurally, we'll start by exploring some of the building blocks used in the design, then start building up the Dynamo software step by step.

Contents

Building Block: Consistent Hashing (s4.2/s4.3)

The first area to explore is the consistent hashing algorithm described in section 4.2 of the paper. The constructor for the ConsistentHashTable class builds up the list of nodes, and the find_nodes method returns the preference list of nodes for the key (where the first entry in the list is the coordinator).

The first implementation just implements the straightforward one-hash-per-node approach:

#!/usr/bin/env python
"""Consistent hash code"""
import hashlib
import binascii
import bisect


class SimpleConsistentHashTable(object):
    def __init__(self, nodelist):
        """Initialize a consistent hash table for the given list of nodes"""
        baselist = [(hashlib.md5(str(node)).digest(), node) for node in nodelist]
        # Build two lists: one of (hashvalue, node) pairs, sorted by
        # hashvalue, one of just the hashvalues, to allow use of bisect.
        self.nodelist = sorted(baselist, key=lambda x: x[0])
        self.hashlist = [hashnode[0] for hashnode in self.nodelist]

    def find_nodes(self, key, count=1, avoid=None):
        """Return a list of count nodes from the hash table that are
        consecutively after the hash of the given key, together with
        those nodes from the avoid collection that have been avoided.

        Returned list size is <= count, and any nodes in the avoid collection
        are not included."""
        if avoid is None:  # Use an empty set
            avoid = set()
        # Hash the key to find where it belongs on the ring
        hv = hashlib.md5(str(key)).digest()
        # Find the node after this hash value around the ring, as an index
        # into self.hashlist/self.nodelist
        initial_index = bisect.bisect(self.hashlist, hv)
        next_index = initial_index
        results = []
        avoided = []
        while len(results) < count:
            if next_index == len(self.nodelist):  # Wrap round to the start
                next_index = 0
            node = self.nodelist[next_index][1]
            if node in avoid:
                if node not in avoided:
                    avoided.append(node)
            else:
                results.append(node)
            next_index = next_index + 1
            if next_index == initial_index:
                # Gone all the way around -- terminate loop regardless
                break
        return results, avoided

    def __str__(self):
        return ",".join(["(%s, %s)" %
                         (binascii.hexlify(nodeinfo[0]), nodeinfo[1])
                         for nodeinfo in self.nodelist])

The paper indicates a couple of problems with this simplistic approach, which we can check by feeding in some random data:

So we move to the second implementation, where each node gets multiple points in the hash ring, known as virtual nodes. We implement this very simply, by adding a ":<count>" suffix to the string that we hash for the node position.

#!/usr/bin/env python
"""Consistent hash code"""
import hashlib
import binascii
import bisect


class ConsistentHashTable(object):
    def __init__(self, nodelist, repeat):
        """Initialize a consistent hash table for the given list of nodes"""
        # Insert each node into the hash circle multiple times
        baselist = []
        for node in nodelist:
            for ii in xrange(repeat):
                nodestring = "%s:%d" % (node, ii)
                baselist.append((hashlib.md5(nodestring).digest(), node))
        # Build two lists: one of (hashvalue, node) pairs, sorted by
        # hashvalue, one of just the hashvalues, to allow use of bisect.
        self.nodelist = sorted(baselist, key=lambda x: x[0])
        self.hashlist = [hashnode[0] for hashnode in self.nodelist]

    def find_nodes(self, key, count=1, avoid=None):
        """Return a list of count nodes from the hash table that are
        consecutively after the hash of the given key, together with
        those nodes from the avoid collection that have been avoided.

        Returned list size is <= count, and any nodes in the avoid collection
        are not included."""
        if avoid is None:  # Use an empty set
            avoid = set()
        # Hash the key to find where it belongs on the ring
        hv = hashlib.md5(str(key)).digest()
        # Find the node after this hash value around the ring, as an index
        # into self.hashlist/self.nodelist
        initial_index = bisect.bisect(self.hashlist, hv)
        next_index = initial_index
        results = []
        avoided = []
        while len(results) < count:
            if next_index == len(self.nodelist):  # Wrap round to the start
                next_index = 0
            node = self.nodelist[next_index][1]
            if node in avoid:
                if node not in avoided:
                    avoided.append(node)
            elif node not in results:
                results.append(node)
            next_index = next_index + 1
            if next_index == initial_index:
                # Gone all the way around -- terminate loop regardless
                break
        return results, avoided

    def __str__(self):
        return ",".join(["(%s, %s)" %
                         (binascii.hexlify(nodeinfo[0]), nodeinfo[1])
                         for nodeinfo in self.nodelist])

Let's see how much different this makes by feeding in some random data to a set of 50 nodes with 10 copies of each node in the hash ring.

For 50 nodes with 100 copies of each node in the hash ring.

So the multiple node approach has lead to a much less 'lumpy' distribution of keys.

Putting It Together

Now we're going to start to put everything together. We're going to build on a simulated enviroment of nodes and messages between them, which allows testing and simulation. The framework is described in more detail in the appendix, but hopefully the framework's methods are clear enough not to need much explanation.

First up, let's simulate a client that wants to use the Dynamo system. Equivalently to section 4.1 of the paper, it has two methods: put(key, context, value) and get(key). The client is outside of the Dynamo system proper, so these methods are straightforward: build the appropriate message and send it to a random node in the Dynamo system (although this can be overridden with an explicit choice of node).

class DynamoClientNode(Node):
    def put(self, key, metadata, value, destnode=None):
        if destnode is None:  # Pick a random node to send the request to
            destnode = random.choice(DynamoNode.nodelist)
        putmsg = ClientPut(self, destnode, key, value, metadata)
        Framework.send_message(putmsg)

    def get(self, key, destnode=None):
        if destnode is None:  # Pick a random node to send the request to
            destnode = random.choice(DynamoNode.nodelist)
        getmsg = ClientGet(self, destnode, key)
        Framework.send_message(getmsg)

These messages are sent to a DynamoNode, whose definition includes some data structures. The class includes some constants controlling the amount of replication, including the N, W and R parameters described in section 4.5 of the paper. The class also keeps track of how many DynamoNode instances there are, and keeps a consistent hash table that corresponds to those instances.

class DynamoNode(Node):
    T = 10  # Number of repeats for nodes in consistent hash table
    N = 3  # Number of nodes to replicate at
    W = 2  # Number of nodes that need to reply to a write operation
    R = 2  # Number of nodes that need to reply to a read operation
    nodelist = []
    chash = ConsistentHashTable(nodelist, T)

    def __init__(self):
        super(DynamoNode, self).__init__()
        self.local_store = {}  # key => (value, metadata)
        self.pending_put_rsp = {}  # seqno => set of nodes that have stored
        self.pending_put_msg = {}  # seqno => original client message
        self.pending_get_rsp = {}  # seqno => set of (node, value, metadata) tuples
        self.pending_get_msg = {}  # seqno => original client message
        # Rebuild the consistent hash table
        DynamoNode.nodelist.append(self)
        DynamoNode.chash = ConsistentHashTable(DynamoNode.nodelist, DynamoNode.T)

Each instance of DynamoNode has some attributes of its own. The local_store attribute is a Python dictionary that simulates the node's local data store; this is where the key/value pairs stored by the system end up. There are also a pending_put and pending_get data structures that keep track of pending operations being coordinated by this node.

Access to the local data store local_store is via the store() and retrieve() methods; this will allow us to substitute in more sophisticated functionality later.

    def store(self, key, value, metadata):
        self.local_store[key] = (value, metadata)

    def retrieve(self, key):
        if key in self.local_store:
            return self.local_store[key]
        else:
            return (None, None)

So what happens when a client tries to put() a piece of data? The initial request message will arrive at a random Dynamo node, and its first action is to figure out the preference list.

    def rcv_clientput(self, msg):
        preference_list = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N)[0]
        # Determine if we are in the list
        if self not in preference_list:
            # Forward to the coordinator for this key
            coordinator = preference_list[0]
            Framework.forward_message(msg, coordinator)
        else:
            # Use an incrementing local sequence number to distinguish
            # multiple requests for the same key
            seqno = self.generate_sequence_number()
            metadata = (self.name, seqno)  # For now, metadata is just sequence number at coordinator
            # Send out to preference list, and keep track of who has replied
            self.pending_put_rsp[seqno] = set()
            self.pending_put_msg[seqno] = msg
            reqcount = 0
            for node in preference_list:
                # Send message to get node in preference list to store
                putmsg = PutReq(self, node, msg.key, msg.value, metadata, msg_id=seqno)
                Framework.send_message(putmsg)
                reqcount = reqcount + 1
                if reqcount >= DynamoNode.N:
                    # preference_list may have more than N entries to allow for failed nodes
                    break

Continuing to follow the progress of a put() operation, the behaviour of the nodes that receive the Put messages is straightforward: they store the value, and send a response to say they've done it.

    def rcv_put(self, putmsg):
        self.store(putmsg.key, putmsg.value, putmsg.metadata)
        putrsp = PutRsp(putmsg)
        Framework.send_message(putrsp)

In turn, the original sending node ticks off these responses from its list of pending responses, and when it has had W total replies (including itself), the put() operation is done.

    def rcv_putrsp(self, putrsp):
        seqno = putrsp.msg_id
        if seqno in self.pending_put_rsp:
            self.pending_put_rsp[seqno].add(putrsp.from_node)
            if len(self.pending_put_rsp[seqno]) >= DynamoNode.W:
                # Tidy up tracking data structures
                original_msg = self.pending_put_msg[seqno]
                del self.pending_put_rsp[seqno]
                del self.pending_put_msg[seqno]
                # Reply to the original client
                client_putrsp = ClientPutRsp(original_msg)
                Framework.send_message(client_putrsp)
        else:
            pass  # Superfluous reply

We can see an example of the whole process as a ladder diagram. Notice that the response goes back to the client before the final Put response arrives – the N parameter is 3, so the Put request is processed by 3 nodes, but the W parameter is only 2 and so the final response isn't needed for the process to complete.

A                    B                    C                    D                    a
.                    .                    .                    .   ClientPut(K1=1) +o
<----------------------------------------------------------------------------------+.
++ ClientPut(K1=1)   .                    .                    .                    .
.+------------------------------------------------------------->                    .
.                    .                    .      PutReq(K1=1) +o                    .
.                    .                    .     PutReq(K1=1) +-o                    .
.                    .                    .    PutReq(K1=1) +--o                    .
.                    .                    .                 ||+>                    .
.                    .                    .      PutRsp(K1=1) +o                    .
.                    <---------------------------------------+|.                    .
.                    o+ PutRsp(K1=1)      .                 | |.                    .
.                    .|                   <-----------------+ |.                    .
.                    .|                   o+ PutRsp(K1=1)     |.                    .
.                    .|                   .|                  +>                    .
.                    .+---------------------------------------->                    .
.                    .                    .|                   o+ ClientPutRsp(K1=1).
.                    .                    .+------------------->|                   .
.                    .                    .                    .+------------------->
A                    B                    C                    D                    a
                   K1:1                 K1:1                 K1:1 

Interspersing Put operations for two different keys, from two different clients makes the ladder diagram more complicated, but the underlying details are the same.

A              B              C              D              E              F              a              b
.              .              .              .              .            ClientPut(K1=1) +o              .
<----------------------------------------------------------------------------------------+.              .
++ ClientPut(K1=1)            .              .              .              .              .              .
.|             .              .              .              .              .           ClientPut(K2=17) +o
.+------------------------------------------->              .              .              .             |.
.              .              .PutReq(K1=1) +o              .              .              .             |.
.              .              PutReq(K1=1) +-o              .              .              .             |.
.              .             PutReq(K1=1) +--o              .              .              .             |.
.              .              <-------------------------------------------------------------------------+.
.              .              o+ PutReq(K2=17)              .              .              .              .
.              PutReq(K2=17) +o|          |||.              .              .              .              .
.              .             |o-+ PutReq(K2=17)             .              .              .              .
.              .             |.||         ||+>              .              .              .              .
.              .             |.PutRsp(K1=1) +o              .              .              .              .
.              <---------------------------+|.              .              .              .              .
.              o+ PutRsp(K1=1).||         | |.              .              .              .              .
.              .|            |<-----------+ |.              .              .              .              .
.              .|            |o--+ PutRsp(K1=1)             .              .              .              .
.              .|            |.+------------------------------------------->              .              .
.              .|            |. ||          |.              PutRsp(K2=17) +o              .              .
.              .|            +> ||          |.              .             |.              .              .
.              PutRsp(K2=17) +o ||          |.              .             |.              .              .
.              .|            |. +--------------------------->             |.              .              .
.              .|            |.  |          |PutRsp(K2=17) +o             |.              .              .
.              .|            |.  |          +>             |.             |.              .              .
.              .+---------------------------->             |.             |.              .              .
.              .             |.  |           o+ ClientPutRsp(K1=1)        |.              .              .
.              .             |.  +----------->|            |.             |.              .              .
.              .             |<-------------------------------------------+.              .              .
.              .             +>              .|            |.              .              .              .
.              .              o+ ClientPutRsp(K2=17)       |.              .              .              .
.              .              <----------------------------+.              .              .              .
.              .              .|             .+------------------------------------------->              .
.              .              .+------------------------------------------------------------------------->
A              B              C              D              E              F              a              b
             K1:1           K2:17          K1:1           K2:17          K2:17                            
                            K1:1 

Getting it Back

After seeing the Put infrastructure, the Get infrastructure is pretty similar.

    def rcv_clientget(self, msg):
        preference_list = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N)[0]
        # Determine if we are in the list
        if self not in preference_list:
            # Forward to the coordinator for this key
            coordinator = preference_list[0]
            Framework.forward_message(msg, coordinator)
        else:
            seqno = self.generate_sequence_number()
            self.pending_get_rsp[seqno] = set()
            self.pending_get_msg[seqno] = msg
            reqcount = 0
            for node in preference_list:
                getmsg = GetReq(self, node, msg.key, msg_id=seqno)
                Framework.send_message(getmsg)
                reqcount = reqcount + 1
                if reqcount >= DynamoNode.N:
                    # preference_list may have more than N entries to allow for failed nodes
                    break

Each of those nodes simply retrieves the relevant information from its local data store and sends it back.

    def rcv_get(self, getmsg):
        (value, metadata) = self.retrieve(getmsg.key)
        getrsp = GetRsp(getmsg, value, metadata)
        Framework.send_message(getrsp)

Finally, when enough (i.e. R) responses arrive, the first Dynamo node sends a confirmation response back to the client. This part has a slight complication – the results aren't necessarily all the same, so we return the whole set (folding duplicates).

    def rcv_getrsp(self, getrsp):
        seqno = getrsp.msg_id
        if seqno in self.pending_get_rsp:
            self.pending_get_rsp[seqno].add((getrsp.from_node, getrsp.value, getrsp.metadata))
            if len(self.pending_get_rsp[seqno]) >= DynamoNode.R:
                # Build up all the distinct values/metadata values for the response to the original request
                results = set([(value, metadata) for (node, value, metadata) in self.pending_get_rsp[seqno]])
                # Tidy up tracking data structures
                original_msg = self.pending_get_msg[seqno]
                del self.pending_get_rsp[seqno]
                del self.pending_get_msg[seqno]
                # Reply to the original client, including all received values
                client_getrsp = ClientGetRsp(original_msg,
                                             [value for (value, metadata) in results],
                                             [metadata for (value, metadata) in results])
                Framework.send_message(client_getrsp)
        else:
            pass  # Superfluous reply

The whole process looks like the following:

A                    B                    C                    D                    a
.                    .                    .                    .   ClientGet(K1=?) +o
.                    .                    <----------------------------------------+.
.                    .                    o+ GetReq(K1=?)      .                    .
.                    .      GetReq(K1=?) +o|                   .                    .
.                    .     GetReq(K1=?) +-o|                   .                    .
.                    .                  ||.+------------------->                    .
.                    .                  ||.      GetRsp(K1=1) +o                    .
.                    <-------------------+.                   |.                    .
.                    o+ GetRsp(K1=1)    | .                   |.                    .
.                    .|                 +->                   |.                    .
.                    .|     GetRsp(K1=1) +o                   |.                    .
.                    .|                  |<-------------------+.                    .
.                    .+------------------->                    .                    .
.                    .                   |o+ ClientGetRsp(K1=[1])                   .
.                    .                   +>|                   .                    .
.                    .                    .+---------------------------------------->
A                    B                    C                    D                    a
                   K1:1                 K1:1                 K1:1 

Coping With Failure

Nothing we've written so far copes with failure. If just one node were to fail, then because N is 3 and (say) W is 2, we might just cope:

A                    B                    C                    D                    a
.                    .                    .                    .   ClientPut(K1=1) +o
<----------------------------------------------------------------------------------+.
++ ClientPut(K1=1)   .                    .                    .                    .
.|                 FAIL                   .                    .                    .
.+------------------------------------------------------------->                    .
.                    x                    .      PutReq(K1=1) +o                    .
.                    x                    .     PutReq(K1=1) +-o                    .
.                    x                    .    PutReq(K1=1) +--o                    .
.                    x                    .                 ||+>                    .
.                    x                    .      PutRsp(K1=1) +o                    .
.                    X---------------------------------------+|.                    .
.                    x                    <-----------------+ |.                    .
.                    x                    o+ PutRsp(K1=1)     |.                    .
.                    x                    .|                  +>                    .
.                    x                    .+------------------->                    .
.                    x                    .                    o+ ClientPutRsp(K1=1).
.                    x                    .                    .+------------------->
.                    x                    .                    .   ClientGet(K1=?) +o
.                    x                    <----------------------------------------+.
.                    x                    o+ GetReq(K1=?)      .                    .
.                    x      GetReq(K1=?) +o|                   .                    .
.                    x     GetReq(K1=?) +-o|                   .                    .
.                    x                  ||.+------------------->                    .
.                    x                  ||.      GetRsp(K1=1) +o                    .
.                    X-------------------+.                   |.                    .
.                    x                  +->                   |.                    .
.                    x      GetRsp(K1=1) +o                   |.                    .
.                    x                   |<-------------------+.                    .
.                    x                   +>                    .                    .
.                    x                    o+ ClientGetRsp(K1=[1])                   .
.                    x                    .+---------------------------------------->
A                    B                    C                    D                    a
                                        K1:1                 K1:1 

But if two nodes fail, then we're doomed:

B                C                D                a
.                .                ClientPut(K1=1) +o
.                .                <---------------+.
.                .  PutReq(K1=1) +o                .
.                . PutReq(K1=1) +-o                .
.                .PutReq(K1=1) +--o                .
FAIL             .             |||.                .
x              FAIL            |||.                .
x                x             ||+>                .
x                x  PutRsp(K1=1) +o                .
X-------------------------------+|.                .
x                X-------------+ |.                .
x                x               +>                .
B                C                D                a
                                K1:1 

Almost everything we still need to add to the code is about engineering around the consequences of failures:

Node Failure Detection (Section 4.8.3)

The first step in coping with failure is detecting it. The message-sending framework we're using can automatically start a timer for any request message that is sent, and after a suitable period without a response, it can notify the original sender of the failure. To use this facility, the sender just needs to include a rsp_timer_pop method in its class.

To illustrate, here's the changes to the client to get it to resend any request on failure. If there are multiple pending requests for the same failed destination node, we assume that they will all also fail, and resend them too.

  class DynamoClientNode(Node):
+     timer_priority = 17
+ 
      def put(self, key, metadata, value, destnode=None):
          if destnode is None:  # Pick a random node to send the request to
              destnode = random.choice(DynamoNode.nodelist)
          putmsg = ClientPut(self, destnode, key, value, metadata)
          Framework.send_message(putmsg)
  
      def get(self, key, destnode=None):
          if destnode is None:  # Pick a random node to send the request to
              destnode = random.choice(DynamoNode.nodelist)
          getmsg = ClientGet(self, destnode, key)
          Framework.send_message(getmsg)
  
+     def rsp_timer_pop(self, reqmsg):
+         if isinstance(reqmsg, ClientPut):  # retry
+             self.put(reqmsg.key, reqmsg.metadata, reqmsg.value)
+         elif isinstance(reqmsg, ClientGet):  # retry
+             self.get(reqmsg.key)
+ 

To show this in action, we can see what happens if the first receiving node fails; the client eventually retries and picks a different node at random.

A                    B                    C                    D                    a
.                    .                    .                    .   ClientPut(K1=1) +o
FAIL                 .                    .                    .                   |.
X----------------------------------------------------------------------------------+.
x                    .                    .                    .                Timer:Pop
x                    .                    .                    .   ClientPut(K1=1) +o
x                    .                    <----------------------------------------+.
x                    .                    o+ PutReq(K1=1)      .                    .
x                    .      PutReq(K1=1) +o|                   .                    .
x                    .     PutReq(K1=1) +-o|                   .                    .
x                    .                  ||.+------------------->                    .
x                    .                  ||.      PutRsp(K1=1) +o                    .
x                    <-------------------+.                   |.                    .
x                    o+ PutRsp(K1=1)    | .                   |.                    .
x                    .|                 +->                   |.                    .
x                    .|     PutRsp(K1=1) +o                   |.                    .
x                    .|                  |<-------------------+.                    .
x                    .+------------------->                    .                    .
x                    .                   |o+ ClientPutRsp(K1=1).                    .
x                    .                   +>|                   .                    .
x                    .                    .+---------------------------------------->
A                    B                    C                    D                    a
                   K1:1                 K1:1                 K1:1 

Inside the Dynamo network, each node keeps track of the requests it has outstanding, and when a request times out the destination node is marked as failed and the request is sent on to additional (hopefully working) nodes.

  class DynamoNode(Node):
+     timer_priority = 20
      T = 10  # Number of repeats for nodes in consistent hash table
      N = 3  # Number of nodes to replicate at
      W = 2  # Number of nodes that need to reply to a write operation
      R = 2  # Number of nodes that need to reply to a read operation
      nodelist = []
      chash = ConsistentHashTable(nodelist, T)
  
      def __init__(self):
          super(DynamoNode, self).__init__()
          self.local_store = {}  # key => (value, metadata)
          self.pending_put_rsp = {}  # seqno => set of nodes that have stored
          self.pending_put_msg = {}  # seqno => original client message
          self.pending_get_rsp = {}  # seqno => set of (node, value, metadata) tuples
          self.pending_get_msg = {}  # seqno => original client message
+         # seqno => set of requests sent to other nodes, for each message class
+         self.pending_req = {PutReq: {}, GetReq: {}}
+         self.failed_nodes = []
          # Rebuild the consistent hash table
          DynamoNode.nodelist.append(self)
          DynamoNode.chash = ConsistentHashTable(DynamoNode.nodelist, DynamoNode.T)
  

Failed nodes are avoided in the preference list, both for Put operations:

      def rcv_clientput(self, msg):
-         preference_list = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N)[0]
+         preference_list = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N, self.failed_nodes)[0]
?                                                                            +++++++++++++++++++

          # Determine if we are in the list
          if self not in preference_list:
              # Forward to the coordinator for this key
              coordinator = preference_list[0]
              Framework.forward_message(msg, coordinator)
          else:
              # Use an incrementing local sequence number to distinguish
              # multiple requests for the same key
              seqno = self.generate_sequence_number()
              metadata = (self.name, seqno)  # For now, metadata is just sequence number at coordinator
              # Send out to preference list, and keep track of who has replied
+             self.pending_req[PutReq][seqno] = set()
              self.pending_put_rsp[seqno] = set()
              self.pending_put_msg[seqno] = msg
              reqcount = 0
              for node in preference_list:
                  # Send message to get node in preference list to store
                  putmsg = PutReq(self, node, msg.key, msg.value, metadata, msg_id=seqno)
+                 self.pending_req[PutReq][seqno].add(putmsg)
                  Framework.send_message(putmsg)
                  reqcount = reqcount + 1
                  if reqcount >= DynamoNode.N:
                      # preference_list may have more than N entries to allow for failed nodes
                      break
  

and for Get operations:

      def rcv_clientget(self, msg):
-         preference_list = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N)[0]
+         preference_list = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N, self.failed_nodes)[0]
?                                                                            +++++++++++++++++++

          # Determine if we are in the list
          if self not in preference_list:
              # Forward to the coordinator for this key
              coordinator = preference_list[0]
              Framework.forward_message(msg, coordinator)
          else:
              seqno = self.generate_sequence_number()
+             self.pending_req[GetReq][seqno] = set()
              self.pending_get_rsp[seqno] = set()
              self.pending_get_msg[seqno] = msg
              reqcount = 0
              for node in preference_list:
                  getmsg = GetReq(self, node, msg.key, msg_id=seqno)
+                 self.pending_req[GetReq][seqno].add(getmsg)
                  Framework.send_message(getmsg)
                  reqcount = reqcount + 1
                  if reqcount >= DynamoNode.N:
                      # preference_list may have more than N entries to allow for failed nodes
                      break
  

The code also needs to update the set of outstanding request when responses are received, both for Put:

      def rcv_putrsp(self, putrsp):
          seqno = putrsp.msg_id
          if seqno in self.pending_put_rsp:
              self.pending_put_rsp[seqno].add(putrsp.from_node)
              if len(self.pending_put_rsp[seqno]) >= DynamoNode.W:
                  # Tidy up tracking data structures
                  original_msg = self.pending_put_msg[seqno]
+                 del self.pending_req[PutReq][seqno]
                  del self.pending_put_rsp[seqno]
                  del self.pending_put_msg[seqno]
                  # Reply to the original client
                  client_putrsp = ClientPutRsp(original_msg)
                  Framework.send_message(client_putrsp)
          else:
              pass  # Superfluous reply
  

and Get:

      def rcv_getrsp(self, getrsp):
          seqno = getrsp.msg_id
          if seqno in self.pending_get_rsp:
              self.pending_get_rsp[seqno].add((getrsp.from_node, getrsp.value, getrsp.metadata))
              if len(self.pending_get_rsp[seqno]) >= DynamoNode.R:
                  # Build up all the distinct values/metadata values for the response to the original request
                  results = set([(value, metadata) for (node, value, metadata) in self.pending_get_rsp[seqno]])
                  # Tidy up tracking data structures
                  original_msg = self.pending_get_msg[seqno]
+                 del self.pending_req[GetReq][seqno]
                  del self.pending_get_rsp[seqno]
                  del self.pending_get_msg[seqno]
                  # Reply to the original client, including all received values
                  client_getrsp = ClientGetRsp(original_msg,
                                               [value for (value, metadata) in results],
                                               [metadata for (value, metadata) in results])
                  Framework.send_message(client_getrsp)
          else:
              pass  # Superfluous reply
  

A node gets treated as failed when some request to it times out. To keep the replication factors up to scratch, the timeout code also expands the set of nodes that the original request was sent to.

    def rsp_timer_pop(self, reqmsg):
        # no response to this request; treat the destination node as failed
        self.failed_nodes.append(reqmsg.to_node)
        failed_requests = Framework.cancel_timers_to(reqmsg.to_node)
        failed_requests.append(reqmsg)
        for failedmsg in failed_requests:
            self.retry_request(failedmsg)

    def retry_request(self, reqmsg):
        if not isinstance(reqmsg, DynamoRequestMessage):
            return
        # Send the request to an additional node by regenerating the preference list
        preference_list = DynamoNode.chash.find_nodes(reqmsg.key, DynamoNode.N, self.failed_nodes)[0]
        kls = reqmsg.__class__
        # Check the pending-request list for this type of request message
        if kls in self.pending_req and reqmsg.msg_id in self.pending_req[kls]:
            for node in preference_list:
                if node not in [req.to_node for req in self.pending_req[kls][reqmsg.msg_id]]:
                    # Found a node on the new preference list that hasn't been sent the request.
                    # Send it a copy
                    newreqmsg = copy.copy(reqmsg)
                    newreqmsg.to_node = node
                    self.pending_req[kls][reqmsg.msg_id].add(newreqmsg)
                    Framework.send_message(newreqmsg)

With these modifications, the failures shown earlier start to look more recoverable from:

B                C                D                E                F                a
.                .                .                .                ClientPut(K1=1) +o
.                .                <-------------------------------------------------+.
.                .  PutReq(K1=1) +o                .                .                .
.                . PutReq(K1=1) +-o                .                .                .
.                .PutReq(K1=1) +--o                .                .                .
FAIL             .             |||.                .                .                .
x              FAIL            |||.                .                .                .
x                x             ||+>                .                .                .
x                x  PutRsp(K1=1) +o                .                .                .
X-------------------------------+|.                .                .                .
x                X-------------+ |.                .                .                .
x                x               +>                .                .                .
x                x            Timer:Pop            .                .                .
x                x                o+ PutReq(K1=1)  .                .                .
x                x                .+-------------------------------->                .
x                x                .                .  PutRsp(K1=1) +o                .
x                x                <--------------------------------+.                .
x                x                o+ ClientPutRsp(K1=1)             .                .
x                x                .+------------------------------------------------->
B                C                D                E                F                a
                                K1:1                              K1:1 

A subsequent request for the same key will skip the failed nodes automatically:

B                C                D                E                F                a
x                x                .                .                ClientPut(K1=2) +o
x                x                <-------------------------------------------------+.
x                x  PutReq(K1=2) +o                .                .                .
x                x               |o+ PutReq(K1=2)  .                .                .
x                x               |o-+ PutReq(K1=2) .                .                .
x                x               +>||              .                .                .
x                x  PutRsp(K1=2) +o||              .                .                .
x                x               |.+-------------------------------->                .
x                x               |. |              .  PutRsp(K1=2) +o                .
x                x               |. +-------------->               |.                .
x                x               |.  PutRsp(K1=2) +o               |.                .
x                x               +>               |.               |.                .
x                x                <--------------------------------+.                .
x                x                o+ ClientPutRsp(K1=2)             .                .
x                x                <---------------+.                .                .
x                x                .+------------------------------------------------->
B                C                D                E                F                a
                                K1:2             K1:2             K1:2 

Node Recovery Detection

Of course, nodes that have failed may recover. To keep an eye out for this, we periodically check a node that has failed to see if it has recovered.

    def retry_failed_node(self, _):  # Permanently repeating timer
        if self.failed_nodes:
            node = self.failed_nodes.pop(0)
            # Send a test message to the oldest failed node
            pingmsg = PingReq(self, node)
            Framework.send_message(pingmsg)
        # Restart the timer
        TimerManager.start_timer(self, reason="retry", priority=15, callback=self.retry_failed_node)

    def rcv_pingreq(self, pingmsg):
        # Always reply to a test message
        pingrsp = PingRsp(pingmsg)
        Framework.send_message(pingrsp)

    def rcv_pingrsp(self, pingmsg):
        # Remove all instances of recovered node from failed node list
        recovered_node = pingmsg.from_node
        while recovered_node in self.failed_nodes:
            self.failed_nodes.remove(recovered_node)
        if recovered_node in self.pending_handoffs:
            for key in self.pending_handoffs[recovered_node]:
                # Send our latest value for this key
                (value, metadata) = self.retrieve(key)
                putmsg = PutReq(self, recovered_node, key, value, metadata)
                Framework.send_message(putmsg)
            del self.pending_handoffs[recovered_node]

This Ping request will likely fail at first, which will keep the node on the failed list.

B                C                D                E                F                a
x                x            Timer:Pop            .                .                .
x                x       PingReq +o                .                .                .
X--------------------------------+.                .                .                .
B                C                D                E                F                a
                                K1:2             K1:2             K1:2 

But once a node recovers, some Ping will eventually succeed. However, a Get request after this recovery will not necessarily get the right answer: the different nodes that have been involved in storing values for the key along the way now have a different idea of what the most up-to-date value for that key is. This means we'll need some way for the nodes to get back in sync.

B                C                D                E                F                a
RECOVER          x                .                .                .                .
.             RECOVER             .                .                .                .
.                .            Timer:Pop            .                .                .
.                .       PingReq +o                .                .                .
<--------------------------------+.                .                .                .
o+ PingRsp       .                .                .                .                .
.+-------------------------------->                .                .                .
.                .            Timer:Pop            .                .                .
.                .       PingReq +o                .                .                .
.                <---------------+.                .                .                .
.                o+ PingRsp       .                .                .                .
.                .+--------------->                .                .                .
.                .                .                .                ClientGet(K1=?) +o
.                .                <-------------------------------------------------+.
.                .  GetReq(K1=?) +o                .                .                .
.                . GetReq(K1=?) +-o                .                .                .
.                .GetReq(K1=?) +--o                .                .                .
.                .             ||+>                .                .                .
.                .  GetRsp(K1=2) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ GetRsp(K1=None)             | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ GetRsp(K1=None)                .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientGetRsp(K1=[2, None])     .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
B                C                D                E                F                a
                                K1:2             K1:2             K1:2 

However, at this point a subsequent Put request will return to using the original preference list.

B                C                D                E                F                a
RECOVER          x                .                .                .                .
.             RECOVER             .                .                .                .
.                .            Timer:Pop            .                .                .
.                .       PingReq +o                .                .                .
<--------------------------------+.                .                .                .
o+ PingRsp       .                .                .                .                .
.+-------------------------------->                .                .                .
.                .            Timer:Pop            .                .                .
.                .       PingReq +o                .                .                .
.                <---------------+.                .                .                .
.                o+ PingRsp       .                .                .                .
.                .+--------------->                .                .                .
.                .                .                .                ClientPut(K1=3) +o
.                .                <-------------------------------------------------+.
.                .  PutReq(K1=3) +o                .                .                .
.                . PutReq(K1=3) +-o                .                .                .
.                .PutReq(K1=3) +--o                .                .                .
.                .             ||+>                .                .                .
.                .  PutRsp(K1=3) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ PutRsp(K1=3)  .             | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ PutRsp(K1=3) |.                .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientPutRsp(K1=3)             .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
B                C                D                E                F                a
K1:3           K1:3             K1:3             K1:2             K1:2 

Hinted Handoff

The most straightforward approach to restore synchronization is a hinted handoff: the extra node that received the Put operation gets told about the failed node that should have received the Put in the first place. The extra node can then monitor the original node for liveness, and when it recovers send it the data that it missed.

The first change to do this is at the sending node, filling in a handoff parameter on the Put message that is sent to the extra nodes.

      def rcv_clientput(self, msg):
-         preference_list = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N, self.failed_nodes)[0]
?                                                                                                ---

+         preference_list, avoided = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N, self.failed_nodes)
?                        +++++++++

+         # Only track avoided nodes that would have been part of the original preference list
+         avoided = avoided[:DynamoNode.N]
+         non_extra_count = DynamoNode.N - len(avoided)
          # Determine if we are in the list
          if self not in preference_list:
              # Forward to the coordinator for this key
              coordinator = preference_list[0]
              Framework.forward_message(msg, coordinator)
          else:
              # Use an incrementing local sequence number to distinguish
              # multiple requests for the same key
              seqno = self.generate_sequence_number()
              metadata = (self.name, seqno)  # For now, metadata is just sequence number at coordinator
              # Send out to preference list, and keep track of who has replied
              self.pending_req[PutReq][seqno] = set()
              self.pending_put_rsp[seqno] = set()
              self.pending_put_msg[seqno] = msg
              reqcount = 0
-             for node in preference_list:
+             for ii, node in enumerate(preference_list):
?                 ++++        ++++++++++               +

+                 if ii >= non_extra_count:
+                     # This is an extra node that's only include because of a failed node
+                     handoff = avoided
+                 else:
+                     handoff = None
                  # Send message to get node in preference list to store
-                 putmsg = PutReq(self, node, msg.key, msg.value, metadata, msg_id=seqno)
+                 putmsg = PutReq(self, node, msg.key, msg.value, metadata, msg_id=seqno, handoff=handoff)
?                                                                                       +++++++++++++++++

                  self.pending_req[PutReq][seqno].add(putmsg)
                  Framework.send_message(putmsg)
                  reqcount = reqcount + 1
                  if reqcount >= DynamoNode.N:
                      # preference_list may have more than N entries to allow for failed nodes
                      break
  

The next change is at the receiving node, which explictly starts to monitor the failed nodes for recovery, and tracks which keys need to be propagated on recovery.

      def rcv_put(self, putmsg):
          self.store(putmsg.key, putmsg.value, putmsg.metadata)
+         if putmsg.handoff is not None:
+             for failed_node in putmsg.handoff:
+                 self.failed_nodes.append(failed_node)
+                 if failed_node not in self.pending_handoffs:
+                     self.pending_handoffs[failed_node] = set()
+                 self.pending_handoffs[failed_node].add(putmsg.key)
          putrsp = PutRsp(putmsg)
          Framework.send_message(putrsp)
  

This results in the generation of some additional Ping messages.

B                C                D                E                F                a
x                x                .                .                ClientPut(K1=2) +o
x                x                <-------------------------------------------------+.
x                x  PutReq(K1=2) +o                .                .                .
x                x               |o+ PutReq(K1=2, handoff=(B,C))    .                .
x                x               |o-+ PutReq(K1=2, handoff=(B,C))   .                .
x                x               +>||              .                .                .
x                x  PutRsp(K1=2) +o||              .                .                .
x                x               |.+-------------------------------->                .
x                x               |. |              .  PutRsp(K1=2) +o                .
x                x               |. +-------------->               |.                .
x                x               |.  PutRsp(K1=2) +o               |.                .
x                x               +>               |.               |.                .
x                x                <--------------------------------+.                .
x                x                o+ ClientPutRsp(K1=2)             .                .
x                x                <---------------+.                .                .
x                x                .+------------------------------------------------->
x                x            Timer:Pop            .                .                .
x                x       PingReq +o                .                .                .
X--------------------------------+.                .                .                .
x                x                .            Timer:Pop            .                .
x                x                .       PingReq +o                .                .
X-------------------------------------------------+.                .                .
x                x                .                .            Timer:Pop            .
x                x                .                .       PingReq +o                .
X------------------------------------------------------------------+.                .
x                x            Timer:Pop            .                .                .
x                x       PingReq +o                .                .                .
x                X---------------+.                .                .                .
B                C                D                E                F                a
                                K1:2             K1:2             K1:2 

Finally, when node recovery is detected, the appropriate Put messages to resynchronize the failed node's state are sent.

      def retry_failed_node(self, _):  # Permanently repeating timer
          if self.failed_nodes:
              node = self.failed_nodes.pop(0)
              # Send a test message to the oldest failed node
              pingmsg = PingReq(self, node)
              Framework.send_message(pingmsg)
          # Restart the timer
          TimerManager.start_timer(self, reason="retry", priority=15, callback=self.retry_failed_node)
  
      def rcv_pingreq(self, pingmsg):
          # Always reply to a test message
          pingrsp = PingRsp(pingmsg)
          Framework.send_message(pingrsp)
  
      def rcv_pingrsp(self, pingmsg):
          # Remove all instances of recovered node from failed node list
          recovered_node = pingmsg.from_node
          while recovered_node in self.failed_nodes:
              self.failed_nodes.remove(recovered_node)
+         if recovered_node in self.pending_handoffs:
+             for key in self.pending_handoffs[recovered_node]:
+                 # Send our latest value for this key
+                 (value, metadata) = self.retrieve(key)
+                 putmsg = PutReq(self, recovered_node, key, value, metadata)
+                 Framework.send_message(putmsg)
+             del self.pending_handoffs[recovered_node]
  

This results in the recovered nodes discovering the state update that they'd missed out on.

B                C                D                E                F                a
RECOVER          x                .                .                .                .
.             RECOVER             .                .                .                .
.                .                .            Timer:Pop            .                .
.                .                .       PingReq +o                .                .
.                <--------------------------------+.                .                .
.                o+ PingRsp       .                .                .                .
.                .+-------------------------------->                .                .
.                .                .  PutReq(K1=2) +o                .                .
.                <--------------------------------+.                .                .
.                o+ PutRsp(K1=2)  .                .                .                .
.                .+-------------------------------->                .                .
.                .                .                .            Timer:Pop            .
.                .                .                .       PingReq +o                .
.                <-------------------------------------------------+.                .
.                o+ PingRsp       .                .                .                .
.                .+------------------------------------------------->                .
.                .                .                .  PutReq(K1=2) +o                .
.                <-------------------------------------------------+.                .
.                o+ PutRsp(K1=2)  .                .                .                .
.                .+------------------------------------------------->                .
.                .            Timer:Pop            .                .                .
.                .       PingReq +o                .                .                .
<--------------------------------+.                .                .                .
o+ PingRsp       .                .                .                .                .
.+-------------------------------->                .                .                .
.                .                .            Timer:Pop            .                .
.                .                .       PingReq +o                .                .
<-------------------------------------------------+.                .                .
o+ PingRsp       .                .                .                .                .
.+------------------------------------------------->                .                .
.                .                .  PutReq(K1=2) +o                .                .
<-------------------------------------------------+.                .                .
o+ PutRsp(K1=2)  .                .                .                .                .
.+------------------------------------------------->                .                .
.                .                .                .            Timer:Pop            .
.                .                .                .       PingReq +o                .
<------------------------------------------------------------------+.                .
o+ PingRsp       .                .                .                .                .
.+------------------------------------------------------------------>                .
.                .                .                .  PutReq(K1=2) +o                .
<------------------------------------------------------------------+.                .
o+ PutRsp(K1=2)  .                .                .                .                .
.+------------------------------------------------------------------>                .
.                .            Timer:Pop            .                .                .
.                .       PingReq +o                .                .                .
.                <---------------+.                .                .                .
.                o+ PingRsp       .                .                .                .
.                .+--------------->                .                .                .
B                C                D                E                F                a
K1:2           K1:2             K1:2             K1:2             K1:2 

Building Block: Vector Clocks (s4.4)

A vector clock is easy to implement; it's basically a dictionary whose keys are nodes and whose values are the last-seen sequence number for that node.

class VectorClock(object):
    def __init__(self):
        self.clock = {}  # node => counter

    def update(self, node, counter):
        """Add a new node:counter value to a VectorClock."""
        if node in self.clock and counter <= self.clock[node]:
            raise Exception("Node %s has gone backwards from %d to %d" %
                            (node, self.clock[node], counter))
        self.clock[node] = counter
        return self  # allow chaining of .update() operations

    def __str__(self):
        return "{%s}" % ", ".join(["%s:%d" % (node, self.clock[node])
                                   for node in sorted(self.clock.keys())])

We can add entries to the vector clock, simulating different nodes and their counters, with one proviso: a node's counter isn't allowed to go backwards.

>>> from vectorclock import VectorClock
>>> v = VectorClock()
>>> print v
{}
>>> v.update('A', 1)
>>> print v
{A:1}
>>> v.update('A', 3)
>>> print v
{A:3}
>>> v.update('B', 1001)
>>> print v
{A:3, B:1001}
>>> v.update('B', 1002)
>>> print v
{A:3, B:1002}
>>> v.update('B', 1)
Traceback (most recent call last):
  File "", line 1, in 
  File "/Users/dmd/projects/pynamo/vectorclock.py", line 15, in update
    (node, self.clock[node], counter))
Exception: Node B has gone backwards from 1002 to 1

We can also define an ordering operation < on vector clocks (together with all the other comparison operations) by adding the relevant rich comparison methods to the VectorClock class.

    # Comparison operations. Vector clocks are partially ordered, but not totally ordered.
    def __eq__(self, other):
        return self.clock == other.clock

    def __lt__(self, other):
        for node in self.clock:
            if node not in other.clock:
                return False
            if self.clock[node] > other.clock[node]:
                return False
        return True

    def __ne__(self, other):
        return not (self == other)

    def __le__(self, other):
        return (self == other) or (self < other)

    def __gt__(self, other):
        return (other < self)

    def __ge__(self, other):
        return (self == other) or (self > other)

This is a partial order:

but it's not a total order – it's possible for neither a < b nor b < a to hold.

>>> from vectorclock import VectorClock
>>> v1 = VectorClock().update('A', 1).update('B', 2)
>>> v2 = VectorClock().update('A', 2).update('B', 3)
>>> print (v1 < v2)
True
>>> v3 = VectorClock().update('X', 1).update('Y', 2)
>>> print (v1 < v3)
False
>>> print (v3 < v1)
False

This partial order forms the basis for the more important thing we can do with vector clocks – combine them.

    @classmethod
    def coalesce(cls, vcs):
        """Coalesce a container of VectorClock objects.

        The result is a list of VectorClocks; each input VectorClock is a direct
        ancestor of one of the results, and no result entry is a direct ancestor
        of any other result entry."""
        results = []
        for vc in vcs:
            # See if this vector-clock subsumes or is subsumed by anything already present
            subsumed = False
            for ii, result in enumerate(results):
                if vc <= result:  # subsumed by existing answer
                    subsumed = True
                    break
                if result < vc:  # subsumes existing answer so replace it
                    results[ii] = copy.deepcopy(vc)
                    subsumed = True
                    break
            if not subsumed:
                results.append(copy.deepcopy(vc))
        return results

This operation folds together those vector clocks that are direct ancestors of each other, but keeping separate those that are not.

>>> print v1, v2, v3
{A:1, B:2} {A:2, B:3} {X:1, Y:2}
>>> diverged_clocks = VectorClock.coalesce((v1, v2, v3))
>>> for vc in diverged_clocks: print vc
{A:2, B:3}
{X:1, Y:2}
>>> print (diverged_clocks[0] < diverged_clocks[1])
False
>>> print (diverged_clocks[1] < diverged_clocks[0])
False

Finally, we need to be able to build a single vector clock that has an arbitrary set of direct ancestors; in other words, a way of reconverging a divergent set of vector clocks.

    @classmethod
    def converge(cls, vcs):
        """Return a single VectorClock that subsumes all of the input VectorClocks"""
        result = cls()
        for vc in vcs:
            if vc is None:
                continue
            for node, counter in vc.clock.items():
                if node in result.clock:
                    if result.clock[node] < counter:
                        result.clock[node] = counter
                else:
                    result.clock[node] = counter
        return result

The Dynamo system doesn't (and can't) solve the general problem of resolving inconsistencies; that's pushed out to the code that uses the system, and which is aware of the meaning of the data that is opaque to Dynamo. But when that application code has resolved an inconsistency, this converge method builds a vector clock that makes it clear that the inconsistency has been resolved.

>>> converged_clock = VectorClock.converge(diverged_clocks)
>>> print converged_clock
{A:2, B:3, X:1, Y:2}
>>> print (v1 < converged_clock)
True
>>> print (v2 < converged_clock)
True
>>> print (v3 < converged_clock)
True

The Dynamo paper also discusses the problem that the size of vector clocks can become large as more and more nodes get involved in the history of changes to a particular key/value. To get around this, they suggest keeping a timestamp along with the sequence number, and throwing away the oldest entry in a vector clock when it has more than 10 entries. This is easily implemented as a subclass of VectorClock, but we won't bother using this variant from here on.

#!/usr/bin/env python
"""Vector clock class with truncation support"""
import sys
import time
from vectorclock import VectorClock


class VectorClockTimestamp(VectorClock):
    NODE_LIMIT = 10

    def __init__(self):
        super(VectorClockTimestamp, self).__init__()
        self.clock_time = {}  # node => timestamp

    def _maybe_truncate(self):
        if len(self.clock_time) < VectorClockTimestamp.NODE_LIMIT:
            return
        # Find the oldest entry
        oldest_node = None
        oldest_time = sys.maxint
        for node, when in self.clock_time.items():
            if when < oldest_time:
                oldest_node = node
                oldest_time = when
        del self.clock_time[oldest_node]
        del self.clock[oldest_node]

    def update(self, node, counter):
        VectorClock.update(self, node, counter)
        self.clock_time[node] = time.time()
        self._maybe_truncate()
        return self

Detecting Divergence

The vector clocks of the previous section allow Dynamo to detect when there have been distinct, inconsistent updates for a particular key. The vector clock is held in the metadata associated with the key, and it is the responsibility of the Dynamo client to indicate the version of the data that it is updating. In practical terms, this means that the client needs to preceded every Put operation with a Get operation to retrieve the appropriate metadata.

B                C                D                E                F                a
.                .                .                .                ClientGet(K1=?) +o
.                .                <-------------------------------------------------+.
.                .  GetReq(K1=?) +o                .                .                .
.                . GetReq(K1=?) +-o                .                .                .
.                .GetReq(K1=?) +--o                .                .                .
.                .             ||+>                .                .                .
.           GetRsp(K1=None@None) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ GetRsp(K1=None@None)        | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ GetRsp(K1=None@None)           .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientGetRsp(K1=[None]@[{}])   .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
.                .                .                .             ClientPut(K1=1@{}) +o
.                .                <-------------------------------------------------+.
.             PutReq(K1=1@{D:2}) +o                .                .                .
.            PutReq(K1=1@{D:2}) +-o                .                .                .
.           PutReq(K1=1@{D:2}) +--o                .                .                .
.                .             ||+>                .                .                .
.             PutRsp(K1=1@{D:2}) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ PutRsp(K1=1@{D:2})          | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ PutRsp(K1=1@{D:2})             .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientPutRsp(K1=1@{D:2})       .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
.                .                .                .                ClientGet(K1=?) +o
.                .                <-------------------------------------------------+.
.                .  GetReq(K1=?) +o                .                .                .
.                . GetReq(K1=?) +-o                .                .                .
.                .GetReq(K1=?) +--o                .                .                .
.                .             ||+>                .                .                .
.             GetRsp(K1=1@{D:2}) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ GetRsp(K1=1@{D:2})          | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ GetRsp(K1=1@{D:2})             .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientGetRsp(K1=[1]@[{D:2}])   .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
.                .                .                .          ClientPut(K1=2@{D:2}) +o
.                .                <-------------------------------------------------+.
.             PutReq(K1=2@{D:4}) +o                .                .                .
.            PutReq(K1=2@{D:4}) +-o                .                .                .
.           PutReq(K1=2@{D:4}) +--o                .                .                .
.                .             ||+>                .                .                .
.             PutRsp(K1=2@{D:4}) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ PutRsp(K1=2@{D:4})          | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ PutRsp(K1=2@{D:4})             .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientPutRsp(K1=2@{D:4})       .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
B                C                D                E                F                a
K1:2           K1:2             K1:2 

For convenience, to cope with the situation where a client holds on to a key and makes multiple updates, we also return the new metadata on a Put response so it can be used for a subsequent Put:

B                C                D                E                F                a
.                .                .                .                ClientGet(K1=?) +o
.                .                <-------------------------------------------------+.
.                .  GetReq(K1=?) +o                .                .                .
.                . GetReq(K1=?) +-o                .                .                .
.                .GetReq(K1=?) +--o                .                .                .
.                .             ||+>                .                .                .
.             GetRsp(K1=2@{D:4}) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ GetRsp(K1=2@{D:4})          | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ GetRsp(K1=2@{D:4})             .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientGetRsp(K1=[2]@[{D:4}])   .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
.                .                .                .          ClientPut(K1=3@{D:4}) +o
.                .                <-------------------------------------------------+.
.             PutReq(K1=3@{D:6}) +o                .                .                .
.            PutReq(K1=3@{D:6}) +-o                .                .                .
.           PutReq(K1=3@{D:6}) +--o                .                .                .
.                .             ||+>                .                .                .
.             PutRsp(K1=3@{D:6}) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ PutRsp(K1=3@{D:6})          | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ PutRsp(K1=3@{D:6})             .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientPutRsp(K1=3@{D:6})       .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
.                .                .                .          ClientPut(K1=4@{D:6}) +o
.                .                <-------------------------------------------------+.
.             PutReq(K1=4@{D:7}) +o                .                .                .
.            PutReq(K1=4@{D:7}) +-o                .                .                .
.           PutReq(K1=4@{D:7}) +--o                .                .                .
.                .             ||+>                .                .                .
.             PutRsp(K1=4@{D:7}) +o                .                .                .
<-------------------------------+|.                .                .                .
o+ PutRsp(K1=4@{D:7})          | |.                .                .                .
.|               <-------------+ |.                .                .                .
.|               o+ PutRsp(K1=4@{D:7})             .                .                .
.|               .|              +>                .                .                .
.+-------------------------------->                .                .                .
.                .|               o+ ClientPutRsp(K1=4@{D:7})       .                .
.                .+--------------->|               .                .                .
.                .                .+------------------------------------------------->
B                C                D                E                F                a
K1:4           K1:4             K1:4 

The Dynamo node that acts as the coordinator for the key updates the vector clock with its own sequence numbers:

      def rcv_clientput(self, msg):
          preference_list, avoided = DynamoNode.chash.find_nodes(msg.key, DynamoNode.N, self.failed_nodes)
          # Only track avoided nodes that would have been part of the original preference list
          avoided = avoided[:DynamoNode.N]
          non_extra_count = DynamoNode.N - len(avoided)
          # Determine if we are in the list
          if self not in preference_list:
              # Forward to the coordinator for this key
              coordinator = preference_list[0]
              Framework.forward_message(msg, coordinator)
          else:
              # Use an incrementing local sequence number to distinguish
              # multiple requests for the same key
              seqno = self.generate_sequence_number()
-             metadata = (self.name, seqno)  # For now, metadata is just sequence number at coordinator
+             # The metadata for a key is passed in by the client, and updated by the coordinator node.
+             metadata = copy.deepcopy(msg.metadata)
+             metadata.update(self.name, seqno)
              # Send out to preference list, and keep track of who has replied
              self.pending_req[PutReq][seqno] = set()
              self.pending_put_rsp[seqno] = set()
              self.pending_put_msg[seqno] = msg
              reqcount = 0
              for ii, node in enumerate(preference_list):
                  if ii >= non_extra_count:
                      # This is an extra node that's only include because of a failed node
                      handoff = avoided
                  else:
                      handoff = None
                  # Send message to get node in preference list to store
                  putmsg = PutReq(self, node, msg.key, msg.value, metadata, msg_id=seqno, handoff=handoff)
                  self.pending_req[PutReq][seqno].add(putmsg)
                  Framework.send_message(putmsg)
                  reqcount = reqcount + 1
                  if reqcount >= DynamoNode.N:
                      # preference_list may have more than N entries to allow for failed nodes
                      break
  

For a straightforward node failure, the vector clock doesn't really add anything. Continuing from the sequences above, a subsequent Put then a Get still return a single consistent value – albeit with a vector clock that indicates multiple Dynamo nodes have been involved in the history of this key.

B                C                D                E                F                a
.                .              FAIL               .                .                .
.                .                x                .         ClientPut(K1=11@{D:7}) +o
<-----------------------------------------------------------------------------------+.
o+ PutReq(K1=11@{B:1, D:7})       x                .                .                .
o-+ PutReq(K1=11@{B:1, D:7})      x                .                .                .
o--+ PutReq(K1=11@{B:1, D:7})     x                .                .                .
.+--------------------------------X                .                .                .
<-+|             .                x                .                .                .
o+ PutRsp(K1=11@{B:1, D:7})       x                .                .                .
.| +------------->                x                .                .                .
.|              +o PutRsp(K1=11@{B:1, D:7})        .                .                .
<+              |.                x                .                .                .
<---------------+.                x                .                .                .
o+ ClientPutRsp(K1=11@{B:1, D:7}) x                .                .                .
.+----------------------------------------------------------------------------------->
.                .                x                .                ClientGet(K1=?) +o
<-----------------------------------------------------------------------------------+.
o+ GetReq(K1=?)  .                x                .                .                .
o-+ GetReq(K1=?) .                x                .                .                .
o--+ GetReq(K1=?).                x                .                .                .
.+--------------------------------X                .                .                .
<-+|             .                x                .                .                .
o+ GetRsp(K1=11@{B:1, D:7})       x                .                .                .
.| +------------->                x                .                .                .
.|              +o GetRsp(K1=11@{B:1, D:7})        .                .                .
<+              |.                x                .                .                .
<---------------+.                x                .                .                .
o+ ClientGetRsp(K1=[11]@[{B:1, D:7}])              .                .                .
.+----------------------------------------------------------------------------------->
B                C                D                E                F                a
K1:11          K1:11            K1:4 

Of course, it's not just nodes that can fail – sometimes, the links between them fail too. The worst scenario that this leads to is network partition: half of the network is on one side of the partition, the other half is on the other side, and never the twain shall meet. Network partitions are particularly bad because different clients can update keys on the either side of the partition, resulting in different, incompatible, values for the keys.

b                A                B                C                D                E                F                a
.                .                .                .                .                .                ClientGet(K1=?) +o
.                .                .                .                <-------------------------------------------------+.
.                .                .                .  GetReq(K1=?) +o                .                .                .
.                .                .                . GetReq(K1=?) +-o                .                .                .
.                .                .                .GetReq(K1=?) +--o                .                .                .
.                .                .                .             ||+>                .                .                .
.                .                .           GetRsp(K1=None@None) +o                .                .                .
.                .                <-------------------------------+|.                .                .                .
.                .                o+ GetRsp(K1=None@None)        | |.                .                .                .
.                .                .|               <-------------+ |.                .                .                .
.                .                .|               o+ GetRsp(K1=None@None)           .                .                .
.                .                .|               .|              +>                .                .                .
.                .                .+-------------------------------->                .                .                .
.                .                .                .|               o+ ClientGetRsp(K1=[None]@[{}])   .                .
.                .                .                .+--------------->|               .                .                .
.                .                .                .                .+------------------------------------------------->
.                .                .                .                .                .             ClientPut(K1=1@{}) +o
.                .                .                .                <-------------------------------------------------+.
.                .                .             PutReq(K1=1@{D:2}) +o                .                .                .
.                .                .            PutReq(K1=1@{D:2}) +-o                .                .                .
.                .                .           PutReq(K1=1@{D:2}) +--o                .                .                .
.                .                .                .             ||+>                .                .                .
.                .                .             PutRsp(K1=1@{D:2}) +o                .                .                .
.                .                <-------------------------------+|.                .                .                .
.                .                o+ PutRsp(K1=1@{D:2})          | |.                .                .                .
.                .                .|               <-------------+ |.                .                .                .
.                .                .|               o+ PutRsp(K1=1@{D:2})             .                .                .
.                .                .|               .|              +>                .                .                .
.                .                .+-------------------------------->                .                .                .
.                .                .                .|               o+ ClientPutRsp(K1=1@{D:2})       .                .
.                .                .                .+--------------->|               .                .                .
.                .                .                .                .+------------------------------------------------->
 ********************************** Cut ['b', 'A', 'B', 'C'] -> ['D', 'E', 'F', 'a'] ********************************** 
 ********************************** Cut ['D', 'E', 'F', 'a'] -> ['b', 'A', 'B', 'C'] ********************************** 
.                .                .                .                .                .         ClientPut(K1=11@{D:2}) +o
.                .                .                .                <-------------------------------------------------+.
.                .                .            PutReq(K1=11@{D:3}) +o                .                .                .
.                .                .           PutReq(K1=11@{D:3}) +-o                .                .                .
.                .                .          PutReq(K1=11@{D:3}) +--o                .                .                .
.                .                .                .             ||+>                .                .                .
.                .                .            PutRsp(K1=11@{D:3}) +o                .                .                .
.                .                .                .             |X|.                .                .                .
.                .                .                .             X |.                .                .                .
.                .                .                .               +>                .                .                .
.                .                .                .            Timer:Pop            .                .                .
.                .                .                .                o+ PutReq(K1=11@{D:3})            .                .
.                .                .                .                .+-------------------------------->                .
.                .                .                .                .            PutRsp(K1=11@{D:3}) +o                .
.                .                .                .                <--------------------------------+.                .
.                .                .                .                o+ ClientPutRsp(K1=11@{D:3})      .                .
.                .                .                .                .+------------------------------------------------->
.                .                .                .            Timer:Pop            .                .                .
o+ ClientGet(K1=?)                .                .                .                .                .                .
.X               .                .                .                .                .                .                .
Timer:Pop        .                .                .                .                .                .                .
o+ ClientGet(K1=?)                .                .                .                .                .                .
.+-------------------------------->                .                .                .                .                .
.                .                o+ GetReq(K1=?)  .                .                .                .                .
.                .  GetReq(K1=?) +o|               .                .                .                .                .
.                .               |o-+ GetReq(K1=?) .                .                .                .                .
.                .               |.X|              .                .                .                .                .
.                .               +> |              .                .                .                .                .
.             GetRsp(K1=1@{D:2}) +o |              .                .                .                .                .
.                .               |. +-------------->                .                .                .                .
.                .             GetRsp(K1=1@{D:2}) +o                .                .                .                .
.                .               +>               |.                .                .                .                .
.                .                <---------------+.                .                .                .                .
.   ClientGetRsp(K1=[1]@[{D:2}]) +o                .                .                .                .                .
<--------------------------------+.                .                .                .                .                .
.                .            Timer:Pop            .                .                .                .                .
o+ ClientPut(K1=21@{D:2})         .                .                .                .                .                .
.+--------------->                .                .                .                .                .                .
.                ++ ClientPut(K1=21@{D:2})         .                .                .                .                .
.                .X               .                .                .                .                .                .
Timer:Pop        .                .                .                .                .                .                .
o+ ClientPut(K1=21@{D:2})         .                .                .                .                .                .
.+------------------------------------------------->                .                .                .                .
.                .                .                o+ PutReq(K1=21@{C:1, D:2})       .                .                .
.                .       PutReq(K1=21@{C:1, D:2}) +o|               .                .                .                .
.                .      PutReq(K1=21@{C:1, D:2}) +-o|               .                .                .                .
.                .                .              ||.X               .                .                .                .
.                .                <---------------+.                .                .                .                .
.                .                o+ PutRsp(K1=21@{C:1, D:2})       .                .                .                .
.                .                .|             +->                .                .                .                .
.                .       PutRsp(K1=21@{C:1, D:2}) +o                .                .                .                .
.                .                .+--------------->                .                .                .                .
.                .                .               +>                .                .                .                .
.                . ClientPutRsp(K1=21@{C:1, D:2}) +o                .                .                .                .
<-------------------------------------------------+.                .                .                .                .
b                A                B                C                D                E                F                a
                                K1:21            K1:21            K1:11                             K1:11 

This is where the vector clock helps; after the network is repaired, the next Get operation returns a divergent set of metadata – [11, 21]@[{D:3},{C:1, D:2}] indicating two possible values for the key, with two associated vector clocks. This indicates that there are inconsistent values for the key that need to be dealt with.

b                A                B                C                D                E                F                a
 ********************************************** Repair network partition ********************************************** 
.                .            Timer:Pop            .                .                .                .                .
.                .                o+ PingReq       .                .                .                .                .
.                .                .+-------------------------------->                .                .                .
.                .                .                .       PingRsp +o                .                .                .
.                .                <--------------------------------+.                .                .                .
.                .                .            Timer:Pop            .                .                .                .
.                .                .                o+ PingReq       .                .                .                .
.                .                .                .+--------------->                .                .                .
.                .                .                .       PingRsp +o                .                .                .
.                .                .                <---------------+.                .                .                .
.                .                .                .            Timer:Pop            .                .                .
.                .                .                .       PingReq +o                .                .                .
.                .                <--------------------------------+.                .                .                .
.                .                o+ PingRsp       .                .                .                .                .
.                .                .+-------------------------------->                .                .                .
.                .                .                .            Timer:Pop            .                .                .
.                .                .                .       PingReq +o                .                .                .
.                .                .                <---------------+.                .                .                .
.                .                .                o+ PingRsp       .                .                .                .
.                .                .                .+--------------->                .                .                .
.                .                .                .                .                .                ClientGet(K1=?) +o
.                .                .                <------------------------------------------------------------------+.
.                .                .                o+ GetReq(K1=?)  .                .                .                .
.                .                .  GetReq(K1=?) +o|               .                .                .                .
.                .                . GetReq(K1=?) +-o|               .                .                .                .
.                .                .              ||.+--------------->                .                .                .
.                .                .            GetRsp(K1=11@{D:3}) +o                .                .                .
.                .                <---------------+.               |.                .                .                .
.                .                o+ GetRsp(K1=21@{C:1, D:2})      |.                .                .                .
.                .                .|             +->               |.                .                .                .
.                .       GetRsp(K1=21@{C:1, D:2}) +o               |.                .                .                .
.                .                .|              |<---------------+.                .                .                .
.                .                .+--------------->                .                .                .                .
.                .                .               |o+ ClientGetRsp(K1=[21, 11]@[{C:1, D:2},{D:3}])    .                .
.                .                .               +>|               .                .                .                .
.                .                .                .+------------------------------------------------------------------>
b                A                B                C                D                E                F                a
                                K1:21            K1:21            K1:11                             K1:11 

Repairing Divergence

The data stored by Dynamo is opaque to the system, which means that Dynamo itself has no way of figuring out how to deal with the inconsistent data. Therefore, the problem is pushed out to the client: the client has to figure it out, and the next Put message that includes a divergent set of vector clocks is assumed to subsume them all.

  class DynamoClientNode(Node):
      timer_priority = 17
+ 
+     def __init__(self, name=None):
+         super(DynamoClientNode, self).__init__(name)
+         self.last_msg = None  # Track last received message
  
      def put(self, key, metadata, value, destnode=None):
          if destnode is None:  # Pick a random node to send the request to
              destnode = random.choice(DynamoNode.nodelist)
+         # Input metadata is always a sequence, but we always need to insert a
+         # single VectorClock object into the ClientPut message
+         if len(metadata) == 1 and metadata[0] is None:
+             metadata = VectorClock()
+         else:
+             # A Put operation always implies convergence
+             metadata = VectorClock.converge(metadata)
          putmsg = ClientPut(self, destnode, key, value, metadata)
          Framework.send_message(putmsg)
+         return putmsg
  
      def get(self, key, destnode=None):
          if destnode is None:  # Pick a random node to send the request to
              destnode = random.choice(DynamoNode.nodelist)
          getmsg = ClientGet(self, destnode, key)
          Framework.send_message(getmsg)
+         return getmsg
  
      def rsp_timer_pop(self, reqmsg):
          if isinstance(reqmsg, ClientPut):  # retry
-             self.put(reqmsg.key, reqmsg.metadata, reqmsg.value)
+             self.put(reqmsg.key, [reqmsg.metadata], reqmsg.value)
?                                  +               +

          elif isinstance(reqmsg, ClientGet):  # retry
              self.get(reqmsg.key)
  

Following on from the sequences of the last section, this results in a Put operation with a single converged vector clock {C:1, D:3} .

b                A                B                C                D                E                F                a
.                .                .                .                .                .   ClientPut(K1=101@{C:1, D:3}) +o
.                .                .                .                .                .                <---------------+.
.                .                .                .                .   ClientPut(K1=101@{C:1, D:3}) ++                .
.                .                .                .                <--------------------------------+.                .
.                .                .      PutReq(K1=101@{C:1, D:4}) +o                .                .                .
.                .                .     PutReq(K1=101@{C:1, D:4}) +-o                .                .                .
.                .                .    PutReq(K1=101@{C:1, D:4}) +--o                .                .                .
.                .                .                .             ||+>                .                .                .
.                .                .      PutRsp(K1=101@{C:1, D:4}) +o                .                .                .
.                .                <-------------------------------+|.                .                .                .
.                .                o+ PutRsp(K1=101@{C:1, D:4})   | |.                .                .                .
.                .                .|               <-------------+ |.                .                .                .
.                .                .|               o+ PutRsp(K1=101@{C:1, D:4})      .                .                .
.                .                .|               .|              +>                .                .                .
.                .                .+-------------------------------->                .                .                .
.                .                .                .|               o+ ClientPutRsp(K1=101@{C:1, D:4}).                .
.                .                .                .+--------------->|               .                .                .
.                .                .                .                .+------------------------------------------------->
b                A                B                C                D                E                F                a
                               K1:101           K1:101           K1:101                             K1:11 

What's Left

In theory, there's no difference between theory and practice. In practice there is.

We've implemented some of the key ideas from the Dynamo paper here, but we haven't implemented everything in the paper. There's a summary of the parts we've skipped below; the length of this list gives an indication of just how many reality-based complications are involved in turning Dynamo into a real implementation.

Appendix: Node Simulation Framework

The code here is built on a framework that simulates nodes and the messages between them. The modules for this are divided up as follows.

Epilog: Code Location & Licenses

A full copy of this project (text, source, scripts) can be downloaded at GitHub or downloads as a tarball here. The text is available under the GFDL 1.3, the code is available under version 2 of the GPL.


Copyright (c) 2010-2012, David Drysdale

Permission is granted to copy, distribute and/or modify this document under the terms of the GNU Free Documentation License, Version 1.3 or any later version published by the Free Software Foundation; with no Invariant Sections, with no Front-Cover Texts, and with no Back-Cover Texts. A copy of the license is available here.


Back to Home Page


Contact me