Hi!
I am currently playing around with the example combining UART and MQTT from this thread, and it seems to work okay. But when I try to send more than 10 characters over UART, the nRF9160 DK just stops at the last character and therefore doesn't forward the data via MQTT.
With a baud rate of 9600 I am able to send 10 bytes, while with higher baud rates it caps at 8 bytes.
My code:
/*
* Copyright (c) 2018 Nordic Semiconductor ASA
*
* SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic
*/
#include <zephyr.h>
#include <stdio.h>
#include <drivers/uart.h>
#include <string.h>
#include <net/mqtt.h>
#include <net/socket.h>
#include <lte_lc.h>
#include <inttypes.h>
#if defined(CONFIG_LWM2M_CARRIER)
#include <lwm2m_carrier.h>
#endif
/* Scratch buffer that uart_cb() reads received UART bytes into. */
u8_t uart_buf[1024];
/* Buffers for MQTT client: rx/tx are handed to the client in client_init(),
 * payload_buf receives the payload of incoming PUBLISH messages.
 */
static u8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static u8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE];
static u8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE];
/* The mqtt client struct */
static struct mqtt_client client;
/* MQTT Broker details, filled in by broker_init(). */
static struct sockaddr_storage broker;
/* Connected flag: set on a successful CONNACK, cleared on DISCONNECT. */
static bool connected;
/* File descriptor polled for incoming MQTT traffic, set up by fds_init(). */
static struct pollfd fds;
#define SERIAL_Q_SIZE 5
struct work_data{
struct k_work work;
u8_t data[1024];
u8_t data_len;
}w_data[SERIAL_Q_SIZE];
#if defined(CONFIG_BSD_LIBRARY)
/**@brief Handler for recoverable BSD library errors.
 *
 * Only reports the error code on the console.
 *
 * @param err Error code passed in by the caller.
 */
void bsd_recoverable_error_handler(uint32_t err)
{
	const unsigned int code = (unsigned int)err;

	printk("bsdlib recoverable error: %u\n", code);
}
#endif /* defined(CONFIG_BSD_LIBRARY) */
#if defined(CONFIG_LWM2M_CARRIER)
/* Given once the carrier library reports READY; modem_configure() waits on it. */
K_SEM_DEFINE(carrier_registered, 0, 1);

/**@brief Logs LwM2M carrier events and signals when the carrier is ready.
 *
 * Each known event type is printed by name. On LWM2M_CARRIER_EVENT_READY the
 * carrier_registered semaphore is given after the message is printed.
 * Unknown event types are ignored.
 *
 * @param event Event to handle; only its type field is read.
 */
void lwm2m_carrier_event_handler(const lwm2m_carrier_event_t *event)
{
	if (event->type == LWM2M_CARRIER_EVENT_BSDLIB_INIT) {
		printk("LWM2M_CARRIER_EVENT_BSDLIB_INIT\n");
	} else if (event->type == LWM2M_CARRIER_EVENT_CONNECT) {
		printk("LWM2M_CARRIER_EVENT_CONNECT\n");
	} else if (event->type == LWM2M_CARRIER_EVENT_DISCONNECT) {
		printk("LWM2M_CARRIER_EVENT_DISCONNECT\n");
	} else if (event->type == LWM2M_CARRIER_EVENT_READY) {
		printk("LWM2M_CARRIER_EVENT_READY\n");
		k_sem_give(&carrier_registered);
	} else if (event->type == LWM2M_CARRIER_EVENT_FOTA_START) {
		printk("LWM2M_CARRIER_EVENT_FOTA_START\n");
	} else if (event->type == LWM2M_CARRIER_EVENT_REBOOT) {
		printk("LWM2M_CARRIER_EVENT_REBOOT\n");
	}
}
#endif /* defined(CONFIG_LWM2M_CARRIER) */
/**@brief Function to print strings without null-termination
 *
 * Prints @p prefix, then the bytes of @p data, then a newline. As with a
 * plain "%s" print of a terminated copy, output of the data stops at the
 * first NUL byte inside it, if any.
 *
 * The data is copied through a small fixed-size buffer instead of a
 * variable-length array sized by @p len: callers pass buffers of up to
 * 1 KiB, and an array of that size on the stack risks overflowing the
 * calling thread's stack.
 *
 * @param prefix NUL-terminated text printed before the data.
 * @param data   Bytes to print; need not be NUL-terminated.
 * @param len    Number of bytes in @p data.
 */
static void data_print(u8_t *prefix, u8_t *data, size_t len)
{
	char chunk[64 + 1];
	size_t done = 0;

	printk("%s", prefix);

	while (done < len) {
		size_t n = len - done;

		if (n > sizeof(chunk) - 1) {
			n = sizeof(chunk) - 1;
		}

		memcpy(chunk, data + done, n);
		chunk[n] = 0;
		printk("%s", chunk);

		/* An embedded NUL already ended the printed text above. */
		if (memchr(chunk, 0, n) != NULL) {
			break;
		}

		done += n;
	}

	printk("\n");
}
/**@brief Function to publish data on the configured topic
 *
 * Builds a PUBLISH for CONFIG_MQTT_PUB_TOPIC with a random message id,
 * logs the payload and topic, and hands the message to mqtt_publish().
 *
 * @param c    MQTT client to publish with.
 * @param qos  Quality of service for the message.
 * @param data Payload bytes; need not be NUL-terminated.
 * @param len  Number of payload bytes.
 *
 * @return The value returned by mqtt_publish().
 */
static int data_publish(struct mqtt_client *c, enum mqtt_qos qos,
			u8_t *data, size_t len)
{
	struct mqtt_publish_param param;

	param.message.topic.qos = qos;
	param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC;
	param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC);
	param.message.payload.data = data;
	param.message.payload.len = len;
	param.message_id = sys_rand32_get();
	param.dup_flag = 0;
	param.retain_flag = 0;

	data_print("Publishing: ", data, len);
	printk("to topic: %s len: %u\n",
	       CONFIG_MQTT_PUB_TOPIC,
	       (unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC));

	/* The posted code had a mis-encoded "&param" here. */
	return mqtt_publish(c, &param);
}
/**@brief Function to subscribe to the configured topic
*/
static int subscribe(void)
{
struct mqtt_topic subscribe_topic = {
.topic = {
.utf8 = CONFIG_MQTT_SUB_TOPIC,
.size = strlen(CONFIG_MQTT_SUB_TOPIC)
},
.qos = MQTT_QOS_1_AT_LEAST_ONCE
};
const struct mqtt_subscription_list subscription_list = {
.list = &subscribe_topic,
.list_count = 1,
.message_id = 1234
};
printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC,
(unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC));
return mqtt_subscribe(&client, &subscription_list);
}
/**@brief Reads the payload of a received PUBLISH into payload_buf.
 *
 * Keeps calling mqtt_read_publish_payload() until @p length bytes have been
 * stored. On -EAGAIN it waits on the MQTT socket with poll() and retries.
 *
 * @param c      MQTT client the message arrived on.
 * @param length Payload length in bytes.
 *
 * @retval 0          The whole payload was read.
 * @retval -EMSGSIZE  @p length exceeds the size of payload_buf.
 * @retval -EIO       A read returned 0, or the wait for more data failed
 *                    or ended without POLLIN.
 * @return Any other negative value comes from mqtt_read_publish_payload().
 */
static int publish_get_payload(struct mqtt_client *c, size_t length)
{
	size_t received = 0;

	if (length > sizeof(payload_buf)) {
		return -EMSGSIZE;
	}

	while (received < length) {
		int ret = mqtt_read_publish_payload(c, payload_buf + received,
						    length - received);

		if (ret == -EAGAIN) {
			int ready;

			printk("mqtt_read_publish_payload: EAGAIN\n");

			ready = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE));
			if (ready <= 0 || (fds.revents & POLLIN) != POLLIN) {
				return -EIO;
			}

			/* Socket is readable again: retry the read. */
			continue;
		}

		if (ret < 0) {
			return ret;
		}

		if (ret == 0) {
			return -EIO;
		}

		received += ret;
	}

	return 0;
}
/**@brief MQTT client event handler
 *
 * - CONNACK: on success marks the client connected and subscribes.
 * - DISCONNECT: clears the connected flag.
 * - PUBLISH: reads the payload into payload_buf and echoes it back on the
 *   publish topic; if the payload cannot be read the client is disconnected.
 * - PUBACK / SUBACK: logs the message id, or the error result.
 * - Anything else: logs the event type.
 *
 * Failures of subscribe() and of the echo publish are logged instead of
 * being silently dropped.
 *
 * @param c   Client the event belongs to.
 * @param evt Event to handle.
 */
void mqtt_evt_handler(struct mqtt_client *const c,
		      const struct mqtt_evt *evt)
{
	int err;

	switch (evt->type) {
	case MQTT_EVT_CONNACK:
		if (evt->result != 0) {
			printk("MQTT connect failed %d\n", evt->result);
			break;
		}

		connected = true;
		printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__);

		err = subscribe();
		if (err) {
			printk("ERROR: subscribe %d\n", err);
		}
		break;

	case MQTT_EVT_DISCONNECT:
		printk("[%s:%d] MQTT client disconnected %d\n", __func__,
		       __LINE__, evt->result);
		connected = false;
		break;

	case MQTT_EVT_PUBLISH: {
		const struct mqtt_publish_param *p = &evt->param.publish;

		printk("[%s:%d] MQTT PUBLISH result=%d len=%d\n", __func__,
		       __LINE__, evt->result, p->message.payload.len);

		err = publish_get_payload(c, p->message.payload.len);
		if (err >= 0) {
			data_print("Received: ", payload_buf,
				   p->message.payload.len);

			/* Echo back received data */
			err = data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
					   payload_buf,
					   p->message.payload.len);
			if (err) {
				printk("ERROR: echo publish %d\n", err);
			}
		} else {
			printk("mqtt_read_publish_payload: Failed! %d\n", err);
			printk("Disconnecting MQTT client...\n");

			err = mqtt_disconnect(c);
			if (err) {
				printk("Could not disconnect: %d\n", err);
			}
		}
	} break;

	case MQTT_EVT_PUBACK:
		if (evt->result != 0) {
			printk("MQTT PUBACK error %d\n", evt->result);
			break;
		}

		printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__,
		       evt->param.puback.message_id);
		break;

	case MQTT_EVT_SUBACK:
		if (evt->result != 0) {
			printk("MQTT SUBACK error %d\n", evt->result);
			break;
		}

		printk("[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__,
		       evt->param.suback.message_id);
		break;

	default:
		printk("[%s:%d] default: %d\n", __func__, __LINE__,
		       evt->type);
		break;
	}
}
/**@brief Resolves the configured hostname and
 * initializes the MQTT broker structure
 *
 * Looks up CONFIG_MQTT_BROKER_HOSTNAME and stores the first IPv4 result,
 * together with CONFIG_MQTT_BROKER_PORT, in the global broker structure.
 * Every entry returned by getaddrinfo() is examined until an IPv4 address
 * is found (previously a stray break stopped the walk after the first
 * entry). If the lookup fails, broker is left untouched.
 */
static void broker_init(void)
{
	int err;
	struct addrinfo *result;
	struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM
	};

	err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result);
	if (err) {
		printk("ERROR: getaddrinfo failed %d\n", err);
		return;
	}

	/* Look for address of the broker. */
	for (struct addrinfo *addr = result; addr != NULL;
	     addr = addr->ai_next) {
		struct sockaddr_in *broker4;
		char ipv4_addr[NET_IPV4_ADDR_LEN];

		if (addr->ai_addrlen != sizeof(struct sockaddr_in)) {
			printk("ai_addrlen = %u should be %u or %u\n",
			       (unsigned int)addr->ai_addrlen,
			       (unsigned int)sizeof(struct sockaddr_in),
			       (unsigned int)sizeof(struct sockaddr_in6));
			continue;
		}

		/* IPv4 Address. */
		broker4 = (struct sockaddr_in *)&broker;
		broker4->sin_addr.s_addr =
			((struct sockaddr_in *)addr->ai_addr)->sin_addr.s_addr;
		broker4->sin_family = AF_INET;
		broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT);

		inet_ntop(AF_INET, &broker4->sin_addr.s_addr,
			  ipv4_addr, sizeof(ipv4_addr));
		printk("IPv4 Address found %s\n", ipv4_addr);
		break;
	}

	/* Free the address. */
	freeaddrinfo(result);
}
/**@brief Prepares an MQTT client for a plain (non-TLS) connection.
 *
 * Resets the client, resolves the broker address through broker_init() and
 * then fills in identity, protocol version, buffers and transport type.
 * No user name or password is configured.
 *
 * @param client Client structure to initialize.
 */
static void client_init(struct mqtt_client *client)
{
	mqtt_client_init(client);

	broker_init();

	/* Where to connect and who gets the events. */
	client->broker = &broker;
	client->evt_cb = mqtt_evt_handler;
	client->protocol_version = MQTT_VERSION_3_1_1;

	/* Identity; credentials are deliberately left unset. */
	client->client_id.utf8 = (u8_t *)CONFIG_MQTT_CLIENT_ID;
	client->client_id.size = strlen(CONFIG_MQTT_CLIENT_ID);
	client->password = NULL;
	client->user_name = NULL;

	/* Receive and transmit buffers used by the MQTT library. */
	client->rx_buf = rx_buffer;
	client->rx_buf_size = sizeof(rx_buffer);
	client->tx_buf = tx_buffer;
	client->tx_buf_size = sizeof(tx_buffer);

	/* Plain TCP transport. */
	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
}
/**@brief Sets up the poll descriptor for the client's socket.
 *
 * Picks the TCP socket for a non-secure transport, otherwise the TLS socket
 * when CONFIG_MQTT_LIB_TLS is enabled, and requests POLLIN events.
 *
 * @param c Client whose transport socket should be polled.
 *
 * @retval 0         fds was set up.
 * @retval -ENOTSUP  The transport is not non-secure and TLS support is
 *                   not compiled in.
 */
static int fds_init(struct mqtt_client *c)
{
	if (c->transport.type != MQTT_TRANSPORT_NON_SECURE) {
#if defined(CONFIG_MQTT_LIB_TLS)
		fds.fd = c->transport.tls.sock;
#else
		return -ENOTSUP;
#endif
	} else {
		fds.fd = c->transport.tcp.sock;
	}

	fds.events = POLLIN;

	return 0;
}
/**@brief Brings up the LTE link unless it is brought up automatically.
 *
 * Does nothing when CONFIG_LTE_LINK_CONTROL is disabled or when
 * CONFIG_LTE_AUTO_INIT_AND_CONNECT is enabled. Otherwise it either waits
 * for the carrier library to signal registration (CONFIG_LWM2M_CARRIER) or
 * calls lte_lc_init_and_connect() and asserts that it succeeded.
 */
static void modem_configure(void)
{
#if defined(CONFIG_LTE_LINK_CONTROL)
	if (IS_ENABLED(CONFIG_LTE_AUTO_INIT_AND_CONNECT)) {
		/* Modem is already turned on and connected. */
		return;
	}

#if defined(CONFIG_LWM2M_CARRIER)
	/* The carrier library configures the modem and starts the
	 * connection; block until it reports that it is ready.
	 */
	printk("Waitng for carrier registration...\n");
	k_sem_take(&carrier_registered, K_FOREVER);
	printk("Registered!\n");
#else /* defined(CONFIG_LWM2M_CARRIER) */
	int err;

	printk("LTE Link Connecting ...\n");
	err = lte_lc_init_and_connect();
	__ASSERT(err == 0, "LTE link could not be established.");
	printk("LTE Link Connected!\n");
#endif /* defined(CONFIG_LWM2M_CARRIER) */
#endif /* defined(CONFIG_LTE_LINK_CONTROL) */
}
/**@brief Work handler that publishes one received UART line over MQTT.
 *
 * The work item is embedded in a struct work_data; the line and its length
 * are taken from that enclosing struct.
 *
 * The line is printed with an explicit length instead of "%s": the buffer
 * is reused between lines, so printing it as a C string could show stale
 * bytes from an earlier, longer line. A failed publish is logged.
 *
 * @param item Work item embedded in the struct work_data to publish.
 */
void pub_uart_mqtt(struct k_work *item)
{
	struct work_data *wrk_data = CONTAINER_OF(item, struct work_data, work);
	int err;

	data_print("Data: ", wrk_data->data, wrk_data->data_len);
	printk("Length of data:%d\n", wrk_data->data_len);

	err = data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
			   wrk_data->data, wrk_data->data_len);
	if (err) {
		printk("ERROR: data_publish %d\n", err);
	}
}
void uart_cb(struct device *x)
{
uart_irq_update(x);
int data_length = 0;
static int pos, serial_in_count, tot_len = 0;
if (uart_irq_rx_ready(x)) {
data_length = uart_fifo_read(x, uart_buf, sizeof(uart_buf));
tot_len = tot_len + data_length;
memcpy(&w_data[serial_in_count].data[pos], uart_buf, data_length);
printk("Position is: %d\n", pos);
//Check if last char is 'LF'
if(uart_buf[data_length-1] == 10){
printk("last char is 'LF', call k_work_submit(), word number %d \n", serial_in_count);
w_data[serial_in_count].data_len = tot_len - 1;
k_work_submit(&w_data[serial_in_count].work);
pos = 0;
tot_len = 0;
if(serial_in_count < SERIAL_Q_SIZE-1){
serial_in_count++;
}else{
serial_in_count=0;
}
}else{
printk("last char is: %d\n", uart_buf[data_length-1]);
pos = pos + data_length;
}
}
//printk("%s", uart_buf);
}
/**@brief Application entry point.
 *
 * Initializes the UART work items, binds UART_2 and enables its receive
 * interrupt, brings up the LTE link, connects to the MQTT broker and then
 * services the connection until an error occurs. After the first pass of
 * the loop a "started" message is published once.
 *
 * The UART binding is checked before use: device_get_binding() returns
 * NULL when no device with that name exists, and passing NULL on to the
 * UART API would dereference it.
 */
void main(void)
{
	bool start = true;
	int err;
	struct device *uart;

	printk("The MQTT simple modified sample started\n");

	for (int i = 0; i < SERIAL_Q_SIZE; i++) {
		k_work_init(&w_data[i].work, pub_uart_mqtt);
	}

	uart = device_get_binding("UART_2");
	if (uart == NULL) {
		printk("ERROR: could not bind UART_2\n");
		return;
	}

	uart_irq_callback_set(uart, uart_cb);
	uart_irq_rx_enable(uart);

	modem_configure();

	client_init(&client);

	err = mqtt_connect(&client);
	if (err != 0) {
		printk("ERROR: mqtt_connect %d\n", err);
		return;
	}

	err = fds_init(&client);
	if (err != 0) {
		printk("ERROR: fds_init %d\n", err);
		return;
	}

	while (1) {
		err = poll(&fds, 1, mqtt_keepalive_time_left(&client));
		if (err < 0) {
			printk("ERROR: poll %d\n", errno);
			break;
		}

		err = mqtt_live(&client);
		if ((err != 0) && (err != -EAGAIN)) {
			printk("ERROR: mqtt_live %d\n", err);
			break;
		}

		if ((fds.revents & POLLIN) == POLLIN) {
			err = mqtt_input(&client);
			if (err != 0) {
				printk("ERROR: mqtt_input %d\n", err);
				break;
			}
		}

		if ((fds.revents & POLLERR) == POLLERR) {
			printk("POLLERR\n");
			break;
		}

		if ((fds.revents & POLLNVAL) == POLLNVAL) {
			printk("POLLNVAL\n");
			break;
		}

		if (start) {
			/* One-time announcement after the first loop pass. */
			err = data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE,
					   "started", strlen("started"));
			if (err) {
				printk("ERROR: data_publish %d\n", err);
			}
			start = false;
		}
	}

	printk("Disconnecting MQTT client...\n");

	err = mqtt_disconnect(&client);
	if (err) {
		printk("Could not disconnect MQTT client. Error: %d\n", err);
	}
}
My prj.conf:
#
# Copyright (c) 2019 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic
#
# Networking
CONFIG_NETWORKING=y
CONFIG_NET_NATIVE=n
CONFIG_NET_SOCKETS_OFFLOAD=y
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_POSIX_NAMES=y
# LTE link control
CONFIG_LTE_LINK_CONTROL=y
CONFIG_LTE_AUTO_INIT_AND_CONNECT=n
# BSD library
CONFIG_BSD_LIBRARY=y
# AT Host
CONFIG_UART_INTERRUPT_DRIVEN=y
CONFIG_AT_HOST_LIBRARY=y
# MQTT
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=n
# Application
#CONFIG_MQTT_PUB_TOPIC="/my/publish/topic"
#CONFIG_MQTT_SUB_TOPIC="/my/subscribe/topic"
#CONFIG_MQTT_CLIENT_ID="my-client-id"
#CONFIG_MQTT_BROKER_HOSTNAME="mqtt.eclipse.org"
#CONFIG_MQTT_BROKER_PORT=1883
# Main thread
CONFIG_MAIN_STACK_SIZE=4096
CONFIG_HEAP_MEM_POOL_SIZE=2048
CONFIG_SERIAL=y
CONFIG_UART_INTERRUPT_DRIVEN=y
CONFIG_UART_2_NRF_UARTE=y
My overlay:
/* Enables uart2 (bound as "UART_2" in the application) at 9600 baud. */
&uart2 {
status = "okay";
current-speed = <9600>;
tx-pin = <11>;
rx-pin = <10>;
/* NOTE(review): 0xFFFFFFFF presumably marks RTS/CTS as not connected,
 * i.e. no hardware flow control - confirm against the UARTE binding.
 */
rts-pin = <0xFFFFFFFF>;
cts-pin = <0xFFFFFFFF>;
};
Hope you can help me.
Thanks in advance.