I was recently working with a customer that uses Apache Avro compression and discovered that there isn’t a great example out there that shows you how to send Avro compressed messages through the IoT pipeline. This post will hopefully fill that gap.
Compression is a very common need in IoT scenarios because many large-scale IoT solutions aggregate and compress messages before sending them across the wire. IoT Hub itself “supports compression” because it doesn’t crack the message payload; it just treats the message as bytes. So the cloud endpoint (IoT Hub) isn’t an issue. The issue arises when we need to use compression and then process those messages after ingestion. Azure Stream Analytics, a common Azure IoT message processor, supports JSON, CSV and Avro as input formats. Gzip/Deflate are not supported, but a request has been made to the ASA team to support them. In the meantime, this post will demonstrate how to get Avro working through the IoT pipeline and unblock customers who want compression and are considering Avro. If you need to support Gzip/Deflate or any other compression mechanism, then you will need to read your IoT Hub messages from its built-in Event Hub-compatible endpoint using Event Processor Host, hosted in Azure Functions, Service Fabric or any custom application.
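To make the batching argument concrete, here is a small stdlib-only sketch that serializes hypothetical telemetry readings as JSON and compresses them with raw DEFLATE (RFC 1951), which is the algorithm the Avro specification names for its "deflate" codec. The field names and values are made up for illustration; the point is only that a single small message barely compresses, while a batch of similar readings compresses far better per reading.

```javascript
const zlib = require('zlib');

// Hypothetical sensor reading; field names and values are illustrative only.
function makeReading(i) {
  return {
    deviceId: 'sensor-01',
    temperature: 20 + (i % 5) * 0.5,
    humidity: 40 + (i % 3),
    timestamp: 1500000000000 + i * 1000,
  };
}

// Serialize a batch as JSON and compress it with raw DEFLATE (no zlib header),
// the same algorithm Avro's "deflate" codec applies to each data block.
function compressBatch(readings) {
  const raw = Buffer.from(JSON.stringify(readings), 'utf8');
  const compressed = zlib.deflateRawSync(raw);
  return { raw, compressed };
}

const single = compressBatch([makeReading(0)]);
const batch = compressBatch(Array.from({ length: 100 }, (_, i) => makeReading(i)));

console.log(`1 reading:    ${single.raw.length} bytes raw, ${single.compressed.length} deflated`);
console.log(`100 readings: ${batch.raw.length} bytes raw, ${batch.compressed.length} deflated`);
console.log(`per reading in batch: ~${Math.round(batch.compressed.length / 100)} bytes deflated`);
```

The exact numbers depend on your payloads, which is why aggregation usually comes before compression in these pipelines.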
The architecture for this walkthrough is as follows:
A Node.js script reads telemetry data from sensors, compresses that data with Avro and then sends it to IoT Hub. An Azure Stream Analytics (ASA) job picks up those messages and forwards them to Azure Blob Storage as JSON objects.
You can follow along and build this entire example from scratch, or you can jump straight to the Avro code section if that is all you need.
You have three options for creating the Azure resources required for this example. I will use Azure CLI 2.0 in this post.
Follow the steps here to install Azure CLI 2.0.
We will use the iothub-explorer to monitor messages that are sent to IoT Hub.
Follow the steps here to install the iothub-explorer.
You could also use the Windows Forms version, called Device Explorer.
We will use Azure Storage Explorer to view the messages that the ASA job writes to Blob storage.
Follow the steps here to install the Azure Storage Explorer.
Follow the steps here to create an Azure IoT Hub using the Azure CLI 2.0.
Execute the following command to get the connection string for the IoT Hub you just created.
Copy that connection string to a safe place. You will need it later.
Execute the following command to create a new IoT Hub device.
Copy that device’s connection string to a safe place. You will need it later.
You will need a new IoT Hub consumer group when you configure ASA later. Let’s create it now with the following command:
Execute the following command to monitor events for the device you just created:
You will see the following output:
Keep that command running; we’ll come back to it in a minute.
We will now get the Node.js code that will compress messages with Avro and send them to IoT Hub.
Let’s get it running and then go through the code.
Open up a command prompt, navigate to an empty folder and execute the following command. This will copy the code to a local folder.
Change to the directory that contains the code you just cloned and execute the following command to install the Node.js packages locally.
This script uses dotenv to allow you to set an IoT Hub connection string in a .env file.
Rename “.env.sample” to “.env” and paste in the device connection string that you saved earlier. If you are using Windows, name the file “.env.” (with the period at the end) and Windows will automatically change it to “.env” for you.
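For reference, a device connection string has the form HostName=...;DeviceId=...;SharedAccessKey=..., while the hub-level connection string you copied earlier uses SharedAccessKeyName instead of DeviceId, so it is easy to paste the wrong one. The sketch below is a simplified, hypothetical stand-in for what dotenv does (it is not dotenv's implementation): it reads KEY=VALUE lines, splitting on the first "=" only because connection strings contain "=" themselves, and then checks that the value looks like a device connection string. The variable name IOTHUB_DEVICE_CONNECTION_STRING is an assumption; use whatever name .env.sample defines.

```javascript
// Simplified stand-in for dotenv (hypothetical, not the library's code):
// parse KEY=VALUE lines, skipping blank lines and # comments.
function parseDotEnv(text) {
  const vars = {};
  for (const line of text.split(/\r?\n/)) {
    const trimmed = line.trim();
    if (trimmed === '' || trimmed.startsWith('#')) continue;
    const eq = trimmed.indexOf('=');
    if (eq === -1) continue;
    // Split on the FIRST '=' only: the value itself contains '=' characters.
    vars[trimmed.slice(0, eq).trim()] = trimmed.slice(eq + 1).trim();
  }
  return vars;
}

// Split "HostName=...;DeviceId=...;SharedAccessKey=..." into its parts.
function parseConnectionString(cs) {
  const parts = {};
  for (const segment of cs.split(';')) {
    const eq = segment.indexOf('=');
    if (eq > 0) parts[segment.slice(0, eq)] = segment.slice(eq + 1);
  }
  return parts;
}

// A device connection string names a DeviceId; the hub-level one does not.
function isDeviceConnectionString(cs) {
  const parts = parseConnectionString(cs);
  return Boolean(parts.HostName && parts.DeviceId && parts.SharedAccessKey);
}

// Example with a fake key; IOTHUB_DEVICE_CONNECTION_STRING is a hypothetical name.
const env = parseDotEnv(
  'IOTHUB_DEVICE_CONNECTION_STRING=HostName=myhub.azure-devices.net;DeviceId=dev1;SharedAccessKey=abc123==\n'
);
console.log(isDeviceConnectionString(env.IOTHUB_DEVICE_CONNECTION_STRING)); // true
```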
Execute the following to start sending messages to IoT Hub.
You will now see the following output:
Switch back to the command prompt running iothub-explorer, and you will start to see the events arrive:
Now that we have it running, let’s take a look at the script to see how it works.
You can see the full code on GitHub here. There are lots of comments inline that should help you see how it all works together.
With Avro, you can embed the payload schema in the Avro Object Container File. In this repo, we have a file called schema.avsc, which looks like this:
In fact, ASA requires the schema to be embedded into the message payload.
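The reason the schema has to travel with the data is that Avro's binary encoding carries no field names or type tags: a record is just its field values written back to back in schema order. The stdlib-only sketch below hand-encodes a record for a hypothetical three-field schema (not the repo's schema.avsc) using the encodings from the Avro specification: zig-zag varints for long, 8-byte little-endian IEEE 754 for double, and a length-prefixed UTF-8 string. In the actual script, avsc does this for you; the sketch only illustrates what ends up on the wire.

```javascript
// Hypothetical schema for illustration only; the repo's schema.avsc may differ.
const telemetrySchema = {
  type: 'record',
  name: 'Telemetry',
  fields: [
    { name: 'deviceId', type: 'string' },
    { name: 'temperature', type: 'double' },
    { name: 'timestamp', type: 'long' },
  ],
};

// Avro long: zig-zag encode (so small negatives stay small), then write
// 7 bits per byte, least significant group first, high bit = "more bytes".
function encodeLong(value) {
  let n = BigInt(value);
  n = n >= 0n ? n << 1n : (-n << 1n) - 1n;
  const bytes = [];
  while (n >= 0x80n) {
    bytes.push(Number((n & 0x7fn) | 0x80n));
    n >>= 7n;
  }
  bytes.push(Number(n));
  return Buffer.from(bytes);
}

// Avro double: 8 bytes, little-endian IEEE 754.
function encodeDouble(value) {
  const buf = Buffer.alloc(8);
  buf.writeDoubleLE(value, 0);
  return buf;
}

// Avro string: byte length as a long, then the UTF-8 bytes.
function encodeString(value) {
  const bytes = Buffer.from(value, 'utf8');
  return Buffer.concat([encodeLong(bytes.length), bytes]);
}

// Record: each field's value in schema order; no names, no type tags.
function encodeRecord(schema, record) {
  return Buffer.concat(
    schema.fields.map((field) => {
      const value = record[field.name];
      switch (field.type) {
        case 'string': return encodeString(value);
        case 'double': return encodeDouble(value);
        case 'long': return encodeLong(value);
        default: throw new Error(`type ${JSON.stringify(field.type)} not handled in this sketch`);
      }
    })
  );
}

const encoded = encodeRecord(telemetrySchema, { deviceId: 'dev1', temperature: 21.5, timestamp: 1500000000000 });
console.log(encoded.length); // 19 bytes: 1 + 4 (string) + 8 (double) + 6 (long)
```

Without the schema, a reader has no way to know that those 19 bytes are a string, a double and a long, which is why the schema must be embedded alongside the data.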
The schema file is parsed into an Avro type with the parse method, like this:
At the moment, ASA does not support enums in your Avro schema. If you are having issues, convert the enum fields to strings. I will update this post when that has been resolved.
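If you do need to make that conversion, one option is to rewrite the schema before handing it to the encoder. The helper below is hypothetical (it is not part of avsc or the repo): it returns a copy of a schema in which every inline enum definition becomes "string", recursing through record fields, arrays, maps and unions. It does not handle an enum that is defined once and then referenced elsewhere by name; that reference would be left dangling, so inline the enum or replace the reference by hand.

```javascript
// Hypothetical helper: return a copy of an Avro schema with every inline
// enum definition replaced by the primitive type "string".
function replaceEnumsWithString(schema) {
  if (Array.isArray(schema)) {
    return schema.map(replaceEnumsWithString); // union: rewrite each branch
  }
  if (schema === null || typeof schema !== 'object') {
    return schema; // primitive or named-type reference, left unchanged
  }
  if (schema.type === 'enum') {
    return 'string';
  }
  const copy = { ...schema };
  if (Array.isArray(copy.fields)) {
    // record: rewrite each field's type without mutating the input
    copy.fields = copy.fields.map((field) => ({ ...field, type: replaceEnumsWithString(field.type) }));
  }
  if (copy.items !== undefined) copy.items = replaceEnumsWithString(copy.items); // array
  if (copy.values !== undefined) copy.values = replaceEnumsWithString(copy.values); // map
  return copy;
}

const withEnum = {
  type: 'record',
  name: 'Telemetry',
  fields: [
    { name: 'status', type: { type: 'enum', name: 'Status', symbols: ['OK', 'FAULT'] } },
    { name: 'temperature', type: 'double' },
  ],
};
console.log(JSON.stringify(replaceEnumsWithString(withEnum).fields[0])); // {"name":"status","type":"string"}
```

The records you write can keep using the same symbol strings ("OK", "FAULT"); only the declared type changes.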
The meat of the code is as follows:
- Instantiate a BlockEncoder with the deflate codec
- Instantiate a WriteableStream and pipe the BlockEncoder output to it
- Write the records to the BlockEncoder
- Call BlockEncoder.end()
- Send the IoT Hub message in the ‘end’ event handler, which calls WriteableStream.toBuffer() to get the payload
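Under the covers, the steps above produce an Avro Object Container File: a header (the magic bytes "Obj" plus version 1, a metadata map holding avro.schema and avro.codec, and a random 16-byte sync marker) followed by data blocks, each holding an object count, the byte size of the compressed data, the raw-DEFLATE-compressed records, and the sync marker again. The stdlib-only sketch below builds that layout by hand from the Avro specification, purely to show roughly what the buffer sent to IoT Hub contains. It is not avsc's code, it writes the metadata as a single map block, and it has not been tested against ASA; in practice, use avsc's BlockEncoder as described above.

```javascript
const crypto = require('crypto');
const zlib = require('zlib');

// Avro long: zig-zag, then little-endian base-128 varint.
function encodeLong(value) {
  let n = BigInt(value);
  n = n >= 0n ? n << 1n : (-n << 1n) - 1n;
  const bytes = [];
  while (n >= 0x80n) {
    bytes.push(Number((n & 0x7fn) | 0x80n));
    n >>= 7n;
  }
  bytes.push(Number(n));
  return Buffer.from(bytes);
}

// Avro bytes/string: length as a long, then the raw bytes.
function encodeBytes(buf) {
  return Buffer.concat([encodeLong(buf.length), buf]);
}

// File header: magic, metadata map (one block of entries, then a 0 terminator), sync marker.
function createHeader(schemaJson, sync) {
  const meta = [
    ['avro.schema', Buffer.from(schemaJson, 'utf8')],
    ['avro.codec', Buffer.from('deflate', 'utf8')],
  ];
  return Buffer.concat([
    Buffer.from([0x4f, 0x62, 0x6a, 0x01]), // "Obj" + format version 1
    encodeLong(meta.length),
    ...meta.flatMap(([key, value]) => [encodeBytes(Buffer.from(key, 'utf8')), encodeBytes(value)]),
    encodeLong(0), // end of map
    sync,
  ]);
}

// Data block: object count, compressed size, raw-DEFLATE data, sync marker.
// encodedRecords are records already serialized in Avro binary form.
function createBlock(encodedRecords, sync) {
  const compressed = zlib.deflateRawSync(Buffer.concat(encodedRecords));
  return Buffer.concat([
    encodeLong(encodedRecords.length),
    encodeLong(compressed.length),
    compressed,
    sync,
  ]);
}

// Usage: one header, then one or more blocks sharing the same sync marker.
// The schema here is simply "long" so the records are easy to hand-encode.
const demoSync = crypto.randomBytes(16);
const file = Buffer.concat([
  createHeader('"long"', demoSync),
  createBlock([encodeLong(1), encodeLong(2)], demoSync),
]);
console.log(`container file: ${file.length} bytes`);
```

Splitting header and block creation mirrors how a streaming encoder works: the header is written once, and a new block is flushed whenever enough records have accumulated.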
While this seems like a pretty straightforward script, it took me a while to get it all working. In the process, I collected the following Avro-related resources:
- Avro Node.js Library: mtth/avsc
- Reading and Writing Avro Files From the Command Line
- Node.js Stream Docs
- Creating duplex streams with Node.js
- How to send compacted data to Azure Stream Analytics?
- Sending and consuming events in Avro format
- NodeJs client and sending AVRO contents to the eventhub #6
- NodeJs client and sending AVRO contents to the eventhub #118
- Rotate stream into google cloud storage #75
- BlockEncoder produces invalid avro format? #17
- avro tools
For this contrived exercise, we’re going to configure a Blob Storage account that ASA sends all messages to. Let’s create the account and then wire up ASA.
You will see the storage account metadata output to the screen.
Go to the Azure Portal and create a new Azure Stream Analytics job:
1. Go to the ASA job you just created and click on Inputs.
Something to be aware of: ASA does not let you test the input with a physical Avro container file, as it does with JSON and CSV. The only way to test Avro is to send messages to the ASA input and check the logs if you have issues. The logs are not very helpful, because all they say is that the file is invalid. You can also use avro tools to help diagnose Avro issues.
2. Click Add and create a new Input
There are two bugs in the Azure Portal on this screen:
- You can’t select Avro using your mouse. You must use your keyboard arrows in the “Event Serialization Format” dropdown.
- You can’t use $Default consumer group. Use the consumer group you created earlier, I called mine avrocg.
3. Click on Outputs, click Add and create a new Output
- Select the Blob Storage Account you created earlier
- Select Create a new container and name it
4. Click on Query, then enter the following query into the query text box.
This query is intentionally simple because this post is about wiring everything up and not sophisticated ASA queries.
5. Click Save
Click ‘Overview’, then click Start
Once the ASA job is started, you will see the following monitoring visual in the Azure Portal.
If you have any issues, please refer to the Activity log tab to see if any messages are being logged.
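Since the ASA logs only report that an Avro file is invalid, it can help to check the bytes you are sending before they leave the device. The hypothetical, stdlib-only helper below reads the header of an Avro Object Container File as described in the Avro specification: it verifies the "Obj" magic bytes, decodes the metadata map, and reports the codec (which defaults to "null" when absent) and the embedded schema. A buffer that fails here, for example plain JSON or a single record serialized without the container header, is unlikely to be accepted as an Avro input; avro tools remains the more complete option.

```javascript
// Read one Avro long (zig-zag varint) at offset; returns [value, nextOffset].
function readLong(buf, offset) {
  let result = 0n;
  let shift = 0n;
  let byte;
  do {
    if (offset >= buf.length) throw new Error('truncated varint');
    byte = buf[offset++];
    result |= BigInt(byte & 0x7f) << shift;
    shift += 7n;
  } while (byte & 0x80);
  return [Number((result >> 1n) ^ -(result & 1n)), offset]; // undo zig-zag
}

// Read a length-prefixed byte string at offset; returns [Buffer, nextOffset].
function readBytes(buf, offset) {
  const [len, start] = readLong(buf, offset);
  if (len < 0 || start + len > buf.length) throw new Error('truncated bytes field');
  return [buf.subarray(start, start + len), start + len];
}

// Parse the container-file header: magic bytes, metadata map, 16-byte sync marker.
function inspectAvroHeader(buf) {
  if (buf.length < 4 || !buf.subarray(0, 4).equals(Buffer.from([0x4f, 0x62, 0x6a, 0x01]))) {
    throw new Error('not an Avro container file: missing "Obj" + version 1 magic bytes');
  }
  let offset = 4;
  const meta = {};
  for (;;) {
    let count;
    [count, offset] = readLong(buf, offset);
    if (count === 0) break; // end of map
    if (count < 0) {
      // A negative block count is followed by the block's size in bytes.
      count = -count;
      [, offset] = readLong(buf, offset);
    }
    for (let i = 0; i < count; i++) {
      let key;
      let value;
      [key, offset] = readBytes(buf, offset);
      [value, offset] = readBytes(buf, offset);
      meta[key.toString('utf8')] = value.toString('utf8');
    }
  }
  const sync = buf.subarray(offset, offset + 16);
  if (sync.length !== 16) throw new Error('truncated header: missing sync marker');
  return {
    codec: meta['avro.codec'] || 'null', // the spec's default when no codec is recorded
    schema: meta['avro.schema'], // JSON text; undefined means the schema is missing
    sync,
    dataOffset: offset + 16, // where the first data block starts
  };
}
```

For example, calling inspectAvroHeader on the buffer returned by WriteableStream.toBuffer() just before sending should report "deflate" as the codec and return your schema JSON.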
We’ll now use the Azure Storage Explorer to inspect the JSON messages that have been stored in Blob Storage.
1. Open Azure Storage Explorer, sign in and navigate to the Blob Storage account you created earlier. Expand the Blob Containers node. Click on ‘avros’ and you will see a JSON file in the right pane.
2. Double-click on that file to download it. Open it and you will see the JSON messages.
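If you would rather check the output programmatically than by eye, the sketch below parses the downloaded file. It assumes the ASA output was configured with either the line-separated JSON format (one object per line) or the array format; the file name output.json is hypothetical. It also strips a leading byte-order mark, if there is one, before parsing.

```javascript
const fs = require('fs');

// Parse ASA's JSON blob output, accepting either one JSON object per line
// or a single JSON array. Assumes UTF-8 text.
function parseAsaJsonOutput(text) {
  const body = text.replace(/^\uFEFF/, '').trim(); // drop a BOM if present
  if (body === '') return [];
  if (body.startsWith('[')) return JSON.parse(body); // array format
  return body
    .split(/\r?\n/)
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line)); // line-separated format
}

// Usage (hypothetical file name): print how many messages were written.
const path = 'output.json';
if (fs.existsSync(path)) {
  const messages = parseAsaJsonOutput(fs.readFileSync(path, 'utf8'));
  console.log(`${messages.length} messages`, messages[0]);
}
```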
We now have all of the services set up and running that enable Avro compression in an Azure IoT solution:
1. The Avro Node.js script is sending messages to IoT Hub.
2. IoT Hub is receiving messages.
3. Azure Stream Analytics is picking up messages from IoT Hub and saving them as JSON to Blob Storage.
Hope this helps you out.