A simple MQTT application using Go
IoT devices promise to change the way we interact with the world around us. IoT-enabled devices are found in automobiles, agriculture, and many other industries. Retrieving data from all those devices need not be a herculean effort, and thanks to MQTT it isn't.
MQTT is a widely adopted standard for IoT messaging. Its development started in the late 1990s, and it now has a formal specification published as an OASIS standard. MQTT provides a simple and lightweight publish/subscribe messaging protocol, making it easy to implement on devices and simple to integrate into large systems.
MQTT works through the use of a broker. A broker sits between the IoT devices and whatever applications make use of the device data. Both devices and the applications are considered 'clients' in the MQTT world.
If a device needs to send data to the world, it acts as an MQTT client, creates a connection to a broker, and publishes its data to a given topic. Topics are how the broker routes data. When an application wants to receive data from a given device, it creates a connection to a broker, then subscribes to the topic that the device publishes to.
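To make the routing idea concrete, here is a toy, in-memory sketch of what a broker does with topics. It is only an illustration: `toyBroker` and its methods are hypothetical names, it matches topics by exact name only, and it leaves out everything a real MQTT broker handles (networking, QoS, sessions).

```go
package main

import "fmt"

// toyBroker is a hypothetical in-memory stand-in for a broker.
// It only illustrates routing by exact topic name.
type toyBroker struct {
	subs map[string][]func(payload string)
}

func newToyBroker() *toyBroker {
	return &toyBroker{subs: make(map[string][]func(payload string))}
}

// Subscribe registers a callback for one topic.
func (b *toyBroker) Subscribe(topic string, fn func(payload string)) {
	b.subs[topic] = append(b.subs[topic], fn)
}

// Publish delivers payload to every subscriber of topic and
// returns how many subscribers received it.
func (b *toyBroker) Publish(topic, payload string) int {
	for _, fn := range b.subs[topic] {
		fn(payload)
	}
	return len(b.subs[topic])
}

func main() {
	b := newToyBroker()

	// The "application" subscribes to the topic it cares about.
	b.Subscribe("iot-messages", func(p string) {
		fmt.Println("application received:", p)
	})

	// The "device" publishes; only matching topics are delivered.
	b.Publish("iot-messages", "temperature=21.5")
	b.Publish("other-topic", "nobody is listening")
}
```

Neither side knows about the other; both only know the broker and the topic name, which is the point of the publish/subscribe model.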
To demonstrate, I'll show a simple Go application that publishes to an MQTT broker. It simulates an IoT device sending messages. (Note that this Go application will run fine on an embedded device that supports Go, such as a Raspberry Pi.)
To run this example, you will want to look at my previous post, A basic MQTT Docker deployment, which includes a Dockerfile to run an MQTT broker.
package main

import (
	"fmt"
	"math/rand"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
	broker   = "tcp://localhost:1883"
	clientID = "go-mqtt-client"
	topic    = "iot-messages"
)

// rng is a locally seeded random source used to pick messages.
// It is only used from main, so it needs no locking.
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))

// connectHandler runs each time the client connects to the broker.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected to MQTT Broker")
}

// connectLostHandler runs when the connection drops unexpectedly.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connection lost: %v\n", err)
}

func main() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID(clientID)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// Publish one random message per second until the process is killed.
	for {
		message := generateRandomMessage()
		token := client.Publish(topic, 0, false, message)
		token.Wait()
		if err := token.Error(); err != nil {
			fmt.Printf("Publish failed: %v\n", err)
		} else {
			fmt.Printf("Published message: %s\n", message)
		}
		time.Sleep(1 * time.Second)
	}
}

// generateRandomMessage returns one of a fixed set of sample payloads.
func generateRandomMessage() string {
	messages := []string{
		"Hello, World!",
		"Greetings from Go!",
		"MQTT is awesome!",
		"Random message incoming!",
		"Go is fun!",
	}
	return messages[rng.Intn(len(messages))]
}
Not much to that, although we could make it a bit more robust to failures.
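As one possible direction for that robustness, here is a standalone sketch of retrying a failed publish with exponential backoff. It is not part of the publisher above: `publishWithRetry` and `errBrokerDown` are hypothetical names, and the `publish` argument stands in for whatever actually sends the message.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBrokerDown is a made-up error used to simulate a failed publish.
var errBrokerDown = errors.New("broker unavailable")

// publishWithRetry calls publish up to attempts times, doubling the
// wait between tries. It returns nil on the first success, or the
// last error wrapped with the number of attempts made.
func publishWithRetry(publish func() error, attempts int, baseDelay time.Duration) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := baseDelay
	for i := 0; i < attempts; i++ {
		if err = publish(); err == nil {
			return nil
		}
		// Only sleep if another attempt will follow.
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// Simulated publish that fails twice before succeeding.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errBrokerDown
		}
		return nil
	}

	err := publishWithRetry(flaky, 5, 10*time.Millisecond)
	fmt.Println("calls:", calls, "error:", err)
}
```

In a real device you would probably also cap the maximum delay and decide what to do with messages that never get through, such as dropping them or buffering them locally.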
The application creates a new connection, specifying the broker URL and a client ID that the broker will use to identify it.
Notice that publishing a message is pretty simple: just specify the topic, the QoS, the retain flag, and the message itself.
A message can be a string, a byte slice, or a buffer. For QoS, we set it to 0, which means 'at most once', i.e. delivery is not guaranteed.
The retain flag tells the broker whether or not to store the last message published to a given topic. If messages are 'retained', then when a new client subscribes to that topic, the most recent retained message is immediately sent to it. For this example, we are not using that feature.
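The retain behaviour can be easier to see in a toy model. The sketch below is an assumption-laden illustration, not how a real broker is implemented: `retainingBroker` is a hypothetical type that only tracks the last retained payload per topic and ignores delivery to connected subscribers.

```go
package main

import "fmt"

// retainingBroker is a hypothetical toy model of the retain flag.
type retainingBroker struct {
	retained map[string]string // topic -> last retained payload
}

func newRetainingBroker() *retainingBroker {
	return &retainingBroker{retained: make(map[string]string)}
}

// Publish stores the payload only when retain is true.
// Delivery to currently connected subscribers is omitted here.
func (b *retainingBroker) Publish(topic, payload string, retain bool) {
	if retain {
		b.retained[topic] = payload
	}
}

// Subscribe returns the retained payload for topic, if there is one,
// which is what a late subscriber would receive immediately.
func (b *retainingBroker) Subscribe(topic string) (string, bool) {
	p, ok := b.retained[topic]
	return p, ok
}

func main() {
	b := newRetainingBroker()

	// Published without retain: a late subscriber gets nothing.
	b.Publish("iot-messages", "not retained", false)
	_, ok := b.Subscribe("iot-messages")
	fmt.Println("retained message waiting:", ok)

	// Published with retain: only the most recent one is kept.
	b.Publish("iot-messages", "door=open", true)
	b.Publish("iot-messages", "door=closed", true)
	p, ok := b.Subscribe("iot-messages")
	fmt.Println("retained message waiting:", ok, p)
}
```

This is why retained messages suit "current state" topics, where a new subscriber cares about the latest value rather than the history.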
Note that the source code for this publisher app can be found at the link at the bottom of this page.
Now let's take a look at the subscriber side.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

const (
	broker   = "tcp://localhost:1883"
	clientID = "go-mqtt-subscriber"
	topic    = "iot-messages"
)

// mqttMsgChan carries incoming messages from the MQTT callback into the pipeline.
var mqttMsgChan = make(chan mqtt.Message)

// messagePubHandler is the default handler; it hands each message to the pipeline.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	mqttMsgChan <- msg
}

// processMsg is the first pipeline stage: it prints each message and passes it on.
func processMsg(ctx context.Context, input <-chan mqtt.Message) <-chan mqtt.Message {
	out := make(chan mqtt.Message)
	go func() {
		defer close(out)
		for {
			select {
			case msg, ok := <-input:
				if !ok {
					return
				}
				fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
				out <- msg
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// connectHandler runs each time the client connects to the broker.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected to MQTT Broker")
}

// connectLostHandler runs when the connection drops unexpectedly.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connection lost: %v\n", err)
}

func main() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID(clientID)
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		finalChan := processMsg(ctx, mqttMsgChan)
		for range finalChan {
			// just consuming these for now
		}
	}()

	// Subscribe to the topic; a nil callback means the default handler is used
	if token := client.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)

	// Wait for interrupt signal to gracefully shutdown the subscriber
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	<-sigChan

	// Unsubscribe and disconnect while the pipeline is still running,
	// so the message handler cannot block on a stopped pipeline
	fmt.Println("Unsubscribing and disconnecting...")
	client.Unsubscribe(topic).Wait()
	client.Disconnect(250)

	// Cancel the context to signal the goroutine to stop
	cancel()

	// Wait for the goroutine to finish
	wg.Wait()
	fmt.Println("Goroutine terminated, exiting...")
}
Similar to the publish side, the application sets up a client with the broker URL and its client ID. We set a default handler to deal with incoming messages, but it's possible to have a different handler for each topic that the application subscribes to.
You can see that this example includes some synchronization to deal with terminating the program.
There is also a pipeline to process messages. While not necessary for this example, it will come in handy when dealing with a lot of messages. I'm just printing them out in the first stage, but imagine a pipeline that performed filtering, routing, and persistence. More on that in a future post in this series.
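To give a feel for what an extra stage might look like, here is a minimal sketch of a filtering stage in the same channel-based style. It works on plain strings rather than MQTT messages so it runs without a broker, and `filterStage`, `isReading`, and `runPipeline` are hypothetical names, not part of the subscriber above.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// filterStage forwards only the messages for which keep returns true.
// It stops when the input closes or the context is cancelled.
func filterStage(ctx context.Context, in <-chan string, keep func(string) bool) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			select {
			case msg, ok := <-in:
				if !ok {
					return
				}
				if !keep(msg) {
					continue
				}
				// Guard the send too, so cancellation is never blocked
				// by a consumer that has gone away.
				select {
				case out <- msg:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// isReading is an example predicate: keep only temperature readings.
func isReading(m string) bool {
	return strings.HasPrefix(m, "temp=")
}

// runPipeline feeds msgs through the filter and collects what survives.
func runPipeline(msgs []string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan string)
	go func() {
		defer close(in)
		for _, m := range msgs {
			in <- m
		}
	}()

	var kept []string
	for m := range filterStage(ctx, in, isReading) {
		kept = append(kept, m)
	}
	return kept
}

func main() {
	for _, m := range runPipeline([]string{"temp=21", "debug: heartbeat", "temp=22"}) {
		fmt.Println("kept:", m)
	}
}
```

Because each stage takes a channel in and returns a channel out, stages like this can be chained, which is what makes the pattern attractive for routing and persistence steps later on.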
That's it. Start up the MQTT broker from my previous post or use another. There is a free broker for testing that you can find here.
Start the publisher and subscriber in separate terminals. You should see the messages received in the subscriber console.
You can find the publisher code here and the subscriber code here.
What do you think? How would you make the publisher more robust? Can the synchronization and pipeline be simplified? Let me know in the comments below.