This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

Thread MQTT-SN Example publish and subscribe can not combine operations

Hi Nordic Team:

1.  I use this example "Thread MQTT-SN Example".

after i compile the code than i do the step below

1.Build the example according to the instructions in Building examples.
2.Turn the MQTT-SN gateway on according to the instructions in Thread Border Router.
3.Program a development board with the MQTT-SN Subscriber application.
4.Program another development board with the MQTT-SN Publisher application.
5.Let the clients find the MQTT-SN gateway by pressing Button 2 on both boards.

one of the FTD get kick out , when i do the 5 step

and the error message show up
"0> <error> app: Message of unsupported type has been received.
0>
0> <error> app: MQTT-SN message could not be processed."

Use one device of the MQTT-SN Subscriber application and other one device of the MQTT-SN Publisher application. (one publish device, one subscribe device)

I get the message_type is 1.

so the border router seems to be can only connect to one FTD

if i connect two devices in the same time one of FTD will be kick of

is there any reason may cause that? Or i miss some step need to be done?

2. if using two subscribers well the same problems too. do you have any advice that can solve the problems of two subscribers? Pls kindly help advise how to solve it?

3.I follow the WEB steps below Playing with Thread and MQTT-SN on Nordic nRF52840, I follow the WEB steps below Playing with Thread and MQTT-SN on Nordic nRF52840, Now I have two devices, wanted to combine operations publish and subscribe at the same time in the device.
Pls kindly help advise how to solve it?

Best Regards,

Rick.

  • Hi,

    I tested with one subscriber and one publisher client, and followed the steps in the example. This works well and I'm able to subscribe to topics on one of the devices and publish from another device. I'm still seeing the "message could not be processed" error, but this does not affect the functionality.

    The example is only written for a single publisher and subscriber. 

    I will look into if it is possible to merge the publisher and subscriber examples to run on a single board.

    Best regards,
    Jørgen

  • Hi again,

    Here is a combined example that supports both publishing and subscribing to topics (modified in mqttsn_client_subscriber).

    • Press button 2 to search for GW.
    • Press button 3 to connect to the GW.
    • Press button 4 to subscribe to the topic.
    • Press button 1 to publish to the topic.

    /**
     * Copyright (c) 2017 - 2020, Nordic Semiconductor ASA
     *
     * All rights reserved.
     *
     * Redistribution and use in source and binary forms, with or without modification,
     * are permitted provided that the following conditions are met:
     *
     * 1. Redistributions of source code must retain the above copyright notice, this
     *    list of conditions and the following disclaimer.
     *
     * 2. Redistributions in binary form, except as embedded into a Nordic
     *    Semiconductor ASA integrated circuit in a product or a software update for
     *    such product, must reproduce the above copyright notice, this list of
     *    conditions and the following disclaimer in the documentation and/or other
     *    materials provided with the distribution.
     *
     * 3. Neither the name of Nordic Semiconductor ASA nor the names of its
     *    contributors may be used to endorse or promote products derived from this
     *    software without specific prior written permission.
     *
     * 4. This software, with or without modification, must only be used with a
     *    Nordic Semiconductor ASA integrated circuit.
     *
     * 5. Any software provided in binary form under this license must not be reverse
     *    engineered, decompiled, modified and/or disassembled.
     *
     * THIS SOFTWARE IS PROVIDED BY NORDIC SEMICONDUCTOR ASA "AS IS" AND ANY EXPRESS
     * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     * OF MERCHANTABILITY, NONINFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE ARE
     * DISCLAIMED. IN NO EVENT SHALL NORDIC SEMICONDUCTOR ASA OR CONTRIBUTORS BE
     * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
     * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
     * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     *
     */
    /** @file
     *
     * @defgroup thread_mqttsn_subscriber_example_main main.c
     * @{
     * @ingroup thread_mqttsn_subscriber_example
     * @brief Thread MQTT-SN Client Subscriber Example Application main file.
     *
     * @details This example demonstrates an MQTT-SN client subscriber application that
     *          provides resources to control BSP_LED_2 via MQTT-SN messages.
     *          It can be controlled by a board with related MQTT-SN client publisher application.
     *
     */
    
    #include <stdbool.h>
    #include <stdint.h>
    #include <string.h>
    
    #include "app_scheduler.h"
    #include "app_timer.h"
    #include "bsp_thread.h"
    #include "nrf_log.h"
    #include "nrf_log_ctrl.h"
    #include "nrf_log_default_backends.h"
    
    #include "mqttsn_client.h"
    #include "thread_utils.h"
    
    #include <openthread/thread.h>
    
    #define LED_ON_REQUEST         1                                            /**< LED ON command. */
    #define LED_OFF_REQUEST        0                                            /**< LED OFF command. */
    #define SEARCH_GATEWAY_TIMEOUT 5                                            /**< MQTT-SN Gateway discovery procedure timeout in [s]. */
    
    #define SCHED_QUEUE_SIZE       32                                           /**< Maximum number of events in the scheduler queue. */
    #define SCHED_EVENT_DATA_SIZE  APP_TIMER_SCHED_EVENT_DATA_SIZE              /**< Maximum app_scheduler event size. */
    
    static mqttsn_client_t      m_client;                                       /**< An MQTT-SN client instance. */
    static mqttsn_remote_t      m_gateway_addr;                                 /**< A gateway address. */
    static uint8_t              m_gateway_id;                                   /**< A gateway ID. */
    static mqttsn_connect_opt_t m_connect_opt;                                  /**< Connect options for the MQTT-SN client. */
    static uint8_t              m_led_state        = 0;                         /**< Previously sent BSP_LED_2 command. */
    static bool                 m_subscribed       = 0;                         /**< Current subscription state. */
    static uint16_t             m_msg_id           = 0;                         /**< Message ID thrown with MQTTSN_EVENT_TIMEOUT. */
    static char                 m_client_id[]      = "nRF52840_subscriber";     /**< The MQTT-SN Client's ID. */
    static char                 m_topic_name[]     = "nRF52840_resources/led3"; /**< Name of the topic corresponding to subscriber's BSP_LED_2. */
    static mqttsn_topic_t       m_topic            =                            /**< Topic corresponding to subscriber's BSP_LED_2. */
    {
        .p_topic_name = (unsigned char *)m_topic_name,
        .topic_id     = 0,
    };
    
    /***************************************************************************************************
     * @section MQTT-SN
     **************************************************************************************************/
    
     /**@brief Turns the MQTT-SN network indication LED on.
     *
     * @details This LED is on when an MQTT-SN client is in connected or awake state.
     */
    static void light_on(void)
    {
        LEDS_ON(BSP_LED_3_MASK);
    }
    
    
    /**@brief Turns the MQTT-SN network indication LED off.
     *
     * @details This LED is on when an MQTT-SN client is in disconnected or asleep state.
     */
    static void light_off(void)
    {
        LEDS_OFF(BSP_LED_3_MASK);
    }
    
    
    /**@brief Toggles BSP_LED_2 based on received LED command. */
    static void led_update(uint8_t * p_data)
    {
        if (*p_data == LED_ON_REQUEST)
        {
            LEDS_ON(BSP_LED_2_MASK);
        }
        else if (*p_data == LED_OFF_REQUEST)
        {
            LEDS_OFF(BSP_LED_2_MASK);
        }
    
        return;
    }
    
    /**@brief Initializes MQTT-SN client's connection options. */
    static void connect_opt_init(void)
    {
        m_connect_opt.alive_duration = MQTTSN_DEFAULT_ALIVE_DURATION,
        m_connect_opt.clean_session  = MQTTSN_DEFAULT_CLEAN_SESSION_FLAG,
        m_connect_opt.will_flag      = MQTTSN_DEFAULT_WILL_FLAG,
        m_connect_opt.client_id_len  = strlen(m_client_id),
    
        memcpy(m_connect_opt.p_client_id,  (unsigned char *)m_client_id,  m_connect_opt.client_id_len);
    }
    
    
    /**@brief Processes GWINFO message from a gateway.
     *
     * @details This function updates MQTT-SN Gateway information.
     *
     * @param[in]    p_event  Pointer to MQTT-SN event.
     */
    static void gateway_info_callback(mqttsn_event_t * p_event)
    {
        m_gateway_addr  = *(p_event->event_data.connected.p_gateway_addr);
        m_gateway_id    = p_event->event_data.connected.gateway_id;
    }
    
    
    /**@brief Processes CONNACK message from a gateway.
     *
     * @details This function launches the topic registration procedure if necessary.
     */
    static void connected_callback(void)
    {
        light_on();
    
        uint32_t err_code = mqttsn_client_topic_register(&m_client,
                                                         m_topic.p_topic_name,
                                                         strlen(m_topic_name),
                                                         &m_msg_id);
        if (err_code != NRF_SUCCESS)
        {
            NRF_LOG_ERROR("REGISTER message could not be sent. Error code: 0x%x\r\n", err_code);
        }
    }
    
    
    /**@brief Processes DISCONNECT message from a gateway. */
    static void disconnected_callback(void)
    {
        light_off();
    }
    
    
    /**@brief Processes REGACK message from a gateway.
     *
     * @param[in] p_event Pointer to MQTT-SN event.
     */
    static void regack_callback(mqttsn_event_t * p_event)
    {
        m_topic.topic_id = p_event->event_data.registered.packet.topic.topic_id;
        NRF_LOG_INFO("MQTT-SN event: Topic has been registered with ID: %d.\r\n",
                     p_event->event_data.registered.packet.topic.topic_id);
    }
    
    
    /**@brief Processes data published by a broker.
     *
     * @details This function processes LED command.
     */
    static void received_callback(mqttsn_event_t * p_event)
    {
        if (p_event->event_data.published.packet.topic.topic_id == m_topic.topic_id)
        {
            NRF_LOG_INFO("MQTT-SN event: Content to subscribed topic received.\r\n");
            led_update(p_event->event_data.published.p_payload);
        }
        else
        {
            NRF_LOG_INFO("MQTT-SN event: Content to unsubscribed topic received. Dropping packet.\r\n");
        }
    }
    
    
    /**@brief Processes retransmission limit reached event. */
    static void timeout_callback(mqttsn_event_t * p_event)
    {
        NRF_LOG_INFO("MQTT-SN event: Timed-out message: %d. Message ID: %d.\r\n",
                      p_event->event_data.error.msg_type,
                      p_event->event_data.error.msg_id);
    }
    
    
    /**@brief Processes results of gateway discovery procedure. */
    static void searchgw_timeout_callback(mqttsn_event_t * p_event)
    {
        NRF_LOG_INFO("MQTT-SN event: Gateway discovery result: 0x%x.\r\n", p_event->event_data.discovery);
    }
    
    
    /**@brief Function for handling MQTT-SN events. */
    void mqttsn_evt_handler(mqttsn_client_t * p_client, mqttsn_event_t * p_event)
    {
        switch(p_event->event_id)
        {
            case MQTTSN_EVENT_GATEWAY_FOUND:
                NRF_LOG_INFO("MQTT-SN event: Client has found an active gateway.\r\n");
                gateway_info_callback(p_event);
                break;
    
            case MQTTSN_EVENT_CONNECTED:
                NRF_LOG_INFO("MQTT-SN event: Client connected.\r\n");
                connected_callback();
                break;
    
            case MQTTSN_EVENT_DISCONNECT_PERMIT:
                NRF_LOG_INFO("MQTT-SN event: Client disconnected.\r\n");
                disconnected_callback();
                break;
    
            case MQTTSN_EVENT_REGISTERED:
                NRF_LOG_INFO("MQTT-SN event: Client registered topic.\r\n");
                regack_callback(p_event);
                break;
    
            case MQTTSN_EVENT_SUBSCRIBED:
                NRF_LOG_INFO("MQTT-SN event: Client subscribed to topic.\r\n");
                break;
    
            case MQTTSN_EVENT_UNSUBSCRIBED:
                NRF_LOG_INFO("MQTT-SN event: Client unsubscribed to topic.\r\n");
                break;
    
            case MQTTSN_EVENT_RECEIVED:
                NRF_LOG_INFO("MQTT-SN event: Client received content.\r\n");
                received_callback(p_event);
                break;
    
            case MQTTSN_EVENT_TIMEOUT:
                NRF_LOG_INFO("MQTT-SN event: Retransmission retries limit has been reached.\r\n");
                timeout_callback(p_event);
                break;
    
            case MQTTSN_EVENT_SEARCHGW_TIMEOUT:
                NRF_LOG_INFO("MQTT-SN event: Gateway discovery procedure has finished.\r\n");
                searchgw_timeout_callback(p_event);
                break;
    
            default:
                break;
        }
    }
    
    /***************************************************************************************************
     * @section State
     **************************************************************************************************/
    
    static void state_changed_callback(uint32_t flags, void * p_context)
    {
        NRF_LOG_INFO("State changed! Flags: 0x%08x Current role: %d\r\n",
                     flags, otThreadGetDeviceRole(p_context));
    }
    
    /***************************************************************************************************
     * @section Buttons
     **************************************************************************************************/
    
    static void subscribe(void)
    {
        uint8_t  topic_name_len = strlen(m_topic_name);
        uint32_t err_code       = NRF_SUCCESS;
    
        if (m_subscribed)
        {
            err_code = mqttsn_client_unsubscribe(&m_client, m_topic.p_topic_name, topic_name_len, &m_msg_id);
            if (err_code != NRF_SUCCESS)
            {
                NRF_LOG_ERROR("UNSUBSCRIBE message could not be sent.\r\n");
            }
            else
            {
                m_subscribed = false;
            }
        }
        else
        {
            err_code = mqttsn_client_subscribe(&m_client, m_topic.p_topic_name, topic_name_len, &m_msg_id);
            if (err_code != NRF_SUCCESS)
            {
                NRF_LOG_ERROR("SUBSCRIBE message could not be sent.\r\n");
            }
            else
            {
                m_subscribed = true;
            }
        }
    }
    
    static void led_state_pub(uint8_t led_state)
    {
        uint32_t err_code = mqttsn_client_publish(&m_client, m_topic.topic_id, &led_state, 1, &m_msg_id);
        if (err_code != NRF_SUCCESS)
        {
            NRF_LOG_ERROR("PUBLISH message could not be sent. Error code: 0x%x\r\n", err_code)
        }
    }
    
    static void publish(void)
    {
        m_led_state = m_led_state == 1 ? 0 : 1;
        led_state_pub(m_led_state);
    }
    
    static void bsp_event_handler(bsp_event_t event)
    {
        if (otThreadGetDeviceRole(thread_ot_instance_get()) < OT_DEVICE_ROLE_CHILD )
        {
            (void)event;
            return;
        }
    
        switch (event)
        {
    
            case BSP_EVENT_KEY_0:
            {
                publish();
                break;
            }
    
            case BSP_EVENT_KEY_1:
            {
                uint32_t err_code = mqttsn_client_search_gateway(&m_client, SEARCH_GATEWAY_TIMEOUT);
                if (err_code != NRF_SUCCESS)
                {
                    NRF_LOG_ERROR("SEARCH GATEWAY message could not be sent. Error: 0x%x\r\n", err_code);
                }
                break;
            }
    
            case BSP_EVENT_KEY_2:
            {
                uint32_t err_code;
    
                if (mqttsn_client_state_get(&m_client) == MQTTSN_CLIENT_CONNECTED)
                {
                    err_code = mqttsn_client_disconnect(&m_client);
                    if (err_code != NRF_SUCCESS)
                    {
                        NRF_LOG_ERROR("DISCONNECT message could not be sent. Error: 0x%x\r\n", err_code);
                    }
                }
                else
                {
                    err_code = mqttsn_client_connect(&m_client, &m_gateway_addr, m_gateway_id, &m_connect_opt);
                    if (err_code != NRF_SUCCESS)
                    {
                        NRF_LOG_ERROR("CONNECT message could not be sent. Error: 0x%x\r\n", err_code);
                    }
                }
                break;
            }
    
            case BSP_EVENT_KEY_3:
            {
                subscribe();
                break;
            }
    
            default:
                break;
        }
    }
    
    /***************************************************************************************************
     * @section Initialization
     **************************************************************************************************/
    
    /**@brief Function for initializing the Application Timer Module.
     */
    static void timer_init(void)
    {
        uint32_t err_code = app_timer_init();
        APP_ERROR_CHECK(err_code);
    }
    
    
    /**@brief Function for initializing the LEDs.
     */
    static void leds_init(void)
    {
        LEDS_CONFIGURE(LEDS_MASK);
        LEDS_OFF(LEDS_MASK);
    }
    
    
    /**@brief Function for initializing the nrf log module.
     */
    static void log_init(void)
    {
        ret_code_t err_code = NRF_LOG_INIT(NULL);
        APP_ERROR_CHECK(err_code);
    
        NRF_LOG_DEFAULT_BACKENDS_INIT();
    }
    
    
    /**@brief Function for initializing the Thread Board Support Package.
     */
    static void thread_bsp_init(void)
    {
        uint32_t err_code;
        err_code = bsp_init(BSP_INIT_LEDS | BSP_INIT_BUTTONS, bsp_event_handler);
        APP_ERROR_CHECK(err_code);
    
        err_code = bsp_thread_init(thread_ot_instance_get());
        APP_ERROR_CHECK(err_code);
    }
    
    
    /**@brief Function for initializing the Thread Stack.
     */
    static void thread_instance_init(void)
    {
        thread_configuration_t thread_configuration =
        {
            .radio_mode        = THREAD_RADIO_MODE_RX_ON_WHEN_IDLE,
            .autocommissioning = true,
        };
    
        thread_init(&thread_configuration);
        thread_cli_init();
        thread_state_changed_callback_set(state_changed_callback);
    }
    
    
    /**@brief Function for initializing the MQTTSN client.
     */
    static void mqttsn_init(void)
    {
        uint32_t err_code = mqttsn_client_init(&m_client,
                                               MQTTSN_DEFAULT_CLIENT_PORT,
                                               mqttsn_evt_handler,
                                               thread_ot_instance_get());
        APP_ERROR_CHECK(err_code);
    
        connect_opt_init();
    }
    
    
    /**@brief Function for initializing scheduler module.
     */
    static void scheduler_init(void)
    {
        APP_SCHED_INIT(SCHED_EVENT_DATA_SIZE, SCHED_QUEUE_SIZE);
    }
    
    /***************************************************************************************************
     * @section Main
     **************************************************************************************************/
    
    int main(int argc, char *argv[])
    {
        log_init();
        scheduler_init();
        timer_init();
        leds_init();
    
        thread_instance_init();
        thread_bsp_init();
        mqttsn_init();
    
        while (true)
        {
            thread_process();
            app_sched_execute();
    
            if (NRF_LOG_PROCESS() == false)
            {
                thread_sleep();
            }
        }
    }
    
    /**
     *@}
     **/
    

    Best regards,
    Jørgen

  • Hi Jørgen,
    Thank you for your reply.

    1. Can this combined example support two devices at the same time?
    If using only one device, it can work, but using two devices, it will display "message could not be processed" error.
    this is my step.
    a) device A subscribe to the topic of device B publish
    b) device B subscribe to the topic of device A publish
    c) device A Press button 2 to search for GW.
    d) device B Press button 2 to search for GW.
    e) device A will display "message could not be processed" error
    Pls kindly help advise how to solve it?

    2. you say "I tested this on my end, and I am seeing the same behavior. I believe that the unhandled event message happens because the second board is in a state where it does not expect to receive this event when the first board does a Gateway search. The examples are most likely not written with multiple publishers in mind." from "Thread Border Router networking fail", now I just search for GW, it display "message could not be processed" error.
    Pls kindly help advise how to solve it?

    Best Regards,

    Rick.

  • Hi Jørgen,

    Thank you for your help!

    I can work of MQTT from your example.

    but it still will display "message could not be processed" error.

    Best Regards,

    Rick.

  • Hi,

    Rick said:
    but it still will display "message could not be processed" error.

    Yes, since the GW search uses a broadcast address, this message will be received by all nodes. You can suppress the message by handling the event in message_handle in mqttsn_packet_receiver.c:

    case MQTTSN_SEARCHGW:
        NRF_LOG_INFO("Another Thread device searched for MQTT-SN GW, ignore and return success");
        err_code = NRF_SUCCESS;
        break;

    This message type is sent by the client, and should only need to be handled by the gateway.

    Best regards,
    Jørgen

Related