This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

nrf9160 mqtt keep alive longer than 30 minutes not working

sdk: 1.6

modem fw: 1.2.3

Hi,

I have been testing MQTT every way I can think of, but I cannot get an MQTT keep-alive longer than 30 minutes to work: the MQTT connection is only maintained when the keep-alive is less than 30 minutes. The two problems are:

1. When KEEPALIVE is set to 1800 or larger, the connection just gets dropped.

2. I can't figure out how to set the 'will message'. With what I currently have in the code I get "mqtt_connect... -12", which corresponds to #define ENOMEM 12 /* Not enough space */.

Removing the will message solves the connection issue, but keepalive issue persists.

Regards

Noaman

6443.2021-09-17-proj.conf

#
# Copyright (c) 2019 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic
#

menu "HTTP application update sample"

config DOWNLOAD_HOST
	string "Download host name"

config DOWNLOAD_FILE
	string "The file to download"

config APPLICATION_VERSION
	int "Application version"
	default 1

config USE_HTTPS
	bool
	prompt "Use HTTPS for download"

endmenu

menu "MQTT simple sample"
config MQTT_PUB_TOPIC
	string "MQTT publish topic"
	default "my/publish/topic"

config MQTT_SUB_TOPIC
	string "MQTT subscribe topic"
	default "my/subscribe/topic"

# NOTE(review): because a non-empty default is given below, the "if not set"
# fallback described in the help text is only reached when this option is
# explicitly set to an empty string; the C code checks the string length of
# CONFIG_MQTT_CLIENT_ID before generating an ID.
# The help text ends at the first line indented less than the help body, so
# the trailing "default" line is still parsed as a property of this symbol.
config MQTT_CLIENT_ID
	string "MQTT Client ID"
	help
	  Use a custom Client ID string. If not set, the client ID will be
	  generated based on IMEI number (for nRF9160 based targets) or
	  randomly (for other platforms).
	default "ltetest"

config MQTT_BROKER_HOSTNAME
	string "MQTT broker hostname"
	default "test.cloudmqtt.com"

config MQTT_BROKER_PORT
	int "MQTT broker port"
	default 16993

config MQTT_MESSAGE_BUFFER_SIZE
	int "MQTT message buffer size"
	default 1024

config MQTT_PAYLOAD_BUFFER_SIZE
	int "MQTT payload buffer size"
	default 1024

config BUTTON_EVENT_PUBLISH_MSG
	string "The message to publish on a button event"
	default "Hello from nRF91 MQTT Simple Sample"

# Application-level default for the MQTT keep-alive interval. The prompt was
# previously an empty string, which leaves the entry unlabeled in the
# configuration UI.
config MQTT_KEEPALIVE
	int "MQTT keep-alive interval in seconds"
	default 3600
	help
	  Keep-alive interval, in seconds, requested from the broker.
	  The MQTT protocol carries this value as a 16-bit number, so
	  values above 65535 cannot be represented on the wire.
	
config BUTTON_EVENT_BTN_NUM
	int "The button number"
	default 1

config MQTT_RECONNECT_DELAY_S
	int "Seconds to delay before attempting to reconnect to the broker."
	default 5

config LTE_CONNECT_RETRY_DELAY_S
	int "Seconds to delay before attempting to retry LTE connection."
	default 120

config MQTT_TLS_SEC_TAG
	int "TLS credentials security tag"
	default 24

config MQTT_TLS_SESSION_CACHING
	bool "Enable TLS session caching"

config MQTT_TLS_PEER_VERIFY
	int "Set peer verification level"
	default 0
	help
		Set to 0 for VERIFY_NONE, 1 for VERIFY_OPTIONAL, and 2 for
		VERIFY_REQUIRED.

endmenu

menu "Zephyr Kernel"
source "Kconfig.zephyr"
endmenu

module = MQTT_SIMPLE
module-str = MQTT Simple
source "${ZEPHYR_BASE}/subsys/logging/Kconfig.template.log_config"

#include "myMQTT.h"
#include "myLTE.h"
#include <net/mqtt.h>
#include <net/socket.h>
#include <random/rand32.h>
#include "global.h"
#include "myHTTPs.h"

#include <modem/at_cmd.h>
#include <modem/at_notif.h>
#include <power/reboot.h>

#include <logging/log.h>

/* Fixed client id string (16 characters plus terminator). It is not
 * referenced by the code visible in this file; client_id_get() builds the id
 * that is actually handed to the MQTT client.
 */
char mqtt_client_id[17] = "ltet001122334455";
/* Running counter that data_publish() uses as the source of message ids. */
uint32_t msgIdCnt = 1000;
/* State flags; not read or written by the code visible in this file. */
bool mqttInitialized = false;
bool mqttConnecting = false;


/* Broker endpoint and credentials, held in ordinary variables.
 *
 * NOTE(review): these variables use the CONFIG_ prefix that the build system
 * uses for Kconfig-generated macros. If the application's Kconfig also
 * defines MQTT_BROKER_HOSTNAME / MQTT_BROKER_PORT, the generated macros would
 * be substituted into these declarations -- confirm that no such symbols
 * exist in the build, or rename the variables.
 */
char CONFIG_MQTT_BROKER_HOSTNAME[50] = "custom.cloudmqtt.com"; //"tailor.cloudmqtt.com";
int CONFIG_MQTT_BROKER_PORT = 14339;
int CONFIG_MQTT_BROKER_PORT_SECURE = 8883; //24339;
char CONFIG_MQTT_USER[30] = "username";
char CONFIG_MQTT_PASSWORD[30] = "password"; 


/* Buffers for MQTT client: rx/tx are handed to the client in client_init();
 * payload_buf is the buffer the event handler prints and echoes back when a
 * PUBLISH arrives.
 */
static uint8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];

/* The single MQTT client instance shared by every function in this file. */
static struct mqtt_client client;

/* MQTT broker address, filled in by broker_init(). */
static struct sockaddr_storage broker;

/* Connected flag; not updated by any code visible in this file. */
static bool mqttIsConnected;

/* Poll descriptor for the MQTT socket, set up by fds_init(). */
static struct pollfd fds;

/* Last-will payload and topic registered with the client in client_init(). */
#define mqttWillMsg "nrf_lte_connection_terminate"
#define mqttWillTopic "lte/willTopic"

/* Not referenced by the code visible in this file. */
bool flag_is_mqtts_updated = false;

/* Error counters; none of them is incremented by the code visible here. */
uint8_t mqttInitErrorCounter = 0;
uint8_t mqttConnectErrorCounter = 0;
uint8_t mqttPollErrorCounter = 0;
uint8_t mqttPublishErrorCounter = 0;

/**@brief Print a prefix followed by a byte buffer that is not NUL-terminated.
 *
 * The data is staged through a small fixed-size buffer instead of a
 * variable-length array sized by @p len, so stack use stays bounded no
 * matter how large the (possibly network-supplied) length is. Nothing is
 * copied or printed unless DEBUG_PRINT_MQTT is enabled.
 *
 * @param prefix NUL-terminated text printed before the data.
 * @param data   Bytes to print; may be NULL, in which case only the prefix
 *               and the newline are printed.
 * @param len    Number of bytes available at @p data.
 */
static void data_print(uint8_t *prefix, uint8_t *data, size_t len)
{
	enum { DATA_PRINT_CHUNK = 64 };
	char chunk[DATA_PRINT_CHUNK + 1];

	if (!DEBUG_PRINT_MQTT) {
		return;
	}

	printk("%s", prefix);

	while ((data != NULL) && (len > 0)) {
		size_t n = (len < (size_t)DATA_PRINT_CHUNK) ?
				len : (size_t)DATA_PRINT_CHUNK;

		memcpy(chunk, data, n);
		chunk[n] = '\0';
		printk("%s", chunk);

		/* An embedded NUL ends the output, just as printing the
		 * whole buffer with a single "%s" would.
		 */
		if (strlen(chunk) < n) {
			break;
		}

		data += n;
		len -= n;
	}

	printk("\n");
}

/**@brief Function to publish data on the configured topic
 */
static int data_publish(struct mqtt_client *c, enum mqtt_qos qos,
	uint8_t *data, size_t len)
{
	struct mqtt_publish_param param;

	param.message.topic.qos = qos;
	param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
	param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
	param.message.payload.data = data;
	param.message.payload.len = len;
	param.message_id = msgIdCnt++;//sys_rand32_get();
	param.dup_flag = 0;
	param.retain_flag = 0;

	if (DEBUG_PRINT_MQTT)
	{
		data_print("Publishing: ", data, len);
		// printk("to topic: %s len: %u\n", CONFIG_MQTT_PUB_TOPIC, (unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));
	}
	return mqtt_publish(c, &param);
}

/**@brief Function to subscribe to the configured topic
 *
 * Subscription is currently disabled: the original implementation is kept
 * below as a comment for reference, and no SUBSCRIBE request is sent.
 *
 * @return 0 always, since nothing is attempted.
 */
static int subscribe(void)
{
 /*
	struct mqtt_topic subscribe_topic = {
		.topic = {
			.utf8 = CONFIG_MQTT_SUB_TOPIC,
			.size = strlen(CONFIG_MQTT_SUB_TOPIC)
		},
		.qos = MQTT_QOS_1_AT_LEAST_ONCE
	};

	const struct mqtt_subscription_list subscription_list = {
		.list = &subscribe_topic,
		.list_count = 1,
		.message_id = 1234
	};

	printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC,
		(unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));

	return mqtt_subscribe(&client, &subscription_list);
        */

	/* A non-void function must not fall off its end; report success. */
	return 0;
}

/**@brief Placeholder for reading the payload of a received PUBLISH.
 *
 * The payload is NOT read here: this function ignores both arguments and
 * reports success. The implementation that used to live here (now disabled)
 * rejected lengths larger than payload_buf with -EMSGSIZE, then pulled
 * @p length bytes into payload_buf with mqtt_read_publish_payload(), polling
 * the socket whenever the read returned -EAGAIN and returning -EIO on a
 * poll failure or a zero-length read.
 *
 * NOTE(review): while this stays a stub, payload_buf is never filled by this
 * function, so callers that print or echo payload_buf see whatever it held
 * before -- confirm this is intended.
 *
 * @param c      MQTT client the PUBLISH arrived on (unused).
 * @param length Payload length announced by the PUBLISH (unused).
 *
 * @return 0 always.
 */
static int publish_get_payload(struct mqtt_client *c, size_t length)
{
	(void)c;
	(void)length;

	return 0;
}

/**@brief MQTT client event handler
 */
void mqtt_evt_handler(struct mqtt_client *const c,
		      const struct mqtt_evt *evt)
{
	int err;

	switch (evt->type) {
	case MQTT_EVT_CONNACK:
		if (evt->result != 0) {
			if (DEBUG_PRINT_MQTT) printk("MQTT connect failed: %d", evt->result);
			break;
		}

		if (DEBUG_PRINT_MQTT) printk("MQTT client connected");
		subscribe();
		break;

	case MQTT_EVT_DISCONNECT:
		if (DEBUG_PRINT_MQTT) printk("MQTT client disconnected: %d", evt->result);
		break;

	case MQTT_EVT_PUBLISH: {
		const struct mqtt_publish_param *p = &evt->param.publish;

		if (DEBUG_PRINT_MQTT) printk("MQTT PUBLISH result=%d len=%d",
			evt->result, p->message.payload.len);
		err = publish_get_payload(c, p->message.payload.len);

		if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
			const struct mqtt_puback_param ack = {
				.message_id = p->message_id
			};

			/* Send acknowledgment. */
			mqtt_publish_qos1_ack(&client, &ack);
		}

		if (err >= 0) {
			data_print("Received: ", payload_buf,
				p->message.payload.len);
			/* Echo back received data */
			data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
				payload_buf, p->message.payload.len);
		} else {
			if (DEBUG_PRINT_MQTT) printk("publish_get_payload failed: %d", err);
			if (DEBUG_PRINT_MQTT) printk("Disconnecting MQTT client...");

			err = mqtt_disconnect(c);
			if (err) {
				if (DEBUG_PRINT_MQTT) printk("Could not disconnect: %d", err);
			}
		}
	} break;

	case MQTT_EVT_PUBACK:
		if (evt->result != 0) {
			if (DEBUG_PRINT_MQTT) printk("MQTT PUBACK error: %d", evt->result);
			break;
		}

		if (DEBUG_PRINT_MQTT) printk("PUBACK packet id: %u", evt->param.puback.message_id);
		break;

	case MQTT_EVT_SUBACK:
		if (evt->result != 0) {
			if (DEBUG_PRINT_MQTT) printk("MQTT SUBACK error: %d", evt->result);
			break;
		}

		if (DEBUG_PRINT_MQTT) printk("SUBACK packet id: %u", evt->param.suback.message_id);
		break;

	case MQTT_EVT_PINGRESP:
		if (evt->result != 0) {
			if (DEBUG_PRINT_MQTT) printk("MQTT PINGRESP error: %d", evt->result);
		}
		break;

	default:
		if (DEBUG_PRINT_MQTT) printk("Unhandled MQTT event type: %d", evt->type);
		break;
	}
}

/**@brief Resolves the configured hostname and
 * initializes the MQTT broker structure
 *
 * Only IPv4 results are accepted (the lookup hints request AF_INET). The
 * first matching result is copied into the file-scope `broker` together
 * with the configured port.
 *
 * @retval 0        `broker` now holds the resolved IPv4 address.
 * @retval -ECHILD  getaddrinfo() failed (value kept for existing callers).
 * @retval -ENOENT  The lookup succeeded but returned no IPv4-sized address,
 *                  so `broker` was left untouched. Previously this case was
 *                  reported as success.
 */
static int broker_init(void)
{
	struct addrinfo *result;
	struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM
	};
	bool found = false;
	int err;

	err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
	if (err) {
		if (DEBUG_PRINT_MQTT) printk("getaddrinfo failed: %d", err);
		return -ECHILD;
	}

	/* Look for address of the broker. */
	for (struct addrinfo *addr = result; addr != NULL;
	     addr = addr->ai_next) {
		if (addr->ai_addrlen != sizeof(struct sockaddr_in)) {
			if (DEBUG_PRINT_MQTT) printk("ai_addrlen = %u should be %u or %u",
				(unsigned int)addr->ai_addrlen,
				(unsigned int)sizeof(struct sockaddr_in),
				(unsigned int)sizeof(struct sockaddr_in6));
			continue;
		}

		/* IPv4 Address. */
		struct sockaddr_in *broker4 = ((struct sockaddr_in *)&broker);
		char ipv4_addr[NET_IPV4_ADDR_LEN];

		broker4->sin_addr.s_addr =
			((struct sockaddr_in *)addr->ai_addr)->sin_addr.s_addr;
		broker4->sin_family = AF_INET;
		broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);

		inet_ntop(AF_INET, &broker4->sin_addr.s_addr,
			  ipv4_addr, sizeof(ipv4_addr));
		if (DEBUG_PRINT_MQTT) printk("IPv4 Address found %s", ipv4_addr);

		found = true;
		break;
	}

	/* Free the address. */
	freeaddrinfo(result);

	if (!found) {
		if (DEBUG_PRINT_MQTT) printk("No IPv4 address found for broker");
		return -ENOENT;
	}

	return 0;
}

/* Sizing constants for the generated client id.
 *
 * CLIENT_ID_LEN is parenthesised so that it expands to a single value inside
 * any larger expression (e.g. `2 * CLIENT_ID_LEN`).
 */
#if defined(CONFIG_NRF_MODEM_LIB)
/* Number of characters kept from the AT+CGSN response as the IMEI. */
#define IMEI_LEN 15
/* Size basis for the buffer that receives the AT+CGSN response. */
#define CGSN_RESPONSE_LENGTH 19
/* "nrf-" (including its terminator) plus the IMEI characters. */
#define CLIENT_ID_LEN (sizeof("nrf-") + IMEI_LEN)
#else
/* Width of the zero-padded random number appended to the board name. */
#define RANDOM_LEN 10
/* Board name (including its terminator), '-' separator, random digits. */
#define CLIENT_ID_LEN (sizeof(CONFIG_BOARD) + 1 + RANDOM_LEN)
#endif /* defined(CONFIG_NRF_MODEM_LIB) */

/* Fill buf with a generated client id: "nrf-<IMEI>" when the modem library
 * is available, "<board>-<random>" otherwise. On an AT command failure the
 * buffer is left exactly as it was.
 */
static void client_id_generate(uint8_t *buf, size_t buf_size)
{
#if defined(CONFIG_NRF_MODEM_LIB)
	char imei_buf[CGSN_RESPONSE_LENGTH + 1];
	int err;

	if (!IS_ENABLED(CONFIG_AT_CMD_SYS_INIT)) {
		err = at_cmd_init();
		if (err) {
			if (DEBUG_PRINT_MQTT) printk("at_cmd failed to initialize, error: %d", err);
			return;
		}
	}

	err = at_cmd_write("AT+CGSN", imei_buf, sizeof(imei_buf), NULL);
	if (err) {
		if (DEBUG_PRINT_MQTT) printk("Failed to obtain IMEI, error: %d", err);
		return;
	}

	/* Cut the response off after the IMEI characters. */
	imei_buf[IMEI_LEN] = '\0';

	snprintf(buf, buf_size, "nrf-%.*s", IMEI_LEN, imei_buf);
#else
	uint32_t id = sys_rand32_get();

	snprintf(buf, buf_size, "%s-%010u", CONFIG_BOARD, id);
#endif /* defined(CONFIG_NRF_MODEM_LIB) */
}

/* Return the client id to present to the broker.
 *
 * A non-empty CONFIG_MQTT_CLIENT_ID is used verbatim; otherwise an id is
 * generated. The returned pointer refers to a static buffer owned by this
 * function.
 */
static const uint8_t* client_id_get(void)
{
	static uint8_t client_id[MAX(sizeof(CONFIG_MQTT_CLIENT_ID),
				     CLIENT_ID_LEN)];

	if (strlen(CONFIG_MQTT_CLIENT_ID) == 0) {
		client_id_generate(client_id, sizeof(client_id));
	} else {
		snprintf(client_id, sizeof(client_id), "%s",
			 CONFIG_MQTT_CLIENT_ID);
	}

	if (DEBUG_PRINT_MQTT) printk("client_id = %s", client_id);

	return client_id;
}

/**@brief Initialize the MQTT client structure
 *
 * Resolves the broker, then fills in identity, credentials, last-will,
 * buffers and transport settings on @p client.
 *
 * The client only stores POINTERS to the user name, password, will topic
 * and will message, and mqtt_connect() is called after this function has
 * returned. Those four objects therefore have static storage duration here:
 * as automatic variables they would be dangling by the time the CONNECT
 * packet is built, and the library would read arbitrary sizes and data.
 * NOTE(review): such garbage sizes are a plausible cause of mqtt_connect()
 * failing with -ENOMEM when a will message is configured -- confirm on
 * target.
 *
 * @param client Client structure to initialize.
 *
 * @return 0 on success, otherwise the error returned by broker_init().
 */
static int client_init(struct mqtt_client *client)
{
	/* Must outlive this call; see the function comment. */
	static struct mqtt_utf8 mqtt_user;
	static struct mqtt_utf8 mqtt_password;
	static struct mqtt_utf8 myWillMsg;
	static struct mqtt_topic myWillTopic;
	int err;

	mqtt_client_init(client);

	err = broker_init();
	if (err) {
		if (DEBUG_PRINT_MQTT) printk("Failed to initialize broker connection");
		return err;
	}

	mqtt_user.utf8 = (uint8_t *)(CONFIG_MQTT_USER);
	mqtt_user.size = strlen(CONFIG_MQTT_USER);

	mqtt_password.utf8 = (uint8_t *)(CONFIG_MQTT_PASSWORD);
	mqtt_password.size = strlen(CONFIG_MQTT_PASSWORD);

	/* MQTT client configuration */
	client->broker = &broker;
	client->evt_cb = mqtt_evt_handler;
	client->client_id.utf8 = client_id_get();
	client->client_id.size = strlen((const char *)client->client_id.utf8);
	client->password = &mqtt_password;
	client->user_name = &mqtt_user;
	client->protocol_version = MQTT_VERSION_3_1_1;

	/* MQTT buffers configuration */
	client->rx_buf = rx_buffer;
	client->rx_buf_size = sizeof(rx_buffer);
	client->tx_buf = tx_buffer;
	client->tx_buf_size = sizeof(tx_buffer);

	/* Last-will topic and message */
	myWillTopic.topic.utf8 = (uint8_t *)mqttWillTopic;
	myWillTopic.topic.size = strlen(mqttWillTopic);
	myWillTopic.qos = MQTT_QOS_1_AT_LEAST_ONCE;
	client->will_topic = &myWillTopic;

	myWillMsg.utf8 = (uint8_t *)mqttWillMsg;
	myWillMsg.size = strlen(mqttWillMsg);
	client->will_message = &myWillMsg;

	/* MQTT transport configuration */
 #if defined(CONFIG_MQTT_LIB_TLS)
	struct mqtt_sec_config *tls_cfg = &(client->transport).tls.config;
	static sec_tag_t sec_tag_list[] = { CONFIG_MQTT_TLS_SEC_TAG };

	if (DEBUG_PRINT_MQTT) printk("TLS enabled");
	client->transport.type = MQTT_TRANSPORT_SECURE;

	tls_cfg->peer_verify = CONFIG_MQTT_TLS_PEER_VERIFY;
	tls_cfg->cipher_count = 0;
	tls_cfg->cipher_list = NULL;
	tls_cfg->sec_tag_count = ARRAY_SIZE(sec_tag_list);
	tls_cfg->sec_tag_list = sec_tag_list;
	tls_cfg->hostname = CONFIG_MQTT_BROKER_HOSTNAME;

 #if defined(CONFIG_NRF_MODEM_LIB)
	tls_cfg->session_cache = IS_ENABLED(CONFIG_MQTT_TLS_SESSION_CACHING) ?
					    TLS_SESSION_CACHE_ENABLED :
					    TLS_SESSION_CACHE_DISABLED;
 #else
	/* TLS session caching is not supported by the Zephyr network stack */
	tls_cfg->session_cache = TLS_SESSION_CACHE_DISABLED;

 #endif

 #else
	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
 #endif

	return 0;
}

/**@brief Point the file-scope poll descriptor at the client's socket.
 *
 * The plain TCP socket is used for the non-secure transport; any other
 * transport type uses the TLS socket when TLS support is compiled in.
 *
 * @param c Client whose transport socket should be polled.
 *
 * @retval 0         Descriptor prepared for POLLIN.
 * @retval -ENOTSUP  Transport is not non-secure and TLS support is not
 *                   compiled in.
 */
static int fds_init(struct mqtt_client *c)
{
	switch (c->transport.type) {
	case MQTT_TRANSPORT_NON_SECURE:
		fds.fd = c->transport.tcp.sock;
		break;

	default:
#if defined(CONFIG_MQTT_LIB_TLS)
		fds.fd = c->transport.tls.sock;
		break;
#else
		return -ENOTSUP;
#endif
	}

	fds.events = POLLIN;

	return 0;
}

/**@brief Work item handler that publishes the configured button message.
 *
 * Sends CONFIG_BUTTON_EVENT_PUBLISH_MSG (without its terminating NUL) with
 * QoS 0 through the file-scope client and prints the outcome.
 *
 * @param work Work item that triggered the handler (unused).
 */
void myMqttWorkHandler(struct k_work *work)
{
	printk("Now publishing mqtt data\n");

	const int rc = data_publish(&client,
				    MQTT_QOS_0_AT_MOST_ONCE,
				    CONFIG_BUTTON_EVENT_PUBLISH_MSG,
				    sizeof(CONFIG_BUTTON_EVENT_PUBLISH_MSG)-1);

	if (rc == 0) {
		printk("mqtt sent successfully\n");
		return;
	}

	printk("Publish failed: %d\n", rc);
}

/**@brief Connect to the broker and service the connection indefinitely.
 *
 * Initializes the client once, then loops: connect, poll the socket until
 * the next keep-alive is due, let mqtt_live() send the ping, and process any
 * incoming data. Any poll/live/input error ends the inner loop, after which
 * the client is disconnected and a reconnect is attempted following
 * CONFIG_MQTT_RECONNECT_DELAY_S seconds.
 *
 * The function only returns if client_init() or fds_init() fails. In the
 * fds_init() case the freshly opened connection is now closed before
 * returning instead of being left open.
 */
void test5_mqttKeepAlive(void)
{
	int err;
	uint32_t connect_attempt = 0;

	err = client_init(&client);
	if (err != 0) {
		if (DEBUG_PRINT_MQTT) printk("client_init: %d", err);
		return;
	}

	for (;;) {
		/* Every attempt after the first is delayed. */
		if (connect_attempt++ > 0) {
			if (DEBUG_PRINT_MQTT) printk("Reconnecting in %d seconds...",
				CONFIG_MQTT_RECONNECT_DELAY_S);
			k_sleep(K_SECONDS(CONFIG_MQTT_RECONNECT_DELAY_S));
		}

		err = mqtt_connect(&client);
		if (err != 0) {
			if (DEBUG_PRINT_MQTT) printk("mqtt_connect %d", err);
			continue;
		}

		err = fds_init(&client);
		if (err != 0) {
			if (DEBUG_PRINT_MQTT) printk("fds_init: %d", err);

			/* Do not leave the connection open behind us. */
			err = mqtt_disconnect(&client);
			if (err) {
				if (DEBUG_PRINT_MQTT) printk("Could not disconnect MQTT client: %d", err);
			}
			return;
		}

		while (1) {
			/* Sleep until data arrives or a keep-alive is due. */
			err = poll(&fds, 1, mqtt_keepalive_time_left(&client));
			if (err < 0) {
				if (DEBUG_PRINT_MQTT) printk("poll: %d", errno);
				break;
			}

			err = mqtt_live(&client);
			if ((err != 0) && (err != -EAGAIN)) {
				if (DEBUG_PRINT_MQTT) printk("ERROR: mqtt_live: %d", err);
				break;
			}

			if ((fds.revents & POLLIN) == POLLIN) {
				err = mqtt_input(&client);
				if (err != 0) {
					if (DEBUG_PRINT_MQTT) printk("mqtt_input: %d", err);
					break;
				}
			}

			if ((fds.revents & POLLERR) == POLLERR) {
				if (DEBUG_PRINT_MQTT) printk("POLLERR");
				break;
			}

			if ((fds.revents & POLLNVAL) == POLLNVAL) {
				if (DEBUG_PRINT_MQTT) printk("POLLNVAL");
				break;
			}
		}

		if (DEBUG_PRINT_MQTT) printk("Disconnecting MQTT client...");

		err = mqtt_disconnect(&client);
		if (err) {
			if (DEBUG_PRINT_MQTT) printk("Could not disconnect MQTT client: %d", err);
		}
	}
}

Parents Reply Children
No Data
Related