mqtt_connect(&client) never returns: the program gets stuck inside mqtt_connect(&client) and does not come out

When connecting to MQTT on the nRF9160, I ran into a strange problem that causes the device to freeze and become bricked. I traced the cause to err = mqtt_connect(&client): the call never returns any value, so the program stays stuck inside mqtt_connect(&client) and does not come out. What is going on? The problem is easy to reproduce after a period of normal operation. The device is running NCS 1.5.1 with modem firmware v1.2.3.

Below is my mqtt file:


#include <logging/log.h>
#include <zephyr.h>
#include <stdio.h>
#include <net/mqtt.h>
#include <net/socket.h>
#include "mt_mqtt.h"
#include <random/rand32.h>
#include "watchdog.h"

/* Register this file with the logging subsystem under the name "mqtt",
 * using numeric log level 4.
 */
LOG_MODULE_REGISTER(mqtt, 4);
/* clang-format off */
/* Value stored in fds.fd until fds_init() assigns a real socket. */
#define INVALID_FDS -1

/* Stack size and priority of the MQTT polling thread, and the number of
 * IMEI bytes used as the MQTT client id.
 */
#define THREAD_STACK_SIZE	KB(2)
#define THREAD_PRIORITY		K_LOWEST_APPLICATION_THREAD_PRIO
#define IMEI_LEN			(15)
/* clang-format on */

/* Stack object sized by THREAD_STACK_SIZE. In this file it is referenced
 * only through K_THREAD_STACK_SIZEOF() in the K_THREAD_DEFINE() at the
 * bottom.
 */
static K_THREAD_STACK_DEFINE(mqtt_thread_stack, THREAD_STACK_SIZE);

/* Buffers for MQTT client. rx_buffer/tx_buffer are handed to the client in
 * client_init(); payload_buf receives PUBLISH payloads in
 * publish_get_payload().
 */
static uint8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];

#if defined(CONFIG_MQTT_LIB_TLS)
/* Security tags passed to the TLS configuration in client_init(). */
static sec_tag_t sec_tag_list[] = { CONFIG_SEC_TAG };
#endif /* defined(CONFIG_MQTT_LIB_TLS) */

/* The mqtt client struct */
static struct mqtt_client client;

/* MQTT Broker details. Filled in by broker_init(). */
static struct sockaddr_storage broker;

/* Binary semaphore (initial count 0, limit 1) given by mt_mqtt_connect() to
 * wake the polling thread, which blocks on it in mqtt_thread_fn().
 */
static K_SEM_DEFINE(mqtt_start_connect, 0, 1);

/* True while the polling thread should keep servicing the connection.
 * Set by mt_mqtt_connect(), cleared by the event handler; read by
 * mqtt_thread_fn() and mt_mqtt_connect().
 * NOTE(review): accessed from more than one context without any
 * synchronisation primitive - confirm this is acceptable.
 */
static bool keep_connect;

/* File descriptor */
static struct pollfd fds = {
    .fd = INVALID_FDS,
};

/* Application callback and device IMEI, both stored by mt_mqtt_cb_init(). */
static mqtt_evt_cb m_cb;
static uint8_t dev_imei[IMEI_LEN];

/**@brief Function to subscribe to the configured topic
 */
static int subscribe(void)
{
    struct mqtt_topic subscribe_topic = { .topic = { .utf8 = CONFIG_MQTT_SUB_TOPIC,
                                                     .size = strlen(CONFIG_MQTT_SUB_TOPIC) },
                                          .qos   = MQTT_QOS_1_AT_LEAST_ONCE };

    const struct mqtt_subscription_list subscription_list = { .list       = &subscribe_topic,
                                                              .list_count = 1,
                                                              .message_id = 1234 };

    LOG_INF("Subscribing to: %s len %u\n",
            CONFIG_MQTT_SUB_TOPIC,
            (unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));

    return mqtt_subscribe(&client, &subscription_list);
}

/**@brief Read the payload of a received PUBLISH into payload_buf.
 *
 * Reads until exactly @p length bytes have been stored. When the MQTT
 * library reports -EAGAIN the function waits on the socket with poll() for up
 * to CONFIG_MQTT_KEEPALIVE seconds before retrying.
 *
 * @param c      MQTT client the payload is read from.
 * @param length Payload length announced in the PUBLISH packet.
 *
 * @retval 0          The whole payload is in payload_buf.
 * @retval -EMSGSIZE  @p length exceeds sizeof(payload_buf); nothing is read.
 * @retval -EIO       The read returned 0 bytes, or poll() did not report
 *                    POLLIN after -EAGAIN.
 * @return Any other negative value returned by mqtt_read_publish_payload().
 */
static int publish_get_payload(struct mqtt_client *c, size_t length)
{
    size_t received = 0;

    /* Validate the length before doing any arithmetic on payload_buf: forming
     * a pointer beyond one-past-the-end of the array is undefined behaviour.
     */
    if (length > sizeof(payload_buf)) {
        return -EMSGSIZE;
    }

    while (received < length) {
        int ret = mqtt_read_publish_payload(c, payload_buf + received, length - received);

        if (ret == -EAGAIN) {
            int err;

            LOG_INF("mqtt_read_publish_payload: EAGAIN\n");

            /* No data available yet: wait for the socket to become readable. */
            err = poll(&fds, 1, CONFIG_MQTT_KEEPALIVE * MSEC_PER_SEC);
            if (err > 0 && (fds.revents & POLLIN) == POLLIN) {
                continue;
            }

            return -EIO;
        }

        if (ret < 0) {
            return ret;
        }

        /* A zero-length read means no progress can be made. */
        if (ret == 0) {
            return -EIO;
        }

        received += (size_t)ret;
    }

    return 0;
}

/**@brief MQTT client event handler
 */
static void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt)
{
    int                              err;
    const struct mqtt_publish_param *p = &evt->param.publish;
    mqtt_evt_t tmp_mqtt_evt;

    switch (evt->type) {
        case MQTT_EVT_CONNACK:
            if (evt->result != 0) {
                keep_connect = false;
            }
            LOG_INF("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
            subscribe();
            break;

        case MQTT_EVT_DISCONNECT:
            LOG_INF("[%s:%d] MQTT client disconnected %d\n", __func__, __LINE__, evt->result);
            keep_connect   = false;
            tmp_mqtt_evt.type = MQTT_EVT_DISCONNECT;
            m_cb(&tmp_mqtt_evt);
            break;

        case MQTT_EVT_PUBLISH:

            LOG_INF("[%s:%d] MQTT PUBLISH result=%d len=%d\n",
                    __func__,
                    __LINE__,
                    evt->result,
                    p->message.payload.len);
            err = publish_get_payload(c, p->message.payload.len);
            if (err >= 0) {
                tmp_mqtt_evt.type = MQTT_EVT_PUBLISH;
                memcpy(tmp_mqtt_evt.rec_data, payload_buf, p->message.payload.len);
                tmp_mqtt_evt.rec_len = p->message.payload.len;
                m_cb(&tmp_mqtt_evt);
            } else {
                LOG_INF("mqtt_read_publish_payload: Failed! %d\n", err);
                LOG_INF("Disconnecting MQTT client...\n");

                err = mqtt_disconnect(c);
                if (err) {
                    LOG_INF("Could not disconnect: %d\n", err);
                }
            }
            break;

        case MQTT_EVT_PUBACK:
            if (evt->result != 0) {
                LOG_INF("MQTT PUBACK error %d\n", evt->result);
                break;
            }
            LOG_INF(
                "[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__, evt->param.puback.message_id);
            tmp_mqtt_evt.type = MQTT_EVT_PUBACK;
            m_cb(&tmp_mqtt_evt);
            break;

        case MQTT_EVT_SUBACK:
            if (evt->result != 0) {
                LOG_INF("MQTT SUBACK error %d\n", evt->result);
                break;
            }

            LOG_INF(
                "[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__, evt->param.suback.message_id);
            tmp_mqtt_evt.type = MQTT_EVT_SUBACK;
            m_cb(&tmp_mqtt_evt);
            break;

        default:
            LOG_INF("[%s:%d] default: %d\n", __func__, __LINE__, evt->type);
            break;
    }
}

static void mqtt_thread_fn(void *arg1, void *arg2, void *arg3)
{
    int err;

    while (1) {
        /* Don't go any further until MQTT is connected */
        k_sem_take(&mqtt_start_connect, K_FOREVER);
        while (keep_connect) {
            err = poll(&fds, 1, mqtt_keepalive_time_left(&client));
            if (err < 0) {
                LOG_ERR("ERROR: poll %d", errno);
                break;
            }
            err = mqtt_live(&client);
            if ((err != 0) && (err != -EAGAIN)) {
                LOG_ERR("ERROR: mqtt_live %d", err);
                break;
            }
            if ((fds.revents & POLLIN) == POLLIN) {
                err = mqtt_input(&client);
                if (err != 0) {
                    LOG_ERR("ERROR: mqtt_input %d", err);
                    mqtt_abort(&client);
                    break;
                }
            }
            if ((fds.revents & POLLERR) == POLLERR) {
                LOG_ERR("POLLERR");
                mqtt_abort(&client);
                break;
            }
            if ((fds.revents & POLLNVAL) == POLLNVAL) {
                LOG_ERR("POLLNVAL");
                mqtt_abort(&client);
                break;
            }
        }
    }
}

/**@brief Resolves the configured hostname and
 * initializes the MQTT broker structure
 *
 * Looks up CONFIG_MQTT_BROKER_HOSTNAME and copies the first IPv4 result,
 * together with CONFIG_MQTT_BROKER_PORT, into the file-scope broker
 * structure. Every entry of the result list is examined. If the lookup fails
 * or yields no IPv4 address, broker is left unmodified and an error is
 * logged.
 */
static void broker_init(void)
{
    int              err;
    bool             found = false;
    struct addrinfo *result;
    struct addrinfo *addr;
    struct addrinfo  hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM };

    err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
    if (err) {
        LOG_INF("ERROR: getaddrinfo failed %d\n", err);

        return;
    }

    /* Look for address of the broker. */
    for (addr = result; addr != NULL; addr = addr->ai_next) {
        struct sockaddr_in *broker4;
        char                ipv4_addr[NET_IPV4_ADDR_LEN];

        if (addr->ai_addrlen != sizeof(struct sockaddr_in)) {
            LOG_INF("ai_addrlen = %u should be %u or %u\n",
                    (unsigned int)addr->ai_addrlen,
                    (unsigned int)sizeof(struct sockaddr_in),
                    (unsigned int)sizeof(struct sockaddr_in6));
            /* Not an IPv4 entry: keep walking the list. */
            continue;
        }

        /* IPv4 Address. */
        broker4 = ((struct sockaddr_in *)&broker);

        broker4->sin_addr.s_addr = ((struct sockaddr_in *)addr->ai_addr)->sin_addr.s_addr;
        broker4->sin_family      = AF_INET;
        broker4->sin_port        = htons(CONFIG_MQTT_BROKER_PORT);

        inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr));
        LOG_INF("IPv4 Address found %s\n", ipv4_addr);

        found = true;
        break;
    }

    if (!found) {
        LOG_ERR("ERROR: no IPv4 address found for %s", CONFIG_MQTT_BROKER_HOSTNAME);
    }

    /* Free the address. */
    freeaddrinfo(result);
}

/**@brief Prepare the file-scope MQTT client for a new connection.
 *
 * Resets the client with mqtt_client_init(), resolves the broker through
 * broker_init(), then fills in identity, event handler, buffers and the
 * transport (TLS when CONFIG_MQTT_LIB_TLS is defined, plain TCP otherwise).
 */
static void client_init(void)
{
    mqtt_client_init(&client);

    broker_init();

    /* Protocol and peer */
    client.protocol_version = MQTT_VERSION_3_1_1;
    client.broker           = &broker;
    client.evt_cb           = mqtt_evt_handler;

    /* Identity: the IMEI is the client id; no credentials are used */
    client.client_id.size = IMEI_LEN;
    client.client_id.utf8 = (uint8_t *)dev_imei;
    client.user_name      = NULL;
    client.password       = NULL;

    /* Packet buffers */
    client.tx_buf      = tx_buffer;
    client.tx_buf_size = sizeof(tx_buffer);
    client.rx_buf      = rx_buffer;
    client.rx_buf_size = sizeof(rx_buffer);

    /* Transport */
#if defined(CONFIG_MQTT_LIB_TLS)
    client.transport.type = MQTT_TRANSPORT_SECURE;

    client.transport.tls.config.hostname      = CONFIG_MQTT_BROKER_HOSTNAME;
    client.transport.tls.config.peer_verify   = CONFIG_PEER_VERIFY;
    client.transport.tls.config.sec_tag_list  = sec_tag_list;
    client.transport.tls.config.sec_tag_count = ARRAY_SIZE(sec_tag_list);
    client.transport.tls.config.cipher_list   = NULL;
    client.transport.tls.config.cipher_count  = 0;
#else  /* plain TCP */
    client.transport.type = MQTT_TRANSPORT_NON_SECURE;
#endif /* defined(CONFIG_MQTT_LIB_TLS) */
}

/**@brief Point the poll descriptor at the client's socket.
 *
 * Stores the TCP or TLS socket of @p c in fds.fd, depending on the client's
 * transport type, and requests POLLIN events.
 *
 * @param c MQTT client whose socket should be polled.
 *
 * @retval 0         fds is ready for poll().
 * @retval -ENOTSUP  The transport is not plain TCP and CONFIG_MQTT_LIB_TLS
 *                   is not defined.
 */
static int fds_init(struct mqtt_client *c)
{
#if defined(CONFIG_MQTT_LIB_TLS)
    fds.fd = (c->transport.type == MQTT_TRANSPORT_NON_SECURE) ? c->transport.tcp.sock
                                                              : c->transport.tls.sock;
#else
    if (c->transport.type != MQTT_TRANSPORT_NON_SECURE) {
        return -ENOTSUP;
    }

    fds.fd = c->transport.tcp.sock;
#endif

    fds.events = POLLIN;

    return 0;
}

int mt_mqtt_disconnect(void)
{
    int err;

    err = mqtt_disconnect(&client);
    if (err) {
        LOG_ERR("ERROR: mqtt_disconnect %d", err);
    }

    return err;
}

int mt_mqtt_publish(uint16_t qos, uint8_t *msg, size_t msg_len)
{
    struct mqtt_publish_param param;

    param.message.topic.qos        = qos;
    param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
    param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
    param.message.payload.data     = msg;
    param.message.payload.len      = msg_len;
    param.message_id               = sys_rand32_get();
    param.dup_flag                 = 0;
    param.retain_flag              = 0;

    LOG_HEXDUMP_INF(msg, msg_len, "Publishing: ");
    LOG_INF("to topic: %s len: %u\n",
            CONFIG_MQTT_PUB_TOPIC,
            (unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));
    return mqtt_publish(&client, &param);
}

/**@brief Register the application callback and the device IMEI.
 *
 * The first IMEI_LEN bytes of @p imei are copied into dev_imei, which
 * client_init() uses as the MQTT client id.
 *
 * @param imei    Buffer holding at least IMEI_LEN bytes; must not be NULL.
 * @param mqtt_cb Callback invoked for MQTT events; must not be NULL.
 *
 * @retval 0   Callback and IMEI stored.
 * @retval -1  @p imei or @p mqtt_cb is NULL; nothing is modified.
 */
int mt_mqtt_cb_init(uint8_t *imei, mqtt_evt_cb mqtt_cb)
{
    if (mqtt_cb == NULL || imei == NULL) {
        return -1;
    }

    m_cb = mqtt_cb;
    memcpy(dev_imei, imei, IMEI_LEN);

    return 0;
}

void suspend_thread(void)
{
    k_thread_suspend(k_current_get());
}

/**@brief Open the MQTT connection and start the polling thread.
 *
 * Re-initialises the client, feeds the watchdog, calls mqtt_connect() and,
 * on success, points the poll descriptor at the new socket and wakes
 * mqtt_thread_fn() so that it can receive the CONNACK.
 *
 * NOTE(review): client_init() resets the shared client object. keep_connect
 * is cleared from inside the event handler, i.e. while the polling thread may
 * still be executing inside the MQTT library on that same object. Calling
 * this function from another thread at that moment re-initialises state the
 * library is still using, which is presumably what leaves mqtt_connect()
 * blocked - confirm, and consider reconnecting only after the polling thread
 * has returned to its semaphore wait.
 *
 * @retval 0             Connection opened; CONNACK is handled asynchronously.
 * @retval -EINPROGRESS  A connection is already being serviced.
 * @return Otherwise the error returned by mqtt_connect() or fds_init().
 */
int mt_mqtt_connect(void)
{
    int err;

    if (keep_connect) {
        return -EINPROGRESS;
    }

    client_init();
    watchdog_feed_dog();
    err = mqtt_connect(&client);
    if (err != 0) {
        LOG_ERR("ERROR: mqtt_connect %d", err);
        return err;
    }

    err = fds_init(&client);
    if (err != 0) {
        LOG_ERR("ERROR: fds_init %d", err);
        /* The transport is already open: close it instead of leaking it. */
        mqtt_abort(&client);
        return err;
    }

    /** start polling now for CONNACK */
    keep_connect = true;
    k_sem_give(&mqtt_start_connect);

    return 0;
}

/* Statically define the MQTT polling thread: it runs mqtt_thread_fn() with
 * no arguments at THREAD_PRIORITY, with options 0 and a start delay of 0.
 *
 * NOTE(review): K_THREAD_DEFINE() allocates a stack of its own, so
 * mqtt_thread_stack appears to be used here only to derive the size and
 * would then occupy RAM without ever being used as a stack - confirm.
 */
K_THREAD_DEFINE(mqtt_thread,
                K_THREAD_STACK_SIZEOF(mqtt_thread_stack),
                mqtt_thread_fn,
                NULL,
                NULL,
                NULL,
                THREAD_PRIORITY,
                0,
                0);
mt_mqtt.h

Parents Reply Children
Related