Hi Dev Team,
I am using the mqtt_simple example to publish my data out every 1 hour. I tried the example and it is working well and I was able to test it via MQTT.fx with an open source broker.
Now, I want to publish my sensor data through MQTT every 1 hour. For that, I was thinking to use the k_timer() , Zephyr timers to call the data_publish() function every 1 hour.
I modified the example by adding a timer to call the function but it doesn't seem to work.
I have attached the code with what I am trying to do. The code modification is from the function on line 415.
/* * Copyright (c) 2018 Nordic Semiconductor ASA * * SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic */ #include <zephyr.h> #include <stdio.h> #include <inttypes.h> #include <drivers/uart.h> #include <string.h> #include <stdlib.h> #include <device.h> #include <nrfx.h> #include <lightmodbus.h> #include <master.h> #include <slave.h> #include <hal/nrf_gpio.h> #include <drivers/gpio.h> #include <net/mqtt.h> #include <net/socket.h> #include <modem/lte_lc.h> #if defined(CONFIG_LWM2M_CARRIER) #include <lwm2m_carrier.h> #endif struct k_timer my_mqtt_timer; struct k_timer my_modbus_timer; #define SERIAL_Q_SIZE 10 static struct work_data{ struct k_work work; u8_t data[1024]; u8_t data_len; }w_data[SERIAL_Q_SIZE]; /* Buffers for MQTT client. */ static u8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE]; static u8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE]; static u8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE]; /* The mqtt client struct */ static struct mqtt_client client; /* MQTT Broker details. */ static struct sockaddr_storage broker; /* Connected flag */ static bool connected; /* File descriptor */ static struct pollfd fds; #if defined(CONFIG_BSD_LIBRARY) /**@brief Recoverable BSD library error. */ void bsd_recoverable_error_handler(uint32_t err) { printk("bsdlib recoverable error: %u\n", (unsigned int)err); } #endif /* defined(CONFIG_BSD_LIBRARY) */ #if defined(CONFIG_LWM2M_CARRIER) K_SEM_DEFINE(carrier_registered, 0, 1); void lwm2m_carrier_event_handler(const lwm2m_carrier_event_t *event) { switch (event->type) { case LWM2M_CARRIER_EVENT_BSDLIB_INIT: printk("LWM2M_CARRIER_EVENT_BSDLIB_INIT\n"); break; case LWM2M_CARRIER_EVENT_CONNECT: printk("LWM2M_CARRIER_EVENT_CONNECT\n"); break; case LWM2M_CARRIER_EVENT_DISCONNECT: printk("LWM2M_CARRIER_EVENT_DISCONNECT\n"); break; case LWM2M_CARRIER_EVENT_READY: printk("LWM2M_CARRIER_EVENT_READY\n"); k_sem_give(&carrier_registered); break; case LWM2M_CARRIER_EVENT_FOTA_START: printk("LWM2M_CARRIER_EVENT_FOTA_START\n"); break; case LWM2M_CARRIER_EVENT_REBOOT: printk("LWM2M_CARRIER_EVENT_REBOOT\n"); break; } } #endif /* defined(CONFIG_LWM2M_CARRIER) */ /**@brief Function to print strings without null-termination */ static void data_print(u8_t *prefix, u8_t *data, size_t len) { char buf[len + 1]; memcpy(buf, data, len); buf[len] = 0; printk("%s%s\n", prefix, buf); } /**@brief Function to publish data on the configured topic */ static int data_publish(struct mqtt_client *c, enum mqtt_qos qos, u8_t *data, size_t len) { struct mqtt_publish_param param; param.message.topic.qos = qos; param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC; param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC); param.message.payload.data = data; param.message.payload.len = len; param.message_id = sys_rand32_get(); param.dup_flag = 0; param.retain_flag = 0; data_print("Publishing: ", data, len); printk("to topic: %s len: %u\n", CONFIG_MQTT_PUB_TOPIC, (unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC)); return mqtt_publish(c, ¶m); } /**@brief Function to subscribe to the configured topic */ static int subscribe(void) { struct mqtt_topic subscribe_topic = { .topic = { .utf8 = CONFIG_MQTT_SUB_TOPIC, .size = strlen(CONFIG_MQTT_SUB_TOPIC) }, .qos = MQTT_QOS_1_AT_LEAST_ONCE }; const struct mqtt_subscription_list subscription_list = { .list = &subscribe_topic, .list_count = 1, .message_id = 1234 }; printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC, (unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC)); return mqtt_subscribe(&client, &subscription_list); } /**@brief Function to read the published payload. */ static int publish_get_payload(struct mqtt_client *c, size_t length) { u8_t *buf = payload_buf; u8_t *end = buf + length; if (length > sizeof(payload_buf)) { return -EMSGSIZE; } while (buf < end) { int ret = mqtt_read_publish_payload(c, buf, end - buf); if (ret < 0) { int err; if (ret != -EAGAIN) { return ret; } printk("mqtt_read_publish_payload: EAGAIN\n"); err = poll(&fds, 1, CONFIG_MQTT_KEEPALIVE * MSEC_PER_SEC); if (err > 0 && (fds.revents & POLLIN) == POLLIN) { continue; } else { return -EIO; } } if (ret == 0) { return -EIO; } buf += ret; } return 0; } /**@brief MQTT client event handler */ void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt) { int err; switch (evt->type) { case MQTT_EVT_CONNACK: if (evt->result != 0) { printk("MQTT connect failed %d\n", evt->result); break; } connected = true; printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__); subscribe(); break; case MQTT_EVT_DISCONNECT: printk("[%s:%d] MQTT client disconnected %d\n", __func__, __LINE__, evt->result); connected = false; break; case MQTT_EVT_PUBLISH: { const struct mqtt_publish_param *p = &evt->param.publish; printk("[%s:%d] MQTT PUBLISH result=%d len=%d\n", __func__, __LINE__, evt->result, p->message.payload.len); err = publish_get_payload(c, p->message.payload.len); if (err >= 0) { data_print("Received: ", payload_buf, p->message.payload.len); /* Echo back received data */ data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE, payload_buf, p->message.payload.len); } else { printk("mqtt_read_publish_payload: Failed! %d\n", err); printk("Disconnecting MQTT client...\n"); err = mqtt_disconnect(c); if (err) { printk("Could not disconnect: %d\n", err); } } } break; case MQTT_EVT_PUBACK: if (evt->result != 0) { printk("MQTT PUBACK error %d\n", evt->result); break; } printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__, evt->param.puback.message_id); break; case MQTT_EVT_SUBACK: if (evt->result != 0) { printk("MQTT SUBACK error %d\n", evt->result); break; } printk("[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__, evt->param.suback.message_id); break; default: printk("[%s:%d] default: %d\n", __func__, __LINE__, evt->type); break; } } /**@brief Resolves the configured hostname and * initializes the MQTT broker structure */ static void broker_init(void) { int err; struct addrinfo *result; struct addrinfo *addr; struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM }; err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result); if (err) { printk("ERROR: getaddrinfo failed %d\n", err); return; } addr = result; err = -ENOENT; /* Look for address of the broker. */ while (addr != NULL) { /* IPv4 Address. */ if (addr->ai_addrlen == sizeof(struct sockaddr_in)) { struct sockaddr_in *broker4 = ((struct sockaddr_in *)&broker); char ipv4_addr[NET_IPV4_ADDR_LEN]; broker4->sin_addr.s_addr = ((struct sockaddr_in *)addr->ai_addr) ->sin_addr.s_addr; broker4->sin_family = AF_INET; broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT); inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr)); printk("IPv4 Address found %s\n", ipv4_addr); break; } else { printk("ai_addrlen = %u should be %u or %u\n", (unsigned int)addr->ai_addrlen, (unsigned int)sizeof(struct sockaddr_in), (unsigned int)sizeof(struct sockaddr_in6)); } addr = addr->ai_next; break; } /* Free the address. */ freeaddrinfo(result); } /**@brief Initialize the MQTT client structure */ static void client_init(struct mqtt_client *client) { mqtt_client_init(client); broker_init(); /* MQTT client configuration */ client->broker = &broker; client->evt_cb = mqtt_evt_handler; client->client_id.utf8 = (u8_t *)CONFIG_MQTT_CLIENT_ID; client->client_id.size = strlen(CONFIG_MQTT_CLIENT_ID); client->password = NULL; client->user_name = NULL; client->protocol_version = MQTT_VERSION_3_1_1; /* MQTT buffers configuration */ client->rx_buf = rx_buffer; client->rx_buf_size = sizeof(rx_buffer); client->tx_buf = tx_buffer; client->tx_buf_size = sizeof(tx_buffer); /* MQTT transport configuration */ #if defined(CONFIG_MQTT_LIB_TLS) client->transport.type = MQTT_TRANSPORT_SECURE; #else client->transport.type = MQTT_TRANSPORT_NON_SECURE; #endif } /**@brief Initialize the file descriptor structure used by poll. */ static int fds_init(struct mqtt_client *c) { if (c->transport.type == MQTT_TRANSPORT_NON_SECURE) { fds.fd = c->transport.tcp.sock; } else { #if defined(CONFIG_MQTT_LIB_TLS) fds.fd = c->transport.tls.sock; #else return -ENOTSUP; #endif } fds.events = POLLIN; return 0; } /**@brief Configures modem to provide LTE link. Blocks until link is * successfully established. */ static void modem_configure(void) { #if defined(CONFIG_LTE_LINK_CONTROL) if (IS_ENABLED(CONFIG_LTE_AUTO_INIT_AND_CONNECT)) { /* Do nothing, modem is already turned on * and connected. */ } else { #if defined(CONFIG_LWM2M_CARRIER) /* Wait for the LWM2M_CARRIER to configure the modem and * start the connection. */ printk("Waitng for carrier registration...\n"); k_sem_take(&carrier_registered, K_FOREVER); printk("Registered!\n"); #else /* defined(CONFIG_LWM2M_CARRIER) */ int err; printk("LTE Link Connecting ...\n"); err = lte_lc_init_and_connect(); __ASSERT(err == 0, "LTE link could not be established."); printk("LTE Link Connected!\n"); #endif /* defined(CONFIG_LWM2M_CARRIER) */ } #endif /* defined(CONFIG_LTE_LINK_CONTROL) */ } // Changes made here to try and make the function act like a timer void static mqttLoop(struct k_timer *timer_id) { printk("MQTT Sample started...\n"); int err; client_init(&client); err = mqtt_connect(&client); if (err != 0) { printk("ERROR: mqtt_connect %d\n", err); return; } err = fds_init(&client); if (err != 0) { printk("ERROR: fds_init %d\n", err); return; } err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE)); if (err < 0) { printk("ERROR: poll %d\n", errno); } err = mqtt_live(&client); if (err != 0) { printk("ERROR: mqtt_live %d\n", err); } if ((fds.revents & POLLIN) == POLLIN) { err = mqtt_input(&client); if (err != 0) { printk("ERROR: mqtt_input %d\n", err); } } if ((fds.revents & POLLERR) == POLLERR) { printk("POLLERR\n"); } if ((fds.revents & POLLNVAL) == POLLNVAL) { printk("POLLNVAL\n"); } char dPayload[100]; sprintf(dPayload,"sample data"); data_publish(&client,0,dPayload,strlen(dPayload)); k_sleep(K_SECONDS(3)); printk("Disconnecting MQTT client...\n"); err = mqtt_disconnect(&client); if (err) { printk("Could not disconnect MQTT client. Error: %d\n", err); } err = mqtt_live(&client); if (err != 0) { printk("ERROR: mqtt_live %d\n", err); } } K_TIMER_DEFINE(get_mqtt, mqttLoop, NULL); void main(void) { int i; int err; modem_configure(); err = lte_lc_psm_req(true); if (err) { printk("ERROR: set psm %d\n", err); return; } //k_timer_init(&my_mqtt_timer, mqttLoop, NULL); k_timer_start(&get_mqtt, K_SECONDS(30), K_SECONDS(3600)); /* while(true) { mqttLoop(); k_sleep(K_SECONDS(5)); } */ }
The code enters the function and gets stuck after the print line at the start "MQTT sample started".
Can you help me identify the issue and how it could be corrected ?
Regards,
Adeel