This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

Publishing MQTT data using k_timer to send data out every 1 hour

Hi Dev Team,

I am using the mqtt_simple example to publish my data out every 1 hour. I tried the example and it is working well and I was able to test it via MQTT.fx with an open source broker. 

Now, I want to publish my sensor data through MQTT every 1 hour. For that, I was thinking to use the k_timer() , Zephyr timers to call the data_publish() function every 1 hour.

I modified the example by adding a timer to call the function but it doesn't seem to work.

I have attached the code with what I am trying to do. The code modification is from the function on line 415.

/*
 * Copyright (c) 2018 Nordic Semiconductor ASA
 *
 * SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic
 */

#include <zephyr.h>
#include <stdio.h>
#include <inttypes.h>
#include <drivers/uart.h>
#include <string.h>
#include <stdlib.h>
#include <device.h>

#include <nrfx.h>
#include <lightmodbus.h>
#include <master.h>
#include <slave.h>
#include <hal/nrf_gpio.h>
#include <drivers/gpio.h>

#include <net/mqtt.h>
#include <net/socket.h>
#include <modem/lte_lc.h>
#if defined(CONFIG_LWM2M_CARRIER)
#include <lwm2m_carrier.h>
#endif

struct k_timer my_mqtt_timer;

struct k_timer my_modbus_timer;

#define SERIAL_Q_SIZE 10

static struct work_data{    
    struct k_work work;    
    u8_t data[1024]; 
    u8_t data_len;
}w_data[SERIAL_Q_SIZE];



/* Buffers for MQTT client. */
static u8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static u8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static u8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];

/* The mqtt client struct */
static struct mqtt_client client;

/* MQTT Broker details. */
static struct sockaddr_storage broker;

/* Connected flag */
static bool connected;

/* File descriptor */
static struct pollfd fds;

#if defined(CONFIG_BSD_LIBRARY)

/**@brief Recoverable BSD library error. */
void bsd_recoverable_error_handler(uint32_t err)
{
	printk("bsdlib recoverable error: %u\n", (unsigned int)err);
}

#endif /* defined(CONFIG_BSD_LIBRARY) */

#if defined(CONFIG_LWM2M_CARRIER)
K_SEM_DEFINE(carrier_registered, 0, 1);

void lwm2m_carrier_event_handler(const lwm2m_carrier_event_t *event)
{
	switch (event->type) {
	case LWM2M_CARRIER_EVENT_BSDLIB_INIT:
		printk("LWM2M_CARRIER_EVENT_BSDLIB_INIT\n");
		break;
	case LWM2M_CARRIER_EVENT_CONNECT:
		printk("LWM2M_CARRIER_EVENT_CONNECT\n");
		break;
	case LWM2M_CARRIER_EVENT_DISCONNECT:
		printk("LWM2M_CARRIER_EVENT_DISCONNECT\n");
		break;
	case LWM2M_CARRIER_EVENT_READY:
		printk("LWM2M_CARRIER_EVENT_READY\n");
		k_sem_give(&carrier_registered);
		break;
	case LWM2M_CARRIER_EVENT_FOTA_START:
		printk("LWM2M_CARRIER_EVENT_FOTA_START\n");
		break;
	case LWM2M_CARRIER_EVENT_REBOOT:
		printk("LWM2M_CARRIER_EVENT_REBOOT\n");
		break;
	}
}
#endif /* defined(CONFIG_LWM2M_CARRIER) */

/**@brief Function to print strings without null-termination
 */
static void data_print(u8_t *prefix, u8_t *data, size_t len)
{
	char buf[len + 1];

	memcpy(buf, data, len);
	buf[len] = 0;
	printk("%s%s\n", prefix, buf);
}

/**@brief Function to publish data on the configured topic
 */
static int data_publish(struct mqtt_client *c, enum mqtt_qos qos,
	u8_t *data, size_t len)
{
	struct mqtt_publish_param param;

	param.message.topic.qos = qos;
	param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
	param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
	param.message.payload.data = data;
	param.message.payload.len = len;
	param.message_id = sys_rand32_get();
	param.dup_flag = 0;
	param.retain_flag = 0;

	data_print("Publishing: ", data, len);
	printk("to topic: %s len: %u\n",
		CONFIG_MQTT_PUB_TOPIC,
		(unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));

	return mqtt_publish(c, &param);
}

/**@brief Function to subscribe to the configured topic
 */
static int subscribe(void)
{
	struct mqtt_topic subscribe_topic = {
		.topic = {
			.utf8 = CONFIG_MQTT_SUB_TOPIC,
			.size = strlen(CONFIG_MQTT_SUB_TOPIC)
		},
		.qos = MQTT_QOS_1_AT_LEAST_ONCE
	};

	const struct mqtt_subscription_list subscription_list = {
		.list = &subscribe_topic,
		.list_count = 1,
		.message_id = 1234
	};

	printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC,
		(unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));

	return mqtt_subscribe(&client, &subscription_list);
}

/**@brief Function to read the published payload.
 */
static int publish_get_payload(struct mqtt_client *c, size_t length)
{
	u8_t *buf = payload_buf;
	u8_t *end = buf + length;

	if (length > sizeof(payload_buf)) {
		return -EMSGSIZE;
	}

	while (buf < end) {
		int ret = mqtt_read_publish_payload(c, buf, end - buf);

		if (ret < 0) {
			int err;

			if (ret != -EAGAIN) {
				return ret;
			}

			printk("mqtt_read_publish_payload: EAGAIN\n");

			err = poll(&fds, 1,
				   CONFIG_MQTT_KEEPALIVE * MSEC_PER_SEC);
			if (err > 0 && (fds.revents & POLLIN) == POLLIN) {
				continue;
			} else {
				return -EIO;
			}
		}

		if (ret == 0) {
			return -EIO;
		}

		buf += ret;
	}

	return 0;
}

/**@brief MQTT client event handler
 */
void mqtt_evt_handler(struct mqtt_client *const c,
		      const struct mqtt_evt *evt)
{
	int err;

	switch (evt->type) {
	case MQTT_EVT_CONNACK:
		if (evt->result != 0) {
			printk("MQTT connect failed %d\n", evt->result);
			break;
		}

		connected = true;
		printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
		subscribe();
		break;

	case MQTT_EVT_DISCONNECT:
		printk("[%s:%d] MQTT client disconnected %d\n", __func__,
		       __LINE__, evt->result);

		connected = false;
		break;

	case MQTT_EVT_PUBLISH: {
		const struct mqtt_publish_param *p = &evt->param.publish;

		printk("[%s:%d] MQTT PUBLISH result=%d len=%d\n", __func__,
		       __LINE__, evt->result, p->message.payload.len);
		err = publish_get_payload(c, p->message.payload.len);
		if (err >= 0) {
			data_print("Received: ", payload_buf,
				p->message.payload.len);
			/* Echo back received data */
			data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
				payload_buf, p->message.payload.len);
		} else {
			printk("mqtt_read_publish_payload: Failed! %d\n", err);
			printk("Disconnecting MQTT client...\n");

			err = mqtt_disconnect(c);
			if (err) {
				printk("Could not disconnect: %d\n", err);
			}
		}
	} break;

	case MQTT_EVT_PUBACK:
		if (evt->result != 0) {
			printk("MQTT PUBACK error %d\n", evt->result);
			break;
		}

		printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__,
				evt->param.puback.message_id);
		break;

	case MQTT_EVT_SUBACK:
		if (evt->result != 0) {
			printk("MQTT SUBACK error %d\n", evt->result);
			break;
		}

		printk("[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__,
				evt->param.suback.message_id);
		break;

	default:
		printk("[%s:%d] default: %d\n", __func__, __LINE__,
				evt->type);
		break;
	}
}

/**@brief Resolves the configured hostname and
 * initializes the MQTT broker structure
 */
static void broker_init(void)
{
	int err;
	struct addrinfo *result;
	struct addrinfo *addr;
	struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM
	};

	err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
	if (err) {
		printk("ERROR: getaddrinfo failed %d\n", err);

		return;
	}

	addr = result;
	err = -ENOENT;

	/* Look for address of the broker. */
	while (addr != NULL) {
		/* IPv4 Address. */
		if (addr->ai_addrlen == sizeof(struct sockaddr_in)) {
			struct sockaddr_in *broker4 =
				((struct sockaddr_in *)&broker);
			char ipv4_addr[NET_IPV4_ADDR_LEN];

			broker4->sin_addr.s_addr =
				((struct sockaddr_in *)addr->ai_addr)
				->sin_addr.s_addr;
			broker4->sin_family = AF_INET;
			broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);

			inet_ntop(AF_INET, &broker4->sin_addr.s_addr,
				  ipv4_addr, sizeof(ipv4_addr));
			printk("IPv4 Address found %s\n", ipv4_addr);

			break;
		} else {
			printk("ai_addrlen = %u should be %u or %u\n",
				(unsigned int)addr->ai_addrlen,
				(unsigned int)sizeof(struct sockaddr_in),
				(unsigned int)sizeof(struct sockaddr_in6));
		}

		addr = addr->ai_next;
		break;
	}

	/* Free the address. */
	freeaddrinfo(result);
}

/**@brief Initialize the MQTT client structure
 */
static void client_init(struct mqtt_client *client)
{
	mqtt_client_init(client);

	broker_init();

	/* MQTT client configuration */
	client->broker = &broker;
	client->evt_cb = mqtt_evt_handler;
	client->client_id.utf8 = (u8_t *)CONFIG_MQTT_CLIENT_ID;
	client->client_id.size = strlen(CONFIG_MQTT_CLIENT_ID);
	client->password = NULL;
	client->user_name = NULL;
	client->protocol_version = MQTT_VERSION_3_1_1;

	/* MQTT buffers configuration */
	client->rx_buf = rx_buffer;
	client->rx_buf_size = sizeof(rx_buffer);
	client->tx_buf = tx_buffer;
	client->tx_buf_size = sizeof(tx_buffer);

	/* MQTT transport configuration */
#if defined(CONFIG_MQTT_LIB_TLS)
	client->transport.type = MQTT_TRANSPORT_SECURE;
#else
	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
#endif
}

/**@brief Initialize the file descriptor structure used by poll.
 */
static int fds_init(struct mqtt_client *c)
{
	if (c->transport.type == MQTT_TRANSPORT_NON_SECURE) {
		fds.fd = c->transport.tcp.sock;
	} else {
#if defined(CONFIG_MQTT_LIB_TLS)
		fds.fd = c->transport.tls.sock;
#else
		return -ENOTSUP;
#endif
	}

	fds.events = POLLIN;

	return 0;
}

/**@brief Configures modem to provide LTE link. Blocks until link is
 * successfully established.
 */
static void modem_configure(void)
{
#if defined(CONFIG_LTE_LINK_CONTROL)
	if (IS_ENABLED(CONFIG_LTE_AUTO_INIT_AND_CONNECT)) {
		/* Do nothing, modem is already turned on
		 * and connected.
		 */
	} else {
#if defined(CONFIG_LWM2M_CARRIER)
		/* Wait for the LWM2M_CARRIER to configure the modem and
		 * start the connection.
		 */
		printk("Waitng for carrier registration...\n");
		k_sem_take(&carrier_registered, K_FOREVER);
		printk("Registered!\n");
#else /* defined(CONFIG_LWM2M_CARRIER) */
		int err;

		printk("LTE Link Connecting ...\n");
		err = lte_lc_init_and_connect();
		__ASSERT(err == 0, "LTE link could not be established.");
		printk("LTE Link Connected!\n");
#endif /* defined(CONFIG_LWM2M_CARRIER) */
	}
#endif /* defined(CONFIG_LTE_LINK_CONTROL) */
}

// Changes made here to try and make the function act like a timer

void static mqttLoop(struct k_timer *timer_id)
{

      printk("MQTT Sample started...\n");

      int err;
      client_init(&client);

	err = mqtt_connect(&client);
	if (err != 0) {
		printk("ERROR: mqtt_connect %d\n", err);
		return;
	}

	err = fds_init(&client);
	if (err != 0) {
		printk("ERROR: fds_init %d\n", err);
		return;
	}

	
		err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE));
		if (err < 0) {
			printk("ERROR: poll %d\n", errno);
			
		}

		err = mqtt_live(&client);
		if (err != 0) {
			printk("ERROR: mqtt_live %d\n", err);
			
		}

		if ((fds.revents & POLLIN) == POLLIN) {
			err = mqtt_input(&client);
			if (err != 0) {
				printk("ERROR: mqtt_input %d\n", err);
				
			}
		}

		if ((fds.revents & POLLERR) == POLLERR) {
			printk("POLLERR\n");
			
		}

		if ((fds.revents & POLLNVAL) == POLLNVAL) {
			printk("POLLNVAL\n");
			
		}
                
                char dPayload[100];
                sprintf(dPayload,"sample data");
                data_publish(&client,0,dPayload,strlen(dPayload));
                k_sleep(K_SECONDS(3));
	

	printk("Disconnecting MQTT client...\n");

	err = mqtt_disconnect(&client);
	if (err) {
		printk("Could not disconnect MQTT client. Error: %d\n", err);
	}

        err = mqtt_live(&client);
        if (err != 0) {
                printk("ERROR: mqtt_live %d\n", err);
        }
}


K_TIMER_DEFINE(get_mqtt, mqttLoop, NULL);

void main(void)
{


        int i;

        int err;

        modem_configure();

        err = lte_lc_psm_req(true);
        if (err) {
                printk("ERROR: set psm %d\n", err);
                return;
        }

        //k_timer_init(&my_mqtt_timer, mqttLoop, NULL);

        k_timer_start(&get_mqtt, K_SECONDS(30), K_SECONDS(3600));

       /* while(true)
        {
        mqttLoop();
        k_sleep(K_SECONDS(5));
        } */


        
}

The code enters the function and gets stuck after the print line at the start "MQTT sample started".

Can you help me identify the issue and how it could be corrected ?

Regards,

Adeel

  • Hi Adeel,

        The functions in the while loop of mqtt_simple are used to maintain and keep the MQTT connection, so they should run all the time. Instead, you can set a timer to just run the data_publish function every 1 hour. I think this is a simple solution for your needs.

  • Hi Charlie,

    Thanks for the idea. I tried it out as well but the data_publish function works only for the first time. It gets stuck later.

    
    
    void static mqttLoop()
    {
    
          uint16_t val;
    
          printk("MQTT Sample started...\n");
    
          modem_configure();
    
          int err;
          client_init(&client);
    
    	err = mqtt_connect(&client);
    	if (err != 0) {
    		printk("ERROR: mqtt_connect %d\n", err);
    		return;
    	}
    
    	err = fds_init(&client);
    	if (err != 0) {
    		printk("ERROR: fds_init %d\n", err);
    		return;
    	}
    
    	
    		err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE));
    		if (err < 0) {
    			printk("ERROR: poll %d\n", errno);
    			
    		}
    
    		err = mqtt_live(&client);
    		if (err != 0) {
    			printk("ERROR: mqtt_live %d\n", err);
    			
    		}
    
    		if ((fds.revents & POLLIN) == POLLIN) {
    			err = mqtt_input(&client);
    			if (err != 0) {
    				printk("ERROR: mqtt_input %d\n", err);
    				
    			}
    		}
    
    		if ((fds.revents & POLLERR) == POLLERR) {
    			printk("POLLERR\n");
    			
    		}
    
    		if ((fds.revents & POLLNVAL) == POLLNVAL) {
    			printk("POLLNVAL\n");
    			
    		}
                    
                    
                    k_sleep(K_SECONDS(2));
    
    	printk("Disconnecting MQTT client...\n");
    
    	err = mqtt_disconnect(&client);
    	if (err) {
    		printk("Could not disconnect MQTT client. Error: %d\n", err);
    	}
    
            err = mqtt_live(&client);
            if (err != 0) {
                    printk("ERROR: mqtt_live %d\n", err);
            }
    }
    
    
    void my_mqtt_function(struct k_timer *timer_id)
    {
    
          
          uint16_t val;
          char dPayload[100];
          val = sprintf(dPayload, "%f", w);
          data_publish(&client,0,dPayload,strlen(dPayload));
    
          printk("Data being published\n");
    
    }
    
    K_TIMER_DEFINE(get_mqtt, my_mqtt_function, NULL);
    
    void main(void)
    {
            int i;
            int err;
    
            printk("Code starts here...\n");
    
            modem_configure();
            
            k_timer_start(&get_mqtt, K_SECONDS(30), K_SECONDS(30));
            
             while(true)
            {
             mqttLoop();
            //k_sleep(K_SECONDS(5));
            } 
           
    }

    This is the snippet for my mqtt_loop() and my_mqtt_fucntion() as timer. I moved the data_publish function in the timer expiry function. 

    I initiated the timer in the main() to regularly publish every 30 secs to check. It executes the first time but gets stuck again after the first execution. 

    I kept a check in the timer function through a print statement but that does not get printed (printk("Data being published\n");). So, I guess it gets stuck after publishing the first time. 

    Any idea for this issue ?

    Regards,

    Adeel. 

  • Hi Charlie,

    I tried to work around a bit with the same code and I see that the code is rebooting itself as soon as it executes the function for the first time.

    Can you help me in identifying the issue here ?

    Regards,

    Adeel.

  • Hi Adeel,

    I write an example based on mqtt_simple from ncs1.4.0, it works fine after a simple test. Please pay attention that the data_publish function should operate in a work thread instead timer handler. Here is the code:

    /*
     * Copyright (c) 2018 Nordic Semiconductor ASA
     *
     * SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic
     */
    
    #include <zephyr.h>
    #include <stdio.h>
    #include <drivers/uart.h>
    #include <string.h>
    #include <random/rand32.h>
    #include <net/mqtt.h>
    #include <net/socket.h>
    #include <modem/lte_lc.h>
    #if defined(CONFIG_MODEM_KEY_MGMT)
    #include <modem/modem_key_mgmt.h>
    #endif
    #if defined(CONFIG_LWM2M_CARRIER)
    #include <lwm2m_carrier.h>
    #endif
    
    #include "certificates.h"
    
    /* Buffers for MQTT client. */
    static uint8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
    static uint8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
    static uint8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];
    
    /* The mqtt client struct */
    static struct mqtt_client client;
    
    /* MQTT Broker details. */
    static struct sockaddr_storage broker;
    
    /* File descriptor */
    static struct pollfd fds;
    
    #if defined(CONFIG_MQTT_LIB_TLS)
    static int certificates_provision(void)
    {
    	int err = 0;
    
    	printk("Provisioning certificates\n");
    
    #if defined(CONFIG_BSD_LIBRARY) && defined(CONFIG_MODEM_KEY_MGMT)
    
    	err = modem_key_mgmt_write(CONFIG_MQTT_TLS_SEC_TAG,
    				   MODEM_KEY_MGMT_CRED_TYPE_CA_CHAIN,
    				   CA_CERTIFICATE,
    				   strlen(CA_CERTIFICATE));
    	if (err) {
    		printk("Failed to provision CA certificate, err %d\n", err);
    		return err;
    	}
    
    #elif defined(CONFIG_BOARD_QEMU_X86) && defined(CONFIG_NET_SOCKETS_SOCKOPT_TLS)
    
    	err = tls_credential_add(CONFIG_MQTT_TLS_SEC_TAG,
    				 TLS_CREDENTIAL_CA_CERTIFICATE,
    				 CA_CERTIFICATE,
    				 sizeof(CA_CERTIFICATE));
    	if (err) {
    		printk("Failed to register CA certificate, error: %d\n", err);
    		return err;
    	}
    
    #endif
    
    	return err;
    }
    #endif /* defined(CONFIG_MQTT_LIB_TLS) */
    
    #if defined(CONFIG_BSD_LIBRARY)
    
    /**@brief Recoverable BSD library error. */
    void bsd_recoverable_error_handler(uint32_t err)
    {
    	printk("bsdlib recoverable error: %u\n", (unsigned int)err);
    }
    
    #endif /* defined(CONFIG_BSD_LIBRARY) */
    
    #if defined(CONFIG_LWM2M_CARRIER)
    K_SEM_DEFINE(carrier_registered, 0, 1);
    
    void lwm2m_carrier_event_handler(const lwm2m_carrier_event_t *event)
    {
    	switch (event->type) {
    	case LWM2M_CARRIER_EVENT_BSDLIB_INIT:
    		printk("LWM2M_CARRIER_EVENT_BSDLIB_INIT\n");
    		break;
    	case LWM2M_CARRIER_EVENT_CONNECT:
    		printk("LWM2M_CARRIER_EVENT_CONNECT\n");
    		break;
    	case LWM2M_CARRIER_EVENT_DISCONNECT:
    		printk("LWM2M_CARRIER_EVENT_DISCONNECT\n");
    		break;
    	case LWM2M_CARRIER_EVENT_READY:
    		printk("LWM2M_CARRIER_EVENT_READY\n");
    		k_sem_give(&carrier_registered);
    		break;
    	case LWM2M_CARRIER_EVENT_FOTA_START:
    		printk("LWM2M_CARRIER_EVENT_FOTA_START\n");
    		break;
    	case LWM2M_CARRIER_EVENT_REBOOT:
    		printk("LWM2M_CARRIER_EVENT_REBOOT\n");
    		break;
    	}
    }
    #endif /* defined(CONFIG_LWM2M_CARRIER) */
    
    /**@brief Function to print strings without null-termination
     */
    static void data_print(uint8_t *prefix, uint8_t *data, size_t len)
    {
    	char buf[len + 1];
    
    	memcpy(buf, data, len);
    	buf[len] = 0;
    	printk("%s%s\n", prefix, buf);
    }
    
    /**@brief Function to publish data on the configured topic
     */
    static int data_publish(struct mqtt_client *c, enum mqtt_qos qos,
    	uint8_t *data, size_t len)
    {
    	struct mqtt_publish_param param;
    
    	param.message.topic.qos = qos;
    	param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
    	param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
    	param.message.payload.data = data;
    	param.message.payload.len = len;
    	param.message_id = sys_rand32_get();
    	param.dup_flag = 0;
    	param.retain_flag = 0;
    
    	data_print("Publishing: ", data, len);
    	printk("to topic: %s len: %u\n",
    		CONFIG_MQTT_PUB_TOPIC,
    		(unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));
    
    	return mqtt_publish(c, &param);
    }
    
    /**@brief Function to subscribe to the configured topic
     */
    static int subscribe(void)
    {
    	struct mqtt_topic subscribe_topic = {
    		.topic = {
    			.utf8 = CONFIG_MQTT_SUB_TOPIC,
    			.size = strlen(CONFIG_MQTT_SUB_TOPIC)
    		},
    		.qos = MQTT_QOS_1_AT_LEAST_ONCE
    	};
    
    	const struct mqtt_subscription_list subscription_list = {
    		.list = &subscribe_topic,
    		.list_count = 1,
    		.message_id = 1234
    	};
    
    	printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC,
    		(unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));
    
    	return mqtt_subscribe(&client, &subscription_list);
    }
    
    /**@brief Function to read the published payload.
     */
    static int publish_get_payload(struct mqtt_client *c, size_t length)
    {
    	uint8_t *buf = payload_buf;
    	uint8_t *end = buf + length;
    
    	if (length > sizeof(payload_buf)) {
    		return -EMSGSIZE;
    	}
    
    	while (buf < end) {
    		int ret = mqtt_read_publish_payload(c, buf, end - buf);
    
    		if (ret < 0) {
    			int err;
    
    			if (ret != -EAGAIN) {
    				return ret;
    			}
    
    			printk("mqtt_read_publish_payload: EAGAIN\n");
    
    			err = poll(&fds, 1,
    				   CONFIG_MQTT_KEEPALIVE * MSEC_PER_SEC);
    			if (err > 0 && (fds.revents & POLLIN) == POLLIN) {
    				continue;
    			} else {
    				return -EIO;
    			}
    		}
    
    		if (ret == 0) {
    			return -EIO;
    		}
    
    		buf += ret;
    	}
    
    	return 0;
    }
    
    /**@brief MQTT client event handler
     */
    void mqtt_evt_handler(struct mqtt_client *const c,
    		      const struct mqtt_evt *evt)
    {
    	int err;
    
    	switch (evt->type) {
    	case MQTT_EVT_CONNACK:
    		if (evt->result != 0) {
    			printk("MQTT connect failed %d\n", evt->result);
    			break;
    		}
    
    		printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
    
    #if defined(CONFIG_MQTT_LIB_TLS)
    		printk("Connected with TLS\n");
    #endif
    
    		subscribe();
    		break;
    
    	case MQTT_EVT_DISCONNECT:
    		printk("[%s:%d] MQTT client disconnected %d\n", __func__,
    		       __LINE__, evt->result);
    
    		break;
    
    	case MQTT_EVT_PUBLISH: {
    		const struct mqtt_publish_param *p = &evt->param.publish;
    
    		printk("[%s:%d] MQTT PUBLISH result=%d len=%d\n", __func__,
    		       __LINE__, evt->result, p->message.payload.len);
    		err = publish_get_payload(c, p->message.payload.len);
    		if (err >= 0) {
    			data_print("Received: ", payload_buf,
    				p->message.payload.len);
    			/* Echo back received data */
    			data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
    				payload_buf, p->message.payload.len);
    		} else {
    			printk("mqtt_read_publish_payload: Failed! %d\n", err);
    			printk("Disconnecting MQTT client...\n");
    
    			err = mqtt_disconnect(c);
    			if (err) {
    				printk("Could not disconnect: %d\n", err);
    			}
    		}
    	} break;
    
    	case MQTT_EVT_PUBACK:
    		if (evt->result != 0) {
    			printk("MQTT PUBACK error %d\n", evt->result);
    			break;
    		}
    
    		printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__,
    				evt->param.puback.message_id);
    		break;
    
    	case MQTT_EVT_SUBACK:
    		if (evt->result != 0) {
    			printk("MQTT SUBACK error %d\n", evt->result);
    			break;
    		}
    
    		printk("[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__,
    				evt->param.suback.message_id);
    		break;
    
    	case MQTT_EVT_PINGRESP:
    		if (evt->result != 0) {
    			printk("MQTT PINGRESP error %d\n", evt->result);
    		}
    		break;
    
    	default:
    		printk("[%s:%d] default: %d\n", __func__, __LINE__,
    				evt->type);
    		break;
    	}
    }
    
    /**@brief Resolves the configured hostname and
     * initializes the MQTT broker structure
     */
    static int broker_init(void)
    {
    	int err;
    	struct addrinfo *result;
    	struct addrinfo *addr;
    	struct addrinfo hints = {
    		.ai_family = AF_INET,
    		.ai_socktype = SOCK_STREAM
    	};
    
    	err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
    	if (err) {
    		printk("ERROR: getaddrinfo failed %d\n", err);
    		return -ECHILD;
    	}
    
    	addr = result;
    
    	/* Look for address of the broker. */
    	while (addr != NULL) {
    		/* IPv4 Address. */
    		if (addr->ai_addrlen == sizeof(struct sockaddr_in)) {
    			struct sockaddr_in *broker4 =
    				((struct sockaddr_in *)&broker);
    			char ipv4_addr[NET_IPV4_ADDR_LEN];
    
    			broker4->sin_addr.s_addr =
    				((struct sockaddr_in *)addr->ai_addr)
    				->sin_addr.s_addr;
    			broker4->sin_family = AF_INET;
    			broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);
    
    			inet_ntop(AF_INET, &broker4->sin_addr.s_addr,
    				  ipv4_addr, sizeof(ipv4_addr));
    			printk("IPv4 Address found %s\n", ipv4_addr);
    			break;
    		} else {
    			printk("ai_addrlen = %u should be %u or %u\n",
    				(unsigned int)addr->ai_addrlen,
    				(unsigned int)sizeof(struct sockaddr_in),
    				(unsigned int)sizeof(struct sockaddr_in6));
    		}
    
    		addr = addr->ai_next;
    	}
    
    	/* Free the address. */
    	freeaddrinfo(result);
    
    	return err;
    }
    
    /**@brief Initialize the MQTT client structure
     */
    static int client_init(struct mqtt_client *client)
    {
    	int err;
    
    	mqtt_client_init(client);
    
    	err = broker_init();
    	if (err) {
    		printk("Failed to initialize broker connection\n");
    		return err;
    	}
    
    	/* MQTT client configuration */
    	client->broker = &broker;
    	client->evt_cb = mqtt_evt_handler;
    	client->client_id.utf8 = (uint8_t *)CONFIG_MQTT_CLIENT_ID;
    	client->client_id.size = strlen(CONFIG_MQTT_CLIENT_ID);
    	client->password = NULL;
    	client->user_name = NULL;
    	client->protocol_version = MQTT_VERSION_3_1_1;
    
    	/* MQTT buffers configuration */
    	client->rx_buf = rx_buffer;
    	client->rx_buf_size = sizeof(rx_buffer);
    	client->tx_buf = tx_buffer;
    	client->tx_buf_size = sizeof(tx_buffer);
    
    	/* MQTT transport configuration */
    #if defined(CONFIG_MQTT_LIB_TLS)
    	struct mqtt_sec_config *tls_cfg = &(client->transport).tls.config;
    	static sec_tag_t sec_tag_list[] = { CONFIG_MQTT_TLS_SEC_TAG };
    
    	client->transport.type = MQTT_TRANSPORT_SECURE;
    
    	tls_cfg->peer_verify = CONFIG_MQTT_TLS_PEER_VERIFY;
    	tls_cfg->cipher_count = 0;
    	tls_cfg->cipher_list = NULL;
    	tls_cfg->sec_tag_count = ARRAY_SIZE(sec_tag_list);
    	tls_cfg->sec_tag_list = sec_tag_list;
    	tls_cfg->hostname = CONFIG_MQTT_BROKER_HOSTNAME;
    
    #if defined(CONFIG_BSD_LIBRARY)
    	tls_cfg->session_cache = IS_ENABLED(CONFIG_MQTT_TLS_SESSION_CACHING) ?
    					    TLS_SESSION_CACHE_ENABLED :
    					    TLS_SESSION_CACHE_DISABLED;
    #else
    	/* TLS session caching is not supported by the Zephyr network stack */
    	tls_cfg->session_cache = TLS_SESSION_CACHE_DISABLED;
    
    #endif
    
    #else
    	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
    #endif
    
    	return err;
    }
    
    /**@brief Initialize the file descriptor structure used by poll.
     */
    static int fds_init(struct mqtt_client *c)
    {
    	if (c->transport.type == MQTT_TRANSPORT_NON_SECURE) {
    		fds.fd = c->transport.tcp.sock;
    	} else {
    #if defined(CONFIG_MQTT_LIB_TLS)
    		fds.fd = c->transport.tls.sock;
    #else
    		return -ENOTSUP;
    #endif
    	}
    
    	fds.events = POLLIN;
    
    	return 0;
    }
    
    /**@brief Configures modem to provide LTE link. Blocks until link is
     * successfully established.
     */
    static void modem_configure(void)
    {
    #if defined(CONFIG_LTE_LINK_CONTROL)
    	if (IS_ENABLED(CONFIG_LTE_AUTO_INIT_AND_CONNECT)) {
    		/* Do nothing, modem is already turned on
    		 * and connected.
    		 */
    	} else {
    #if defined(CONFIG_LWM2M_CARRIER)
    		/* Wait for the LWM2M_CARRIER to configure the modem and
    		 * start the connection.
    		 */
    		printk("Waitng for carrier registration...\n");
    		k_sem_take(&carrier_registered, K_FOREVER);
    		printk("Registered!\n");
    #else /* defined(CONFIG_LWM2M_CARRIER) */
    		int err;
    
    		printk("LTE Link Connecting ...\n");
    		err = lte_lc_init_and_connect();
    		__ASSERT(err == 0, "LTE link could not be established.");
    		printk("LTE Link Connected!\n");
    #endif /* defined(CONFIG_LWM2M_CARRIER) */
    	}
    #endif /* defined(CONFIG_LTE_LINK_CONTROL) */
    }
    
    struct k_work my_work;
    struct k_timer my_timer;
    
    void my_work_handler(struct k_work *work)
    {
        printk("Timer triggered, publishing MQTT message!!!!\n");
    	data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE, "Hello Nordic!", 14);
    }
    K_WORK_DEFINE(my_work, my_work_handler);
    	
    void my_expiry_function(struct k_timer *timer_id) {
    	k_work_submit(&my_work);
    };
    K_TIMER_DEFINE(my_timer, my_expiry_function, NULL);
    
    void main(void)
    {
    	int err;
    
    	printk("The MQTT simple sample started\n");
    
    #if defined(CONFIG_MQTT_LIB_TLS)
    	err = certificates_provision();
    	if (err != 0) {
    		printk("Failed to provision certificates\n");
    		return;
    	}
    #endif /* defined(CONFIG_MQTT_LIB_TLS) */
    
    	modem_configure();
    
    	err = client_init(&client);
    	if (err != 0) {
    		printk("ERROR: client_init %d\n", err);
    		return;
    	}
    
    	err = mqtt_connect(&client);
    	if (err != 0) {
    		printk("ERROR: mqtt_connect %d\n", err);
    		return;
    	}
    
    	err = fds_init(&client);
    	if (err != 0) {
    		printk("ERROR: fds_init %d\n", err);
    		return;
    	}
    
    	k_timer_start(&my_timer, K_SECONDS(10), K_SECONDS(10));
    
    
    	while (1) {
    		err = poll(&fds, 1, mqtt_keepalive_time_left(&client));
    		if (err < 0) {
    			printk("ERROR: poll %d\n", errno);
    			break;
    		}
    
    		err = mqtt_live(&client);
    		if ((err != 0) && (err != -EAGAIN)) {
    			printk("ERROR: mqtt_live %d\n", err);
    			break;
    		}
    
    		if ((fds.revents & POLLIN) == POLLIN) {
    			err = mqtt_input(&client);
    			if (err != 0) {
    				printk("ERROR: mqtt_input %d\n", err);
    				break;
    			}
    		}
    
    		if ((fds.revents & POLLERR) == POLLERR) {
    			printk("POLLERR\n");
    			break;
    		}
    
    		if ((fds.revents & POLLNVAL) == POLLNVAL) {
    			printk("POLLNVAL\n");
    			break;
    		}
    	}
    
    	printk("Disconnecting MQTT client...\n");
    
    	err = mqtt_disconnect(&client);
    	if (err) {
    		printk("Could not disconnect MQTT client. Error: %d\n", err);
    	}
    }
    
     

  • Hi Charlie,

    Thanks a lot. This did the trick and gave me an insight on how to use work_thread Slight smile.

    Regards,

    Adeel.

Related