mqtt_connect(&client) never returns: the program hangs inside mqtt_connect(&client) and never comes out

When connecting to an MQTT broker on the nRF9160, I found a strange problem that causes the device to freeze and become unresponsive ("bricked"). I traced it to err = mqtt_connect(&client): the call never returns a value, and the program stays stuck inside mqtt_connect(&client). What is going on? The problem is easy to reproduce after a period of normal operation. The device runs NCS 1.5.1 with modem firmware v1.2.3.

Below is my mqtt file:


#include <logging/log.h>
#include <zephyr.h>
#include <stdio.h>
#include <net/mqtt.h>
#include <net/socket.h>
#include "mt_mqtt.h"
#include <random/rand32.h>
#include "watchdog.h"

/* Log module for this file. NOTE(review): level 4 is presumably
 * LOG_LEVEL_DBG; consider using the named constant.
 */
LOG_MODULE_REGISTER(mqtt, 4);
/* clang-format off */
/* Value of fds.fd before fds_init() has assigned a socket. */
#define INVALID_FDS -1

/* Stack size and priority of the MQTT worker thread (mqtt_thread_fn). */
#define THREAD_STACK_SIZE	KB(2)
#define THREAD_PRIORITY		K_LOWEST_APPLICATION_THREAD_PRIO
/* Number of bytes copied into dev_imei and used as the MQTT client ID. */
#define IMEI_LEN			(15)
/* clang-format on */

/* NOTE(review): K_THREAD_DEFINE(mqtt_thread, ...) allocates its own stack for
 * the worker thread, so this object is not the stack that thread runs on.
 */
static K_THREAD_STACK_DEFINE(mqtt_thread_stack, THREAD_STACK_SIZE);

/* Buffers for MQTT client. */
static uint8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
/* Receives the payload of an incoming PUBLISH (see publish_get_payload()). */
static uint8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];

#if defined(CONFIG_MQTT_LIB_TLS)
/* Security tag list handed to the TLS transport in client_init(). */
static sec_tag_t sec_tag_list[] = { CONFIG_SEC_TAG };
#endif /* defined(CONFIG_MQTT_LIB_TLS) */

/* The mqtt client struct; re-initialised by client_init() on every connect. */
static struct mqtt_client client;

/* MQTT Broker details, filled in by broker_init(). */
static struct sockaddr_storage broker;

/* Given by mt_mqtt_connect() after mqtt_connect() succeeds; taken by
 * mqtt_thread_fn() before it starts polling the client.
 */
static K_SEM_DEFINE(mqtt_start_connect, 0, 1);

/* True while mqtt_thread_fn() should keep polling the client. Set by
 * mt_mqtt_connect(); cleared by mqtt_evt_handler() on a failed CONNACK or on
 * MQTT_EVT_DISCONNECT. NOTE(review): accessed from more than one thread
 * without atomics or locking.
 */
static bool keep_connect;

/* poll() descriptor for the MQTT socket; fd is set by fds_init(). */
static struct pollfd fds = {
    .fd = INVALID_FDS,
};

/* Application callback and IMEI registered through mt_mqtt_cb_init(). */
static mqtt_evt_cb m_cb;
static uint8_t dev_imei[IMEI_LEN];

/**@brief Function to subscribe to the configured topic
 */
static int subscribe(void)
{
    struct mqtt_topic subscribe_topic = { .topic = { .utf8 = CONFIG_MQTT_SUB_TOPIC,
                                                     .size = strlen(CONFIG_MQTT_SUB_TOPIC) },
                                          .qos   = MQTT_QOS_1_AT_LEAST_ONCE };

    const struct mqtt_subscription_list subscription_list = { .list       = &subscribe_topic,
                                                              .list_count = 1,
                                                              .message_id = 1234 };

    LOG_INF("Subscribing to: %s len %u\n",
            CONFIG_MQTT_SUB_TOPIC,
            (unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));

    return mqtt_subscribe(&client, &subscription_list);
}

/**
 * @brief Read the payload of an incoming PUBLISH into payload_buf.
 *
 * Whenever mqtt_read_publish_payload() reports -EAGAIN, waits with poll() for
 * up to CONFIG_MQTT_KEEPALIVE seconds for more data before giving up.
 *
 * @param c      Client that delivered the MQTT_EVT_PUBLISH event.
 * @param length Payload length announced in the PUBLISH.
 *
 * @retval 0         The whole payload was copied into payload_buf.
 * @retval -EMSGSIZE The payload does not fit in payload_buf.
 * @retval -EIO      A read made no progress, or poll() timed out, failed,
 *                   or did not report POLLIN.
 * @retval <0        Any other error from mqtt_read_publish_payload().
 */
static int publish_get_payload(struct mqtt_client *c, size_t length)
{
    /* Check the size before forming a pointer from it. payload_buf + length
     * beyond one-past-the-end of the array is undefined behaviour.
     */
    if (length > sizeof(payload_buf)) {
        return -EMSGSIZE;
    }

    uint8_t *buf       = payload_buf;
    uint8_t *const end = payload_buf + length;

    while (buf < end) {
        int ret = mqtt_read_publish_payload(c, buf, end - buf);

        if (ret == -EAGAIN) {
            LOG_INF("mqtt_read_publish_payload: EAGAIN\n");

            int err = poll(&fds, 1, CONFIG_MQTT_KEEPALIVE * MSEC_PER_SEC);

            if (err > 0 && (fds.revents & POLLIN) == POLLIN) {
                continue;
            }
            return -EIO;
        }

        if (ret < 0) {
            return ret;
        }

        if (ret == 0) {
            /* No progress: report an error instead of looping forever. */
            return -EIO;
        }

        buf += ret;
    }

    return 0;
}

/**@brief MQTT client event handler
 */
static void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt)
{
    int                              err;
    const struct mqtt_publish_param *p = &evt->param.publish;
    mqtt_evt_t tmp_mqtt_evt;

    switch (evt->type) {
        case MQTT_EVT_CONNACK:
            if (evt->result != 0) {
                keep_connect = false;
            }
            LOG_INF("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
            subscribe();
            break;

        case MQTT_EVT_DISCONNECT:
            LOG_INF("[%s:%d] MQTT client disconnected %d\n", __func__, __LINE__, evt->result);
            keep_connect   = false;
            tmp_mqtt_evt.type = MQTT_EVT_DISCONNECT;
            m_cb(&tmp_mqtt_evt);
            break;

        case MQTT_EVT_PUBLISH:

            LOG_INF("[%s:%d] MQTT PUBLISH result=%d len=%d\n",
                    __func__,
                    __LINE__,
                    evt->result,
                    p->message.payload.len);
            err = publish_get_payload(c, p->message.payload.len);
            if (err >= 0) {
                tmp_mqtt_evt.type = MQTT_EVT_PUBLISH;
                memcpy(tmp_mqtt_evt.rec_data, payload_buf, p->message.payload.len);
                tmp_mqtt_evt.rec_len = p->message.payload.len;
                m_cb(&tmp_mqtt_evt);
            } else {
                LOG_INF("mqtt_read_publish_payload: Failed! %d\n", err);
                LOG_INF("Disconnecting MQTT client...\n");

                err = mqtt_disconnect(c);
                if (err) {
                    LOG_INF("Could not disconnect: %d\n", err);
                }
            }
            break;

        case MQTT_EVT_PUBACK:
            if (evt->result != 0) {
                LOG_INF("MQTT PUBACK error %d\n", evt->result);
                break;
            }
            LOG_INF(
                "[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__, evt->param.puback.message_id);
            tmp_mqtt_evt.type = MQTT_EVT_PUBACK;
            m_cb(&tmp_mqtt_evt);
            break;

        case MQTT_EVT_SUBACK:
            if (evt->result != 0) {
                LOG_INF("MQTT SUBACK error %d\n", evt->result);
                break;
            }

            LOG_INF(
                "[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__, evt->param.suback.message_id);
            tmp_mqtt_evt.type = MQTT_EVT_SUBACK;
            m_cb(&tmp_mqtt_evt);
            break;

        default:
            LOG_INF("[%s:%d] default: %d\n", __func__, __LINE__, evt->type);
            break;
    }
}

static void mqtt_thread_fn(void *arg1, void *arg2, void *arg3)
{
    int err;

    while (1) {
        /* Don't go any further until MQTT is connected */
        k_sem_take(&mqtt_start_connect, K_FOREVER);
        while (keep_connect) {
            err = poll(&fds, 1, mqtt_keepalive_time_left(&client));
            if (err < 0) {
                LOG_ERR("ERROR: poll %d", errno);
                break;
            }
            err = mqtt_live(&client);
            if ((err != 0) && (err != -EAGAIN)) {
                LOG_ERR("ERROR: mqtt_live %d", err);
                break;
            }
            if ((fds.revents & POLLIN) == POLLIN) {
                err = mqtt_input(&client);
                if (err != 0) {
                    LOG_ERR("ERROR: mqtt_input %d", err);
                    mqtt_abort(&client);
                    break;
                }
            }
            if ((fds.revents & POLLERR) == POLLERR) {
                LOG_ERR("POLLERR");
                mqtt_abort(&client);
                break;
            }
            if ((fds.revents & POLLNVAL) == POLLNVAL) {
                LOG_ERR("POLLNVAL");
                mqtt_abort(&client);
                break;
            }
        }
    }
}

/**
 * @brief Resolve CONFIG_MQTT_BROKER_HOSTNAME and fill in the broker address.
 *
 * Walks every result returned by getaddrinfo() and stores the first IPv4
 * address, with port CONFIG_MQTT_BROKER_PORT, in the static broker struct.
 * If nothing usable is found, broker is left unchanged, so a previously
 * resolved address (if any) is still what mqtt_connect() will use.
 *
 * @retval 0       broker was updated with a freshly resolved IPv4 address.
 * @retval -ENOENT Resolution failed (the getaddrinfo() code is logged) or no
 *                 IPv4 address was among the results.
 */
static int broker_init(void)
{
    int              err;
    struct addrinfo *result;
    struct addrinfo  hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM };

    err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
    if (err) {
        LOG_INF("ERROR: getaddrinfo failed %d\n", err);

        return -ENOENT;
    }

    err = -ENOENT;

    /* Look for address of the broker. Every result is examined, not just
     * the first one.
     */
    for (struct addrinfo *addr = result; addr != NULL; addr = addr->ai_next) {
        if (addr->ai_addrlen != sizeof(struct sockaddr_in)) {
            LOG_INF("ai_addrlen = %u should be %u or %u\n",
                    (unsigned int)addr->ai_addrlen,
                    (unsigned int)sizeof(struct sockaddr_in),
                    (unsigned int)sizeof(struct sockaddr_in6));
            continue;
        }

        /* IPv4 Address. */
        struct sockaddr_in *broker4 = ((struct sockaddr_in *)&broker);
        char                ipv4_addr[NET_IPV4_ADDR_LEN];

        broker4->sin_addr.s_addr = ((struct sockaddr_in *)addr->ai_addr)->sin_addr.s_addr;
        broker4->sin_family      = AF_INET;
        broker4->sin_port        = htons(CONFIG_MQTT_BROKER_PORT);

        inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr));
        LOG_INF("IPv4 Address found %s\n", ipv4_addr);

        err = 0;
        break;
    }

    /* Free the address list. */
    freeaddrinfo(result);

    return err;
}

/**@brief Initialize the MQTT client structure
 */
static void client_init(void)
{
    mqtt_client_init(&client);

    broker_init();

    /* MQTT client configuration */
    client.broker           = &broker;
    client.evt_cb           = mqtt_evt_handler;
    client.client_id.utf8   = (uint8_t *)dev_imei;
    client.client_id.size   = IMEI_LEN;
    client.password         = NULL;
    client.user_name        = NULL;
    client.protocol_version = MQTT_VERSION_3_1_1;

    /* MQTT buffers configuration */
    client.rx_buf      = rx_buffer;
    client.rx_buf_size = sizeof(rx_buffer);
    client.tx_buf      = tx_buffer;
    client.tx_buf_size = sizeof(tx_buffer);

    /* MQTT transport configuration */
#if defined(CONFIG_MQTT_LIB_TLS)
    struct mqtt_sec_config *tls_config = &client.transport.tls.config;

    client.transport.type = MQTT_TRANSPORT_SECURE;

    tls_config->peer_verify   = CONFIG_PEER_VERIFY;
    tls_config->cipher_count  = 0;
    tls_config->cipher_list   = NULL;
    tls_config->sec_tag_count = ARRAY_SIZE(sec_tag_list);
    tls_config->sec_tag_list  = sec_tag_list;
    tls_config->hostname      = CONFIG_MQTT_BROKER_HOSTNAME;
#else  /* MQTT transport configuration */
    client.transport.type = MQTT_TRANSPORT_NON_SECURE;
#endif /* defined(CONFIG_MQTT_LIB_TLS) */
}

/**
 * @brief Point the global poll descriptor at the client's socket.
 *
 * @param c Connected client whose transport socket should be polled.
 *
 * @retval 0        fds now watches the socket for POLLIN.
 * @retval -ENOTSUP The client uses a secure transport but TLS support is not
 *                  compiled in (fds is left untouched).
 */
static int fds_init(struct mqtt_client *c)
{
    int sock;

    if (c->transport.type != MQTT_TRANSPORT_NON_SECURE) {
#if defined(CONFIG_MQTT_LIB_TLS)
        sock = c->transport.tls.sock;
#else
        return -ENOTSUP;
#endif
    } else {
        sock = c->transport.tcp.sock;
    }

    fds.fd     = sock;
    fds.events = POLLIN;
    return 0;
}

int mt_mqtt_disconnect(void)
{
    int err;

    err = mqtt_disconnect(&client);
    if (err) {
        LOG_ERR("ERROR: mqtt_disconnect %d", err);
    }

    return err;
}

int mt_mqtt_publish(uint16_t qos, uint8_t *msg, size_t msg_len)
{
    struct mqtt_publish_param param;

    param.message.topic.qos        = qos;
    param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
    param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
    param.message.payload.data     = msg;
    param.message.payload.len      = msg_len;
    param.message_id               = sys_rand32_get();
    param.dup_flag                 = 0;
    param.retain_flag              = 0;

    LOG_HEXDUMP_INF(msg, msg_len, "Publishing: ");
    LOG_INF("to topic: %s len: %u\n",
            CONFIG_MQTT_PUB_TOPIC,
            (unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));
    return mqtt_publish(&client, &param);
}

/**
 * @brief Register the application callback and the device IMEI.
 *
 * The first IMEI_LEN bytes of @p imei are copied and later used as the MQTT
 * client ID.
 *
 * @param imei    Buffer holding at least IMEI_LEN bytes.
 * @param mqtt_cb Callback that receives mqtt_evt_t notifications.
 *
 * @retval 0  Registered.
 * @retval -1 @p imei or @p mqtt_cb is NULL; nothing is changed.
 */
int mt_mqtt_cb_init(uint8_t *imei, mqtt_evt_cb mqtt_cb)
{
    /* memcpy() from a NULL source is undefined behaviour, so reject it too. */
    if ((mqtt_cb == NULL) || (imei == NULL)) {
        return -1;
    }

    m_cb = mqtt_cb;
    memcpy(dev_imei, imei, IMEI_LEN);
    return 0;
}

void suspend_thread(void)
{
    k_thread_suspend(k_current_get());
}

/**
 * @brief Start an MQTT connection to the configured broker.
 *
 * Re-initialises the client (which also resolves the broker host name), then
 * calls mqtt_connect(). mqtt_connect() opens the socket and sends CONNECT.
 * NOTE(review): mqtt_connect() blocks inside the socket connect() call and
 * has been observed to block for minutes on poor connectivity, so the
 * watchdog timeout must allow for that.
 *
 * On success, the worker thread is woken to poll for CONNACK and later
 * traffic; CONNACK is handled asynchronously by mqtt_evt_handler().
 *
 * @retval 0            Connection started.
 * @retval -EINPROGRESS keep_connect is already set (a connection is active).
 * @retval <0           Error from mqtt_connect() or fds_init().
 */
int mt_mqtt_connect(void)
{
    int err;

    if (keep_connect) {
        return -EINPROGRESS;
    }

    client_init();
    watchdog_feed_dog();
    err = mqtt_connect(&client);
    if (err != 0) {
        LOG_ERR("ERROR: mqtt_connect %d", err);
        return err;
    }

    err = fds_init(&client);
    if (err != 0) {
        LOG_ERR("ERROR: fds_init %d", err);
        /* mqtt_connect() succeeded, so a socket is open. The worker thread
         * will not be started for it, so close it here instead of leaking it.
         */
        mqtt_abort(&client);
        return err;
    }

    /** start polling now for CONNACK */
    keep_connect = true;
    k_sem_give(&mqtt_start_connect);

    return 0;
}

/* MQTT worker thread running mqtt_thread_fn(), started with no delay (last
 * argument 0). K_THREAD_DEFINE() allocates this thread's own stack of the
 * requested size, so the size is given directly rather than being borrowed
 * from the separate mqtt_thread_stack object, which this thread does not use.
 */
K_THREAD_DEFINE(mqtt_thread,
                THREAD_STACK_SIZE,
                mqtt_thread_fn,
                NULL,
                NULL,
                NULL,
                THREAD_PRIORITY,
                0,
                0);
mt_mqtt.h

Parents Reply
  • Hi,

     

    george.zhou said:
    I did not close the socket

    You should close the socket, via the mqtt_disconnect() or mqtt_abort(). If not, you risk leaving the old socket still open after the mqtt_connect() call.

    george.zhou said:
    After a day of testing the problem reappeared, so this did not solve it. I also encountered a new bug; please take a look at what is going on. Thank you!

    Which new bug did you observe?

     

    Kind regards,

    Håkon

Children
  • I tested using the method you described and it did not solve the problem. This indicates that the problem I encountered is different from the one you mentioned; it should be a new problem.

    First of all, the MQTT connection blocks. I never get to reconnect to MQTT because the program is stuck in the MQTT connect call and does not return. Once the MQTT connection succeeds, I use mqtt_disconnect() to close the socket after sending the data, so I can be sure that the socket is closed every time before connecting to MQTT.

  • Hi,

     

    Could you try setting these configurations to see if the scenario improves?

    CONFIG_NRF_MODEM_LIB_SENDMSG_BUF_SIZE=2048
    CONFIG_NRF_MODEM_LIB_HEAP_SIZE=2048

     

    Kind regards,

    Håkon

  • Hi,

    I added the two configuration options you mentioned and tested them for a day on NCS 1.6.0 with mfw_nrf9160_1.2.7. The problem improved: sometimes the call blocks for three and a half minutes and then returns an error code, and sometimes it blocks for more than ten minutes and then returns an error code. The error returned is EINVAL. I will test for a few more days to see whether the problem is completely solved. If it is, I will test again on NCS 1.5.1 with mfw_nrf9160_1.2.3 to see whether the problem is solved there too. I will report the results to you after testing. Thank you so much!

  • Hi,

    I tested for one more day. Unfortunately, the problem reappeared: the program is stuck in the MQTT connect call and never returns.

  • Hi,

     

    george.zhou said:
    I added the two macros you mentioned and tested them for a day on ncs1.6.0 and mfw_nrf9160_1.2.7, and found that the problem was improved. Some get stuck for 3 and a half minutes and return an error code, and sometimes get stuck for more than ten minutes and return an error code. The error returned is EINVAL.

    This is expected behavior if the nRF is unable to connect, due to network connectivity problems.

    george.zhou said:
    I tested one more day, unfortunately, the problem reappeared, stuck in mqtt connection with no return value all the time.

    And this was with which SDK and modem fw?

    The connect() call never returned?

     

    Kind regards,

    Håkon

Related