Introduction
With the exponential growth of connected devices, the ability to process billions of data points at scale has become essential. MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for resource-constrained devices and low-bandwidth networks, which makes it well suited to IoT applications. Pairing an MQTT broker such as Coreflux with a scalable cloud platform such as DigitalOcean addresses the challenges of processing and analyzing IoT data at that scale.
This tutorial shows you how to connect a Coreflux MQTT broker to a managed OpenSearch cluster on DigitalOcean. The resulting pipeline collects and stores data in real time, making it easier to monitor, analyze, and visualize your IoT data.
Coreflux provides a lightweight MQTT broker for efficient IoT communication on DigitalOcean.
What is MQTT?
MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe network protocol widely adopted in IoT ecosystems. Designed for constrained devices and for low-bandwidth, high-latency, or unreliable networks, it enables efficient, real-time messaging where network resources are scarce.
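To make the publish-subscribe model concrete, the toy sketch below routes messages by topic in memory, including MQTT's + (single-level) and # (multi-level) wildcards. It illustrates the routing idea only and is not how Coreflux is implemented: there is no networking, QoS, session state, retained messages, or special handling of $-prefixed topics, and InMemoryBroker and topic_matches are hypothetical names.

```python
from typing import Callable, List, Tuple

# A subscriber callback receives (topic, payload).
Handler = Callable[[str, bytes], None]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if topic matches topic_filter using MQTT wildcards.

    '+' matches exactly one topic level; '#' (valid only as the last level)
    matches the parent level and any number of levels below it.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # everything from here down matches
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    return len(filter_levels) == len(topic_levels)


class InMemoryBroker:
    """Toy broker: publishers and subscribers only know topics, never each other."""

    def __init__(self) -> None:
        self._subscriptions: List[Tuple[str, Handler]] = []

    def subscribe(self, topic_filter: str, handler: Handler) -> None:
        self._subscriptions.append((topic_filter, handler))

    def publish(self, topic: str, payload: bytes) -> int:
        """Deliver payload to every matching subscriber; return the delivery count."""
        delivered = 0
        for topic_filter, handler in self._subscriptions:
            if topic_matches(topic_filter, topic):
                handler(topic, payload)
                delivered += 1
        return delivered
```

Publishers call publish() with a topic and never see who receives the message; subscribers register filters such as Machine/+ or Solar/# and receive every matching message. That decoupling is what lets new consumers, like the OpenSearch bridge in this tutorial, join without changes to existing publishers.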
About Coreflux
Coreflux offers a lightweight MQTT broker to facilitate efficient, real-time communication between IoT devices and applications. Built for scalability and reliability, Coreflux is tailored for environments where low latency and high throughput are critical.
Whether you are developing a small-scale IoT project or deploying a large-scale industrial monitoring system, Coreflux provides a robust messaging backbone that keeps data flowing smoothly between devices.
With Coreflux on DigitalOcean, you get:
- Scalability: Easily handle growing amounts of data and devices without compromising performance.
- Reliability: Ensure consistent and dependable messaging across all connected devices.
- Efficiency: Optimize bandwidth usage in environments where network resources are limited.
The screenshot above shows a real-life example of solar-power park monitoring in OpenSearch Dashboards.
Before diving into the integration, make sure you have the following:
- A DigitalOcean account. If you don’t have one, sign up at DigitalOcean.
- Coreflux Broker Setup: A Coreflux broker that is running and accessible. If it’s not set up yet, refer to the Coreflux documentation or follow the initial steps in this guide.
- MQTT Explorer: A desktop MQTT client for interacting with the broker. You can download it from the MQTT Explorer website.
- Python Environment: Python installed, along with the paho-mqtt and opensearch-py libraries.
- Python Script: The Python script that bridges the Coreflux MQTT broker with your DigitalOcean OpenSearch instance. It subscribes to MQTT topics, processes the published messages, and stores them in OpenSearch.
New to Python? Here’s a basic outline of what the script does:
- Connects to Coreflux: The script uses paho-mqtt to connect to your Coreflux MQTT broker.
- Subscribes to Topics: It listens for messages on the topics you define in the script.
- Processes and Indexes Data: It parses published JSON messages and attempts to index them into OpenSearch using opensearch-py.
- Publishes Feedback: After processing, the script can publish feedback messages back to the MQTT broker to report errors or task completion.
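The parse-and-feedback part of that outline can be illustrated with a small pure function that leaves out the MQTT and OpenSearch calls. This is a hypothetical sketch, not the tutorial’s actual script; process_payload and its required_fields parameter are illustrative names.

```python
import json
from typing import Any, Dict, Iterable, Optional, Tuple


def process_payload(
    raw: bytes, required_fields: Iterable[str] = ()
) -> Tuple[Optional[Dict[str, Any]], str]:
    """Turn a raw MQTT payload into a document for OpenSearch.

    Returns (document, feedback). document is None when the payload should
    not be indexed; feedback is a short status string that could be
    published back to the broker.
    """
    try:
        data = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        return None, f"error: payload is not valid UTF-8 JSON ({exc})"
    if not isinstance(data, dict):
        # OpenSearch documents must be JSON objects, not arrays or scalars.
        return None, "error: payload must be a JSON object"
    missing = [name for name in required_fields if name not in data]
    if missing:
        return None, "error: missing fields " + ", ".join(missing)
    return data, "ok"
```

In a full script, a non-None document would be passed to OpenSearch for indexing, and the feedback string could be published back to the broker.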
You can watch the Coreflux tutorial on how to quickly start a free trial of the Online MQTT Broker:
Or you can follow this step-by-step guide:
Create a Coreflux Account
- Go to the Coreflux website and sign up for a free account if you don’t already have one
- After signing up, verify your email address to activate your account
Start a Free Trial Broker
- Once logged in, navigate to MQTT Broker
- Click on Start Free Trial to create a new MQTT broker
- Choose a Data Center Region: Select a region geographically close to your IoT devices or the DigitalOcean data center where you plan to deploy OpenSearch for lower latency
- Confirm your choices to start the trial
Receive Broker Credentials
- Coreflux will send the credentials to access your broker (such as the broker URL, port, username, and password) to your registered email after creating your broker
- Keep these credentials safe, as you’ll need them to configure your IoT devices and the Python script later
Set Up MQTT Explorer
- Download and install MQTT Explorer if you haven’t already
- Open MQTT Explorer and configure it to connect to your Coreflux broker using the credentials you received:
- Broker Address: Enter the broker URL
- Port: Use the port provided (typically 8883 for SSL connections)
- Username/Password: Enter the credentials provided in your email
- Connect to the broker and try subscribing to a topic to ensure everything works
Test the Broker Connection
- Publish a test message using MQTT Explorer to one of the topics on your Coreflux broker
- Verify that the message is received and displayed correctly in the Explorer. This confirms that your broker is up and running
Now that your Coreflux MQTT broker is set up and tested, it’s time to connect it to a managed OpenSearch instance on DigitalOcean. Here’s how:
Log in to DigitalOcean:
Create a New Database:
- On the dashboard, click on Databases in the left-hand menu.
- Select Create Database Cluster.
- Choose OpenSearch from the list of available database types.
Configure Your OpenSearch Instance:
- Select a Data Center Region:
- Choose a region that’s geographically close to your IoT devices or Coreflux broker for lower latency.
- Choose Your Plan:
- You can start with a basic plan that is suitable for your current needs.
- You can always scale up later as your data grows.
Create the Cluster:
- Once configured, click Create Cluster
- Wait for DigitalOcean to provision your OpenSearch instance. This may take a few minutes.
Get Your Connection Details:
- Go to the Connection Details tab in your database cluster after creating the cluster
- Note down the following details:
- Host
- Port
- Username
- Password
- You’ll need these details to connect from your Python script.
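As a sketch of how those four details map onto the opensearch-py client, the hypothetical helper below builds keyword arguments for opensearchpy.OpenSearch. It assumes the cluster only accepts TLS connections with HTTP basic authentication; confirm the host and port against your cluster’s Connection Details tab.

```python
from typing import Any, Dict


def opensearch_client_kwargs(host: str, port: int, username: str, password: str) -> Dict[str, Any]:
    """Map a cluster's connection details onto opensearch-py client arguments."""
    # Strip a scheme and trailing slash in case the host was copied as a URL.
    for prefix in ("https://", "http://"):
        if host.startswith(prefix):
            host = host[len(prefix):]
    host = host.rstrip("/")
    return {
        "hosts": [{"host": host, "port": int(port)}],
        "http_auth": (username, password),  # HTTP basic auth credentials
        "use_ssl": True,                    # assumption: the cluster requires TLS
        "verify_certs": True,               # verify the server certificate
    }
```

Usage, with opensearch-py installed: client = OpenSearch(**opensearch_client_kwargs(host, port, username, password)), then call client.info() to confirm the connection.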
Before you start indexing data from your Coreflux MQTT broker into OpenSearch, you need to define the mapping for your index. Mapping is the schema for your index, specifying the data types for each field in your documents. This step is crucial for ensuring the data is stored correctly and can be searched effectively.
Here’s how to create and map an index in your OpenSearch instance:
Access the OpenSearch Dashboard
Log in to OpenSearch Dashboards using the connection details you obtained when setting up the OpenSearch instance. Navigate to the “Index Management” section.
Create a New Index
Click Create Index to start the process, and enter a name for your index (e.g., machine_production).
Define the Mapping
Click on the Mappings tab during the index creation process. Here, you will define the fields that your data will have. For example:
{
  "mappings": {
    "properties": {
      "timestamp": { "type": "date" },
      "machine_id": { "type": "keyword" },
      "temperature": { "type": "float" },
      "status": { "type": "keyword" },
      "error_code": { "type": "integer" }
    }
  }
}
In this example:
- timestamp is stored as a date type, useful for time-based searches.
- machine_id and status are stored as keyword types, which means they are not analyzed and are used for exact matches.
- temperature is stored as a float type to accommodate decimal values.
- error_code is stored as an integer type, suitable for numeric values without decimals.
After defining your mappings, review the settings and click on Create Index.
OpenSearch will now create the index with the mappings you specified. This index is now ready to store and organize the data that will be published from your Coreflux MQTT broker.
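If you prefer the API to the Dashboards UI, the same index and mapping can be created programmatically. This is a minimal sketch assuming client is an opensearchpy.OpenSearch instance connected to your cluster; ensure_index is a hypothetical helper name, and the mapping body is the one defined above.

```python
from typing import Any, Dict

MACHINE_PRODUCTION_MAPPING: Dict[str, Any] = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "machine_id": {"type": "keyword"},
            "temperature": {"type": "float"},
            "status": {"type": "keyword"},
            "error_code": {"type": "integer"},
        }
    }
}


def ensure_index(client: Any, name: str, body: Dict[str, Any]) -> bool:
    """Create the index with the given mapping unless it already exists.

    client is expected to be an opensearchpy.OpenSearch instance.
    Returns True if the index was created, False if it already existed.
    """
    if client.indices.exists(index=name):
        return False
    client.indices.create(index=name, body=body)
    return True
```

Checking indices.exists first makes the call safe to repeat, for example each time the bridge script starts.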
Test the Mapping
Use your Python script or the OpenSearch API to index a test document and ensure it matches the defined mapping.
Example of a test document:
{
  "timestamp": "2024-08-23T10:30:00Z",
  "machine_id": "MACHINE123",
  "temperature": 75.5,
  "status": "operational",
  "error_code": 0
}
Insert this document into the machine_production index (or the index you chose) and verify that all fields are correctly stored and searchable.
With Coreflux and OpenSearch set up, it’s time to link them together using a Python script. This script will connect to the Coreflux broker, process published messages, and store them in OpenSearch.
Set Up Your Environment Variables
In the directory where your Python script is located, create a .env file.
Add the following environment variables, replacing the placeholder values with your actual credentials. (Note: if the MQTT broker URL begins with mqtt://, remove that prefix; the script only needs the hostname.)
MQTT_BROKER=<your-coreflux-broker-url>
MQTT_PORT=1883
MQTT_USERNAME=<your-coreflux-username>
MQTT_PASSWORD=<your-coreflux-password>
OPENSEARCH_HOST=<your-opensearch-host>
OPENSEARCH_USERNAME=<your-opensearch-username>
OPENSEARCH_PASSWORD=<your-opensearch-password>
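A minimal sketch of how a script might read these variables and apply the mqtt:// note above. It assumes python-dotenv’s load_dotenv() has already copied the .env values into os.environ; load_config and strip_scheme are hypothetical names, and the 1883 fallback simply mirrors the MQTT_PORT value in the example .env.

```python
import os
from typing import Dict, Mapping

REQUIRED = (
    "MQTT_BROKER", "MQTT_USERNAME", "MQTT_PASSWORD",
    "OPENSEARCH_HOST", "OPENSEARCH_USERNAME", "OPENSEARCH_PASSWORD",
)


def strip_scheme(url: str) -> str:
    """Remove a leading mqtt:// (any case) and trailing slash, leaving the hostname."""
    if url.lower().startswith("mqtt://"):
        url = url[len("mqtt://"):]
    return url.rstrip("/")


def load_config(environ: Mapping[str, str] = os.environ) -> Dict[str, object]:
    """Collect the settings from the environment, failing early if any are missing."""
    missing = [name for name in REQUIRED if not environ.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    config: Dict[str, object] = {name: environ[name] for name in REQUIRED}
    config["MQTT_BROKER"] = strip_scheme(environ["MQTT_BROKER"])
    config["MQTT_PORT"] = int(environ.get("MQTT_PORT", "1883"))
    return config
```

Failing at startup with a list of missing names is easier to debug than a connection error later on.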
Install Required Python Libraries
Ensure you have the necessary Python libraries installed. You can install them using pip:
pip install paho-mqtt opensearch-py python-dotenv
Write or Configure the Python Script
Use the provided Python script, which connects to the Coreflux MQTT broker, listens for messages published on the Machine/Produce topic, and indexes them into OpenSearch.
Make sure the script correctly references the environment variables you set up.
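The tutorial’s script itself is not reproduced here. As a rough, hypothetical sketch of how such a bridge can be wired together, assuming paho-mqtt 2.x, opensearch-py, and python-dotenv, the code below indexes each JSON message from Machine/Produce into machine_production and publishes a short status message. FEEDBACK_TOPIC, OPENSEARCH_PORT (with an assumed default of 25060), and the helper names are assumptions, not part of the original script.

```python
import json
import os
from typing import Any

TOPIC = "Machine/Produce"                    # topic used in this tutorial
INDEX = "machine_production"                 # index created earlier
FEEDBACK_TOPIC = "Machine/Produce/Feedback"  # hypothetical feedback topic


def make_on_message(os_client: Any, index: str = INDEX, feedback_topic: str = FEEDBACK_TOPIC):
    """Build a paho-mqtt on_message callback that indexes JSON payloads."""

    def on_message(client, userdata, msg):
        try:
            doc = json.loads(msg.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            client.publish(feedback_topic, "error: payload is not valid JSON")
            return
        if not isinstance(doc, dict):
            client.publish(feedback_topic, "error: payload must be a JSON object")
            return
        try:
            os_client.index(index=index, body=doc)
        except Exception as exc:  # e.g. connection or mapping errors from opensearch-py
            client.publish(feedback_topic, f"error: indexing failed ({exc})")
            return
        client.publish(feedback_topic, f"ok: indexed message from {msg.topic}")

    return on_message


def main() -> None:
    # Imported here so the callback above can be tested without these packages.
    import paho.mqtt.client as mqtt
    from dotenv import load_dotenv
    from opensearchpy import OpenSearch

    load_dotenv()  # copy values from .env into os.environ
    os_client = OpenSearch(
        hosts=[{"host": os.environ["OPENSEARCH_HOST"],
                # OPENSEARCH_PORT is not in the .env above; 25060 is an assumed default.
                "port": int(os.getenv("OPENSEARCH_PORT", "25060"))}],
        http_auth=(os.environ["OPENSEARCH_USERNAME"], os.environ["OPENSEARCH_PASSWORD"]),
        use_ssl=True,
        verify_certs=True,
    )

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # paho-mqtt 2.x API
    client.username_pw_set(os.environ["MQTT_USERNAME"], os.environ["MQTT_PASSWORD"])

    def on_connect(client, userdata, flags, reason_code, properties):
        # Subscribing here restores the subscription after automatic reconnects.
        client.subscribe(TOPIC)

    client.on_connect = on_connect
    client.on_message = make_on_message(os_client)
    client.connect(os.environ["MQTT_BROKER"], int(os.getenv("MQTT_PORT", "1883")))
    client.loop_forever()  # blocks, dispatching callbacks as messages arrive
```

To run it as mqttToOS.py, add an if __name__ == "__main__": main() guard at the end of the file. If your broker expects TLS on port 8883, call client.tls_set() before client.connect(). The feedback topic is a subtopic of Machine/Produce, so the exact Machine/Produce subscription does not receive it; avoid subscribing to Machine/Produce/# with this layout, or the script will consume its own feedback.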
Run the Script
Execute the Python script. It should connect to the Coreflux broker, subscribe to the desired topics, and index published messages into your OpenSearch instance.
python mqttToOS.py
Monitor the output to ensure that messages are processed and stored correctly.
Test Data Flow
- Publish Sample Data: Use MQTT Explorer to publish sample datasets to your Coreflux broker. Experiment with different payload structures to see how they are processed and indexed in OpenSearch.
- Data Validation: Verify that the data in OpenSearch matches the payloads you published. Check for consistency and accuracy, ensuring your integration is working as expected.
- Real-Time Monitoring: Set up a real-time feed using MQTT Explorer to publish messages continuously. Watch how OpenSearch handles incoming data streams and explore how quickly you can retrieve and analyze the data.
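Instead of typing every test message by hand, you could generate payloads that follow the machine_production mapping. The generator below is hypothetical, with made-up status values and temperature range; seeding the random generator makes a test run reproducible, and start should be a UTC datetime because timestamps are written with a Z suffix.

```python
import json
import random
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional, Sequence

STATUSES = ("operational", "idle", "maintenance")  # hypothetical status values


def sample_payloads(
    machine_ids: Sequence[str],
    count: int,
    seed: int = 0,
    start: Optional[datetime] = None,
) -> Iterator[str]:
    """Yield count JSON payloads, one second apart, matching the tutorial's mapping."""
    rng = random.Random(seed)
    start = start or datetime.now(timezone.utc)
    for i in range(count):
        status = rng.choice(STATUSES)
        doc = {
            "timestamp": (start + timedelta(seconds=i)).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "machine_id": rng.choice(machine_ids),
            "temperature": round(rng.uniform(60.0, 90.0), 1),
            "status": status,
            # Non-operational readings get a non-zero error code.
            "error_code": 0 if status == "operational" else rng.randint(1, 99),
        }
        yield json.dumps(doc)
```

Each string can be published to Machine/Produce with any MQTT client, for example a paho-mqtt client’s publish() method in a loop with a short time.sleep() between messages for a steady real-time feed.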
Build Visualizations
- Create Dashboards: Use OpenSearch Dashboards to build dynamic dashboards that visualize your IoT data. You could track metrics like device uptime, sensor readings, or user interactions.
- Trend Analysis: Analyze trends over time by aggregating data in OpenSearch. Look for patterns, spikes, or anomalies in your data.
- Geo-Visualizations: If your data includes geographic information, create maps that display data points by location. This is especially useful for IoT devices spread across different regions.
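For trend analysis, a date_histogram aggregation buckets documents by time and computes a metric per bucket. Below is a sketch of such a search body for the machine_production fields, with a hypothetical builder function and illustrative defaults (hourly buckets over the last 24 hours).

```python
from typing import Any, Dict


def temperature_trend_query(machine_id: str, interval: str = "1h", since: str = "now-24h") -> Dict[str, Any]:
    """Build a search body: average temperature per time bucket for one machine."""
    return {
        "size": 0,  # only the aggregation results are needed, not the documents
        "query": {
            "bool": {
                "filter": [
                    {"term": {"machine_id": machine_id}},  # exact match on the keyword field
                    {"range": {"timestamp": {"gte": since}}},
                ]
            }
        },
        "aggs": {
            "per_interval": {
                "date_histogram": {"field": "timestamp", "fixed_interval": interval},
                "aggs": {"avg_temperature": {"avg": {"field": "temperature"}}},
            }
        },
    }
```

With an opensearch-py client, pass it as client.search(index="machine_production", body=...) and read the buckets from response["aggregations"]["per_interval"]["buckets"]; each bucket carries a key_as_string timestamp and an avg_temperature value. OpenSearch Dashboards visualizations build similar aggregations for you.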
Optimize and Scale
Performance Tuning: Experiment with different broker and OpenSearch configurations to optimize performance. For example, adjust Coreflux broker settings such as connection limits or message retention policies to improve efficiency. You can also learn about more advanced configurations for DigitalOcean Droplets.
Load Testing: Simulate high traffic by publishing many messages simultaneously. Monitor how your Coreflux broker and OpenSearch instance handle the load and identify any bottlenecks or areas for improvement.
Scaling: DigitalOcean lets you increase the resources (CPU, RAM, or storage) of your Droplets as your data needs grow. You can also set up alerts to notify you when resource limits are approaching.
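If load testing shows that indexing one document per MQTT message is a bottleneck, one common OpenSearch-side adjustment is to index in batches. This is a minimal sketch, not part of the original script: Batcher and bulk_actions are hypothetical, and the batch size of 500 is an illustrative default to tune during load testing.

```python
from typing import Any, Dict, Iterable, Iterator, List


def bulk_actions(docs: Iterable[Dict[str, Any]], index: str) -> Iterator[Dict[str, Any]]:
    """Wrap documents in the action format accepted by opensearchpy.helpers.bulk."""
    for doc in docs:
        yield {"_index": index, "_source": doc}


class Batcher:
    """Buffer documents and hand them off in groups of batch_size."""

    def __init__(self, batch_size: int = 500) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.batch_size = batch_size
        self._buffer: List[Dict[str, Any]] = []

    def add(self, doc: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Add a document; return a full batch when one is ready, else an empty list."""
        self._buffer.append(doc)
        if len(self._buffer) >= self.batch_size:
            return self.flush()
        return []

    def flush(self) -> List[Dict[str, Any]]:
        """Return whatever is buffered (possibly nothing) and start a new batch."""
        batch, self._buffer = self._buffer, []
        return batch
```

In an on_message callback you might call batch = batcher.add(doc) and, when batch is non-empty, opensearchpy.helpers.bulk(os_client, bulk_actions(batch, "machine_production")). A production version would also flush on a timer so a partial batch is not held indefinitely under light traffic.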
Integrating the Coreflux MQTT broker with DigitalOcean’s managed OpenSearch service provides a powerful solution for real-time IoT data processing and analytics. By following this tutorial, you have set up a data pipeline that lets you collect, process, and visualize IoT data efficiently.
With Coreflux’s scalability and reliability and OpenSearch’s robust search and analytics capabilities, you can handle large volumes of data and gain valuable insights in real time. Whether you are monitoring industrial systems, tracking environmental data, or managing smart home devices, this integration helps you make data-driven decisions quickly and effectively.
To learn how to get started with OpenSearch on DigitalOcean, see DigitalOcean’s OpenSearch documentation.
Get a free Coreflux Online MQTT Broker trial or learn more with the Coreflux Docs and Tutorials.