Kafka Administration App

I have been working with Kafka lately, building a microservices solution that uses Kafka for various tasks. I wanted a simple administration tool to help me manage the Kafka environment in my development environments. Basically, I wanted the tool to do the following:

  1. Cluster
    1. Describe the Kafka cluster
    2. Describe the cluster configuration
    3. Change the configuration of the cluster
    4. Manage ACLs
      1. Create
      2. Delete
      3. Describe
    5. Manage delegation tokens
      1. Create a delegation token
      2. Describe a delegation token
      3. Renew a delegation token
      4. Expire a delegation token
  2. Manage Topics
    1. List topics
    2. Describe topics
    3. Create topics
    4. Delete topics
    5. Increase the number of topic partitions
  3. Manage Consumer Groups
    1. List consumers and the topics they are subscribed to
    2. List consumer offsets

And the list can go on and on. To keep the code simple, I started this as a console application in Scala, using IntelliJ IDEA Community Edition. I have not finished the tool yet, but I have gotten it to a usable level that helps me with most of my day-to-day tasks.

I thought it might be helpful to share the code in case it helps others. Please do not hesitate to contact me with your comments or thoughts about this tool.

Figure 1 shows the project structure.


Figure 1: Project Structure

When you run the tool, you get the following main menu:

********************************************************************************
*****    Kafka Admin Tool                                                  *****
*****    (c) Genetic Thought Software Inc 2018                             *****
********************************************************************************
*****    Note: set Kafka environment in environment.properties             *****
*****      Connecting to server: localhost:9092                            *****
*****      Connected to server: localhost:9092                             *****
********************************************************************************
*****    Commands:                                                         *****
*****      1- Describe Cluster                                             *****
*****      2- Topics                                                       *****
*****      3- Consumers                                                    *****
*****      4- Exit                                                         *****
********************************************************************************
*****        Enter selection:                                              *****

 

If, for example, you select the Topics menu, you get the following:

********************************************************************************
*****    Topics Commands:                                                  *****
*****      1- List Topics                                                  *****
*****      2- Describe Topics                                              *****
*****      3- Create Topics                                                *****
*****      4- Delete Topic                                                 *****
*****      5- Increase Topic Partitions                                    *****
*****      6- Return                                                       *****
********************************************************************************
*****        Enter selection:                                              *****
1
*****      Topics in this cluster are:                                     *****
*****        Topic: US                                                     *****
*****        Topic: China                                                  *****
*****        Topic: Mexico                                                 *****
*****        Topic: Korai                                                  *****
*****        Topic: Canada                                                 *****
*****        Topic: India                                                  *****
*****        Topic: Italy                                                  *****
*****        Topic: Germany                                                *****
********************************************************************************

 

 

The mainApp object extends the App trait. Here I just set up the logger and load the properties from the environment.properties file, where you set bootstrap.servers to point at a Kafka node. I then initialize the KafkaAdmin object admin and start the controller's mainMenu, which controls the rest of the lifecycle until the user selects Exit.
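
For reference, a minimal environment.properties might look like the sketch below. Only bootstrap.servers is actually read by the tool's own code; the other entries are optional, standard AdminClient settings shown as examples, since the whole properties object is passed straight to AdminClient.create.

# environment.properties (illustrative values for a local, single-broker setup)
bootstrap.servers=localhost:9092
# optional AdminClient settings, passed through unchanged
client.id=kafka-admin-tool
request.timeout.ms=30000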

 

/*   *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/

import com.geneticthought.Kafka.Administration.KafkaAdmin
import com.geneticthought.Utilities.{Helpers, Printer}
import org.slf4j.LoggerFactory

object mainApp extends App {

  val logger = LoggerFactory.getLogger("MainApp")
  logger.info("Starting")

  // load the connection settings (bootstrap.servers, etc.) from environment.properties
  val props = Helpers.getProperties("environment.properties")

  Printer.printHeader
  Printer.printBoxedString("Connecting to server: " + props.getProperty("bootstrap.servers"), 1)
  val admin = new KafkaAdmin(props)
  Printer.printBoxedString("Connected to server: " + props.getProperty("bootstrap.servers"), 1)

  // hand control to the menu loop until the user selects Exit
  Controller.mainMenu(admin)

  Printer.printBoxedString("Disconnecting from server: " + props.getProperty("bootstrap.servers"), 1)
  admin.close
}

 

 

Below is the Controller object listing, a very simple implementation that takes advantage of some of Scala's functional capabilities.

/*   *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/

import com.geneticthought.Kafka.Administration.KafkaAdmin
import com.geneticthought.Utilities.{Helpers, Printer}

import scala.io.StdIn

object Controller {

  def mainMenu(admin: KafkaAdmin): Unit = menu(4, admin, Printer.printCommandsMenu)(mainMenuCommands)

  // generic menu loop: keep printing the menu and dispatching commands until the exit command is entered
  def menu(exCommand: Int, admin: KafkaAdmin, mn: () => Unit)(cmd: (Int, KafkaAdmin) => Unit): Unit = {
    var command: Int = 0
    while (command != exCommand) {
      mn()
      Printer.printBoxedString("Enter selection: ", 2)
      val str = StdIn.readLine()
      command = Helpers.toInt(str)
      cmd(command, admin)
    }
  }

  def mainMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => describeCluster(admin)
      case 2 => TopicsCommand(admin)
      case 3 => ConsumerCommands(admin)
      case 4 => Printer.printBoxedString("Exiting ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def describeCluster(admin: KafkaAdmin): Unit = {
    val cluster = admin.describeCluster
    println(cluster.toString)
  }

  def TopicsCommand(admin: KafkaAdmin): Unit = menu(6, admin, Printer.printTopicsMenu)(topicsMenuCommands)

  def topicsMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => listTopics(admin)
      case 2 => describeTopics(admin)
      case 3 => createTopics(admin)
      case 4 => deleteTopics(admin)
      case 5 => increaseTopicPartitions(admin)
      case 6 => Printer.printBoxedString("Return to main Menu ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def listTopics(admin: KafkaAdmin): Unit = {
    val topics = admin.getTopics
    if (topics.nonEmpty) Printer.printBoxedString("Topics in this cluster are:", 1)
    else Printer.printBoxedString("No Topics in this cluster defined yet", 1)
    for (x <- topics) Printer.printBoxedString(s"Topic: $x", 2)
  }

  def describeTopics(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.isEmpty) {
      Printer.printBoxedString(s"No Topics to Describe: $topics", 2)
      return
    }
    Printer.printBoxedString(s"Describe Topics: $topics", 2)

    val tpdes = admin.describeTopics(topics)
    for (tpd <- tpdes)
      Printer.printBoxedString(tpd.toString, 4)
    Printer.printBoxedString(s"Describe Topics: $topics Done", 2)
  }

  def createTopics(admin: KafkaAdmin): Unit = {
    // default: 1 partition, replication factor 1 for each new topic
    val topics = getTopics.map(x => (x, 1, 1.toShort))
    if (topics.isEmpty) {
      Printer.printBoxedString(s"No Topics to Create: $topics", 2)
      return
    }
    admin.createTopics(topics)

    Printer.printBoxedString("Created Topics: ", 2)
    for (t <- topics) Printer.printBoxedString(s"(topicName, Partitions, Replication): $t", 4)
  }

  def deleteTopics(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.isEmpty) {
      Printer.printBoxedString(s"No Topics to Delete: $topics", 2)
      return
    }
    admin.deleteTopics(topics)
    Printer.printBoxedString(s"Deleted Topics: $topics", 2)
  }

  def increaseTopicPartitions(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.isEmpty) {
      Printer.printBoxedString(s"No Topics to Increase Partitions: $topics", 2)
      return
    }
    Printer.printBoxedString("Enter Total Partitions No: ", 2)
    val str = StdIn.readLine()
    val partNo = Helpers.toInt(str)
    if (partNo <= 1) {
      Printer.printBoxedString(s"Partitions Number has to be > 1, you entered: $partNo", 2)
      return
    }
    val topicsIncrease: Map[String, Int] = topics.map(x => x -> partNo).toMap
    admin.increasePartitions(topicsIncrease)
    Printer.printBoxedString(s"Increased Topics $topics partitions to: $partNo", 2)
  }

  def getTopics: List[String] = {
    var cont = true
    var result: List[String] = null
    do {
      try {
        Printer.printBoxedString("Enter Topics names separated by comma", 1)
        val input = StdIn.readLine()
        // split on commas, dropping surrounding whitespace and empty entries
        result = input.split(",").map(_.trim).filter(_.nonEmpty).toList
        cont = false
      }
      catch {
        case _: Exception => Printer.printBoxedString("Invalid Topics List", 2)
      }
    } while (cont)

    result
  }

  def ConsumerCommands(admin: KafkaAdmin): Unit = menu(2, admin, Printer.printConsumersMenu)(consumersMenuCommands)

  def consumersMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => listConsumers(admin)
      case 2 => Printer.printBoxedString("Return to main Menu ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def listConsumers(admin: KafkaAdmin): Unit = {
    val consumers = admin.listConsumers
    Printer.printBoxedString("Consumers List: ", 2)
    consumers.foreach(x => Printer.printBoxedString(x, 4))
    Printer.printBoxedString("end of Consumers List ", 2)
  }

}

 

Below is the KafkaAdmin class listing, which implements the main functionality of performing actions and queries against the Kafka cluster.

/*   *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/

package com.geneticthought.Kafka.Administration

import java.util.Properties

import org.apache.kafka.clients.admin._

import scala.collection.JavaConverters

class KafkaAdmin(val props: Properties) {

  private val client: AdminClient = AdminClient.create(props)

  def close: Unit = this.client.close()

  /* Cluster operations */
  def describeCluster: ClusterInfo = new ClusterInfo(this.client.describeCluster())

  /* Topics region */
  def getTopics: Set[String] = JavaConverters.asScalaSet(this.client.listTopics.names.get()).toSet

  def createTopics(topics: List[(String, Int, Short)]): Unit = {
    val kafkaTopics = scala.collection.mutable.ArrayBuffer[NewTopic]()
    for (topic <- topics) kafkaTopics += new NewTopic(topic._1, topic._2, topic._3)
    val opresult = this.client.createTopics(JavaConverters.asJavaCollection(kafkaTopics))
    opresult.all().get() // block until the topics are created (or the broker reports an error)
  }

  def deleteTopics(topics: List[String]): Unit = {
    val result = this.client.deleteTopics(JavaConverters.asJavaCollection(topics))
    result.all().get() // block until the deletions complete
  }

  def describeTopics(topics: List[String]): Iterable[TopicDescription] = {
    val describeResult = this.client.describeTopics(JavaConverters.asJavaCollection(topics))
    val topicsdesc = describeResult.all().get()
    JavaConverters.collectionAsScalaIterable(topicsdesc.values())
  }

  def increasePartitions(partitions: Map[String, Int]): Unit = {
    val partitionsRequest: scala.collection.mutable.ListMap[String, NewPartitions] =
      new scala.collection.mutable.ListMap[String, NewPartitions]()
    for ((k, v) <- partitions) {
      partitionsRequest += (k -> NewPartitions.increaseTo(v))
    }
    val requestResult = this.client.createPartitions(JavaConverters.mutableMapAsJavaMap(partitionsRequest))
    requestResult.all().get() // block until the partition counts are updated
  }

  /* Configuration region */
  /* Records region */
  /* Consumers region */
  def listConsumers: List[String] = {
    val consumers = this.client.listConsumerGroups().all().get().toArray()
    consumers.map(x => x.toString).toList
  }

}
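
The feature list above also mentions listing consumer offsets. That part is not in the listing, but here is a rough sketch of how it could be added to KafkaAdmin; the method name and return shape are my own choices, and it assumes the same kafka-clients 2.0+ AdminClient used elsewhere in the class.

  /* Sketch only: list the committed offsets for a consumer group.
     Not part of the listing above; names and shape are illustrative. */
  def listConsumerOffsets(groupId: String): Map[String, Long] = {
    val offsets = this.client.listConsumerGroupOffsets(groupId)
      .partitionsToOffsetAndMetadata().get() // block until the broker returns the offsets
    JavaConverters.mapAsScalaMap(offsets).map {
      case (tp, om) => s"${tp.topic}-${tp.partition}" -> om.offset // "topic-partition" -> committed offset
    }.toMap
  }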

 

The rest of the code is available for download.

 

Hope this helps

 


Kafka vs. BizTalk as an Integration Platform

Apache Kafka is all about getting large amounts of data from one place to another, rapidly and reliably. It is a messaging system tailored for high-throughput use cases, where vast amounts of data need to be moved in a scalable, fault-tolerant way. I have been designing and building integrations for more than a decade, using mostly Microsoft technologies (BizTalk, WCF). Lately I have been working with other integration technologies from Oracle (Fusion), IBM (ODM), Informatica, and with Kafka. A few aspects of Kafka seem interesting to me because they address some of the pain points I have had with other integration platforms. Here are some of the points that crossed my mind:

Saving Data locally rather than in a Data Center or DB

Figure 1: How BizTalk works

BizTalk works by saving all messages into the MessageBox database, which resides on a SQL Server instance. The SQL Server is usually implemented as a SQL cluster with the data files saved on a SAN disk. This introduces latency into the processing, and if, as some organizations do, the SQL cluster is also used for other systems besides BizTalk, such as SharePoint, the stress on the SQL cluster grows. Tuning the SQL cluster for multiple systems is not a trivial endeavor, as there are conflicts between the requirements of these systems.

Figure 2: BizTalk Infrastructure Cluster

Of course you can use a dedicated DB cluster for each system, but then the cost of the implementation skyrockets. The fact that Kafka simply saves the message data locally on the broker's disk makes processing the message faster, and hence gives Kafka its high throughput.

Keeping the messages available for reprocessing for a configurable Duration of time

BTS wants to handle messages as quickly as possible and move them to the archive. If a message lingers in the MessageBox, BTS raises an error. If too many messages linger in the MessageBox, BizTalk performance degrades, if the whole cluster doesn't come down entirely. So messages are cleared from the MessageBox as soon as possible. Now, if a receiving system has an issue with a message, it has no way of asking BTS for that message again, and resubmitting that message manually is a big hassle. Kafka, on the other hand, keeps messages for 168 hours by default, and you can configure how long you want to keep them. Receiving systems, called consumers in Kafka, ask for messages from specific topics and partitions, starting at a specific point (called the message offset in Kafka). This makes recovering from erroneous processing much less of a hassle.
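
To make that concrete: the broker-side retention window referred to above is the log.retention.hours setting (168 by default, and it can be overridden per topic with retention.ms), and a consumer can simply rewind to an earlier offset and replay a partition. The sketch below is illustrative only; the topic name, group id, and offset are made up, and it assumes a local broker and the kafka-clients 2.0+ consumer API.

import java.util.{Collections, Properties}
import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters

object ReplayExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "reprocessing-group") // hypothetical consumer group
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  val partition = new TopicPartition("Orders", 0) // hypothetical topic, partition 0
  consumer.assign(Collections.singletonList(partition))
  consumer.seek(partition, 1000L) // rewind to offset 1000 and replay the partition from there

  val records = consumer.poll(Duration.ofSeconds(1))
  for (r <- JavaConverters.iterableAsScalaIterable(records))
    println(s"offset=${r.offset} value=${r.value}")
  consumer.close()
}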

Keeping all the logic in the Client systems (Consumers, Producers)

Figure 3: Kafka Producers and Consumers

BizTalk has many capabilities that allow the designer/developer to put logic in it. You get orchestrations, maps that transform between different schemas, pipeline components, and you can inject custom code into all of these components. This kind of seduces designers and developers into putting integration logic in BTS. This has its benefits: it avoids the spaghetti integration shown in Figure 4, where each application keeps the integration logic for every other application it integrates with.

Figure 4: Spaghetti Integration

Now, when Application D is replaced, all the applications integrating with it have to be updated (code change, QA, UAT, deployment to production, etc.). This has a ripple effect, especially in large enterprises with many systems. Keeping the integration logic contained in BTS or an ESB (hub) alleviates this situation, as there is only one system that needs to be updated and changed.

Figure 5: Hub and Spoke Integration

In Kafka, all the integration logic is encapsulated in the Kafka producers and consumers, which are usually part of the applications themselves. This might lead to the spaghetti situation described in Figure 4. One might be tempted to create a hub that includes all the producers and consumers for Kafka, but you really need to think this one through. OK, this is a big issue; I am working on a solution for it.

Scalability

While BizTalk can scale horizontally and vertically, no one can deny that Kafka scales much better than BizTalk.

Integration Adapters

BizTalk comes with many adapters and integration accelerators for various systems that make it quicker and easier to integrate different systems. Kafka is lacking in that regard.

Introduction to Apache Kafka

Apache Kafka is all about getting large amounts of data from one place to another, rapidly, and reliably. In computing terms Apache Kafka is a messaging system that is tailored for high throughput use cases, where vast amounts of data need to be moved in a scalable, fault tolerant way. That is why companies like LinkedIn, Netflix, Uber and Airbnb utilize Apache Kafka to provide the messaging infrastructure that can handle hundreds of billions of messages that amount to several hundred terabytes of data being produced, moved around and consumed per day.

Why Apache Kafka?

Figure 1: Sample Enterprise Subsystems

Figure 1 shows part of what a large enterprise's systems look like. In large companies, there are hundreds of applications, all needing data to operate. Whether it be logs, records in databases, flat files, key-value pairs, binary files, or messages, all of these applications are creating data at an incredible rate. Oftentimes that rate can strain existing data stores and require more stores to take on the load. When that happens, you have issues related to getting the data where it needs to be and enabling applications to find it. Furthermore, as businesses change, the variety of the data increases, making the types of applications and data stores change as well. This leads the enterprise's integration landscape to become a complex web of point-to-point data movements that is very hard to manage and work with. Enterprise IT departments have used tools and methods to make this complex distribution topology possible. Each of these tools comes with its fair share of trade-offs:

  • Database replication and log shipping: This method is limited to a certain kind of data movement between relational databases that support replication, and that's it. The way a database implements replication is very specific to the database, and therefore doesn't work across vendors. So in a heterogeneous database environment, this becomes a limitation. As a point-to-point integration, there is a significant amount of coupling between the source and the target. Changes to the schema have a direct impact on replication, so as your requirements change, the ripple effect can introduce challenges to your replication architecture.
  • Extract, Transform, and Load (ETL): ETL is used for integrating data between different sources and targets. Every ETL job that runs is a custom application, written by a developer who specializes in ETL. As the data environment increases in complexity, so do the jobs, and since most ETL systems centralize the execution of these jobs, performance and scalability become strained as concurrent or sequential jobs compete for limited resources. This may require multiple ETL environments to exist, which further increases the complexity of the enterprise's systems.
  • Message brokers: Messaging makes a lot of sense because it establishes a fairly simple paradigm for moving data between applications and data stores; however, when it comes to large-scale implementations, traditional messaging systems can struggle, namely with scalability. The means to collect and distribute data as messages relies on the role of a message broker, which is oftentimes a bottleneck. There are a lot of variables that determine the reliability and performance of a messaging system, a big one being message size. Larger messages can put severe strain on the message broker's stores, and this is a challenge because you may not be able to control the messages coming from some systems. Furthermore, a messaging environment is dependent on the ability of message consumers to actually consume at a reasonable rate. There is also the challenge of fault tolerance: if a consumer pops something off the queue, it's probably gone. Faults in the consuming applications can happen for any reason; where this becomes a problem is when a bug incorrectly processes the message it is getting from the broker. The broker's job is to turn over the messages; it doesn't, and can't, keep them around. So if a consumer consumes a message and processes it incorrectly, it can't go back to retrieve the message again, because it's not there anymore.

Figure 2: Broker or Hub and Spoke System

Technically, messaging systems are considered a form of middleware, where you need to write complex logic to handle data movement between applications and data stores. Your code needs to have intimate knowledge of every data store, and that knowledge will likely be specific to the data store type and provider. Furthermore, you will likely be in the realm of dealing with distributed coordination logic, multi-phase commits, and error handling to consistently manage data. This is extremely complex. With every application change, new data store, or new schema, you have to revisit this code.

LinkedIn’s Search for a Better Solution

This is the typical enterprise challenge when it comes to handling growing data sets moving faster and faster through systems. Surely there has to be a better way to move data cleanly, without a complex web of different integration topologies; reliably, to reduce the impact of any one component's slowness or unavailability on the system; quickly, as data movement and access are only getting faster for real-time use cases; and finally, autonomously, reducing the coupling between components so we can improve or change parts of the system without a cascading effect. Kafka started as a LinkedIn internal project in 2009. The name refers to the German-language writer Franz Kafka, whose work was so freakish and surreal that it inspired an adjective based on his name. For LinkedIn, the data infrastructure and the ability to work with it had become so nightmarish and scary that they named their solution after the author whose name best described the situation they were hoping to escape from.

Apache Kafka Design Guiding Principles

  1. Able to handle large volumes of data in the terabytes and beyond.
  2. Designed to scale out by adding machines to seamlessly share the load.
  3. Fault tolerant: data had to be reliably managed, transmitted, and made durable in the case of failure.
  4. Loosely coupled producer and consumer applications that can still engage in common data exchanges. It would be unacceptable for one application's runtime conditions to affect another's. To enable this loosely coupled paradigm between producers and consumers, they wanted to embody the common and simple messaging semantics of publish-subscribe. Independent data-producing applications would send data on a topic, and any interested consuming application could subscribe and receive data on that topic, which it could process and, in turn, produce on a different topic for others to consume.

Figure 3: Kafka Cluster

As a central broker of data, Kafka enables disparate applications and data stores to engage with one another in a loosely coupled fashion by conveying data through messaging topics which Kafka manages at scale and reliably. Regardless of the system, the vendor, the language, or runtime, all can integrate into this data fabric, provided by none other than Apache Kafka. Kafka’s development started in 2009, and its first deployment was in 2010. Within the next year, LinkedIn hardened Kafka to a point that they felt it could be released as an open source project under the Apache Software Foundation in 2011. Very soon after its submission to the Apache incubator, it achieved top-level status and has become one of the most adopted tools in the Apache ecosystem. Since 2015, Apache Kafka’s adoption rate has grown 700%, as the software development community contributes more and more capabilities to its code base.

Apache Kafka’s Architecture

Figure 4: Kafka Brokers

Apache Kafka is a publish/subscribe messaging system. A publisher creates some data and sends it to a specific location, where an interested and authorized subscriber can retrieve the message and process it. Kafka calls publishers producers and subscribers consumers. Producers and consumers are simply applications that you write, or use, that implement the producing and consuming APIs. Now, the producer sends its messages to a specific location referred to as a topic, which is really a collection or grouping of messages. Topics have a specific name that can be defined up front or on demand; as long as producers know the topic name and have permission to send to it, messages can be sent to that specific location. The same goes for consumers: consumers retrieve messages based on the topics they are interested in. The messages and their topics are kept in the Broker, as in other messaging systems. The Kafka Broker is a software process that runs on a machine. The Broker uses the file system to store messages, which it categorizes as topics. Like any executable, you can run more than one on a machine, but each must have unique settings so that they don't conflict with one another. It is in the Kafka Broker where the differences from other messaging systems become apparent.
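
To make the terminology concrete, here is a minimal producer sketch, assuming a local broker and a made-up topic name "page-views"; the consumer side mirrors it with subscribe() and poll().

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // send one message to the "page-views" topic; the Broker appends it to that topic's log
  producer.send(new ProducerRecord[String, String]("page-views", "user-42", "/home"))
  producer.close() // flushes any buffered messages before exiting
}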

The Apache Kafka Cluster

Figure 5: Kafka Cluster components

How the Kafka Broker handles messages in their topics is what gives Kafka its high-throughput capabilities. Achieving high throughput is largely a function of how well a system can distribute its load and efficiently process it on multiple nodes in parallel. With Apache Kafka, you can scale out the number of Brokers as much as needed to achieve the levels of throughput required, and all of this without affecting existing producing and consuming applications. To achieve high levels of reliability, the Apache Kafka architecture uses the concept of a cluster. A Kafka cluster is just a grouping of multiple Kafka Brokers; it doesn't matter whether they're running on their own machines or not, what matters is how independent Brokers are grouped to form a cluster. The grouping mechanism that determines a cluster's membership of Brokers is an important part of Kafka's architecture, and is what really enables its ability to scale to thousands upon thousands of Brokers and be distributed in a fault-tolerant way. For the sake of putting down a placeholder, this is where Apache Zookeeper comes in.
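
The "unique settings" mentioned above live in each Broker's server.properties file. A hedged example for one Broker might look like the following; values are illustrative, each Broker needs its own broker.id, and Brokers sharing a machine also need distinct ports and log directories.

# server.properties for one Broker (illustrative values)
broker.id=1
listeners=PLAINTEXT://broker1.example.com:9092
log.dirs=/var/lib/kafka/broker-1
# the Zookeeper ensemble this Broker registers with (Zookeeper is discussed further below)
zookeeper.connect=zk1.example.com:2181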

Figure 6: Kafka nodes relationships

A system is a collection of resources that have instructions to achieve a specific goal or function. A distributed system is one that consists of multiple independent resources, also known as workers or nodes; sometimes even called worker nodes. The reason there are multiple nodes is to spread the work around, to get more done. In order to do that, there needs to be coordination amongst all of the available working nodes to ensure consistency and optimal progress towards the overall task or goal at hand. In Kafka, these worker nodes are the Kafka brokers. A controller is just a worker node like any other. The worker node selected as the controller is commonly the one that’s been around the longest. The controller has some critical responsibilities:

  1. Maintain an inventory of what workers are available to take on work.
  2. Maintain a list of work items that have been committed to and assigned to workers.
  3. Maintain the active status of the workers and their progress on assigned tasks.

Once a controller is established and the workers are assigned and available, you have a Kafka cluster. When a task comes in, for example from a Kafka producer, the controller has to decide which worker should take it. There are a lot of factors at play here:

  1. The controller needs to know who is available and in good health
  2. The controller needs to know what risk policy should govern its assignment decisions. An example of a risk policy is the redundancy level, which determines what level of replication to employ in case an assigned worker fails. That means each task given to a worker must also be given to at least one of the worker's peers in the event of an unexpected catastrophe. For an assignment, if the controller determines redundancy is required, it will promote a worker into a leader, which will take direct ownership of the task assigned. It is the leader's job to recruit two of its peers to take part in the replication. Once peers have committed to the leader, a quorum is formed, and these committed peers take on a new role in relation to the leader: a follower. If for whatever reason a leader cannot get a quorum, the controller will reassign tasks to leaders that can. In Apache Kafka, the work that the cluster of Brokers performs is receiving messages, categorizing them into topics, and reliably persisting them for eventual retrieval. The sketch after this list shows how this redundancy level is set in practice when a topic is created.
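
In practice, the redundancy level described above surfaces as a topic's replication factor. Here is a minimal sketch, assuming a made-up topic name and a cluster of at least three Brokers.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object ReplicatedTopicExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "broker1.example.com:9092")

  val client = AdminClient.create(props)
  // 6 partitions, replication factor 3: each partition gets a leader plus two followers,
  // so the loss of a single Broker does not lose the partition
  client.createTopics(Collections.singletonList(new NewTopic("orders", 6, 3.toShort))).all().get()
  client.close()
}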

Distributed Consensus with Apache Zookeeper

Virtually every component within a distributed system has to keep some form of communication going between the nodes. Besides the obvious data payloads being transferred as messages, there are other types of network communications happening that keep the cluster operating normally: events related to Brokers becoming available and requesting cluster membership, Broker name lookups, retrieving bootstrap configuration settings and being notified of new settings consistently and in a timely fashion, and events related to controller and leader election and health status updates, like heartbeat events. That is where Apache Zookeeper comes in. Zookeeper serves as a centralized service for metadata about vast clusters of distributed nodes needing bootstrap and runtime configuration information, health and synchronization status, and cluster and quorum membership, including the roles of elected nodes. Zookeeper itself is a distributed system, and for it to run reliably it has to have multiple nodes, which form what is called a Zookeeper ensemble. An ensemble is essentially a cluster. For Kafka, because of the type of work the Zookeeper ensemble performs, you generally do not need more than one ensemble to power one or many Kafka clusters.
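
On the Kafka side, the only wiring needed to use an ensemble is the zookeeper.connect Broker setting, which lists the ensemble members. The host names below are made up, and the optional /kafka chroot simply keeps Kafka's nodes in their own Zookeeper namespace.

# in each Broker's server.properties: connect to a three-node Zookeeper ensemble
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/kafka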

Figure 7: Full Apache Kafka Cluster

At the heart of Apache Kafka you have the cluster, which consists of possibly hundreds of independent Brokers. Closely associated with the Kafka cluster, you have a Zookeeper environment, which provides the Brokers within the cluster with the metadata they need to operate at scale and reliably. As this metadata is constantly changing, connectivity and chatter between the cluster members and Zookeeper is required. Of course, the cluster doesn't do much unless you put it to work, and that's where Kafka producer and consumer applications come in. Each of these components can scale out to take on more demand and increase levels of reliability and availability.