When I connected mqtt on nRF9160, I found a strange problem, which caused the device to freeze and become bricked. The location cause was found to appear in err = mqtt_connect(&client), mqtt_connect(&client) does not return any value when running, and the program is stuck mqtt_connect(&client) does not come out. What is going on? It is easy to reproduce after a period of normal operation. The device is running on ncs1.5.1, modem V1.2.3.
Below is my mqtt file:
#include <logging/log.h>
#include <zephyr.h>
#include <stdio.h>
#include <net/mqtt.h>
#include <net/socket.h>
#include "mt_mqtt.h"
#include <random/rand32.h>
#include "watchdog.h"
LOG_MODULE_REGISTER(mqtt, 4);
/* clang-format off */
#define INVALID_FDS -1
#define THREAD_STACK_SIZE KB(2)
#define THREAD_PRIORITY K_LOWEST_APPLICATION_THREAD_PRIO
#define IMEI_LEN (15)
/* clang-format on */
static K_THREAD_STACK_DEFINE(mqtt_thread_stack, THREAD_STACK_SIZE);
/* Buffers for MQTT client. */
static uint8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];
#if defined(CONFIG_MQTT_LIB_TLS)
static sec_tag_t sec_tag_list[] = { CONFIG_SEC_TAG };
#endif /* defined(CONFIG_MQTT_LIB_TLS) */
/* The mqtt client struct */
static struct mqtt_client client;
/* MQTT Broker details. */
static struct sockaddr_storage broker;
/* Connected flag */
static K_SEM_DEFINE(mqtt_start_connect, 0, 1);
static bool keep_connect;
/* File descriptor */
static struct pollfd fds = {
.fd = INVALID_FDS,
};
static mqtt_evt_cb m_cb;
static uint8_t dev_imei[IMEI_LEN];
/**@brief Function to subscribe to the configured topic
*/
static int subscribe(void)
{
struct mqtt_topic subscribe_topic = { .topic = { .utf8 = CONFIG_MQTT_SUB_TOPIC,
.size = strlen(CONFIG_MQTT_SUB_TOPIC) },
.qos = MQTT_QOS_1_AT_LEAST_ONCE };
const struct mqtt_subscription_list subscription_list = { .list = &subscribe_topic,
.list_count = 1,
.message_id = 1234 };
LOG_INF("Subscribing to: %s len %u\n",
CONFIG_MQTT_SUB_TOPIC,
(unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));
return mqtt_subscribe(&client, &subscription_list);
}
/**@brief Function to read the published payload.
*/
static int publish_get_payload(struct mqtt_client *c, size_t length)
{
uint8_t *buf = payload_buf;
uint8_t *end = buf + length;
if (length > sizeof(payload_buf)) {
return -EMSGSIZE;
}
while (buf < end) {
int ret = mqtt_read_publish_payload(c, buf, end - buf);
if (ret < 0) {
int err;
if (ret != -EAGAIN) {
return ret;
}
LOG_INF("mqtt_read_publish_payload: EAGAIN\n");
err = poll(&fds, 1, CONFIG_MQTT_KEEPALIVE * MSEC_PER_SEC);
if (err > 0 && (fds.revents & POLLIN) == POLLIN) {
continue;
} else {
return -EIO;
}
}
if (ret == 0) {
return -EIO;
}
buf += ret;
}
return 0;
}
/**@brief MQTT client event handler
*/
static void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt)
{
int err;
const struct mqtt_publish_param *p = &evt->param.publish;
mqtt_evt_t tmp_mqtt_evt;
switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
keep_connect = false;
}
LOG_INF("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
subscribe();
break;
case MQTT_EVT_DISCONNECT:
LOG_INF("[%s:%d] MQTT client disconnected %d\n", __func__, __LINE__, evt->result);
keep_connect = false;
tmp_mqtt_evt.type = MQTT_EVT_DISCONNECT;
m_cb(&tmp_mqtt_evt);
break;
case MQTT_EVT_PUBLISH:
LOG_INF("[%s:%d] MQTT PUBLISH result=%d len=%d\n",
__func__,
__LINE__,
evt->result,
p->message.payload.len);
err = publish_get_payload(c, p->message.payload.len);
if (err >= 0) {
tmp_mqtt_evt.type = MQTT_EVT_PUBLISH;
memcpy(tmp_mqtt_evt.rec_data, payload_buf, p->message.payload.len);
tmp_mqtt_evt.rec_len = p->message.payload.len;
m_cb(&tmp_mqtt_evt);
} else {
LOG_INF("mqtt_read_publish_payload: Failed! %d\n", err);
LOG_INF("Disconnecting MQTT client...\n");
err = mqtt_disconnect(c);
if (err) {
LOG_INF("Could not disconnect: %d\n", err);
}
}
break;
case MQTT_EVT_PUBACK:
if (evt->result != 0) {
LOG_INF("MQTT PUBACK error %d\n", evt->result);
break;
}
LOG_INF(
"[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__, evt->param.puback.message_id);
tmp_mqtt_evt.type = MQTT_EVT_PUBACK;
m_cb(&tmp_mqtt_evt);
break;
case MQTT_EVT_SUBACK:
if (evt->result != 0) {
LOG_INF("MQTT SUBACK error %d\n", evt->result);
break;
}
LOG_INF(
"[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__, evt->param.suback.message_id);
tmp_mqtt_evt.type = MQTT_EVT_SUBACK;
m_cb(&tmp_mqtt_evt);
break;
default:
LOG_INF("[%s:%d] default: %d\n", __func__, __LINE__, evt->type);
break;
}
}
static void mqtt_thread_fn(void *arg1, void *arg2, void *arg3)
{
int err;
while (1) {
/* Don't go any further until MQTT is connected */
k_sem_take(&mqtt_start_connect, K_FOREVER);
while (keep_connect) {
err = poll(&fds, 1, mqtt_keepalive_time_left(&client));
if (err < 0) {
LOG_ERR("ERROR: poll %d", errno);
break;
}
err = mqtt_live(&client);
if ((err != 0) && (err != -EAGAIN)) {
LOG_ERR("ERROR: mqtt_live %d", err);
break;
}
if ((fds.revents & POLLIN) == POLLIN) {
err = mqtt_input(&client);
if (err != 0) {
LOG_ERR("ERROR: mqtt_input %d", err);
mqtt_abort(&client);
break;
}
}
if ((fds.revents & POLLERR) == POLLERR) {
LOG_ERR("POLLERR");
mqtt_abort(&client);
break;
}
if ((fds.revents & POLLNVAL) == POLLNVAL) {
LOG_ERR("POLLNVAL");
mqtt_abort(&client);
break;
}
}
}
}
/**@brief Resolves the configured hostname and
* initializes the MQTT broker structure
*/
static void broker_init(void)
{
int err;
struct addrinfo *result;
struct addrinfo *addr;
struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM };
err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
if (err) {
LOG_INF("ERROR: getaddrinfo failed %d\n", err);
return;
}
addr = result;
err = -ENOENT;
/* Look for address of the broker. */
while (addr != NULL) {
/* IPv4 Address. */
if (addr->ai_addrlen == sizeof(struct sockaddr_in)) {
struct sockaddr_in *broker4 = ((struct sockaddr_in *)&broker);
char ipv4_addr[NET_IPV4_ADDR_LEN];
broker4->sin_addr.s_addr = ((struct sockaddr_in *)addr->ai_addr)->sin_addr.s_addr;
broker4->sin_family = AF_INET;
broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);
inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr));
LOG_INF("IPv4 Address found %s\n", ipv4_addr);
break;
} else {
LOG_INF("ai_addrlen = %u should be %u or %u\n",
(unsigned int)addr->ai_addrlen,
(unsigned int)sizeof(struct sockaddr_in),
(unsigned int)sizeof(struct sockaddr_in6));
}
addr = addr->ai_next;
break;
}
/* Free the address. */
freeaddrinfo(result);
}
/**@brief Initialize the MQTT client structure
*/
static void client_init(void)
{
mqtt_client_init(&client);
broker_init();
/* MQTT client configuration */
client.broker = &broker;
client.evt_cb = mqtt_evt_handler;
client.client_id.utf8 = (uint8_t *)dev_imei;
client.client_id.size = IMEI_LEN;
client.password = NULL;
client.user_name = NULL;
client.protocol_version = MQTT_VERSION_3_1_1;
/* MQTT buffers configuration */
client.rx_buf = rx_buffer;
client.rx_buf_size = sizeof(rx_buffer);
client.tx_buf = tx_buffer;
client.tx_buf_size = sizeof(tx_buffer);
/* MQTT transport configuration */
#if defined(CONFIG_MQTT_LIB_TLS)
struct mqtt_sec_config *tls_config = &client.transport.tls.config;
client.transport.type = MQTT_TRANSPORT_SECURE;
tls_config->peer_verify = CONFIG_PEER_VERIFY;
tls_config->cipher_count = 0;
tls_config->cipher_list = NULL;
tls_config->sec_tag_count = ARRAY_SIZE(sec_tag_list);
tls_config->sec_tag_list = sec_tag_list;
tls_config->hostname = CONFIG_MQTT_BROKER_HOSTNAME;
#else /* MQTT transport configuration */
client.transport.type = MQTT_TRANSPORT_NON_SECURE;
#endif /* defined(CONFIG_MQTT_LIB_TLS) */
}
/**@brief Initialize the file descriptor structure used by poll.
*/
static int fds_init(struct mqtt_client *c)
{
if (c->transport.type == MQTT_TRANSPORT_NON_SECURE) {
fds.fd = c->transport.tcp.sock;
} else {
#if defined(CONFIG_MQTT_LIB_TLS)
fds.fd = c->transport.tls.sock;
#else
return -ENOTSUP;
#endif
}
fds.events = POLLIN;
return 0;
}
int mt_mqtt_disconnect(void)
{
int err;
err = mqtt_disconnect(&client);
if (err) {
LOG_ERR("ERROR: mqtt_disconnect %d", err);
}
return err;
}
int mt_mqtt_publish(uint16_t qos, uint8_t *msg, size_t msg_len)
{
struct mqtt_publish_param param;
param.message.topic.qos = qos;
param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
param.message.payload.data = msg;
param.message.payload.len = msg_len;
param.message_id = sys_rand32_get();
param.dup_flag = 0;
param.retain_flag = 0;
LOG_HEXDUMP_INF(msg, msg_len, "Publishing: ");
LOG_INF("to topic: %s len: %u\n",
CONFIG_MQTT_PUB_TOPIC,
(unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));
return mqtt_publish(&client, ¶m);
}
int mt_mqtt_cb_init(uint8_t * imei , mqtt_evt_cb mqtt_cb)
{
if (mqtt_cb == NULL) {
return -1;
}
m_cb = mqtt_cb;
memcpy(dev_imei, imei, IMEI_LEN);
return 0;
}
void suspend_thread(void)
{
k_thread_suspend(k_current_get());
}
int mt_mqtt_connect(void)
{
int err = -EINVAL;
if (keep_connect) {
return -EINPROGRESS;
}
client_init();
watchdog_feed_dog();
err = mqtt_connect(&client);
if (err != 0) {
LOG_ERR("ERROR: mqtt_connect %d", err);
return err;
}
err = fds_init(&client);
if (err != 0) {
LOG_ERR("ERROR: fds_init %d", err);
return err;
}
/** start polling now for CONNACK */
keep_connect = true;
k_sem_give(&mqtt_start_connect);
return 0;
}
K_THREAD_DEFINE(mqtt_thread,
K_THREAD_STACK_SIZEOF(mqtt_thread_stack),
mqtt_thread_fn,
NULL,
NULL,
NULL,
THREAD_PRIORITY,
0,
0);
