Kafka and MongoDB
In today’s data landscape, no single system can provide all of the required perspectives to deliver real insight. Deriving the full meaning from data requires mixing huge volumes of information from many sources.
At the same time, we’re impatient to get answers instantly; if the time to insight exceeds tens of milliseconds then the value is lost – applications such as high frequency trading, fraud detection, and recommendation engines can’t afford to wait. This often means analyzing the inflow of data before it even makes it to the database of record. Add in zero tolerance for data loss and the challenge gets even more daunting.
As the number, variety, and velocity of data sources grow, new architectures and technologies are needed.
Apache Kafka and data streams are focused on ingesting the massive flow of data from multiple fire-hoses and then routing it to the systems that need it – filtering, aggregating, and analyzing en route.
Enterprise messaging systems are far from new, but even frameworks from the last 10 years (such as ActiveMQ and RabbitMQ) are not always up to the job of managing modern data ingestion and routing pipelines. A new generation of technologies is needed to consume and exploit today’s data sources. This paper digs into these technologies (Kafka in particular) and how they’re used. The paper also examines where MongoDB fits into the data streaming landscape and includes a deep dive into how it integrates with Kafka.
How Kafka Works & What it Provides
Kafka provides a flexible, scalable, and reliable method to distribute streams of event data from one or more producers to one or more consumers. Examples of events (or messages) include:
• A periodic sensor reading such as the current temperature
• A user adding an item to the shopping cart in an online store
• A Tweet being sent with a specific hashtag
• A log entry generated for each click in a web application
Streams of Kafka events are organized into topics. A producer chooses a topic to send a given event to and consumers select which topics they pull events from. For example, a financial application could pull NYSE stock trades from one topic, and company financial announcements from another in order to look for trading opportunities.
The consumers typically receive streamed events in near real-time, but Kafka actually persists data internally, allowing consumers to disconnect for hours and then catch up once they’re back online. In fact, an individual consumer can request the full stream of events from the first event stored on the broker. Administrators can choose to keep the full history for a topic, or can configure an appropriate deletion policy. With this system, it’s possible for different consumers to be processing different chunks of the sequence of events at any given time. Each event in a topic is assigned an offset to identify its position within the stream. This offset is unique to the event and never changes.
The ability to reliably replay events that have already been received, in the exact order in which they were received, provides a number of benefits such as:
• Newly added consumers can catch up on everything that’s happened
• When the code for an application is updated, the application can process the full stream of events again, applying the new logic to each one
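The persistence and replay behavior described above can be illustrated with a small in-memory model. This is a sketch only, not Kafka's implementation or client API: the `TopicLog` class and its `append` and `read_from` methods are hypothetical names invented for this illustration, and the sensor readings are made-up data.

```python
class TopicLog:
    """Toy append-only log: a hypothetical stand-in for one stream of events."""

    def __init__(self):
        self._events = []  # events are only ever appended, never modified

    def append(self, event):
        """Store an event and return its offset (its fixed position in the log)."""
        self._events.append(event)
        return len(self._events) - 1

    def read_from(self, offset):
        """Return (offset, event) pairs starting at the requested offset."""
        return list(enumerate(self._events[offset:], start=offset))


log = TopicLog()
for reading in [21.5, 21.7, 22.1, 22.4]:
    log.append({"sensor": "t1", "temp_c": reading})

# A consumer that went offline after processing offset 1 catches up from offset 2.
catch_up = log.read_from(2)

# A newly added consumer replays the full history from the first event.
full_replay = log.read_from(0)
```

Because each consumer simply remembers the next offset it wants, different consumers can read different parts of the same log at the same time without affecting one another.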
In Kafka, topics are further divided into partitions to support scale out. As each message is produced, the producer determines the correct partition for a message (depending on its topic and message key), and sends it to an appropriate broker for that partition. In this way, the processing and storage for a topic can be linearly scaled across many brokers. Similarly, an application may scale out by using many consumers for a given topic, with each pulling events from a discrete set of partitions.
A consumer receives events from a topic’s partition in the order that they were added by the producer, but the order is not guaranteed between different partitions. While this appears troublesome, it can be mitigated by controlling how events are assigned to partitions. By default, the mapping of events to partitions is random but a partitioning key can be defined such that ‘related’ events are placed in the same partitions. In our financial application example, the stock symbol could be used as the partitioning key so that all events for the same company are written to the same partition – if the application then has just one consumer pulling from that partition then you have a guarantee that all trades for that stock are processed in the correct order.
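A minimal sketch of key-based partitioning follows, using the stock symbol as the key. The partition count, the symbols, and the `partition_for` helper are assumptions made for illustration. CRC32 is swapped in purely because it ships with the Python standard library; it is not claimed to match the hash used by any Kafka client.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # assumed partition count for the illustration


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition deterministically (CRC32 is an illustrative choice)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


partitions = defaultdict(list)
trades = [("ACME", 10.0), ("INIT", 55.2), ("ACME", 10.1), ("INIT", 55.0), ("ACME", 10.3)]
for symbol, price in trades:
    partitions[partition_for(symbol)].append((symbol, price))

# Every ACME trade sits in one partition, in the order it was produced.
acme_partition = partitions[partition_for("ACME")]
acme_prices = [price for symbol, price in acme_partition if symbol == "ACME"]
```

The same key always maps to the same partition, so a single consumer of that partition sees all trades for a symbol in production order, even though trades for other symbols may be interleaved or held in other partitions.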
Multiple consumers can be combined to form a consumer group. Each consumer within the group processes a subset of events from a specific topic – at any instant, messages from one partition can only be received by one consumer within the group. Different applications (e.g., fraud detection and product recommendation) can be represented by their own consumer groups.
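The rule that each partition is read by only one consumer within a group can be sketched with a simple round-robin assignment. The `assign_partitions` function and the consumer names are hypothetical; Kafka's real assignment strategies are configurable and are not modeled here.

```python
def assign_partitions(partition_ids, consumers):
    """Give each partition to exactly one consumer in the group (round-robin)."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partition_ids)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Two independent applications, each with its own group, read the same six partitions.
fraud_group = assign_partitions(range(6), ["fraud-1", "fraud-2", "fraud-3"])
recommend_group = assign_partitions(range(6), ["rec-1", "rec-2"])
```

Each group covers all six partitions independently, so the fraud detection and recommendation applications both see every event, while the work inside each group is divided among its members.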
High Availability can be implemented using multiple Kafka brokers for each topic partition. For each partition there is a single broker that acts as the leader in addition to one or more followers. The leader handles all reads and writes for the topic partition and the followers each replicate from the leader. Should the leader fail, one of the followers is automatically promoted to be the new leader. Typically, each broker acts as the leader for some partitions and as a follower for others. This replication approach prevents the loss of data when a broker fails and increases Kafka’s availability.
An event is considered committed when all in-sync replicas for that topic partition have applied it. A producer has the option of blocking until the write has been committed or continuing right away. Consumers only ever read committed events.
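The commit rule can be sketched with a toy model of one partition. The `PartitionReplicas` class and its methods are hypothetical names, every replica in the model is assumed to be healthy and participating, and leader election and failure handling are left out.

```python
class PartitionReplicas:
    """Toy model of one partition with a leader and followers (hypothetical names)."""

    def __init__(self, follower_count):
        self.leader_log = []
        self.follower_logs = [[] for _ in range(follower_count)]

    def produce(self, event):
        """The leader accepts the write first; followers have not copied it yet."""
        self.leader_log.append(event)

    def replicate(self, follower_index):
        """One follower copies everything it is missing from the leader."""
        log = self.follower_logs[follower_index]
        log.extend(self.leader_log[len(log):])

    def committed(self):
        """Events applied by every replica; only these are visible to consumers."""
        applied = min(len(log) for log in [self.leader_log] + self.follower_logs)
        return self.leader_log[:applied]


partition = PartitionReplicas(follower_count=2)
partition.produce("trade-1")
partition.produce("trade-2")
partition.replicate(0)          # follower 0 has both events, follower 1 has none
before = partition.committed()  # nothing is committed yet
partition.replicate(1)
after = partition.committed()   # both events are now on every replica
```

A producer that blocks until commit would wait until the point where `after` is computed; one that continues right away would return as soon as `produce` completes.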
Each Kafka broker stores all of the data for its topic partitions on disk in order to provide persistence. Because the data is immutable and append-only, each broker is able to handle a large number of writes, and the cost of reading the most recent message remains constant as the volume of data stored grows. In spite of Kafka’s ability to work efficiently with large data volumes, it is often desirable to remove old or obsolete data; Kafka users can choose between two different algorithms for managing space: retention policies and log compaction:
• Retention policy: You can choose, on a per-topic basis, to delete log files after they reach a certain age, or the number of bytes in the topic exceeds a certain size.
• Log compaction: Log Compaction is an optional Kafka feature that reduces the storage size for keyed data (e.g., change logs for data stored in a database). Log compaction allows you to retain only the most recent message with a given key, and delete older messages with the same key. This can be useful for operations like database updates, where you only care about the most recent message.
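The two space-management approaches can be contrasted with a short sketch. The functions `apply_retention` and `compact`, the tuple layout, and the sample records are assumptions for illustration; Kafka itself works on log segment files rather than individual list entries.

```python
def apply_retention(log, now, max_age_seconds):
    """Drop events older than max_age_seconds; log holds (timestamp, key, value)."""
    return [entry for entry in log if now - entry[0] <= max_age_seconds]


def compact(log):
    """Keep only the most recent event for each key, preserving log order."""
    latest_index = {key: index for index, (_, key, _) in enumerate(log)}
    return [entry for index, entry in enumerate(log) if latest_index[entry[1]] == index]


log = [
    (100, "user-1", "email=a@example.com"),
    (200, "user-2", "email=b@example.com"),
    (300, "user-1", "email=c@example.com"),
    (400, "user-3", "email=d@example.com"),
]

recent = apply_retention(log, now=450, max_age_seconds=200)
compacted = compact(log)
```

Retention discards by age regardless of key, so the only record for `user-2` is lost once it is too old. Compaction keeps the latest value for every key, so `user-2` survives while the superseded `user-1` record is removed.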
Log Aggregation: This has traditionally involved the collection of physical log files from multiple servers so that they can be stored in a central location – for example, HDFS. Analyzing the data would then be performed by a periodic batch job. More modern architectures use Kafka to combine real-time log feeds from multiple sources so that the data can be constantly monitored and analyzed – reducing the interval between an event being logged and its consequences being understood and acted upon.
Website Activity Tracking: This was the original use case for Kafka. Site activity such as pages visited or adverts rendered are captured into Kafka topics – one topic per data type. Those topics can then be consumed by multiple functions such as monitoring, real-time analysis, or archiving for offline analysis. Insights from the data are then stored in an operational database such as MongoDB where they can be analyzed alongside data from other sources.
Event Sourcing: Rather than maintaining and storing the latest application state, event sourcing relies on storing all of the changes to the state (e.g., [x=150, x++, x+=12, x-=2]) in the original order so that they can be replayed to recreate the final state. This pattern is often used in financial applications. Kafka is well suited to this design approach as it can store arbitrarily long sequences of events, and quickly and efficiently provide them in the correct sequence to an application.
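The example sequence can be replayed in a few lines. This is a minimal sketch in which the `("set", n)` and `("add", n)` event encoding and the function names are assumptions chosen for illustration.

```python
def apply_event(state, event):
    """Apply one change event to the current value of x."""
    operation, amount = event
    if operation == "set":
        return amount
    if operation == "add":
        return state + amount
    raise ValueError(f"unknown operation: {operation}")


def replay(events, initial=0):
    """Rebuild the final state by applying every event in its original order."""
    state = initial
    for event in events:
        state = apply_event(state, event)
    return state


# The sequence from the text: x=150, x++, x+=12, x-=2
events = [("set", 150), ("add", 1), ("add", 12), ("add", -2)]
final_x = replay(events)  # 161
```

Replaying a prefix of the log, for example `replay(events[:2])`, yields the state as it stood at that point in history, which is one reason the pattern suits auditing.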
Microservices: Microservice architectures break up services into small, discrete functions (typically isolated within containers) which can only communicate with each other through well-defined, network-based APIs. Examples include eCommerce applications where the service behind the ‘Buy Now’ button must communicate with the inventory service. A typical application contains a large number of microservices and containers. Kafka provides the means for containers to pass messages to each other – multiple containers publishing and subscribing to the same topics so that each container has the data it needs. Since Kafka persists the sequences of messages, when a container is rescheduled, it is able to catch up on everything it has missed; when a new container is added (e.g., to scale out) it can bootstrap itself by requesting prior event data.
More information on Microservices can be found in the Microservices: The Evolution of Building Modern Applications white paper as well as in Enabling Microservices: Containers & Orchestration Explained.
Stream Processing: Stream processing involves filtering, manipulating, triggering actions, and deriving insights from the data stream as it passes through a series of functions. Kafka passes the event messages between the processing functions, merging and forking the data as required. Technologies such as Apache Storm, Samza, Spark Streaming, Apache Flink, and Kafka Streams are used to process events as they pass through, while interesting events and results are written to a database like MongoDB where they’re used for analysis and operational decisions. This is the pattern used for the Indian smart housing project described later – where MongoDB stores aggregated energy sensor data which is used for billing and energy performance benchmarking between properties.
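A chain of processing functions can be sketched with plain Python generators. This is not the API of any of the frameworks named above; the stage names, the `kwh` field, and the sample readings are hypothetical.

```python
from collections import defaultdict


def drop_invalid(readings):
    """Filter stage: discard readings with a negative energy value."""
    for reading in readings:
        if reading["kwh"] >= 0:
            yield reading


def total_by_property(readings):
    """Aggregate stage: sum energy use per property."""
    totals = defaultdict(float)
    for reading in readings:
        totals[reading["property"]] += reading["kwh"]
    return dict(totals)


stream = [
    {"property": "A", "kwh": 1.5},
    {"property": "B", "kwh": 2.0},
    {"property": "A", "kwh": -1.0},  # faulty reading, removed by the filter
    {"property": "A", "kwh": 0.5},
]

# Chain the stages; the result is what would be written to a database.
totals = total_by_property(drop_invalid(stream))
```

In a real deployment each stage would typically read from one topic and write to another, so that stages can be scaled and restarted independently.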
Lambda Architecture: Applications following the Lambda Architecture (Figure 4) augment data produced by stream processing on recent events (the Speed Layer) with views built from batch processing jobs run on the full, historical data set (the Batch Layer).
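The way the two layers are combined can be sketched as follows. The function names, the page-view events, and the use of simple counts are assumptions for illustration; a real deployment would run each layer on a separate distributed system.

```python
from collections import Counter


def batch_view(historical_events):
    """Batch Layer: recompute counts from the full historical data set."""
    return Counter(event["page"] for event in historical_events)


def speed_view(recent_events):
    """Speed Layer: counts for events that arrived after the last batch run."""
    return Counter(event["page"] for event in recent_events)


def serve(batch, speed):
    """Serving step: combine both views to answer a query."""
    return dict(batch + speed)


historical = [{"page": "/home"}, {"page": "/cart"}, {"page": "/home"}]
recent = [{"page": "/home"}, {"page": "/checkout"}]

page_views = serve(batch_view(historical), speed_view(recent))
```

Note that `batch_view` and `speed_view` contain the same counting logic written twice. In this sketch that is trivial, but maintaining equivalent logic across two separate systems is a known cost of the pattern.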
The Lambda Architecture is coming under increasing scrutiny due to the operational complexity of managing two distributed systems which must implement the same logic. A more contemporary, scalable, and less complex solution is described below in Operationalizing the Data Lake with MongoDB.
Internet of Things (IoT): IoT applications must cope with massive numbers of events being generated by a multitude of devices. Kafka plays a vital role in providing the fan-in and real-time collection of all of that sensor data. A common use case is telematics, where diagnostics from a vehicle’s sensors must be received and processed back at base.
Once captured in Kafka topics, the data can be processed in multiple ways, including stream processing or Lambda architectures. It is also likely to be stored in an operational database such as MongoDB, where it can be combined with other stored data to perform real-time analytics and support operational applications such as triggering personalized offers.
Implementation Recommendations for Kafka
Great value can be derived from combining multiple sources of data – some combinations might be obvious when setting up the Kafka architecture, but others only come to mind later when an application developer or analyst sees everything that’s available. For this reason, it’s recommended to design on the side of openness and extensibility:
• Build a minimal number of Kafka clusters.
• Make your Kafka topics accessible to all.
• Use common data formats between topics to reduce barriers in parsing and combining multiple data sources.
Some data may still need to be kept in silos due to security concerns, but it’s worth asking the question about which topics can be made available to all internal users. Kafka 0.9 supports secure authentication and per-topic ACLs, which allow administrators to control access to individual topics.
JSON is a suitable common data format for standardization, and it’s also worth considering Apache Avro, which allows schemas to be enforced on any given topic. Avro also has the advantage that it’s more compact than JSON. It’s very simple to map between JSON and Avro, which helps when integrating with MongoDB.
Each topic should use a common schema, but it’s also important to recognize that schemas evolve over time; consider including a schema version identifier in each message.
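One way to carry a schema version in each message is sketched below using JSON. The `schema_version` field name, the version numbers, and the `temp` to `temp_c` rename are all hypothetical choices made for this example.

```python
import json

CURRENT_SCHEMA_VERSION = 2  # assumed version number for the illustration


def encode(event):
    """Serialize an event as JSON, tagging it with the schema version."""
    return json.dumps({"schema_version": CURRENT_SCHEMA_VERSION, **event})


def decode(message):
    """Parse a message and upgrade older versions to the current shape."""
    event = json.loads(message)
    if event.get("schema_version", 1) == 1:
        # Hypothetical change: version 1 used "temp", version 2 uses "temp_c".
        event["temp_c"] = event.pop("temp")
        event["schema_version"] = 2
    return event


old_message = json.dumps({"schema_version": 1, "sensor": "t1", "temp": 21.5})
new_message = encode({"sensor": "t1", "temp_c": 22.0})

upgraded = decode(old_message)
current = decode(new_message)
```

Because Kafka retains old events, a consumer replaying a topic may meet messages written under earlier schema versions; an explicit identifier lets it decide how to interpret each one.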
Kafka is extremely good at ingesting and storing massive numbers of events in real-time; this makes it a great buffer to smooth out bursty traffic. An example could be the results produced at the end of a customer classification Hadoop batch job, which must then be propagated into personalization and marketing applications.
As in any distributed system, poor network performance is the enemy of low latency and high throughput. Where possible, keep processing local and replicate data between Kafka clusters in different data centers.
By default, the replication of messages from the leader to the followers is asynchronous, which can result in lost messages. In many applications, that is a worthwhile trade-off (e.g., for log aggregation) as the performance benefits outweigh the potential loss of a small number of events. When greater safety is required, a topic can be configured to require one or more followers to acknowledge receipt of the event before it’s committed.
Writes are immutable – once an event has been written to a Kafka topic, it cannot be removed or changed, and at some point, it will be received and acted upon by all subscribed consumers. In many cases, it may be possible to undo the downstream effects of an event by creating a new, compensating event – e.g. if you want to undo an event to reduce a balance by $100 then a second event can be written that increases the same balance by $100. There are other scenarios where it’s harder to roll back the clock; consider a rogue temperature sensor event from a data center that triggers cutting power and a halon dump.
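The balance example can be sketched as follows. The event fields `id`, `amount`, and `reverses`, and the helper names, are hypothetical.

```python
def balance_after(events, opening_balance=0):
    """Derive an account balance by applying every event in order."""
    balance = opening_balance
    for event in events:
        balance += event["amount"]
    return balance


def compensate(event):
    """Build a new event that reverses the effect of an earlier one."""
    return {"amount": -event["amount"], "reverses": event["id"]}


ledger = [{"id": 1, "amount": 500}, {"id": 2, "amount": -100}]
before_undo = balance_after(ledger)  # 400

# The original event stays in the log; the reversal is appended after it.
ledger.append(compensate(ledger[1]))
after_undo = balance_after(ledger)  # 500
```

The log now records both the mistake and its correction, which preserves the audit trail. Any consumer that acted on the original event before the compensating event arrived will still have done so.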
Kafka is designed for high performance writes and sequential reads, but if random access is a requirement then the data should be persisted and read from a database such as MongoDB.
Alternate Messaging Systems
The idea of moving data between systems is nothing new and frameworks have been available for decades. Two of the most popular frameworks are described here.
RabbitMQ: Implements the Advanced Message Queuing Protocol (AMQP) to act as a message broker. RabbitMQ focuses on the delivery of messages to consumers with complex routing and per-message delivery guarantees. Kafka is more focused on ingesting and persisting massive streams of updates and ensuring that they’re delivered to consumers in the correct sequence (for a given partition).
Apache ActiveMQ: A Java-based message broker which includes a Java Message Service (JMS) client. As with RabbitMQ, the advantage offered by Kafka is the ingesting and persisting of massive streams of data.
A number of technologies complement Kafka; some of the more notable ones are described here.
Kafka Connect: A standard framework to reliably stream data between Kafka and other systems. It makes it simple to implement connectors that move large sets of data into and out of Kafka. Kafka Connect can run in either standalone or distributed modes – providing scalability.
Apache Zookeeper: Provides distributed configuration, synchronization, and naming for large distributed systems. Kafka uses Zookeeper to share configuration information across a cluster.
Apache Storm: Provides batch, distributed processing of streaming data. Storm is often used to implement Stream Processing, where it can act as both a Kafka consumer of raw data and a producer of derived results.
Apache Heron: The successor to Apache Storm, produced by Twitter after their Storm deployment ran into issues when scaled to thousands of nodes. It is more resource efficient and maintains API compatibility with Storm, while delivering an order of magnitude greater performance.
Apache Spark: Performs data analytics using a cluster of nodes. Spark is used for many batch workloads that would previously have been run with Apache Hadoop – Spark’s ability to store the data in memory rather than on disk results in much faster execution. More information can be found in Apache Spark and MongoDB – Turning Analytics into Real-Time Action.
Apache Spark Streaming: Adds the ability to perform streaming analytics, reusing the same application analytics code used in Spark batch jobs. As with Storm it can act as both a Kafka consumer of raw data and producer of derived results – it may be preferred if you already have Spark batch code that can be reused with Spark Streaming.
Kafka Streams: A new library included with Apache Kafka 0.10 that allows you to build a modern stream processing system with your existing Kafka cluster. Kafka Streams makes it easy to join, filter, or aggregate data from streams using a high-level API.
Apache Apex: Enables batch and streaming analytics on Hadoop – allowing the same application Java code to be used for both. Through Apache Apex Malhar, reference data can be pulled in from other sources and results can be pushed to those same data stores – including MongoDB. Malhar is also able to act as a Kafka consumer, allowing Apex to analyze streams of topic data.
Apache Flume: A distributed, highly available service for aggregating large volumes of log data, based on streaming data flows. Kafka is more general purpose and has a couple of advantages over Flume:
• Kafka consumers pull data and so cannot be overwhelmed by the data stream.
• Kafka includes replication so events are not lost if a single cluster node is lost.
Apache Flink: A framework for distributed big data analytics using a distributed data flow engine. Any data storage is external to Flink; e.g., in MongoDB or HDFS. Flink often consumes its incoming data from Kafka.
Apache Avro: A data serialization system, very often used for event messages in Kafka. The schema used is stored with the messages and so serialization and deserialization are straightforward and efficient. Avro schemas are defined in JSON, making it simple to use with languages, libraries, and databases such as MongoDB designed to work with JSON.