Implementing Raft with Go

Following my previous post, "Raft, from an engineering perspective", I gathered some thoughts on related topics.

Latency and RPCs

Latency matters in a distributed system

Latency is tied directly to availability. The larger the latency, the longer the system stays unavailable. RPC latency compounds quickly when multiple rounds of RPCs are needed, e.g. during a log entry disagreement. In Raft, latency is mostly caused by the network. A lock (mutex, spinlock etc.) usually adds less than 5ms of latency, whereas an RPC could be a couple of times as slow. Ironically, we can run hundreds of thousands of instructions in that time on most consumer CPUs today. There is a big price to pay to run distributed systems.

Channels and latency

Go channels can be a drag in a latency-sensitive environment. The sending party usually goes to sleep immediately after putting a message into a channel. The receiving end, on the other hand, may not be awoken immediately.

I noticed this pattern when counting election votes. I have a goroutine watching the RPC response of RequestVote. It sends a "please count this vote" message to another goroutine after receiving the RPC response. What often happens is that the first goroutine prints "I received a vote", but the corresponding "I counted this vote" message never shows up before the next election starts. The scheduling behavior is a bit mysterious to me. In contrast, cause and effect are much clearer and more direct when I use a condition variable.

RPCs must be retried

RPC failures can be common when the network is unstable. Retrying RPCs helps mitigate the risk of staying unavailable longer than necessary.

The downside of RPC retries is that if everybody is retrying with high frequency, a lot of network bandwidth will be wasted on sending the same RPCs over and over again. The solution is exponential backoff. The first retry should be n ms after an RPC failure, the second retry should be 2n ms after that, then 4n and so on. The wait time grows quickly, so only a logarithmic number of retries (roughly log2(T/n) within a window of T ms) is sent for each failed RPC.

Another downside of retries is the "stay down" situation. When an RPC server is down, the RPCs that ought to be processed during that time are withheld by clients. When the server is up again, a huge number of retries arrive at the exact same time. That could easily overwhelm the RPC server's buffer and cause it to die again. The solution is to add a randomized component to exponential backoff. That way RPCs will arrive at slightly different times, allowing some processing time. Together the technique is called randomized exponential backoff, and it is proven to be effective.

No unnecessary RPCs

The other extreme of handling RPCs is sending as many of them as possible. This is obviously bad, especially when the network is congested already. I once tried to make the leader send one heartbeat immediately after another, to minimize new elections when heartbeats are dropped by the network. It turns out even a software-simulated network has its bandwidth limit. The leader ended up not being able to do anything other than sending heartbeats.

Another experiment I did was worse. In my implementation, the leader starts syncing logs whenever a new entry is created by clients. Accidentally I made it backtrack (i.e. send another RPC with more logs) when the RPC times out, and at the same time retry the original RPC as well. The number of RPCs blew up exponentially, because one failed RPC caused two more to be sent. The goroutine resource was quickly exhausted when a few new entries were added at the same time. That is, I reached the maximum number of goroutines that can be created. I have since reverted to retrying on an RPC failure, and backtracking only when the recipient disagrees.

Condition Variables

Mutexes and condition variables are handy

Mutex is a well-known concept. I had never heard of condition variables before, though, at least not in the form that comes with Go. A condition variable is like a semaphore, in the sense that you can wait and signal on it (PV primitives). The difference (among others) is that after a signal is received, Wait() automatically reacquires the lock associated with the condition variable before returning. Acquiring the lock is both convenient and a requirement for correctness. The reasoning is that the condition can only be safely evaluated while holding the lock, so that the data underneath the condition is protected.

When implementing Raft, there are many places where we need to wait on a condition. Once that condition is met, we need to acquire the global lock that guards core states. Condition variables fit perfectly into this type of usage.
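A minimal sketch of that pattern with sync.Cond is shown here. The state struct, the commitIndex field, and the two method names are hypothetical stand-ins for whatever core state is guarded by the global lock.

```go
package main

import (
	"fmt"
	"sync"
)

// state is a hypothetical stand-in for the core state behind the global lock.
type state struct {
	mu          sync.Mutex
	cond        *sync.Cond
	commitIndex int
}

func newState() *state {
	s := &state{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// waitForCommit blocks until commitIndex reaches target. The condition is
// only ever evaluated while holding mu.
func (s *state) waitForCommit(target int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.commitIndex < target {
		s.cond.Wait() // releases mu while blocked, reacquires it before returning
	}
	return s.commitIndex
}

// advanceCommit updates the state under the lock and wakes all waiters.
func (s *state) advanceCommit(to int) {
	s.mu.Lock()
	if to > s.commitIndex {
		s.commitIndex = to
	}
	s.mu.Unlock()
	s.cond.Broadcast()
}

func main() {
	s := newState()
	done := make(chan int)
	go func() { done <- s.waitForCommit(3) }()
	for i := 1; i <= 3; i++ {
		s.advanceCommit(i)
	}
	fmt.Println("observed commit index:", <-done) // prints "observed commit index: 3"
}
```

The waiter re-checks the condition in a loop because a wake-up only means "something changed", not "the condition I care about is now true".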

But sometimes I only need a semaphore ...

There is one case where I don't need the locking part of a condition variable: the election. After sending out requests to vote, the election goroutine will block until one of the following things happens:

  1. Enough votes for me are collected,
  2. Enough votes against me are collected,
  3. Someone has started a new election, or
  4. We are being shut down.

Those four things can be easily implemented with atomic integer counters. We do not need to acquire the global lock to access or modify those counters. What I really need is a semaphore that works on a goroutine.

It can be argued that after one of those things happens, we might need to acquire the global lock, if we are elected. That is true. Depending on the network environment, being elected may or may not be the dominating result of an election. The line is a bit blurry.

I ended up creating a lock just for the election. The lock is also used to guarantee there is only one election running, which is a nice side effect.
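For comparison, the lock-free alternative considered above could look roughly like the sketch below: atomic counters for the tally, plus a channel of capacity one acting as a binary semaphore. This is not what I ended up with. The election type and everything in it are hypothetical, "someone started a new election" and "we are being shut down" are folded into a single cancel() call, and the sketch assumes exactly one goroutine calls wait().

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type outcome int

const (
	won outcome = iota
	lost
	cancelled
)

// election is a hypothetical tally that needs no mutex.
type election struct {
	votesFor     int32
	votesAgainst int32
	stopped      int32
	majority     int32
	wake         chan struct{} // capacity 1: a binary semaphore
}

func newElection(peers int) *election {
	return &election{majority: int32(peers/2 + 1), wake: make(chan struct{}, 1)}
}

// signal posts a wake-up unless one is already pending.
func (e *election) signal() {
	select {
	case e.wake <- struct{}{}:
	default:
	}
}

// record counts one vote and wakes the waiter.
func (e *election) record(granted bool) {
	if granted {
		atomic.AddInt32(&e.votesFor, 1)
	} else {
		atomic.AddInt32(&e.votesAgainst, 1)
	}
	e.signal()
}

// cancel covers both a newer election and a shutdown.
func (e *election) cancel() {
	atomic.StoreInt32(&e.stopped, 1)
	e.signal()
}

// wait blocks until the election is decided or cancelled.
func (e *election) wait() outcome {
	for {
		switch {
		case atomic.LoadInt32(&e.stopped) != 0:
			return cancelled
		case atomic.LoadInt32(&e.votesFor) >= e.majority:
			return won
		case atomic.LoadInt32(&e.votesAgainst) >= e.majority:
			return lost
		}
		<-e.wake // a pending token guarantees no update is missed
	}
}

func main() {
	e := newElection(5)
	e.record(true) // the candidate votes for itself
	go func() {
		e.record(true)
		e.record(false)
		e.record(true)
	}()
	fmt.Println("won:", e.wait() == won) // prints "won: true"
}
```

Because the counters are updated before the token is posted, a vote that lands between the check and the blocking receive still leaves a token behind, so the waiter re-checks instead of sleeping forever.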

Signal() is not always required?

It appears that in some systems other than Go, condition variables can unblock without anyone calling Signal(). Go's documentation for Wait() says:

Unlike in other systems, Wait cannot return unless awoken by Broadcast or Signal.

I'm wondering why that is. In those systems, extra caution should be taken before an awoken thread makes any moves, typically by re-checking the condition after waking up.

Critical Sections

Critical sections are the code between acquiring a lock and releasing it. No lock should be held for too long, for obvious reasons. The code within critical sections thus must be short.

When implementing Raft, it is rather easy to keep it short. The most complex thing that requires holding the global lock is copying an entire array of log entries. Other than that, the only thing left is copying integers in core states. Occasionally goroutines are created while holding the lock, which is not ideal. I’m just too lazy to optimize it away.

Goroutine is not in the critical section

In the snippet below, given that the caller holds the lock, is the code in the goroutine within the critical section?

rf.mu.Lock()
go func() {
    term := rf.currentTerm // Are we in the critical section?
    _ = term               // keep the compiler happy about the unused variable
}()
rf.mu.Unlock()

The answer is no. The goroutine can run at any time in the future, maybe long after the lock is released on the last line. There is no guarantee that the lock is still being held by the caller of the goroutine. Even if it is, the goroutine is still asynchronous to its caller, and the lock belongs to the caller, not to the goroutine.

As a principle, do not access protected fields in a goroutine unless the goroutine itself acquires the lock while doing so.
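Two safe variants are sketched below, under the assumption of a stripped-down Raft struct with only a mutex and currentTerm. The method names are hypothetical. The first copies the field while holding the lock and hands the copy to the goroutine; the second lets the goroutine take the lock itself.

```go
package main

import (
	"fmt"
	"sync"
)

// Raft is a hypothetical, stripped-down version of the real struct.
type Raft struct {
	mu          sync.Mutex
	currentTerm int
}

// snapshotThenSpawn reads the protected field inside the critical section
// and gives the goroutine a private copy. The goroutine never touches rf.
func (rf *Raft) snapshotThenSpawn(out chan<- int) {
	rf.mu.Lock()
	term := rf.currentTerm
	rf.mu.Unlock()
	go func() { out <- term }()
}

// spawnThenLock lets the goroutine acquire the lock on its own. It sees
// whatever the term is when it eventually runs, which may be newer than
// the term at the time the goroutine was created.
func (rf *Raft) spawnThenLock(out chan<- int) {
	go func() {
		rf.mu.Lock()
		term := rf.currentTerm
		rf.mu.Unlock()
		out <- term
	}()
}

func main() {
	rf := &Raft{currentTerm: 7}
	out := make(chan int, 2)
	rf.snapshotThenSpawn(out)
	rf.spawnThenLock(out)
	fmt.Println(<-out, <-out) // prints "7 7"
}
```

Which variant fits depends on whether the goroutine should act on the state as it was when it was launched, or on the state as it is when it runs.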

Miscellaneous

There is potentially a bug in the testing framework

The potential bug causes new instances not to be added to the simulated network.

If I run the 'basic persistent' test in 2C 100 times, when a server is supposed to be restarted and connected to the network, there is a ~2% chance that none of the other servers can reach it. I know the other servers were alive, because they were sending and receiving heartbeats. The tests usually ended up timing out after 10 minutes. This error happens more frequently if I increase the rate of sending RPCs.

It could also be a deadlock in my code. I have yet to formally prove the bug exists.

Disk delay was not emulated

In 6.824, persist() is implemented in memory. I can pretty much call it as often as I want, without any performance impact. But in practice, disk delay can also be significant if sync() is called extensively.

Corrections

While writing all of this down, I noticed that although my implementation passes all the tests, some of it might not be the standard way of doing things in Raft. For example, log entry syncing should really be triggered by timer only. I plan to correct those behaviors. Until then, please read at your own risk. :-)