Deep dive into Azure IoT Hub notifications and Device Twin
Azure IoT Hub notifications give detailed insight into operations happening in your IoT solution such as devices being registered, deregistered or reporting data. Combined with Device Twins, they offer a very powerful tool to control and monitor your IoT solution. Here is how you can replicate devices Twins properties to an external store leveraging Azure IoT Hub notifications.
Storing and managing devices data in Azure IoT
A key feature of Azure IoT Hub is the ability to execute SQL-based queries on data published from devices. Such data is persisted to an IoT Hub managed store allowing IoT solutions to simply query device data without having to provision their own store, define a data model, keep the store synchronized and handle the other challenges of building and maintaining a custom store.
That said, some IoT solutions require device data to be kept in an external store. Reasons may include the need to query device data in a manner not supported by the IoT Hub data store (i.e. graph queries, text search, etc.), the need to join the device data with a broader dataset or the need to control where the data is replicated.
To illustrate this, we created a sample that demonstrates the use of IoT Hub device lifecycle and twin change notifications to replicate device identities and twin properties to an external store – specifically an Azure Cosmos Graph Database. The sample maintains a graph of buildings, rooms, floors and thermostats. Thermostat vertices are dynamically added to the graph as new thermostat devices are provisioned and are updated as the thermostats report their room's temperature. The following diagram shows the architecture of the solution with the projects of the sample (in blue), their dependent Azure resources (in orange) and the overall data flow:
The samples are implemented in the following projects:
- ThermostatAdmin.csproj – an admin tool that provisions new thermostats
- ThermostatDevice.csproj – the thermostat devices that connect to the IoT Hub and report their room’s current temperature
- SyncGraphDbApp.csproj – leverages Azure IoT Hub notifications to replicate thermostat data to the broader graph
Device Twin & Notification Primer
Device Twins are used to synchronize state between an IoT solution’s cloud service and its devices. Each device’s twin exposes a set of desired properties and reported properties. The cloud service populates the desired properties with values it wishes to send to the device. When a device connects it requests and/or subscribes for its desired properties and acts on them. Likewise, a device populates properties with values it wishes to send to the cloud service via its twin’s reported properties. The cloud service can retrieve any of a device’s reported or desired properties via point lookups or, as mentioned in the overview, a query across a set of devices based on its properties. Alternatively a cloud service can be notified of device lifecycle events and twin property change events allowing the service to react as new devices are added, existing devices are removed or as twin properties change.
Let’s go through some Device Twin basics used in the sample. First, each new thermostat needs to be registered with its IoT Hub as a device. This allows the thermostat to securely connect to its hub and allows the cloud service to reference the device via a user-defined device ID. Here's a snippet from ThermostatAdmin:
async Task AddThermostatAsync(RegistryManager registryManager, string deviceId) { var device = new Device(deviceId); Console.WriteLine($"Add thermostat '{deviceId}' ..."); await registryManager.AddDeviceAsync(device); Console.WriteLine("Thermostat added"); Twin thermostat = await registryManager.GetTwinAsync(deviceId); PrintTwin(thermostat); }
Once the thermostat is registered a connection string can be composed that allows the physical thermostat to connect to the hub. IoT Hub currently supports authentication via symmetric keys and X.509 certificates. ThermostatDevice uses the former as follows:
// create a device client to emulate therostat sending temperature update Console.WriteLine("Create device client and connect to IoT Hub ..."); Service.Device device = await registryManager.GetDeviceAsync(deviceId); if (device == null) { Console.WriteLine($"Thermostat {deviceId} not registered. Please register the thermostat first."); return; } var authMethod = new DeviceAuthenticationWithRegistrySymmetricKey(deviceId, device.Authentication.SymmetricKey.PrimaryKey); var connectionStringBuilder = Device.IotHubConnectionStringBuilder.Create(iotHubConnectionStringBuilder.HostName, authMethod); DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(connectionStringBuilder.ToString(), Device.TransportType.Mqtt); await deviceClient.OpenAsync(); Console.WriteLine("Thermostat connected");
The code snippet above gets the previously registered device from the IoT Hub to obtain the device’s symmetric key used to generate authentication tokens. Typically the device will not have permissions to get its symmetric key in this manner but is done so for the sake of the sample. The sample then creates an instance of DeviceAuthenticationWithRegistrySymmetricKey object, which represents the authentication method used when communicating with the hub, passing the device’s symmetric key and ID. The IotHubConnectionStringBuilder helper class is then used to generate the device’s connection string using the DeviceAuthenticationWithRegistrySymmetricKey object and the URI to the IoT Hub where the device is registered. DeviceClient.CreateFromConnectionString is then used to create a DeviceClient with the newly formed connection string. OpenAsync opens a link to the hub.
Now that the thermostat is connected to the hub it can report its room’s current temperature. The following code snippet from ThermostatDevice reports the current temperature via twin reported properties:
var props = new TwinCollection(); props["temperature"] = temperature; Console.WriteLine(); Console.WriteLine($"Update reported properties:"); Console.WriteLine(props.ToJson(Newtonsoft.Json.Formatting.Indented)); await deviceClient.UpdateReportedPropertiesAsync(props); Console.WriteLine("Temperature updated");
The TwinCollection class specifies a set of properties to report. DeviceClient.UpdateReportedPropertiesAsync sends the set of properties to the IoT Hub where they are persisted in the hub's default store. The cloud service can then retrieve the properties for a specific device by its device ID or via queries as discussed earlier.
Device Twin Notifications are implemented using another IoT Hub feature called routes. Routes allow device messages and notifications from various IoT Hub sources to be forwarded to user specified endpoints based on a filter. For instance whenever a device is added or removed from an IoT Hub a routable notification is raised. Whenever a twin property is updated a routable notification is raised. In this sample, both of these notification types are routed to an preconfigured Azure Event Hub endpoint. SyncGraphDbApp listens on an Event Hub for these notifications and synchronizes the graph store. Routes can be configured via the Azure Portal or via ARM templates.
The full solution
To experience the end-to-end sample, you can follow the step-by-step instructions on GitHub.
In the sample, the SyncGraphDbApp application simulates a cloud service that leverages Azure IoT Hub notifications to replicate thermostat data to a Cosmos DB graph. SyncGraphDbApp uses device lifecycle notifications to create Thermostat vertices in the Cosmos DB Graph for each newly provisioned thermostat whenever a new Thermostat is registered via ThermostatAdmin. As ThermostatDevice publishes the thermostat's current temperature SyncGraphDbApp updates the Temperature field of the Thermostat vertex.
SyncGraphDbApp handles the processing of device lifecycle and twin change notifications. Internally its TwinChangesEventProcessor class reads notifications routed to a SyncGraphDBEventHub and updates the graph accordingly. The processor is implemented as a typical Azure Event Hub event processor meaning it has a corresponding factory class (TwinChangesEventProcessorFactory), implements IEventProcessor and is triggered by a EventProcessorHost. For more information on Azure Event Hub event processors see Event Hubs programming guide.
TwinChangesEventProcessor.ProcessEventAsync is called when a new batch of notifications arrive. ProcessEventAsync calls SyncDataAsync to process the batch and updates the event hub checkpoint based on the last successfully processed notification. The checkpoint allows SyncGraphDbApp to restart and resume processing without dropping any notifications.
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) { List<EventData> messagesList = messages.ToList(); int lastSuccessfulIndex = -1; while (messagesList.Count > 0) { lastSuccessfulIndex = await this.SyncDataAsync(context, messagesList); await context.CheckpointAsync(messagesList[lastSuccessfulIndex]); // remove all succeeded messages from the list for (int i = 0; i < lastSuccessfulIndex + 1; i++) { messagesList.RemoveAt(0); } } }
Each notification is processed based on its message source and operation type. These are pulled from each notification’s eventData. Properties header along with source hub name and device ID. The following table describes the meaning of each message source and operation type combination:
Message Source |
Operation Type |
Description |
deviceLifeCycleEvents | createDeviceIdentity | A new device was registered in the hub |
deviceLifeCycleEvents | deleteDeviceIdentity | An existing device was unregistered |
twinChangeEvents | updateTwin | Contains a changeset to be applied to an existing twin in the graph (e.g. PATCH) |
twinChangeEvents | replaceTwin | Contains a full twin to replace existing twin in graph (e.g. PUT) |
SyncGraphDbApp internally implements SyncCommand subclasses to process each message source and operation type combination. To start CreateDeviceIdentitySyncCommand adds a new vertex to the graph that represents a new thermostat and associates the thermostat with a Room vertex based on its location via a new edge. The sample only allows one thermostat to be associated with a room at a time. Also note that notifications can have duplicates. As a result conflict errors must be handled.
try { Console.WriteLine("Add new thermostat vertex ..."); vTwin = await this.AddVertexAsync("thermostat", graphTwinId, null); } catch (DocumentClientException ex) when (ex.Error.Code == "Conflict") { Console.WriteLine($"Thermostat vertex {graphTwinId} already exists in the graph."); return; } // replace location Location? location = this.ParseTaggedLocation(this.jTwin); if (location != null) { await this.ReplaceLocationAsync(vTwin, location.Value); }
DeleteDeviceIdentityCommand removes the existing vertex for the specified thermostat and the edge associating the thermostat with a room:
string graphTwinId = MapGraphTwinId(this.hubName, this.twinId); Console.WriteLine($"Try remove twin {graphTwinId} from graph ..."); await this.ExecuteVertexCommandAsync($"g.V('{graphTwinId}').drop()");
UpdateTwinSyncCommand performs two steps: 1) updates the temperature field of the thermostat's vertex and 2) associates a Thermostat vertex with a new Room vertex if its location changed:
string graphTwinId = MapGraphTwinId(this.hubName, this.twinId); Console.WriteLine("Get thermostat vertex ..."); Vertex vTwin = await this.GetVertexByIdAsync(graphTwinId); if (vTwin == null) { Console.WriteLine("Vertex does not exist. Execute Add command ..."); await this.AddTwinAsync(this.hubName, this.twinId, this.jTwin); return; } Dictionary<string, string> properties = null; string reportedTemperature = this.ParseReportedTemperature(this.jTwin); if (!string.IsNullOrWhiteSpace(reportedTemperature)) { properties = new Dictionary<string, string> { {"temperature", reportedTemperature } }; Console.WriteLine("Update vertex temperature property ..."); vTwin = await this.UpdateVertexAsync(graphTwinId, properties); } Location? location = this.ParseTaggedLocation(this.jTwin); if (location != null) { await this.UpdateLocationAsync(vTwin, location.Value); }
ReplaceTwinSyncCommand has the same implementation as UpdateTwinSyncCommand since changesets and full replacements are processed in the same manner for this scenario.
Tips & Tricks
- SyncGraphDbApp is a single instance console application that demonstrates how to consume IoT Hub notifications and update an external store. In order to make the solution scale it needs to be hosted in an Azure Worker Role which scales to multiple instances and listens on a partitioned Event Hub. IoT Hub notifications use the notification's device ID as the Event Hub partition key. As a result, device lifecycle and twin change notifications are routed to a partition based on the notification's device ID. For more information please see documentation.
- When processing twinChangeEvents notifications, both replaceTwin and updateTwin opTypes must be processed to ensure the latest changes are synced.
- In the sample the IoT Hub routes are created when the IoT Hub is created. As a result, the target Event Hub contains all the device lifecycle and twin change events from the start of the hub's lifetime. This ensures SyncGraphDBApp receives all the events needed to completely sync the graph DB. However, if the SyncGraphDBApp needs to sync a hub that was previous created or if SyncGraphDBApp became unavailable for a period of time longer than the Event Hub's retention policy, SyncGraphDBApp would need a way to catch up. Such a catch-up procedure would work as follows:
- Create IoT Hub's routes as explained previously.
- Query IoT Hub for all twins and update Graph DB with results. SyncGraphDBApp's Program.RunSyncSampleAsync shows a simplistic implementation of this step.
- Start processing notifications and only commit twin change notifications whose version is greater than the version in the Graph DB.
- The routes created in this sample simply specify a source and a target for the notifications. IoT Hub routes also support a powerful filtering mechanism called a condition. For instance, if a thermostat reports a temperature over a specific threshold, the notification can be sent to another target that handles this condition differently (i.e. converts notification to a high priority email). The following route configuration includes a condition such that the notification is only forwarded to the specified target endpoint if the temperature reported property is over 100:
{ "name": "TemperatureExceedsThreshold", "source": "TwinChangeEvents", "condition": "$body.properties.reported.Temperature.NewValue > 100", "endpointNames": [ "TemperatureExceedsThresholdNotifications" ], "isEnabled": true },
Let us know what you think
Once you have gone through the sample, let us know if you have feedback or suggestions, and do not hesitate to send us contributions directly on GitHub as well.
References
- Understanding Device Twins
- IoT Hub query language for device twins, jobs, and message routing
- Event Hubs programming guide
Source: Azure Blog Feed