Distributed Consensus using Leader Election Algorithm

Introduction

In a distributed system, we often face the challenge of managing multiple clusters or servers efficiently. Distributed systems can be difficult to understand, mainly because the knowledge surrounding them is itself quite distributed. A distributed system involves a set of distinct processes (e.g., computers) that pass messages to one another and coordinate to accomplish a common objective (e.g., solving a computational problem).

A distributed system is a group of computers working together to achieve a unified goal.

There are hundreds of possible architectures for a distributed system. Even a single computer can be viewed as one: the central processing unit, memory units, and input-output channels are separate processes collaborating to complete an objective.

Every distributed system shares a common set of characteristics. These include:

  • Concurrency
  • Resource Sharing
  • Heterogeneity
  • Openness
  • Scalability
  • Fault Tolerance
  • Transparency

What it means to reach consensus in a distributed system

A consensus algorithm is one that allows all the participants in a distributed system to choose a value from a set in such a way that all the participants choose the same value. A solution to the consensus problem is a distributed algorithm that has the following properties:

  • At the beginning, each participant has an initial value.
  • At the end, all the participants must have the same value.
  • The common final value must be the initial value of at least one of the participants.
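The two end-state properties above can be checked mechanically. The small Java sketch below does this for a finished run. The class and method names are hypothetical. It is not a consensus algorithm. It only tests agreement (every participant ends with the same value) and validity (that value was someone's initial value). It cannot check that a run actually finishes, and guaranteeing termination is where real algorithms do most of their work.

```java
import java.util.List;

// Hypothetical checker for a finished run; it does not reach consensus itself.
public class ConsensusProperties {

    // Agreement: all participants finished with the same value.
    static <T> boolean agreement(List<T> finalValues) {
        return finalValues.stream().distinct().count() <= 1;
    }

    // Validity: every final value was the initial value of at least one participant.
    static <T> boolean validity(List<T> initialValues, List<T> finalValues) {
        return initialValues.containsAll(finalValues);
    }

    public static void main(String[] args) {
        List<String> initial = List.of("A", "B", "B");
        List<String> good = List.of("B", "B", "B");       // everyone picked a proposed value
        List<String> invented = List.of("C", "C", "C");   // everyone agreed on a value nobody proposed
        System.out.println(agreement(good) && validity(initial, good));         // true
        System.out.println(agreement(invented) && validity(initial, invented)); // false
    }
}
```

The second run shows why agreement alone is not enough. Participants that always decide a fixed value would trivially agree, but they would violate validity.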

Sounds confusing, right? Let me clarify.

Consensus is a fundamental problem in fault-tolerant distributed systems. It involves multiple servers agreeing on values. Once they reach a decision on a value, that decision is final. Typical consensus algorithms make progress when any majority of their servers is available; for example, a cluster of 5 servers can continue to operate even if 2 servers fail. If more servers fail, they stop making progress (but will never return an incorrect result).
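The "5 servers survive 2 failures" figure comes from majority quorums. A cluster of n servers needs floor(n/2) + 1 of them to make progress, so it tolerates n - (floor(n/2) + 1) = floor((n - 1)/2) failures. Below is a minimal Java sketch of that arithmetic. The helper names are hypothetical.

```java
// Majority-quorum arithmetic for a cluster of n servers (hypothetical helper class).
public class MajorityQuorum {

    // Smallest number of servers that forms a strict majority.
    static int quorumSize(int n) {
        return n / 2 + 1; // integer division = floor(n / 2)
    }

    // How many servers can fail while a majority is still available; equals (n - 1) / 2.
    static int toleratedFailures(int n) {
        return n - quorumSize(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 7; n++) {
            System.out.printf("cluster=%d quorum=%d tolerated failures=%d%n",
                    n, quorumSize(n), toleratedFailures(n));
        }
    }
}
```

The output shows that a 4-server cluster tolerates only 1 failure, the same as a 3-server cluster. This is why consensus clusters are usually sized with an odd number of servers.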

How does leader election help?

Leader election is the simple idea of giving one thing (a process, host, thread, object, or human) in a distributed system some special powers. Those special powers could include the ability to assign work, the ability to modify a piece of data, or even the responsibility of handling all requests in the system.
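The toy Java sketch below illustrates one of these special powers: the ability to assign work. The class is hypothetical and independent of ZooKeeper. It takes the leader's identity as given, because the rest of this article covers how that identity is chosen. Any other process that tries to assign work is rejected.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: only the designated leader may hand out tasks.
public class WorkAssigner {
    private final String leaderId;

    public WorkAssigner(String leaderId) {
        this.leaderId = leaderId;
    }

    // Round-robin assignment of tasks to workers; rejected unless the caller is the leader.
    public Map<String, List<String>> assign(String callerId, List<String> workers, List<String> tasks) {
        if (!callerId.equals(leaderId)) {
            throw new IllegalStateException(callerId + " is not the leader");
        }
        if (workers.isEmpty() && !tasks.isEmpty()) {
            throw new IllegalArgumentException("no workers to assign tasks to");
        }
        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (String worker : workers) {
            plan.put(worker, new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            plan.get(workers.get(i % workers.size())).add(tasks.get(i));
        }
        return plan;
    }

    public static void main(String[] args) {
        WorkAssigner assigner = new WorkAssigner("node-1");
        System.out.println(assigner.assign("node-1", List.of("node-2", "node-3"), List.of("t1", "t2", "t3")));
        // prints {node-2=[t1, t3], node-3=[t2]}
    }
}
```

Because only one process makes assignment decisions, the workers never need to coordinate among themselves. This also makes the leader a single point of failure, which is exactly the failure mode described next.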

Leader election is a powerful tool for improving efficiency, reducing coordination, simplifying architectures, and reducing operations. On the other hand, leader election can introduce new failure modes and scaling bottlenecks. In addition, leader election may make it more difficult for you to evaluate the correctness of a system.

That's enough introduction. Let's see how we can implement this in practice. To learn more about distributed consensus or leader election, refer to this excellent visual explanation of the concepts: Raft

How to apply this theory in reality

Let's see how we can apply the theory above in a simple distributed application. We will use the Apache ZooKeeper client API to communicate with our cluster.

ZooKeeper is a high-performance coordination service for distributed applications. It exposes common services, such as naming, configuration management, synchronization, and group services, through a simple interface so you don't have to write them from scratch. You can use it off the shelf to implement consensus, group management, leader election, and presence protocols, and you can build on it for your own specific needs.

You can find its Java API documentation here. To start, let's write some basic Java code that connects to ZooKeeper.

Dependencies in pom.xml:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>

Now let's define a class, LeaderElection.java:

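import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class LeaderElection implements Watcher
{
    private static final String ZOOKEEPER_ADDRESS = "localhost:2181";
    private ZooKeeper zooKeeper;
    private static final int SESSION_TIMEOUT = 3000;              // milliseconds
    private static final String ELECTION_NAMESPACE = "/election"; // parent znode for all candidates
    private String currentZNodeName;                              // this instance's candidate znode (used later)

    // Opens the session asynchronously; connection events are delivered to process() below.
    public void connectToZookeper() throws IOException {
        this.zooKeeper = new ZooKeeper(ZOOKEEPER_ADDRESS, SESSION_TIMEOUT, this);
    }

    // Blocks the main thread until process() sees a disconnect and calls notifyAll().
    public void run() throws InterruptedException {
        synchronized (zooKeeper) {
            zooKeeper.wait();
        }
    }

    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    // Called on ZooKeeper's event thread for connection-state changes and watch notifications.
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
        case None: // connection-state change rather than a znode event
            if (event.getState() == Event.KeeperState.SyncConnected) {
                System.out.println("Successfully connected to zookeeper!!");
            } else {
                // Wake up run() so the application can shut down.
                synchronized (zooKeeper) {
                    System.out.println("Disconnected from zookeeper event");
                    zooKeeper.notifyAll();
                }
            }
            break;
        default:
            break;
        }
    }
}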

Then add a main method to the class:

public static void main(String[] args) throws IOException, InterruptedException, KeeperException
{
    LeaderElection leaderElection = new LeaderElection();
    leaderElection.connectToZookeper();
    leaderElection.run();
    leaderElection.close();
    System.out.println("Disconnected from zooKeeper, exiting application!!");
}

This code connects to ZooKeeper and blocks the main thread in run() until the watcher receives a disconnect event (for example, when the server goes away). It then closes the connection and exits.
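run() and process() coordinate through Java's built-in monitor. The main thread calls wait() on the zooKeeper object, and ZooKeeper's event thread later calls notifyAll() on it. Below is a ZooKeeper-free sketch of that hand-off. The class name, the disconnected flag, and the sleeping "event" thread are hypothetical additions that do not exist in the original class. A bare wait() misses any notifyAll() that happens before it is called, and wait() can also wake up spuriously. The usual idiom is therefore to wait in a loop on a condition flag.

```java
// Hypothetical stand-in for the run()/process() hand-off, without ZooKeeper.
public class DisconnectSignal {
    private final Object lock = new Object();
    private boolean disconnected = false; // guard flag (not present in the original LeaderElection)

    // Main thread: returns once a disconnect has been signalled, even if it was signalled earlier.
    public void awaitDisconnect() throws InterruptedException {
        synchronized (lock) {
            while (!disconnected) { // loop guards against spurious wake-ups
                lock.wait();
            }
        }
    }

    // "Event" thread: records the disconnect and wakes every waiting thread.
    public void signalDisconnect() {
        synchronized (lock) {
            disconnected = true;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DisconnectSignal signal = new DisconnectSignal();
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(100); // pretend the session is lost a little later
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            System.out.println("event thread: disconnected");
            signal.signalDisconnect();
        });
        eventThread.start();
        signal.awaitDisconnect();
        System.out.println("main thread: exiting");
        eventThread.join();
    }
}
```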

Leader Election - The Herd Effect

Now let's extend the code with the leader election logic. Following the standard ZooKeeper recipe, each process creates an ephemeral sequential znode under "/election", and the process whose znode has the smallest sequence number is the leader. Electing a leader once is not enough, though. We must also watch for failures of the leader, so that another process takes over if the current leader fails. A trivial solution is to have all application processes watch the current smallest znode and check whether they are the new leader when it goes away. The smallest znode disappears if the leader fails, because the znode is ephemeral. But this causes a herd effect. When the current leader fails, every other process receives a notification and calls getChildren() on "/election" to obtain the current list of children, even though only one of them can become the new leader. To avoid this, each process watches only the znode immediately preceding its own, so that each deletion wakes up a single process.
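The difference between the two watching strategies can be counted without a ZooKeeper server. The Java sketch below works on plain znode names, and the class and method names are hypothetical. It maps every non-leader znode to the znode it would watch under each strategy. It then counts how many processes are woken up when the leader's znode is deleted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory comparison of the naive strategy and the predecessor strategy.
public class HerdEffect {

    // Maps each non-leader znode to the znode it watches.
    // watchPredecessor = false: everyone watches the smallest znode (naive, causes the herd effect).
    // watchPredecessor = true:  each znode watches the one immediately before it.
    static Map<String, String> watches(List<String> children, boolean watchPredecessor) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 1; i < sorted.size(); i++) {
            result.put(sorted.get(i), watchPredecessor ? sorted.get(i - 1) : sorted.get(0));
        }
        return result;
    }

    // Number of processes notified when `deleted` disappears.
    static long notified(Map<String, String> watches, String deleted) {
        return watches.values().stream().filter(deleted::equals).count();
    }

    public static void main(String[] args) {
        List<String> children = List.of("c_0000000001", "c_0000000002", "c_0000000003",
                "c_0000000004", "c_0000000005");
        String leader = "c_0000000001";
        System.out.println("naive:       " + notified(watches(children, false), leader) + " notified"); // 4
        System.out.println("predecessor: " + notified(watches(children, true), leader) + " notified"); // 1
    }
}
```

With the naive strategy, the number of notifications grows with the cluster size. With the predecessor strategy, it stays at one no matter how many processes take part.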

Before we start, let's see what the ZooKeeper client gives us.

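The Apache ZooKeeper client provides some basic APIs, such as:

  • getData() - returns the data stored in a znode.
  • getChildren() - returns the names of a znode's children.
  • exists() - returns the Stat (metadata) of a znode, or null if the znode does not exist.

It also supports watches. Each of these calls can register a Watcher on the znode it reads. When that znode changes, ZooKeeper delivers a WatchedEvent (for example NodeDeleted or NodeChildrenChanged) to the watcher's process() method, where you can trigger an action. A watch is a one-time trigger, so it must be set again to receive further notifications.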
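The one-time nature of watches is the reason the code below re-registers its watches after every event. The Java sketch below is a toy, in-memory model of that behaviour. The class and its methods are hypothetical and are not part of the ZooKeeper API. A registered watcher fires on the next change and is then discarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of one-shot watches (not the ZooKeeper API).
public class OneShotWatches {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Registers a watcher for the next change to `path`.
    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Simulates a change to `path`: each registered watcher fires once and is then removed.
    void change(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            fired.forEach(w -> w.accept(path));
        }
    }

    public static void main(String[] args) {
        OneShotWatches zk = new OneShotWatches();
        int[] notifications = {0};
        zk.watch("/election", p -> notifications[0]++);
        zk.change("/election"); // fires
        zk.change("/election"); // does not fire: the watch was already consumed
        System.out.println("notifications = " + notifications[0]); // prints notifications = 1
    }
}
```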

Let's use these features to add the election logic to LeaderElection.java. The following methods go inside the class, and this process() replaces the earlier one:

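// Additional imports needed: java.util.Collections, java.util.List,
// org.apache.zookeeper.CreateMode, org.apache.zookeeper.KeeperException,
// org.apache.zookeeper.ZooDefs, org.apache.zookeeper.data.Stat

// Creates this instance's candidate znode, e.g. /election/c_0000000003.
// Assumes the persistent parent znode /election already exists; create() fails with NoNode otherwise.
public void volunteerForLeadership() throws KeeperException, InterruptedException {
    String zNodePrefix = ELECTION_NAMESPACE + "/c_";
    // EPHEMERAL: deleted when our session ends. SEQUENTIAL: ZooKeeper appends an increasing counter.
    String zNodeFullPath = zooKeeper.create(zNodePrefix, new byte[] {}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

    System.out.println("znode_name: " + zNodeFullPath);
    this.currentZNodeName = zNodeFullPath.replace(ELECTION_NAMESPACE + "/", "");
}

// Becomes the leader if our znode is the smallest; otherwise watches the znode just before ours.
public void reelectLeader() throws KeeperException, InterruptedException {
    Stat predecessorStat = null;
    String predecessorZnodeName = "";

    // Retry if the predecessor disappears between getChildren() and exists() (exists() then returns null).
    while (predecessorStat == null) {
        List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, false);
        Collections.sort(children); // zero-padded sequence numbers sort in creation order

        String smallestChild = children.get(0);

        if (smallestChild.equals(currentZNodeName)) {
            System.out.println("I am the leader !!");
            return; // the leader has no predecessor to watch; without this the loop never ends
        }

        System.out.println("I am not the leader ");
        int predecessorIndex = Collections.binarySearch(children, currentZNodeName) - 1;
        predecessorZnodeName = children.get(predecessorIndex);
        // Watch only our predecessor, which avoids the herd effect.
        predecessorStat = zooKeeper.exists(ELECTION_NAMESPACE + "/" + predecessorZnodeName, this);
    }

    System.out.println("Watching node: " + predecessorZnodeName);
    System.out.println();
}

// Watches /election and its children, so every instance hears about peers joining or leaving.
public void watchTargetZNode() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.exists(ELECTION_NAMESPACE, this);
    if (stat == null) {
        return;
    }

    byte[] data = zooKeeper.getData(ELECTION_NAMESPACE, this, stat);
    List<String> children = zooKeeper.getChildren(ELECTION_NAMESPACE, this);

    System.out.println("Data: " + new String(data) + " Children: " + children);
}

@Override
public void process(WatchedEvent event) {
    switch (event.getType()) {
    case None:
        if (event.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("Successfully connected to zookeeper!!");
        } else {
            synchronized (zooKeeper) {
                System.out.println("Disconnected from zookeeper event");
                zooKeeper.notifyAll();
            }
            return; // no point re-registering watches on a lost connection
        }
        break;
    case NodeDeleted:
        // The watched predecessor (or another watched znode) went away: re-run the election check.
        System.out.println(event.getPath() + " got deleted!!");
        try {
            reelectLeader();
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        break;
    case NodeCreated:
        System.out.println(event.getPath() + " got created!!");
        break;
    case NodeDataChanged:
        System.out.println(event.getPath() + " data changed!!");
        break;
    case NodeChildrenChanged:
        System.out.println(event.getPath() + " children changed!!");
        break;
    default:
        break;
    }

    // Watches are one-time triggers, so register the watches on /election again after every event.
    try {
        watchTargetZNode();
    } catch (KeeperException | InterruptedException e) {
        e.printStackTrace();
    }
}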
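reelectLeader() relies on Collections.sort() ordering the children by creation time. This works because ZooKeeper formats the sequence counter that it appends to a sequential znode as a 10-digit, zero-padded number (%010d). The Java sketch below compares padded names with hypothetical unpadded ones to show why the padding matters. The helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of why string-sorting sequential znode names gives creation order.
public class SequentialNames {

    // Mimics the %010d suffix ZooKeeper appends to a sequential znode name.
    static String sequentialName(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    public static void main(String[] args) {
        List<String> padded = new ArrayList<>();
        List<String> unpadded = new ArrayList<>();
        for (int counter : new int[] {10, 9, 2}) {
            padded.add(sequentialName("c_", counter));
            unpadded.add("c_" + counter); // what names would look like WITHOUT padding (hypothetical)
        }
        Collections.sort(padded);
        Collections.sort(unpadded);
        System.out.println(padded);   // prints [c_0000000002, c_0000000009, c_0000000010]
        System.out.println(unpadded); // prints [c_10, c_2, c_9]
    }
}
```

Without padding, "c_10" would sort before "c_2" and the wrong process would consider itself the leader.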

With this in place, update the main method to volunteer for leadership and run the first election before blocking:

public static void main( String[] args ) throws IOException, InterruptedException, KeeperException
{
    LeaderElection leaderElection  = new LeaderElection();
    leaderElection.connectToZookeper();
    leaderElection.volunteerForLeadership();
    leaderElection.reelectLeader();
    leaderElection.run();
    leaderElection.close();
    System.out.println("Disconnected from zooKeeper, exiting application!!");
}

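Now each instance watches the znode immediately before its own. When the leader goes down, only the next instance in line is notified. That instance re-runs the election check and, because it now holds the smallest remaining znode, becomes the new leader. Similarly, if a non-leader instance goes down, only its successor is notified, re-runs the check, and starts watching its new predecessor. Separately, because watchTargetZNode() watches the children of "/election", every instance is also told when a peer joins or leaves the group.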
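The failover behaviour can be traced with a small in-memory simulation. The Java sketch below is hypothetical and involves no ZooKeeper. A sorted set stands in for the sorted result of getChildren(). Each crash reports the single process whose predecessor watch fires. leader() and watchedBy() show what reelectLeader() would conclude afterwards.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation of predecessor-based failover; no ZooKeeper involved.
public class FailoverSimulation {
    private final TreeSet<String> alive = new TreeSet<>(); // sorted like the sorted getChildren() list

    FailoverSimulation(String... nodes) {
        alive.addAll(Arrays.asList(nodes));
    }

    // The smallest live znode is the leader.
    String leader() {
        return alive.first();
    }

    // The znode that `node` watches: its immediate predecessor, or null if it is the leader.
    String watchedBy(String node) {
        return alive.lower(node);
    }

    // Removes `dead` and returns the processes whose predecessor watch fires (at most one).
    List<String> crash(String dead) {
        if (!alive.contains(dead)) {
            return List.of();
        }
        String successor = alive.higher(dead);
        alive.remove(dead);
        if (successor == null) {
            return List.of();
        }
        return List.of(successor);
    }

    public static void main(String[] args) {
        FailoverSimulation sim = new FailoverSimulation("c_0000000001", "c_0000000002", "c_0000000003");
        System.out.println("leader: " + sim.leader());
        // A follower crashes: only its successor is notified, and it re-points its watch.
        System.out.println("notified: " + sim.crash("c_0000000002"));
        System.out.println("c_0000000003 now watches: " + sim.watchedBy("c_0000000003"));
        // The leader crashes: again one process is notified, and it becomes the new leader.
        System.out.println("notified: " + sim.crash("c_0000000001"));
        System.out.println("new leader: " + sim.leader());
    }
}
```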

Source Code

You can find the source code in: