.NET Core Service Fabric IoT Sample project

Since Microsoft recommends building new applications on .NET Core, this post presents a simple Service Fabric IoT example based on .NET Core.
The idea follows the official Service Fabric IoT sample, which is based on .NET Framework.
This post points out some of the differences and offers a full solution on GitHub.
The Service Fabric application contains a stateful service that consumes messages from the IoT Hub partitions, with one stateful service partition per IoT Hub partition. This is how scaling is achieved. The state of each service partition stores the Event Hub offset and the epoch.
The epoch ensures that, for a given partition, there is only one active receiver per consumer group, according to the following rules:
a) If there is no existing receiver on a consumer group, the user can create a receiver with any epoch value.
b) If there is a receiver with epoch value e1 and a new receiver is created with epoch value e2, where e1 <= e2, the receiver with e1 is disconnected automatically and the receiver with e2 is created successfully.
c) If there is a receiver with epoch value e1 and a new receiver is created with epoch value e2, where e1 > e2, the creation of the receiver with e2 fails with the error “A receiver with epoch e1 already exists”.
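The offset marks the position in the partition's event stream from which the service resumes reading after a failover or restart. On the first start, when no offset has been stored yet, the receiver starts from the current UTC time instead and reads only the messages that arrived at the IoT Hub after that moment.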
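
The three epoch rules above can be illustrated with a small in-memory model. This is only a sketch: the `EpochGate` class and its `TryConnect` method are hypothetical names invented for this illustration and are not part of Microsoft.Azure.EventHubs. The real check is done by the Event Hubs service, not by client code.

```csharp
using System;

// Hypothetical in-memory model of one partition in one consumer group.
// It is NOT part of Microsoft.Azure.EventHubs; it only mirrors rules a) to c).
public sealed class EpochGate
{
    // Epoch of the receiver that currently owns the partition, or null if there is none.
    public long? CurrentEpoch { get; private set; }

    // Returns true if a receiver with the given epoch is allowed to connect.
    public bool TryConnect(long epoch, out string message)
    {
        if (CurrentEpoch is null)
        {
            // Rule a): no existing receiver, so any epoch value is accepted.
            CurrentEpoch = epoch;
            message = $"Receiver with epoch {epoch} created.";
            return true;
        }

        if (CurrentEpoch.Value <= epoch)
        {
            // Rule b): e1 <= e2, the old receiver is disconnected and the new one takes over.
            message = $"Receiver with epoch {CurrentEpoch.Value} disconnected; receiver with epoch {epoch} created.";
            CurrentEpoch = epoch;
            return true;
        }

        // Rule c): e1 > e2, the new receiver is rejected and the old one stays connected.
        message = $"A receiver with epoch {CurrentEpoch.Value} already exists.";
        return false;
    }
}

public static class Program
{
    public static void Main()
    {
        var gate = new EpochGate();

        // Try epochs 0, 1 and then 0 again; the last attempt falls under rule c).
        foreach (long epoch in new long[] { 0, 1, 0 })
        {
            bool accepted = gate.TryConnect(epoch, out string message);
            Console.WriteLine($"epoch {epoch}: {(accepted ? "accepted" : "rejected")} - {message}");
        }
    }
}
```

This is why the service increments the stored epoch on every failover or restart: under rule b) the newly started replica always takes over the partition, and any receiver still held by an old replica is disconnected.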

Differences

The official Service Fabric IoT sample (Iot.Ingestion.RouterService) uses NuGet packages with the WindowsAzure prefix, which cause some incompatibilities in a .NET Core based project.
That is why .NET Core based IoT applications should use packages with the Microsoft.Azure prefix instead.
The .NET Core based Service Fabric IoT sample uses the following packages:
1. Microsoft.Azure.EventHubs
2. Microsoft.Azure.ServiceBus
These packages require a slightly different implementation for reading messages from the IoT Hub partitions.
The following code shows how the method that creates the Event Hub receiver can be implemented with the .NET Core packages:
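        /// <summary>
        /// Creates a PartitionReceiver from the given connection string and partition key.
        /// The Reliable Dictionaries are used to create a receiver from wherever the service last left off,
        /// or from the current date/time if it's the first time the service is coming up.
        /// </summary>
        /// <param name="consumerGroup">Name of the consumer group to read from.</param>
        /// <param name="connectionString">Connection string used to create the EventHubClient.</param>
        /// <param name="servicePartitionKey">Low key of this service partition, used as the IoT Hub partition ID.</param>
        /// <param name="epochDictionary">Reliable Dictionary that stores the last used epoch.</param>
        /// <param name="offsetDictionary">Reliable Dictionary that stores the last saved offset.</param>
        /// <returns>A PartitionReceiver positioned at the stored offset, or at the current UTC time on first start.</returns>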
        private async Task<PartitionReceiver> ConnectToIoTHubAsync(
            string consumerGroup,
            string connectionString,
            long servicePartitionKey,
            IReliableDictionary<string, long> epochDictionary,
            IReliableDictionary<string, string> offsetDictionary)
        {
            PartitionReceiver partitionReceiver = null;

            var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);

            // Get an IoT Hub partition ID that corresponds to this partition's low key.
            // This assumes that this service has a partition count 'n' that is equal to the IoT Hub partition count and a partition range of 0..n-1.
            // For example, given an IoT Hub with 32 partitions, this service should be created with:
            // partition count = 32
            // partition range = 0..31
            string eventHubPartitionId = servicePartitionKey.ToString();

            using (ITransaction tx = this.StateManager.CreateTransaction())
            {
                ConditionalValue<string> offsetResult = await offsetDictionary.TryGetValueAsync(tx, "offset", LockMode.Default);
                ConditionalValue<long> epochResult = await epochDictionary.TryGetValueAsync(tx, "epoch", LockMode.Update);

                long newEpoch = epochResult.HasValue
                    ? epochResult.Value + 1
                    : 0;

                if (offsetResult.HasValue)
                {
                    // continue where the service left off before the last failover or restart.
                    ServiceEventSource.Current.ServiceMessage(
                        this.Context,
                        "Creating EventHub listener on partition {0} with offset {1}",
                        eventHubPartitionId,
                        offsetResult.Value);

                    partitionReceiver = eventHubClient.CreateEpochReceiver(consumerGroupName: consumerGroup,
                                                                           partitionId: eventHubPartitionId,
                                                                           eventPosition: EventPosition.FromOffset(offsetResult.Value),
                                                                           epoch: newEpoch);
                }
                else
                {
                    // first time this service is running so there is no offset value yet.
                    // start with the current time, captured once so the log message and the receiver agree.
                    DateTime startTime = DateTime.UtcNow;

                    ServiceEventSource.Current.ServiceMessage(
                        this.Context,
                        "Creating EventHub listener on partition {0} starting at enqueued time {1}",
                        eventHubPartitionId,
                        startTime);

                    partitionReceiver = eventHubClient.CreateEpochReceiver(consumerGroupName: consumerGroup,
                                                                           partitionId: eventHubPartitionId,
                                                                           eventPosition: EventPosition.FromEnqueuedTime(startTime),
                                                                           epoch: newEpoch);
                }

                // epoch is recorded each time the service fails over or restarts.
                await epochDictionary.SetAsync(tx, "epoch", newEpoch);
                await tx.CommitAsync();
            }

            return partitionReceiver;
        }
The code and other changes are available on GitHub.

References:

1. Explanation for EventHub epochs: https://blogs.msdn.microsoft.com/gyan/2014/09/02/event-hubs-receiver-epoch/
2. Service Fabric Sample IoT solution based on .NET Framework: https://github.com/Azure-Samples/service-fabric-dotnet-iot
