Approaches of Cassandra's ring partitioning

Apache Cassandra

In the previous article, we talked about Cassandra's gossip protocol.
Today, we go a step further and talk about ring partitioning concepts in Cassandra.

Partitioning Approaches

Before diving into approaches to partitioning a database, let us clear up one thing about partitioning and its relationship with replication. Partitioning and replication are not an either/or choice; they are mostly used in conjunction with each other to build a robust system. After we partition a database, each partition is replicated across nodes so that it can be recovered in case of a failure. You can use replication alone if your dataset fits on a single node, but you might not be able to operate a partitioned system without replication if your system is built on commodity hardware. Always remember that in distributed computing, server failure is a question of when, not if.

At a high level, partitioning can look like the best solution ever. If you have an existing datastore holding 1 million records, you have started seeing issues under high traffic load, and you feel that the datastore will soon run out of memory, then partitioning can look like a great option. Just split the 1 million records into 10 partitions of 100k records each, and you gain in performance and also solve the out-of-memory issue. But on what basis do you split these records?

  • You can do it based on a record attribute, for example zip code. But if this attribute is not diverse enough across your dataset, e.g. 90% of users belong to one zip code, then you end up with a single partition handling 90% of the load while the remaining 9 partitions sit under-utilized, handling the other 10%.

  • You can spin up a random number generator and assign each record to a partition based on the random number. Assuming your random number generator is uniform, you will on average divide your data evenly. However, you now have no mapping of which record is stored in which partition. You can store this mapping while assigning partitions, but then that is additional data you need to manage in a distributed system.
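To make the skew from the first option concrete, here is a minimal sketch. The dataset and the zip codes are made up for illustration, and it assumes every distinct zip code is assigned its own partition.

```python
from collections import Counter

# Hypothetical dataset: 1,000 users, 900 of them in a single zip code.
# The remaining 100 users are spread over 9 other (made-up) zip codes.
users = ["10001"] * 900 + [f"2000{i % 9}" for i in range(100)]

# Assumption: every distinct zip code is assigned its own partition.
load_per_partition = Counter(users)

hottest_zip, hottest_count = load_per_partition.most_common(1)[0]
hot_share = hottest_count / len(users)
print(f"{len(load_per_partition)} partitions, hottest holds {hot_share:.0%} of records")
```

One partition ends up with 90% of the records while the other nine share the rest, which is exactly the hot spot described above.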

One approach is to assign a key to each record. This key can be as simple as the primary_id of the record or as complex as a multi-level hash of a string built from the primary_id and a bunch of other record-specific attributes. This approach is better than the ones discussed above because you now control the criteria by which the dataset is distributed. Something as simple as the modulus of primary_id by the number of nodes works: each record has a unique primary_id, and as long as the ids are roughly sequential the records end up evenly spread across all nodes. You also have the information needed to locate the partition on which a particular record lives, because you know the partitioning criteria. We will discuss partitioning based on both the key and the hash of the key.
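As a rough sketch of the modulus idea, assuming sequential integer ids and 10 partitions (the function name and the constants are illustrative, not taken from any real system):

```python
from collections import Counter

NUM_PARTITIONS = 10  # assumed partition count, matching the 10-way split above

def partition_for(primary_id: int, num_partitions: int = NUM_PARTITIONS) -> int:
    """Return the partition index for a record: primary_id modulo the partition count."""
    return primary_id % num_partitions

# Distribute 1,000,000 sequential ids and count the records per partition.
counts = Counter(partition_for(pid) for pid in range(1, 1_000_001))
print(sorted(counts.items()))

# Locating a record needs no lookup table, only the same function.
print(partition_for(123_456))
```

Every partition receives 100,000 records, and any client that knows the function can find a record's partition without stored metadata.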

Based on the key

Partitioning based on a key is straightforward to implement and understand. You first make logical partitions of your data based on a key and then allocate these partitions to nodes. For example, suppose we maintain a database of students containing 1000 records. To split it into 10 partitions, we can choose student_id as the key and divide the database by ranges of student_id: records with student_id 1 to 100 go to partition 1, student_id 101 to 200 to partition 2, and so on.
This is easy to reason about at the implementation layer, but it can lead to hot spots if the key is not chosen well. In the above example, if we had chosen the class in which a student studies as the key, we might end up with an uneven distribution. It might very well be the case that more than 50% of the students are in the primary standard and the rest are spread across the other standards. Hence choosing a key for partitioning requires understanding the dataset and deciding on a criterion that evens out the partitions.
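The student_id example can be sketched as follows. The helper names are hypothetical; the ranges are the ones from the example (100 records per partition, partitions numbered from 1).

```python
def range_partition(student_id: int, records_per_partition: int = 100) -> int:
    """Return the 1-based partition for a student_id (1-100 -> 1, 101-200 -> 2, ...)."""
    return (student_id - 1) // records_per_partition + 1

def ids_in_partition(partition: int, records_per_partition: int = 100) -> range:
    """Return the contiguous range of student_ids stored in a partition."""
    start = (partition - 1) * records_per_partition + 1
    return range(start, start + records_per_partition)

print(range_partition(100), range_partition(101), range_partition(1000))  # 1 2 10
print(ids_in_partition(2))  # range(101, 201)
```

Because neighbouring ids share a partition, a range query such as "students 120 to 150" touches a single partition.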

Based on the hash key

To avoid hot spots, a preferable approach is to use a hash of the chosen key. A good hash function ensures that keys are evenly distributed among all partitions. The hash function does not need to be overly complex, as it is only used to pick a partition and not to store any user-specific information. So instead of assigning a range of keys to a partition, we assign a range of hashes. With this approach, however, we lose the locality of keys. If we find a key with a value of 5 on a partition, we cannot be sure that keys with a value of 1 or 10 are on the same partition (assuming each partition holds 100 keys), which would have been true with key-based partitioning. This might not be a big problem in general, but certain applications rely on this kind of query pattern. E.g. a weather application might also query data for days n-2 to n+2 while fetching the data for a date n. Users usually compare the weather with the last or next few days, and this query pattern allows multiple queries to be merged into one, which in turn improves the performance of the application.
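Here is a small sketch of both effects: the even spread and the lost locality. SHA-256 is an arbitrary choice of stable hash for the illustration (Cassandra's own partitioner is different), and the dates and partition count are assumptions.

```python
import hashlib
from collections import Counter

NUM_PARTITIONS = 10  # assumed partition count

def hash_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Pick a partition from a stable hash of the key."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

# Even spread: sequential ids end up roughly balanced across partitions.
spread = Counter(hash_partition(str(student_id)) for student_id in range(1, 10_001))
print(sorted(spread.items()))

# Lost locality: consecutive dates are not guaranteed to share a partition,
# so fetching days n-2 .. n+2 may have to contact several partitions.
days = ["2023-12-01", "2023-12-02", "2023-12-03", "2023-12-04", "2023-12-05"]
day_partitions = {day: hash_partition(day) for day in days}
print(day_partitions)
```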

Problem

Both of the above approaches solve the partitioning problem to a certain extent. However, we have not yet discussed the most common scenario that Cassandra has to handle, i.e. failure.

  1. What happens when, after partitioning our database, we start seeing node failures?

  2. What happens when we need to add more nodes to our system?

The 1 million records that we initially split across 10 partitions won’t scale when we reach 10 million records. Adding more partitions requires updating our hashing logic, which depends on the number of nodes in our system. The same goes for a node failure: the number of nodes in our system decreases by one, which in turn affects our hashing logic.

Most of these problems are solved by Consistent Hashing!

Consistent Hashing

We saw how the typical hashing approaches fail when we encounter a node failure or when we have to add more nodes to our system. To review the reason for the failure, let us consider a system with 3 nodes where a client picks the node to talk to based on the value it gets for each key from the function below:

hash(key) % number of nodes

Whatever value we get from the above function, we store our key on the node assigned to that number. This works perfectly fine and even satisfies our requirement for an even distribution of keys. But we do have one problem. What changes do we need to make if we want to add another node to our system?

Our node allocation logic is no longer valid, as it will never send a request to the new node. So to onboard the new node we have to update the function to id_hash % 4. But doing just that doesn’t solve our problem, because now the client won’t be able to find the correct nodes when reading existing keys. Initially, a key whose id_hash came out to be 10 was sent to Node_1 because 10 % 3 = 1. With the updated function, the same key maps to 10 % 4 = 2. When the client tries to read it from Node_2, it gets a KEY_NOT_FOUND error.

So to make our system work correctly, we have to rerun the updated function for all the keys and move each key to its correct node. This is a lot of data movement due to rehashing whenever the number of nodes changes, and it can result in downtime for our application.
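To get a feel for how much data moves, the sketch below counts the keys whose node changes when going from 3 to 4 nodes. It assumes the id_hash values are spread uniformly, modelled here simply as the integers 0 to 11,999.

```python
def node_for(id_hash: int, num_nodes: int) -> int:
    """Naive placement: hash value modulo the number of nodes."""
    return id_hash % num_nodes

# The example from the text: hash 10 moves from Node_1 to Node_2.
print(node_for(10, 3), node_for(10, 4))  # 1 2

# Assumption: uniformly spread hash values, modelled as 0..11999.
id_hashes = range(12_000)
moved = sum(1 for h in id_hashes if node_for(h, 3) != node_for(h, 4))
print(f"{moved} of {len(id_hashes)} keys change nodes ({moved / len(id_hashes):.0%})")
```

Under this assumption three out of every four keys have to be relocated just to add a single node.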

When a node is added or removed, an ideal system moves as little data as possible. That gives us a system where we have to do very little work to handle a failure or to onboard a new node. This is where consistent hashing comes in.

Before jumping into the details of how consistent hashing works, visualize all the nodes placed on a circular ring that covers a certain range of hash values. This is different from how we typically visualize a system consisting of multiple nodes. In this example, we assume that our hash function gives us a value in the range 0 – 31. The range is determined by the hash function you use; for simplicity, we are considering a small range.

Now if the hash for a key results in a value of 6, we figure out the slice of the ring in which our value falls. In our example, 6 lies between Node_0 & Node_1, so we move in a clockwise direction and store the value on Node_1. A key is always stored on the node that is the successor of its hash value. So a hash value of 10 is stored on Node_2, a hash value of 20 on Node_3, and so on. The ring wraps around: if a hash results in a value of 30, we keep moving clockwise and store it on Node_0.
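The successor lookup can be sketched with a sorted list of node positions and a binary search. The positions used here (Node_1 at 7, Node_2 at 15, Node_3 at 23, Node_0 at 31) are an assumption inferred from the key ranges this article mentions for each node; the names RING and owner are illustrative.

```python
from bisect import bisect_left

# Assumed node positions on the 0-31 ring, sorted by position.
RING = [(7, "Node_1"), (15, "Node_2"), (23, "Node_3"), (31, "Node_0")]
POSITIONS = [position for position, _ in RING]

def owner(hash_value: int) -> str:
    """Return the first node at or after hash_value, moving clockwise."""
    # The modulo wraps around to the first node if hash_value lies past the last position.
    index = bisect_left(POSITIONS, hash_value) % len(RING)
    return RING[index][1]

for h in (6, 10, 20, 30):
    print(h, "->", owner(h))
```

This prints Node_1, Node_2, Node_3 and Node_0 for the four hash values, matching the walk around the ring described above.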

On top of consistent hashing, we also store the same key on the next two nodes. So if we store a key on Node_0, we also store it on Node_1 & Node_2, which are the next two nodes in the clockwise direction. The number of additional copies is decided by a configuration. In the above case the value is 2, hence we duplicate the value on 2 additional nodes.
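A minimal sketch of picking the replica nodes: the owner plus the next nodes clockwise. The node positions are assumed (inferred from the key ranges mentioned in this article), and the parameter name extra_copies is made up for the illustration; it is not a Cassandra setting.

```python
from bisect import bisect_left

# Assumed node positions on the 0-31 ring, sorted by position.
RING = [(7, "Node_1"), (15, "Node_2"), (23, "Node_3"), (31, "Node_0")]
POSITIONS = [position for position, _ in RING]

def replicas(hash_value: int, extra_copies: int = 2) -> list[str]:
    """Return the owning node followed by the next `extra_copies` nodes clockwise."""
    start = bisect_left(POSITIONS, hash_value) % len(RING)
    return [RING[(start + i) % len(RING)][1] for i in range(extra_copies + 1)]

print(replicas(30))  # ['Node_0', 'Node_1', 'Node_2']
print(replicas(20))  # ['Node_3', 'Node_0', 'Node_1']
```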

Now let us see how consistent hashing solves the problems we saw initially: adding a new node, and removing a node in case of a failure.

Adding new node

Suppose we add a new node, Node_4, between Node_3 & Node_0. We need to decide which keys to move to make our system ready for clients. Currently Node_0 holds all the keys in the range 24 – 31. With another node inside this range, Node_0 becomes responsible only for the keys in the range 28 – 31. So we need to move the keys in the range 24 – 27 from Node_0 to Node_4.

And that’s it. We don’t need to touch any of the other nodes and our system will work just fine as the key mapping for other nodes is still the same. That's POGG!!

The only thing the other nodes need to do is update the nodes to which they duplicate their keys. After adding the new node, Node_3 replicates its keys to Node_4 & Node_0, whereas previously it was copying them to Node_0 & Node_1. This can be done as part of a background job.
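The effect of adding Node_4 can be checked with a short sketch. The node positions are assumptions inferred from the key ranges above (Node_4 is placed at 27 so that it takes over 24 – 27), and make_ring is a hypothetical helper.

```python
from bisect import bisect_left

def make_ring(position_to_node: dict[int, str]):
    """Return a function mapping a hash value to [owner, replica, replica]."""
    positions = sorted(position_to_node)

    def replicas(hash_value: int, extra_copies: int = 2) -> list[str]:
        start = bisect_left(positions, hash_value) % len(positions)
        return [position_to_node[positions[(start + i) % len(positions)]]
                for i in range(extra_copies + 1)]

    return replicas

before = make_ring({7: "Node_1", 15: "Node_2", 23: "Node_3", 31: "Node_0"})
after = make_ring({7: "Node_1", 15: "Node_2", 23: "Node_3", 27: "Node_4", 31: "Node_0"})

# Hash values whose owning node changed after Node_4 joined.
moved = [h for h in range(32) if before(h)[0] != after(h)[0]]
print(moved)                  # [24, 25, 26, 27]
print(before(20), after(20))  # replica targets for a key owned by Node_3
```

Only the four hash values between Node_3 and Node_4 change owner, while a key owned by Node_3 keeps its owner and merely gets new replica targets.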

Handling node failure

Consider what happens when Node_1 fails. How do we distribute the keys it holds, which are in the range 0 – 7, among the other nodes? The clockwise neighbor of Node_1, which is Node_2, takes over all the keys from Node_1. Remember that as part of adding a key to any node, we also duplicated that key to its neighboring nodes. So if our system was functioning correctly before Node_1 failed, there is a high probability that Node_2 already has all the keys from Node_1. We can run a background job to verify this and copy over any keys that were missed during duplication.

So even in case of a node failure, our system works correctly without much data movement. Whenever we spin up a new node to replace Node_1, it is onboarded in the way we discussed above.
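The failure case can be sketched the same way, again with assumed node positions and a hypothetical make_ring helper. It checks that the node taking over Node_1's range was already one of the replicas for every key in that range.

```python
from bisect import bisect_left

def make_ring(position_to_node: dict[int, str]):
    """Return a function mapping a hash value to [owner, replica, replica]."""
    positions = sorted(position_to_node)

    def replicas(hash_value: int, extra_copies: int = 2) -> list[str]:
        start = bisect_left(positions, hash_value) % len(positions)
        return [position_to_node[positions[(start + i) % len(positions)]]
                for i in range(extra_copies + 1)]

    return replicas

healthy = {7: "Node_1", 15: "Node_2", 23: "Node_3", 31: "Node_0"}
degraded = {pos: node for pos, node in healthy.items() if node != "Node_1"}

replicas_before = make_ring(healthy)
replicas_after = make_ring(degraded)

for h in range(8):  # the range 0-7 that Node_1 used to own
    new_owner = replicas_after(h)[0]
    already_had_copy = new_owner in replicas_before(h)
    print(h, new_owner, already_had_copy)
```

Every key in the range is now served by Node_2, which already held a copy, so the takeover itself needs no bulk data movement.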


An edge case where consistent hashing might not work optimally is when a system has very few nodes. We might end up with an uneven distribution of keys across the nodes. One solution is to present one node as multiple nodes. In other words, Node_1 is represented as Node_1_1, Node_1_2 and so on to create an even distribution. So a system of, say, 3 nodes is presented as 9 nodes spread around the circular ring. These are known as virtual nodes. One thing to ensure is that the virtual nodes are placed randomly rather than sequentially, to keep the distribution even.

Another advantage of virtual nodes is a better distribution of keys in case of a node failure. Instead of moving all the keys of the failed node onto one other node, we move the keys of each virtual node to that virtual node's neighbor. So if a node held 100 keys and was represented as 10 virtual nodes, then on failure roughly 10 keys move from each of its virtual nodes to the respective neighbor. This avoids overloading a single node with a large chunk of data and keeps the distribution even across all nodes.
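Below is a rough simulation of virtual nodes. Everything in it is an assumption made for illustration: 4 physical nodes, 64 virtual nodes each, a 32-bit ring, and SHA-256 as a stand-in for pseudo-random placement. It is not how Cassandra assigns tokens.

```python
import hashlib
from bisect import bisect_left
from collections import Counter

def token(label: str) -> int:
    """Place a label on a 32-bit ring using SHA-256 (an arbitrary stable hash)."""
    return int.from_bytes(hashlib.sha256(label.encode("utf-8")).digest()[:4], "big")

def build_ring(nodes, vnodes_per_node):
    """Give every physical node several pseudo-random positions, e.g. Node_1_0, Node_1_1."""
    ring = {token(f"{node}_{i}"): node for node in nodes for i in range(vnodes_per_node)}
    return ring, sorted(ring)

def owner(ring, tokens, hash_value):
    """Walk clockwise to the first virtual node at or after hash_value."""
    return ring[tokens[bisect_left(tokens, hash_value) % len(tokens)]]

nodes = ["Node_1", "Node_2", "Node_3", "Node_4"]
key_hashes = [token(f"key-{k}") for k in range(20_000)]

ring, tokens = build_ring(nodes, vnodes_per_node=64)
owners = [owner(ring, tokens, h) for h in key_hashes]
load = Counter(owners)

# Node_1 fails: drop its virtual nodes and see who inherits its keys.
survivors = [n for n in nodes if n != "Node_1"]
ring_after, tokens_after = build_ring(survivors, vnodes_per_node=64)
owners_after = [owner(ring_after, tokens_after, h) for h in key_hashes]
takeover = Counter(new for old, new in zip(owners, owners_after) if old == "Node_1")

print("load per node:", dict(load))
print("Node_1's keys now served by:", dict(takeover))
```

The keys of the failed node are shared among the surviving nodes instead of landing on a single neighbor, and keys owned by the other nodes do not move at all.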

Consistent hashing is used by various data stores such as Cassandra. DynamoDB also uses consistent hashing, with some additional improvements that leverage virtual nodes.

This will be my last article for 2023. It was an amazing year.
See you in 2024!

Keep reading!