This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

mqtt connect error

Hello, 

So I'm trying to modify the mqtt_simple sample program to connect to azure iot

I've modified the code in a few ways but when I went to run the program it gave me this error

Which from my understanding is a failure to connect w/ the mqtt client

and the mqtt_connect error comes from the mqtt.h/c file; it is ENOMEM (errno 12, i.e. out of memory?) and seems to be related to the tx_buf or the rx_buf

So I manually changed the buffer size in the config file

# General config
CONFIG_TEST_RANDOM_GENERATOR=y

# Networking
CONFIG_NETWORKING=y
CONFIG_NET_SOCKETS_OFFLOAD=y
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_POSIX_NAMES=y

# LTE link control
CONFIG_LTE_LINK_CONTROL=y
CONFIG_LTE_AUTO_INIT_AND_CONNECT=n

# BSD library
CONFIG_BSD_LIBRARY=y

# AT Host
CONFIG_UART_INTERRUPT_DRIVEN=y
CONFIG_AT_HOST_LIBRARY=y

# MQTT
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=n

# Application
CONFIG_MQTT_PUB_TOPIC="publish"
CONFIG_MQTT_SUB_TOPIC="subscribe"
CONFIG_MQTT_CLIENT_ID="MyCDevice"
CONFIG_MQTT_BROKER_HOSTNAME="MaxusHub.azure-devices.net"
CONFIG_MQTT_BROKER_PORT=11105

# Main thread
CONFIG_MAIN_THREAD_PRIORITY=7
CONFIG_MAIN_STACK_SIZE=4096

CONFIG_HEAP_MEM_POOL_SIZE=1024

# Disable native network stack to save some memory
CONFIG_NET_IPV4=n
CONFIG_NET_IPV6=n
CONFIG_NET_UDP=n
CONFIG_NET_TCP=n

#added Configuring of buffer sizes?
CONFIG_MQTT_MESSAGE_BUFFER_SIZE = 128
CONFIG_MQTT_PAYLOAD_BUFFER_SIZE = 128

And once I try to build it, it gives me an out of tree board error

My main question is, would I be able to resolve the -12 mqtt_connect error by adjusting the buffer size? And if so, how would I go about doing that if not in the proj.conf file.

Any help is appreciated, thank you

Octavio

EDIT: I should add, I tried running SES as administrator and that didn't seem to help.

Parents Reply Children
  • What does your main.c look like now?

    Something like this?

    /*
     * Copyright (c) 2018 Nordic Semiconductor ASA
     *
     * SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic
     */
    
    #include <zephyr.h>
    #include <stdio.h>
    #include <uart.h>
    #include <string.h>
    
    #include <net/mqtt.h>
    #include <net/socket.h>
    #include <lte_lc.h>
    
    #include "certificates.h"
    
    #if defined(CONFIG_BSD_LIBRARY)
    #include "nrf_inbuilt_key.h"
    #endif
    
    /* Buffers for MQTT client.
     * rx_buffer and tx_buffer are attached to the client in client_init();
     * payload_buf is the destination used by publish_get_payload().
     * Their sizes come from the CONFIG_MQTT_*_BUFFER_SIZE options.
     */
    static u8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
    static u8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
    static u8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];
    
    /* The mqtt client struct; initialised by client_init() and used by main(). */
    static struct mqtt_client client;
    
    /* MQTT Broker details; filled in by broker_init(). */
    static struct sockaddr_storage broker;
    
    #if defined(CONFIG_MQTT_LIB_TLS)
    /* TLS settings; populated by mqtt_provision() and copied into the client
     * by client_init().
     */
    static struct mqtt_sec_config tls_config;
    #endif
    
    /* Connected flag; set on CONNACK and cleared on DISCONNECT in
     * mqtt_evt_handler().
     */
    static bool connected;
    
    /* File descriptor entry passed to poll(); set up by fds_init(). */
    static struct pollfd fds;
    
    #if defined(CONFIG_BSD_LIBRARY)
    
    /**@brief Recoverable BSD library error.
     *
     * Prints the error code on the console and returns; no other action is
     * taken here.
     *
     * @param err Error code to report.
     */
    void bsd_recoverable_error_handler(uint32_t err)
    {
    	printk("bsdlib recoverable error: %u\n", err);
    }
    
    /**@brief Irrecoverable BSD library error.
     *
     * Prints the error code and then trips an unconditional assertion.
     *
     * @param err Error code to report.
     */
    void bsd_irrecoverable_error_handler(uint32_t err)
    {
    	printk("bsdlib irrecoverable error: %u\n", err);
    
    	/* NOTE(review): what happens here depends on how asserts are
    	 * configured for the build - confirm.
    	 */
    	__ASSERT_NO_MSG(false);
    }
    
    #endif /* defined(CONFIG_BSD_LIBRARY) */
    
    /**@brief Print a prefix followed by a byte range that is not
     * null-terminated.
     *
     * @param prefix Null-terminated text printed first.
     * @param data   Bytes to print after the prefix.
     * @param len    Number of bytes taken from @p data.
     */
    static void data_print(u8_t *prefix, u8_t *data, size_t len)
    {
    	/* One extra byte so the copy can be terminated for "%s". */
    	char terminated[len + 1];
    
    	terminated[len] = '\0';
    	memcpy(terminated, data, len);
    
    	printk("%s%s\n", prefix, terminated);
    }
    
    /**@brief Publish a payload on CONFIG_MQTT_PUB_TOPIC.
     *
     * @param c    MQTT client to publish through.
     * @param qos  Quality-of-service level for the message.
     * @param data Payload bytes.
     * @param len  Payload length in bytes.
     *
     * @return Whatever mqtt_publish() returns.
     */
    static int data_publish(struct mqtt_client *c, enum mqtt_qos qos,
    	u8_t *data, size_t len)
    {
    	const size_t topic_len = strlen(CONFIG_MQTT_PUB_TOPIC);
    	struct mqtt_publish_param msg;
    
    	/* Message bookkeeping: random id, no duplicate/retain flags. */
    	msg.message_id = sys_rand32_get();
    	msg.dup_flag = 0;
    	msg.retain_flag = 0;
    
    	/* Payload. */
    	msg.message.payload.data = data;
    	msg.message.payload.len = len;
    
    	/* Destination topic. */
    	msg.message.topic.qos = qos;
    	msg.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
    	msg.message.topic.topic.size = topic_len;
    
    	data_print("Publishing: ", data, len);
    	printk("to topic: %s len: %u\n",
    		CONFIG_MQTT_PUB_TOPIC, (unsigned int)topic_len);
    
    	return mqtt_publish(c, &msg);
    }
    
    /**@brief Function to subscribe to the configured topic
     */
    static int subscribe(void)
    {
    	struct mqtt_topic subscribe_topic = {
    		.topic = {
    			.utf8 = CONFIG_MQTT_SUB_TOPIC,
    			.size = strlen(CONFIG_MQTT_SUB_TOPIC)
    		},
    		.qos = MQTT_QOS_1_AT_LEAST_ONCE
    	};
    
    	const struct mqtt_subscription_list subscription_list = {
    		.list = &subscribe_topic,
    		.list_count = 1,
    		.message_id = 1234
    	};
    
    	printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC,
    		(unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));
    
    	return mqtt_subscribe(&client, &subscription_list);
    }
    
    /**@brief Function to read the published payload.
     *
     * Reads exactly @p length bytes of the current publish message into the
     * file-scope payload_buf, polling the socket again whenever the read
     * reports -EAGAIN.
     *
     * @param c      MQTT client the payload is read from.
     * @param length Number of payload bytes to read.
     *
     * @retval 0         The whole payload was read.
     * @retval -EMSGSIZE @p length does not fit in payload_buf.
     * @retval -EIO      A read returned 0, or poll() did not report POLLIN.
     * @return Any other negative value returned by mqtt_read_publish_payload().
     */
    static int publish_get_payload(struct mqtt_client *c, size_t length)
    {
    	u8_t *buf;
    	u8_t *end;
    
    	/* Validate first: computing payload_buf + length for an oversized
    	 * length would form a pointer beyond the array, which is undefined
    	 * behaviour even if it is never dereferenced.
    	 */
    	if (length > sizeof(payload_buf)) {
    		return -EMSGSIZE;
    	}
    
    	buf = payload_buf;
    	end = buf + length;
    
    	while (buf < end) {
    		int ret = mqtt_read_publish_payload(c, buf, end - buf);
    
    		if (ret < 0) {
    			int err;
    
    			if (ret != -EAGAIN) {
    				return ret;
    			}
    
    			printk("mqtt_read_publish_payload: EAGAIN\n");
    
    			/* Wait for more data before retrying the read. */
    			err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE));
    			if (err > 0 && (fds.revents & POLLIN) == POLLIN) {
    				continue;
    			} else {
    				return -EIO;
    			}
    		}
    
    		/* A zero-length read makes no progress; give up. */
    		if (ret == 0) {
    			return -EIO;
    		}
    
    		buf += ret;
    	}
    
    	return 0;
    }
    
    /**@brief MQTT client event handler
     *
     * Logs every event. On a successful CONNACK it marks the client connected
     * and subscribes; on PUBLISH it reads the payload into payload_buf and
     * echoes it back, disconnecting if the payload cannot be read. Failures of
     * the subscribe and echo-publish calls are reported instead of being
     * silently dropped.
     *
     * @param c   Client the event belongs to.
     * @param evt Event description (type, result and type-specific params).
     */
    void mqtt_evt_handler(struct mqtt_client *const c,
    		      const struct mqtt_evt *evt)
    {
    	int err;
    
    	printk("MQTT EVENT: %d\n", evt->type);
    
    	switch (evt->type) {
    	case MQTT_EVT_CONNACK:
    		if (evt->result != 0) {
    			printk("MQTT connect failed %d\n", evt->result);
    			break;
    		}
    
    		connected = true;
    		printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
    
    		err = subscribe();
    		if (err) {
    			printk("Could not subscribe: %d\n", err);
    		}
    		break;
    
    	case MQTT_EVT_DISCONNECT:
    		printk("[%s:%d] MQTT client disconnected %d\n", __func__,
    		       __LINE__, evt->result);
    
    		connected = false;
    		break;
    
    	case MQTT_EVT_PUBLISH: {
    		const struct mqtt_publish_param *p = &evt->param.publish;
    
    		printk("[%s:%d] MQTT PUBLISH result=%d len=%d\n", __func__,
    		       __LINE__, evt->result, p->message.payload.len);
    		err = publish_get_payload(c, p->message.payload.len);
    		if (err >= 0) {
    			data_print("Received: ", payload_buf,
    				p->message.payload.len);
    			/* Echo back received data */
    			err = data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
    				payload_buf, p->message.payload.len);
    			if (err) {
    				printk("Could not echo payload: %d\n", err);
    			}
    		} else {
    			printk("mqtt_read_publish_payload: Failed! %d\n", err);
    			printk("Disconnecting MQTT client...\n");
    
    			err = mqtt_disconnect(c);
    			if (err) {
    				printk("Could not disconnect: %d\n", err);
    			}
    		}
    	} break;
    
    	case MQTT_EVT_PUBACK:
    		if (evt->result != 0) {
    			printk("MQTT PUBACK error %d\n", evt->result);
    			break;
    		}
    
    		printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__,
    				evt->param.puback.message_id);
    		break;
    
    	case MQTT_EVT_SUBACK:
    		if (evt->result != 0) {
    			printk("MQTT SUBACK error %d\n", evt->result);
    			break;
    		}
    
    		printk("[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__,
    				evt->param.suback.message_id);
    		break;
    
    	default:
    		printk("[%s:%d] default: %d\n", __func__, __LINE__,
    				evt->type);
    		break;
    	}
    }
    
    /**@brief Resolves the configured hostname and
     * initializes the MQTT broker structure
     *
     * Looks up CONFIG_MQTT_BROKER_HOSTNAME and stores the first IPv4-sized
     * result, together with CONFIG_MQTT_BROKER_PORT, in the file-scope
     * 'broker'. Every entry of the result list is examined; previously a stray
     * 'break' ended the search after the first entry. Failures are only
     * logged, and 'broker' is left untouched in that case.
     */
    static void broker_init(void)
    {
    	int err;
    	struct addrinfo *result;
    	struct addrinfo *addr;
    	struct addrinfo hints = {
    		.ai_family = AF_INET,
    		.ai_socktype = SOCK_STREAM
    	};
    
    	err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
    	if (err) {
    		printk("ERROR: getaddrinfo failed %d\n", err);
    
    		return;
    	}
    
    	/* Look for address of the broker. */
    	for (addr = result; addr != NULL; addr = addr->ai_next) {
    		struct sockaddr_in *broker4;
    		char ipv4_addr[NET_IPV4_ADDR_LEN];
    
    		/* Skip anything that is not an IPv4 address. */
    		if (addr->ai_addrlen != sizeof(struct sockaddr_in)) {
    			printk("ai_addrlen = %u should be %u or %u\n",
    				(unsigned int)addr->ai_addrlen,
    				(unsigned int)sizeof(struct sockaddr_in),
    				(unsigned int)sizeof(struct sockaddr_in6));
    			continue;
    		}
    
    		broker4 = ((struct sockaddr_in *)&broker);
    		broker4->sin_addr.s_addr =
    			((struct sockaddr_in *)addr->ai_addr)
    			->sin_addr.s_addr;
    		broker4->sin_family = AF_INET;
    		broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);
    
    		inet_ntop(AF_INET, &broker4->sin_addr.s_addr,
    			  ipv4_addr, sizeof(ipv4_addr));
    		printk("IPv4 Address found %s\n", ipv4_addr);
    
    		break;
    	}
    
    	/* addr is NULL only when the loop ran off the end of the list. */
    	if (addr == NULL) {
    		printk("ERROR: no IPv4 address found for %s\n",
    			CONFIG_MQTT_BROKER_HOSTNAME);
    	}
    
    	/* Free the address. */
    	freeaddrinfo(result);
    }
    
    /**@brief Prepare an MQTT client structure for connecting.
     *
     * Resets the client, resolves the broker address, then fills in identity,
     * buffers and transport. No user name or password is set.
     *
     * @param client Client structure to initialise.
     */
    static void client_init(struct mqtt_client *client)
    {
    	mqtt_client_init(client);
    
    	/* Fills the file-scope 'broker' that is referenced below. */
    	broker_init();
    
    	/* Transport: chosen at compile time. */
    #if defined(CONFIG_MQTT_LIB_TLS)
    	client->transport.type = MQTT_TRANSPORT_SECURE;
    	client->transport.tls.config = tls_config;
    #else
    	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
    #endif
    
    	/* Working buffers. */
    	client->tx_buf = tx_buffer;
    	client->tx_buf_size = sizeof(tx_buffer);
    	client->rx_buf = rx_buffer;
    	client->rx_buf_size = sizeof(rx_buffer);
    
    	/* Identity and protocol. */
    	client->protocol_version = MQTT_VERSION_3_1_1;
    	client->user_name = NULL;
    	client->password = NULL;
    	client->client_id.size = strlen(CONFIG_MQTT_CLIENT_ID);
    	client->client_id.utf8 = (u8_t *)CONFIG_MQTT_CLIENT_ID;
    
    	/* Broker endpoint and event callback. */
    	client->evt_cb = mqtt_evt_handler;
    	client->broker = &broker;
    }
    
    /**@brief Point the poll descriptor at the client's socket.
     *
     * @param c Client whose transport socket should be polled.
     *
     * @retval 0        Descriptor set up to wait for POLLIN.
     * @retval -ENOTSUP The client uses a secure transport but TLS support is
     *                  not compiled in.
     */
    static int fds_init(struct mqtt_client *c)
    {
    #if defined(CONFIG_MQTT_LIB_TLS)
    	fds.fd = (c->transport.type == MQTT_TRANSPORT_NON_SECURE)
    			? c->transport.tcp.sock
    			: c->transport.tls.sock;
    #else
    	if (c->transport.type != MQTT_TRANSPORT_NON_SECURE) {
    		return -ENOTSUP;
    	}
    
    	fds.fd = c->transport.tcp.sock;
    #endif
    
    	fds.events = POLLIN;
    
    	return 0;
    }
    
    /**@brief Configures modem to provide LTE link.
     *
     * Does nothing when CONFIG_LTE_LINK_CONTROL is not defined or when
     * CONFIG_LTE_AUTO_INIT_AND_CONNECT is enabled. Otherwise calls
     * lte_lc_init_and_connect() and reports the outcome. A failure is logged
     * with its error code (and asserted on), so "LTE Link Connected!" is no
     * longer printed for a failed attempt when asserts are compiled out.
     */
    static void modem_configure(void)
    {
    #if defined(CONFIG_LTE_LINK_CONTROL)
    	if (IS_ENABLED(CONFIG_LTE_AUTO_INIT_AND_CONNECT)) {
    		/* Do nothing, modem is already turned on
    		 * and connected.
    		 */
    	} else {
    		int err;
    
    		printk("LTE Link Connecting ...\n");
    		err = lte_lc_init_and_connect();
    		if (err != 0) {
    			printk("ERROR: LTE link could not be established: %d\n",
    			       err);
    			__ASSERT(err == 0, "LTE link could not be established.");
    			return;
    		}
    
    		printk("LTE Link Connected!\n");
    	}
    #endif
    }
    
    /* Prepares the TLS configuration and, optionally, provisions credentials.
     *
     * With CONFIG_MQTT_LIB_TLS defined, fills the file-scope tls_config
     * (security tag list, hostname, verification mode).
     * With CONFIG_MQTT_PROVISION_CERTIFICATES defined, writes the CA
     * certificate, private key and public certificate from certificates.h
     * under CONFIG_MQTT_SEC_TAG, using nrf_inbuilt_key_* when
     * CONFIG_BSD_LIBRARY is defined and tls_credential_add() otherwise.
     *
     * Returns 0 on success, or the error code of the first failing write.
     */
    static int mqtt_provision(void)
    {
    #if defined(CONFIG_MQTT_LIB_TLS)
    	/* static: tls_config keeps a pointer to this array after return. */
    	static sec_tag_t sec_tag_list[] = {CONFIG_MQTT_SEC_TAG};
    
    	/* NOTE(review): the meaning of peer_verify == 2 is not visible
    	 * here - confirm against the TLS socket option documentation.
    	 */
    	tls_config.peer_verify = 2;
    	tls_config.cipher_count = 0;
    	tls_config.cipher_list = NULL;
    	tls_config.sec_tag_count = ARRAY_SIZE(sec_tag_list);
    	tls_config.sec_tag_list = sec_tag_list;
    	tls_config.hostname = CONFIG_MQTT_BROKER_HOSTNAME;
    #endif
    
    #if defined(CONFIG_MQTT_PROVISION_CERTIFICATES)
    #if defined(CONFIG_BSD_LIBRARY)
    	{
    		int err;
    
    		/* Delete certificates */
    		nrf_sec_tag_t sec_tag = CONFIG_MQTT_SEC_TAG;
    
    		/* Credential types 0..4 are deleted; the result is only
    		 * logged, a failed delete does not abort provisioning.
    		 */
    		for (nrf_key_mgnt_cred_type_t type = 0; type < 5; type++) {
    			err = nrf_inbuilt_key_delete(sec_tag, type);
    			printk("nrf_inbuilt_key_delete(%lu, %d) => result=%d\n",
    				sec_tag, type, err);
    		}
    
    		/* Provision CA Certificate. */
    		err = nrf_inbuilt_key_write(CONFIG_MQTT_SEC_TAG,
    					NRF_KEY_MGMT_CRED_TYPE_CA_CHAIN,
    					MQTT_CA_CERTIFICATE,
    					strlen(MQTT_CA_CERTIFICATE));
    		if (err) {
    			printk("MQTT_CA_CERTIFICATE err: %d\n", err);
    			return err;
    		}
    
    		/* Provision Private Certificate. */
    		err = nrf_inbuilt_key_write(
    			CONFIG_MQTT_SEC_TAG,
    			NRF_KEY_MGMT_CRED_TYPE_PRIVATE_CERT,
    			MQTT_CLIENT_PRIVATE_KEY,
    			strlen(MQTT_CLIENT_PRIVATE_KEY));
    		if (err) {
    			printk("MQTT_CLIENT_PRIVATE_KEY err: %d\n", err);
    			return err;
    		}
    
    		/* Provision Public Certificate. */
    		err = nrf_inbuilt_key_write(
    			CONFIG_MQTT_SEC_TAG,
    			NRF_KEY_MGMT_CRED_TYPE_PUBLIC_CERT,
    			MQTT_CLIENT_PUBLIC_CERTIFICATE,
    			strlen(MQTT_CLIENT_PUBLIC_CERTIFICATE));
    		if (err) {
    			printk("MQTT_CLIENT_PUBLIC_CERTIFICATE err: %d\n",
    				err);
    			return err;
    		}
    	}
    #else
    	{
    		int err;
    
    		/* Non-BSD-library build: register the same three credentials
    		 * through tls_credential_add(). Lengths use sizeof here,
    		 * whereas the branch above uses strlen.
    		 */
    		err = tls_credential_add(CONFIG_MQTT_SEC_TAG,
    			TLS_CREDENTIAL_CA_CERTIFICATE,
    			MQTT_CA_CERTIFICATE,
    			sizeof(MQTT_CA_CERTIFICATE));
    		if (err < 0) {
    			printk("Failed to register ca certificate: %d\n",
    				err);
    			return err;
    		}
    		err = tls_credential_add(CONFIG_MQTT_SEC_TAG,
    			TLS_CREDENTIAL_PRIVATE_KEY,
    			MQTT_CLIENT_PRIVATE_KEY,
    			sizeof(MQTT_CLIENT_PRIVATE_KEY));
    		if (err < 0) {
    			printk("Failed to register private key: %d\n",
    				err);
    			return err;
    		}
    		/* NOTE(review): the client public certificate is registered
    		 * as TLS_CREDENTIAL_SERVER_CERTIFICATE - confirm this is the
    		 * intended credential type.
    		 */
    		err = tls_credential_add(CONFIG_MQTT_SEC_TAG,
    			TLS_CREDENTIAL_SERVER_CERTIFICATE,
    			MQTT_CLIENT_PUBLIC_CERTIFICATE,
    			sizeof(MQTT_CLIENT_PUBLIC_CERTIFICATE));
    		if (err < 0) {
    			printk("Failed to register public certificate: %d\n",
    				err);
    			return err;
    		}
    
    	}
    #endif /* defined(CONFIG_BSD_LIBRARY) */
    #endif /* defined(CONFIG_MQTT_PROVISION_CERTIFICATES) */
    
    	return 0;
    }
    
    void main(void)
    {
    	int err;
    
    	printk("The MQTT simple sample started\n");
    
    	err = mqtt_provision();
    	if (err != 0) {
    		printk("ERROR: mqtt_provision %d\n", err);
    		return;
    	}
    
    	modem_configure();
    
    	client_init(&client);
    
    	err = mqtt_connect(&client);
    	if (err != 0) {
    		printk("ERROR: mqtt_connect %d\n", err);
    		return;
    	}
    
    	err = fds_init(&client);
    	if (err != 0) {
    		printk("ERROR: fds_init %d\n", err);
    		return;
    	}
    
    	while (1) {
    		err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE));
    		if (err < 0) {
    			printk("ERROR: poll %d\n", errno);
    			break;
    		}
    
    		err = mqtt_live(&client);
    		if (err != 0) {
    			printk("ERROR: mqtt_live %d\n", err);
    			break;
    		}
    
    		if ((fds.revents & POLLIN) == POLLIN) {
    			err = mqtt_input(&client);
    			if (err != 0) {
    				printk("ERROR: mqtt_input %d\n", err);
    				break;
    			}
    		}
    
    		if ((fds.revents & POLLERR) == POLLERR) {
    			printk("POLLERR\n");
    			break;
    		}
    
    		if ((fds.revents & POLLNVAL) == POLLNVAL) {
    			printk("POLLNVAL\n");
    			break;
    		}
    	}
    
    	printk("Disconnecting MQTT client...\n");
    
    	err = mqtt_disconnect(&client);
    	if (err) {
    		printk("Could not disconnect MQTT client. Error: %d\n", err);
    	}
    }
    

    # General config
    CONFIG_TEST_RANDOM_GENERATOR=y
    
    # Networking
    CONFIG_NETWORKING=y
    CONFIG_NET_SOCKETS_OFFLOAD=y
    CONFIG_NET_SOCKETS=y
    CONFIG_NET_SOCKETS_POSIX_NAMES=y
    
    # LTE link control
    CONFIG_LTE_LINK_CONTROL=y
    CONFIG_LTE_AUTO_INIT_AND_CONNECT=n
    
    # BSD library
    CONFIG_BSD_LIBRARY=y
    
    # AT Host
    CONFIG_UART_INTERRUPT_DRIVEN=y
    CONFIG_AT_HOST_LIBRARY=y
    
    # MQTT
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=y
    
    # Application
    CONFIG_MQTT_PUB_TOPIC="YOUR_PUB_TOPIC"
    CONFIG_MQTT_SUB_TOPIC="YOU_SUB_TOPIC"
    CONFIG_MQTT_CLIENT_ID="YOUR_CLIENT_ID"
    CONFIG_MQTT_BROKER_HOSTNAME="YOUR_HOSTNAME"
    CONFIG_MQTT_BROKER_PORT=8883
    CONFIG_MQTT_PROVISION_CERTIFICATES=y
    
    # Main thread
    CONFIG_MAIN_THREAD_PRIORITY=7
    CONFIG_MAIN_STACK_SIZE=4096
    
    CONFIG_HEAP_MEM_POOL_SIZE=1024
    
    # Disable native network stack to save some memory
    CONFIG_NET_IPV4=n
    CONFIG_NET_IPV6=n
    CONFIG_NET_UDP=n
    CONFIG_NET_TCP=n
    
    CONFIG_LOG=y
    CONFIG_LOG_BACKEND_SHOW_COLOR=n
    CONFIG_MQTT_LOG_LEVEL_DBG=y
    CONFIG_NET_LOG=y
    

  • let me edit my files to remove any sensitive information and i'll post it

  • Hi Sigurd,

    I finally got around to editing my files, I'll upload them here. Thanks in advance

    /*
     * Copyright (c) 2018 Nordic Semiconductor ASA
     *
     * SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic
     */
    
    #include <zephyr.h>
    #include <stdio.h>
    #include <uart.h>
    #include <string.h>
    
    #include <net/mqtt.h>
    #include <net/socket.h>
    #include <lte_lc.h>
    
    #include "certificates.h"
    
    #if defined(CONFIG_PROVISION_CERTIFICATES)
    #if defined(CONFIG_BSD_LIBRARY)
    #include "nrf_inbuilt_key.h"
    #endif
    #endif
    
    #if defined CONFIG_MQTT_LIB_TLS
    static sec_tag_t sec_tag_list[] = { CONFIG_SEC_TAG };
    #endif
    
    #define AZ_Pass                                                                \
    	"azureiothubpass"
    #define AZ_User_Name                                                           \
    	"azureiothubusername"
    #define Test_Pass "XXXXXXXXX"
    #define Test_User_Name "XXXXXXXXXX"
    
    /* Buffers for MQTT client.
     * rx_buffer and tx_buffer are attached to the client in client_init();
     * payload_buf is the destination used by publish_get_payload().
     * Their sizes come from the CONFIG_MQTT_*_BUFFER_SIZE options.
     */
    static u8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
    static u8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
    static u8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];
    
    /* The mqtt client struct; initialised by client_init() and used by main(). */
    static struct mqtt_client client;
    
    /* Client username/pass; filled from AZ_User_Name / AZ_Pass in
     * client_init() and referenced by the client from then on.
     */
    struct mqtt_utf8 pass, user_name;
    
    /* MQTT Broker details; filled in by broker_init(). */
    static struct sockaddr_storage broker;
    
    /* Connected flag; set on CONNACK and cleared on DISCONNECT in
     * mqtt_evt_handler().
     */
    static bool connected;
    
    /* File descriptor entry passed to poll(); set up by fds_init(). */
    static struct pollfd fds;
    
    #if defined(CONFIG_BSD_LIBRARY)
    
    /**@brief Recoverable BSD library error.
     *
     * Prints the error code on the console and returns; no other action is
     * taken here.
     *
     * @param err Error code to report.
     */
    void bsd_recoverable_error_handler(uint32_t err)
    {
    	printk("bsdlib recoverable error: %u\n", err);
    }
    
    /**@brief Irrecoverable BSD library error.
     *
     * Prints the error code and then trips an unconditional assertion.
     *
     * @param err Error code to report.
     */
    void bsd_irrecoverable_error_handler(uint32_t err)
    {
    	printk("bsdlib irrecoverable error: %u\n", err);
    
    	/* NOTE(review): what happens here depends on how asserts are
    	 * configured for the build - confirm.
    	 */
    	__ASSERT_NO_MSG(false);
    }
    
    #endif /* defined(CONFIG_BSD_LIBRARY) */
    
    /**@brief Print a prefix followed by a byte range that is not
     * null-terminated.
     *
     * @param prefix Null-terminated text printed first.
     * @param data   Bytes to print after the prefix.
     * @param len    Number of bytes taken from @p data.
     */
    static void data_print(u8_t *prefix, u8_t *data, size_t len)
    {
    	/* One extra byte so the copy can be terminated for "%s". */
    	char terminated[len + 1];
    
    	terminated[len] = '\0';
    	memcpy(terminated, data, len);
    
    	printk("%s%s\n", prefix, terminated);
    }
    
    /**@brief Publish a payload on CONFIG_MQTT_PUB_TOPIC.
     *
     * @param c    MQTT client to publish through.
     * @param qos  Quality-of-service level for the message.
     * @param data Payload bytes.
     * @param len  Payload length in bytes.
     *
     * @return Whatever mqtt_publish() returns.
     */
    static int data_publish(struct mqtt_client *c, enum mqtt_qos qos, u8_t *data,
    			size_t len)
    {
    	const size_t topic_len = strlen(CONFIG_MQTT_PUB_TOPIC);
    	struct mqtt_publish_param msg;
    
    	/* Message bookkeeping: random id, no duplicate/retain flags. */
    	msg.message_id = sys_rand32_get();
    	msg.dup_flag = 0;
    	msg.retain_flag = 0;
    
    	/* Payload. */
    	msg.message.payload.data = data;
    	msg.message.payload.len = len;
    
    	/* Destination topic. */
    	msg.message.topic.qos = qos;
    	msg.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
    	msg.message.topic.topic.size = topic_len;
    
    	data_print("Publishing: ", data, len);
    	printk("to topic: %s len: %u\n", CONFIG_MQTT_PUB_TOPIC,
    	       (unsigned int)topic_len);
    
    	return mqtt_publish(c, &msg);
    }
    
    /**@brief Function to subscribe to the configured topic
     */
    static int subscribe(void)
    {
    	struct mqtt_topic subscribe_topic = {
    		.topic = { .utf8 = CONFIG_MQTT_SUB_TOPIC,
    			   .size = strlen(CONFIG_MQTT_SUB_TOPIC) },
    		.qos = MQTT_QOS_1_AT_LEAST_ONCE
    	};
    
    	const struct mqtt_subscription_list subscription_list = {
    		.list = &subscribe_topic, .list_count = 1, .message_id = 1234
    	};
    
    	printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC,
    	       (unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));
    
    	return mqtt_subscribe(&client, &subscription_list);
    }
    
    /**@brief Function to read the published payload.
     *
     * Reads exactly @p length bytes of the current publish message into the
     * file-scope payload_buf, polling the socket again whenever the read
     * reports -EAGAIN.
     *
     * @param c      MQTT client the payload is read from.
     * @param length Number of payload bytes to read.
     *
     * @retval 0         The whole payload was read.
     * @retval -EMSGSIZE @p length does not fit in payload_buf.
     * @retval -EIO      A read returned 0, or poll() did not report POLLIN.
     * @return Any other negative value returned by mqtt_read_publish_payload().
     */
    static int publish_get_payload(struct mqtt_client *c, size_t length)
    {
    	u8_t *buf;
    	u8_t *end;
    
    	/* Validate first: computing payload_buf + length for an oversized
    	 * length would form a pointer beyond the array, which is undefined
    	 * behaviour even if it is never dereferenced.
    	 */
    	if (length > sizeof(payload_buf)) {
    		return -EMSGSIZE;
    	}
    
    	buf = payload_buf;
    	end = buf + length;
    
    	while (buf < end) {
    		int ret = mqtt_read_publish_payload(c, buf, end - buf);
    
    		if (ret < 0) {
    			int err;
    
    			if (ret != -EAGAIN) {
    				return ret;
    			}
    
    			printk("mqtt_read_publish_payload: EAGAIN\n");
    
    			/* Wait for more data before retrying the read. */
    			err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE));
    			if (err > 0 && (fds.revents & POLLIN) == POLLIN) {
    				continue;
    			} else {
    				return -EIO;
    			}
    		}
    
    		/* A zero-length read makes no progress; give up. */
    		if (ret == 0) {
    			return -EIO;
    		}
    
    		buf += ret;
    	}
    
    	return 0;
    }
    
    /**@brief MQTT client event handler
     *
     * On a successful CONNACK it marks the client connected and subscribes; on
     * PUBLISH it reads the payload into payload_buf and echoes it back,
     * disconnecting if the payload cannot be read. Failures of the subscribe
     * and echo-publish calls are reported instead of being silently dropped.
     *
     * @param c   Client the event belongs to.
     * @param evt Event description (type, result and type-specific params).
     */
    void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt)
    {
    	int err;
    
    	switch (evt->type) {
    	case MQTT_EVT_CONNACK:
    		/* A non-zero result means the connect attempt was not
    		 * accepted; the value is printed for diagnosis.
    		 */
    		if (evt->result != 0) {
    			printk("MQTT connect failed %d\n",
    			       evt->result);
    			break;
    		}
    
    		connected = true;
    		printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
    
    		err = subscribe();
    		if (err) {
    			printk("Could not subscribe: %d\n", err);
    		}
    		break;
    
    	case MQTT_EVT_DISCONNECT:
    		printk("[%s:%d] MQTT client disconnected %d\n", __func__,
    		       __LINE__, evt->result);
    
    		connected = false;
    		break;
    
    	case MQTT_EVT_PUBLISH: {
    		const struct mqtt_publish_param *p = &evt->param.publish;
    
    		printk("[%s:%d] MQTT PUBLISH result=%d len=%d\n", __func__,
    		       __LINE__, evt->result, p->message.payload.len);
    		err = publish_get_payload(c, p->message.payload.len);
    		if (err >= 0) {
    			data_print("Received: ", payload_buf,
    				   p->message.payload.len);
    			/* Echo back received data */
    			err = data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
    					   payload_buf,
    					   p->message.payload.len);
    			if (err) {
    				printk("Could not echo payload: %d\n", err);
    			}
    		} else {
    			printk("mqtt_read_publish_payload: Failed! %d\n", err);
    			printk("Disconnecting MQTT client...\n");
    
    			err = mqtt_disconnect(c);
    			if (err) {
    				printk("Could not disconnect: %d\n", err);
    			}
    		}
    	} break;
    
    	case MQTT_EVT_PUBACK:
    		if (evt->result != 0) {
    			printk("MQTT PUBACK error %d\n", evt->result);
    			break;
    		}
    
    		printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__,
    		       evt->param.puback.message_id);
    		break;
    
    	case MQTT_EVT_SUBACK:
    		if (evt->result != 0) {
    			printk("MQTT SUBACK error %d\n", evt->result);
    			break;
    		}
    
    		printk("[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__,
    		       evt->param.suback.message_id);
    		break;
    
    	default:
    		printk("[%s:%d] default: %d\n", __func__, __LINE__, evt->type);
    		break;
    	}
    }
    
    /**@brief Resolves the configured hostname and
     * initializes the MQTT broker structure
     *
     * Looks up CONFIG_MQTT_BROKER_HOSTNAME and stores the first IPv4-sized
     * result, together with CONFIG_MQTT_BROKER_PORT, in the file-scope
     * 'broker'. Every entry of the result list is examined; previously a stray
     * 'break' ended the search after the first entry. Failures are only
     * logged, and 'broker' is left untouched in that case.
     */
    static void broker_init(void)
    {
    	int err;
    	struct addrinfo *result;
    	struct addrinfo *addr;
    	struct addrinfo hints = { .ai_family = AF_INET,
    				  .ai_socktype = SOCK_STREAM };
    
    	err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
    	if (err) {
    		printk("ERROR: getaddrinfo failed %d\n", err);
    
    		return;
    	}
    
    	/* Look for address of the broker. */
    	for (addr = result; addr != NULL; addr = addr->ai_next) {
    		struct sockaddr_in *broker4;
    		char ipv4_addr[NET_IPV4_ADDR_LEN];
    
    		/* Skip anything that is not an IPv4 address. */
    		if (addr->ai_addrlen != sizeof(struct sockaddr_in)) {
    			printk("ai_addrlen = %u should be %u or %u\n",
    			       (unsigned int)addr->ai_addrlen,
    			       (unsigned int)sizeof(struct sockaddr_in),
    			       (unsigned int)sizeof(struct sockaddr_in6));
    			continue;
    		}
    
    		broker4 = ((struct sockaddr_in *)&broker);
    		broker4->sin_addr.s_addr =
    			((struct sockaddr_in *)addr->ai_addr)
    				->sin_addr.s_addr;
    		broker4->sin_family = AF_INET;
    		broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);
    
    		inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr,
    			  sizeof(ipv4_addr));
    		printk("IPv4 Address found %s\n", ipv4_addr);
    
    		break;
    	}
    
    	/* addr is NULL only when the loop ran off the end of the list. */
    	if (addr == NULL) {
    		printk("ERROR: no IPv4 address found for %s\n",
    		       CONFIG_MQTT_BROKER_HOSTNAME);
    	}
    
    	/* Free the address. */
    	freeaddrinfo(result);
    }
    
    /**@brief Prepare an MQTT client structure for connecting.
     *
     * Resets the client, loads the user name and password from the
     * AZ_User_Name / AZ_Pass macros, resolves the broker address, then fills
     * in identity, buffers and transport.
     *
     * @param client Client structure to initialise.
     */
    static void client_init(struct mqtt_client *client)
    {
    	mqtt_client_init(client);
    
    	/* Credentials live in file-scope storage that the client points to. */
    	user_name.utf8 = (u8_t *)AZ_User_Name;
    	user_name.size = strlen(AZ_User_Name);
    	pass.utf8 = (u8_t *)AZ_Pass;
    	pass.size = strlen(AZ_Pass);
    
    	/* Fills the file-scope 'broker' that is referenced below. */
    	broker_init();
    
    	/* Transport: chosen at compile time. */
    #if defined(CONFIG_MQTT_LIB_TLS)
    	client->transport.type = MQTT_TRANSPORT_SECURE;
    	client->transport.tls.config.hostname = CONFIG_MQTT_BROKER_HOSTNAME;
    	client->transport.tls.config.sec_tag_list = sec_tag_list;
    	client->transport.tls.config.sec_tag_count = ARRAY_SIZE(sec_tag_list);
    	client->transport.tls.config.cipher_list = NULL;
    	client->transport.tls.config.cipher_count = 0;
    	client->transport.tls.config.peer_verify = 2;
    #else
    	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
    #endif
    
    	/* Working buffers. */
    	client->tx_buf = tx_buffer;
    	client->tx_buf_size = sizeof(tx_buffer);
    	client->rx_buf = rx_buffer;
    	client->rx_buf_size = sizeof(rx_buffer);
    
    	/* Identity and protocol. */
    	client->protocol_version = MQTT_VERSION_3_1_1;
    	client->user_name = &user_name;
    	client->password = &pass;
    	client->client_id.size = strlen(CONFIG_MQTT_CLIENT_ID);
    	client->client_id.utf8 = (u8_t *)CONFIG_MQTT_CLIENT_ID;
    
    	/* Broker endpoint and event callback. */
    	client->evt_cb = mqtt_evt_handler;
    	client->broker = &broker;
    }
    
    /**@brief Point the poll descriptor at the client's socket.
     *
     * @param c Client whose transport socket should be polled.
     *
     * @retval 0        Descriptor set up to wait for POLLIN.
     * @retval -ENOTSUP The client uses a secure transport but TLS support is
     *                  not compiled in.
     */
    static int fds_init(struct mqtt_client *c)
    {
    #if defined(CONFIG_MQTT_LIB_TLS)
    	fds.fd = (c->transport.type == MQTT_TRANSPORT_NON_SECURE)
    			? c->transport.tcp.sock
    			: c->transport.tls.sock;
    #else
    	if (c->transport.type != MQTT_TRANSPORT_NON_SECURE) {
    		return -ENOTSUP;
    	}
    
    	fds.fd = c->transport.tcp.sock;
    #endif
    
    	fds.events = POLLIN;
    	return 0;
    }
    
    /**@brief Configures modem to provide LTE link.
     *
     * Does nothing when CONFIG_LTE_LINK_CONTROL is not defined or when
     * CONFIG_LTE_AUTO_INIT_AND_CONNECT is enabled. Otherwise calls
     * lte_lc_init_and_connect() and reports the outcome. A failure is logged
     * with its error code (and asserted on), so "LTE Link Connected!" is no
     * longer printed for a failed attempt when asserts are compiled out.
     */
    static void modem_configure(void)
    {
    #if defined(CONFIG_LTE_LINK_CONTROL)
    	if (IS_ENABLED(CONFIG_LTE_AUTO_INIT_AND_CONNECT)) {
    		/* Do nothing, modem is already turned on
    		 * and connected.
    		 */
    	} else {
    		int err;
    
    		printk("LTE Link Connecting ...\n");
    		err = lte_lc_init_and_connect();
    		if (err != 0) {
    			printk("ERROR: LTE link could not be established: %d\n",
    			       err);
    			__ASSERT(err == 0, "LTE link could not be established.");
    			return;
    		}
    
    		printk("LTE Link Connected!\n");
    	}
    #endif
    }
    
    
    /**@brief Provision TLS credentials under the configured security tag.
     *
     * Compiled to a plain "return 0" unless CONFIG_PROVISION_CERTIFICATES is
     * defined. With CONFIG_BSD_LIBRARY the existing credentials for the tag
     * are deleted and each of AZ_ROOT_CERTIFICATE, CLIENT_PRIVATE_KEY and
     * CLIENT_PUBLIC_CERTIFICATE that is defined is written with
     * nrf_inbuilt_key_write(); otherwise the NRF_CLOUD_* credentials are
     * registered with tls_credential_add().
     *
     * The braces no longer straddle the preprocessor conditionals, so the
     * function is well-formed in every configuration.
     *
     * @return 0 on success, otherwise the error code of the first failing
     *         write/registration.
     */
    static int provision_certificates(void)
    {
    #if defined(CONFIG_PROVISION_CERTIFICATES)
    #if defined(CONFIG_BSD_LIBRARY)
    	int err;
    
    	/* NOTE(review): sec_tag_list is only defined when
    	 * CONFIG_MQTT_LIB_TLS is set - confirm both options are always
    	 * enabled together.
    	 */
    	nrf_sec_tag_t sec_tag = (nrf_sec_tag_t)sec_tag_list[0];
    
    	/* Delete Certificates: credential types 0..4; the result is only
    	 * logged, a failed delete does not abort provisioning.
    	 */
    	for (nrf_key_mgnt_cred_type_t type = 0; type < 5; type++) {
    		printk("Deleting certs sec_tag: %d\n", sec_tag);
    		err = nrf_inbuilt_key_delete(sec_tag, type);
    		printk("nrf_inbuilt_key_delete(%u, %d) => result%d\n",
    		       sec_tag, type, err);
    	}
    
    #if defined(AZ_ROOT_CERTIFICATE)
    	/* provision CA certificates */
    	printk("Write ca certs sec_tag: %d\n", sec_tag);
    	err = nrf_inbuilt_key_write(sec_tag,
    				    NRF_KEY_MGMT_CRED_TYPE_CA_CHAIN,
    				    AZ_ROOT_CERTIFICATE,
    				    strlen(AZ_ROOT_CERTIFICATE));
    	if (err) {
    		printk("AZ_ROOT_CERTIFICATE err: %d", err);
    		return err;
    	}
    #endif /* defined(AZ_ROOT_CERTIFICATE) */
    
    #if defined(CLIENT_PRIVATE_KEY)
    	/* provision private certificate */
    	printk("Write private cert sec_tag: %d\n", sec_tag);
    	err = nrf_inbuilt_key_write(sec_tag,
    				    NRF_KEY_MGMT_CRED_TYPE_PRIVATE_CERT,
    				    CLIENT_PRIVATE_KEY,
    				    strlen(CLIENT_PRIVATE_KEY));
    	if (err) {
    		printk("CLIENT_PRIVATE_KEY err: %d\n", err);
    		return err;
    	}
    #endif /* defined(CLIENT_PRIVATE_KEY) */
    
    #if defined(CLIENT_PUBLIC_CERTIFICATE)
    	/* provision public certificate */
    	printk("Write public cert sec_tag: %d\n", sec_tag);
    	err = nrf_inbuilt_key_write(sec_tag,
    				    NRF_KEY_MGMT_CRED_TYPE_PUBLIC_CERT,
    				    CLIENT_PUBLIC_CERTIFICATE,
    				    strlen(CLIENT_PUBLIC_CERTIFICATE));
    	if (err) {
    		printk("CLIENT_PUBLIC_CERTIFICATE err: %d\n", err);
    		return err;
    	}
    #endif /* defined(CLIENT_PUBLIC_CERTIFICATE) */
    
    #else /* !defined(CONFIG_BSD_LIBRARY) */
    	int err;
    
    	err = tls_credential_add(CONFIG_SEC_TAG,
    				 TLS_CREDENTIAL_CA_CERTIFICATE,
    				 NRF_CLOUD_CA_CERTIFICATE,
    				 sizeof(NRF_CLOUD_CA_CERTIFICATE));
    	if (err < 0) {
    		printk("Failed to register ca certificate: %d\n", err);
    		return err;
    	}
    
    	err = tls_credential_add(CONFIG_SEC_TAG,
    				 TLS_CREDENTIAL_PRIVATE_KEY,
    				 NRF_CLOUD_CLIENT_PRIVATE_KEY,
    				 sizeof(NRF_CLOUD_CLIENT_PRIVATE_KEY));
    	if (err < 0) {
    		printk("Failed to register private key: %d\n", err);
    		return err;
    	}
    
    	err = tls_credential_add(
    		CONFIG_SEC_TAG, TLS_CREDENTIAL_SERVER_CERTIFICATE,
    		NRF_CLOUD_CLIENT_PUBLIC_CERTIFICATE,
    		sizeof(NRF_CLOUD_CLIENT_PUBLIC_CERTIFICATE));
    	if (err < 0) {
    		printk("Failed to register public certificate: %d\n",
    		       err);
    		return err;
    	}
    #endif /* defined(CONFIG_BSD_LIBRARY) */
    #endif /* defined(CONFIG_PROVISION_CERTIFICATES) */
    
    	return 0;
    }
    
    /* Sample entry point: provision credentials, bring up the modem, connect
     * to the broker, service the connection until an error occurs, then
     * disconnect. Any setup failure - now including a failed
     * provision_certificates() - is printed and ends the function early.
     */
    void main(void)
    {
    	int err;
    
    	printk("The MQTT simple sample started\n");
    
    	/* Do not continue towards a TLS connection with credentials that
    	 * failed to provision.
    	 */
    	err = provision_certificates();
    	if (err != 0) {
    		printk("ERROR: provision_certificates %d\n", err);
    		return;
    	}
    
    	modem_configure();
    
    	client_init(&client);
    
    	err = mqtt_connect(&client);
    	if (err != 0) {
    		printk("ERROR: mqtt_connect %d\n", err);
    		return;
    	}
    
    	err = fds_init(&client);
    	if (err != 0) {
    		printk("ERROR: fds_init %d\n", err);
    		return;
    	}
    
    	printk("NOW ENTERING ENDLESS POLLING LOOP\n");
    	while (1) {
    		err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE));
    		if (err < 0) {
    			printk("ERROR: poll %d\n", errno);
    			break;
    		}
    
    		err = mqtt_live(&client);
    		if (err != 0) {
    			printk("ERROR: mqtt_live %d\n", err);
    			break;
    		}
    
    		/* Incoming data: let the MQTT library process it. */
    		if ((fds.revents & POLLIN) == POLLIN) {
    			err = mqtt_input(&client);
    			if (err != 0) {
    				printk("ERROR: mqtt_input %d\n", err);
    				break;
    			}
    		}
    
    		if ((fds.revents & POLLERR) == POLLERR) {
    			printk("POLLERR\n");
    			break;
    		}
    
    		if ((fds.revents & POLLNVAL) == POLLNVAL) {
    			printk("POLLNVAL\n");
    			break;
    		}
    	}
    
    	printk("Disconnecting MQTT client...\n");
    
    	err = mqtt_disconnect(&client);
    	if (err) {
    		printk("Could not disconnect MQTT client. Error: %d\n",
    		       err);
    	}
    }
    
    # General config
    CONFIG_TEST_RANDOM_GENERATOR=y
    
    # Networking
    CONFIG_NETWORKING=y
    CONFIG_NET_SOCKETS_OFFLOAD=y
    CONFIG_NET_SOCKETS=y
    CONFIG_NET_SOCKETS_POSIX_NAMES=y
    
    CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
    
    # LTE link control
    CONFIG_LTE_LINK_CONTROL=y
    CONFIG_LTE_AUTO_INIT_AND_CONNECT=n
    
    # BSD library
    CONFIG_BSD_LIBRARY=y
    
    # AT Host
    CONFIG_UART_INTERRUPT_DRIVEN=y
    CONFIG_AT_HOST_LIBRARY=y
    
    # MQTT
    CONFIG_MQTT_LIB_TLS=y
    CONFIG_MQTT_LIB=y
    
    
    #added config stuff
    CONFIG_SEC_TAG=16842753
    CONFIG_PROVISION_CERTIFICATES=y
    CONFIG_CERTIFICATES_FILE="certificates.h"
    CONFIG_NO_OPTIMIZATIONS=y
    
    
    # Application
    CONFIG_MQTT_PUB_TOPIC="publish"
    CONFIG_MQTT_SUB_TOPIC="subscribe"
    CONFIG_MQTT_CLIENT_ID="MyCDevice"
    #CONFIG_MQTT_BROKER_HOSTNAME="postman.cloudmqtt.com"
    #CONFIG_MQTT_BROKER_PORT=11105
    
    CONFIG_MQTT_BROKER_HOSTNAME="azureiothubhostname"
    CONFIG_MQTT_BROKER_PORT=8883
    
    # Main thread
    CONFIG_MAIN_THREAD_PRIORITY=7
    CONFIG_MAIN_STACK_SIZE=4096
    
    CONFIG_HEAP_MEM_POOL_SIZE=1024
    
    # Disable native network stack to save some memory
    CONFIG_NET_IPV4=n
    CONFIG_NET_IPV6=n
    CONFIG_NET_UDP=n
    CONFIG_NET_TCP=n
    
    #added Configuring of buffer sizes?
    CONFIG_MQTT_MESSAGE_BUFFER_SIZE=512
    CONFIG_MQTT_PAYLOAD_BUFFER_SIZE=512
    

  • Hi sigurd, I was wondering if you had any insight into my problem. Thanks

  • Hi,

    I was wondering if you had any insight into my problem.

    Yes, the error 5 is MQTT_NOT_AUTHORIZED

    I did some tests here with my own Azure IoT HUB account, and I got this error if either the SAS token was wrong/invalid/expired, or the device name was wrong.

Related