|
/*
**** MQTT Client Application Example ****
It connects to the internet via a Local Area Network (LAN), communicates with a public MQTT broker,
and performs two main tasks: Listening for commands and reporting its status.
Here is a step-by-step breakdown of exactly what the program does:
1. Hardware Initialization
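Establishes Device Identity: It reads the board serial number and derives the MQTT client ID ("RTCU_<serialnumber>") and the topic prefix ("RTCU/<serialnumber>") from it.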
Starts the LAN Interface:
During start-up, it calls netOpen(iface := _NET_IFACE_LAN1) and logs whether the call succeeded.
This powers up the Ethernet/LAN hardware on the device so it's ready to communicate.
Sets a Timer: It initializes a timer (pub_timer) to fire every 10 seconds.
2. Main Execution Loop
The program enters an infinite loop, meaning it runs forever until the device is powered off.
A. Network Monitoring
Checks Connection: It constantly checks netConnected().
If Disconnected:
It prints "Waiting for network..." and sleeps for 5 seconds to avoid spamming the logs.
It will not try to do anything else until the cable is plugged in and the network is up.
If Connected:
It proceeds to the MQTT logic.
B. MQTT Connection Management
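Opens a Handle: If there is no valid MQTT handle yet, it calls mqttOpen to start a connection to the broker and pauses 500 ms so the handshake can begin.
Checks MQTT Status: It asks "Am I currently connected to the broker?" (mqttConnected).
If Connected:
It subscribes to the topic RTCU/<serialnumber>/Command if it has not already done so.
If Not Connected:
It prints "MQTT Not Connected...", clears the subscription flag, and waits 1 second before checking again.
Note: The handle is not closed automatically. If a handle can get stuck, add logic that calls mqttClose and resets the handle to -1 so a new connection is opened.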
C. Handling Incoming Messages (Subscribe)
Waits for Events: It uses mqttWaitEvent with a timeout to check if the broker has sent any data.
Reads Data: If an event is detected, it calls rxd (the mqttReceive block) to pull the data into a memory buffer (rx_buffer).
Processes Payload:
It converts the raw bytes in the buffer into a readable string (rx_string).
It checks if that string equals "REBOOT".
Action: If the message is "REBOOT", the device immediately resets itself (boardReset).
D. Sending Status Updates (Publish)
Checks Timer: Every 10 seconds, the pub_timer expires.
Prepares Message: It gets the current time (clockNow) and creates a string: "Alive. Time: <timestamp>".
Converts to Raw Memory: Because MQTT sends raw bytes, not VPL strings, it copies this text into a memory buffer (tx_buffer).
Sends Message: It publishes this buffer to the topic RTCU/<serialnumber>/Status.
This acts as a "heartbeat," letting you know the device is online and functioning.
*/
INCLUDE rtcu.inc
// Broker used by this example.
// BROKER_IP is passed to the `ip` parameter of mqttOpen; despite its name it holds a host name, not a numeric address.
#DEFINE BROKER_IP "test.mosquitto.org"
// TCP port passed to the `port` parameter of mqttOpen.
// NOTE(review): 1883 is conventionally the unencrypted MQTT port - confirm this is acceptable for the deployment.
#DEFINE BROKER_PORT 1883
PROGRAM MQTT_Example;
// MQTT client example.
//   Start-up : derives client ID / topic prefix from the serial number, opens the LAN
//              interface and arms a 10 second publish timer.
//   Main loop: waits for the LAN link, opens an MQTT handle when none exists, subscribes to
//              <base_topic>/Command, resets the board when the payload "REBOOT" arrives, and
//              publishes "Alive. Time: <linsec>" to <base_topic>/Status when the timer expires.
VAR
   // Connection
   mqtt        : INT := -1;              // MQTT handle; negative means "no handle yet"
   rc          : INT;                    // Return code of the most recent API call
   rxd         : mqttReceive;            // Function block that fetches a received message

   // Identity
   ser_num     : DINT;                   // Device serial number
   base_topic  : STRING;                 // "RTCU/<serial>" - unique prefix for all topics
   client_id   : STRING;                 // "RTCU_<serial>" - client ID sent to the broker

   // State flags
   subscribed  : BOOL;                   // TRUE once the Command topic subscription succeeded

   // Buffers
   rx_buffer   : ARRAY[1..1024] OF SINT; // Raw payload of a received message
   tx_buffer   : ARRAY[1..1024] OF SINT; // Raw payload of the message to publish

   // Strings and lengths
   rx_string   : STRING;                 // Received payload converted to text
   pub_string  : STRING;                 // Heartbeat text
   payload_len : INT;                    // Number of bytes to publish
   rx_len      : INT;                    // Number of received bytes converted to text

   // Timing
   pub_timer   : TON;                    // Paces the heartbeat
   linsec      : DINT;                   // Timestamp returned by clockNow
END_VAR;

   // 1. Device identity
   ser_num    := boardSerialNumber();
   client_id  := "RTCU_" + dintToStr(v := ser_num);
   base_topic := "RTCU/" + dintToStr(v := ser_num);
   DebugFmt(message := "MQTT Client Application Example");
   DebugMsg(message := "Using: client_id=[" + client_id + "] base_topic=[" + base_topic + "]");

   // 2. Open the LAN interface. A failure is only logged; the main loop then keeps
   //    reporting "Waiting for network..." until the interface is connected.
   rc := netOpen(iface := _NET_IFACE_LAN1);
   IF rc <> 0 THEN
      DebugMsg(message := "LAN Open Failed: " + intToStr(v := rc));
   ELSE
      DebugMsg(message := "LAN Interface Opened.");
   END_IF;

   // 3. Heartbeat period: 10000 ms = 10 seconds
   pub_timer.pt := 10000;

BEGIN
   // A. Network check.
   //    Query the same interface that was opened above and that mqttOpen uses; without
   //    `iface` the call reports the state of the default interface instead of the LAN.
   IF NOT netConnected(iface := _NET_IFACE_LAN1) THEN
      DebugMsg(message := "Waiting for network...");
      subscribed := FALSE; // Subscribe again once the link is back
      Sleep(delay := 5000);
   ELSE
      // B. Handle management: open a handle when we do not have a valid one.
      IF mqtt < 0 THEN
         DebugMsg(message := "Opening MQTT Connection...");
         mqtt := mqttOpen(
            iface    := _NET_IFACE_LAN1,
            ip       := BROKER_IP,
            port     := BROKER_PORT,
            clientid := client_id
         );
         IF mqtt < 0 THEN
            // Make the failure visible; the open is retried on a later pass of the loop.
            DebugFmt(message := "mqttOpen failed, rc=\1", v1 := mqtt);
         END_IF;
         // A new handle never carries an existing subscription.
         subscribed := FALSE;
         // Give the handshake a moment to start.
         Sleep(delay := 500);
      END_IF;

      // C. Application logic runs only with a valid handle AND an active connection.
      IF mqtt >= 0 AND mqttConnected(handle := mqtt) THEN

         // C1. Subscribe to RTCU/<SerialNumber>/Command.
         IF NOT subscribed THEN
            DebugMsg(message := "Connection Established! Subscribing...");
            rc := mqttSubscribe(handle := mqtt, topic := base_topic + "/Command", qos := 1);
            DebugFmt(message := "mqttSubscribe rc=\1", v1 := rc);
            // Only remember the subscription when the call succeeded, so that a failed
            // attempt is retried on the next pass instead of leaving the device deaf.
            IF rc = 0 THEN
               subscribed := TRUE;
            END_IF;
         END_IF;

         // C2. Receive: wait for an event, then fetch the message.
         rc := mqttWaitEvent(timeout := 2);
         IF rc > 0 THEN
            rxd(data := ADDR(rx_buffer), maxsize := SIZEOF(rx_buffer));
            IF rxd.ready AND rxd.handle = mqtt THEN
               // The buffer can hold up to 1024 bytes but a STRING holds at most 254
               // characters, so clamp the length before converting.
               rx_len := rxd.size;
               IF rx_len > 254 THEN
                  rx_len := 254;
               END_IF;
               rx_string := strFromMemory(src := ADDR(rx_buffer), len := rx_len);
               DebugMsg(message := "RX: " + rx_string);
               IF rx_string = "REBOOT" THEN
                  boardReset();
               END_IF;
            END_IF;
         END_IF;

         // C3. Publish the heartbeat to the unique status topic when the timer expires.
         pub_timer(trig := TRUE);
         IF pub_timer.q THEN
            linsec      := clockNow();
            pub_string  := "Alive. Time: " + dintToStr(v := linsec);
            payload_len := strLen(str := pub_string);
            // mqttPublish takes raw memory, so copy the text into the transmit buffer.
            strToMemory(dst := ADDR(tx_buffer), str := pub_string, len := payload_len);
            rc := mqttPublish(
               handle := mqtt,
               topic  := base_topic + "/Status",
               data   := ADDR(tx_buffer),
               size   := payload_len,
               qos    := 0
            );
            DebugFmt(message := "Publish with linsec=\4,rc=\1", v4 := linsec, v1 := rc);
            // Restart the publish timer.
            pub_timer(trig := FALSE);
         END_IF;

      ELSE
         // D. Not connected.
         //    Either mqttOpen failed (mqtt < 0), or we hold a handle (mqtt >= 0) for which
         //    mqttConnected is FALSE: it may still be connecting, or the connection dropped.
         DebugMsg(message := "MQTT Not Connected...");
         // A lost connection means the subscription must be made again.
         subscribed := FALSE;
         // Optional: if this persists too long, close the handle (mqttClose) and set
         // mqtt := -1 to restart the process.
         Sleep(delay := 1000);
      END_IF;
   END_IF; // End of network check
END;
END_PROGRAM;
|