Cassandra’s Gossip Protocol

Let's learn while gossiping! 😛

While preparing to read Designing Data-Intensive Applications, I came across Cassandra, a great storage system that covers everything I need to be ready to tackle the book.

You may have heard about the ring and vnodes. We are going to cover those techniques in upcoming posts. Today we prepare for them by going through the gossip protocol first.

Cassandra

Cassandra is currently one of the most popular distributed NoSQL databases. After skimming through the usual online tutorials, I decided to learn more by diving into the source code itself. The code base is huge, so I had to pick one topic to dig into. I chose the gossip protocol, primarily because it is not specific to Cassandra: it is an important tool for failure detection, monitoring, messaging, and other good things in distributed setups in general.

As a developer, I have always been intrigued by how the code for distributed systems is written, and I am sure there are more inquisitive engineers like me who have the skill set to understand the source code but, in general, never bother. My intention here is to demonstrate the implementation, to some extent, for people who are interested in learning.

Introduction

When building a system on top of a set of wildly uncooperative and unruly computers you have knowledge problems: knowing when other nodes are dead; knowing when nodes become alive; getting information about other nodes so you can make local decisions, like knowing which node should handle a request based on a scheme for assigning nodes to a certain range of users; learning about new configuration data; agreeing on data values; and so on.

  • Do you think we need a centralized database?

    • I think that would be a single point of failure, so a big NO

I think we need a super cool decentralized way to bring order to large clusters.

Say hi to the gossip protocol.

Note: Different distributed systems achieve this in different ways (Kafka uses ZooKeeper, Elasticsearch uses mesh topology communication, etc.).

Gossip Protocol

Gossip is a peer-to-peer communication protocol in which nodes periodically exchange state information about themselves and about other nodes they know about. The gossip process in Cassandra runs every second and exchanges state messages with other nodes in the cluster. Each node independently selects one to three peers to gossip with: it always selects a live peer (if any), it probabilistically picks a seed node, and it probabilistically picks an unavailable node. The nodes exchange information about themselves and about the other nodes they have gossiped about, so all nodes quickly learn about all other nodes in the cluster.
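To make the peer-selection rule concrete, here is a minimal sketch. It is not Cassandra's actual code: the class name GossipPeerSelection, the use of plain strings for node addresses, and the exact probabilities are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of per-round peer selection; names and probabilities are assumptions.
final class GossipPeerSelection {

    /** Picks zero to three gossip targets: a live peer, maybe an unreachable node, maybe a seed. */
    static List<String> selectPeers(List<String> live, List<String> unreachable,
                                    List<String> seeds, Random random) {
        List<String> targets = new ArrayList<String>();
        boolean gossipedToSeed = false;

        // 1. Always gossip with one random live peer, if there is one.
        if (!live.isEmpty()) {
            String peer = live.get(random.nextInt(live.size()));
            targets.add(peer);
            gossipedToSeed = seeds.contains(peer);
        }
        // 2. Sometimes probe an unreachable node; more likely when many nodes are down.
        if (!unreachable.isEmpty()) {
            double probability = unreachable.size() / (double) (live.size() + 1);
            if (random.nextDouble() < probability)
                targets.add(unreachable.get(random.nextInt(unreachable.size())));
        }
        // 3. Sometimes gossip with a seed, unless step 1 already reached one.
        if (!gossipedToSeed && !seeds.isEmpty()) {
            double probability = seeds.size() / (double) (live.size() + unreachable.size());
            if (live.isEmpty() || random.nextDouble() <= probability) {
                String seed = seeds.get(random.nextInt(seeds.size()));
                if (!targets.contains(seed))
                    targets.add(seed);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> live = Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3");
        List<String> unreachable = Arrays.asList("10.0.0.4");
        List<String> seeds = Arrays.asList("10.0.0.1");
        System.out.println(selectPeers(live, unreachable, seeds, new Random()));
    }
}
```

Probing unreachable nodes is what lets a recovered node be noticed again, and leaning on seeds keeps separate groups of nodes from gossiping only among themselves.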

How does it do that?

Let's break things down!

Gossip messaging is very similar to the TCP three-way handshake. With a regular broadcast protocol, there would be only one message per round, and the data would spread gradually through the cluster. With the gossip protocol, having three messages in each round adds a degree of anti-entropy, which lets the data shared between the two interacting nodes converge much faster.

Implementation Overview

Let's split this into two parts:

  • What exactly is exchanged as part of each message: to begin with, simply consider each of the three messages as a “payload” encapsulating certain information.

  • the entities (in code) required to facilitate the transfer of information: Let's start with this first.

Each node has a central singleton entity, Gossiper. Gossip is initiated in the Gossiper.gossipTask.run method, a Runnable that a scheduled task in the upper layer executes once per second.

Gossiper.endpointStateMap<InetAddress, EndPointState> holds the state of the entire cluster as known to the current node. The key is the IP address of the corresponding node, and the value is its EndPointState, which is built from the pieces described below.

HeartBeatState

Consists of generation and version number. Generation stays the same when the server is running and grows every time the node is started. Used for distinguishing state information before and after a node restart. The version number is shared with application states and guarantees ordering. Each node has one HeartBeatState associated with it.

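import java.util.concurrent.atomic.AtomicInteger;

// One counter shared by the heartbeat and all application states of a node,
// so every update on that node gets a strictly larger version number.
public class VersionGenerator {
    private static final AtomicInteger version = new AtomicInteger(0);
    public static int getNextVersion() {
        return version.incrementAndGet();
    }
}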

import java.io.Serializable;

public class HeartBeatState implements Serializable {
    private volatile int generation;
    private volatile int version;
    HeartBeatState(int gen) {
        this(gen, 0);
    }
    public HeartBeatState(int gen, int ver) {
        generation = gen;
        version = ver;
    }

    int getGeneration() { return generation;}
    void updateHeartBeat() { version = VersionGenerator.getNextVersion(); }
    int getHeartBeatVersion() { return version; }
    void forceNewerGenerationUnsafe() { generation += 1; }
    void forceHighestPossibleVersionUnsafe() { version = Integer.MAX_VALUE; }
    public String toString() {
        return String.format("HeartBeat: generation = %d, version = %d", generation, version);
    }
}
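The following sketch shows how a (generation, version) pair can be compared to decide which heartbeat is newer. The class and method names (HeartbeatComparison, isRemoteNewer) are hypothetical and exist only for this illustration; the ordering rule itself follows the description above.

```java
// Hypothetical helper: decides whether a remote heartbeat is newer than the local copy.
final class HeartbeatComparison {

    static boolean isRemoteNewer(int localGen, int localVer, int remoteGen, int remoteVer) {
        if (remoteGen != localGen)
            return remoteGen > localGen;   // a restarted node always beats its old incarnation
        return remoteVer > localVer;       // same incarnation: the higher version wins
    }

    public static void main(String[] args) {
        // The node restarted: the generation grew and the version started counting again.
        System.out.println(isRemoteNewer(1700000000, 950, 1700000500, 3)); // true
        // Same generation, but the remote version is older than what we already have.
        System.out.println(isRemoteNewer(1700000000, 950, 1700000000, 900)); // false
    }
}
```

Comparing the generation first is what makes information from before a restart lose against information from after it, even though the version counter starts over.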

ApplicationState

Consists of state and version number and represents the state of a single "component" or "element" within Cassandra. For instance, the application state for "load information" could be (5.2, 45), which means that the node load is 5.2 at version 45. Similarly, a node that is bootstrapping would have a "bootstrapping" application state: (bxLpassF3XD8Kyks, 56) where the first one is a bootstrap token, and the second is a version. The version number is shared by application states and HeartBeatState to guarantee ordering and can only grow.

public enum ApplicationState {
    STATUS,
    LOAD,
    SCHEMA,
    DC,
    RACK,
    RELEASE_VERSION,
    REMOVAL_COORDINATOR,
    SEVERITY,
    NET_VERSION,
    HOST_ID,
    TOKENS,
    RPC_READY,
    // pad to allow adding new states to existing cluster
    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two ports
    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
}
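The enum above only names the kinds of state; the (value, version) pairs from the examples, such as (5.2, 45), need a holder type. The listings in this post call it VersionedValue without showing it, so here is a simplified stand-in. The wrapper class VersionedValueSketch and the exact fields are assumptions for illustration, not a copy of Cassandra's class.

```java
// Simplified, hypothetical stand-in for the VersionedValue used in the listings.
final class VersionedValueSketch {

    static final class VersionedValue implements Comparable<VersionedValue> {
        final String value;   // e.g. "5.2" for LOAD
        final int version;    // drawn from the node-wide version counter

        VersionedValue(String value, int version) {
            this.value = value;
            this.version = version;
        }

        @Override
        public int compareTo(VersionedValue other) {
            return Integer.compare(version, other.version);
        }

        @Override
        public String toString() {
            return "(" + value + ", " + version + ")";
        }
    }

    public static void main(String[] args) {
        VersionedValue oldLoad = new VersionedValue("5.2", 45);
        VersionedValue newLoad = new VersionedValue("6.1", 52);
        System.out.println(newLoad.compareTo(oldLoad) > 0); // true
        System.out.println(oldLoad);                        // (5.2, 45)
    }
}
```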

EndPointState

Includes all ApplicationStates and the HeartBeatState for a given node. An EndPointState can include only one of each type of ApplicationState, so if it already includes, say, load information, new load information will overwrite the old one. The ApplicationState version number guarantees that an old value will not overwrite a new one.

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class EndpointState {

    private volatile HeartBeatState hbState;
    private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState;

    /* fields below do not get serialized */
    private volatile long updateTimestamp;
    private volatile boolean isAlive;

    public EndpointState(HeartBeatState initialHbState) {
        this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
    }
    EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states) {
        hbState = initialHbState;
        applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<ApplicationState,VersionedValue>(states));
        updateTimestamp = System.nanoTime();
        isAlive = true;
    }

    HeartBeatState getHeartBeatState() {
        return hbState;
    }
    void setHeartBeatState(HeartBeatState newHbState) { 
        updateTimestamp(); hbState = newHbState;
    }

    public VersionedValue getApplicationState(ApplicationState key) {
        return applicationState.get().get(key);
    }
    public Set<Map.Entry<ApplicationState, VersionedValue>> states() {
        return applicationState.get().entrySet();
    }
    public void addApplicationState(ApplicationState key, VersionedValue value) {
        addApplicationStates(Collections.singletonMap(key, value));
    }
    public void addApplicationStates(Map<ApplicationState, VersionedValue> values) {
        addApplicationStates(values.entrySet());
    }
    public void addApplicationStates(Set<Map.Entry<ApplicationState, VersionedValue>> values) 
    {
        while (true) {
           Map<ApplicationState, VersionedValue> orig = applicationState.get();
           Map<ApplicationState, VersionedValue> copy = new EnumMap<ApplicationState, VersionedValue>(orig);

           for (Map.Entry<ApplicationState, VersionedValue> value : values)
                copy.put(value.getKey(), value.getValue());

           if (applicationState.compareAndSet(orig, copy))
                return;
        }
    }
    public long getUpdateTimestamp() { 
        return updateTimestamp; 
    }
    void updateTimestamp() { 
        updateTimestamp = System.nanoTime(); 
    }
    public boolean isNormalState() {
        return getStatus().equals(VersionedValue.STATUS_NORMAL);
    }
    public String getStatus() {
        VersionedValue status = getApplicationState(ApplicationState.STATUS);
        // a node that has not announced its status yet has no STATUS entry
        return status == null ? "" : status.value;
    }
    public String toString() {
        return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
    }
}
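The class above overwrites whatever it is given; the rule that an old value must never replace a newer one has to be enforced by the caller that applies remote state. A minimal sketch of that check follows, assuming string keys and a tiny Versioned holder instead of the real types. AppStateMerge and mergeNewer are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: apply remote application states only when they are strictly newer.
final class AppStateMerge {

    static final class Versioned {
        final String value;
        final int version;

        Versioned(String value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    /** Copies remote entries into local when their version is higher; returns how many were applied. */
    static int mergeNewer(Map<String, Versioned> local, Map<String, Versioned> remote) {
        int applied = 0;
        for (Map.Entry<String, Versioned> entry : remote.entrySet()) {
            Versioned current = local.get(entry.getKey());
            if (current == null || entry.getValue().version > current.version) {
                local.put(entry.getKey(), entry.getValue());
                applied++;
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        Map<String, Versioned> local = new HashMap<String, Versioned>();
        local.put("LOAD", new Versioned("5.2", 45));

        Map<String, Versioned> remote = new HashMap<String, Versioned>();
        remote.put("LOAD", new Versioned("4.0", 40));      // stale, must be ignored
        remote.put("STATUS", new Versioned("NORMAL", 50)); // unknown locally, must be added

        System.out.println(mergeNewer(local, remote));     // 1
        System.out.println(local.get("LOAD").value);       // 5.2
    }
}
```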

endPointStateMap

Internal structure in Gossiper that has EndPointState for all nodes (including itself) that it has heard about.

GenericMessage class

One might think that we can have a Message base class with GossipDigestSynMessage, GossipDigestAckMessage & GossipDigestAck2Message as child classes. However, such an inheritance hierarchy will introduce tight coupling and any change will have a waterfall effect on all the related classes. To solve this, we will use composition.

A Message object will have a generic payload, which can be any one of GossipDigestSyn, GossipDigestAck, etc., or any other object we wish to transfer (the power of composition).

Also, it will have a Header (similar to an HTTP header). The payload will contain the actual object and the header will have metadata such as created_at, from, to and so on.

Another important thing to note is that gossip is in some way a form of event-driven programming, i.e. actions are to be performed in response to certain events. This can be further extended beyond just gossip to other features of Cassandra. To handle this generically, Cassandra has “verbs” and their respective “handlers”. On receipt of a message, a node analyses the associated verb and triggers the associated handler, passing the entire message object as an argument. The handlers will then extract the payload and carry on their execution.

Considering all the above points, here are the additional classes we need:

  • Verb (enum): Each verb will be mapped to its handler, in the enum declaration itself.

  • Header (class): carries the Verb plus metadata such as from and created_at.

  • Message<T> (class). Two attributes — Header and payload.

  • As already mentioned, there will be a central singleton entity Gossiper.

// **************************** Verb.java ***********************************

public enum Verb {

    GOSSIP_DIGEST_SYN    (1, 0, GossipDigestSynVerbHandler.instance),
    GOSSIP_DIGEST_ACK    (2, 0, GossipDigestAckVerbHandler.instance),
    GOSSIP_DIGEST_ACK2   (3, 0, GossipDigestAck2VerbHandler.instance);

    public final int id;
    public final int priority;
    private final IVerbHandler handler;
    Verb(int id, int priority, IVerbHandler handler){
        if (id < 0)
            throw new IllegalArgumentException(
              "Verb id must be non-negative, got " + id + " for payload " + name());
        this.id = id;
        this.priority = priority;
        this.handler = handler;
    }

}


// **************************** Header.java ***********************************

import java.io.Serializable;
import java.util.Map;

public class Header implements Serializable {
    public final long id;
    public final Verb verb;
    public final InetAddressAndPort from;
    public final long createdAtNanos;
    public final long expiresAtNanos;
    public Header(long id, Verb verb, InetAddressAndPort from, 
                  long createdAtNanos, long expiresAtNanos)
    {
        this.id = id;
        this.verb = verb;
        this.from = from;
        this.createdAtNanos = createdAtNanos;
        this.expiresAtNanos = expiresAtNanos;
    }
}


// **************************** Message.java ***********************************

import java.io.Serializable;
import java.util.Date;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class Message<T> implements Serializable {

    public final Header header;
    public final T payload;
    private Message(Header header, T payload) {
        this.header = header;
        this.payload = payload;
    }

    /** Sender of the message. */
    public InetAddressAndPort from() {
        return header.from;
    }

    public long id() {
        return header.id;
    }

    public Verb verb() {
        return header.verb;
    }

    public static <T> Message<T> getNewMessage(Verb verb, T payload){
        // 0 means "no explicit expiry"; a default is filled in below
        return getNewMessage(nextId(), verb, 0, payload);
    }

    private static <T> Message<T> getNewMessage(long id, Verb verb, 
                                                long expiresAtNanos, T payload) 
    {

        if (payload == null)
            throw new IllegalArgumentException("payload must not be null");

        InetAddressAndPort from = NetworkUtils.getBroadcastAddressAndPort();
        long createdAtNanos = System.nanoTime();
        if (expiresAtNanos == 0)
            expiresAtNanos = createdAtNanos + 100; // placeholder default expiry for this simplified version

        return new Message<T>(new Header(id, verb, from, createdAtNanos, expiresAtNanos), payload);
    }

    private static final long NO_ID = 0L; // this is a valid ID for pre40 nodes
    private static final AtomicInteger nextId = new AtomicInteger(0);

    private static long nextId() {
        long id;
        do {
            id = nextId.incrementAndGet();
        } while (id == NO_ID);
        return id;
    }
}

Header has an InetAddressAndPort, which is nothing but the IP address and the port of a node. Together, the two uniquely identify a node.

import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;

public final class InetAddressAndPort implements Serializable {

    private static final long serialVersionUID = 0;
    static volatile int defaultPort = 7000;

    private final InetAddress address;
    private final int port;

    public InetAddressAndPort() throws UnknownHostException {
        this.address = InetAddress.getLocalHost();
        this.port = defaultPort;
    }

    private InetAddressAndPort(InetAddress address, int port) {
        validatePortRange(port);
        this.address = address;
        this.port = port;
    }

    public static InetAddressAndPort getByAddress(InetAddress address){
        return getByAddressOverrideDefaults(address, null);
    }

    public static InetAddressAndPort getByAddressOverrideDefaults(InetAddress address, Integer port) {
        if (port == null) {
            port = defaultPort;
        }
        return new InetAddressAndPort(address, port);
    }

    public InetAddressAndPort withPort(int port) {
        return new InetAddressAndPort(address, port);
    }

    private static void validatePortRange(int port) {
        if (port < 0 || port > 65535)
        {
            throw new IllegalArgumentException("Port " + port + " is not a valid port number in the range 0-65535");
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        InetAddressAndPort that = (InetAddressAndPort) o;

        if (port != that.port) return false;
        return address.equals(that.address);
    }

    @Override
    public int hashCode(){
        int result = address.hashCode();
        result = 31 * result + port;
        return result;
    }

    public String getHostAddress(boolean withPort) {
        if (withPort)
            return toString();
        else
            return address.getHostAddress();
    }

    @Override
    public String toString()
    {
        return toString(true);
    }

    public String toString(boolean withPort) {
        if (withPort)
            return toString(address, port);
        else
            return address.toString();
    }

    public static String toString(InetAddress address, int port) {
        return address.getHostAddress() + ":" + port;
    }

    public static String getLocalHost() throws UnknownHostException {
        return InetAddress.getLocalHost().getHostAddress();
    }

    public static void initializeDefaultPort(int port) {
        defaultPort = port;
    }

    static int getDefaultPort() {
        return defaultPort;
    }

}
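Before moving on, the verb-to-handler dispatch described earlier can be sketched in a few lines. This is a stripped-down illustration under stated assumptions: the Handler interface, the handled log, and the dispatch method are hypothetical names, and the handlers only record what they received instead of doing real gossip work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "each verb is mapped to its handler in the enum declaration itself".
final class VerbDispatchSketch {

    interface Handler {
        void doVerb(String from, Object payload);
    }

    // Records what each handler saw, so the dispatch can be observed.
    static final List<String> handled = new ArrayList<String>();

    enum Verb {
        SYN((from, payload) -> handled.add("SYN from " + from)),
        ACK((from, payload) -> handled.add("ACK from " + from)),
        ACK2((from, payload) -> handled.add("ACK2 from " + from));

        final Handler handler;

        Verb(Handler handler) {
            this.handler = handler;
        }
    }

    /** The receiving side looks at the verb and triggers the handler bound to it. */
    static void dispatch(Verb verb, String from, Object payload) {
        verb.handler.doVerb(from, payload);
    }

    public static void main(String[] args) {
        dispatch(Verb.SYN, "10.0.0.1", "digests");
        dispatch(Verb.ACK, "10.0.0.2", "digests + states");
        System.out.println(handled); // [SYN from 10.0.0.1, ACK from 10.0.0.2]
    }
}
```

Because the mapping lives in the enum, adding a new kind of message means adding one constant and one handler; the receiving code does not change.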

Now, let's focus on what information is contained in each of the three messages and how nodes in the cluster use handlers to update and exchange information to make the magic happen.

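Recall the three-message exchange from earlier: node X initiates with a Syn, node Y replies with an Ack, and X finishes with an Ack2.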

Before going into the details of each of the messages, it would be nice to get accustomed to a very simple code construct “GossipDigest” which consists of the endpoint address, generation number and maximum version that has been seen for the endpoint.

public class GossipDigest implements Comparable<GossipDigest> {

    final InetAddressAndPort endpoint;
    final int generation; // generation stays the same when server is running and grows every time the node is started 
    final int maxVersion; // maximum version number is the biggest version number in EndPointState for this endpoint

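    GossipDigest(InetAddressAndPort ep, int gen, int version){
        endpoint = ep;
        generation = gen;
        maxVersion = version;
    }

    // accessors used by the verb handlers
    InetAddressAndPort getEndpoint() { return endpoint; }
    int getGeneration() { return generation; }
    int getMaxVersion() { return maxVersion; }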

    public int compareTo(GossipDigest gDigest){
        if (generation != gDigest.generation)
            return (generation - gDigest.generation);
        return (maxVersion - gDigest.maxVersion);
    }
}
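As an illustration of where the digest values come from, the sketch below builds one digest per known endpoint, taking the maximum over the heartbeat version and all application state versions. The State and Digest holders, the string endpoint keys, and the DigestBuilder name are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: summarise everything a node knows as (endpoint, generation, maxVersion).
final class DigestBuilder {

    static final class State {
        final int generation;
        final int heartbeatVersion;
        final Map<String, Integer> appStateVersions; // application state name -> version

        State(int generation, int heartbeatVersion, Map<String, Integer> appStateVersions) {
            this.generation = generation;
            this.heartbeatVersion = heartbeatVersion;
            this.appStateVersions = appStateVersions;
        }
    }

    static final class Digest {
        final String endpoint;
        final int generation;
        final int maxVersion;

        Digest(String endpoint, int generation, int maxVersion) {
            this.endpoint = endpoint;
            this.generation = generation;
            this.maxVersion = maxVersion;
        }

        @Override
        public String toString() {
            return endpoint + ":" + generation + ":" + maxVersion;
        }
    }

    /** The largest version seen for an endpoint, across its heartbeat and application states. */
    static int maxVersion(State state) {
        int max = state.heartbeatVersion;
        for (int version : state.appStateVersions.values())
            max = Math.max(max, version);
        return max;
    }

    static List<Digest> buildDigests(Map<String, State> endpointStateMap) {
        List<Digest> digests = new ArrayList<Digest>();
        for (Map.Entry<String, State> entry : endpointStateMap.entrySet())
            digests.add(new Digest(entry.getKey(), entry.getValue().generation, maxVersion(entry.getValue())));
        return digests;
    }

    public static void main(String[] args) {
        Map<String, Integer> versions = new TreeMap<String, Integer>();
        versions.put("LOAD", 45);
        versions.put("STATUS", 12);
        Map<String, State> known = new TreeMap<String, State>();
        known.put("10.0.0.1", new State(1700000000, 40, versions));
        System.out.println(buildDigests(known)); // [10.0.0.1:1700000000:45]
    }
}
```

A digest is deliberately tiny: it lets two nodes find out who is behind on which endpoint without shipping the full state in the first message.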

GossipDigestSynMessage

The node starting a gossip exchange sends a GossipDigestSynMessage, which includes a list of gossip digests.

import java.util.List;

public class GossipDigestSyn {

    final String clusterId;
    final String partitioner;
    final List<GossipDigest> gDigests;

    public GossipDigestSyn(String clusterId, String partitioner, List<GossipDigest> gDigests) {
        this.clusterId = clusterId;
        this.partitioner = partitioner;
        this.gDigests = gDigests;
    }

    List<GossipDigest> getGossipDigests() {
        return gDigests;
    }
}

As discussed, the Message class has a generic payload; GossipDigestSyn is the payload in this case.

Each message also has an associated verb that is mapped to a VerbHandler; GossipDigestSynVerbHandler is the handler in this case.

On receipt of the GossipDigestSyn message, the peer node does the following :

  1. Get the sender’s node address (X in this case). This is used to send the Ack response.

  2. Extract the gossipDigestList from the syn messages.

  3. Construct ACK response which has the following two pieces of information

    1. DeltaEpStateMap (EndpointState map)for nodes

      1. which X doesn’t know about

      2. where Y has newer information than X.

    2. List of gossip digest for nodes

      1. which Y doesn't know about

      2. which have a lower version in Y than X.

  4. Send back the ACK response.

Note: The newness of information is determined using the maxVersion sent in the gossip digest. The greater the value, the newer the info.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GossipDigestSynVerbHandler extends GossipVerbHandler<GossipDigestSyn> {

    public static final GossipDigestSynVerbHandler instance = new GossipDigestSynVerbHandler();

    public void doVerb(Message<GossipDigestSyn> message) {

        // the initiator (X); the Ack is sent back to this address
        InetAddressAndPort from = message.from();
        GossipDigestSyn gDigestMessage = message.payload;
        List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();

        // filled in by examineGossiper: digests to request from X, and states to send to X
        List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<InetAddressAndPort, EndpointState>();
        Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
        // construct and send the GOSSIP_DIGEST_ACK message
        Message<GossipDigestAck> gDigestAckMessage = Message.getNewMessage(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap));
        MessagingService.instance().send(gDigestAckMessage, from);

        super.doVerb(message);
    }
}
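The handler delegates the interesting part, the digest comparison, to examineGossiper, which is not shown here. The sketch below is one way that comparison could look, written against simplified types. It is an assumption-laden illustration rather than Cassandra's implementation: SynExamination, Result, requests, and sendNewerThan are hypothetical names, and a digest stands in for the full local endpoint state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the digest comparison node Y performs on receiving a Syn from X.
final class SynExamination {

    static final class Digest {
        final String endpoint;
        final int generation;
        final int maxVersion;

        Digest(String endpoint, int generation, int maxVersion) {
            this.endpoint = endpoint;
            this.generation = generation;
            this.maxVersion = maxVersion;
        }
    }

    static final class Result {
        // Digests Y sends back to ask X for state newer than maxVersion.
        final List<Digest> requests = new ArrayList<Digest>();
        // Endpoint -> version; Y sends X every state it has that is newer than this version.
        final Map<String, Integer> sendNewerThan = new TreeMap<String, Integer>();
    }

    static Result examine(List<Digest> remoteDigests, Map<String, Digest> local) {
        Result result = new Result();
        for (Digest remote : remoteDigests) {
            Digest mine = local.get(remote.endpoint);
            if (mine == null || remote.generation > mine.generation) {
                // Unknown endpoint or newer incarnation: request everything.
                result.requests.add(new Digest(remote.endpoint, remote.generation, 0));
            } else if (remote.generation < mine.generation) {
                // X still has the pre-restart incarnation: send everything.
                result.sendNewerThan.put(remote.endpoint, 0);
            } else if (remote.maxVersion > mine.maxVersion) {
                // Same incarnation, X is ahead: request what Y is missing.
                result.requests.add(new Digest(remote.endpoint, remote.generation, mine.maxVersion));
            } else if (remote.maxVersion < mine.maxVersion) {
                // Same incarnation, Y is ahead: send what X is missing.
                result.sendNewerThan.put(remote.endpoint, remote.maxVersion);
            }
            // Equal generation and version: nothing to exchange for this endpoint.
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Digest> local = new HashMap<String, Digest>();
        local.put("A", new Digest("A", 1, 10));
        local.put("B", new Digest("B", 1, 5));
        Result result = examine(Arrays.asList(
                new Digest("A", 1, 7), new Digest("B", 1, 9), new Digest("C", 1, 3)), local);
        System.out.println("send: " + result.sendNewerThan);       // send: {A=7}
        System.out.println("requests: " + result.requests.size()); // requests: 2
    }
}
```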

GossipDigestAckMessage

This Ack is sent in response to a GossipDigestSynMessage received by an endpoint. It is the second stage of the three-way messaging in the gossip protocol.

On receipt of the GossipDigestAck message, the initiator node does the following:

  1. Get the sender’s node address (Y in this case). This is used to send the Ack2 response.

  2. Extract the gossipDigestList and EndPointStateMap from the Ack message's payload, and apply the received EndPointStateMap to the local state.

  3. Construct Ack2 response which contains

    1. deltaEpStateMap (EndpointState map)for nodes

      1. Which Y doesn’t know about

      2. Where X has newer information than Y.

  4. Send back the Ack2

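import java.util.List;
import java.util.Map;

public class GossipDigestAck {

    final List<GossipDigest> gDigestList;
    final Map<InetAddressAndPort, EndpointState> epStateMap;

    GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddressAndPort, EndpointState> epStateMap) {
        this.gDigestList = gDigestList;
        this.epStateMap = epStateMap;
    }

    // accessors used by GossipDigestAckVerbHandler
    List<GossipDigest> getGossipDigestList() { return gDigestList; }
    Map<InetAddressAndPort, EndpointState> getEndpointStateMap() { return epStateMap; }
}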

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GossipDigestAckVerbHandler extends GossipVerbHandler<GossipDigestAck> {

    public static final GossipDigestAckVerbHandler instance = new GossipDigestAckVerbHandler();

    public void doVerb(Message<GossipDigestAck> message) {

        // the peer (Y); the Ack2 is sent back to this address
        InetAddressAndPort from = message.from();
        GossipDigestAck gDigestAckMessage = message.payload;
        List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
        Map<InetAddressAndPort, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();

        // first take over the newer states that Y sent along with the Ack
        if (!epStateMap.isEmpty())
            Gossiper.instance.applyStateLocally(epStateMap);

        // then collect the states that Y asked for through its digests
        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new HashMap<InetAddressAndPort, EndpointState>();
        for (GossipDigest gDigest : gDigestList)
        {
            InetAddressAndPort addr = gDigest.getEndpoint();
            EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
            if (localEpStatePtr != null)
                deltaEpStateMap.put(addr, localEpStatePtr);
        }
        // construct GossipDigestAck2Message and send it
        Message<GossipDigestAck2> gDigestAck2Message = Message.getNewMessage(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2(deltaEpStateMap));
        MessagingService.instance().send(gDigestAck2Message, from);
        super.doVerb(message);
    }
}
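The handler relies on getStateForVersionBiggerThan to pick out only what the other side is missing. A minimal sketch of that filtering step follows, under the assumption that an endpoint's state can be reduced to a map from state name to version. DeltaStateSketch and statesNewerThan are hypothetical names, and returning null for "nothing newer" simply mirrors the null check in the handler.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: keep only the entries that are newer than the version the peer reported.
final class DeltaStateSketch {

    /** Returns the states with a version greater than the given one, or null if there are none. */
    static Map<String, Integer> statesNewerThan(Map<String, Integer> stateVersions, int version) {
        Map<String, Integer> delta = new TreeMap<String, Integer>();
        for (Map.Entry<String, Integer> entry : stateVersions.entrySet()) {
            if (entry.getValue() > version)
                delta.put(entry.getKey(), entry.getValue());
        }
        return delta.isEmpty() ? null : delta;
    }

    public static void main(String[] args) {
        Map<String, Integer> known = new TreeMap<String, Integer>();
        known.put("HEARTBEAT", 60);
        known.put("LOAD", 45);
        known.put("STATUS", 12);
        System.out.println(statesNewerThan(known, 40)); // {HEARTBEAT=60, LOAD=45}
        System.out.println(statesNewerThan(known, 60)); // null
    }
}
```

Sending only the delta keeps gossip messages small even when a node tracks many application states per endpoint.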

GossipDigestAck2Message

This Ack2 is sent in response to a GossipDigestAckMessage. It is the last stage of the three-way messaging in the gossip protocol. GossipDigestAck2VerbHandler is pretty straightforward: the peer node (Y) receives the message and simply updates its local state with the information from the EndPointStateMap.

public class GossipDigestAck2 {
    final Map<InetAddressAndPort, EndpointState> epStateMap;
    GossipDigestAck2(Map<InetAddressAndPort, EndpointState> epStateMap){
        this.epStateMap = epStateMap;
    }
    Map<InetAddressAndPort, EndpointState> getEndpointStateMap(){
        return epStateMap;
    }
}

public class GossipDigestAck2VerbHandler extends GossipVerbHandler<GossipDigestAck2> {
    public static final GossipDigestAck2VerbHandler instance = new GossipDigestAck2VerbHandler();
    public void doVerb(Message<GossipDigestAck2> message){
        Map<InetAddressAndPort, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap();
        Gossiper.instance.applyStateLocally(remoteEpStateMap);
        super.doVerb(message);
    }
}

This completes the round of gossip and both peer and the initiator nodes (node-X and node-Y for us) have now exchanged and updated information for the nodes they know about. They will carry this on with other nodes in the cluster.
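To get a feel for how repeated rounds spread information, here is a toy simulation. It makes strong simplifying assumptions that are not taken from Cassandra: every node starts out knowing only itself, each node gossips with exactly one random peer per round, and a complete Syn/Ack/Ack2 exchange is collapsed into a single two-way merge. GossipConvergence and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Toy simulation: how many rounds until every node has heard about every other node?
final class GossipConvergence {

    /** After a full exchange, both sides hold the newest version of every entry either one had. */
    static void exchange(Map<Integer, Integer> x, Map<Integer, Integer> y) {
        for (Map.Entry<Integer, Integer> entry : x.entrySet())
            y.merge(entry.getKey(), entry.getValue(), Integer::max);
        for (Map.Entry<Integer, Integer> entry : y.entrySet())
            x.merge(entry.getKey(), entry.getValue(), Integer::max);
    }

    /** Returns the round in which all nodes know all nodes, or -1 if maxRounds is not enough. */
    static int roundsToConverge(int nodes, long seed, int maxRounds) {
        if (nodes < 2)
            return 0;
        Random random = new Random(seed);
        // known.get(i) maps node id -> newest version that node i has seen for it.
        List<Map<Integer, Integer>> known = new ArrayList<Map<Integer, Integer>>();
        for (int i = 0; i < nodes; i++) {
            Map<Integer, Integer> view = new HashMap<Integer, Integer>();
            view.put(i, 1);
            known.add(view);
        }
        for (int round = 1; round <= maxRounds; round++) {
            for (int i = 0; i < nodes; i++) {
                int peer = random.nextInt(nodes - 1);
                if (peer >= i)
                    peer++; // any node except i itself
                exchange(known.get(i), known.get(peer));
            }
            boolean converged = true;
            for (Map<Integer, Integer> view : known)
                converged &= view.size() == nodes;
            if (converged)
                return round;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("rounds needed for 50 nodes: " + roundsToConverge(50, 42L, 100));
    }
}
```

Running it with different node counts shows the number of rounds growing far more slowly than the cluster size, which is the property that makes gossip attractive for large clusters.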

  • The above implementations are close to, but not identical to, Cassandra’s source code. Cassandra implements custom serializers for almost all of the classes above.

  • There are a few keywords and classes, such as “volatile” and “AtomicReference”, which a reader might not be aware of. These are meant to ensure thread safety in concurrent programs.
