MQTT over Wi-Fi (nRF7002DK) - can not connect

Hello

I am trying to connect to an MQTT broker over wi-fi using the nRF7002-DK.
The code that I wrote is based on the mqtt_over_wifi_nrf7002DK example (https://github.com/AliNordic/mqtt_over_wifi_nrf7002DK), so the functions and the basic operation is similar.
I can successfully connect to the wi-fi network, the DHCP gives me an IP address.
Also, the client_init function and the mqtt_connect function returns no error.
But the mqtt_publish function returns -128, which means "Socket is not connected"
Maybe this makes sense since the 
MQTT_EVT_CONNACK callback never gets called (which I think it means that it is not connected to the MQTT) although from the LOG information the net_mqtt returns: Connect completed.

Bellow is the logging information.
 LOG information

prj.conf

CONFIG_WIFI=y
CONFIG_WIFI_NRF700X=y
CONFIG_WPA_SUPP=y           #Enable or disable Wi-Fi Protected Access (WPA) supplicant

# System settings
CONFIG_NEWLIB_LIBC=y


# Networking
CONFIG_NETWORKING=y
CONFIG_NET_L2_ETHERNET=y
CONFIG_NET_SOCKETS=y
CONFIG_POSIX_CLOCK=y
CONFIG_NET_SOCKETS_POSIX_NAMES=y
CONFIG_NET_LOG=y
CONFIG_NET_IPV6=n
CONFIG_NET_IPV4=y
CONFIG_NET_UDP=y
CONFIG_NET_TCP=y
CONFIG_NET_DHCPV4=y

# DNS
CONFIG_DNS_RESOLVER=y

CONFIG_NET_STATISTICS=y

CONFIG_NET_PKT_RX_COUNT=8
CONFIG_NET_PKT_TX_COUNT=8
CONFIG_POSIX_MAX_FDS=9

# Below section is the primary contributor to SRAM and is currently
# tuned for performance, but this will be revisited in the future.
CONFIG_NET_BUF_RX_COUNT=16
CONFIG_NET_BUF_TX_COUNT=16
CONFIG_NET_BUF_DATA_SIZE=512
CONFIG_HEAP_MEM_POOL_SIZE=153600
CONFIG_NET_TC_TX_COUNT=0
CONFIG_NET_TCP_WORKQ_STACK_SIZE=2048
CONFIG_NET_IF_UNICAST_IPV4_ADDR_COUNT=1
CONFIG_NET_MAX_CONTEXTS=5
CONFIG_NET_CONTEXT_SYNC_RECV=y

CONFIG_INIT_STACKS=y

CONFIG_NET_CONFIG_SETTINGS=y

CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.165.100.150"
CONFIG_NET_CONFIG_PEER_IPV4_ADDR="192.165.100.1"

CONFIG_NET_SOCKETS_POLL_MAX=4

# Memories
CONFIG_MAIN_STACK_SIZE=4096
CONFIG_SYSTEM_WORKQUEUE_STACK_SIZE=2048
CONFIG_NET_TX_STACK_SIZE=4096
CONFIG_NET_RX_STACK_SIZE=4096


# Logging
CONFIG_LOG=y

CONFIG_NET_L2_WIFI_MGMT=y
CONFIG_NET_MGMT_EVENT_STACK_SIZE=2048

CONFIG_GPIO=y


CONFIG_NET_CONFIG_SETTINGS=y
# Below configs need to be modified based on security
# CONFIG_WIFI_KEY_MGMT_NONE=y
CONFIG_WIFI_KEY_MGMT_WPA2=y
# CONFIG_WIFI_KEY_MGMT_WPA2_256=y
# CONFIG_WIFI_KEY_MGMT_WPA3=y
CONFIG_WIFI_SSID="XXXXXXXX"
CONFIG_WIFI_PASSWORD="XXXXXXXXXXXXXX"


# MQTT
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=n
CONFIG_MQTT_CLEAN_SESSION=y
CONFIG_MQTT_LOG_LEVEL_ERR=y
CONFIG_MQTT_LOG_LEVEL_INF=y
CONFIG_MQTT_LOG_LEVEL_DBG=y

# Application
CONFIG_MQTT_PUB_TOPIC="/sensors/c47f5192950d_test"
CONFIG_MQTT_SUB_TOPIC="/devices/c47f5192950d_test/control"
CONFIG_MQTT_CLIENT_ID="XXXXXXXX"
CONFIG_MQTT_CLIENT_USERNAME="XXXXXXX"
CONFIG_MQTT_CLIENT_PASSWORD="XXXXXXXXXXXX"
CONFIG_MQTT_BROKER_HOSTNAME="XXXXXXXXXXXX"
CONFIG_MQTT_BROKER_PORT=1883

main.c

#include <nrfx_clock.h>
#include <zephyr/kernel.h>
#include <zephyr/device.h>
#include <zephyr/drivers/gpio.h>
#include <zephyr/logging/log.h>
#include <zephyr/random/rand32.h>

#include <zephyr/net/net_if.h>
#include <zephyr/net/wifi_mgmt.h>
#include <zephyr/net/net_event.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/mqtt.h>

#include <net/wifi_mgmt_ext.h>






LOG_MODULE_REGISTER(logg, LOG_LEVEL_INF);


#define SW0_NODE	DT_ALIAS(sw0)

static const struct gpio_dt_spec button = GPIO_DT_SPEC_GET_OR(SW0_NODE, gpios, {0});

#define WIFI_SHELL_MGMT_EVENTS (NET_EVENT_WIFI_CONNECT_RESULT | NET_EVENT_WIFI_DISCONNECT_RESULT | NET_EVENT_WIFI_SCAN_RESULT | NET_EVENT_WIFI_SCAN_DONE )

static struct net_mgmt_event_callback wifi_shell_mgmt_cb;	//Network Management event callback structure
static struct net_mgmt_event_callback net_shell_mgmt_cb;
struct wifi_scan_result scan_results[50];					//hold scan results - max 50!!!!
int scan_results_index = 0;
uint8_t connected = 0;
uint8_t mqtt_connected = 0;


/* Buffers for MQTT client. */
static uint8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static uint8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];


static struct mqtt_client client_ctx;						// The mqtt client struct
static struct pollfd fds;									// File descriptor

static struct sockaddr_storage broker;						// MQTT Broker details

static int wifi_connect(void)
{
	struct net_if *iface = net_if_get_default();
	static struct wifi_connect_req_params cnx_params;
	
	cnx_params.timeout = SYS_FOREVER_MS;

	// SSID 
	cnx_params.ssid = CONFIG_WIFI_SSID;
	cnx_params.ssid_length = strlen(cnx_params.ssid);

	#if defined(CONFIG_WIFI_KEY_MGMT_WPA2)
		cnx_params.security = 1;
	#elif defined(CONFIG_WIFI_KEY_MGMT_WPA2_256)
		cnx_params.security = 2;
	#elif defined(CONFIG_WIFI_KEY_MGMT_WPA3)
		cnx_params.security = 3;
	#else
		cnx_params.security = 0;
	#endif

#if !defined(CONFIG_WIFI_KEY_MGMT_NONE)
	cnx_params.psk = CONFIG_WIFI_PASSWORD;
	cnx_params.psk_length = strlen(cnx_params.psk);
#endif
	cnx_params.channel = WIFI_CHANNEL_ANY;

	// MFP (optional)
	cnx_params.mfp = WIFI_MFP_OPTIONAL;

	if (net_mgmt(NET_REQUEST_WIFI_CONNECT, iface, &cnx_params, sizeof(struct wifi_connect_req_params))) {
		LOG_ERR("Connection request failed");
		return -ENOEXEC;
	}
	LOG_INF("Connection requested");
	return 0;
}




//Wi-Fi Event Handler
static void wifi_mgmt_event_handler(struct net_mgmt_event_callback *cb, uint32_t mgmt_event, struct net_if *iface)
{
	switch (mgmt_event) {
	case NET_EVENT_WIFI_CONNECT_RESULT:
		const struct wifi_status *status =(const struct wifi_status *) cb->info;
		if (status->status) {
			LOG_ERR("Connection failed (%d)", status->status);
		} else {
			LOG_INF("Connected");
		}
		break;

	case NET_EVENT_WIFI_DISCONNECT_RESULT:
		LOG_INF("WIFI Disconnected");
		break;

	case NET_EVENT_WIFI_SCAN_RESULT:
			scan_results[scan_results_index] = *(const struct wifi_scan_result *)cb->info;
			scan_results_index++;
		break;

	case NET_EVENT_WIFI_SCAN_DONE:
			LOG_INF("Scan Done!");
		break;
	default:
		break;
	}
}

//Network Management Event Handler
static void net_mgmt_event_handler(struct net_mgmt_event_callback *cb, uint32_t mgmt_event, struct net_if *iface)
{
	switch (mgmt_event) {
	case NET_EVENT_IPV4_DHCP_BOUND:
		/* Get DHCP info from struct net_if_dhcpv4 and print */
		const struct net_if_dhcpv4 *dhcpv4 = cb->info;
		const struct in_addr *addr = &dhcpv4->requested_ip;
		char dhcp_info[128];
		net_addr_ntop(AF_INET, addr, dhcp_info, sizeof(dhcp_info));
		LOG_INF("DHCP IP address: %s", dhcp_info);
		connected = 1;

		break;
	default:
		break;
	}
}

/**@brief Function to get the payload of recived data.
 */
static int get_received_payload(struct mqtt_client *c, size_t length)
{
	int ret;
	int err = 0;

	/* Return an error if the payload is larger than the payload buffer.
	 * Note: To allow new messages, we have to read the payload before returning.
	 */
	if (length > sizeof(payload_buf)) {
		err = -EMSGSIZE;
	}

	/* Truncate payload until it fits in the payload buffer. */
	while (length > sizeof(payload_buf)) {
		ret = mqtt_read_publish_payload_blocking(
				c, payload_buf, (length - sizeof(payload_buf)));
		if (ret == 0) {
			return -EIO;
		} else if (ret < 0) {
			return ret;
		}

		length -= ret;
	}

	ret = mqtt_readall_publish_payload(c, payload_buf, length);
	if (ret) {
		return ret;
	}

	return err;
}

//MQTT Event Handler
void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
{
	int err;

	switch (evt->type) {
	case MQTT_EVT_CONNACK:
		if (evt->result != 0) {
			LOG_ERR("MQTT connect failed %d", evt->result);
			break;
		}

		mqtt_connected = true;
		LOG_INF("MQTT client connected!");

		break;

	case MQTT_EVT_DISCONNECT:
		LOG_INF("MQTT client disconnected %d", evt->result);

		//connected = false;
		//clear_fds();

		break;

	case MQTT_EVT_PUBLISH:
	{
		int err;
		const struct mqtt_publish_param *p = &evt->param.publish;
		//Print the length of the recived message 
		LOG_INF("MQTT PUBLISH result=%d len=%d",
			evt->result, p->message.payload.len);

		//Extract the data of the recived message 
		err = get_received_payload(client, p->message.payload.len);
		
		//Send acknowledgment to the broker on receiving QoS1 publish message 
		if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
			const struct mqtt_puback_param ack = {
				.message_id = p->message_id
			};

			/* Send acknowledgment. */
			mqtt_publish_qos1_ack(client, &ack);
		}

		if (err >= 0) {
			data_print("Received: ", payload_buf, p->message.payload.len);

		// Payload buffer is smaller than the received data 
		} else if (err == -EMSGSIZE) {
			LOG_ERR("Received payload (%d bytes) is larger than the payload buffer size (%d bytes).",
				p->message.payload.len, sizeof(payload_buf));
		// Failed to extract data, disconnect 
		} else {
			LOG_ERR("get_received_payload failed: %d", err);
			LOG_INF("Disconnecting MQTT client...");

			err = mqtt_disconnect(client);
			if (err) {
				LOG_ERR("Could not disconnect: %d", err);
			}
		}
	}
		break;

	case MQTT_EVT_PUBACK:
		if (evt->result != 0) {
			LOG_ERR("MQTT PUBACK error %d", evt->result);
			break;
		}

		LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);

		break;

	case MQTT_EVT_PUBREC:
		if (evt->result != 0) {
			LOG_ERR("MQTT PUBREC error %d", evt->result);
			break;
		}

		LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);

		const struct mqtt_pubrel_param rel_param = {
			.message_id = evt->param.pubrec.message_id
		};

		err = mqtt_publish_qos2_release(client, &rel_param);
		if (err != 0) {
			LOG_ERR("Failed to send MQTT PUBREL: %d", err);
		}

		break;

	case MQTT_EVT_PUBCOMP:
		if (evt->result != 0) {
			LOG_ERR("MQTT PUBCOMP error %d", evt->result);
			break;
		}

		LOG_INF("PUBCOMP packet id: %u",
			evt->param.pubcomp.message_id);

		break;

	case MQTT_EVT_PINGRESP:
		LOG_INF("PINGRESP packet");
		break;

	default:
		break;
	}
}

/**@brief Resolves the configured hostname and
 * initializes the MQTT broker structure
 */
static int broker_init(void)
{
	int ret;
	struct addrinfo *result;
	struct addrinfo *addr;
	struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM
	};

	ret = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);	//Resolve a domain name to one or more network addresses.
	if (ret) {
		LOG_ERR("getaddrinfo failed: %d", ret);
		return -ECHILD;
	}
	addr = result;
	/* Look for address of the broker. */
	while (addr != NULL) {
		/* IPv4 Address. */
		if (addr->ai_addrlen == sizeof(struct sockaddr_in)) {
			struct sockaddr_in *broker4 = ((struct sockaddr_in *)&broker);
			char ipv4_addr[NET_IPV4_ADDR_LEN];

			broker4->sin_addr.s_addr =((struct sockaddr_in *)addr->ai_addr)->sin_addr.s_addr;
			broker4->sin_family = AF_INET;
			broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);

			inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr));
			LOG_INF("IPv4 Address found %s", ipv4_addr);
			break;
		} else {
			LOG_ERR("ai_addrlen = %u should be %u or %u", (unsigned int)addr->ai_addrlen, (unsigned int)sizeof(struct sockaddr_in), (unsigned int)sizeof(struct sockaddr_in6));
		}

		addr = addr->ai_next;
	}

	/* Free the address. */
	freeaddrinfo(result);

	return ret;
}

/* Function to get the client id */
static const uint8_t* client_id_get(void)
{
	static uint8_t client_id[MAX(sizeof(CONFIG_MQTT_CLIENT_ID), (sizeof(CONFIG_MQTT_CLIENT_ID) + 1 + 10))];

	if (strlen(CONFIG_MQTT_CLIENT_ID) > 0) {
		snprintf(client_id, sizeof(client_id), "%s",
			 CONFIG_MQTT_CLIENT_ID);
		goto exit;
	}

	uint32_t id = sys_rand32_get();
	snprintf(client_id, sizeof(client_id), "%s-%010u", CONFIG_BOARD, id);

exit:
	LOG_DBG("client_id = %s", (char *)client_id);

	return client_id;
}

static int client_init(struct mqtt_client *client)
{
	int ret;

	mqtt_client_init(client);

	ret = broker_init();
	if (ret) {
		LOG_ERR("Failed to initialize broker connection");
		return ret;
	}

	static uint8_t client_id[sizeof(CONFIG_MQTT_CLIENT_ID)];
	snprintf(client_id, sizeof(client_id), "%s", CONFIG_MQTT_CLIENT_ID);
	static uint8_t client_username[sizeof(CONFIG_MQTT_CLIENT_USERNAME)];
	snprintf(client_username, sizeof(client_username), "%s", CONFIG_MQTT_CLIENT_USERNAME);
	static uint8_t client_password[sizeof(CONFIG_MQTT_CLIENT_PASSWORD)];
	snprintf(client_password, sizeof(client_password), "%s", CONFIG_MQTT_CLIENT_PASSWORD);

	struct mqtt_utf8 password = {
		.utf8 = (char *)client_password,
		.size = strlen((char *)client_password)
	};
 
	struct mqtt_utf8 username = {
		.utf8 = (char *)client_username,
		.size = strlen((char *)client_username)
	};

	LOG_INF("client_id = %s", client_id);
	LOG_INF("client_username = %s", client_username);
	LOG_INF("client_password = %s", client_password);
	// MQTT client configuration
	client->broker = &broker;
	client->evt_cb = mqtt_evt_handler;
	//client->client_id.utf8 = (uint8_t *)client_id;
	client->client_id.utf8 = client_id_get();
	client->client_id.size = strlen(client->client_id.utf8);
 	client->password = &password;
 	client->user_name = &username;
	client->protocol_version = MQTT_VERSION_3_1_1;

	// MQTT buffers configuration
	client->rx_buf = rx_buffer;
	client->rx_buf_size = sizeof(rx_buffer);
	client->tx_buf = tx_buffer;
	client->tx_buf_size = sizeof(tx_buffer);



	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
	
	return(ret);
}

/**@brief Initialize the file descriptor structure used by poll.
 */
int fds_init(struct mqtt_client *c, struct pollfd *fds)
{
	if (c->transport.type == MQTT_TRANSPORT_NON_SECURE) {
		fds->fd = c->transport.tcp.sock;
	} else {
		return -ENOTSUP;
	}

	fds->events = POLLIN;

	return 0;
}

void data_print(uint8_t *prefix, uint8_t *data, size_t len)
{
	char buf[len + 1];

	memcpy(buf, data, len);
	buf[len] = 0;
	LOG_INF("%s%s", (char *)prefix, (char *)buf);
}
/**@brief Function to publish data on the configured topic
 */
int data_publish(struct mqtt_client *c, enum mqtt_qos qos,
	uint8_t *data, size_t len)
{
	struct mqtt_publish_param param;

	param.message.topic.qos = qos;
	param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
	param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
	param.message.payload.data = data;
	param.message.payload.len = len;
	param.message_id = sys_rand32_get();;
	param.dup_flag = 0;
	param.retain_flag = 0;

	data_print("Publishing: ", data, len);
	LOG_INF("to topic: %s len: %u",CONFIG_MQTT_PUB_TOPIC, (unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));

	return mqtt_publish(c, &param);
}

int main(void)
{
	int ret;
	struct net_if *iface = net_if_get_default();		//Get the default network interface

	//nrfx_clock_divider_set(NRF_CLOCK_DOMAIN_HFCLK, NRF_CLOCK_HFCLK_DIV_1);		//Set clock at 128MHz

	//Configure GPIO
	ret = gpio_pin_configure_dt(&button, GPIO_INPUT);

	//Register Wi-Fi callback
	net_mgmt_init_event_callback(&wifi_shell_mgmt_cb, wifi_mgmt_event_handler, WIFI_SHELL_MGMT_EVENTS);
	net_mgmt_add_event_callback(&wifi_shell_mgmt_cb);
	net_mgmt_init_event_callback(&net_shell_mgmt_cb, net_mgmt_event_handler,NET_EVENT_IPV4_DHCP_BOUND);
	net_mgmt_add_event_callback(&net_shell_mgmt_cb);

	//ret = client_init(&client_ctx);
	if (ret != 0) {
		LOG_ERR("client_init: %d", ret);
		return;
	}

	LOG_INF("Starting... wait 3 sec");
	k_sleep(K_MSEC(3000));
	LOG_INF("Start");

	//Scan Wi-Fi Networks
	if (net_mgmt(NET_REQUEST_WIFI_SCAN, iface, NULL, 0)) {LOG_INF("Scan request failed");}else{LOG_INF("Scan requested");}

	while(1){
		


		int val = gpio_pin_get_dt(&button);
		if(val){
			while(gpio_pin_get_dt(&button)){k_sleep(K_MSEC(5));}

			if(!connected){
				for(int i = 0; i < scan_results_index; i++){
					LOG_INF("SSID: %-28s | CHANNEL: %-4u | RSSI: %-4d |  SECURITY: %d | MAC length: %d", scan_results[i].ssid, scan_results[i].channel, scan_results[i].rssi, scan_results[i].security, scan_results[i].mac_length);
					k_sleep(K_MSEC(50));
				}
				LOG_INF("WIFI");
				wifi_connect();
			}else{
				LOG_INF("MQTT INIT");
				do{
				ret = client_init(&client_ctx);
					if (ret != 0) {
						LOG_ERR("client_init: %d", ret);
						k_sleep(K_MSEC(3000));
						LOG_INF("Try again...");
					}else{
						LOG_INF("success client_init!!!: %d", ret);
					}
				}while(ret != 0 );
				LOG_INF("MQTT CONNECTING");
				do{
					ret = mqtt_connect(&client_ctx);

					if (ret != 0) {
						LOG_ERR("mqtt_connect %d", ret);
						LOG_INF("Reconnecting");
					}else{
						LOG_INF("success mqtt_connect!!!: %u", ret);
					}
					k_sleep(K_MSEC(3000));
				}while(ret != 0);

				ret = fds_init(&client_ctx,&fds);

				ret = poll(&fds, 1, mqtt_keepalive_time_left(&client_ctx));
				if (ret < 0) {
					LOG_ERR("Error in poll(): %d", errno);
					break;
				}

				ret = mqtt_live(&client_ctx);
				if ((ret != 0) && (ret != -EAGAIN)) {
					LOG_ERR("Error in mqtt_live: %d", ret);
					break;
				}

				uint8_t data[50];
				sprintf(data,"HELLO");

				do{
					ret = data_publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE, data, strlen(data));
					if (ret != 0) {
						LOG_ERR("Failed to send message, %d", ret);
							k_sleep(K_MSEC(3000));
							LOG_INF("Try again...");
					}else{
						LOG_INF("Publish success!!: %d", ret);
					}

				}while(ret != 0);
				LOG_INF("DONE");
				k_sleep(K_MSEC(10000));
				

			}
		}
		//LOG_INF("Button = %d", val);
		k_sleep(K_MSEC(5));


	}
	return 0;
}

I think I have followed the right operations in order to connect to an MQTT broker, but is there something that I am missing?

Thank you

Parents Reply Children
No Data
Related