Understanding the consumer side of Azure Event Hubs (Checkpoint, InitialOffsetProvider, EventProcessorHost)

|  Posted: July 9, 2018  |  Categories: Microsoft Azure

Azure Event Hubs is a cloud-scale telemetry ingestion service for websites, apps, and devices. Because of its tremendous event-handling capacity, it is also used in IoT architectures, where we talk about handling millions of events per second.

Because it is built on a multi-partition architecture, Azure Event Hubs scales to receive events from hundreds of sources. Ingestion is only one side of it: the service also gives consumers the means to read events efficiently and at scale.

Consuming events from Azure Event Hubs

You can consume events from Azure Event Hubs using one of the following techniques:

1. EventHubReceiver receives events from a specific partition through a specific consumer group, but it does not provide much control over managing the receiving end of the EventHub.

2. EventProcessorHost provides a more efficient way of receiving events from Azure Event Hubs, with Checkpointing and Partition lease management. It is thread-safe and provides a safe, multi-process running environment for event processor implementations.

Refer to this article for basic guidance on Event Consumers.

This blog post will focus on Checkpointing and setting up InitialOffsetProvider.

Events arrive at Azure Event Hubs at the partition level. EventProcessorHost is the .NET client for EventHub that is used extensively to consume events.

The following question is very common among developers who are starting to use Azure Event Hubs:

“I have set the message retention to 1 day for my event hub, but I still see messages older than one day when I retrieve them.”

One important concept to understand is “Message Retention” days. It is often confused with “Time to Live (TTL)” in Queues, Topics, or Subscriptions.

Message Retention 

It ensures that events are available for the specified time, i.e. it is a minimum guarantee that the events will be available for consumption. It does not mandate that the events be deleted after this time.

Time to Live (TTL) 

It defines the lifetime of a message, i.e. the message is removed from storage after this time, irrespective of whether it has been read or not.

[Image: Data Retention in Azure Event Hubs, Dan Rosanova]

Events in EventHub partitions are not cleared the moment the Message Retention period elapses, nor can they be cleared programmatically. So there is a high likelihood that events from beyond the retention period are retrieved. This can be controlled by checkpointing and managing offsets properly.

This opens up multiple scenarios for consuming events. Let us discuss a few with code examples. A new EventHub, sb-test-ns01-eh01, with 4 partitions is created for this purpose.

[Image: Event-Hub]

Configure the EventHub Shared Access Policies to Publish and Consume events to and from the EventHub respectively.

[Image: Configure-Event-Hub]

A new Storage Account, sbtestns01eh01storage, is created. It is used to maintain checkpoint and partition lease details while consuming events from the EventHub.

[Image: Storage-Account]

Two .NET client applications are developed, one for publishing events and the other for consuming events.

Event Publisher Application

Reference the following package and its dependencies in the application:
Microsoft.ServiceBus
This application uses EventHubClient to publish events.

[Image: Code]

Event Consumer Application

Reference the following packages and their dependencies in the application:

Microsoft.ServiceBus

Microsoft.Azure.ServiceBus.EventProcessorHost

WindowsAzure.Storage

[Image: Code-2]

The above EventProcessorHost uses EventHubConsumerGroup.DefaultGroupName to consume events.

Event Processor Implementation

[Image: Event-Processor-Implementation]

Scenario 1: Clean Event Hub

The Event Publisher and Processor applications are connected to the EventHub. No events are processed, as the partitions are clean and no events have been published yet.

[Image: Event-Hub-Partition]

[Image: Processing-Event]

Once you start the publisher and the processor, you’ll notice events being published and processed.

[Image: Events-Published]

Scenario 2: Event Hub with existing data

When an EventProcessorHost connects to an EventHub that already has events in its partitions, it starts processing all the older events that are still retained in the EventHub.

[Image: Event-Hub-Data]

Though no events are being published, the EventProcessor starts processing all the old, already-read events from the EventHub partitions. Also, note that the offsets are empty when the leases are allocated.

[Image: Processing-Odd-Event]

Scenario 3: Consume any unread data

In most cases, it is unnecessary to process events that have already been read; it is practical to process only unread events. Events can be unread in two cases:

  1. New events that are yet to arrive
  2. Events that arrived while the EventProcessor was idle because of a pause, or not running because of an error or an intentional shutdown

The EventHub may be receiving events if there are active publishers.

[Image: Active-Publisher]

In such cases, the EventProcessor has to resume from the event that follows the last one it processed. This is where Checkpointing plays a useful role.

Messages arrive at the EventProcessor in batches. It is suggested to set a Checkpoint after processing each batch.

[Image: Code-3]

The checkpoint, along with the lease details for each partition of the consumer group, is stored in a container within the storage account configured initially at the EventProcessorHost (this is the main reason why EventProcessorHost expects you to supply Azure blob storage details). If no lease name is specified, the EventHub path is used as the name. The files in the container are blobs that store JSON text.

Eg: {"PartitionId":"1","Owner":"eecd42df-a253-49d1-bb04-e5f00c106cfc","Token":"6271aadb-801f-4ec7-a011-a008808a656c","Epoch":5,"Offset":"400","SequenceNumber":125}

[Image: Microsoft-Azure]
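To make the lease format concrete, here is a minimal sketch that parses the sample lease text shown above. It is written in Python only to stay self-contained; it does not touch Azure Storage, and the variable names are illustrative rather than part of any SDK.

```python
import json

# Sample lease blob content, copied from the example in this post.
lease_text = (
    '{"PartitionId":"1","Owner":"eecd42df-a253-49d1-bb04-e5f00c106cfc",'
    '"Token":"6271aadb-801f-4ec7-a011-a008808a656c","Epoch":5,'
    '"Offset":"400","SequenceNumber":125}'
)

lease = json.loads(lease_text)

# In this sample, Offset is stored as a string and SequenceNumber as a number.
partition_id = lease["PartitionId"]
offset = lease["Offset"]
sequence_number = lease["SequenceNumber"]

print(f"partition {partition_id}: offset={offset}, sequence number={sequence_number}")
```

Reading these blobs by hand is a quick way to check where each partition will resume from.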

When the EventProcessor connects to the EventHub and the leases are allocated, it loads the checkpoint offset from these container files, initializes the offset from it, and starts processing events from there.

The following shows the EventProcessorHost awaiting new events after the checkpoint offset, without processing the old, already-read events that are still present in the EventHub.

[Image: Event-Processor-Host]

The following shows the EventProcessorHost processing the events that arrived after the checkpoint offset while it was idle or not running, again without processing the old, already-read events that are still present in the EventHub. Newly arriving events and events that were not read earlier can be told apart by their timestamps.

[Image: Events-Arrival]

Note:

If these container files are missing or deleted, the checkpoint data is lost and the EventProcessor will process all the events available in the EventHub.

For highly scalable consumer throughput, multiple EventProcessorHosts can be used.

Checkpointing can also be done in various patterns, such as at the end of each batch or at regular intervals.

[Image: Code-4]
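The interval pattern can be sketched as a small model. This is not the Event Hubs SDK: IntervalCheckpointer, its clock parameter, and the saved_offsets list (standing in for the lease blob) are hypothetical names used only for illustration, and Python is used purely to keep the example self-contained.

```python
from typing import Callable, List


class IntervalCheckpointer:
    """Write a checkpoint at most once per interval (hypothetical helper)."""

    def __init__(self, interval_seconds: float, clock: Callable[[], float]):
        self.interval_seconds = interval_seconds
        self.clock = clock
        self.last_checkpoint_time = clock()
        self.saved_offsets: List[int] = []  # stands in for the lease blob

    def on_batch_processed(self, last_offset_in_batch: int) -> bool:
        """Call after each batch; returns True when a checkpoint was written."""
        now = self.clock()
        if now - self.last_checkpoint_time >= self.interval_seconds:
            self.saved_offsets.append(last_offset_in_batch)
            self.last_checkpoint_time = now
            return True
        return False


# Drive the model with a fake clock: one batch every 25 seconds,
# checkpoint interval of 60 seconds.
fake_time = [0.0]
checkpointer = IntervalCheckpointer(60.0, clock=lambda: fake_time[0])

results = []
for offset in (100, 200, 300, 400):
    fake_time[0] += 25.0
    results.append(checkpointer.on_batch_processed(offset))

print(results)                     # which batches triggered a checkpoint
print(checkpointer.saved_offsets)  # offsets that were persisted
```

The trade-off is that a less frequent checkpoint means fewer storage writes but more events processed again after a restart. In this run, the batch ending at offset 400 was never checkpointed, so it would be read again.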

Scenario 4: Consume only new data

This is an interesting scenario where you simply want to ignore all the old (existing) events and start consuming only the fresh ones. This is very common during development, for example when you have introduced a new message type and want to ignore the previous ones. It is also useful in scenarios where it doesn’t make sense to look into historical data.

When an EventProcessorHost initializes an EventProcessor, InitialOffsetProvider can be set in the EventProcessorOptions to start processing events from the given offset.

Note:

The initial offset only works once for each container lease. Once a checkpoint is written, the lease overrides the initial offset setting. This also means that the initial offset is overridden by the checkpoint offset even if the former is higher than the latter. So, depending on the required scenario, the checkpoint and the initial offset have to be handled appropriately.
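The precedence rule in this note can be written down as a tiny model. It is a sketch of the behavior described in this post, not SDK code; resolve_start_offset is a hypothetical function, and None stands for "no value stored or configured".

```python
from typing import Optional


def resolve_start_offset(
    checkpoint_offset: Optional[int],
    initial_offset: Optional[int],
) -> Optional[int]:
    """Return the offset a partition reader starts from.

    A stored checkpoint always wins, even when the initial offset is higher.
    None means "start from the oldest retained event".
    """
    if checkpoint_offset is not None:
        return checkpoint_offset
    return initial_offset


# Checkpoint present: the (higher) initial offset is ignored.
print(resolve_start_offset(checkpoint_offset=400, initial_offset=900))

# No checkpoint yet: the initial offset applies.
print(resolve_start_offset(checkpoint_offset=None, initial_offset=900))

# Neither: everything still retained in the partition is processed.
print(resolve_start_offset(checkpoint_offset=None, initial_offset=None))
```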

Based on this scenario and the available EventHub capabilities, some changes to the initialization and checkpoint are done.

a. Assign a lease name at EventProcessorHost initialization such that a new container is created whenever the EventProcessorHost is initialized, so there is no checkpoint data. Hold on: if there is no checkpoint data, wouldn’t the EventProcessor process old, already-read events? Yes, but the following configuration makes sure only new events are processed.

[Image: Code-5]

b. Set the InitialOffsetProvider to read events in all partitions from now, as shown below.

[Image: Code-6]

Together, these changes ensure the EventProcessorHost has no Checkpoint offset value that could override the InitialOffsetProvider value. So events will always be processed based on the initial offset option.

[Image: Final-Event-Processing]
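The effect of "read from now" can be illustrated with a self-contained model. The assumptions here are labeled: the initial offset provider is modeled as a per-partition callback that returns a UTC start time, and only events enqueued strictly after that time are kept. Event and events_to_process are hypothetical names, not SDK types.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, List, NamedTuple


class Event(NamedTuple):
    partition_id: str
    enqueued_utc: datetime
    body: str


def events_to_process(
    events: List[Event],
    initial_offset_provider: Callable[[str], datetime],
) -> List[Event]:
    """Keep only events enqueued after the start time for their partition."""
    return [
        e for e in events
        if e.enqueued_utc > initial_offset_provider(e.partition_id)
    ]


# The processor "starts listening" at this moment (fixed for repeatability).
started_at = datetime(2018, 7, 9, 12, 0, tzinfo=timezone.utc)

events = [
    Event("0", started_at - timedelta(hours=3), "old event"),
    Event("1", started_at - timedelta(minutes=1), "old event"),
    Event("1", started_at + timedelta(seconds=5), "new event"),
]

# Same start time for every partition, i.e. "read from now".
fresh = events_to_process(events, lambda partition_id: started_at)
print([e.body for e in fresh])
```

Because no checkpoint exists in the fresh container, nothing competes with the start time, and the two older events are skipped even though they are still retained.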

Note:

The offset will not be assigned, as there are no values in the storage container. The EventProcessor will still process only the events that arrive from the time it starts listening.

Also, every time the EventProcessorHost is initialized a new storage container will be created to store the partition lease management data. The containers can be cleared or deleted as needed later.

[Image: Containers-Deletion]
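One way to get a new container on every start is to generate a unique lease name. The sketch below is an assumption about how that could be done, not the code from this post: new_lease_container_name and the eh01-leases prefix are hypothetical. The check reflects the Azure blob container naming rules (3 to 63 characters; lowercase letters, digits, and single hyphens; must start and end with a letter or digit).

```python
import re
import uuid

# Lowercase letters and digits, with single hyphens only between them.
CONTAINER_NAME_RE = re.compile(r"^[a-z0-9](?:-?[a-z0-9])*$")


def new_lease_container_name(prefix: str = "eh01-leases") -> str:
    """Build a unique, valid container name for one host start (hypothetical)."""
    name = f"{prefix}-{uuid.uuid4().hex[:12]}"
    if not (3 <= len(name) <= 63 and CONTAINER_NAME_RE.match(name)):
        raise ValueError(f"invalid container name: {name!r}")
    return name


print(new_lease_container_name())
```

A shared prefix also makes it easy to find and delete the leftover containers later.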

Though this blog post is a longer one, I hope it helps you understand the behavior of EventHub consumers and process events efficiently.