Hi!
I am currently playing around with the example combining uart and MQTT from this thread, and it seems to work okay. But when i try to send more than 10 characters over UART, the nrf9160dk just stop at the last character and therefore doesn't forward via MQTT.
With baudrate of 9600 I am able to send 10 bytes, while with highes ones, it caps at 8 bytes.
My code:
/* * Copyright (c) 2018 Nordic Semiconductor ASA * * SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic */ #include <zephyr.h> #include <stdio.h> #include <drivers/uart.h> #include <string.h> #include <net/mqtt.h> #include <net/socket.h> #include <lte_lc.h> #include <inttypes.h> #if defined(CONFIG_LWM2M_CARRIER) #include <lwm2m_carrier.h> #endif u8_t uart_buf[1024]; /* Buffers for MQTT client. */ static u8_t rx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE]; static u8_t tx_buffer[CONFIG_MQTT_MESSAGE_BUFFER_SIZE]; static u8_t payload_buf[CONFIG_MQTT_PAYLOAD_BUFFER_SIZE]; /* The mqtt client struct */ static struct mqtt_client client; /* MQTT Broker details. */ static struct sockaddr_storage broker; /* Connected flag */ static bool connected; /* File descriptor */ static struct pollfd fds; #define SERIAL_Q_SIZE 5 struct work_data{ struct k_work work; u8_t data[1024]; u8_t data_len; }w_data[SERIAL_Q_SIZE]; #if defined(CONFIG_BSD_LIBRARY) /**@brief Recoverable BSD library error. */ void bsd_recoverable_error_handler(uint32_t err) { printk("bsdlib recoverable error: %u\n", (unsigned int)err); } #endif /* defined(CONFIG_BSD_LIBRARY) */ #if defined(CONFIG_LWM2M_CARRIER) K_SEM_DEFINE(carrier_registered, 0, 1); void lwm2m_carrier_event_handler(const lwm2m_carrier_event_t *event) { switch (event->type) { case LWM2M_CARRIER_EVENT_BSDLIB_INIT: printk("LWM2M_CARRIER_EVENT_BSDLIB_INIT\n"); break; case LWM2M_CARRIER_EVENT_CONNECT: printk("LWM2M_CARRIER_EVENT_CONNECT\n"); break; case LWM2M_CARRIER_EVENT_DISCONNECT: printk("LWM2M_CARRIER_EVENT_DISCONNECT\n"); break; case LWM2M_CARRIER_EVENT_READY: printk("LWM2M_CARRIER_EVENT_READY\n"); k_sem_give(&carrier_registered); break; case LWM2M_CARRIER_EVENT_FOTA_START: printk("LWM2M_CARRIER_EVENT_FOTA_START\n"); break; case LWM2M_CARRIER_EVENT_REBOOT: printk("LWM2M_CARRIER_EVENT_REBOOT\n"); break; } } #endif /* defined(CONFIG_LWM2M_CARRIER) */ /**@brief Function to print strings without null-termination */ static void data_print(u8_t *prefix, u8_t *data, size_t len) { char buf[len + 1]; memcpy(buf, data, len); buf[len] = 0; printk("%s%s\n", prefix, buf); } /**@brief Function to publish data on the configured topic */ static int data_publish(struct mqtt_client *c, enum mqtt_qos qos, u8_t *data, size_t len) { struct mqtt_publish_param param; param.message.topic.qos = qos; param.message.topic.topic.utf8 = CONFIG_MQTT_PUB_TOPIC; param.message.topic.topic.size = strlen(CONFIG_MQTT_PUB_TOPIC); param.message.payload.data = data; param.message.payload.len = len; param.message_id = sys_rand32_get(); param.dup_flag = 0; param.retain_flag = 0; data_print("Publishing: ", data, len); printk("to topic: %s len: %u\n", CONFIG_MQTT_PUB_TOPIC, (unsigned int)strlen(CONFIG_MQTT_PUB_TOPIC)); return mqtt_publish(c, ¶m); } /**@brief Function to subscribe to the configured topic */ static int subscribe(void) { struct mqtt_topic subscribe_topic = { .topic = { .utf8 = CONFIG_MQTT_SUB_TOPIC, .size = strlen(CONFIG_MQTT_SUB_TOPIC) }, .qos = MQTT_QOS_1_AT_LEAST_ONCE }; const struct mqtt_subscription_list subscription_list = { .list = &subscribe_topic, .list_count = 1, .message_id = 1234 }; printk("Subscribing to: %s len %u\n", CONFIG_MQTT_SUB_TOPIC, (unsigned int)strlen(CONFIG_MQTT_SUB_TOPIC)); return mqtt_subscribe(&client, &subscription_list); } /**@brief Function to read the published payload. */ static int publish_get_payload(struct mqtt_client *c, size_t length) { u8_t *buf = payload_buf; u8_t *end = buf + length; if (length > sizeof(payload_buf)) { return -EMSGSIZE; } while (buf < end) { int ret = mqtt_read_publish_payload(c, buf, end - buf); if (ret < 0) { int err; if (ret != -EAGAIN) { return ret; } printk("mqtt_read_publish_payload: EAGAIN\n"); err = poll(&fds, 1, K_SECONDS(CONFIG_MQTT_KEEPALIVE)); if (err > 0 && (fds.revents & POLLIN) == POLLIN) { continue; } else { return -EIO; } } if (ret == 0) { return -EIO; } buf += ret; } return 0; } /**@brief MQTT client event handler */ void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt) { int err; switch (evt->type) { case MQTT_EVT_CONNACK: if (evt->result != 0) { printk("MQTT connect failed %d\n", evt->result); break; } connected = true; printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__); subscribe(); break; case MQTT_EVT_DISCONNECT: printk("[%s:%d] MQTT client disconnected %d\n", __func__, __LINE__, evt->result); connected = false; break; case MQTT_EVT_PUBLISH: { const struct mqtt_publish_param *p = &evt->param.publish; printk("[%s:%d] MQTT PUBLISH result=%d len=%d\n", __func__, __LINE__, evt->result, p->message.payload.len); err = publish_get_payload(c, p->message.payload.len); if (err >= 0) { data_print("Received: ", payload_buf, p->message.payload.len); /* Echo back received data */ data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE, payload_buf, p->message.payload.len); } else { printk("mqtt_read_publish_payload: Failed! %d\n", err); printk("Disconnecting MQTT client...\n"); err = mqtt_disconnect(c); if (err) { printk("Could not disconnect: %d\n", err); } } } break; case MQTT_EVT_PUBACK: if (evt->result != 0) { printk("MQTT PUBACK error %d\n", evt->result); break; } printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__, evt->param.puback.message_id); break; case MQTT_EVT_SUBACK: if (evt->result != 0) { printk("MQTT SUBACK error %d\n", evt->result); break; } printk("[%s:%d] SUBACK packet id: %u\n", __func__, __LINE__, evt->param.suback.message_id); break; default: printk("[%s:%d] default: %d\n", __func__, __LINE__, evt->type); break; } } /**@brief Resolves the configured hostname and * initializes the MQTT broker structure */ static void broker_init(void) { int err; struct addrinfo *result; struct addrinfo *addr; struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM }; err = getaddrinfo(CONFIG_MQTT_BROKER_HOSTNAME, NULL, &hints, &result); if (err) { printk("ERROR: getaddrinfo failed %d\n", err); return; } addr = result; err = -ENOENT; /* Look for address of the broker. */ while (addr != NULL) { /* IPv4 Address. */ if (addr->ai_addrlen == sizeof(struct sockaddr_in)) { struct sockaddr_in *broker4 = ((struct sockaddr_in *)&broker); char ipv4_addr[NET_IPV4_ADDR_LEN]; broker4->sin_addr.s_addr = ((struct sockaddr_in *)addr->ai_addr) ->sin_addr.s_addr; broker4->sin_family = AF_INET; broker4->sin_port = htons(CONFIG_MQTT_BROKER_PORT); inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr)); printk("IPv4 Address found %s\n", ipv4_addr); break; } else { printk("ai_addrlen = %u should be %u or %u\n", (unsigned int)addr->ai_addrlen, (unsigned int)sizeof(struct sockaddr_in), (unsigned int)sizeof(struct sockaddr_in6)); } addr = addr->ai_next; break; } /* Free the address. */ freeaddrinfo(result); } /**@brief Initialize the MQTT client structure */ static void client_init(struct mqtt_client *client) { mqtt_client_init(client); broker_init(); /* MQTT client configuration */ client->broker = &broker; client->evt_cb = mqtt_evt_handler; client->client_id.utf8 = (u8_t *)CONFIG_MQTT_CLIENT_ID; client->client_id.size = strlen(CONFIG_MQTT_CLIENT_ID); client->password = NULL; client->user_name = NULL; /*client->password->utf8 = (u8_t *)CONFIG_MQTT_BROKER_PASSWORD; client->password->size = strlen(CONFIG_MQTT_BROKER_PASSWORD); client->user_name->utf8 = (u8_t *)CONFIG_MQTT_BROKER_USERNAME; client->user_name->size = strlen(CONFIG_MQTT_BROKER_USERNAME);*/ client->protocol_version = MQTT_VERSION_3_1_1; /* MQTT buffers configuration */ client->rx_buf = rx_buffer; client->rx_buf_size = sizeof(rx_buffer); client->tx_buf = tx_buffer; client->tx_buf_size = sizeof(tx_buffer); /* MQTT transport configuration */ client->transport.type = MQTT_TRANSPORT_NON_SECURE; } /**@brief Initialize the file descriptor structure used by poll. */ static int fds_init(struct mqtt_client *c) { if (c->transport.type == MQTT_TRANSPORT_NON_SECURE) { fds.fd = c->transport.tcp.sock; } else { #if defined(CONFIG_MQTT_LIB_TLS) fds.fd = c->transport.tls.sock; #else return -ENOTSUP; #endif } fds.events = POLLIN; return 0; } /**@brief Configures modem to provide LTE link. Blocks until link is * successfully established. */ static void modem_configure(void) { #if defined(CONFIG_LTE_LINK_CONTROL) if (IS_ENABLED(CONFIG_LTE_AUTO_INIT_AND_CONNECT)) { /* Do nothing, modem is already turned on * and connected. */ } else { #if defined(CONFIG_LWM2M_CARRIER) /* Wait for the LWM2M_CARRIER to configure the modem and * start the connection. */ printk("Waitng for carrier registration...\n"); k_sem_take(&carrier_registered, K_FOREVER); printk("Registered!\n"); #else /* defined(CONFIG_LWM2M_CARRIER) */ int err; printk("LTE Link Connecting ...\n"); err = lte_lc_init_and_connect(); __ASSERT(err == 0, "LTE link could not be established."); printk("LTE Link Connected!\n"); #endif /* defined(CONFIG_LWM2M_CARRIER) */ } #endif /* defined(CONFIG_LTE_LINK_CONTROL) */ } void pub_uart_mqtt(struct k_work *item) { struct work_data *wrk_data = CONTAINER_OF(item, struct work_data, work); printk("Data: %s\n", wrk_data->data); printk("Length of data:%d\n",wrk_data->data_len); data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE, wrk_data->data, wrk_data->data_len); //printk("Got error on device %s\n", the_device->name); } void uart_cb(struct device *x) { uart_irq_update(x); int data_length = 0; static int pos, serial_in_count, tot_len = 0; if (uart_irq_rx_ready(x)) { data_length = uart_fifo_read(x, uart_buf, sizeof(uart_buf)); tot_len = tot_len + data_length; memcpy(&w_data[serial_in_count].data[pos], uart_buf, data_length); printk("Position is: %d\n", pos); //Check if last char is 'LF' if(uart_buf[data_length-1] == 10){ printk("last char is 'LF', call k_work_submit(), word number %d \n", serial_in_count); w_data[serial_in_count].data_len = tot_len - 1; k_work_submit(&w_data[serial_in_count].work); pos = 0; tot_len = 0; if(serial_in_count < SERIAL_Q_SIZE-1){ serial_in_count++; }else{ serial_in_count=0; } }else{ printk("last char is: %d\n", uart_buf[data_length-1]); pos = pos + data_length; } } //printk("%s", uart_buf); } void main(void) { bool start = true; int err; printk("The MQTT simple modified sample started\n"); for(int i=0; i < SERIAL_Q_SIZE; i++){ k_work_init(&w_data[i].work, pub_uart_mqtt); } struct device *uart = device_get_binding("UART_2"); uart_irq_callback_set(uart, uart_cb); uart_irq_rx_enable(uart); modem_configure(); client_init(&client); err = mqtt_connect(&client); if (err != 0) { printk("ERROR: mqtt_connect %d\n", err); return; } err = fds_init(&client); if (err != 0) { printk("ERROR: fds_init %d\n", err); return; } while (1) { err = poll(&fds, 1, mqtt_keepalive_time_left(&client)); if (err < 0) { printk("ERROR: poll %d\n", errno); break; } err = mqtt_live(&client); if ((err != 0) && (err != -EAGAIN)) { printk("ERROR: mqtt_live %d\n", err); break; } if ((fds.revents & POLLIN) == POLLIN) { err = mqtt_input(&client); if (err != 0) { printk("ERROR: mqtt_input %d\n", err); break; } } if ((fds.revents & POLLERR) == POLLERR) { printk("POLLERR\n"); break; } if ((fds.revents & POLLNVAL) == POLLNVAL) { printk("POLLNVAL\n"); break; } if (start){ data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE, "started", strlen("started")); start = false; } } printk("Disconnecting MQTT client...\n"); err = mqtt_disconnect(&client); if (err) { printk("Could not disconnect MQTT client. Error: %d\n", err); } }
My prj.conf:
# # Copyright (c) 2019 Nordic Semiconductor ASA # # SPDX-License-Identifier: LicenseRef-BSD-5-Clause-Nordic # # Networking CONFIG_NETWORKING=y CONFIG_NET_NATIVE=n CONFIG_NET_SOCKETS_OFFLOAD=y CONFIG_NET_SOCKETS=y CONFIG_NET_SOCKETS_POSIX_NAMES=y # LTE link control CONFIG_LTE_LINK_CONTROL=y CONFIG_LTE_AUTO_INIT_AND_CONNECT=n # BSD library CONFIG_BSD_LIBRARY=y # AT Host CONFIG_UART_INTERRUPT_DRIVEN=y CONFIG_AT_HOST_LIBRARY=y # MQTT CONFIG_MQTT_LIB=y CONFIG_MQTT_LIB_TLS=n # Appliaction #CONFIG_MQTT_PUB_TOPIC="/my/publish/topic" #CONFIG_MQTT_SUB_TOPIC="/my/subscribe/topic" #CONFIG_MQTT_CLIENT_ID="my-client-id" #CONFIG_MQTT_BROKER_HOSTNAME="mqtt.eclipse.org" #CONFIG_MQTT_BROKER_PORT=1883 # Main thread CONFIG_MAIN_STACK_SIZE=4096 CONFIG_HEAP_MEM_POOL_SIZE=2048 CONFIG_SERIAL=y CONFIG_UART_INTERRUPT_DRIVEN=y CONFIG_UART_2_NRF_UARTE=y
My overlay:
&uart2 { status = "okay"; current-speed = <9600>; tx-pin = <11>; rx-pin = <10>; rts-pin = <0xFFFFFFFF>; cts-pin = <0xFFFFFFFF>; };
Hope you can help me.
Thanks in advance.