Is it possible to use NON_SECURE (no TLS) transport with an MQTT client, while the MQTT_LIB_TLS and MBED_TLS Kconfig symbols are enabled? I would like my device to, at runtime, be able to choose between using TLS or not using TLS.

Issue:

It appears that whether or not TLS is used by an MQTT client MUST be configured at compile-time. An application with the Kconfig symbols enabled to allow TLS to be used, (CONFIG_MQTT_LIB_TLS=y and CONFIG_MBED_TLS=y), is not able to use NON_SECURE transport, despite the mqtt client config being set accordingly. 

When I connect to a broker using TLS, it works fine.

When I have the MQTT_LIB_TLS and MBED_TLS Kconfig symbols disabled, and then attempt to connect using NON_SECURE transport, it also works fine. 


 What I've done:

For reference I am using a simple hiveMQ public MQTT broker with standard non secure port of 1883 for demonstration. I am able to connect to this perfectly fine, IF the associated TLS Kconfig symbols are disabled.  

I have commented out all lines in the MQTT Helper code that configure the MQTT client to use TLS, based on preprocessor directive conditionals. See here:

/*
 * Copyright (c) 2023 Nordic Semiconductor ASA
 *
 * SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
 */
#include <stdlib.h>
#include <string.h>
#include <zephyr/net/socket.h>

#include <net/mqtt_helper.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/logging/log.h>

#if defined(CONFIG_MQTT_HELPER_PROVISION_CERTIFICATES)
#include "mqtt-certs.h"
#endif

/* Register this file as the "mqtt_helper" log module; the level comes from Kconfig. */
LOG_MODULE_REGISTER(mqtt_helper, CONFIG_MQTT_HELPER_LOG_LEVEL);

/* NOTE(review): the build-time security tag check below is commented out in this
 * copy, so a CONFIG_MQTT_LIB_TLS build no longer fails when
 * CONFIG_MQTT_HELPER_SEC_TAG is left at -1.
 */
// #if defined(CONFIG_MQTT_LIB_TLS)
// BUILD_ASSERT((CONFIG_MQTT_HELPER_SEC_TAG != -1), "Security tag must be configured");
// #endif /* CONFIG_MQTT_LIB_TLS */

/* Define a custom MQTT_HELPER_STATIC macro that exposes internal variables when unit testing. */
#if defined(CONFIG_UNITY)
#define MQTT_HELPER_STATIC
#else
#define MQTT_HELPER_STATIC static
#endif

/* The single MQTT client instance operated on by every function in this file. */
MQTT_HELPER_STATIC struct mqtt_client mqtt_client;
/* Broker address; filled in by broker_init() and attached to the client in client_connect(). */
static struct sockaddr_storage broker;
/* Receive/transmit buffers handed to the client in client_connect(). */
static char rx_buffer[CONFIG_MQTT_HELPER_RX_TX_BUFFER_SIZE];
static char tx_buffer[CONFIG_MQTT_HELPER_RX_TX_BUFFER_SIZE];
/* Destination for incoming publish payloads, see publish_get_payload(). */
MQTT_HELPER_STATIC char payload_buf[CONFIG_MQTT_HELPER_PAYLOAD_BUFFER_LEN];
/* Given by mqtt_helper_connect(), taken by mqtt_helper_poll_loop(); starts at 0, limit 1. */
MQTT_HELPER_STATIC K_SEM_DEFINE(connection_poll_sem, 0, 1);
/* Copy of the configuration passed to mqtt_helper_init(); cleared by mqtt_helper_deinit(). */
static struct mqtt_helper_cfg current_cfg;
/* Current library state; written only through mqtt_state_set(). */
MQTT_HELPER_STATIC enum mqtt_state mqtt_state = MQTT_STATE_UNINIT;

/* Translate a state value into its enumerator name for log output.
 * Any value without a dedicated case yields "MQTT_STATE_UNKNOWN".
 */
static const char *state_name_get(enum mqtt_state state)
{
	const char *name = "MQTT_STATE_UNKNOWN";

	switch (state) {
	case MQTT_STATE_UNINIT:
		name = "MQTT_STATE_UNINIT";
		break;
	case MQTT_STATE_DISCONNECTED:
		name = "MQTT_STATE_DISCONNECTED";
		break;
	case MQTT_STATE_TRANSPORT_CONNECTING:
		name = "MQTT_STATE_TRANSPORT_CONNECTING";
		break;
	case MQTT_STATE_CONNECTING:
		name = "MQTT_STATE_CONNECTING";
		break;
	case MQTT_STATE_TRANSPORT_CONNECTED:
		name = "MQTT_STATE_TRANSPORT_CONNECTED";
		break;
	case MQTT_STATE_CONNECTED:
		name = "MQTT_STATE_CONNECTED";
		break;
	case MQTT_STATE_DISCONNECTING:
		name = "MQTT_STATE_DISCONNECTING";
		break;
	default:
		break;
	}

	return name;
}

MQTT_HELPER_STATIC enum mqtt_state mqtt_state_get(void)
{
	return mqtt_state;
}

MQTT_HELPER_STATIC void mqtt_state_set(enum mqtt_state new_state)
{
	bool notify_error = false;

	if (mqtt_state_get() == new_state) {
		LOG_DBG("Skipping transition to the same state (%s)",
			state_name_get(mqtt_state_get()));
		return;
	}

	/* Check for legal state transitions. */
	switch (mqtt_state_get()) {
	case MQTT_STATE_UNINIT:
		if (new_state != MQTT_STATE_DISCONNECTED) {
			notify_error = true;
		}
		break;
	case MQTT_STATE_DISCONNECTED:
		if (new_state != MQTT_STATE_CONNECTING &&
		    new_state != MQTT_STATE_UNINIT &&
		    new_state != MQTT_STATE_TRANSPORT_CONNECTING) {
			notify_error = true;
		}
		break;
	case MQTT_STATE_TRANSPORT_CONNECTING:
		if (new_state != MQTT_STATE_TRANSPORT_CONNECTED &&
		    new_state != MQTT_STATE_DISCONNECTED) {
			notify_error = true;
		}
		break;
	case MQTT_STATE_CONNECTING:
		if (new_state != MQTT_STATE_CONNECTED &&
		    new_state != MQTT_STATE_DISCONNECTED) {
			notify_error = true;
		}
		break;
	case MQTT_STATE_TRANSPORT_CONNECTED:
		if (new_state != MQTT_STATE_CONNECTING &&
		    new_state != MQTT_STATE_DISCONNECTED) {
			notify_error = true;
		}
		break;
	case MQTT_STATE_CONNECTED:
		if (new_state != MQTT_STATE_DISCONNECTING &&
		    new_state != MQTT_STATE_DISCONNECTED) {
			notify_error = true;
		}
		break;
	case MQTT_STATE_DISCONNECTING:
		if (new_state != MQTT_STATE_DISCONNECTED) {
			notify_error = true;
		}
		break;
	default:
		LOG_ERR("New state is unknown: %d", new_state);
		notify_error = true;
		break;
	}

	if (notify_error) {
		LOG_ERR("Invalid state transition, %s --> %s",
			state_name_get(mqtt_state),
			state_name_get(new_state));

		__ASSERT(false, "Illegal state transition: %d --> %d", mqtt_state, new_state);
	}

	LOG_DBG("State transition: %s --> %s",
		state_name_get(mqtt_state),
		state_name_get(new_state));

	mqtt_state = new_state;
}

static bool mqtt_state_verify(enum mqtt_state state)
{
	return (mqtt_state_get() == state);
}

#if defined(CONFIG_MQTT_HELPER_PROVISION_CERTIFICATES)
/* Register the credentials from mqtt-certs.h with the TLS credential store.
 *
 * Does nothing when CONFIG_NET_SOCKETS_SOCKOPT_TLS is disabled or when a
 * previous call already completed. Each credential array is registered only if
 * its size is greater than one byte. A credential that already exists
 * (-EEXIST) is logged and tolerated; any other negative result aborts.
 *
 * Returns 0 on success or the negative error from tls_credential_add().
 */
static int certificates_provision(void)
{
	static bool provisioned;
	int ret;

	if (!IS_ENABLED(CONFIG_NET_SOCKETS_SOCKOPT_TLS)) {
		return 0;
	}

	if (provisioned) {
		return 0;
	}

	/* Primary security tag entries. */

	if (sizeof(ca_certificate) > 1) {
		ret = tls_credential_add(CONFIG_MQTT_HELPER_SEC_TAG,
					 TLS_CREDENTIAL_CA_CERTIFICATE,
					 ca_certificate,
					 sizeof(ca_certificate));
		if (ret < 0 && ret != -EEXIST) {
			LOG_ERR("Failed to register CA certificate: %d", ret);
			return ret;
		}

		if (ret == -EEXIST) {
			LOG_DBG("CA certificate already exists, sec tag: %d",
				CONFIG_MQTT_HELPER_SEC_TAG);
		}
	}

	if (sizeof(private_key) > 1) {
		ret = tls_credential_add(CONFIG_MQTT_HELPER_SEC_TAG,
					 TLS_CREDENTIAL_PRIVATE_KEY,
					 private_key,
					 sizeof(private_key));
		if (ret < 0 && ret != -EEXIST) {
			LOG_ERR("Failed to register private key: %d", ret);
			return ret;
		}

		if (ret == -EEXIST) {
			LOG_DBG("Private key already exists, sec tag: %d",
				CONFIG_MQTT_HELPER_SEC_TAG);
		}
	}

	/* The device certificate is registered under the SERVER_CERTIFICATE type. */
	if (sizeof(device_certificate) > 1) {
		ret = tls_credential_add(CONFIG_MQTT_HELPER_SEC_TAG,
					 TLS_CREDENTIAL_SERVER_CERTIFICATE,
					 device_certificate,
					 sizeof(device_certificate));
		if (ret < 0 && ret != -EEXIST) {
			LOG_ERR("Failed to register public certificate: %d", ret);
			return ret;
		}

		if (ret == -EEXIST) {
			LOG_DBG("Public certificate already exists, sec tag: %d",
				CONFIG_MQTT_HELPER_SEC_TAG);
		}
	}

	/* Secondary security tag entries. */

#if CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG != -1

	if (sizeof(ca_certificate_2) > 1) {
		ret = tls_credential_add(CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG,
					 TLS_CREDENTIAL_CA_CERTIFICATE,
					 ca_certificate_2,
					 sizeof(ca_certificate_2));
		if (ret < 0 && ret != -EEXIST) {
			LOG_ERR("Failed to register secondary CA certificate: %d", ret);
			return ret;
		}

		if (ret == -EEXIST) {
			LOG_DBG("CA certificate already exists, sec tag: %d",
				CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG);
		}
	}

	if (sizeof(private_key_2) > 1) {
		ret = tls_credential_add(CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG,
					 TLS_CREDENTIAL_PRIVATE_KEY,
					 private_key_2,
					 sizeof(private_key_2));
		if (ret < 0 && ret != -EEXIST) {
			LOG_ERR("Failed to register secondary private key: %d", ret);
			return ret;
		}

		if (ret == -EEXIST) {
			LOG_DBG("Private key already exists, sec tag: %d",
				CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG);
		}
	}

	if (sizeof(device_certificate_2) > 1) {
		ret = tls_credential_add(CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG,
					 TLS_CREDENTIAL_SERVER_CERTIFICATE,
					 device_certificate_2,
					 sizeof(device_certificate_2));
		if (ret < 0 && ret != -EEXIST) {
			LOG_ERR("Failed to register secondary public certificate: %d", ret);
			return ret;
		}

		if (ret == -EEXIST) {
			LOG_DBG("Public certificate already exists, sec tag: %d",
				CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG);
		}
	}

#endif /* CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG != -1 */

	/* Remember success so later connection attempts skip provisioning. */
	provisioned = true;

	return 0;
}
#endif /* CONFIG_MQTT_HELPER_PROVISION_CERTIFICATES */

/* Read `length` bytes of an incoming publish payload into payload_buf.
 *
 * Returns -EMSGSIZE without reading when the payload does not fit the buffer,
 * otherwise the result of mqtt_readall_publish_payload().
 */
static int publish_get_payload(struct mqtt_client *const mqtt_client, size_t length)
{
	if (length <= sizeof(payload_buf)) {
		return mqtt_readall_publish_payload(mqtt_client, payload_buf, length);
	}

	LOG_ERR("Incoming MQTT message too large for payload buffer");
	return -EMSGSIZE;
}

static void send_ack(struct mqtt_client *const mqtt_client, uint16_t message_id)
{
	int err;
	const struct mqtt_puback_param ack = {
		.message_id = message_id
	};

	err = mqtt_publish_qos1_ack(mqtt_client, &ack);
	if (err) {
		LOG_WRN("Failed to send MQTT ACK, error: %d", err);
		return;
	}

	LOG_DBG("PUBACK sent for message ID %d", message_id);
}

MQTT_HELPER_STATIC void on_publish(const struct mqtt_evt *mqtt_evt)
{
	int err;
	const struct mqtt_publish_param *p = &mqtt_evt->param.publish;
	struct mqtt_helper_buf topic = {
		.ptr = (char *)p->message.topic.topic.utf8,
		.size = p->message.topic.topic.size,
	};
	struct mqtt_helper_buf payload = {
		.ptr = payload_buf,
	};

	err = publish_get_payload(&mqtt_client, p->message.payload.len);
	if (err) {
		LOG_ERR("publish_get_payload, error: %d", err);

		if (current_cfg.cb.on_error) {
			current_cfg.cb.on_error(MQTT_HELPER_ERROR_MSG_SIZE);
		}

		return;
	}

	if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
		send_ack(&mqtt_client, p->message_id);
	}

	payload.size = p->message.payload.len;

	if (current_cfg.cb.on_publish) {
		current_cfg.cb.on_publish(topic, payload);
	}
}

/* Event callback installed on the MQTT client (see client_connect()).
 *
 * The optional on_all_events callback sees every event first; when it returns
 * false the event counts as consumed and nothing else happens. Otherwise the
 * event updates the helper state where applicable and is forwarded to the
 * matching application callback, if one is registered.
 */
MQTT_HELPER_STATIC void mqtt_evt_handler(struct mqtt_client *const mqtt_client,
					 const struct mqtt_evt *mqtt_evt)
{
	if (current_cfg.cb.on_all_events != NULL &&
	    !current_cfg.cb.on_all_events(mqtt_client,
					  (const struct mqtt_evt *const)mqtt_evt)) {
		/* The application handled the event itself. */
		return;
	}

	switch (mqtt_evt->type) {
	case MQTT_EVT_CONNACK:
		LOG_DBG("MQTT mqtt_client connected");

		/* Only an accepted CONNACK counts as connected. */
		mqtt_state_set((mqtt_evt->param.connack.return_code == MQTT_CONNECTION_ACCEPTED) ?
			       MQTT_STATE_CONNECTED : MQTT_STATE_DISCONNECTED);

		if (current_cfg.cb.on_connack != NULL) {
			current_cfg.cb.on_connack(mqtt_evt->param.connack.return_code,
						  mqtt_evt->param.connack.session_present_flag);
		}
		break;

	case MQTT_EVT_DISCONNECT:
		LOG_DBG("MQTT_EVT_DISCONNECT: result = %d", mqtt_evt->result);

		mqtt_state_set(MQTT_STATE_DISCONNECTED);

		if (current_cfg.cb.on_disconnect != NULL) {
			current_cfg.cb.on_disconnect(mqtt_evt->result);
		}
		break;

	case MQTT_EVT_PUBLISH:
		LOG_DBG("MQTT_EVT_PUBLISH, message ID: %d, len = %d",
			mqtt_evt->param.publish.message_id,
			mqtt_evt->param.publish.message.payload.len);
		on_publish(mqtt_evt);
		break;

	case MQTT_EVT_PUBACK:
		LOG_DBG("MQTT_EVT_PUBACK: id = %d result = %d",
			mqtt_evt->param.puback.message_id,
			mqtt_evt->result);

		if (current_cfg.cb.on_puback != NULL) {
			current_cfg.cb.on_puback(mqtt_evt->param.puback.message_id,
						 mqtt_evt->result);
		}
		break;

	case MQTT_EVT_SUBACK:
		LOG_DBG("MQTT_EVT_SUBACK: id = %d result = %d",
			mqtt_evt->param.suback.message_id,
			mqtt_evt->result);

		if (current_cfg.cb.on_suback != NULL) {
			current_cfg.cb.on_suback(mqtt_evt->param.suback.message_id,
						 mqtt_evt->result);
		}
		break;

	case MQTT_EVT_PINGRESP:
		LOG_DBG("MQTT_EVT_PINGRESP");

		if (current_cfg.cb.on_pingresp != NULL) {
			current_cfg.cb.on_pingresp();
		}
		break;

	default:
		/* Other event types are not handled by the helper. */
		break;
	}
}

/* Resolve the broker host name and store the first usable address in `broker`.
 *
 * When CONFIG_MQTT_HELPER_STATIC_IP_ADDRESS is non-empty it replaces
 * conn_params->hostname.ptr before the lookup. The first AF_INET6 or AF_INET
 * entry returned by zsock_getaddrinfo() is copied into `broker` together with
 * CONFIG_MQTT_HELPER_PORT.
 *
 * Returns 0 on success, the negated zsock_getaddrinfo() result when the lookup
 * fails, or -EADDRNOTAVAIL when the lookup succeeds but yields no IPv4/IPv6
 * entry (previously this case returned 0 and left `broker` unset).
 */
static int broker_init(struct sockaddr_storage *broker,
		       struct mqtt_helper_conn_params *conn_params)
{
	int err;
	bool found = false;
	struct zsock_addrinfo *result;
	struct zsock_addrinfo *addr;
	struct zsock_addrinfo hints = {
		.ai_socktype = SOCK_STREAM
	};
	char addr_str[NET_IPV6_ADDR_LEN];

	if (sizeof(CONFIG_MQTT_HELPER_STATIC_IP_ADDRESS) > 1) {
		conn_params->hostname.ptr = CONFIG_MQTT_HELPER_STATIC_IP_ADDRESS;

		LOG_DBG("Using static IP address: %s", CONFIG_MQTT_HELPER_STATIC_IP_ADDRESS);
	} else {
		LOG_DBG("Resolving IP address for %s", conn_params->hostname.ptr);
	}

	err = zsock_getaddrinfo(conn_params->hostname.ptr, NULL, &hints, &result);
	if (err) {
		LOG_ERR("getaddrinfo() failed, error %d", err);
		return -err;
	}

	/* Take the first entry with a supported address family. */
	for (addr = result; addr != NULL && !found; addr = addr->ai_next) {
		if (addr->ai_family == AF_INET6) {
			struct sockaddr_in6 *broker6 = ((struct sockaddr_in6 *)broker);

			net_ipaddr_copy(&broker6->sin6_addr,
					&((struct sockaddr_in6 *)addr->ai_addr)->sin6_addr);
			broker6->sin6_family = addr->ai_family;
			broker6->sin6_port = htons(CONFIG_MQTT_HELPER_PORT);

			zsock_inet_ntop(addr->ai_family, &broker6->sin6_addr,
					addr_str, sizeof(addr_str));
			LOG_DBG("IPv6 Address found %s (%s)", addr_str,
				net_family2str(addr->ai_family));
			found = true;
		} else if (addr->ai_family == AF_INET) {
			struct sockaddr_in *broker4 = ((struct sockaddr_in *)broker);

			net_ipaddr_copy(&broker4->sin_addr,
					&((struct sockaddr_in *)addr->ai_addr)->sin_addr);
			broker4->sin_family = addr->ai_family;
			broker4->sin_port = htons(CONFIG_MQTT_HELPER_PORT);

			zsock_inet_ntop(addr->ai_family, &broker4->sin_addr,
					addr_str, sizeof(addr_str));
			LOG_DBG("IPv4 Address found %s (%s)", addr_str,
				net_family2str(addr->ai_family));
			found = true;
		} else {
			LOG_DBG("Unknown address family %d", (unsigned int)addr->ai_family);
		}
	}

	zsock_freeaddrinfo(result);

	if (!found) {
		/* Do not let the caller connect with an unset broker address. */
		LOG_ERR("No IPv4 or IPv6 address found for %s", conn_params->hostname.ptr);
		return -EADDRNOTAVAIL;
	}

	return 0;
}

/* Configure the file-scope mqtt_client from conn_params and open the connection.
 *
 * Resolves the broker via broker_init(), fills in client ID, buffers, the
 * optional last will and the optional user name/password, then calls
 * mqtt_connect(). The helper state advances TRANSPORT_CONNECTING ->
 * TRANSPORT_CONNECTED -> CONNECTING along the way.
 *
 * NOTE(review): in this copy every CONFIG_MQTT_LIB_TLS conditional is commented
 * out, so the transport is always MQTT_TRANSPORT_NON_SECURE and the tcp socket
 * is used, regardless of Kconfig.
 *
 * Returns 0 on success, otherwise the error from broker_init() or
 * mqtt_connect(). When mqtt_connect() fails the state is left at
 * TRANSPORT_CONNECTING here; mqtt_helper_connect() resets it to DISCONNECTED.
 */
static int client_connect(struct mqtt_helper_conn_params *conn_params)
{
	int err;
	/* NOTE(review): user_name and password are stack locals, yet mqtt_client keeps
	 * pointers to them after this function returns - confirm the MQTT library
	 * only reads them during mqtt_connect().
	 */
	struct mqtt_utf8 user_name = {
		.utf8 = conn_params->user_name.ptr,
		.size = conn_params->user_name.size,
	};
	struct mqtt_utf8 password = {
		.utf8 = conn_params->password.ptr,
		.size = conn_params->password.size,
	};

	err = broker_init(&broker, conn_params);
	if (err) {
		return err;
	}

	mqtt_client.broker	        = &broker;
	mqtt_client.evt_cb	        = mqtt_evt_handler;
	mqtt_client.client_id.utf8      = conn_params->device_id.ptr;
	mqtt_client.client_id.size      = conn_params->device_id.size;
	mqtt_client.protocol_version    = MQTT_VERSION_3_1_1;
	mqtt_client.rx_buf	        = rx_buffer;
	mqtt_client.rx_buf_size	        = sizeof(rx_buffer);
	mqtt_client.tx_buf	        = tx_buffer;
	mqtt_client.tx_buf_size	        = sizeof(tx_buffer);

#if defined(CONFIG_MQTT_HELPER_LAST_WILL)
	/* Static storage: the client keeps pointers to these beyond this call. */
	static struct mqtt_topic last_will_topic = {
		.topic.utf8 = CONFIG_MQTT_HELPER_LAST_WILL_TOPIC,
		.topic.size = sizeof(CONFIG_MQTT_HELPER_LAST_WILL_TOPIC) - 1,
		.qos = MQTT_QOS_0_AT_MOST_ONCE
	};

	static struct mqtt_utf8 last_will_message = {
		.utf8 = CONFIG_MQTT_HELPER_LAST_WILL_MESSAGE,
		.size = sizeof(CONFIG_MQTT_HELPER_LAST_WILL_MESSAGE) - 1
	};

	mqtt_client.will_topic = &last_will_topic;
	mqtt_client.will_message = &last_will_message;
#endif

	mqtt_client.transport.if_name = conn_params->if_name;
	/* Transport type is forced to non-secure; the Kconfig selection is disabled. */
// #if defined(CONFIG_MQTT_LIB_TLS)
// 	mqtt_client.transport.type      = MQTT_TRANSPORT_SECURE;
// #else
	mqtt_client.transport.type	= MQTT_TRANSPORT_NON_SECURE;
// #endif /* CONFIG_MQTT_LIB_TLS */
	/* Credentials are attached only when a non-empty value was supplied. */
	mqtt_client.user_name	        = conn_params->user_name.size > 0 ? &user_name : NULL;
	mqtt_client.password	        = conn_params->password.size > 0 ? &password : NULL;

	/* Disabled: TLS configuration (security tags, peer verification, hostname)
	 * and certificate provisioning.
	 */
// #if defined(CONFIG_MQTT_LIB_TLS)
// 	struct mqtt_sec_config *tls_cfg = &(mqtt_client.transport).tls.config;

// 	sec_tag_t kconfig_sec_tag_list[] = {
// 		CONFIG_MQTT_HELPER_SEC_TAG,
// #if CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG > -1
// 		CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG,
// #endif
// 	};

// 	if (current_cfg.sec_tag_list != NULL && current_cfg.sec_tag_count > 0) {
// 		LOG_DBG("Using run-time provided security tag value");
// 		tls_cfg->sec_tag_count = current_cfg.sec_tag_count;
// 		tls_cfg->sec_tag_list = current_cfg.sec_tag_list;
// 	} else {
// 		LOG_DBG("Using build-time provided security tag value from Kconfig");
// 		tls_cfg->sec_tag_count = ARRAY_SIZE(kconfig_sec_tag_list);
// 		tls_cfg->sec_tag_list = kconfig_sec_tag_list;
// 	}

// 	tls_cfg->peer_verify = TLS_PEER_VERIFY_REQUIRED;
// 	tls_cfg->cipher_count = 0;
// 	tls_cfg->cipher_list = NULL; /* Use default */
// 	tls_cfg->session_cache = TLS_SESSION_CACHE_DISABLED;
// 	tls_cfg->hostname = conn_params->hostname.ptr;
// 	tls_cfg->set_native_tls = IS_ENABLED(CONFIG_MQTT_HELPER_NATIVE_TLS);

// #if defined(CONFIG_MQTT_HELPER_PROVISION_CERTIFICATES)
// 	err = certificates_provision();
// 	if (err) {
// 		LOG_ERR("Could not provision certificates, error: %d", err);
// 		return err;
// 	}
// #endif /* defined(CONFIG_MQTT_HELPER_PROVISION_CERTIFICATES) */
// #endif /* defined(CONFIG_MQTT_LIB_TLS) */

	mqtt_state_set(MQTT_STATE_TRANSPORT_CONNECTING);

	err = mqtt_connect(&mqtt_client);
	if (err) {
		LOG_ERR("mqtt_connect, error: %d", err);
		return err;
	}

	mqtt_state_set(MQTT_STATE_TRANSPORT_CONNECTED);

	mqtt_state_set(MQTT_STATE_CONNECTING);

	/* Optionally bound how long a send on the socket may block. */
	if (IS_ENABLED(CONFIG_MQTT_HELPER_SEND_TIMEOUT)) {
		struct timeval timeout = {
			.tv_sec = CONFIG_MQTT_HELPER_SEND_TIMEOUT_SEC
		};

// #if defined(CONFIG_MQTT_LIB_TLS)
// 		int sock  = mqtt_client.transport.tls.sock;
// #else
		int sock = mqtt_client.transport.tcp.sock;
// #endif /* CONFIG_MQTT_LIB_TLS */

		err = zsock_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout,
				       sizeof(timeout));
		if (err == -1) {
			LOG_WRN("Failed to set timeout, errno: %d", errno);

			/* Don't propagate this as an error. */
			err = 0;
		} else {
			LOG_DBG("Using send socket timeout of %d seconds",
				CONFIG_MQTT_HELPER_SEND_TIMEOUT_SEC);
		}
	}

	return 0;
}

/* Public API */

/* Store a copy of cfg and initialize the MQTT client.
 *
 * Allowed in the UNINIT and DISCONNECTED states; any other state yields
 * -EOPNOTSUPP. On success the library is left in DISCONNECTED and 0 is returned.
 */
int mqtt_helper_init(struct mqtt_helper_cfg *cfg)
{
	bool state_ok;

	__ASSERT_NO_MSG(cfg != NULL);

	state_ok = mqtt_state_verify(MQTT_STATE_UNINIT) ||
		   mqtt_state_verify(MQTT_STATE_DISCONNECTED);
	if (!state_ok) {
		LOG_ERR("Library is in the wrong state (%s), %s required",
			state_name_get(mqtt_state_get()),
			state_name_get(MQTT_STATE_UNINIT));

		return -EOPNOTSUPP;
	}

	current_cfg = *cfg;
	mqtt_client_init(&mqtt_client);
	mqtt_state_set(MQTT_STATE_DISCONNECTED);

	return 0;
}

/* Connect to the broker described by conn_params.
 *
 * Requires the DISCONNECTED state (-EOPNOTSUPP otherwise). On success the poll
 * thread is released through connection_poll_sem and 0 is returned; on failure
 * the state is put back to DISCONNECTED and the error from client_connect() is
 * returned.
 */
int mqtt_helper_connect(struct mqtt_helper_conn_params *conn_params)
{
	int ret;

	__ASSERT_NO_MSG(conn_params != NULL);

	if (mqtt_state_get() != MQTT_STATE_DISCONNECTED) {
		LOG_ERR("Library is in the wrong state (%s), %s required",
			state_name_get(mqtt_state_get()),
			state_name_get(MQTT_STATE_DISCONNECTED));

		return -EOPNOTSUPP;
	}

	ret = client_connect(conn_params);
	if (ret == 0) {
		LOG_DBG("MQTT connection request sent");

		/* Let mqtt_helper_poll_loop() start polling the new socket. */
		k_sem_give(&connection_poll_sem);
	} else {
		mqtt_state_set(MQTT_STATE_DISCONNECTED);
	}

	return ret;
}

/* Request a disconnect from the broker.
 *
 * Requires the CONNECTED state (-EOPNOTSUPP otherwise). If the disconnect
 * request cannot be sent, the library goes straight to DISCONNECTED, the
 * on_disconnect callback (if set) receives the error, and the error is returned.
 */
int mqtt_helper_disconnect(void)
{
	int ret;

	if (mqtt_state_get() != MQTT_STATE_CONNECTED) {
		LOG_ERR("Library is in the wrong state (%s), %s required",
			state_name_get(mqtt_state_get()),
			state_name_get(MQTT_STATE_CONNECTED));

		return -EOPNOTSUPP;
	}

	mqtt_state_set(MQTT_STATE_DISCONNECTING);

	ret = mqtt_disconnect(&mqtt_client, NULL);
	if (ret == 0) {
		return 0;
	}

	/* Sending failed: handle it like an ungraceful disconnect. */
	LOG_ERR("Failed to send disconnection request, treating as disconnected");
	mqtt_state_set(MQTT_STATE_DISCONNECTED);

	if (current_cfg.cb.on_disconnect != NULL) {
		current_cfg.cb.on_disconnect(ret);
	}

	return ret;
}

/* Subscribe to the topics in sub_list.
 *
 * Returns -EINVAL when sub_list->message_id is 0, -EOPNOTSUPP when the library
 * is not in the CONNECTED state, otherwise the result of mqtt_subscribe().
 */
int mqtt_helper_subscribe(struct mqtt_subscription_list *sub_list)
{
	__ASSERT_NO_MSG(sub_list != NULL);

	if (sub_list->message_id == 0) {
		LOG_ERR("Invalid message ID");
		return -EINVAL;
	}

	if (!mqtt_state_verify(MQTT_STATE_CONNECTED)) {
		LOG_ERR("Library is in the wrong state (%s), %s required",
			state_name_get(mqtt_state_get()),
			state_name_get(MQTT_STATE_CONNECTED));

		return -EOPNOTSUPP;
	}

	for (size_t i = 0; i < sub_list->list_count; i++) {
		/* Topics carry an explicit length, so print a bounded string (as
		 * mqtt_helper_publish() does) instead of relying on a NUL terminator.
		 */
		LOG_DBG("Subscribing to: %.*s",
			(int)sub_list->list[i].topic.size,
			(char *)sub_list->list[i].topic.utf8);
	}

	return mqtt_subscribe(&mqtt_client, sub_list);
}

/* Publish the message described by param.
 *
 * Returns -EINVAL when param->message_id is 0, -EOPNOTSUPP when the library is
 * not in the CONNECTED state, otherwise the result of mqtt_publish().
 */
int mqtt_helper_publish(const struct mqtt_publish_param *param)
{
	LOG_DBG("Publishing to topic: %.*s",
		param->message.topic.topic.size,
		(char *)param->message.topic.topic.utf8);

	if (param->message_id == 0) {
		LOG_ERR("Invalid message ID");
		return -EINVAL;
	}

	if (mqtt_state_get() != MQTT_STATE_CONNECTED) {
		LOG_ERR("Library is in the wrong state (%s), %s required",
			state_name_get(mqtt_state_get()),
			state_name_get(MQTT_STATE_CONNECTED));

		return -EOPNOTSUPP;
	}

	return mqtt_publish(&mqtt_client, param);
}

/* Return the next message ID: 1, 2, ..., 65535, then 1 again.
 *
 * Zero is never returned; mqtt_helper_publish() and mqtt_helper_subscribe()
 * reject a message ID of 0.
 */
uint16_t mqtt_helper_msg_id_get(void)
{
	static uint16_t last_id;

	/* Step past 0 when the 16-bit counter would wrap. */
	last_id = (last_id == UINT16_MAX) ? 1 : (uint16_t)(last_id + 1);

	return last_id;
}

/* Clear the stored configuration and the client, returning the library to UNINIT.
 *
 * Requires the DISCONNECTED state; otherwise -EOPNOTSUPP is returned and
 * nothing is changed.
 */
int mqtt_helper_deinit(void)
{
	if (mqtt_state_get() != MQTT_STATE_DISCONNECTED) {
		LOG_ERR("Library is in the wrong state (%s), %s required",
			state_name_get(mqtt_state_get()),
			state_name_get(MQTT_STATE_DISCONNECTED));

		return -EOPNOTSUPP;
	}

	memset(&current_cfg, 0, sizeof(current_cfg));
	memset(&mqtt_client, 0, sizeof(mqtt_client));
	mqtt_state_set(MQTT_STATE_UNINIT);

	return 0;
}

/* Serve one connection: wait for mqtt_helper_connect() to give
 * connection_poll_sem, then poll the client socket until the connection ends.
 *
 * Incoming data is passed to mqtt_input(); a poll timeout triggers mqtt_live()
 * for keepalive. The loop ends when the helper state is neither CONNECTING nor
 * CONNECTED, on a poll/input/keepalive error, or on POLLNVAL/POLLHUP/POLLERR.
 * Afterwards the client is aborted unless a disconnect is already in progress.
 *
 * NOTE(review): in this copy the CONFIG_MQTT_LIB_TLS conditional is commented
 * out, so the tcp socket descriptor is always the one polled.
 */
MQTT_HELPER_STATIC void mqtt_helper_poll_loop(void)
{
	int ret;
	struct zsock_pollfd fds[1] = {0};

	LOG_DBG("Waiting for connection_poll_sem");
	k_sem_take(&connection_poll_sem, K_FOREVER);
	LOG_DBG("Took connection_poll_sem");

	fds[0].events = ZSOCK_POLLIN;
// #if defined(CONFIG_MQTT_LIB_TLS)
// 	fds[0].fd  = mqtt_client.transport.tls.sock;
// #else
	fds[0].fd = mqtt_client.transport.tcp.sock;
// #endif /* CONFIG_MQTT_LIB_TLS */

	LOG_DBG("Starting to poll on socket, fd: %d", fds[0].fd);

	while (true) {
		/* Stop as soon as the MQTT session is neither being set up nor established. */
		if (!mqtt_state_verify(MQTT_STATE_CONNECTING) &&
		    !mqtt_state_verify(MQTT_STATE_CONNECTED)) {
			LOG_DBG("Disconnected on MQTT level, ending poll loop");
			break;
		} else {
			LOG_DBG("Polling on socket fd: %d", fds[0].fd);
		}

		/* Wait for data, at most for the time left until the next keepalive. */
		ret = zsock_poll(fds, ARRAY_SIZE(fds),
				 mqtt_keepalive_time_left(&mqtt_client));
		if (ret < 0) {
			LOG_ERR("poll() returned an error (%d), errno: %d", ret, -errno);
			break;
		}

		/* If poll returns 0, the timeout has expired. */
		if (ret == 0) {
			ret = mqtt_live(&mqtt_client);
			/* -EAGAIN indicates it is not time to ping; try later;
			 * otherwise, connection was closed due to NAT timeout.
			 */
			if (ret && (ret != -EAGAIN)) {
				LOG_ERR("Cloud MQTT keepalive ping failed: %d", ret);
				break;
			}
			continue;
		}

		if ((fds[0].revents & ZSOCK_POLLIN) == ZSOCK_POLLIN) {
			/* Let the MQTT library read and dispatch the pending data. */
			ret = mqtt_input(&mqtt_client);
			if (ret) {
				LOG_ERR("Cloud MQTT input error: %d", ret);
				break;
			}

			/* If connection state is set to STATE_DISCONNECTED at
			 * this point we know that the socket has
			 * been closed and we can break out of poll.
			 */
			if (mqtt_state_verify(MQTT_STATE_DISCONNECTED) ||
			    mqtt_state_verify(MQTT_STATE_UNINIT)) {
				LOG_DBG("The socket is already closed");
				break;
			}
		}

		if ((fds[0].revents & ZSOCK_POLLNVAL) == ZSOCK_POLLNVAL) {
			if (mqtt_state_verify(MQTT_STATE_DISCONNECTING)) {
				/* ZSOCK_POLLNVAL is to be expected while
				 * disconnecting, as the socket will be closed
				 * by the MQTT library and become invalid.
				 */
				LOG_DBG("POLLNVAL while disconnecting");
			} else if (mqtt_state_verify(MQTT_STATE_DISCONNECTED)) {
				LOG_DBG("POLLNVAL, no active connection");
			} else {
				LOG_ERR("Socket error: POLLNVAL");
				LOG_ERR("The socket was unexpectedly closed");
			}
			break;
		}

		if ((fds[0].revents & ZSOCK_POLLHUP) == ZSOCK_POLLHUP) {
			LOG_ERR("Socket error: POLLHUP");
			LOG_ERR("Connection was unexpectedly closed");
			break;
		}

		if ((fds[0].revents & ZSOCK_POLLERR) == ZSOCK_POLLERR) {
			LOG_ERR("Socket error: POLLERR");
			LOG_ERR("Connection was unexpectedly closed");
			break;
		}
	}

	/* Abort the client unless mqtt_helper_disconnect() has a disconnect in progress. */
	if (!mqtt_state_verify(MQTT_STATE_DISCONNECTING)) {
		(void)mqtt_abort(&mqtt_client);
	}
}

/* Thread entry point: serve one connection after another, forever. */
static void mqtt_helper_run(void)
{
	for (;;) {
		mqtt_helper_poll_loop();
	}
}

/* Statically define the helper thread that runs mqtt_helper_run() with a stack
 * of CONFIG_MQTT_HELPER_STACK_SIZE at K_LOWEST_APPLICATION_THREAD_PRIO.
 *
 * NOTE(review): `false` is passed where the following two arguments are NULL,
 * and mqtt_helper_run() takes no parameters - presumably this should be NULL
 * as well; confirm against the K_THREAD_DEFINE argument list.
 */
K_THREAD_DEFINE(mqtt_helper_thread, CONFIG_MQTT_HELPER_STACK_SIZE,
		mqtt_helper_run, false, NULL, NULL,
		K_LOWEST_APPLICATION_THREAD_PRIO, 0, 0);


With those lines commented out, I can see that the MQTT Helper no longer uses the preprocessor directives for (CONFIG_MQTT_LIB_TLS), and instead:
- Sets the transport type to be MQTT_TRANSPORT_NON_SECURE:

 

// #if defined(CONFIG_MQTT_LIB_TLS)
// 	mqtt_client.transport.type      = MQTT_TRANSPORT_SECURE;
// #else
	mqtt_client.transport.type	= MQTT_TRANSPORT_NON_SECURE;
// #endif /* CONFIG_MQTT_LIB_TLS */


- Does not define the tls_cfg params:

// #if defined(CONFIG_MQTT_LIB_TLS)
// 	struct mqtt_sec_config *tls_cfg = &(mqtt_client.transport).tls.config;

// 	sec_tag_t kconfig_sec_tag_list[] = {
// 		CONFIG_MQTT_HELPER_SEC_TAG,
// #if CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG > -1
// 		CONFIG_MQTT_HELPER_SECONDARY_SEC_TAG,
// #endif
// 	};

// 	if (current_cfg.sec_tag_list != NULL && current_cfg.sec_tag_count > 0) {
// 		LOG_DBG("Using run-time provided security tag value");
// 		tls_cfg->sec_tag_count = current_cfg.sec_tag_count;
// 		tls_cfg->sec_tag_list = current_cfg.sec_tag_list;
// 	} else {
// 		LOG_DBG("Using build-time provided security tag value from Kconfig");
// 		tls_cfg->sec_tag_count = ARRAY_SIZE(kconfig_sec_tag_list);
// 		tls_cfg->sec_tag_list = kconfig_sec_tag_list;
// 	}

// 	tls_cfg->peer_verify = TLS_PEER_VERIFY_REQUIRED;
// 	tls_cfg->cipher_count = 0;
// 	tls_cfg->cipher_list = NULL; /* Use default */
// 	tls_cfg->session_cache = TLS_SESSION_CACHE_DISABLED;
// 	tls_cfg->hostname = conn_params->hostname.ptr;
// 	tls_cfg->set_native_tls = IS_ENABLED(CONFIG_MQTT_HELPER_NATIVE_TLS);

// #if defined(CONFIG_MQTT_HELPER_PROVISION_CERTIFICATES)
// 	err = certificates_provision();
// 	if (err) {
// 		LOG_ERR("Could not provision certificates, error: %d", err);
// 		return err;
// 	}
// #endif /* defined(CONFIG_MQTT_HELPER_PROVISION_CERTIFICATES) */
// #endif /* defined(CONFIG_MQTT_LIB_TLS) */

- Uses the 'tcp' socket file descriptor for socket configuration using setsockopt(): 

// #if defined(CONFIG_MQTT_LIB_TLS)
// 		int sock  = mqtt_client.transport.tls.sock;
// #else
		int sock = mqtt_client.transport.tcp.sock;
// #endif /* CONFIG_MQTT_LIB_TLS */


- And sets the pollfd file descriptor to use the tcp socket file descriptor, in the mqtt_helper_poll_loop() fxn:

// #if defined(CONFIG_MQTT_LIB_TLS)
// 	fds[0].fd  = mqtt_client.transport.tls.sock;
// #else
	fds[0].fd = mqtt_client.transport.tcp.sock;
// #endif /* CONFIG_MQTT_LIB_TLS */



The 'net_mqtt' lib log reports 'Connect completed', but as soon as the MQTT Helper polls the tcp socket, the net_mqtt_rx lib reports 'Connection closed'. See the following console logs:

[00:00:16.950,744] <inf> main_simple: ✓ MQTT client ready
[00:00:16.950,775] <inf> main_simple: === Initialization Complete ===
[00:00:16.950,775] <inf> main_simple: Ready to implement and test MQTT connect/publish/subscribe
[00:00:16.950,836] <dbg> my_mqtt_client: set_state: State transition: DISCONNECTED -> CONNECTING
[00:00:16.950,897] <inf> my_mqtt_client: Connecting to MQTT broker broker.hivemq.com:1883 as client 'xxxxxxx'
[00:00:16.950,958] <dbg> mqtt_helper: broker_init: Resolving IP address for broker.hivemq.com
[00:00:16.951,141] <dbg> lte_net_medium: complete_operation: Semaphore count after: 0
[00:00:17.169,342] <dbg> mqtt_helper: broker_init: IPv4 Address found xxx.xxx.xxx.xxx (AF_INET)
[00:00:17.169,433] <dbg> mqtt_helper: mqtt_state_set: State transition: MQTT_STATE_DISCONNECTED --> MQTT_STATE_TRANSPORT_CONNECTING
[00:00:17.445,953] <inf> net_mqtt: Connect completed
[00:00:17.446,014] <dbg> mqtt_helper: mqtt_state_set: State transition: MQTT_STATE_TRANSPORT_CONNECTING --> MQTT_STATE_TRANSPORT_CONNECTED
[00:00:17.446,075] <dbg> mqtt_helper: mqtt_state_set: State transition: MQTT_STATE_TRANSPORT_CONNECTED --> MQTT_STATE_CONNECTING
[00:00:17.446,105] <dbg> mqtt_helper: client_connect: Using send socket timeout of 60 seconds
[00:00:17.446,136] <dbg> mqtt_helper: mqtt_helper_connect: MQTT connection request sent
[00:00:17.446,228] <dbg> mqtt_helper: mqtt_helper_poll_loop: Took connection_poll_sem
[00:00:17.446,258] <dbg> mqtt_helper: mqtt_helper_poll_loop: Starting to poll on socket, fd: 4
[00:00:17.446,289] <dbg> mqtt_helper: mqtt_helper_poll_loop: Polling on socket fd: 4
[00:00:17.680,358] <err> net_mqtt_rx: [CID 0x20013b30]: Connection closed.
[00:00:17.680,389] <inf> net_mqtt_sock_tcp: Closing socket 4
[00:00:17.681,152] <dbg> mqtt_helper: mqtt_evt_handler: MQTT_EVT_DISCONNECT: result = -128
[00:00:17.681,213] <dbg> mqtt_helper: mqtt_state_set: State transition: MQTT_STATE_CONNECTING --> MQTT_STATE_DISCONNECTED
[00:00:17.681,243] <dbg> my_mqtt_client: on_mqtt_disconnect: MQTT disconnected: result=-128
[00:00:17.681,304] <dbg> my_mqtt_client: set_state: State transition: CONNECTING -> DISCONNECTED
[00:00:17.681,304] <wrn> main_simple: MQTT Disconnected (result: -128)


Step debugging has later revealed that the mqtt_transport_socket_tcp.c associated API fxns are being called (as expected) and the mqtt_client_tcp_connect() fxn seems to work fine, but the 'mqtt_transport_read()' fxn in the mqtt_rx.c code, is returning a len value of 0, which means it thinks the connection is closed. 

I can see through step debugging that, as expected, the mqtt_client_tcp_read fxn is called, which calls zsock_recv() fxn, which suggests 0 bytes were received. 



This happens inside the 'mqtt_read_and_parse_fixed_header()' function, which is called from the 'mqtt_handle_rx()' function, within the mqtt_rx.c source code.

/* Read and dispatch one incoming MQTT packet from the client's receive buffer.
 *
 * Parses the fixed header, then reads either the PUBLISH variable header or the
 * rest of the message, and finally hands the packet to mqtt_handle_packet().
 * -EAGAIN from either read step is reported as 0; any other negative value is
 * returned as is. After a packet has been handled the buffered data length is
 * reset to zero.
 */
int mqtt_handle_rx(struct mqtt_client *client)
{
	int ret;
	uint8_t type_and_flags;
	uint32_t var_length;
	struct buf_ctx buf;

	buf.cur = client->rx_buf;
	buf.end = client->rx_buf + client->internal.rx_buf_datalen;

	ret = mqtt_read_and_parse_fixed_header(client, &type_and_flags,
					       &var_length, &buf);
	if (ret >= 0) {
		/* The upper nibble of the first byte is compared with the PUBLISH type. */
		if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) {
			ret = mqtt_read_publish_var_header(client, type_and_flags,
							   &buf);
		} else {
			ret = mqtt_read_message_chunk(client, &buf, var_length);
		}
	}

	if (ret == -EAGAIN) {
		return 0;
	}

	if (ret < 0) {
		return ret;
	}

	/* At this point, packet is ready to be passed to the application. */
	ret = mqtt_handle_packet(client, type_and_flags, var_length, &buf);
	if (ret < 0) {
		return ret;
	}

	client->internal.rx_buf_datalen = 0U;

	return 0;
}



This leads me to believe that something is happening outside of the mqtt code that prevents a single application from being able to choose between using TLS, or not, and that you can only have one or the other, which must be defined at compile-time.

Obviously later on I will not use the MQTT Helper, and will set up my own code to do the same thing, but I commented out the TLS stuff as a quick and dirty proof of concept - and this has revealed to me that despite using the tcp socket file descriptor and correctly using the mqtt_transport_socket_tcp.c associated fxns, I am still not able to NOT use TLS in a project where TLS is enabled via Kconfig symbols at compile time.

I have made a post on this matter on the Zephyr discord, but I was told to instead make the post here. 


Dev setup:

OS: Linux Ubuntu 24.04.1

Board: nrf9151DK HW v0.9.0

SDK:  nRF Connect SDK V3.1.0 

 



All help and feedback is greatly appreciated! Thank you Devzone team :) 

Parents Reply
  • Any chance you've been able to look more into this? 

    Ideally I would like to continue using the MQTT Helper library, but it seems like that won't be possible without modifications and a PR, given the Kconfig define preprocessor guards are baked into the mqtt_helper.c source code.

    That being said, I am still curious if this will be an issue in general, regardless of whether the MQTT helper lib is used.

    Thanks

Children
  • Hello,

    Sorry, I haven't found out much personally, after digging more into it.

    I will try to discuss it more internally, but from earlier discussions around related topics it does look like because of the way the high level MQTT library assumes things and doesn't expose all the underlying configurations, adding options like that would require a big rewrite including breaking the APIs as they are today.

    Best regards,

    Michal

Related