MQTT on 54L15

Hi all,

Has anyone implemented an MQTT client on an 54L15 before? I have been searching the web and I have not seen a sample program or anyone talking about MQTT with 54L15 only. I would appreciate any sample programs, it doesn't need to be complicated, I just want to verify that we can indeed implement the callback functions and its functionalities on 54L15. Thanks.

  • Here is the code that I have attempted to create as a simple sample program, but I receive connect error: -12. 

    Here is my main.c:

    #include <zephyr/kernel.h>
    #include <zephyr/net/mqtt.h>
    #include <zephyr/net/socket.h>
    #include <stdio.h>
    
    #define MQTT_BROKER_HOSTNAME "192.168.3.153"
    #define MQTT_BROKER_PORT 1883
    #define MQTT_CLIENT_ID "nrf54l15_subscriber"
    #define MQTT_TOPIC_SUBSCRIBE "test/54L15"
    
    static struct mqtt_client client;
    static struct sockaddr_storage broker;
    
    /* MQTT event handler */
    void mqtt_evt_handler(struct mqtt_client *const client,
                         const struct mqtt_evt *evt)
    {
        switch (evt->type) {
        case MQTT_EVT_CONNACK:
            if (evt->result != 0) {
                printk("Connection failed: %d\n", evt->result);
                return;
            }
            
            printk("Connected to broker\n");
            
            /* Subscribe to topic */
            struct mqtt_topic subscribe_topic = {
                .topic.utf8 = MQTT_TOPIC_SUBSCRIBE,
                .topic.size = strlen(MQTT_TOPIC_SUBSCRIBE)
            };
            
            const struct mqtt_subscription_list sub_list = {
                .list = &subscribe_topic,
                .list_count = 1,
                .message_id = 1
            };
            
            mqtt_subscribe(client, &sub_list);
            break;
            
        case MQTT_EVT_PUBLISH:
            /* Handle incoming messages */
            printk("Received: %.*s\n",
                   evt->param.publish.message.payload.len,
                   evt->param.publish.message.payload.data);
            break;
            
        case MQTT_EVT_DISCONNECT:
            printk("Disconnected from broker\n");
            break;
    
        default: 
            break;
        }
        
    }
    
    void connect_to_broker(void)
    {
        int err;
        
        /* Initialize client */
        mqtt_client_init(&client);
        
        /* Set callback */
        client.evt_cb = mqtt_evt_handler;
        client.client_id.utf8 = MQTT_CLIENT_ID;
        client.client_id.size = strlen(MQTT_CLIENT_ID);
        
        /* Configure broker address */
        struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
        broker4->sin_family = AF_INET;
        broker4->sin_port = htons(MQTT_BROKER_PORT);
        inet_pton(AF_INET, MQTT_BROKER_HOSTNAME, &broker4->sin_addr.s_addr);
        
        /* Configure transport */
        client.transport.type = MQTT_TRANSPORT_NON_SECURE;
        client.broker = (struct sockaddr *)&broker;
        
        printk("Client parameters: \n");
            printk("  Client ID: %s\n", client.client_id.utf8);
            printk("  Broker: %s:%d\n", MQTT_BROKER_HOSTNAME, MQTT_BROKER_PORT);
            printk("  Topic: %s\n", MQTT_TOPIC_SUBSCRIBE);
            printk("  Transport: %s\n", client.transport.type == MQTT_TRANSPORT_NON_SECURE ? "Non-secure" : "Secure");
            
    
    
        /* Connect to broker */
        err = mqtt_connect(&client);
        if (err != 0) {
            printk("Connect error: %d\n", err);
        }
    }
    
    int main(void)
    {
        printk("Starting MQTT subscriber\n");
        connect_to_broker();
        
        while (1) {
            k_sleep(K_SECONDS(1));
        }
    }

    Here is my prj.conf:

    CONFIG_NETWORKING=y
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=n
    CONFIG_NET_SOCKETS=y
    CONFIG_NET_SOCKETS_POSIX_NAMES=y
    CONFIG_NET_IPV6=y
    CONFIG_NET_TCP=y
    CONFIG_NET_UDP=y
    CONFIG_NET_DHCPV6=n
    CONFIG_NET_MGMT=y
    CONFIG_NET_MGMT_EVENT=y
    CONFIG_LOG=y
    
    
    CONFIG_SHELL=y
    CONFIG_OPENTHREAD_SHELL=y
    CONFIG_SHELL_ARGC_MAX=26
    CONFIG_SHELL_CMD_BUFF_SIZE=416
    CONFIG_OPENTHREAD_NORDIC_LIBRARY_MASTER=y
    CONFIG_NET_L2_OPENTHREAD=y
    CONFIG_SETTINGS_ZMS=y
    CONFIG_ZMS=y
    CONFIG_NVS=n
    CONFIG_FLASH=y
    CONFIG_FLASH_MAP=y
    CONFIG_SPI_NOR=n 
    
    
    CONFIG_MAIN_STACK_SIZE=6144
    CONFIG_SHELL_STACK_SIZE=5120

  • In addition, I have a Mosquitto broker running on my raspi5, and it has OTBR set up. I'm trying to connect the 54L15 via Thread. 

  • This is the most updated version of my code. After setting client rx_buf and tx_buf, the error code became 116 instead of 12. 

    Running the code below would give a warning (self-defined): broker IPv6 address not in routing table. The IPv6 address that I passed in was obtained from the OTBR setup with raspi5, where the broker is. 

    Here is my main.c:

    #include <zephyr/kernel.h>
    #include <zephyr/net/mqtt.h>
    #include <zephyr/net/socket.h>
    #include <stdio.h>
    #include <string.h>
    #include <zephyr/logging/log.h>
    #include <openthread/instance.h>
    #include <openthread/thread_ftd.h>
    
    LOG_MODULE_REGISTER(main, LOG_LEVEL_INF);
    
    #define MQTT_BROKER_HOSTNAME "192.168.3.153"
    #define MQTT_BROKER_PORT 1883
    #define MQTT_CLIENT_ID "nrf54l15_subscriber"
    #define MQTT_TOPIC_SUBSCRIBE "test/54L15"
    
    struct mqtt_client client;
    static struct sockaddr_storage broker;
    otInstance *instance;
    
    #include <zephyr/app_memory/app_memdomain.h>
    K_APPMEM_PARTITION_DEFINE(app_partition);
    #define APP_BMEM K_APP_BMEM(app_partition)
    #define APP_DMEM K_APP_DMEM(app_partition)
    
    #define APP_MQTT_BUFFER_SIZE 128
    /* Buffers for MQTT client. */
    static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
    static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
    
    
    
    static void broker_init(void)
    {
    	struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
    
    	broker6->sin6_family = AF_INET6;
    	broker6->sin6_port = htons(MQTT_BROKER_PORT);
    	inet_pton(AF_INET6, "fde3:8b00:6d4d:3436:0:ff:fe00:4800", &broker6->sin6_addr);
    }
    
    void mqtt_evt_handler(struct mqtt_client *const client,
                          const struct mqtt_evt *evt)
    {
        switch (evt->type)
        {
        case MQTT_EVT_CONNACK:
            if (evt->result != 0)
            {
                printk("Connection failed: %d\n", evt->result);
                return;
            }
    
            printk("Connected to broker\n");
    
            /* Subscribe to topic */
            struct mqtt_topic subscribe_topic = {
                .topic.utf8 = MQTT_TOPIC_SUBSCRIBE,
                .topic.size = strlen(MQTT_TOPIC_SUBSCRIBE)};
    
            const struct mqtt_subscription_list sub_list = {
                .list = &subscribe_topic,
                .list_count = 1,
                .message_id = 1};
    
            mqtt_subscribe(client, &sub_list);
            break;
    
        case MQTT_EVT_PUBLISH:
            /* Handle incoming messages */
            printk("Received: %.*s\n",
                   evt->param.publish.message.payload.len,
                   evt->param.publish.message.payload.data);
            break;
    
        case MQTT_EVT_DISCONNECT:
            printk("Disconnected from broker\n");
            break;
    
        default:
            break;
        }
    }
    
    void connect_to_broker(struct mqtt_client *client)
    {
        int err;
        broker_init();
        mqtt_client_init(client);
        /* MQTT client configuration */
    	client->broker = &broker;
    	client->evt_cb = mqtt_evt_handler;
    	client->client_id.utf8 = (uint8_t *)"test_client";
    	client->client_id.size = strlen("test_client");
    	client->password = NULL;
    	client->user_name = NULL;
    	client->protocol_version = MQTT_VERSION_3_1_1;
    
    	/* MQTT buffers configuration */
    	client->rx_buf = rx_buffer;
    	client->rx_buf_size = sizeof(rx_buffer);
    	client->tx_buf = tx_buffer;
    	client->tx_buf_size = sizeof(tx_buffer);
        err = mqtt_connect(client);
        if (err != 0)
        {
            LOG_ERR("Connect error: %d\n", err);
        }
    }
    
    void state_changed_callback(uint32_t flags, void *context)
    {
        otInstance *instance = (otInstance *)context;
    
        if (flags & OT_CHANGED_THREAD_ROLE)
        {
            switch (otThreadGetDeviceRole(instance))
            {
            case OT_DEVICE_ROLE_DISABLED:
                LOG_INF("Thread role: Disabled\n");
                break;
            case OT_DEVICE_ROLE_DETACHED:
                LOG_INF("Thread role: Detached\n");
                break;
            case OT_DEVICE_ROLE_CHILD:
                LOG_INF("Thread role: Child\n");
                // otThreadSetRouterEligible(instance, true);
                break;
            case OT_DEVICE_ROLE_ROUTER:
                LOG_INF("Thread role: Router\n");
                break;
            case OT_DEVICE_ROLE_LEADER:
                LOG_INF("Thread role: Leader\n");
                break;
            default:
                LOG_INF("Thread role: Unknown\n");
                break;
            }
        }
    }
    
    void thread_start()
    {
        instance = otInstanceInitSingle();
        if (instance == NULL)
        {
            LOG_ERR("Failed to initialize OpenThread instance");
            return;
        }
        otError error = otSetStateChangedCallback(instance, state_changed_callback, instance);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set state changed callback: %d\n", error);
            return;
        }
        otPanId panid = 0x1234;
        error = otLinkSetPanId(instance, panid);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set PAN ID: %d\n", error);
            return;
        }
    
        error = otLinkSetChannel(instance, 24);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set channel: %d\n", error);
            return;
        }
        uint8_t networkkey[] = {0x9E, 0xEA, 0xF1, 0x11, 0x19, 0xF3, 0x88, 0x74, 0x19, 0xA6, 0x22, 0x54, 0x98, 0x31, 0xD7, 0x99};
        otNetworkKey networkKey_set; // the network key to set
        memcpy(networkKey_set.m8, networkkey, sizeof(networkKey_set.m8));
        for (int i = 0; i < sizeof(networkKey_set.m8); i++)
        {
            printk("%02X ", networkKey_set.m8[i]);
        }
        error = otThreadSetNetworkKey(instance, &networkKey_set);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set network key: %d\n", error);
            return;
        }
        error = otIp6SetEnabled(instance, true);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to enable IPv6: %d\n", error);
            return;
        }
        error = otThreadSetEnabled(instance, true);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to enable Thread: %d\n", error);
            return;
        }
    }
    
    int main(void)
    {
        thread_start();
        while (otThreadGetDeviceRole(instance) == OT_DEVICE_ROLE_DISABLED ||
               otThreadGetDeviceRole(instance) == OT_DEVICE_ROLE_DETACHED)
        {
            LOG_INF("Waiting for device to attach to the OTBR on raspi5...");
            k_sleep(K_MSEC(1000));
        }
    
        while (otThreadGetDeviceRole(instance) > OT_DEVICE_ROLE_DETACHED)
        {
            struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
            char addr_str[NET_IPV6_ADDR_LEN]; // Buffer to store the IPv6 address as a string
    
            if (net_addr_ntop(AF_INET6, &broker6->sin6_addr, addr_str, sizeof(addr_str)) == NULL)
            {
                LOG_ERR("Failed to convert broker IPv6 address to string");
            }
            else
            {
                LOG_INF("Broker IPv6 address: %s", addr_str);
            }
    
            if (net_if_ipv6_addr_lookup(&broker6->sin6_addr, NULL) == NULL)
            {
                LOG_WRN("Broker IPv6 address not in routing table");
            }
    
            connect_to_broker(&client);
            k_sleep(K_SECONDS(1));
        }
    
        return 0;
    }

    Here is my prj.conf:

    CONFIG_NETWORKING=y
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=n
    CONFIG_NET_SOCKETS=y
    CONFIG_NET_SOCKETS_POSIX_NAMES=y
    CONFIG_NET_IPV6=y
    CONFIG_NET_IPV4=y
    CONFIG_NET_TCP=y
    CONFIG_NET_UDP=y
    CONFIG_NET_DHCPV6=n
    CONFIG_NET_MGMT=y
    CONFIG_NET_MGMT_EVENT=y
    CONFIG_LOG=y
    
    
    CONFIG_SHELL=y
    CONFIG_OPENTHREAD_SHELL=y
    CONFIG_SHELL_ARGC_MAX=26
    CONFIG_SHELL_CMD_BUFF_SIZE=416
    CONFIG_OPENTHREAD_NORDIC_LIBRARY_MASTER=y
    CONFIG_NET_L2_OPENTHREAD=y
    CONFIG_SETTINGS_ZMS=y
    CONFIG_ZMS=y
    CONFIG_NVS=n
    CONFIG_FLASH=y
    CONFIG_FLASH_MAP=y
    CONFIG_SPI_NOR=n 
    
    
    CONFIG_MAIN_STACK_SIZE=6144
    CONFIG_SHELL_STACK_SIZE=5120
    
    CONFIG_NET_SHELL=y
    CONFIG_NET_IPV6_NBR_CACHE=y
    
    CONFIG_NET_PKT_RX_COUNT=16
    CONFIG_NET_PKT_TX_COUNT=16
    CONFIG_NET_BUF_RX_COUNT=32
    CONFIG_NET_BUF_TX_COUNT=32
    CONFIG_HEAP_MEM_POOL_SIZE=24576

  • Hi Allan-led,

    I don't think anyone has tried MQTT over Thread on the nRF54L15 before, but it seems to have been attempted on the nRF52840. The nRF54L15 has improvements compared to the nRF52840, so perhaps it could handle TCP better. However, it hasn't been tried, to my knowledge.

    As for the error you are having, it is going to be challenging to figure out from just reading the code alone. Could you please use the debugger to get the full call stack to where it failed? That will give us more ideas to work with.

    Hieu

  • Hi Hieu,

    I don't think the debugger has taken us very far; the error that I encountered occurred during the call to mqtt_connect(), but it did not lead to a total halt of the device. tried re-initializing the client and re-connecting it in the main while loop, and it keeps producing the same error. After the errors are produced, I am still able to interact with the device through the command line, which indicates that the device is still alive. 

    Here are the outputs:

    In the image above, I verified that the connection between my device and the OTBR is indeed established. I can continue interacting with the device even if the error message was produced. 

    Then, I tried using the debugger: 

    Here is where the debugger took me.

    However, I tried pressing continue and pausing past this point, it led me to a fatal halt with reason 3. This is actually the case with all of my projects, am I using the debugger incorrectly or is there a common issue among all of my projects? 

    Best regards,
    Allan

Related