Why 10 KB payload takes ~6 seconds, but 6 bytes takes ~1 second

/*
 * Copyright (c) 2023 Lucas Dietrich <[email protected]>
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include "creds/creds.h"
#include "dhcp.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

#include <zephyr/net/socket.h>
#include <zephyr/net/dns_resolve.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/net/sntp.h>
#include <zephyr/net/tls_credentials.h>
#include <zephyr/random/random.h>
#include <zephyr/posix/time.h>
#include <zephyr/logging/log.h>
#include <zephyr/kernel.h>


#if defined(CONFIG_MBEDTLS_MEMORY_DEBUG)
#include <mbedtls/memory_buffer_alloc.h>
#endif


#include <modem/nrf_modem_lib.h>
#include <modem/lte_lc.h>

static K_SEM_DEFINE(lte_connected, 0, 1);

LOG_MODULE_REGISTER(aws, LOG_LEVEL_DBG);

// #define SNTP_SERVER "0.pool.ntp.org"
#define SNTP_SERVER "pool.ntp.org"


/* Broker port; formatted into the service string passed to getaddrinfo() */
#define AWS_BROKER_PORT CONFIG_AWS_MQTT_PORT

/* Sizes of the buffers handed to the MQTT client in aws_client_setup() */
#define MQTT_RX_BUFFER_SIZE 512u
#define MQTT_TX_BUFFER_SIZE 512u
/* Size of the chunk buffer used when reading an incoming PUBLISH payload */
#define APP_BUFFER_SIZE	 256u

/* Connection retry policy consumed by the backoff helpers below */
#define MAX_RETRIES	    10u
#define BACKOFF_EXP_BASE_MS 1000u
#define BACKOFF_EXP_MAX_MS  60000u
#define BACKOFF_CONST_MS    5000u

/* Broker address; filled in by resolve_broker_addr() and used by the client */
static struct sockaddr_in aws_broker;

static uint8_t rx_buffer[MQTT_RX_BUFFER_SIZE];
static uint8_t tx_buffer[MQTT_TX_BUFFER_SIZE];
static uint8_t buffer[APP_BUFFER_SIZE]; /* Shared between published and received messages */

/* Single MQTT client instance shared by every function in this file */
static struct mqtt_client client_ctx;

/* MQTT client id (the terminating NUL is excluded when the size is set) */
static const char mqtt_client_name[] = CONFIG_AWS_THING_NAME;

/* Flags set from mqtt_event_cb() and consumed by aws_client_loop() */
static bool do_publish;	  /* Trigger client to publish */
static bool do_subscribe; /* Trigger client to subscribe */

/* ALPN protocol name, only configured for port 443 without WebSocket */
#if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
static const char * const alpn_list[] = {"x-amzn-mqtt-ca"};
#endif

/* Security tag list passed to the MQTT TLS configuration */
static const sec_tag_t sec_tls_tags[] = { 212 };


static int subscribe_topic(void)
{
	int ret;
	struct mqtt_topic topics[] = {{
		.topic = {.utf8 = CONFIG_AWS_SUBSCRIBE_TOPIC,
			  .size = strlen(CONFIG_AWS_SUBSCRIBE_TOPIC)},
		.qos = CONFIG_AWS_QOS,
	}};
	const struct mqtt_subscription_list sub_list = {
		.list = topics,
		.list_count = ARRAY_SIZE(topics),
		.message_id = 1u,
	};

	LOG_INF("Subscribing to %hu topic(s)", sub_list.list_count);

	ret = mqtt_subscribe(&client_ctx, &sub_list);
	if (ret != 0) {
		LOG_ERR("Failed to subscribe to topics: %d", ret);
	}

	return ret;
}

static int publish_message(const char *topic, size_t topic_len, uint8_t *payload,
			   size_t payload_len)
{
	static uint32_t message_id = 1u;

	int ret;
	struct mqtt_publish_param msg;

	msg.retain_flag = 0u;
	msg.message.topic.topic.utf8 = topic;
	msg.message.topic.topic.size = topic_len;
	msg.message.topic.qos = CONFIG_AWS_QOS;
	msg.message.payload.data = payload;
	msg.message.payload.len = payload_len;
	msg.message_id = message_id++;
 for (int i = 0; i < 3; i++)
 {
	 ret = mqtt_publish(&client_ctx, &msg);
	if (ret != 0) {
		LOG_ERR("Failed to publish message: %d", ret);
	}

	LOG_INF("PUBLISHED on topic \"%s\" [ id: %u qos: %u ], payload: %u B", topic,
		msg.message_id, msg.message.topic.qos, payload_len);
		k_msleep(100);

 }
 
	
	return ret;
}

static ssize_t handle_published_message(const struct mqtt_publish_param *pub)
{
	int ret;
	size_t received = 0u;
	const size_t message_size = pub->message.payload.len;

	LOG_INF("RECEIVED on topic \"%s\" [ id: %u qos: %u ] payload: %u B",
		(const char *)pub->message.topic.topic.utf8, pub->message_id,
		pub->message.topic.qos, message_size);

	while (received < message_size) {
		size_t to_read = MIN(message_size - received, sizeof(buffer));

		ret = mqtt_read_publish_payload_blocking(&client_ctx, buffer, to_read);
		if (ret < 0) {
			return ret;
		} else if (ret == 0) {
			break;
		}

		received += ret;
		LOG_HEXDUMP_DBG(buffer, ret, "Received payload:");
	}

	/* Send ACK */
	switch (pub->message.topic.qos) {
	case MQTT_QOS_1_AT_LEAST_ONCE: {
		struct mqtt_puback_param puback;

		puback.message_id = pub->message_id;
		mqtt_publish_qos1_ack(&client_ctx, &puback);
	} break;
	case MQTT_QOS_2_EXACTLY_ONCE: /* unhandled (not supported by AWS) */
	case MQTT_QOS_0_AT_MOST_ONCE: /* nothing to do */
	default:
		break;
	}

	return received;
}

/**
 * @brief Map an MQTT event type to a short printable name.
 *
 * @param type Event type, used as an index into the name table.
 *
 * @return The matching name, or "<unknown>" when @p type is outside the table.
 */
const char *mqtt_evt_type_to_str(enum mqtt_evt_type type)
{
	static const char *const names[] = {
		"CONNACK",
		"DISCONNECT",
		"PUBLISH",
		"PUBACK",
		"PUBREC",
		"PUBREL",
		"PUBCOMP",
		"SUBACK",
		"UNSUBACK",
		"PINGRESP",
	};

	if (type >= ARRAY_SIZE(names)) {
		return "<unknown>";
	}

	return names[type];
}

/**
 * @brief MQTT event callback registered on the client context.
 *
 * CONNACK requests a subscription, SUBACK requests a publish, and an
 * incoming PUBLISH is handed to handle_published_message(). Every other
 * event is only logged.
 *
 * @param client Client that raised the event (unused).
 * @param evt    Event description.
 */
static void mqtt_event_cb(struct mqtt_client *client, const struct mqtt_evt *evt)
{
	LOG_DBG("MQTT event: %s [%u] result: %d", mqtt_evt_type_to_str(evt->type), evt->type,
		evt->result);

	if (evt->type == MQTT_EVT_CONNACK) {
		do_subscribe = true;
	} else if (evt->type == MQTT_EVT_SUBACK) {
		do_publish = true;
	} else if (evt->type == MQTT_EVT_PUBLISH) {
		handle_published_message(&evt->param.publish);
	} else {
		/* PUBACK, DISCONNECT, PUBREC, PUBREL, PUBCOMP, PINGRESP,
		 * UNSUBACK and anything else: nothing to do.
		 */
	}
}

static void aws_client_setup(void)
{
	mqtt_client_init(&client_ctx);

	client_ctx.broker = &aws_broker;
	client_ctx.evt_cb = mqtt_event_cb;

	client_ctx.client_id.utf8 = (uint8_t *)mqtt_client_name;
	client_ctx.client_id.size = sizeof(mqtt_client_name) - 1;
	client_ctx.password = NULL;
	client_ctx.user_name = NULL;

	client_ctx.keepalive = CONFIG_MQTT_KEEPALIVE;

	client_ctx.protocol_version = MQTT_VERSION_3_1_1;

	client_ctx.rx_buf = rx_buffer;
	client_ctx.rx_buf_size = sizeof(rx_buffer);
	client_ctx.tx_buf = tx_buffer;
	client_ctx.tx_buf_size = sizeof(tx_buffer);

	/* setup TLS */
	client_ctx.transport.type = MQTT_TRANSPORT_SECURE;
	struct mqtt_sec_config *const tls_config = &client_ctx.transport.tls.config;

	tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
	tls_config->cipher_list = NULL;
	tls_config->sec_tag_list = sec_tls_tags;
	tls_config->sec_tag_count = ARRAY_SIZE(sec_tls_tags);
	tls_config->hostname = CONFIG_AWS_ENDPOINT;
	tls_config->cert_nocopy = TLS_CERT_NOCOPY_NONE;
#if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
	tls_config->alpn_protocol_name_list = alpn_list;
	tls_config->alpn_protocol_name_count = ARRAY_SIZE(alpn_list);
#endif
}

/* State of one connection retry sequence; see backoff_context_init() and
 * backoff_get_next().
 */
struct backoff_context {
	uint16_t retries_count; /* attempts counted so far */
	uint16_t max_retries;   /* limit compared against retries_count */

#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
	uint32_t attempt_max_backoff; /* upper bound of the next random delay, ms */
	uint32_t max_backoff;	      /* cap applied to attempt_max_backoff, ms */
#endif
};

/**
 * @brief Reset a backoff context to its starting state.
 *
 * The retry counter is cleared and the limit is set to MAX_RETRIES. With
 * CONFIG_AWS_EXPONENTIAL_BACKOFF the delay window starts at
 * BACKOFF_EXP_BASE_MS and is capped at BACKOFF_EXP_MAX_MS.
 *
 * @param bo Context to initialise; must not be NULL.
 */
static void backoff_context_init(struct backoff_context *bo)
{
	__ASSERT_NO_MSG(bo != NULL);

#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
	bo->max_backoff = BACKOFF_EXP_MAX_MS;
	bo->attempt_max_backoff = BACKOFF_EXP_BASE_MS;
#endif

	bo->max_retries = MAX_RETRIES;
	bo->retries_count = 0u;
}

static void backoff_get_next(struct backoff_context *bo, uint32_t *next_backoff_ms)
{
	__ASSERT_NO_MSG(bo != NULL);
	__ASSERT_NO_MSG(next_backoff_ms != NULL);

#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
	if (bo->retries_count <= bo->max_retries) {
		*next_backoff_ms = sys_rand32_get() % (bo->attempt_max_backoff + 1u);

		/* Calculate max backoff for the next attempt (~ 2**attempt) */
		bo->attempt_max_backoff = MIN(bo->attempt_max_backoff * 2u, bo->max_backoff);
		bo->retries_count++;
	}
#else
	*next_backoff_ms = BACKOFF_CONST_MS;
#endif
}

/**
 * @brief Connect the shared MQTT client, retrying with a backoff delay.
 *
 * After each failed mqtt_connect() the next delay is obtained from
 * backoff_get_next(), logged, and slept for. The loop runs while the
 * context's retry counter does not exceed its limit.
 *
 * @return 0 once connected, otherwise the result of the last mqtt_connect().
 */
static int aws_client_try_connect(void)
{
	struct backoff_context bo;
	uint32_t delay_ms;
	int err;

	backoff_context_init(&bo);

	while (bo.retries_count <= bo.max_retries) {
		err = mqtt_connect(&client_ctx);
		if (err == 0) {
			return err;
		}

		backoff_get_next(&bo, &delay_ms);

		LOG_ERR("Failed to connect: %d backoff delay: %u ms", err, delay_ms);
		k_msleep(delay_ms);
	}

	return err;
}

// static const char publish_payload[] = "aaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa00000000000";
/* Payload sent by publish(); its terminating NUL is not transmitted */
static const char publish_payload[] = "aaaaaa";

/**
 * @brief Publish publish_payload on CONFIG_AWS_PUBLISH_TOPIC.
 *
 * @return The value returned by publish_message().
 */
static int publish(void)
{
	const char *const topic = CONFIG_AWS_PUBLISH_TOPIC;
	uint8_t *const data = (uint8_t *)publish_payload;

	return publish_message(topic, strlen(topic), data, strlen(publish_payload));
}

void aws_client_loop(void)
{
	int rc;
	int timeout;
	struct pollfd fds;
	static int64_t last_publish = 0;

	aws_client_setup();

	rc = aws_client_try_connect();
	if (rc != 0) {
		goto cleanup;
	}

	fds.fd = client_ctx.transport.tcp.sock;
	fds.events = POLLIN;

	for (;;) {
		timeout = mqtt_keepalive_time_left(&client_ctx);
		rc = poll(&fds, 1u, timeout);
		if (rc >= 0) {
			if (fds.revents & POLLIN) {
				rc = mqtt_input(&client_ctx);
				if (rc != 0) {
					LOG_ERR("Failed to read MQTT input: %d", rc);
					break;
				}
			}

			if (fds.revents & (POLLHUP | POLLERR)) {
				LOG_ERR("Socket closed/error");
				break;
			}

			rc = mqtt_live(&client_ctx);
			if ((rc != 0) && (rc != -EAGAIN)) {
				LOG_ERR("Failed to live MQTT: %d", rc);
				break;
			}
		} else {
			LOG_ERR("poll failed: %d", rc);
			break;
		}

		if (do_publish) {
			do_publish = false;
			publish();
			last_publish = k_uptime_get();
		}

		if (do_subscribe) {
			do_subscribe = false;
			subscribe_topic();
		}

		if (!do_publish && !do_subscribe) {
			int64_t now = k_uptime_get();
			if (now - last_publish >= 1000) {
				do_publish = true;
			}
		}
	}

cleanup:
	mqtt_disconnect(&client_ctx);

	close(fds.fd);
	fds.fd = -1;
}

/**
 * @brief Resolve CONFIG_AWS_ENDPOINT to an IPv4 socket address.
 *
 * @param broker Receives the resolved address and port on success; left
 *               untouched on failure.
 *
 * @return 0 on success, otherwise the error code returned by getaddrinfo().
 */
static int resolve_broker_addr(struct sockaddr_in *broker)
{
	int ret;
	struct addrinfo *ai = NULL;

	const struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM,
		.ai_protocol = 0,
	};
	char port_string[6] = {0};
	char addr_str[INET_ADDRSTRLEN];

	/* Bounded formatting: the buffer only fits a 5-digit port plus NUL */
	snprintf(port_string, sizeof(port_string), "%d", AWS_BROKER_PORT);

	ret = getaddrinfo(CONFIG_AWS_ENDPOINT, port_string, &hints, &ai);
	if (ret != 0) {
		LOG_ERR("failed to resolve hostname err = %d (errno = %d)", ret, errno);
		/* Nothing was allocated, so there is nothing to free */
		return ret;
	}

	/* Never copy more than the destination can hold */
	memcpy(broker, ai->ai_addr, MIN(ai->ai_addrlen, sizeof(*broker)));

	inet_ntop(AF_INET, &broker->sin_addr, addr_str, sizeof(addr_str));
	LOG_INF("Resolved: %s:%u", addr_str, ntohs(broker->sin_port));

	freeaddrinfo(ai);

	return ret;
}

/**
 * @brief LTE link controller event handler.
 *
 * A registration event reporting the home or a roaming network is logged
 * and releases the lte_connected semaphore. RRC mode changes are logged.
 * All other events are ignored.
 *
 * @param evt Event reported by the link controller.
 */
static void lte_handler(const struct lte_lc_evt *const evt)
{
    if (evt->type == LTE_LC_EVT_RRC_UPDATE) {
        LOG_INF("RRC mode: %s", evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED ? "Connected" : "Idle");
        return;
    }

    if (evt->type != LTE_LC_EVT_NW_REG_STATUS) {
        return;
    }

    const bool at_home = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME);
    const bool roaming = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_ROAMING);

    if (!at_home && !roaming) {
        /* Not registered yet: keep waiting */
        return;
    }

    LOG_INF("Network registration status: %s",
           at_home ? "Connected - home network" : "Connected - roaming");
    k_sem_give(&lte_connected);
}
/**
 * @brief Initialise the modem library and wait for LTE registration.
 *
 * Starts an asynchronous LTE connect with lte_handler() as the event
 * handler, then blocks without timeout on the lte_connected semaphore.
 *
 * @return 0 once registered, otherwise the error returned by
 *         nrf_modem_lib_init() or lte_lc_connect_async().
 */
static int modem_configure(void)
{
    int err;

    err = nrf_modem_lib_init();
    if (err == 0) {
        err = lte_lc_connect_async(lte_handler);
    }
    if (err != 0) {
        return err;
    }

    k_sem_take(&lte_connected, K_FOREVER);
    return 0;
}

/**
 * @brief Application entry point.
 *
 * Brings up the modem, then repeatedly resolves the broker address and runs
 * an MQTT session, pausing one second between iterations. A session is only
 * started when the address was resolved, so the client never connects to an
 * unresolved or stale address.
 *
 * @return The modem_configure() error if the modem could not be brought up;
 *         otherwise the loop does not terminate.
 */
int main(void)
{
	int err;

	err = modem_configure();
	if (err != 0) {
		LOG_ERR("Modem configuration failed: %d", err);
		return err;
	}
#if defined(CONFIG_NET_DHCPV4)
	app_dhcpv4_startup();
#endif

	for (;;) {
		/* resolve_broker_addr() logs its own failure details */
		err = resolve_broker_addr(&aws_broker);
		if (err == 0) {
			aws_client_loop();
		}

#if defined(CONFIG_MBEDTLS_MEMORY_DEBUG)
		size_t cur_used, cur_blocks, max_used, max_blocks;

		mbedtls_memory_buffer_alloc_cur_get(&cur_used, &cur_blocks);
		mbedtls_memory_buffer_alloc_max_get(&max_used, &max_blocks);
		LOG_INF("mbedTLS heap usage: MAX %u/%u (%u) CUR %u (%u)", max_used,
			CONFIG_MBEDTLS_HEAP_SIZE, max_blocks, cur_used, cur_blocks);
#endif

		k_sleep(K_SECONDS(1));
	}

	return 0;
}




CONFIG_AWS_IOT_LOG_LEVEL_DBG=y
CONFIG_AWS_TEST_SUITE_DQP=n

CONFIG_MAIN_STACK_SIZE=8192
CONFIG_ENTROPY_GENERATOR=y
CONFIG_TEST_RANDOM_GENERATOR=y
CONFIG_INIT_STACKS=y
CONFIG_HW_STACK_PROTECTION=y
CONFIG_REQUIRES_FULL_LIBC=y
CONFIG_SNTP=y
CONFIG_JSON_LIBRARY=y
CONFIG_POSIX_API=y

# DNS
CONFIG_DNS_RESOLVER=y
CONFIG_DNS_RESOLVER_ADDITIONAL_BUF_CTR=2
CONFIG_DNS_RESOLVER_MAX_SERVERS=1
CONFIG_DNS_SERVER_IP_ADDRESSES=y
CONFIG_DNS_SERVER1="8.8.8.8"
CONFIG_NET_SOCKETS_DNS_TIMEOUT=5000
CONFIG_DNS_RESOLVER_LOG_LEVEL_DBG=n

# Generic networking options
CONFIG_NETWORKING=y
CONFIG_NET_UDP=y
CONFIG_NET_TCP=y
CONFIG_NET_IPV6=y
CONFIG_NET_IPV4=y
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_SOCKOPT_TLS=y

# Logging
CONFIG_LOG=y

# Network buffers
CONFIG_NET_PKT_RX_COUNT=32
CONFIG_NET_PKT_TX_COUNT=16
CONFIG_NET_BUF_RX_COUNT=64
CONFIG_NET_BUF_TX_COUNT=32

# MQTT
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=y
CONFIG_MQTT_KEEPALIVE=600
CONFIG_MQTT_LIB_TLS_USE_ALPN=y

# TLS
CONFIG_NRF_SECURITY=y
CONFIG_MBEDTLS_TLS_LIBRARY=y
CONFIG_MBEDTLS_LEGACY_CRYPTO_C=y
CONFIG_MBEDTLS_ENABLE_HEAP=y
CONFIG_MBEDTLS_HEAP_SIZE=10240
CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=10240
CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
CONFIG_MBEDTLS_SERVER_NAME_INDICATION=y
CONFIG_MBEDTLS_AES_ROM_TABLES=y
CONFIG_MBEDTLS_TLS_VERSION_1_2=y
CONFIG_MBEDTLS_MEMORY_DEBUG=y
CONFIG_MBEDTLS_HAVE_TIME_DATE=y
CONFIG_MBEDTLS_SSL_ALPN=y

CONFIG_MBEDTLS_SSL_CLI_C=y
CONFIG_MBEDTLS_X509_CRT_PARSE_C=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDH_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_PSK_ENABLED=y

CONFIG_MBEDTLS_SSL_SRV_C=y
CONFIG_MBEDTLS_SSL_CLI_C=y
CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_ENABLED=y

CONFIG_HEAP_MEM_POOL_SIZE=8192

CONFIG_MBEDTLS_CIPHER=y
CONFIG_MBEDTLS_MD=y

# Required mbedTLS dependencies
CONFIG_MBEDTLS_PK_C=y
CONFIG_MBEDTLS_PK_PARSE_C=y
CONFIG_MBEDTLS_PK_WRITE_C=y
CONFIG_MBEDTLS_RSA_C=y
CONFIG_MBEDTLS_PKCS1_V15=y
CONFIG_MBEDTLS_ECP_C=y
CONFIG_MBEDTLS_ECDSA_C=y
CONFIG_MBEDTLS_ECDH_C=y
CONFIG_MBEDTLS_DHM_C=y
CONFIG_MBEDTLS_GCM_C=y
CONFIG_MBEDTLS_SHA256_C=y
CONFIG_MBEDTLS_X509_USE_C=y
CONFIG_MBEDTLS_X509_CRT_PARSE_C=y

CONFIG_MBEDTLS_RSA_C=y
CONFIG_MBEDTLS_DHM_C=y

CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDH_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_PSK_ENABLED=y
CONFIG_MBEDTLS_SSL_CONTEXT_SERIALIZATION=y
CONFIG_MBEDTLS_SSL_PROTO_TLS1_2=y



# For Wi-Fi UART, keep this set to n
CONFIG_CONSOLE=n
CONFIG_USE_SEGGER_RTT=y
CONFIG_RTT_CONSOLE=y
CONFIG_LOG_BACKEND_RTT=y
CONFIG_LOG_BACKEND_UART=n
CONFIG_UART_CONSOLE=n

CONFIG_NRF_MODEM_LIB=y
CONFIG_LTE_LINK_CONTROL=y


# Let the modem handle sockets, DNS, and SNTP
CONFIG_NET_NATIVE=n
CONFIG_NET_SOCKETS_OFFLOAD=y

CONFIG_SNTP_LOG_LEVEL_DBG=y

in nrf 9151 and sdk 2.9.0

we use cellular with aws

However, publishing a 10 KB payload takes about 6 seconds, while a 6-byte payload takes about 1 second. What do we need to change to make the transmission faster?

log:

00> [00:01:14.933,380] <inf> aws: RRC mode: Connected
00> [00:01:24.036,407] <inf> aws: Network registration status: Connected - roaming
00> [00:01:24.459,625] <inf> aws: Resolved: 3.111.212.253:8883
00> [00:01:32.697,906] <dbg> aws: mqtt_event_cb: MQTT event: CONNACK [0] result: 0
00> [00:01:32.697,967] <inf> aws: Subscribing to 1 topic(s)
00> [00:01:33.297,943] <dbg> aws: mqtt_event_cb: MQTT event: SUBACK [7] result: 0
00> [00:01:39.516,448] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 1 qos: 0 ], payload: 9319 B
00> [00:01:45.363,739] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 1 qos: 0 ], payload: 9319 B
00> [00:01:52.676,635] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 1 qos: 0 ], payload: 9319 B
00> [00:02:24.236,785] <inf> aws: RRC mode: Idle

So what should we do to transmit a 10 KB payload faster?



  • Hi

    Before jumping to assumptions, can you confirm that you are working on a project based on the AWS IoT MQTT sample? https://docs.nordicsemi.com/bundle/ncs-latest/page/zephyr/samples/net/cloud/aws_iot_mqtt/README.html#aws-iot-mqtt 

    If not, please specify what your project is based on and I'll try to assist you as best I can.

    For starters: I think this is mostly due to how often your device publishes messages, and not too much to do with the size of the payload. With the 6 bytes you still have to publish a message, and I see for example that there is a 100ms k_msleep after each published message in your main.c file. Then it also probably takes some time to publish the actual message etc. 

    Can you share what transmission speed you expect/aim for with your 10kB payload exactly?

    Best regards,

    Simon

  • yes this is aws_iot_mqtt example

    We want to publish one 10 KB payload per second, but a single publish takes 6-10 seconds instead of 1 second.

    so please guide us

  • Hi

    Can you confirm you're connected to an LTE network? NB-IoT would not have high enough data rates to allow this. It also depends on your coverage, but if you are connected to an LTE network and have a good connection, you should be able to achieve up to 70kB/s. 

    You can try to test throughput with iperf3 in our modem shell sample to see what throughput you can expect with your connection.

    Best regards,

    Simon

  • Hi Simon,

    Thanks for the clarification.

    We are using the nRF9151 with SDK 2.9.0.
    Our goal is to publish a 10 KB payload every second using LTE-M (or NB-IoT if possible).
    However, with our current implementation, even a small payload takes around 4–10 seconds per publish.

    Could you please guide us on what we should check or modify?
    We can share our code below if needed.

    /**
     * cel_aws.c
     * Cellular AWS IoT Driver
     */
    
    #include <zephyr/logging/log.h>
    #include <zephyr/net/socket.h>
    #include <zephyr/net/dns_resolve.h>
    #include <zephyr/net/mqtt.h>
    #include <zephyr/net/tls_credentials.h>
    #include <zephyr/random/random.h>
    #include <modem/lte_lc.h>
    #include <modem/nrf_modem_lib.h>
    #include <string.h>
    #include <stdio.h>
    
    LOG_MODULE_REGISTER(cel_aws, LOG_LEVEL_DBG);
    
    /* Configurable constants */
    #define SNTP_SERVER "pool.ntp.org"
    #define AWS_BROKER_PORT CONFIG_AWS_MQTT_PORT
    
    #define MQTT_RX_BUF_SIZE (512)
    #define MQTT_TX_BUF_SIZE (512)
    #define APP_BUF_SIZE 256
    
    #define MAX_RETRIES 10
    #define BACKOFF_BASE_MS 1000
    #define BACKOFF_MAX_MS 60000
    #define BACKOFF_CONST_MS 5000
    
    /* TLS */
    static const sec_tag_t sec_tls_tags[] = {212};
    #if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
    static const char *const alpn_list[] = {"x-amzn-mqtt-ca"};
    #endif
    
    /* Static variables */
    struct mqtt_client client_ctx;
    static uint8_t rx_buffer[MQTT_RX_BUF_SIZE];
    static uint8_t tx_buffer[MQTT_TX_BUF_SIZE];
    static uint8_t app_buffer[APP_BUF_SIZE];
    static struct sockaddr_in broker_addr;
    
    // static cel_aws_rx_cb_t user_rx_cb = NULL;
    static K_SEM_DEFINE(lte_ready, 0, 1);
    static int mqtt_sock = -1;
    
    /* Forward declarations */
    static void lte_event_handler(const struct lte_lc_evt *evt);
    static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt *evt);
    static int resolve_broker(void);
    static int backoff_wait(uint32_t attempt);
    
    /* === LTE Functions === */
    
    /**
     * Initialise the modem library, select NB-IoT system mode, start an
     * asynchronous LTE connect and block until lte_ready is given.
     *
     * Returns 0 once connected, otherwise the error code of the step that
     * failed.
     */
    int cel_lte_connect(void)
    {
        int rc = nrf_modem_lib_init();

        if (rc != 0) {
            LOG_ERR("Modem lib init failed: %d", rc);
            return rc;
        }

        rc = lte_lc_system_mode_set(LTE_LC_SYSTEM_MODE_NBIOT, LTE_LC_SYSTEM_MODE_PREFER_AUTO);
        if (rc != 0) {
            LOG_ERR("Failed to set NB-IoT mode: %d", rc);
            return rc;
        }

        rc = lte_lc_connect_async(lte_event_handler);
        if (rc != 0) {
            LOG_ERR("LTE connect async failed: %d", rc);
            return rc;
        }

        /* Registration is signalled from lte_event_handler() */
        LOG_INF("Waiting for LTE connection...");
        k_sem_take(&lte_ready, K_FOREVER);
        LOG_INF("LTE Connected");

        return 0;
    }
    
    /* Power off LTE, shut down the modem library and log the result.
     * The return values of both calls are not checked.
     */
    void cel_lte_disconnect(void)
    {
        lte_lc_power_off();
        nrf_modem_lib_shutdown();
        LOG_INF("LTE disconnected and modem powered off");
    }
    
    /**
     * LTE link controller event handler.
     *
     * Home or roaming registration is logged and gives lte_ready; RRC mode
     * changes are logged; every other event is ignored.
     */
    static void lte_event_handler(const struct lte_lc_evt *evt)
    {
        if (evt->type == LTE_LC_EVT_RRC_UPDATE) {
            LOG_INF("RRC: %s", evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED ? "Connected" : "Idle");
            return;
        }

        if (evt->type != LTE_LC_EVT_NW_REG_STATUS) {
            return;
        }

        const bool at_home = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME);
        const bool roaming = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_ROAMING);

        if (at_home || roaming) {
            LOG_INF("LTE Registered: %s",
                    at_home ? "Home" : "Roaming");
            k_sem_give(&lte_ready);
        }
    }
    
    /* === AWS MQTT Functions === */
    
    /**
     * Resolve CONFIG_AWS_ENDPOINT over IPv4 and store the result in
     * broker_addr.
     *
     * Returns 0 on success, or -EINVAL when getaddrinfo() fails.
     */
    static int resolve_broker(void)
    {
        char service[6];
        char ip_text[INET_ADDRSTRLEN];
        struct addrinfo *found;
        struct addrinfo hints = {
            .ai_family = AF_INET,
            .ai_socktype = SOCK_STREAM,
        };
        int rc;

        snprintf(service, sizeof(service), "%d", AWS_BROKER_PORT);

        rc = getaddrinfo(CONFIG_AWS_ENDPOINT, service, &hints, &found);
        if (rc != 0) {
            LOG_ERR("DNS resolve failed: %d", rc);
            return -EINVAL;
        }

        /* Keep a copy of the address before releasing the result list */
        memcpy(&broker_addr, found->ai_addr, sizeof(struct sockaddr_in));
        freeaddrinfo(found);

        inet_ntop(AF_INET, &broker_addr.sin_addr, ip_text, sizeof(ip_text));
        LOG_INF("Broker resolved: %s:%d", ip_text, ntohs(broker_addr.sin_port));

        return 0;
    }
    
    /*
     * Reset client_ctx and fill in everything mqtt_connect() needs: broker
     * address, event callback, client identity, RX/TX buffers, keepalive and
     * the TLS transport settings. No user name or password is configured.
     */
    static void cel_aws_client_init(void)
    {
        struct mqtt_sec_config *sec;

        mqtt_client_init(&client_ctx);

        /* Protocol and identity: the thing name doubles as the client id. */
        client_ctx.protocol_version = MQTT_VERSION_3_1_1;
        client_ctx.client_id.utf8 = (uint8_t *)CONFIG_AWS_THING_NAME;
        client_ctx.client_id.size = strlen(CONFIG_AWS_THING_NAME);
        client_ctx.user_name = NULL;
        client_ctx.password = NULL;
        client_ctx.keepalive = CONFIG_MQTT_KEEPALIVE;

        /* Endpoint and event dispatch. */
        client_ctx.broker = &broker_addr;
        client_ctx.evt_cb = mqtt_event_handler;

        /* Statically allocated packet buffers. */
        client_ctx.rx_buf = rx_buffer;
        client_ctx.rx_buf_size = sizeof(rx_buffer);
        client_ctx.tx_buf = tx_buffer;
        client_ctx.tx_buf_size = sizeof(tx_buffer);

        /* TLS transport: peer verification is mandatory. */
        client_ctx.transport.type = MQTT_TRANSPORT_SECURE;
        sec = &client_ctx.transport.tls.config;
        sec->hostname = CONFIG_AWS_ENDPOINT;
        sec->peer_verify = TLS_PEER_VERIFY_REQUIRED;
        sec->sec_tag_count = ARRAY_SIZE(sec_tls_tags);
        sec->sec_tag_list = sec_tls_tags;
    #if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
        /* MQTT over port 443 without websockets: offer the ALPN protocol list. */
        sec->alpn_protocol_name_count = ARRAY_SIZE(alpn_list);
        sec->alpn_protocol_name_list = alpn_list;
    #endif
    }
    
    /*
     * Resolve the broker, initialise the MQTT client and connect to AWS.
     *
     * mqtt_connect() is attempted up to MAX_RETRIES + 1 times, sleeping for
     * backoff_wait(attempt) milliseconds between attempts. No delay is added
     * after the final failed attempt, so the caller gets the error immediately.
     *
     * Returns 0 on success, the resolve_broker() error if name resolution
     * fails, or the last mqtt_connect() error once all attempts are used up.
     */
    int cel_aws_connect(void)
    {
        int ret = resolve_broker();

        if (ret != 0) {
            return ret;
        }

        cel_aws_client_init();

        for (uint32_t attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            ret = mqtt_connect(&client_ctx);
            if (ret == 0) {
                /* Remember the socket so cel_aws_process() can poll it. */
                mqtt_sock = client_ctx.transport.tls.sock;
                LOG_INF("AWS MQTT Connected");
                return 0;
            }

            if (attempt == MAX_RETRIES) {
                /* Out of attempts: sleeping here would only delay the error. */
                LOG_ERR("MQTT connect failed: %d, giving up after %u attempts",
                        ret, (unsigned int)(attempt + 1U));
                break;
            }

            int delay = backoff_wait(attempt);

            LOG_ERR("MQTT connect failed: %d, retry in %d ms", ret, delay);
            k_msleep(delay);
        }

        return ret;
    }
    
    /*
     * Tear down the MQTT session if one is open (mqtt_sock >= 0).
     *
     * The socket is owned by the MQTT library: mqtt_disconnect() closes the
     * transport itself once the DISCONNECT packet has been sent, so it must
     * not be closed a second time here (a stale descriptor could by then
     * refer to an unrelated socket). If the graceful disconnect fails, the
     * connection is aborted instead so the transport is still released.
     */
    void cel_aws_disconnect(void)
    {
        if (mqtt_sock < 0) {
            return;
        }

        int ret = mqtt_disconnect(&client_ctx);

        if (ret != 0) {
            LOG_WRN("MQTT disconnect failed: %d, aborting connection", ret);
            (void)mqtt_abort(&client_ctx);
        }

        mqtt_sock = -1;
        LOG_INF("AWS MQTT Disconnected");
    }
    
    /*
     * Subscribe to a single topic.
     *
     * @param topic NUL-terminated topic filter; must not be NULL.
     * @param qos   Requested maximum QoS for the subscription.
     *
     * @return 0 if the SUBSCRIBE request was sent, -EINVAL for a NULL topic,
     *         otherwise the error returned by mqtt_subscribe().
     */
    int cel_aws_subscribe(const char *topic, enum mqtt_qos qos)
    {
        if (topic == NULL) {
            LOG_ERR("Subscribe failed: %d", -EINVAL);
            return -EINVAL;
        }

        struct mqtt_topic t = {
            .topic.utf8 = (uint8_t *)topic,
            .topic.size = strlen(topic),
            .qos = qos
        };
        struct mqtt_subscription_list list = {
            .list = &t,
            .list_count = 1,
            /*
             * MQTT packet identifiers must be non-zero; map the random value
             * into 1..65535 instead of truncating it (which can yield 0).
             */
            .message_id = (uint16_t)(1U + (sys_rand32_get() % UINT16_MAX))
        };

        int ret = mqtt_subscribe(&client_ctx, &list);
        if (ret == 0) {
            LOG_INF("Subscribed to: %s", topic);
        } else {
            LOG_ERR("Subscribe failed: %d", ret);
        }
        return ret;
    }
    
    /*
     * Publish a message.
     *
     * @param topic       NUL-terminated topic name; must not be NULL.
     * @param payload     Message bytes; may be NULL only if payload_len is 0.
     * @param payload_len Number of payload bytes.
     * @param qos         QoS level to publish with.
     *
     * @return 0 if the PUBLISH packet was sent, -EINVAL for invalid arguments,
     *         otherwise the error returned by mqtt_publish().
     */
    int cel_aws_publish(const char *topic, const uint8_t *payload, size_t payload_len, enum mqtt_qos qos)
    {
        if (topic == NULL || (payload == NULL && payload_len > 0)) {
            LOG_ERR("Publish failed: %d", -EINVAL);
            return -EINVAL;
        }

        struct mqtt_publish_param param = {
            .message.topic.topic.utf8 = (uint8_t *)topic,
            .message.topic.topic.size = strlen(topic),
            .message.topic.qos = qos,
            .message.payload.data = (uint8_t *)payload,
            .message.payload.len = payload_len,
            /*
             * MQTT packet identifiers must be non-zero for QoS > 0; map the
             * random value into 1..65535 instead of truncating it.
             */
            .message_id = (uint16_t)(1U + (sys_rand32_get() % UINT16_MAX)),
            .retain_flag = 0
        };

        int ret = mqtt_publish(&client_ctx, &param);
        if (ret == 0) {
            LOG_INF("Published %zu bytes to %s", payload_len, topic);
        } else {
            LOG_ERR("Publish failed: %d", ret);
        }
        return ret;
    }
    
    /*
     * Service the MQTT connection once.
     *
     * Waits up to timeout_ms for incoming data on the MQTT socket, lets the
     * MQTT library process it (which dispatches to the event handler), and
     * then runs keepalive handling.
     *
     * @param timeout_ms Maximum time to wait for socket activity, as passed to
     *                   poll().
     *
     * @return 0 on success, -1 on poll failure, socket error, input processing
     *         failure or keepalive failure.
     */
    int cel_aws_process(int timeout_ms)
    {
        struct pollfd fds = { .fd = mqtt_sock, .events = POLLIN };
        int ret = poll(&fds, 1, timeout_ms);

        if (ret < 0) {
            LOG_ERR("MQTT poll failed: %d", errno);
            return -1;
        }

        if (ret > 0 && (fds.revents & POLLIN)) {
            if (mqtt_input(&client_ctx) != 0) {
                return -1;
            }
        }

        /* POLLNVAL covers a descriptor that is no longer open. */
        if (fds.revents & (POLLHUP | POLLERR | POLLNVAL)) {
            LOG_ERR("MQTT socket error");
            return -1;
        }

        /*
         * Call mqtt_live() exactly once: it may transmit a PINGREQ, so a
         * second call could see a different state than the first.
         * -EAGAIN is not treated as an error (presumably "no ping due yet").
         */
        ret = mqtt_live(&client_ctx);
        if (ret != 0 && ret != -EAGAIN) {
            return -1;
        }

        return 0;
    }
    
    
    /* === Event Handlers === */
    
    /*
     * MQTT library event callback.
     *
     * For incoming PUBLISH messages the payload is drained from the socket in
     * chunks of at most sizeof(app_buffer) bytes; QoS 1 messages are
     * acknowledged only after the whole payload has been read, so the broker
     * can redeliver a message that was not fully received. CONNACK and
     * DISCONNECT events are logged.
     *
     * @param client MQTT client the event belongs to.
     * @param evt    Event descriptor supplied by the MQTT library.
     */
    static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt *evt)
    {
        switch (evt->type) {
        case MQTT_EVT_PUBLISH: {
            const struct mqtt_publish_param *p = &evt->param.publish;
            size_t received = 0;
            bool complete = true;

            /* The topic is not NUL-terminated; print it with an explicit length. */
            LOG_INF("RX on %.*s [%u bytes]", (int)p->message.topic.topic.size,
                    p->message.topic.topic.utf8, p->message.payload.len);

            while (received < p->message.payload.len) {
                size_t to_read = MIN(p->message.payload.len - received, sizeof(app_buffer));
                int len = mqtt_read_publish_payload_blocking(client, app_buffer, to_read);

                if (len <= 0) {
                    LOG_ERR("Payload read failed: %d (%zu of %u bytes read)",
                            len, received, p->message.payload.len);
                    complete = false;
                    break;
                }
                received += len;

                /*
                 * app_buffer[0..len) now holds the next payload chunk. Hand it
                 * to the application here, e.g.:
                 *     user_rx_cb((const char *)p->message.topic.topic.utf8, app_buffer, len);
                 */
            }

            if (complete && p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
                struct mqtt_puback_param ack = { .message_id = p->message_id };
                int err = mqtt_publish_qos1_ack(client, &ack);

                if (err != 0) {
                    LOG_ERR("PUBACK failed: %d", err);
                }
            }
            break;
        }
        case MQTT_EVT_CONNACK:
            if (evt->result != 0) {
                LOG_ERR("MQTT connection refused: %d", evt->result);
                break;
            }
            LOG_INF("MQTT CONNACK received");
            break;
        case MQTT_EVT_DISCONNECT:
            LOG_INF("MQTT Disconnected");
            break;
        default:
            break;
        }
    }
    
    /*
     * Compute the delay, in milliseconds, to wait before the next connect
     * attempt.
     *
     * With CONFIG_AWS_EXPONENTIAL_BACKOFF the delay is a random value in
     * [0, min(BACKOFF_BASE_MS * 2^attempt, BACKOFF_MAX_MS)], where a result of
     * 0 is replaced by BACKOFF_BASE_MS. Otherwise the constant
     * BACKOFF_CONST_MS is returned.
     *
     * @param attempt Zero-based index of the attempt that just failed.
     */
    static int backoff_wait(uint32_t attempt)
    {
    #ifdef CONFIG_AWS_EXPONENTIAL_BACKOFF
        uint32_t max_backoff = BACKOFF_BASE_MS;

        /*
         * Double up to `attempt` times but stop at the cap. A plain
         * `BACKOFF_BASE_MS << attempt` overflows (or is undefined for
         * attempt >= 32) when the attempt count is large.
         * Assumes BACKOFF_MAX_MS <= 2^31 so a single doubling cannot wrap.
         */
        while (attempt > 0U && max_backoff < BACKOFF_MAX_MS) {
            max_backoff *= 2U;
            attempt--;
        }
        if (max_backoff > BACKOFF_MAX_MS) {
            max_backoff = BACKOFF_MAX_MS;
        }

        uint32_t delay = sys_rand32_get() % (max_backoff + 1);

        /* Never retry immediately. */
        return delay > 0 ? delay : BACKOFF_BASE_MS;
    #else
        return BACKOFF_CONST_MS;
    #endif
    }
    
    
    /*
     * Callback for incoming messages: logs the topic and up to APP_BUF_SIZE
     * bytes of the payload as text.
     *
     * @param topic NUL-terminated topic string.
     * @param data  Payload bytes (not necessarily NUL-terminated); may be NULL
     *              when len is 0.
     * @param len   Number of payload bytes.
     */
    void on_mqtt_rx(const char *topic, const uint8_t *data, size_t len)
    {
        /* Zero-initialised and one byte larger, so the copy is always terminated. */
        char buf[APP_BUF_SIZE + 1] = {0};
        size_t copy = (data != NULL) ? MIN(len, APP_BUF_SIZE) : 0;

        if (copy > 0) {
            memcpy(buf, data, copy);
        }

        /* The "%.*s" precision argument must be an int, not a size_t. */
        LOG_INF("Received on %s: %.*s", topic, (int)copy, buf);
    }
    
    /*
     * Application entry point: bring up LTE, connect to AWS IoT over MQTT,
     * subscribe, publish five numbered messages (servicing the connection
     * after each one), then disconnect and power everything down.
     */
    void main(void)
    {
        LOG_INF("Starting Cellular AWS Application");

        /* 1. Connect LTE */
        if (cel_lte_connect() != 0) {
            LOG_ERR("LTE failed");
            return;
        }

        /* 2. Connect to AWS */
        if (cel_aws_connect() != 0) {
            LOG_ERR("AWS connect failed");
            cel_lte_disconnect();
            return;
        }

        /* 3. Subscribe (best effort: publishing still works without it) */
        if (cel_aws_subscribe(CONFIG_AWS_SUBSCRIBE_TOPIC, MQTT_QOS_1_AT_LEAST_ONCE) != 0) {
            LOG_WRN("Continuing without subscription");
        }

        /* 4. Publish 5 messages */
        for (int i = 0; i < 5; i++) {
            char msg[64];
            int n = snprintf(msg, sizeof(msg), "%s [%d]", payload, i + 1);

            if (n < 0) {
                LOG_ERR("Failed to format message %d", i + 1);
                continue;
            }

            /* snprintf() reports the untruncated length; clamp to what fits. */
            size_t msg_len = MIN((size_t)n, sizeof(msg) - 1);

            if ((size_t)n >= sizeof(msg)) {
                LOG_WRN("Message %d truncated to %zu bytes", i + 1, msg_len);
            }

            /* A failed publish is already logged by cel_aws_publish(). */
            (void)cel_aws_publish(CONFIG_AWS_PUBLISH_TOPIC, (uint8_t *)msg, msg_len,
                                  MQTT_QOS_1_AT_LEAST_ONCE);

            /* Process MQTT events (e.g. the PUBACK for the message just sent). */
            int timeout = mqtt_keepalive_time_left(&client_ctx);

            if (cel_aws_process(timeout > 0 ? timeout : 1000) != 0) {
                /* The connection is unusable; further publishes would fail too. */
                LOG_ERR("MQTT processing failed, stopping");
                break;
            }

            k_sleep(K_SECONDS(2));
        }

        /* 5. Cleanup */
        cel_aws_disconnect();
        cel_lte_disconnect();

        LOG_INF("Application done");
    }


    # Project configuration fragment: networking, DNS, MQTT, TLS and LTE modem
    # options for the cellular AWS MQTT application.

    # Networking: TLS via socket options, UDP/TCP over IPv4.
    CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
    CONFIG_NET_UDP=y
    CONFIG_NET_TCP=y
    CONFIG_NET_IPV4=y
    # IPv6 is commented out, so it is not set by this fragment.
    # CONFIG_NET_IPV6=y
    
    # DNS
    # The application resolves CONFIG_AWS_ENDPOINT with getaddrinfo().
    # A single server (8.8.8.8) is configured.
    # NOTE(review): the timeout is presumably in milliseconds - confirm.
    CONFIG_DNS_RESOLVER=y
    CONFIG_DNS_RESOLVER_ADDITIONAL_BUF_CTR=2
    CONFIG_DNS_RESOLVER_MAX_SERVERS=1
    CONFIG_DNS_SERVER_IP_ADDRESSES=y
    CONFIG_DNS_SERVER1="8.8.8.8"
    CONFIG_NET_SOCKETS_DNS_TIMEOUT=5000
    
    CONFIG_JSON_LIBRARY=y
    
    # AWS IoT MQTT
    # CONFIG_MQTT_KEEPALIVE is assigned to client_ctx.keepalive by the
    # application (presumably seconds - confirm).
    # CONFIG_MQTT_LIB_TLS_USE_ALPN goes with the ALPN protocol list the
    # application sets when CONFIG_AWS_MQTT_PORT is 443.
    CONFIG_AWS_IOT_LOG_LEVEL_DBG=y
    CONFIG_AWS_TEST_SUITE_DQP=n
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=y
    CONFIG_MQTT_KEEPALIVE=600
    CONFIG_MQTT_LIB_TLS_USE_ALPN=y
    
    # TLS (nRF Security only)
    # NOTE(review): with CONFIG_NET_NATIVE=n and CONFIG_NET_SOCKETS_OFFLOAD=y
    # (see below) the TLS session is presumably terminated in the modem, in
    # which case most of the mbedTLS options here may have no effect - confirm.
    # NOTE(review): the heap (4096) is smaller than the maximum TLS content
    # length (8192); verify the sizing if mbedTLS does handle the session.
    CONFIG_NRF_SECURITY=y
    CONFIG_MBEDTLS_TLS_LIBRARY=y
    CONFIG_MBEDTLS_LEGACY_CRYPTO_C=y
    CONFIG_MBEDTLS_ENABLE_HEAP=y
    CONFIG_MBEDTLS_HEAP_SIZE=4096
    CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=8192
    CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
    CONFIG_MBEDTLS_SERVER_NAME_INDICATION=y
    CONFIG_MBEDTLS_AES_ROM_TABLES=y
    CONFIG_MBEDTLS_TLS_VERSION_1_2=y
    CONFIG_MBEDTLS_MEMORY_DEBUG=y
    CONFIG_MBEDTLS_HAVE_TIME_DATE=y
    CONFIG_MBEDTLS_SSL_ALPN=y
    CONFIG_MBEDTLS_SSL_CLI_C=y
    CONFIG_MBEDTLS_X509_CRT_PARSE_C=y
    # Key exchange methods
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_ECDSA_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDH_ECDSA_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_PSK_ENABLED=y
    CONFIG_MBEDTLS_SSL_SRV_C=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_ENABLED=y
    # Crypto primitives and key/certificate handling
    CONFIG_MBEDTLS_CIPHER=y
    CONFIG_MBEDTLS_MD=y
    CONFIG_MBEDTLS_PK_C=y
    CONFIG_MBEDTLS_PK_PARSE_C=y
    CONFIG_MBEDTLS_PK_WRITE_C=y
    CONFIG_MBEDTLS_RSA_C=y
    CONFIG_MBEDTLS_PKCS1_V15=y
    CONFIG_MBEDTLS_ECP_C=y
    CONFIG_MBEDTLS_ECDSA_C=y
    CONFIG_MBEDTLS_ECDH_C=y
    CONFIG_MBEDTLS_DHM_C=y
    CONFIG_MBEDTLS_GCM_C=y
    CONFIG_MBEDTLS_SHA256_C=y
    CONFIG_MBEDTLS_X509_USE_C=y
    CONFIG_MBEDTLS_SSL_CONTEXT_SERIALIZATION=y
    CONFIG_MBEDTLS_SSL_PROTO_TLS1_2=y
    
    # LTE/Modem
    CONFIG_NRF_MODEM_LIB=y
    CONFIG_LTE_LINK_CONTROL=y
    
    # Sockets offload for modem (if using LTE for AWS)
    # The native IP stack is disabled; socket calls are offloaded.
    CONFIG_NET_NATIVE=n
    CONFIG_NET_SOCKETS_OFFLOAD=y
    


    Best regards,
    Milan

  • Hi

    You can most likely reduce the latency by setting MQTT_QOS_0_AT_MOST_ONCE on your messages, which avoids the extra ACK per packet, and by removing the delay. The delay is likely there to fill up the modem's TLS buffer before the data is sent to AWS.

    In any case, it's better to use a while-loop and retry the send with a smaller delay for as long as it returns -EAGAIN.

    The TLS buffer in the modem is ~2 kB, so you can increase the TX buffer from 512 bytes to at least 2 kB. It looks like you're currently sending only 64 bytes per publish, which is very inefficient; you should be able to fill the available capacity with each send.

    A small delay after the cel_aws_publish is probably required to avoid starving the system if you have more than one thread, but a full 2 seconds shouldn't be necessary.

    One of our developers also recommended that you look at the aws_iot sample project for reference for this.

    Best regards,

    Simon

Related