MQTT on 54L15

Hi all,

Has anyone implemented an MQTT client on a 54L15 before? I have been searching the web and have not found a sample program or any discussion of MQTT on the 54L15 alone. I would appreciate any sample programs; they don't need to be complicated. I just want to verify that we can indeed implement the callback functions and their functionality on the 54L15. Thanks.

Parents
  • This is the most updated version of my code. After setting client rx_buf and tx_buf, the error code became 116 instead of 12. 

    Running the code below gives a warning that I defined myself: "Broker IPv6 address not in routing table". The IPv6 address that I passed in was obtained from the OTBR setup on the raspi5, which is also where the broker runs. 

    Here is my main.c:

    #include <zephyr/kernel.h>
    #include <zephyr/net/mqtt.h>
    #include <zephyr/net/socket.h>
    #include <stdio.h>
    #include <string.h>
    #include <zephyr/logging/log.h>
    #include <openthread/instance.h>
    #include <openthread/thread_ftd.h>
    
    /* Register this file as log module "main" at INFO level. */
    LOG_MODULE_REGISTER(main, LOG_LEVEL_INF);
    
    /* IPv4 literal; not referenced by the code shown in this file
     * (broker_init() uses a hard-coded IPv6 string instead). */
    #define MQTT_BROKER_HOSTNAME "192.168.3.153"
    /* TCP port stored into the broker socket address by broker_init(). */
    #define MQTT_BROKER_PORT 1883
    /* Not referenced by the code shown; connect_to_broker() sets the client
     * id from the literal "test_client". */
    #define MQTT_CLIENT_ID "nrf54l15_subscriber"
    /* Topic subscribed to from mqtt_evt_handler() after a successful CONNACK. */
    #define MQTT_TOPIC_SUBSCRIBE "test/54L15"
    
    /* The single MQTT client instance, passed to connect_to_broker() by main(). */
    struct mqtt_client client;
    /* Broker endpoint; written by broker_init() through a sockaddr_in6 view. */
    static struct sockaddr_storage broker;
    /* OpenThread instance handle; assigned in thread_start(), read in main(). */
    otInstance *instance;
    
    #include <zephyr/app_memory/app_memdomain.h>
    /* Application memory partition used to place the MQTT buffers below. */
    K_APPMEM_PARTITION_DEFINE(app_partition);
    #define APP_BMEM K_APP_BMEM(app_partition)
    /* APP_DMEM is not referenced by the code shown in this file. */
    #define APP_DMEM K_APP_DMEM(app_partition)
    
    /* Size in bytes of each of the two MQTT buffers below. */
    #define APP_MQTT_BUFFER_SIZE 128
    /* Buffers for MQTT client; assigned to client->rx_buf / client->tx_buf
     * in connect_to_broker(). */
    static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
    static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
    
    
    
    /**
     * Fill the file-scope `broker` socket address with the MQTT broker's IPv6
     * endpoint: family AF_INET6, port MQTT_BROKER_PORT (network byte order) and
     * the address parsed from the literal below.
     *
     * A failed text-to-address conversion is logged instead of being silently
     * ignored, so a typo in the literal does not go unnoticed.
     *
     * NOTE(review): the interface identifier "0:ff:fe00:4800" has the shape of a
     * Thread RLOC, which presumably changes when the border router's role or
     * router ID changes - confirm whether a stable address of the broker host
     * should be used instead.
     */
    static void broker_init(void)
    {
        static const char broker_addr[] = "fde3:8b00:6d4d:3436:0:ff:fe00:4800";
        struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;

        broker6->sin6_family = AF_INET6;
        broker6->sin6_port = htons(MQTT_BROKER_PORT);

        /* inet_pton() returns 1 only when the string was converted. */
        if (inet_pton(AF_INET6, broker_addr, &broker6->sin6_addr) != 1)
        {
            LOG_ERR("Invalid broker IPv6 address: %s", broker_addr);
        }
    }
    
    /**
     * MQTT event callback (installed through client->evt_cb).
     *
     * - MQTT_EVT_CONNACK: reports a refused connection, otherwise subscribes to
     *   MQTT_TOPIC_SUBSCRIBE and reports a failed subscribe request.
     * - MQTT_EVT_PUBLISH: reads the message payload from the client and prints it.
     * - MQTT_EVT_DISCONNECT: prints a notice.
     * - Any other event is ignored.
     *
     * @param client MQTT client the event belongs to.
     * @param evt    Event descriptor supplied by the MQTT library.
     */
    void mqtt_evt_handler(struct mqtt_client *const client,
                          const struct mqtt_evt *evt)
    {
        switch (evt->type)
        {
        case MQTT_EVT_CONNACK:
        {
            if (evt->result != 0)
            {
                printk("Connection failed: %d\n", evt->result);
                return;
            }

            printk("Connected to broker\n");

            /* Subscribe to topic */
            struct mqtt_topic subscribe_topic = {
                .topic.utf8 = (const uint8_t *)MQTT_TOPIC_SUBSCRIBE,
                .topic.size = strlen(MQTT_TOPIC_SUBSCRIBE)};

            const struct mqtt_subscription_list sub_list = {
                .list = &subscribe_topic,
                .list_count = 1,
                .message_id = 1};

            int sub_err = mqtt_subscribe(client, &sub_list);
            if (sub_err != 0)
            {
                printk("Subscribe failed: %d\n", sub_err);
            }
            break;
        }

        case MQTT_EVT_PUBLISH:
        {
            /*
             * The PUBLISH event only carries the payload length; the payload
             * bytes must be read from the client explicitly. Read in bounded
             * chunks so a message longer than the scratch buffer still works.
             */
            uint8_t chunk[64];
            size_t remaining = evt->param.publish.message.payload.len;

            printk("Received: ");
            while (remaining > 0)
            {
                size_t want = (remaining < sizeof(chunk)) ? remaining : sizeof(chunk);
                int read_err = mqtt_readall_publish_payload(client, chunk, want);

                if (read_err != 0)
                {
                    printk("\nPayload read failed: %d\n", read_err);
                    return;
                }

                /* "%.*s" takes an int precision and a char pointer. */
                printk("%.*s", (int)want, (const char *)chunk);
                remaining -= want;
            }
            printk("\n");
            break;
        }

        case MQTT_EVT_DISCONNECT:
            printk("Disconnected from broker\n");
            break;

        default:
            break;
        }
    }
    
    void connect_to_broker(struct mqtt_client *client)
    {
        int err;
        broker_init();
        mqtt_client_init(client);
        /* MQTT client configuration */
    	client->broker = &broker;
    	client->evt_cb = mqtt_evt_handler;
    	client->client_id.utf8 = (uint8_t *)"test_client";
    	client->client_id.size = strlen("test_client");
    	client->password = NULL;
    	client->user_name = NULL;
    	client->protocol_version = MQTT_VERSION_3_1_1;
    
    	/* MQTT buffers configuration */
    	client->rx_buf = rx_buffer;
    	client->rx_buf_size = sizeof(rx_buffer);
    	client->tx_buf = tx_buffer;
    	client->tx_buf_size = sizeof(tx_buffer);
        err = mqtt_connect(client);
        if (err != 0)
        {
            LOG_ERR("Connect error: %d\n", err);
        }
    }
    
    void state_changed_callback(uint32_t flags, void *context)
    {
        otInstance *instance = (otInstance *)context;
    
        if (flags & OT_CHANGED_THREAD_ROLE)
        {
            switch (otThreadGetDeviceRole(instance))
            {
            case OT_DEVICE_ROLE_DISABLED:
                LOG_INF("Thread role: Disabled\n");
                break;
            case OT_DEVICE_ROLE_DETACHED:
                LOG_INF("Thread role: Detached\n");
                break;
            case OT_DEVICE_ROLE_CHILD:
                LOG_INF("Thread role: Child\n");
                // otThreadSetRouterEligible(instance, true);
                break;
            case OT_DEVICE_ROLE_ROUTER:
                LOG_INF("Thread role: Router\n");
                break;
            case OT_DEVICE_ROLE_LEADER:
                LOG_INF("Thread role: Leader\n");
                break;
            default:
                LOG_INF("Thread role: Unknown\n");
                break;
            }
        }
    }
    
    void thread_start()
    {
        instance = otInstanceInitSingle();
        if (instance == NULL)
        {
            LOG_ERR("Failed to initialize OpenThread instance");
            return;
        }
        otError error = otSetStateChangedCallback(instance, state_changed_callback, instance);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set state changed callback: %d\n", error);
            return;
        }
        otPanId panid = 0x1234;
        error = otLinkSetPanId(instance, panid);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set PAN ID: %d\n", error);
            return;
        }
    
        error = otLinkSetChannel(instance, 24);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set channel: %d\n", error);
            return;
        }
        uint8_t networkkey[] = {0x9E, 0xEA, 0xF1, 0x11, 0x19, 0xF3, 0x88, 0x74, 0x19, 0xA6, 0x22, 0x54, 0x98, 0x31, 0xD7, 0x99};
        otNetworkKey networkKey_set; // the network key to set
        memcpy(networkKey_set.m8, networkkey, sizeof(networkKey_set.m8));
        for (int i = 0; i < sizeof(networkKey_set.m8); i++)
        {
            printk("%02X ", networkKey_set.m8[i]);
        }
        error = otThreadSetNetworkKey(instance, &networkKey_set);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set network key: %d\n", error);
            return;
        }
        error = otIp6SetEnabled(instance, true);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to enable IPv6: %d\n", error);
            return;
        }
        error = otThreadSetEnabled(instance, true);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to enable Thread: %d\n", error);
            return;
        }
    }
    
    int main(void)
    {
        thread_start();
        while (otThreadGetDeviceRole(instance) == OT_DEVICE_ROLE_DISABLED ||
               otThreadGetDeviceRole(instance) == OT_DEVICE_ROLE_DETACHED)
        {
            LOG_INF("Waiting for device to attach to the OTBR on raspi5...");
            k_sleep(K_MSEC(1000));
        }
    
        while (otThreadGetDeviceRole(instance) > OT_DEVICE_ROLE_DETACHED)
        {
            struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
            char addr_str[NET_IPV6_ADDR_LEN]; // Buffer to store the IPv6 address as a string
    
            if (net_addr_ntop(AF_INET6, &broker6->sin6_addr, addr_str, sizeof(addr_str)) == NULL)
            {
                LOG_ERR("Failed to convert broker IPv6 address to string");
            }
            else
            {
                LOG_INF("Broker IPv6 address: %s", addr_str);
            }
    
            if (net_if_ipv6_addr_lookup(&broker6->sin6_addr, NULL) == NULL)
            {
                LOG_WRN("Broker IPv6 address not in routing table");
            }
    
            connect_to_broker(&client);
            k_sleep(K_SECONDS(1));
        }
    
        return 0;
    }

    Here is my prj.conf:

    CONFIG_NETWORKING=y
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=n
    CONFIG_NET_SOCKETS=y
    CONFIG_NET_SOCKETS_POSIX_NAMES=y
    CONFIG_NET_IPV6=y
    CONFIG_NET_IPV4=y
    CONFIG_NET_TCP=y
    CONFIG_NET_UDP=y
    CONFIG_NET_DHCPV6=n
    CONFIG_NET_MGMT=y
    CONFIG_NET_MGMT_EVENT=y
    CONFIG_LOG=y
    
    
    CONFIG_SHELL=y
    CONFIG_OPENTHREAD_SHELL=y
    CONFIG_SHELL_ARGC_MAX=26
    CONFIG_SHELL_CMD_BUFF_SIZE=416
    CONFIG_OPENTHREAD_NORDIC_LIBRARY_MASTER=y
    CONFIG_NET_L2_OPENTHREAD=y
    CONFIG_SETTINGS_ZMS=y
    CONFIG_ZMS=y
    CONFIG_NVS=n
    CONFIG_FLASH=y
    CONFIG_FLASH_MAP=y
    CONFIG_SPI_NOR=n 
    
    
    CONFIG_MAIN_STACK_SIZE=6144
    CONFIG_SHELL_STACK_SIZE=5120
    
    CONFIG_NET_SHELL=y
    CONFIG_NET_IPV6_NBR_CACHE=y
    
    CONFIG_NET_PKT_RX_COUNT=16
    CONFIG_NET_PKT_TX_COUNT=16
    CONFIG_NET_BUF_RX_COUNT=32
    CONFIG_NET_BUF_TX_COUNT=32
    CONFIG_HEAP_MEM_POOL_SIZE=24576

Reply
  • This is the most updated version of my code. After setting client rx_buf and tx_buf, the error code became 116 instead of 12. 

    Running the code below gives a warning that I defined myself: "Broker IPv6 address not in routing table". The IPv6 address that I passed in was obtained from the OTBR setup on the raspi5, which is also where the broker runs. 

    Here is my main.c:

    #include <zephyr/kernel.h>
    #include <zephyr/net/mqtt.h>
    #include <zephyr/net/socket.h>
    #include <stdio.h>
    #include <string.h>
    #include <zephyr/logging/log.h>
    #include <openthread/instance.h>
    #include <openthread/thread_ftd.h>
    
    /* Register this file as log module "main" at INFO level. */
    LOG_MODULE_REGISTER(main, LOG_LEVEL_INF);
    
    /* IPv4 literal; not referenced by the code shown in this file
     * (broker_init() uses a hard-coded IPv6 string instead). */
    #define MQTT_BROKER_HOSTNAME "192.168.3.153"
    /* TCP port stored into the broker socket address by broker_init(). */
    #define MQTT_BROKER_PORT 1883
    /* Not referenced by the code shown; connect_to_broker() sets the client
     * id from the literal "test_client". */
    #define MQTT_CLIENT_ID "nrf54l15_subscriber"
    /* Topic subscribed to from mqtt_evt_handler() after a successful CONNACK. */
    #define MQTT_TOPIC_SUBSCRIBE "test/54L15"
    
    /* The single MQTT client instance, passed to connect_to_broker() by main(). */
    struct mqtt_client client;
    /* Broker endpoint; written by broker_init() through a sockaddr_in6 view. */
    static struct sockaddr_storage broker;
    /* OpenThread instance handle; assigned in thread_start(), read in main(). */
    otInstance *instance;
    
    #include <zephyr/app_memory/app_memdomain.h>
    /* Application memory partition used to place the MQTT buffers below. */
    K_APPMEM_PARTITION_DEFINE(app_partition);
    #define APP_BMEM K_APP_BMEM(app_partition)
    /* APP_DMEM is not referenced by the code shown in this file. */
    #define APP_DMEM K_APP_DMEM(app_partition)
    
    /* Size in bytes of each of the two MQTT buffers below. */
    #define APP_MQTT_BUFFER_SIZE 128
    /* Buffers for MQTT client; assigned to client->rx_buf / client->tx_buf
     * in connect_to_broker(). */
    static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
    static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
    
    
    
    /**
     * Fill the file-scope `broker` socket address with the MQTT broker's IPv6
     * endpoint: family AF_INET6, port MQTT_BROKER_PORT (network byte order) and
     * the address parsed from the literal below.
     *
     * A failed text-to-address conversion is logged instead of being silently
     * ignored, so a typo in the literal does not go unnoticed.
     *
     * NOTE(review): the interface identifier "0:ff:fe00:4800" has the shape of a
     * Thread RLOC, which presumably changes when the border router's role or
     * router ID changes - confirm whether a stable address of the broker host
     * should be used instead.
     */
    static void broker_init(void)
    {
        static const char broker_addr[] = "fde3:8b00:6d4d:3436:0:ff:fe00:4800";
        struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;

        broker6->sin6_family = AF_INET6;
        broker6->sin6_port = htons(MQTT_BROKER_PORT);

        /* inet_pton() returns 1 only when the string was converted. */
        if (inet_pton(AF_INET6, broker_addr, &broker6->sin6_addr) != 1)
        {
            LOG_ERR("Invalid broker IPv6 address: %s", broker_addr);
        }
    }
    
    /**
     * MQTT event callback (installed through client->evt_cb).
     *
     * - MQTT_EVT_CONNACK: reports a refused connection, otherwise subscribes to
     *   MQTT_TOPIC_SUBSCRIBE and reports a failed subscribe request.
     * - MQTT_EVT_PUBLISH: reads the message payload from the client and prints it.
     * - MQTT_EVT_DISCONNECT: prints a notice.
     * - Any other event is ignored.
     *
     * @param client MQTT client the event belongs to.
     * @param evt    Event descriptor supplied by the MQTT library.
     */
    void mqtt_evt_handler(struct mqtt_client *const client,
                          const struct mqtt_evt *evt)
    {
        switch (evt->type)
        {
        case MQTT_EVT_CONNACK:
        {
            if (evt->result != 0)
            {
                printk("Connection failed: %d\n", evt->result);
                return;
            }

            printk("Connected to broker\n");

            /* Subscribe to topic */
            struct mqtt_topic subscribe_topic = {
                .topic.utf8 = (const uint8_t *)MQTT_TOPIC_SUBSCRIBE,
                .topic.size = strlen(MQTT_TOPIC_SUBSCRIBE)};

            const struct mqtt_subscription_list sub_list = {
                .list = &subscribe_topic,
                .list_count = 1,
                .message_id = 1};

            int sub_err = mqtt_subscribe(client, &sub_list);
            if (sub_err != 0)
            {
                printk("Subscribe failed: %d\n", sub_err);
            }
            break;
        }

        case MQTT_EVT_PUBLISH:
        {
            /*
             * The PUBLISH event only carries the payload length; the payload
             * bytes must be read from the client explicitly. Read in bounded
             * chunks so a message longer than the scratch buffer still works.
             */
            uint8_t chunk[64];
            size_t remaining = evt->param.publish.message.payload.len;

            printk("Received: ");
            while (remaining > 0)
            {
                size_t want = (remaining < sizeof(chunk)) ? remaining : sizeof(chunk);
                int read_err = mqtt_readall_publish_payload(client, chunk, want);

                if (read_err != 0)
                {
                    printk("\nPayload read failed: %d\n", read_err);
                    return;
                }

                /* "%.*s" takes an int precision and a char pointer. */
                printk("%.*s", (int)want, (const char *)chunk);
                remaining -= want;
            }
            printk("\n");
            break;
        }

        case MQTT_EVT_DISCONNECT:
            printk("Disconnected from broker\n");
            break;

        default:
            break;
        }
    }
    
    void connect_to_broker(struct mqtt_client *client)
    {
        int err;
        broker_init();
        mqtt_client_init(client);
        /* MQTT client configuration */
    	client->broker = &broker;
    	client->evt_cb = mqtt_evt_handler;
    	client->client_id.utf8 = (uint8_t *)"test_client";
    	client->client_id.size = strlen("test_client");
    	client->password = NULL;
    	client->user_name = NULL;
    	client->protocol_version = MQTT_VERSION_3_1_1;
    
    	/* MQTT buffers configuration */
    	client->rx_buf = rx_buffer;
    	client->rx_buf_size = sizeof(rx_buffer);
    	client->tx_buf = tx_buffer;
    	client->tx_buf_size = sizeof(tx_buffer);
        err = mqtt_connect(client);
        if (err != 0)
        {
            LOG_ERR("Connect error: %d\n", err);
        }
    }
    
    void state_changed_callback(uint32_t flags, void *context)
    {
        otInstance *instance = (otInstance *)context;
    
        if (flags & OT_CHANGED_THREAD_ROLE)
        {
            switch (otThreadGetDeviceRole(instance))
            {
            case OT_DEVICE_ROLE_DISABLED:
                LOG_INF("Thread role: Disabled\n");
                break;
            case OT_DEVICE_ROLE_DETACHED:
                LOG_INF("Thread role: Detached\n");
                break;
            case OT_DEVICE_ROLE_CHILD:
                LOG_INF("Thread role: Child\n");
                // otThreadSetRouterEligible(instance, true);
                break;
            case OT_DEVICE_ROLE_ROUTER:
                LOG_INF("Thread role: Router\n");
                break;
            case OT_DEVICE_ROLE_LEADER:
                LOG_INF("Thread role: Leader\n");
                break;
            default:
                LOG_INF("Thread role: Unknown\n");
                break;
            }
        }
    }
    
    void thread_start()
    {
        instance = otInstanceInitSingle();
        if (instance == NULL)
        {
            LOG_ERR("Failed to initialize OpenThread instance");
            return;
        }
        otError error = otSetStateChangedCallback(instance, state_changed_callback, instance);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set state changed callback: %d\n", error);
            return;
        }
        otPanId panid = 0x1234;
        error = otLinkSetPanId(instance, panid);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set PAN ID: %d\n", error);
            return;
        }
    
        error = otLinkSetChannel(instance, 24);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set channel: %d\n", error);
            return;
        }
        uint8_t networkkey[] = {0x9E, 0xEA, 0xF1, 0x11, 0x19, 0xF3, 0x88, 0x74, 0x19, 0xA6, 0x22, 0x54, 0x98, 0x31, 0xD7, 0x99};
        otNetworkKey networkKey_set; // the network key to set
        memcpy(networkKey_set.m8, networkkey, sizeof(networkKey_set.m8));
        for (int i = 0; i < sizeof(networkKey_set.m8); i++)
        {
            printk("%02X ", networkKey_set.m8[i]);
        }
        error = otThreadSetNetworkKey(instance, &networkKey_set);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to set network key: %d\n", error);
            return;
        }
        error = otIp6SetEnabled(instance, true);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to enable IPv6: %d\n", error);
            return;
        }
        error = otThreadSetEnabled(instance, true);
        if (error != OT_ERROR_NONE)
        {
            printk("Failed to enable Thread: %d\n", error);
            return;
        }
    }
    
    int main(void)
    {
        thread_start();
        while (otThreadGetDeviceRole(instance) == OT_DEVICE_ROLE_DISABLED ||
               otThreadGetDeviceRole(instance) == OT_DEVICE_ROLE_DETACHED)
        {
            LOG_INF("Waiting for device to attach to the OTBR on raspi5...");
            k_sleep(K_MSEC(1000));
        }
    
        while (otThreadGetDeviceRole(instance) > OT_DEVICE_ROLE_DETACHED)
        {
            struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
            char addr_str[NET_IPV6_ADDR_LEN]; // Buffer to store the IPv6 address as a string
    
            if (net_addr_ntop(AF_INET6, &broker6->sin6_addr, addr_str, sizeof(addr_str)) == NULL)
            {
                LOG_ERR("Failed to convert broker IPv6 address to string");
            }
            else
            {
                LOG_INF("Broker IPv6 address: %s", addr_str);
            }
    
            if (net_if_ipv6_addr_lookup(&broker6->sin6_addr, NULL) == NULL)
            {
                LOG_WRN("Broker IPv6 address not in routing table");
            }
    
            connect_to_broker(&client);
            k_sleep(K_SECONDS(1));
        }
    
        return 0;
    }

    Here is my prj.conf:

    CONFIG_NETWORKING=y
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=n
    CONFIG_NET_SOCKETS=y
    CONFIG_NET_SOCKETS_POSIX_NAMES=y
    CONFIG_NET_IPV6=y
    CONFIG_NET_IPV4=y
    CONFIG_NET_TCP=y
    CONFIG_NET_UDP=y
    CONFIG_NET_DHCPV6=n
    CONFIG_NET_MGMT=y
    CONFIG_NET_MGMT_EVENT=y
    CONFIG_LOG=y
    
    
    CONFIG_SHELL=y
    CONFIG_OPENTHREAD_SHELL=y
    CONFIG_SHELL_ARGC_MAX=26
    CONFIG_SHELL_CMD_BUFF_SIZE=416
    CONFIG_OPENTHREAD_NORDIC_LIBRARY_MASTER=y
    CONFIG_NET_L2_OPENTHREAD=y
    CONFIG_SETTINGS_ZMS=y
    CONFIG_ZMS=y
    CONFIG_NVS=n
    CONFIG_FLASH=y
    CONFIG_FLASH_MAP=y
    CONFIG_SPI_NOR=n 
    
    
    CONFIG_MAIN_STACK_SIZE=6144
    CONFIG_SHELL_STACK_SIZE=5120
    
    CONFIG_NET_SHELL=y
    CONFIG_NET_IPV6_NBR_CACHE=y
    
    CONFIG_NET_PKT_RX_COUNT=16
    CONFIG_NET_PKT_TX_COUNT=16
    CONFIG_NET_BUF_RX_COUNT=32
    CONFIG_NET_BUF_TX_COUNT=32
    CONFIG_HEAP_MEM_POOL_SIZE=24576

Children
No Data
Related