Paper Wasp © MzePhotos.com, Some Rights Reserved

In recent work on PyMongo, I used a concurrency-control pattern that solves a variety of reader-writer problem without mutexes. It's similar to the read-copy-update technique used extensively in the Linux kernel. I'm dubbing it the Wasp's Nest. Stick with me—by the end of this post you'll know a neat concurrency pattern, and have a good understanding of how PyMongo handles replica set failovers.

Update: In this post's first version I didn't know how close my code is to "ready-copy-update". Robert Moore schooled me in the comments. I also named it "a lock-free concurrency pattern" and Steve Baptiste pointed out that I was using the term wrong. My algorithm merely solves a race condition without adding a mutex, it's not lock-free. I love this about blogging: in exchange for a little humility I get a serious education.


The Mission

MongoDB is deployed in "replica sets" of identical database servers. A replica set has one primary server and several read-only secondary servers. Over time a replica set's state can change. For example, if the primary's cooling fans fail and it bursts into flames, a secondary takes over as primary a few seconds later. Or a sysadmin can add another server to the set, and once it's synced up it becomes a new secondary.

I help maintain PyMongo, the Python driver for MongoDB. Its MongoReplicaSetClient is charged with connecting to the members of a set and knowing when the set changes state. Replica sets and PyMongo must avoid any single points of failure in the face of unreliable servers and networks—we must never assume any particular members of the set are available.

Consider this very simplified sketch of a MongoReplicaSetClient:

class Member(object):
    """Represents one server in the set."""
    def __init__(self, pool):
        # The connection pool.
        self.pool = pool

class MongoReplicaSetClient(object):
    def __init__(self, seeds):
        self.primary = None
        self.members = {}
        self.refresh()

        # The monitor calls refresh() every 30 sec.
        self.monitor = MonitorThread(self)

    def refresh(self):
        # If we're already connected, use our list of known
        # members. Otherwise use the passed-in list of
        # possible members, the 'seeds'.
        seeds = self.members.keys() or self.seeds

        # Try seeds until first success.
        ismaster_response = None
        for seed in seeds:
            try:
                # The 'ismaster' command gets info
                # about the whole set.
                ismaster_response = call_ismaster(seed)
                break
            except socket.error:
                # Host down / unresolvable, try the next.
                pass

        if not ismaster_response:
            raise ConnectionFailure()

        # Now we can discover the whole replica set.
        for host in ismaster_response['hosts']:
            pool = ConnectionPool(host)
            member = Member(pool)
            self.members[host] = member

        # Remove down members from dict.
        for host in self.members.keys():
            if host not in ismaster_response['hosts']:
                self.members.pop(host)

        self.primary = ismaster_response.get('primary')

    def send_message(self, message):
        # Send an 'insert', 'update', or 'delete'
        # message to the primary.
        if not self.primary:
            self.refresh()

        member = self.members[self.primary]
        pool = member.pool
        try:
            send_message_with_pool(message, pool)
        except socket.error:
            self.primary = None
            raise AutoReconnect()

We don't know which members will be available when our application starts, so we pass a "seed list" of hostnames to the MongoReplicaSetClient. In refresh, the client tries them all until it can connect to one and run the isMaster command, which returns information about all the members in the replica set. The client then makes a connection-pool for each member and records which one is the primary.

Once refresh finishes, the client starts a MonitorThread which calls refresh again every 30 seconds. This ensures that if we add a secondary to the set it will be discovered soon and participate in load-balancing. If a secondary goes down, refresh removes it from self.members. In send_message, if we discover the primary's down, we raise an error and clear self.primary so we'll call refresh the next time send_message runs.

The Bugs

PyMongo 2.1 through 2.5 had two classes of concurrency bugs: race conditions and thundering herds.

The race condition is easy to see. Look at the expression self.members[self.primary] in send_message. If the monitor thread runs refresh and pops a member from self.members while an application thread is executing the dictionary lookup, the latter could get a KeyError. Indeed, that is exactly the bug report we received that prompted my whole investigation and this blog post.

The other bug causes a big waste of effort. Let's say the primary server bursts into flames. The client gets a socket error and clears self.primary. Then a bunch of application threads all call send_message at once. They all find that self.primary is None, and all call refresh. This is a duplication of work that only one thread need do. Depending how many processes and threads we have, it has the potential to create a connection storm in our replica set as a bunch of heavily-loaded applications lurch to the new primary. It also compounds the race condition because many threads are all modifying the shared state. I'm calling this duplicated work a thundering herd problem, although the official definition of thundering herd is a bit different.

Fixing With A Mutex

We know how to fix race conditions: let's add a mutex! We could lock around the whole body of refresh, and lock around the expression self.members[self.primary] in send_message. No thread sees members and primary in a half-updated state.

...and why it's not ideal

This solution has two problems. The first is minor: the slight cost of acquiring and releasing a lock for every message sent to MongoDB, especially since it means only one thread can run that section of send_message at a time. A reader-writer lock alleviates the contention by allowing many threads to run send_message as long as no thread is running refresh, in exchange for greater complexity and cost for the single-threaded case.

The worse problem is the behavior such a mutex would cause in a very heavily multithreaded application. While one thread is running refresh, all threads running send_message will queue on the mutex. If the load is heavy enough our application could fail while waiting for refresh, or could overwhelm MongoDB once they're all simultaneously unblocked. Better under most circumstances for send_message to fail fast, saying "I don't know who the primary is, and I'm not going to wait for refresh to tell me." Failing fast raises more errors but keeps the queues small.

The Wasp's Nest Pattern

There's a better way, one that requires no locks, is less error-prone, and fixes the thundering-herd problem too. Here's what I did for PyMongo 2.5.1, which we'll release next week.

First, all information about the replica set's state is pulled out of MongoReplicaSetClient and put into an RSState object:

class RSState(object):
    def __init__(self, members, primary):
        self.members = members
        self.primary = primary

MongoReplicaSetClient gets one RSState instance that it puts in self.rsstate. This instance is immutable: no thread is allowed to change the contents, only to make a modified copy. So if the primary goes down, refresh doesn't just set primary to None and pop its hostname from the members dict. Instead, it makes a deep copy of the RSState, and updates the copy. Finally, it replaces the old self.rsstate with the new one.

Each of the RSState's attributes must be immutable and cloneable, too, which requires a very different mindset. For example, I'd been tracking each member's ping time using a 5-sample moving average and updating it with a new sample like so:

class Member(object):
    def add_sample(self, ping_time):
        self.samples = self.samples[-4:]
        self.samples.append(ping_time)
        self.avg_ping = sum(self.samples) / len(self.samples)

But if Member is immutable, then adding a sample means cloning the whole Member and updating it. Like this:

class Member(object):
    def clone_with_sample(self, ping_time):
        # Make a new copy of 'samples'
        samples = self.samples[-4:] + [ping_time]
        return Member(samples)

Any method that needs to access self.rsstate more than once must protect itself against the state being replaced concurrently. It has to make a local copy of the reference. So the racy expression in send_message becomes:

rsstate = self.rsstate  # Copy reference.
member = rsstate.members[rsstate.primary]

Since the rsstate cannot be modified by another thread, send_message knows its local reference to the state is safe to read.

A few summers ago I was on a Zen retreat in a rural house. We had paper wasps building nests under the eaves. The wasps make their paper from a combination of chewed-up plant fiber and saliva. The nest hangs from a single skinny petiole. It's precarious, but it seems to protect the nest from ants who want to crawl in and eat the larvae. The queen periodically spreads an ant-repellant secretion around the petiole; its slenderness conserves her ant-repellant, and concentrates it in a small area.

Wasp's Nest in Situ [Source]

I think of the RSState like a wasp's nest: it's an intricate structure hanging off the MongoReplicaSetClient by a single attribute, self.rsstate. The slenderness of the connection protects send_message from race conditions, just as the thin petiole protects the nest from ants.

Since I was fixing the race condition I fixed the thundering herd as well. Only one thread should run refresh after a primary goes down, and all other threads should benefit from its labor. I nominated the monitor to be that one thread:

class MonitorThread(threading.Thread):
    def __init__(self, client):
        threading.Thread.__init__(self)
        self.client = weakref.proxy(client)
        self.event = threading.Event()
        self.refreshed = threading.Event()

    def schedule_refresh(self):
        """Refresh immediately."""
        self.refreshed.clear()
        self.event.set()

    def wait_for_refresh(self, timeout_seconds):
        """Block until refresh completes."""
        self.refreshed.wait(timeout_seconds)

    def run(self):
        while True:
            self.event.wait(timeout=30)
            self.event.clear()

            try:
                try:
                    self.client.refresh()
                finally:
                    self.refreshed.set()
            except AutoReconnect:
                pass
            except:
                # Client was garbage-collected.
                break

(The weakref proxy prevents a reference cycle and lets the thread die when the client is deleted. The weird try-finally syntax is necessary in Python 2.4.)

The monitor normally wakes every 30 seconds to notice changes in the set, like a new secondary being added. If send_message discovers that the primary is gone, it wakes the monitor early by signaling the event it's waiting on:

rsstate = self.rsstate
if not rsstate.primary:
    self.monitor.schedule_refresh()
    raise AutoReconnect()

No matter how many threads call schedule_refresh, the work is only done once.

Any MongoReplicaSetClient method that needs to block on refresh can wait for the "refreshed" event:

rsstate = self.rsstate
if not rsstate.primary:
    self.monitor.schedule_refresh()
    self.monitor.wait_for_refresh(timeout_seconds=5)

# Get the new state.
rsstate = self.rsstate
if not rsstate.primary:
    raise AutoReconnect()

# Proceed normally....

This pattern mitigates the connection storm from a heavily-loaded application discovering that the primary has changed: only the monitor thread goes looking for the new primary. The others can abort or wait.

The wasp's nest pattern is a simple and high-performance solution to some varieties of reader-writer problem. Compared to mutexes it's easy to understand, and most importantly it's easy to program correctly. For further reading see my notes in the source code.

Paper wasp and nest [Source]