UART Asynch API

Hello there,

I am trying to solve the following issue. On the UART I will be receiving a stream of messages. These messages can be each of different lengths. The length of the message is parsed later. So as I am receiving these messages, the first receive buffer is being filled up with data and at some point the buffer will be switched with a second one as per the documentation. As much as I understand this choice I am wondering if there is a way to reset the received data offset, other than disabling and re-enabling the UART.  

struct uart_event_rx {
	/** @brief Pointer to current buffer. */
	uint8_t *buf;
	/** @brief Currently received data offset in bytes. */
	size_t offset;
	/** @brief Number of new bytes received. */
	size_t len;
};

Thank you!
  • On a second question, is there a different method for receiving the serial stream? I have went for the UART Asynch API as it has been recommended in the DevAcademy training

  • In a sense my issue very similar to this one: (+) UART async UART_RX_RDY timeout - Nordic Q&A - Nordic DevZone - Nordic DevZone (nordicsemi.com) .
    On a third question, what should be the UART data protocol look like in order to reliably use the UART Aynch API? The thread is mentioning SOF/EOF characters, but in my case these can be part of the payload. It would mean having a LENGTH field in order to read a certain amount of characters. If so, do you have an example that could showcase this?

  • I am wondering if there is a way to reset the received data offset

    So I can understand the question better:
    Can you explain why you want to reset the offset?

    RadicalTomato said:
    is there a different method for receiving the serial stream?

    The answer here is "yes multiple". For example, you can use nrfx drivers directly to write your own UART handler.
    Or we have lpuart.

    But the one which is most frequently used is the Zephyr UART async API, so unless you have specific needs, I think I would suggest that one.

    RadicalTomato said:
    On a third question, what should be the UART data protocol look like in order to reliably use the UART Aynch API? The thread is mentioning SOF/EOF characters, but in my case these can be part of the payload. It would mean having a LENGTH field in order to read a certain amount of characters. If so, do you have an example that could showcase this?

    Let's get back to this after you have answered my question above

  • Hei  ,

    Reason for resetting the offset is to be able to "prepare" the buffer to receive another message, of varying length. I mean resetting the offset as soon as the contents of the receive buffer has been read is the desired feature. 
    The behaviour I noticed in the USART Asynch API is that every single time a new message is being received (and by received, the UART_RX_RDY event is triggered ... and yes, I am aware this event gets triggered for 2 different type of reasons; and maybe here lies the core of the issue, that the same event is triggered for two different types of events), the offset is being increased with the received message length. Eventually the offset+length will equal the receive buffer, but assume there is more data available from the UART comm. 

    What is happening for this case is the series of events below:

    1. UART_RX_RDY - current buffer filled up
    2. UART_RX_BUF_RELEASED - as the current buffer has been filled up it is being released
    3. UART_RX_BUF_REQUEST - as there has been a release event a new buffer is being assigned
    4. UART_RX_RDY - the rest of the data is being received in the new buffer

    (code below is based on this example: GitHub - too1/ncs-uart-async-count-rx )

    /*
     * Copyright (c) 2016 Intel Corporation
     *
     * SPDX-License-Identifier: Apache-2.0
     */
    
    #include <zephyr/kernel.h>
    #include <zephyr/device.h>
    #include <zephyr/sys/ring_buffer.h>
    #include <zephyr/drivers/gpio.h>
    #include <zephyr/drivers/uart.h>
    #include <string.h>
    #include <zephyr/logging/log.h>
    
    #define UART_BUF_SIZE		8
    #define UART_QUEUE_MSG_SIZE		16
    #define UART_TX_TIMEOUT_MS	100
    #define UART_RX_TIMEOUT_MS	100
    
    K_SEM_DEFINE(tx_done, 1, 1);
    K_SEM_DEFINE(rx_disabled, 0, 1);
    K_SEM_DEFINE(rx_rdy, 0, 1);
    
    #define UART_TX_BUF_SIZE  		256
    #define UART_RX_MSG_QUEUE_SIZE	8
    struct uart_msg_queue_item {
    	uint8_t bytes[UART_QUEUE_MSG_SIZE];
    	uint32_t length;
    };
    
    // UART TX fifo
    RING_BUF_DECLARE(app_tx_fifo, UART_TX_BUF_SIZE);
    volatile int bytes_claimed;
    
    LOG_MODULE_REGISTER(app,LOG_LEVEL_INF);
    
    // UART RX primary buffers
    uint8_t uart_double_buffer[2][UART_BUF_SIZE];
    uint8_t *uart_buf_next = uart_double_buffer[1];
    
    static const struct device *const dev_uart = DEVICE_DT_GET(DT_NODELABEL(uart0));
    
    static void app_uart_init(void);
    
    // UART RX message queue
    K_MSGQ_DEFINE(uart_rx_msgq, sizeof(struct uart_msg_queue_item), UART_RX_MSG_QUEUE_SIZE, 4);
    
    static int uart_tx_get_from_queue(void)
    {
    	uint8_t *data_ptr;
    	// Try to claim any available bytes in the FIFO
    	bytes_claimed = ring_buf_get_claim(&app_tx_fifo, &data_ptr, UART_TX_BUF_SIZE);
    
    	if(bytes_claimed > 0) {
    		// Start a UART transmission based on the number of available bytes
    		uart_tx(dev_uart, data_ptr, bytes_claimed, SYS_FOREVER_MS);
    	}
    	return bytes_claimed;
    }
    
    void app_uart_async_callback(const struct device *uart_dev,
    							 struct uart_event *evt, void *user_data)
    {
    	static struct uart_msg_queue_item new_message;
    	static bool send_to_queue = false;
    	static bool wait_for_more_data = false;
    
    	switch (evt->type) {
    		case UART_TX_DONE:
    			// Free up the written bytes in the TX FIFO
    			ring_buf_get_finish(&app_tx_fifo, bytes_claimed);
    
    			// If there is more data in the TX fifo, start the transmission
    			if(uart_tx_get_from_queue() == 0) {
    				// Or release the semaphore if the TX fifo is empty
    				k_sem_give(&tx_done);
    			}
    			break;
    		
    		case UART_RX_RDY:
    			//LOG_INF("Off: %d", evt->data.rx.offset);
    			//LOG_INF("Len: %d", evt->data.rx.len);
    			LOG_INF("R");
    			
    
    			if ((evt->data.rx.offset + evt->data.rx.len) < (UART_BUF_SIZE))
    			{
    				if (wait_for_more_data == false)
    				{
    			 		memcpy(new_message.bytes, evt->data.rx.buf + evt->data.rx.offset, evt->data.rx.len);
    			 		new_message.length = evt->data.rx.len;
    			 		send_to_queue = true;
    
    				}
    			 	else if (wait_for_more_data == true)
    			 	{
    			 		memcpy(new_message.bytes+new_message.length, evt->data.rx.buf + evt->data.rx.offset, evt->data.rx.len);
    					new_message.length = (new_message.length + evt->data.rx.len);
    					wait_for_more_data = false;
    					send_to_queue = true;
    			 	}
    			}
    			else
    			{
    				memcpy(new_message.bytes, evt->data.rx.buf + evt->data.rx.offset, evt->data.rx.len);
    				new_message.length = evt->data.rx.len;
    			}
    
    			if (true == send_to_queue)
    			{
    			 	if(k_msgq_put(&uart_rx_msgq, &new_message, K_NO_WAIT) != 0){
    			 		printk("Error: Uart RX message queue full!\n");
    				}
    				send_to_queue = false;
    				evt->data.rx.offset = 0;
    				//k_sem_give(&rx_rdy);
    			}
    			
    			break;
    		
    		case UART_RX_BUF_REQUEST:
    			uart_rx_buf_rsp(dev_uart, uart_buf_next, UART_BUF_SIZE);
    			wait_for_more_data = true;
    			LOG_INF("Q");
    			break;
    
    		case UART_RX_BUF_RELEASED:
    			//uart_buf_next = evt->data.rx_buf.buf;
    			uart_buf_next = ((uart_buf_next == uart_double_buffer[1]) ? uart_double_buffer[0] : uart_double_buffer[1]);
    			LOG_INF("L");
    			break;
    
    		case UART_RX_DISABLED:
    			LOG_INF("DIS");
    			//app_uart_init();
    			break;
    		
    		default:
    			break;
    	}
    }
    
    static void app_uart_init(void)
    {
    	if(!device_is_ready(dev_uart))
    	{
    		printk("UART Device not ready\r\n");
    		return;
    	}
    
    	uart_callback_set(dev_uart, app_uart_async_callback, NULL);
    	uart_rx_enable(dev_uart, uart_double_buffer[0], UART_BUF_SIZE, UART_RX_TIMEOUT_MS);
    }
    
    // Function to send UART data, by writing it to a ring buffer (FIFO) in the application
    // WARNING: This function is not thread safe! If you want to call this function from multiple threads a semaphore should be used
    static int app_uart_send(const uint8_t * data_ptr, uint32_t data_len)
    {
    	while(1) {
    		// Try to move the data into the TX ring buffer
    		uint32_t written_to_buf = ring_buf_put(&app_tx_fifo, data_ptr, data_len);
    		data_len -= written_to_buf;
    		
    		// In case the UART TX is idle, start transmission
    		if(k_sem_take(&tx_done, K_NO_WAIT) == 0) {
    			uart_tx_get_from_queue();
    		}	
    		
    		// In case all the data was written, exit the loop
    		if(data_len == 0) break;
    
    		// In case some data is still to be written, sleep for some time and run the loop one more time
    		k_msleep(10);
    		data_ptr += written_to_buf;
    	}
    
    	return 0;
    }
    
    void main(void)
    {
    	printk("UART Async example started\n");
    	
    	app_uart_init();
    
    	const uint8_t test_string[] = "Hello world through the UART async driver\r\n";
    	app_uart_send(test_string, strlen(test_string));
    
    	struct uart_msg_queue_item incoming_message;
    
    	while (1) {
    
    		// This function will not return until a new message is ready
    		k_msgq_get(&uart_rx_msgq, &incoming_message, K_FOREVER);
    
    		// Process the message here.
    		static uint8_t string_buffer[UART_QUEUE_MSG_SIZE + 1];
    		memcpy(string_buffer, incoming_message.bytes, incoming_message.length);
    		string_buffer[incoming_message.length] = 0;
    		LOG_INF("RX %i: %s", incoming_message.length, string_buffer);
    	}
    }
    
     

    CONFIG_SERIAL=y
    CONFIG_UART_ASYNC_API=y
    CONFIG_RING_BUFFER=y
    CONFIG_LOG=y
    CONFIG_LOG_MODE_MINIMAL=y
    
    #CONFIG_UART_0_NRF_HW_ASYNC=y
    #CONFIG_UART_0_NRF_HW_ASYNC_TIMER=1
    
    CONFIG_USE_SEGGER_RTT=y
    CONFIG_RTT_CONSOLE=y
    CONFIG_UART_CONSOLE=n
    CONFIG_LOG_BACKEND_RTT=y



    I chose a buffer size of 8 bytes and messages of length 3 bytes (but the message length is unknown and it can be anything, while 8 bytes for the buffer is to get the behavior to ). I am alternatively sending "abc" or "123". But as I mentioned, the messages can be of any length. As you can see, the last case (which is the worse thing that can happen), when the received amount of data, matches the amount of data available in the buffer, there will be only 3 events:

    1. UART_RX_RDY - current buffer filled up
    2. UART_RX_BUF_RELEASED - as the current buffer has been filled up it is being released
    3. UART_RX_BUF_REQUEST - as there has been a release event a new buffer is being assigned

    ... and once more data is being sent over the UART, there is a UART_RX_RDY event, hence the all data can be read. 
    I hope this somehow explains the need:
    • to reset the offset
    • or provide extra information as to what cause the UART_RX_RDY event, such as a TIMEOUT event. 

    I had an initial look at the lpuart example. It is closer to what I was having in mind, but the example is basic. IMO it would need a circular buffer to place the received data for further processing. However since I am a newbie when it comes to Zephyr I do lack of the clarity at the moment and request further help. 
    Thanks! 

  • I am still looking into this, but have not gotten anything yet. I will try to give you an answer tomorrow or monday.
    Bonus: on monday the creator of the https://github.com/too1/ncs-uart-async-count-rx sample is back in office, so I can ask him for help then.

Related