MQTT Example

Top  Previous  Next

/*
    **** MQTT Client Application Example ****
 
It connects to the internet via a Local Area Network (LAN), communicates with a public MQTT broker, 
and performs two main tasks: Listening for commands and reporting its status.
 
Here is a step-by-step breakdown of exactly what the program does:
1. Hardware Initialization
    Established device identity.
    Starts the LAN Interface: 
      At the very beginning, it calls netOpen(iface := 1). 
      This powers up the Ethernet/LAN hardware on the device so it's ready to communicate.
    Sets a Timer: It initializes a timer (pub_timer) to fire every 10 seconds.
 
2. Main Execution Loop
 
The program enters an infinite loop, meaning it runs forever until the device is powered off.
A. Network Monitoring
    Checks Connection: It constantly checks netConnected().
    If Disconnected: 
      It prints "Waiting for network..." and sleeps for 5 seconds to avoid spamming the logs. 
      It will not try to do anything else until the cable is plugged in and the network is up.
    If Connected: 
      It proceeds to the MQTT logic.
 
B. MQTT Connection Management
    Checks MQTT Status: It asks "Am I currently connected to the broker?" (mqttConnected).
    If Not Connected:
        It cleans up any old connection handles (mqttClose).
        It attempts to open a new connection to the broker.
        On Success: It immediately subscribes to the topic RTCU/<serialnumber>/Command. 
        On Failure: It waits 5 seconds before trying again.
 
C. Handling Incoming Messages (Subscribe)
    Waits for Events: It uses mqttWaitEvent with a timeout to check if the broker has sent any data.
    Reads Data: If an event is detected, it calls rxd (the mqttReceive block) to pull the data into a memory buffer (rx_buffer).
    Processes Payload:
        It converts the raw bytes in the buffer into a readable string (rx_string).
        It checks if that string equals "REBOOT".
        Action: If the message is "REBOOT", the device immediately resets itself (boardReset).
 
D. Sending Status Updates (Publish)
    Checks Timer: Every 10 seconds, the pub_timer expires.
    Prepares Message: It gets the current time (clockNow) and creates a string: "Alive. Time: <timestamp>".
    Converts to Raw Memory: Because MQTT sends raw bytes, not VPL strings, it copies this text into a memory buffer (tx_buffer).
    Sends Message: It publishes this buffer to the topic RTCU/<serialnumber>/Status. 
                   This acts as a "heartbeat," letting you know the device is online and functioning.
*/
INCLUDE rtcu.inc
 
// Broker Used:
#DEFINE  BROKER_IP        "test.mosquitto.org"
#DEFINE  BROKER_PORT      1883
 
 
PROGRAM MQTT_Example;
 
VAR
    //Connection:
    mqtt            : INT    := -1;            //invalid handle.
    rc              : INT;                     
    rxd             : mqttReceive;      
    
    //Identity:
    ser_num         : DINT;                    // Stores device serial number
    base_topic      : STRING;                  // Unique prefix for topics
    client_id       : STRING;
    
    // State Flags
    subscribed      : BOOL;                   // Tracks if we have subscribed
    
    // Buffers
    rx_buffer       : ARRAY[1..1024] OF SINT;  
    tx_buffer       : ARRAY[1..1024] OF SINT;  
    
    // Strings and Lengths
    rx_string       : STRING;                  
    pub_string      : STRING;
    payload_len     : INT;
    
    // Timing
    pub_timer       : TON;        
    linsec          : DINT;       
END_VAR;
 
// 1. Get Device Identity
ser_num := boardSerialNumber();
client_id := "RTCU_" + dintToStr(:= ser_num);
base_topic := "RTCU/" + dintToStr(:= ser_num);
 
DebugFmt(message:="MQTT Client Application Example");
DebugMsg(message:="Using: client_id=["+client_id+"] base_topic=["+base_topic+"]");
 
// 2. Initialize Network (LAN)
rc := netOpen(iface := _NET_IFACE_LAN1); 
IF rc <> 0 THEN
  DebugMsg(message := "LAN Open Failed: " + intToStr(:= rc));
ELSE
  DebugMsg(message := "LAN Interface Opened.");
END_IF;
 
// 3. Initialize the timer to 10000 ms = 10 seconds
pub_timer.pt := 10000;
 
BEGIN
   // 2. Network Check
   IF NOT netConnected() THEN
      DebugMsg(message := "Waiting for network...");
      subscribed := FALSE; // Reset state if cable pulled
      Sleep(delay := 5000); 
   ELSE
      
      // 3. Handle Management
      // If we don't have a valid handle, try to open one
      IF mqtt < 0 THEN
          DebugMsg(message := "Opening MQTT Connection...");
          mqtt := mqttOpen(
              iface    := _NET_IFACE_LAN1,
              ip       := BROKER_IP, 
              port     := BROKER_PORT, 
              clientid := client_id
          );
          
          // Reset subscription flag when we create a new handle
          subscribed := FALSE;
          
          // Allow a small delay for handshake to start
          Sleep(delay := 500); 
      END_IF;
 
      // 4. Connection Status & Application Logic
      // We only proceed if the handle is valid AND the connection is active
      IF mqtt >= 0 AND mqttConnected(handle := mqtt) THEN
          
          // A. Handle One-Time Subscription
          IF NOT subscribed THEN
              DebugMsg(message := "Connection Established! Subscribing...");
              
              // This device now only listens to: RTCU/<SerialNumber>/Command
              rc := mqttSubscribe(handle := mqtt, topic := base_topic + "/Command", qos := 1);             
              DebugFmt(message:="mqttSubscribe rc=\1",v1:=rc);
              subscribed := TRUE;
          END_IF;
 
          // B. Listen for Events (Receive)
          rc := mqttWaitEvent(timeout := 2);
          IF rc > 0 THEN
              rxd(data := ADDR(rx_buffer), maxsize := SIZEOF(rx_buffer));
              IF rxd.ready AND rxd.handle = mqtt THEN
                  rx_string := strFromMemory(src := ADDR(rx_buffer), len := rxd.size);
                  DebugMsg(message := "RX: " + rx_string);
                  IF rx_string = "REBOOT" THEN boardReset(); END_IF;
              END_IF;
          END_IF;
 
          // C. Publish Heartbeat to unique status topic
          pub_timer(trig := TRUE);
          IF pub_timer.THEN
              linsec := clockNow();
              pub_string := "Alive. Time: " + dintToStr(:= linsec);
              payload_len := strLen(str := pub_string);
              strToMemory(dst := ADDR(tx_buffer), str := pub_string, len := payload_len);
              rc := mqttPublish(
                 handle  := mqtt, 
                 topic   := base_topic + "/Status", 
                 data    := ADDR(tx_buffer),   
                 size    := payload_len,       
                 qos     := 0
              );
              DebugFmt(message:="Publish with linsec=\4,rc=\1",v4:=linsec,v1:=rc);
              
              //Restart publish timer:
              pub_timer(trig := FALSE);
          END_IF;
 
      ELSE
          // 5. Not Connected
          // We have a handle (mqtt > 0), but mqttConnected is FALSE.
          // It might be connecting, or the connection might have dropped.
          DebugMsg(message := "MQTT Not Connected...");
          
          // Important: If we lose connection, we mark as not subscribed
          subscribed := FALSE;
          
          // Optional: If this persists too long, you might want logic to 
          // close the handle (mqttClose) and set mqtt := -1 to restart the process.
          Sleep(delay := 1000);
      END_IF;
      
   END_IF; // End NetConnected
END;
END_PROGRAM;