Understanding the consumer side of Azure Event Hubs (Checkpoint, InitialOffsetProvider, EventProcessorHost)

Tags:  
Azure Event Hubs

Azure Event Hubs is a cloud-scale telemetry ingestion service for websites, apps, and devices. Because of its very high event-handling capacity, IoT architectures also rely on Azure Event Hubs, where workloads can reach millions of events per second.

With a multiple-partition architecture behind the scenes, Azure Event Hubs scales to receive events from hundreds of sources. Ingesting events is the critical capability of Azure Event Hubs, but it also gives consumers the tools they need to read events efficiently and at scale.

Consuming events from Azure Event Hubs

You can consume events from Azure Event Hubs using one of the following techniques:

  1. EventHubReceiver is used to receive events from a specific partition through a particular consumer group. However, it does not provide much control over managing the receiving end of the EventHub.
  2. EventProcessorHost provides a more efficient way of receiving Azure Event Hubs events, with Checkpointing and partition lease management. It is thread-safe and provides a multi-process, safe running environment for event processor implementations.

Refer to this article for basic guidance on Event Consumers

This blog post will focus on Checkpointing and setting up InitialOffsetProvider.

Events arrive in Azure Event Hubs at the partition level. EventProcessorHost is the .NET client for EventHub that is used extensively to consume events.

The following question is common among developers starting to use Azure Event Hubs:

“I have set the message retention to 1 day for my event hub, but I still see messages older than one day in my event hub when I retrieve them.”

One important concept to understand is “Message Retention” days. Users often confuse this concept with “Time to Live (TTL)” in Queues, Topics, or Subscriptions.

Message Retention

It ensures the events are available for a specific time, i.e. as a minimum guarantee that the events will be available for consumption. It does not mean that events get deleted after this time.

Time to Live (TTL)

It defines the lifetime of the messages, i.e. messages will be lost or removed from the storage after this specific time irrespective of whether the user had read the message or not.

Data Retention in Azure Event Hubs – Dan Rosanova

Even after the Message Retention period has elapsed in the EventHub, the partition events are not necessarily cleared. Clearing them programmatically is also not feasible. So there is a high likelihood that events from beyond the retention period are retrieved. Users can control such situations by using checkpoints and managing offsets properly.

This behaviour opens up multiple scenarios in consuming events. Let us discuss a few with code examples. Initially, create a new EventHub sb-test-ns01-eh01 with four partitions for this purpose.

Configure the EventHub Shared Access Policies to Publish and Consume events to and from the EventHub, respectively.

Now create a new Storage Account sbtestns01eh01storage for maintaining checkpoint and partition lease details while consuming events from the EventHub.

Two .NET client applications are developed: one for publishing events and the other for consuming events.

Event Publisher Application

Refer to the following packages and their dependencies in the application:
Microsoft.ServiceBus
This application uses EventHubClient to publish events.

Azure Event Hub Consumer group

What is Event Hub Consumer?

Event Hub Consumers or Event Consumers are any entities that read event data from an event hub. All Event Hubs consumers connect via the AMQP 1.0 session and events are delivered through the session as they become available.

What Is Consumer Group in Event Hub?

Consumer groups enable the publish/subscribe mechanism of Event Hubs, and each consumer group provides a view of an event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets.
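
The idea of separate views with their own offsets can be sketched with a small in-memory model. This is only an illustration in Python, not the Event Hubs SDK: the `Partition` and `ConsumerGroupView` classes, the group names, and the integer positions are all assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class Partition:
    """Hypothetical in-memory stand-in for one event hub partition."""
    events: list = field(default_factory=list)


@dataclass
class ConsumerGroupView:
    """Hypothetical read position of one consumer group over a partition."""
    name: str
    position: int = 0

    def read(self, partition, max_count):
        # Reading advances only this group's position; events stay in the partition.
        batch = partition.events[self.position:self.position + max_count]
        self.position += len(batch)
        return batch


partition = Partition(events=["e0", "e1", "e2", "e3"])
dashboard = ConsumerGroupView("dashboard")
archiver = ConsumerGroupView("archiver")

first = dashboard.read(partition, 3)   # the dashboard group reads ahead
second = archiver.read(partition, 1)   # the archiver group still starts at e0
```

Each view keeps its own position, so one slow consumer group does not hold back another, and reading never removes events from the partition.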

How Do I Create a Consumer Group in Event Hub?

A consumer group can be created in an event hub using the Azure CLI. The following command synopsis shows the parameters:

az eventhubs eventhub consumer-group create --eventhub-name
                                            --name
                                            --namespace-name
                                            --resource-group
                                            [--subscription]
                                            [--user-metadata]

Event Consumer Application

Refer to the following packages and their dependencies in the application:

Microsoft.ServiceBus

Microsoft.Azure.ServiceBus.EventProcessorHost

WindowsAzure.Storage

The EventProcessorHost in this application uses EventHubConsumerGroup.DefaultGroupName to consume events.

Event Processor Implementation

Scenario 1: Clean Event Hub

In this scenario, the Event Publisher and Processor applications connect to the EventHub, but no events are processed because the partitions are clean and nothing has been published yet.

Once you start the publisher and the processor, you’ll notice the events getting published and processed.

Scenario 2: Event Hub with existing data

When an EventProcessorHost connects to an EventHub that already has events in its partitions, it starts processing all the older events retained in the EventHub.

Though no events are published, the EventProcessor starts processing all the old, already-read events from the EventHub partitions. Also, note that the offsets are empty when the leases get allocated.

Scenario 3: Consume any unread data

In most cases, it is unnecessary to process events that have already been read; it is practical to process only unread events. Unread events arise in two cases:

  1. New events that are yet to arrive.
  2. Events that arrived while the EventProcessor was idle due to a pause, or not running due to an error or an intentional shutdown.

The EventHub may be receiving events if there are active publishers.

In such cases, the EventProcessor has to resume with the events received after the last processed event, and this is where Checkpointing plays a useful role.

The messages arrive at the EventProcessor in batches. It is preferable to set Checkpoint after processing each batch.
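
A minimal sketch of checkpointing after each batch, and of resuming from the Checkpoint after a stop, might look as follows. It is a Python model rather than the .NET EventProcessorHost API: `InMemoryCheckpointStore`, `process_partition`, the integer offsets, and the `fail_after_batches` switch are hypothetical names introduced for illustration.

```python
class InMemoryCheckpointStore:
    """Hypothetical stand-in for the blob-backed checkpoint store."""

    def __init__(self):
        self._offsets = {}

    def save(self, partition_id, offset):
        self._offsets[partition_id] = offset

    def load(self, partition_id):
        return self._offsets.get(partition_id)


def process_partition(partition_id, events, store, batch_size=2,
                      fail_after_batches=None):
    """Process (offset, body) events in batches, checkpointing after each batch.

    Returns the bodies processed during this run.
    """
    last = store.load(partition_id)
    # Resume after the checkpointed offset, or from the start if there is none.
    pending = [e for e in events if last is None or e[0] > last]
    processed = []
    batches_done = 0
    for start in range(0, len(pending), batch_size):
        if fail_after_batches is not None and batches_done == fail_after_batches:
            break  # simulate the processor stopping mid-stream
        batch = pending[start:start + batch_size]
        processed.extend(body for _, body in batch)
        store.save(partition_id, batch[-1][0])  # checkpoint the last offset of the batch
        batches_done += 1
    return processed


events = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e")]
store = InMemoryCheckpointStore()
first_run = process_partition("0", events, store, fail_after_batches=1)
second_run = process_partition("0", events, store)
```

In this model the second run picks up exactly where the first one stopped, because the stored offset marks the last fully processed batch.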

The Checkpoint, along with the lease details of each partition for the consumer group, is stored in a container within the storage account supplied to the EventProcessorHost (this is the main reason why the EventProcessorHost expects you to supply Azure Blob storage details). If no lease container name is specified, the event hub path is used. The files in the container are blobs that store JSON text.

E.g.: {"PartitionId":"1","Owner":"eecd42df-a253-49d1-bb04-e5f00c106cfc","Token":"6271aadb-801f-4ec7-a011-a008808a656c","Epoch":5,"Offset":"400","SequenceNumber":125}

When the EventProcessor connects to the EventHub and the leases are allocated, EventProcessor will load details from these container files for the Checkpoint offset and start processing events based on that. Based on the Checkpoint, the offset gets initialized.
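
To make the lease file format concrete, the following Python sketch parses the sample lease JSON shown earlier and extracts the Checkpoint fields. The `read_checkpoint` helper is hypothetical, and treating an empty or missing Offset as "never checkpointed" is an assumption based on the empty offsets observed in Scenario 2.

```python
import json

# Sample lease blob content, copied from the example above.
LEASE_BLOB_TEXT = (
    '{"PartitionId":"1","Owner":"eecd42df-a253-49d1-bb04-e5f00c106cfc",'
    '"Token":"6271aadb-801f-4ec7-a011-a008808a656c","Epoch":5,'
    '"Offset":"400","SequenceNumber":125}'
)


def read_checkpoint(blob_text):
    """Return (partition_id, offset, sequence_number) from lease JSON text.

    The offset is None when the lease has no checkpoint yet (assumption:
    an empty or missing "Offset" means nothing was checkpointed).
    """
    lease = json.loads(blob_text)
    offset = lease.get("Offset") or None
    return lease["PartitionId"], offset, lease.get("SequenceNumber")


partition_id, offset, sequence_number = read_checkpoint(LEASE_BLOB_TEXT)
```

Note that the offset is stored as a string while the sequence number is a number, so the two should not be compared with each other.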

In the first case, the EventProcessorHost waits for new events after the Checkpoint offset and does not reprocess the old, already-read events that are still present in the EventHub.

In the second case, the EventProcessorHost processes the events that arrived after the Checkpoint offset while it was idle or not running, and again does not reprocess the old, already-read events present in the EventHub. Newly arriving events and events that were not yet read can be told apart by their timestamps.

Note:

If these container files are missing or deleted, the checkpoint data is lost, and the EventProcessor will process all the events available in the EventHub.

For highly scalable consumer throughput, use multiple EventProcessorHosts.

Checkpointing can be in various patterns, like the end of batch processing or at regular intervals.
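
As a rough illustration of the interval pattern, the sketch below decides when a Checkpoint is due based on elapsed time. It is a Python model, not SDK code: the `IntervalCheckpointPolicy` class and the 60-second interval are assumptions chosen for the example.

```python
class IntervalCheckpointPolicy:
    """Hypothetical policy: checkpoint at most once per interval."""

    def __init__(self, interval_seconds):
        self.interval_seconds = interval_seconds
        self._last_checkpoint = None

    def should_checkpoint(self, now_seconds):
        # Checkpoint on the first batch, then whenever the interval has elapsed.
        due = (self._last_checkpoint is None
               or now_seconds - self._last_checkpoint >= self.interval_seconds)
        if due:
            self._last_checkpoint = now_seconds
        return due


policy = IntervalCheckpointPolicy(interval_seconds=60)
# Times (in seconds) at which batches finish processing.
decisions = [policy.should_checkpoint(t) for t in (0, 30, 60, 100, 120)]
```

Checkpointing less often reduces writes to storage, at the cost of reprocessing the events handled since the last Checkpoint if the processor restarts.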

Scenario 4: Consume only new data

Here is an interesting scenario: you want to ignore all the old (existing) events and consume only the fresh ones. This is very common during development, when you have introduced a new message type and want to ignore the previous ones; it is also useful wherever it does not make sense to look into historical data.

When an EventProcessorHost initializes an EventProcessor, InitialOffsetProvider can be set in the EventProcessorOptions to start processing events from the given offset.

Note:

The initial offset takes effect only once for each lease container. Once a Checkpoint has been written, the Checkpoint offset in the lease overrides the initial offset setting, even if the initial offset is higher than the Checkpoint offset. So, depending on the required scenario, Checkpoint and initial offset have to be handled appropriately.

For this scenario, given the available EventHub capabilities, two changes are needed in the initialization and Checkpoint handling.
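
The precedence described in this note can be summarized in a few lines. This is a Python sketch of the rule as stated here, not the actual EventProcessorHost logic: `resolve_start_position` and its return labels are hypothetical.

```python
def resolve_start_position(checkpoint_offset, initial_offset):
    """Pick where a partition reader starts, following the note above.

    A Checkpoint offset always wins when present; the initial offset is used
    only when no Checkpoint exists; otherwise reading starts at the beginning.
    """
    if checkpoint_offset is not None:
        return ("checkpoint", checkpoint_offset)
    if initial_offset is not None:
        return ("initial", initial_offset)
    return ("start_of_stream", None)


fresh_lease = resolve_start_position(None, 900)    # no Checkpoint yet
existing_lease = resolve_start_position(400, 900)  # Checkpoint overrides, even though 900 > 400
```

The second call shows the case that surprises most people: the lower Checkpoint offset is used even though the initial offset points further ahead.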

  1. Assign a new lease name at the EventProcessorHost initialization, so that a new container is created whenever the EventProcessorHost gets initialized and there is no checkpoint data. Hold on; if there is no checkpoint data, won't the EventProcessor process the old, already-read events? Yes, but the following configuration makes sure only new events are processed.
  2. Set the InitialOffsetProvider to read events in all partitions from now.

Both of the above changes will ensure the EventProcessorHost will not have a Checkpoint offset value to consider or override the InitialOffsetProvider value. The events are processed based on the initial offset option. 
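
The two changes can be modelled together as follows. This is a Python sketch under stated assumptions, not the .NET configuration itself: `new_lease_container_name`, the `eh01-leases` prefix, `events_from`, and the inclusive comparison on the enqueued time are all hypothetical choices made for the example.

```python
import uuid
from datetime import datetime, timedelta, timezone


def new_lease_container_name(prefix="eh01-leases"):
    """Hypothetical naming scheme: a unique lease container per host start.

    Lowercase letters, digits, and hyphens keep the name valid for blob storage.
    """
    return f"{prefix}-{uuid.uuid4().hex}"


def events_from(events, start_time):
    """Keep only events enqueued at or after start_time (assumed inclusive)."""
    return [body for enqueued, body in events if enqueued >= start_time]


# Fixed "now" so the example is deterministic.
now = datetime(2020, 1, 1, 12, 0, tzinfo=timezone.utc)
events = [
    (now - timedelta(hours=2), "old-1"),
    (now - timedelta(minutes=1), "old-2"),
    (now + timedelta(seconds=5), "new-1"),
]
fresh = events_from(events, now)
container = new_lease_container_name()
```

Because every start gets its own container name, no earlier Checkpoint is found, so the "from now" starting point is the only thing that determines which events are read.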

Note:

The offset will not be assigned, as there are no values in the storage container. But the EventProcessor will still process only the events that arrive from the time it starts listening.

Every time the EventProcessorHost is initialized, a new storage container gets created to store the partition lease management data. Users can clear or delete the containers if required.

Though this blog post is a long one, I hope it helps to understand the EventHub Consumers’ behaviour and the efficient processing of events.

Azure Event hubs Documentation

Utilize the below docs link for better understanding the concept in Azure Event Hubs