Azure Stream Analytics Anomaly detection on IoT Edge

Introduction

A few months ago, Microsoft announced a preview of the anomaly detection feature for Azure Stream Analytics.
In IoT scenarios where anomaly detection is used to reduce failures and damage, the usual approach is to first collect and label data, and then train a machine learning model that classifies new data points against what was learned.
Anomaly detection in Azure Stream Analytics works in a similar way, but with one difference: there is no pre-trained model.
Azure Stream Analytics learns from the incoming data and builds a model that determines whether a new data point is an anomaly.
The two approaches differ in pricing and in the reliability of the resulting models.
This blog covers only the technical aspects of using this feature with IoT Edge and OPC Publisher; it does not examine the reliability of such models in detail.

 Architecture

Picture 1: Architecture
 
The scheme above shows the setup used to produce relevant data: a Raspberry Pi with a temperature/humidity sensor.
The Raspberry Pi runs Raspbian and a Python application that reads humidity and temperature values from the sensor; the same application runs a lightweight, dummy OPC server that publishes these values.
Could this be done without an OPC-UA server and the OPC-UA publisher? Yes, but this way is more interesting 🙂
IoT Edge runs three modules:
– the OPC Publisher module,
– the Azure Stream Analytics module,
– the AnomalyHandlingModule.
The OPC-UA Publisher module reads the data from the OPC-UA server and passes it on; the Edge runtime then makes sure these messages reach the Stream Analytics module.
The Stream Analytics module processes the data and produces output only when an anomaly has been detected.
Finally, the AnomalyHandlingModule receives the anomalies and writes them to its output; their final destination is IoT Hub.
The anomaly handling module is deliberately simple (essentially the default module template) and is the place to plug in custom logic for handling anomalies.
One example would be sending a command to stop a machine, or starting a process that normalizes the measured parameters in production.
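A minimal Python sketch of such custom logic follows (the module in this project is written in C#; the field names mirror the output of the ASA query shown later, while the 80-degree threshold and the action names are purely hypothetical):

```python
import json

# Hypothetical anomaly-handling logic. The fields (DisplayName, Value,
# IsAnomaly, Score) mirror the ASA query output; the threshold and the
# action names are made up for illustration.
def handle_anomaly(message_body: bytes) -> str:
    event = json.loads(message_body)
    if event.get("IsAnomaly") != 1:
        return "ignore"        # not an anomaly, nothing to do
    if event["DisplayName"] == "Temperature" and float(event["Value"]) > 80:
        return "stop-machine"  # e.g. send a command to halt the machine
    return "notify"            # default: forward to IoT Hub for alerting

sample = json.dumps({
    "DisplayName": "Temperature",
    "Value": 95.2,
    "SourceTimestamp": "2019-03-29T10:57:25Z",
    "IsAnomaly": 1,
    "Score": 0.001,
}).encode()

print(handle_anomaly(sample))  # -> stop-machine
```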

 Setup

1. Python application
As previously mentioned, this application reads data from a DHT11 temperature and humidity sensor and runs a local OPC server that publishes it.
The FreeOPCUA Python library was used for the OPC server implementation; for reading the sensor values, this library was used.
The full code is available on GitHub.
2. IoT Edge
Since the custom OPC server in this project has some limitations, and the focus of the project is not on that end, the OPC-UA publisher had to be slightly modified to make it work. In a real-world case this modification would not be needed.
The modified version of the OPC-UA publisher is here (and is only valid for the purposes of this project).
The modification removes the following line of code:
SetPublishingMode = true // removed
Assuming Docker is installed on the local machine and an IoT Hub is in place, the following steps are required to run the solution in the IoT Edge simulator for testing:
a) Create an IoT Edge device in the Azure portal.
b) Create a Container Registry and enable the Admin user.
c) Log in to the registry:
docker login container_registry_login_server
The login server is shown on the Overview page in the Azure portal; the credentials are available under Access keys.
d) Build the OPC publisher Docker image after navigating in the console to the folder where the project is saved:
docker build -t container_registry/opcpublisher .
e) Push the OPC-UA publisher Docker image to the previously created container registry:
docker push container_registry/opcpublisher
f) Create a new IoT Edge project with a C# module (AnomalyHandlingModule) in Visual Studio Code by using the Command Palette.
g) Build and push the IoT Edge solution, so that AnomalyHandlingModule ends up in the container registry.

Configuring IoT Edge Deployment template file

To run the OPC-UA publisher on IoT Edge, the deployment.template.json file, which is part of the IoT Edge solution, needs to be modified.
This file references the Docker image from the container registry and specifies the options that are passed as parameters when the OPC-UA publisher runs as a container on IoT Edge.
The following lines need to be added to the 'modules' section:
 "opcpublisher": 
    {
        "version": "1.0",
        "type": "docker",
        "status": "running",
        "restartPolicy": "always",
        "settings": 
        {
            "image": "container_registry/opcpublisher:latest",
            "createOptions": 
            {
                "Hostname": "publisher",
                "Cmd": [
                    "--pf=/appdata/publishednodes.json",
                    "--ns=true", 
                    "--fd=true",
                    "--aa"
                ],
                "HostConfig": 
                {
                      "Binds": [
                      "C:/Test:/appdata"
                      ]
                }
            }
        }
    }
This configuration indicates that the OPC-UA publisher image can be found in the specified (previously created) container registry under the tag 'latest'.
It also specifies that the configuration file for the OPC-UA server is in the /appdata folder, which is bound to C:/Test. This means that all files placed in C:/Test (on a Windows host machine) are available inside the container's file system under /appdata.
The following flags configure how the OPC-UA publisher behaves in this environment:
– "--ns=true" – no shutdown; the publisher keeps running indefinitely
– "--fd=true" – fetch display name; the publisher fetches the names of the variables that hold the values
– "--aa" – trust all certificates from the OPC-UA server; this is required when running the OPC-UA publisher on IoT Edge
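For completeness, the publishednodes.json file referenced by --pf tells the publisher which OPC-UA endpoint and nodes to read. A minimal sketch is shown below; the endpoint URL, node ids, and intervals are hypothetical and must match the actual OPC server:

```json
[
  {
    "EndpointUrl": "opc.tcp://raspberrypi:4840/freeopcua/server/",
    "UseSecurity": false,
    "OpcNodes": [
      { "Id": "ns=2;i=2", "OpcSamplingInterval": 2000, "OpcPublishingInterval": 5000 },
      { "Id": "ns=2;i=3" }
    ]
  }
]
```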

Stream analytics

An Azure Stream Analytics job for IoT Edge needs to be created in the Azure portal.
Picture 2: Create Azure Stream Analytics Job for IoT Edge
Picture 3: Create Azure Stream Analytics Job for IoT Edge
Before deploying to IoT Edge, the Stream Analytics job needs to have an Azure storage account assigned.
Picture 4: Assigning Azure Storage account to ASA job
Both the input and the output of the ASA job are the Edge hub.
There are two outputs, one for temperature anomalies and one for humidity anomalies.
Picture 5: ASA job outputs
The following code is the query used for anomaly detection:
with    temperatureValue as (
                SELECT
                    event.DisplayName,
                    event.Value.Value,
                    event.Value.SourceTimestamp,
                    AnomalyDetection_SpikeAndDip(CAST(event.Value.Value AS float), 80, 120, 'spikes')
                OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
                 FROM input as event
                WHERE 
                    event.Value.Value is not null and event.DisplayName = 'Temperature'
                ),
            humidityValues as (
                    SELECT
                        event.DisplayName,
                        event.Value.Value,
                        event.Value.SourceTimestamp,
                        AnomalyDetection_SpikeAndDip(CAST(event.Value.Value AS float), 80, 120, 'spikes')
                    OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
                    FROM input as event
                    WHERE 
                        event.Value.Value is not null and event.DisplayName = 'Humidity'
                    )

    SELECT
        DisplayName,
        Value,
        SourceTimestamp,
        System.Timestamp as CurrentTime,
        Spikeanddipscores.IsAnomaly,
        Spikeanddipscores.Score
    INTO
        temperatureAnomalyDataSink
    FROM
        temperatureValue
    WHERE Spikeanddipscores.IsAnomaly = 1

    SELECT
        DisplayName,
        Value,
        SourceTimestamp,
        System.Timestamp as CurrentTime,
        Spikeanddipscores.IsAnomaly,
        Spikeanddipscores.Score
    INTO
        humidityAnomalyDataSink
    FROM
        humidityValues
    WHERE Spikeanddipscores.IsAnomaly = 1
The key part of the query is the following two lines:
     AnomalyDetection_SpikeAndDip(CAST(event.Value.Value AS float), 80, 120, 'spikes')
                    OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
These lines indicate that 'spike and dip' is the function used for anomaly detection (there is also a 'ChangePoint' function).
The first parameter tells the function which value to track. The second parameter is the confidence level, 80% in this case, and the third parameter is how many events the ASA job should consider for model training.
It is recommended to include only the necessary number of events, for better performance.
Finally, the fourth parameter is the mode; in this case only spikes are tracked. The other options are 'dips', to detect dips, and 'spikesanddips', to detect both spikes and dips.
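To build some intuition for what the history size does, here is a deliberately simplified Python sketch of sliding-window spike detection. This is not the algorithm ASA uses internally; it only illustrates the idea of scoring each new value against a limited history of recent events (the z-score threshold and minimum-history value are arbitrary choices for the sketch):

```python
from collections import deque
import statistics

def spike_detector(values, history_size=120, z_threshold=3.0):
    """Flag values that deviate strongly from the mean of a recent window.
    history_size plays the role of the third ASA parameter (120 events)."""
    window = deque(maxlen=history_size)
    flags = []
    for v in values:
        if len(window) >= 10:  # need some history before scoring
            mean = statistics.mean(window)
            stdev = statistics.pstdev(window) or 1e-9
            flags.append(int(abs(v - mean) / stdev > z_threshold))
        else:
            flags.append(0)    # cold start: too little data to judge
        window.append(v)
    return flags

# Steady humidity readings with a single spike to 25.0
readings = [18.0] * 30 + [25.0] + [18.0] * 10
flags = spike_detector(readings)
print(flags.index(1))  # -> 30, the position of the 25.0 spike
```

The cold-start branch also mirrors the behavior observed later in this post: right after a restart, the model has too little history, so its first verdicts are unreliable.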
The following part of the query prepares the properties for the output in case an anomaly has been detected.
 SELECT
        DisplayName,
        Value,
        SourceTimestamp,
        System.Timestamp as CurrentTime,
        Spikeanddipscores.IsAnomaly,
        Spikeanddipscores.Score
    INTO
        temperatureAnomalyDataSink
    FROM
        temperatureValue
    WHERE Spikeanddipscores.IsAnomaly = 1

    SELECT
        DisplayName,
        Value,
        SourceTimestamp,
        System.Timestamp as CurrentTime,
        Spikeanddipscores.IsAnomaly,
        Spikeanddipscores.Score
    INTO
        humidityAnomalyDataSink
    FROM
        humidityValues
    WHERE Spikeanddipscores.IsAnomaly = 1
'Score' is a value between 0 and 1; the smaller the score, the higher the chance that the event is an anomaly. The 'IsAnomaly' flag returns 1 for an anomaly and 0 otherwise.
Finally, the result of this query is that all events recognized as anomalies are sent to the output.

Deploying to the Edge and incorporating with other modules

Deploying the Azure Stream Analytics query to IoT Edge can be done through the Azure portal.
1. Set the IoT Edge device modules in the Azure portal.
Picture 6: Set ASA job as a module on IoT Edge
2. Clicking the 'Add' button and selecting Azure Stream Analytics Module opens the following form.
Picture 7: Selecting previously created ASA job for IoT Edge
After confirming with 'Save', the Azure Stream Analytics job for IoT Edge is packed as a zip file and uploaded to the previously assigned storage account.
What remains is to 'tell' IoT Edge how to fetch this package and start the job.
Clicking on the Azure Stream Analytics anomaly detection module shows the information needed to fetch the package and start the job on IoT Edge.
Picture 8: Getting required information for deployment.template.json file
This information needs to be copied into the deployment.template.json file, below the edge hub section:
"$edgeHub": {
      "properties.desired": {
        "schemaVersion": "1.0",
        "routes": {
          "telemetryToAsa": "FROM /messages/modules/opcpublisher/*  INTO BrokeredEndpoint(\"/modules/EdgeAnomalyDetectionAnalyticsService/inputs/input\")",
          "ASAToAnomalyHandlingModule": "FROM /messages/modules/EdgeAnomalyDetectionAnalyticsService/outputs/* INTO BrokeredEndpoint(\"/modules/AnomalyHandlingModule/inputs/input1\")",
          "AnomalyHandlingModuleToIoTHub": "FROM /messages/modules/AnomalyHandlingModule/outputs/* INTO $upstream"
        },
        "storeAndForwardConfiguration": {
          "timeToLiveSecs": 7200
        }
      }
    },
    "EdgeAnomalyDetectionAnalyticsService": {
      "properties.desired": {
        "ASAJobInfo": "sas_to_stream_analytics_job_zip_file",
        "ASAJobResourceId": "_resource_information_.../streamingjobs/EdgeAnomalyDetectionAnalyticsService",
        "ASAJobEtag": "asa_job_tag",
        "PublishTimestamp": "3/29/2019 10:57:25 AM"
      }
    }
The 'modules' section of the same file should contain the following (below the OPC-UA publisher module):
"EdgeAnomalyDetectionAnalyticsService": {
            "type": "docker",
            "settings": {
                "image": "mcr.microsoft.com/azure-stream-analytics/azureiotedge:1.0.1",
                "createOptions": ""
            },
            "version": "1.0",
            "env": {
                "PlanId": {
                    "value": "stream-analytics-on-iot-edge"
                }
            },
            "status": "running",
            "restartPolicy": "always"
          }
With all this in place, the IoT Edge solution has the information on how to route messages, which containers to start, and with which parameters.
The full IoT Edge solution, including the complete deployment.template.json file, can be found on GitHub.

 

Observations and conclusions

The solution can be started in the IoT Edge simulator.
Picture 9: Starting a solution in IoT Edge simulator
Two minutes after starting the simulator, the sensor was manually stimulated, and the first results showed up in the console.
Picture 10: First results in IoT Edge console
 
Picture 11: Anomaly detected and passed to IoT Hub
In this case, the humidity value of 19 is marked as an anomaly (the normal value was 18). Shortly after the sensor stimulation, the humidity jumps to 25, which, as expected, is detected as an anomaly.
When the simulation is stopped and started again after a period longer than 120 s (the window used in the query), the first couple of events are detected as anomalies.
Picture 12: Events that are not anomalies, detected as anomalies
This leads to the conclusion that the model becomes more reliable over time, which makes sense, as Azure Stream Analytics learns from the incoming data. This is clearly one way to get a cheaper version of anomaly detection on the edge. Increasing the number of events included in scoring may impact query-execution performance, so for more reliable models, and for use cases that require better precision and models trained on bigger data sets, it is recommended to use machine learning techniques instead.
On the other hand, an advantage of running Azure Stream Analytics anomaly detection on the edge is that each machine can have its own model, trained only on the data produced by that machine. In industrial cases this is sometimes desired, as machines may be installed in different environments and be subject to different external influences.

 

References


1. Azure Stream Analytics on IoT Edge: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-edge
2. Azure Stream Analytics Anomaly Detection: https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-machine-learning-anomaly-detection
3. OPC-UA publisher: https://github.com/Azure/iot-edge-opc-publisher
