Why does a 10 KB payload take ~6 seconds to publish, while a 6-byte payload takes ~1 second?

/*
 * Copyright (c) 2023 Lucas Dietrich <[email protected]>
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include "creds/creds.h"
#include "dhcp.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

#include <zephyr/net/socket.h>
#include <zephyr/net/dns_resolve.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/net/sntp.h>
#include <zephyr/net/tls_credentials.h>
#include <zephyr/random/random.h>
#include <zephyr/posix/time.h>
#include <zephyr/logging/log.h>
#include <zephyr/kernel.h>


#if defined(CONFIG_MBEDTLS_MEMORY_DEBUG)
#include <mbedtls/memory_buffer_alloc.h>
#endif


#include <modem/nrf_modem_lib.h>
#include <modem/lte_lc.h>

/* Given by lte_handler() on network registration; taken by modem_configure() */
static K_SEM_DEFINE(lte_connected, 0, 1);

LOG_MODULE_REGISTER(aws, LOG_LEVEL_DBG);

// #define SNTP_SERVER "0.pool.ntp.org"
#define SNTP_SERVER "pool.ntp.org"


/* Broker port, converted to a string for getaddrinfo() in resolve_broker_addr() */
#define AWS_BROKER_PORT CONFIG_AWS_MQTT_PORT

/* Sizes (bytes) of the buffers handed to the MQTT client in aws_client_setup() */
#define MQTT_RX_BUFFER_SIZE 512u
#define MQTT_TX_BUFFER_SIZE 512u
/* Size (bytes) of the application buffer used to read incoming payloads in chunks */
#define APP_BUFFER_SIZE	 256u

/* Connection retry policy used by the backoff helpers below */
#define MAX_RETRIES	    10u
#define BACKOFF_EXP_BASE_MS 1000u
#define BACKOFF_EXP_MAX_MS  60000u
#define BACKOFF_CONST_MS    5000u

/* Broker address; filled by resolve_broker_addr() and referenced by client_ctx.broker */
static struct sockaddr_in aws_broker;

static uint8_t rx_buffer[MQTT_RX_BUFFER_SIZE];
static uint8_t tx_buffer[MQTT_TX_BUFFER_SIZE];
static uint8_t buffer[APP_BUFFER_SIZE]; /* Shared between published and received messages */

/* Single MQTT client instance used by every function in this file */
static struct mqtt_client client_ctx;

/* Used as the MQTT client id (without the terminating NUL) */
static const char mqtt_client_name[] = CONFIG_AWS_THING_NAME;

/* Flags set from mqtt_event_cb() and by the timer logic in aws_client_loop() */
static bool do_publish;	  /* Trigger client to publish */
static bool do_subscribe; /* Trigger client to subscribe */

/* ALPN protocol list, only compiled in for MQTT over port 443 without websockets */
#if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
static const char * const alpn_list[] = {"x-amzn-mqtt-ca"};
#endif

/* Security tag list passed to the TLS configuration in aws_client_setup() */
static const sec_tag_t sec_tls_tags[] = { 212 };


static int subscribe_topic(void)
{
	int ret;
	struct mqtt_topic topics[] = {{
		.topic = {.utf8 = CONFIG_AWS_SUBSCRIBE_TOPIC,
			  .size = strlen(CONFIG_AWS_SUBSCRIBE_TOPIC)},
		.qos = CONFIG_AWS_QOS,
	}};
	const struct mqtt_subscription_list sub_list = {
		.list = topics,
		.list_count = ARRAY_SIZE(topics),
		.message_id = 1u,
	};

	LOG_INF("Subscribing to %hu topic(s)", sub_list.list_count);

	ret = mqtt_subscribe(&client_ctx, &sub_list);
	if (ret != 0) {
		LOG_ERR("Failed to subscribe to topics: %d", ret);
	}

	return ret;
}

static int publish_message(const char *topic, size_t topic_len, uint8_t *payload,
			   size_t payload_len)
{
	static uint32_t message_id = 1u;

	int ret;
	struct mqtt_publish_param msg;

	msg.retain_flag = 0u;
	msg.message.topic.topic.utf8 = topic;
	msg.message.topic.topic.size = topic_len;
	msg.message.topic.qos = CONFIG_AWS_QOS;
	msg.message.payload.data = payload;
	msg.message.payload.len = payload_len;
	msg.message_id = message_id++;
 for (int i = 0; i < 3; i++)
 {
	 ret = mqtt_publish(&client_ctx, &msg);
	if (ret != 0) {
		LOG_ERR("Failed to publish message: %d", ret);
	}

	LOG_INF("PUBLISHED on topic \"%s\" [ id: %u qos: %u ], payload: %u B", topic,
		msg.message_id, msg.message.topic.qos, payload_len);
		k_msleep(100);

 }
 
	
	return ret;
}

static ssize_t handle_published_message(const struct mqtt_publish_param *pub)
{
	int ret;
	size_t received = 0u;
	const size_t message_size = pub->message.payload.len;

	LOG_INF("RECEIVED on topic \"%s\" [ id: %u qos: %u ] payload: %u B",
		(const char *)pub->message.topic.topic.utf8, pub->message_id,
		pub->message.topic.qos, message_size);

	while (received < message_size) {
		size_t to_read = MIN(message_size - received, sizeof(buffer));

		ret = mqtt_read_publish_payload_blocking(&client_ctx, buffer, to_read);
		if (ret < 0) {
			return ret;
		} else if (ret == 0) {
			break;
		}

		received += ret;
		LOG_HEXDUMP_DBG(buffer, ret, "Received payload:");
	}

	/* Send ACK */
	switch (pub->message.topic.qos) {
	case MQTT_QOS_1_AT_LEAST_ONCE: {
		struct mqtt_puback_param puback;

		puback.message_id = pub->message_id;
		mqtt_publish_qos1_ack(&client_ctx, &puback);
	} break;
	case MQTT_QOS_2_EXACTLY_ONCE: /* unhandled (not supported by AWS) */
	case MQTT_QOS_0_AT_MOST_ONCE: /* nothing to do */
	default:
		break;
	}

	return received;
}

/*
 * Map an MQTT event type to a short printable name by using its numeric value
 * as an index into a fixed table. Values outside the table yield "<unknown>".
 */
const char *mqtt_evt_type_to_str(enum mqtt_evt_type type)
{
	static const char *const evt_names[] = {
		"CONNACK",
		"DISCONNECT",
		"PUBLISH",
		"PUBACK",
		"PUBREC",
		"PUBREL",
		"PUBCOMP",
		"SUBACK",
		"UNSUBACK",
		"PINGRESP",
	};

	if (type < ARRAY_SIZE(evt_names)) {
		return evt_names[type];
	}

	return "<unknown>";
}

/*
 * MQTT event callback registered in aws_client_setup().
 *
 * - CONNACK: request a subscription (do_subscribe)
 * - SUBACK:  request a publish (do_publish)
 * - PUBLISH: read and acknowledge the incoming message
 * Every other event is only logged.
 */
static void mqtt_event_cb(struct mqtt_client *client, const struct mqtt_evt *evt)
{
	LOG_DBG("MQTT event: %s [%u] result: %d", mqtt_evt_type_to_str(evt->type), evt->type,
		evt->result);

	if (evt->type == MQTT_EVT_CONNACK) {
		do_subscribe = true;
	} else if (evt->type == MQTT_EVT_SUBACK) {
		do_publish = true;
	} else if (evt->type == MQTT_EVT_PUBLISH) {
		handle_published_message(&evt->param.publish);
	} else {
		/* PUBACK, DISCONNECT, PUBREC, PUBREL, PUBCOMP, PINGRESP, UNSUBACK: nothing to do */
	}
}

/*
 * (Re)initialise the shared MQTT client context: broker address, event
 * callback, client id, keepalive, protocol version, RX/TX buffers and the TLS
 * transport configuration (peer verification required, credentials taken from
 * sec_tls_tags, optional ALPN when building for port 443 without websockets).
 */
static void aws_client_setup(void)
{
	struct mqtt_sec_config *const tls = &client_ctx.transport.tls.config;

	mqtt_client_init(&client_ctx);

	/* Session parameters */
	client_ctx.protocol_version = MQTT_VERSION_3_1_1;
	client_ctx.keepalive = CONFIG_MQTT_KEEPALIVE;
	client_ctx.broker = &aws_broker;
	client_ctx.evt_cb = mqtt_event_cb;

	/* Identity: client id only, no user name / password */
	client_ctx.client_id.utf8 = (uint8_t *)mqtt_client_name;
	client_ctx.client_id.size = sizeof(mqtt_client_name) - 1;
	client_ctx.user_name = NULL;
	client_ctx.password = NULL;

	/* Packet buffers */
	client_ctx.tx_buf = tx_buffer;
	client_ctx.tx_buf_size = sizeof(tx_buffer);
	client_ctx.rx_buf = rx_buffer;
	client_ctx.rx_buf_size = sizeof(rx_buffer);

	/* TLS transport */
	client_ctx.transport.type = MQTT_TRANSPORT_SECURE;

	tls->hostname = CONFIG_AWS_ENDPOINT;
	tls->peer_verify = TLS_PEER_VERIFY_REQUIRED;
	tls->cipher_list = NULL;
	tls->cert_nocopy = TLS_CERT_NOCOPY_NONE;
	tls->sec_tag_list = sec_tls_tags;
	tls->sec_tag_count = ARRAY_SIZE(sec_tls_tags);
#if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
	tls->alpn_protocol_name_list = alpn_list;
	tls->alpn_protocol_name_count = ARRAY_SIZE(alpn_list);
#endif
}

/*
 * Retry bookkeeping for the connection loop.
 *
 * The two backoff bounds only exist when CONFIG_AWS_EXPONENTIAL_BACKOFF is
 * enabled; otherwise a constant delay is used and only the counters remain.
 */
struct backoff_context {
	uint16_t retries_count; /* Retries consumed so far */
	uint16_t max_retries;	/* Retry budget; set from MAX_RETRIES by backoff_context_init() */

#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
	uint32_t attempt_max_backoff; /* ms */
	uint32_t max_backoff;	      /* ms */
#endif
};

/*
 * Reset a backoff context: zero retries used, MAX_RETRIES allowed and, in
 * exponential builds, the initial and maximum backoff windows.
 */
static void backoff_context_init(struct backoff_context *bo)
{
	__ASSERT_NO_MSG(bo != NULL);

#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
	bo->max_backoff = BACKOFF_EXP_MAX_MS;
	bo->attempt_max_backoff = BACKOFF_EXP_BASE_MS;
#endif

	bo->max_retries = MAX_RETRIES;
	bo->retries_count = 0u;
}

static void backoff_get_next(struct backoff_context *bo, uint32_t *next_backoff_ms)
{
	__ASSERT_NO_MSG(bo != NULL);
	__ASSERT_NO_MSG(next_backoff_ms != NULL);

#if defined(CONFIG_AWS_EXPONENTIAL_BACKOFF)
	if (bo->retries_count <= bo->max_retries) {
		*next_backoff_ms = sys_rand32_get() % (bo->attempt_max_backoff + 1u);

		/* Calculate max backoff for the next attempt (~ 2**attempt) */
		bo->attempt_max_backoff = MIN(bo->attempt_max_backoff * 2u, bo->max_backoff);
		bo->retries_count++;
	}
#else
	*next_backoff_ms = BACKOFF_CONST_MS;
#endif
}

/*
 * Try to open the MQTT connection, sleeping for the delay supplied by
 * backoff_get_next() after every failed attempt, for as long as the backoff
 * context reports retries_count <= max_retries.
 *
 * Returns 0 as soon as mqtt_connect() succeeds, otherwise the error of the
 * last attempt.
 */
static int aws_client_try_connect(void)
{
	struct backoff_context bo;
	uint32_t delay_ms;
	int ret;

	backoff_context_init(&bo);

	while (bo.retries_count <= bo.max_retries) {
		ret = mqtt_connect(&client_ctx);
		if (ret == 0) {
			break;
		}

		backoff_get_next(&bo, &delay_ms);

		LOG_ERR("Failed to connect: %d backoff delay: %u ms", ret, delay_ms);
		k_msleep(delay_ms);
	}

	return ret;
}

// static const char publish_payload[] = "aaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
aAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaa
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaAAAAAaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa00000000000";
/* Test payload sent by publish() */
static const char publish_payload[] = "aaaaaa";

/*
 * Publish the fixed test payload on CONFIG_AWS_PUBLISH_TOPIC.
 *
 * Returns whatever publish_message() returns.
 */
static int publish(void)
{
	const char *const topic = CONFIG_AWS_PUBLISH_TOPIC;
	const size_t topic_len = strlen(CONFIG_AWS_PUBLISH_TOPIC);
	const size_t payload_len = strlen(publish_payload);

	return publish_message(topic, topic_len, (uint8_t *)publish_payload, payload_len);
}

void aws_client_loop(void)
{
	int rc;
	int timeout;
	struct pollfd fds;
	static int64_t last_publish = 0;

	aws_client_setup();

	rc = aws_client_try_connect();
	if (rc != 0) {
		goto cleanup;
	}

	fds.fd = client_ctx.transport.tcp.sock;
	fds.events = POLLIN;

	for (;;) {
		timeout = mqtt_keepalive_time_left(&client_ctx);
		rc = poll(&fds, 1u, timeout);
		if (rc >= 0) {
			if (fds.revents & POLLIN) {
				rc = mqtt_input(&client_ctx);
				if (rc != 0) {
					LOG_ERR("Failed to read MQTT input: %d", rc);
					break;
				}
			}

			if (fds.revents & (POLLHUP | POLLERR)) {
				LOG_ERR("Socket closed/error");
				break;
			}

			rc = mqtt_live(&client_ctx);
			if ((rc != 0) && (rc != -EAGAIN)) {
				LOG_ERR("Failed to live MQTT: %d", rc);
				break;
			}
		} else {
			LOG_ERR("poll failed: %d", rc);
			break;
		}

		if (do_publish) {
			do_publish = false;
			publish();
			last_publish = k_uptime_get();
		}

		if (do_subscribe) {
			do_subscribe = false;
			subscribe_topic();
		}

		if (!do_publish && !do_subscribe) {
			int64_t now = k_uptime_get();
			if (now - last_publish >= 1000) {
				do_publish = true;
			}
		}
	}

cleanup:
	mqtt_disconnect(&client_ctx);

	close(fds.fd);
	fds.fd = -1;
}

/*
 * Resolve CONFIG_AWS_ENDPOINT (IPv4, stream socket) and store the first result
 * in *broker.
 *
 * broker: destination for the resolved address and port
 *
 * Returns 0 on success, otherwise the non-zero getaddrinfo() error code; in
 * that case *broker is left untouched.
 */
static int resolve_broker_addr(struct sockaddr_in *broker)
{
	int ret;
	struct addrinfo *ai = NULL;

	const struct addrinfo hints = {
		.ai_family = AF_INET,
		.ai_socktype = SOCK_STREAM,
		.ai_protocol = 0,
	};
	char port_string[6] = {0};
	char addr_str[INET_ADDRSTRLEN];

	/* Bounded formatting: the buffer holds at most 5 digits plus the terminator */
	snprintf(port_string, sizeof(port_string), "%d", AWS_BROKER_PORT);

	ret = getaddrinfo(CONFIG_AWS_ENDPOINT, port_string, &hints, &ai);
	if (ret != 0) {
		LOG_ERR("failed to resolve hostname err = %d (errno = %d)", ret, errno);
		return ret;
	}

	/* Never copy more than the destination can hold (it is a sockaddr_in) */
	memcpy(broker, ai->ai_addr, MIN(ai->ai_addrlen, sizeof(*broker)));
	freeaddrinfo(ai);

	inet_ntop(AF_INET, &broker->sin_addr, addr_str, sizeof(addr_str));
	/* sin_port is in network byte order: convert to host order for printing */
	LOG_INF("Resolved: %s:%u", addr_str, ntohs(broker->sin_port));

	return 0;
}

/*
 * LTE link-control event handler.
 *
 * - Registration events: when registered (home or roaming), log it and give
 *   the lte_connected semaphore; other registration states are ignored.
 * - RRC updates: log whether the RRC mode is connected or idle.
 * All other events are ignored.
 */
static void lte_handler(const struct lte_lc_evt *const evt)
{
    if (evt->type == LTE_LC_EVT_RRC_UPDATE) {
        LOG_INF("RRC mode: %s", evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED ? "Connected" : "Idle");
        return;
    }

    if (evt->type != LTE_LC_EVT_NW_REG_STATUS) {
        return;
    }

    const bool home = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME);
    const bool roaming = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_ROAMING);

    if (!home && !roaming) {
        return;
    }

    LOG_INF("Network registration status: %s",
           home ? "Connected - home network" : "Connected - roaming");
    k_sem_give(&lte_connected);
}
/*
 * Initialise the modem library, start an asynchronous LTE connection with
 * lte_handler() as event handler, and block until the lte_connected semaphore
 * is given.
 *
 * Returns 0 once the semaphore was taken, or the non-zero error of the first
 * call that failed.
 */
static int modem_configure(void)
{
    int err;

    err = nrf_modem_lib_init();
    if (err == 0) {
        err = lte_lc_connect_async(lte_handler);
    }
    if (err != 0) {
        return err;
    }

    /* Wait without timeout for the registration event */
    k_sem_take(&lte_connected, K_FOREVER);

    return 0;
}

/*
 * Application entry point: bring up the modem, then loop forever resolving
 * the broker address and running one MQTT session per iteration, with a one
 * second pause between iterations.
 *
 * Returns the modem_configure() error if the modem cannot be brought up;
 * otherwise it never returns in practice.
 */
int main(void)
{
	int err;

	err = modem_configure();
	if (err != 0) {
		/* Without a working modem nothing below can succeed */
		LOG_ERR("Modem configuration failed: %d", err);
		return err;
	}

#if defined(CONFIG_NET_DHCPV4)
	app_dhcpv4_startup();
#endif

	for (;;) {
		/* Only start a session when the broker address was resolved */
		if (resolve_broker_addr(&aws_broker) == 0) {
			aws_client_loop();
		}

#if defined(CONFIG_MBEDTLS_MEMORY_DEBUG)
		size_t cur_used, cur_blocks, max_used, max_blocks;

		mbedtls_memory_buffer_alloc_cur_get(&cur_used, &cur_blocks);
		mbedtls_memory_buffer_alloc_max_get(&max_used, &max_blocks);
		LOG_INF("mbedTLS heap usage: MAX %u/%u (%u) CUR %u (%u)", max_used,
			CONFIG_MBEDTLS_HEAP_SIZE, max_blocks, cur_used, cur_blocks);
#endif

		k_sleep(K_SECONDS(1));
	}

	return 0;
}




CONFIG_AWS_IOT_LOG_LEVEL_DBG=y
CONFIG_AWS_TEST_SUITE_DQP=n

CONFIG_MAIN_STACK_SIZE=8192
CONFIG_ENTROPY_GENERATOR=y
CONFIG_TEST_RANDOM_GENERATOR=y
CONFIG_INIT_STACKS=y
CONFIG_HW_STACK_PROTECTION=y
CONFIG_REQUIRES_FULL_LIBC=y
CONFIG_SNTP=y
CONFIG_JSON_LIBRARY=y
CONFIG_POSIX_API=y

# DNS
CONFIG_DNS_RESOLVER=y
CONFIG_DNS_RESOLVER_ADDITIONAL_BUF_CTR=2
CONFIG_DNS_RESOLVER_MAX_SERVERS=1
CONFIG_DNS_SERVER_IP_ADDRESSES=y
CONFIG_DNS_SERVER1="8.8.8.8"
CONFIG_NET_SOCKETS_DNS_TIMEOUT=5000
CONFIG_DNS_RESOLVER_LOG_LEVEL_DBG=n

# Generic networking options
CONFIG_NETWORKING=y
CONFIG_NET_UDP=y
CONFIG_NET_TCP=y
CONFIG_NET_IPV6=y
CONFIG_NET_IPV4=y
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_SOCKOPT_TLS=y

# Logging
CONFIG_LOG=y

# Network buffers
CONFIG_NET_PKT_RX_COUNT=32
CONFIG_NET_PKT_TX_COUNT=16
CONFIG_NET_BUF_RX_COUNT=64
CONFIG_NET_BUF_TX_COUNT=32

# MQTT
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=y
CONFIG_MQTT_KEEPALIVE=600
CONFIG_MQTT_LIB_TLS_USE_ALPN=y

# TLS
CONFIG_NRF_SECURITY=y
CONFIG_MBEDTLS_TLS_LIBRARY=y
CONFIG_MBEDTLS_LEGACY_CRYPTO_C=y
CONFIG_MBEDTLS_ENABLE_HEAP=y
CONFIG_MBEDTLS_HEAP_SIZE=10240
CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=10240
CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
CONFIG_MBEDTLS_SERVER_NAME_INDICATION=y
CONFIG_MBEDTLS_AES_ROM_TABLES=y
CONFIG_MBEDTLS_TLS_VERSION_1_2=y
CONFIG_MBEDTLS_MEMORY_DEBUG=y
CONFIG_MBEDTLS_HAVE_TIME_DATE=y
CONFIG_MBEDTLS_SSL_ALPN=y

CONFIG_MBEDTLS_SSL_CLI_C=y
CONFIG_MBEDTLS_X509_CRT_PARSE_C=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDH_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_PSK_ENABLED=y

CONFIG_MBEDTLS_SSL_SRV_C=y
CONFIG_MBEDTLS_SSL_CLI_C=y
CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_ENABLED=y

CONFIG_HEAP_MEM_POOL_SIZE=8192

CONFIG_MBEDTLS_CIPHER=y
CONFIG_MBEDTLS_MD=y

# Required mbedTLS dependencies
CONFIG_MBEDTLS_PK_C=y
CONFIG_MBEDTLS_PK_PARSE_C=y
CONFIG_MBEDTLS_PK_WRITE_C=y
CONFIG_MBEDTLS_RSA_C=y
CONFIG_MBEDTLS_PKCS1_V15=y
CONFIG_MBEDTLS_ECP_C=y
CONFIG_MBEDTLS_ECDSA_C=y
CONFIG_MBEDTLS_ECDH_C=y
CONFIG_MBEDTLS_DHM_C=y
CONFIG_MBEDTLS_GCM_C=y
CONFIG_MBEDTLS_SHA256_C=y
CONFIG_MBEDTLS_X509_USE_C=y
CONFIG_MBEDTLS_X509_CRT_PARSE_C=y

CONFIG_MBEDTLS_RSA_C=y
CONFIG_MBEDTLS_DHM_C=y

CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDH_ECDSA_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_PSK_ENABLED=y
CONFIG_MBEDTLS_SSL_CONTEXT_SERIALIZATION=y
CONFIG_MBEDTLS_SSL_PROTO_TLS1_2=y



CONFIG_CONSOLE=n #for wifi uart keep this n
CONFIG_USE_SEGGER_RTT=y
CONFIG_RTT_CONSOLE=y
CONFIG_LOG_BACKEND_RTT=y
CONFIG_LOG_BACKEND_UART=n
CONFIG_UART_CONSOLE=n

CONFIG_NRF_MODEM_LIB=y
CONFIG_LTE_LINK_CONTROL=y


# Let the modem handle sockets, DNS, and SNTP
CONFIG_NET_NATIVE=n
CONFIG_NET_SOCKETS_OFFLOAD=y

CONFIG_SNTP_LOG_LEVEL_DBG=y

We are using an nRF9151 with nRF Connect SDK 2.9.0.

We connect to AWS over a cellular link.

When we publish a ~10 KB payload it takes about 6 seconds, whereas a 6-byte payload takes about 1 second. What do we need to change to make the transmission faster?

log:

00> [00:01:14.933,380] <inf> aws: RRC mode: Connected
00> [00:01:24.036,407] <inf> aws: Network registration status: Connected - roaming
00> [00:01:24.459,625] <inf> aws: Resolved: 3.111.212.253:8883
00> [00:01:32.697,906] <dbg> aws: mqtt_event_cb: MQTT event: CONNACK [0] result: 0
00> [00:01:32.697,967] <inf> aws: Subscribing to 1 topic(s)
00> [00:01:33.297,943] <dbg> aws: mqtt_event_cb: MQTT event: SUBACK [7] result: 0
00> [00:01:39.516,448] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 1 qos: 0 ], payload: 9319 B
00> [00:01:45.363,739] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 1 qos: 0 ], payload: 9319 B
00> [00:01:52.676,635] <inf> aws: PUBLISHED on topic "zephyr_sample/data" [ id: 1 qos: 0 ], payload: 9319 B
00> [00:02:24.236,785] <inf> aws: RRC mode: Idle

So what should we do to speed up the transmission of a 10 KB payload?



Parents
  • Hi

    Can you confirm you're connected to an LTE network? NB-IoT would not have high enough data rates to allow this. It also depends on your coverage, but if you are connected to an LTE network and have a good connection, you should be able to achieve up to 70kB/s. 

    You can try to test throughput with iperf3 in our modem shell sample to see what throughput you can expect with your connection.

    Best regards,

    Simon

  • Hi Simon,

    Thanks for the clarification.

    We are using the nRF9151 with SDK 2.9.0.
    Our goal is to publish a 10 KB payload every second using LTE-M (or NB-IoT if possible).
    However, with our current implementation, even a small payload takes around 4–10 seconds per publish.

    Could you please guide us on what we should check or modify?
    We can share our code below if needed.

    /**
     * cel_aws.c
     * Cellular AWS IoT Driver
     */
    
    #include <zephyr/logging/log.h>
    #include <zephyr/net/socket.h>
    #include <zephyr/net/dns_resolve.h>
    #include <zephyr/net/mqtt.h>
    #include <zephyr/net/tls_credentials.h>
    #include <zephyr/random/random.h>
    #include <modem/lte_lc.h>
    #include <modem/nrf_modem_lib.h>
    #include <string.h>
    #include <stdio.h>
    
    LOG_MODULE_REGISTER(cel_aws, LOG_LEVEL_DBG);
    
    /* Configurable constants */
    #define SNTP_SERVER "pool.ntp.org"
    #define AWS_BROKER_PORT CONFIG_AWS_MQTT_PORT
    
    /* Buffer sizes in bytes; RX/TX are handed to the MQTT client in cel_aws_client_init() */
    #define MQTT_RX_BUF_SIZE (512)
    #define MQTT_TX_BUF_SIZE (512)
    #define APP_BUF_SIZE 256
    
    /* Connection retry policy: MAX_RETRIES bounds the loop in cel_aws_connect() */
    #define MAX_RETRIES 10
    #define BACKOFF_BASE_MS 1000
    #define BACKOFF_MAX_MS 60000
    #define BACKOFF_CONST_MS 5000
    
    /* TLS */
    static const sec_tag_t sec_tls_tags[] = {212};
    /* ALPN list, only compiled in for MQTT over port 443 without websockets */
    #if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
    static const char *const alpn_list[] = {"x-amzn-mqtt-ca"};
    #endif
    
    /* Static variables */
    struct mqtt_client client_ctx; /* Not static: visible to other translation units */
    static uint8_t rx_buffer[MQTT_RX_BUF_SIZE];
    static uint8_t tx_buffer[MQTT_TX_BUF_SIZE];
    static uint8_t app_buffer[APP_BUF_SIZE];
    static struct sockaddr_in broker_addr; /* Filled by resolve_broker() */
    
    // static cel_aws_rx_cb_t user_rx_cb = NULL;
    /* Given by lte_event_handler() on registration; taken by cel_lte_connect() */
    static K_SEM_DEFINE(lte_ready, 0, 1);
    /* Socket of the current MQTT connection, or -1 when not connected */
    static int mqtt_sock = -1;
    
    /* Forward declarations */
    static void lte_event_handler(const struct lte_lc_evt *evt);
    static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt *evt);
    static int resolve_broker(void);
    static int backoff_wait(uint32_t attempt);
    
    /* === LTE Functions === */
    
    /*
     * Initialise the modem library, select the NB-IoT system mode, start an
     * asynchronous LTE connection and block until lte_event_handler() signals
     * registration.
     *
     * Returns 0 once registered, or the non-zero error of the first call that
     * failed (also logged).
     *
     * NOTE(review): the system mode is restricted to NB-IoT here, while the
     * surrounding discussion targets LTE-M for throughput - confirm that this
     * mode is really the intended one.
     */
    int cel_lte_connect(void)
    {
        int rc = nrf_modem_lib_init();
    
        if (rc != 0) {
            LOG_ERR("Modem lib init failed: %d", rc);
            return rc;
        }
    
        rc = lte_lc_system_mode_set(LTE_LC_SYSTEM_MODE_NBIOT, LTE_LC_SYSTEM_MODE_PREFER_AUTO);
        if (rc != 0) {
            LOG_ERR("Failed to set NB-IoT mode: %d", rc);
            return rc;
        }
    
        rc = lte_lc_connect_async(lte_event_handler);
        if (rc != 0) {
            LOG_ERR("LTE connect async failed: %d", rc);
            return rc;
        }
    
        LOG_INF("Waiting for LTE connection...");
        /* No timeout: waits until the registration event gives the semaphore */
        k_sem_take(&lte_ready, K_FOREVER);
        LOG_INF("LTE Connected");
    
        return 0;
    }
    
    /*
     * Power off the LTE link and shut down the modem library.
     * The return values of both calls are not checked.
     */
    void cel_lte_disconnect(void)
    {
        lte_lc_power_off();
        nrf_modem_lib_shutdown();
        LOG_INF("LTE disconnected and modem powered off");
    }
    
    /*
     * LTE link-control event handler: on home/roaming registration, log it and
     * give the lte_ready semaphore; on RRC updates, log the RRC mode. All other
     * events are ignored.
     */
    static void lte_event_handler(const struct lte_lc_evt *evt)
    {
        if (evt->type == LTE_LC_EVT_RRC_UPDATE) {
            LOG_INF("RRC: %s", evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED ? "Connected" : "Idle");
            return;
        }
    
        if (evt->type != LTE_LC_EVT_NW_REG_STATUS) {
            return;
        }
    
        const bool home = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME);
        const bool roaming = (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_ROAMING);
    
        if (home || roaming) {
            LOG_INF("LTE Registered: %s",
                    home ? "Home" : "Roaming");
            k_sem_give(&lte_ready);
        }
    }
    
    /* === AWS MQTT Functions === */
    
    /*
     * Resolve CONFIG_AWS_ENDPOINT (IPv4, stream socket) into broker_addr and
     * log the resulting address and port.
     *
     * Returns 0 on success, -EINVAL when getaddrinfo() fails.
     */
    static int resolve_broker(void)
    {
        struct addrinfo hints = {
            .ai_family = AF_INET,
            .ai_socktype = SOCK_STREAM,
        };
        struct addrinfo *found;
        char port_text[6];
        char ip_text[INET_ADDRSTRLEN];
        int ret;
    
        snprintf(port_text, sizeof(port_text), "%d", AWS_BROKER_PORT);
    
        ret = getaddrinfo(CONFIG_AWS_ENDPOINT, port_text, &hints, &found);
        if (ret != 0) {
            LOG_ERR("DNS resolve failed: %d", ret);
            return -EINVAL;
        }
    
        /* Keep a copy of the first result, then release the list */
        memcpy(&broker_addr, found->ai_addr, sizeof(struct sockaddr_in));
        freeaddrinfo(found);
    
        inet_ntop(AF_INET, &broker_addr.sin_addr, ip_text, sizeof(ip_text));
        LOG_INF("Broker resolved: %s:%d", ip_text, ntohs(broker_addr.sin_port));
    
        return 0;
    }
    
    /*
     * (Re)initialise client_ctx: broker address, event handler, client id,
     * protocol version, keepalive, RX/TX buffers and the TLS transport settings
     * (peer verification required, credentials from sec_tls_tags, optional ALPN
     * when building for port 443 without websockets).
     */
    static void cel_aws_client_init(void)
    {
        struct mqtt_sec_config *sec = &client_ctx.transport.tls.config;
    
        mqtt_client_init(&client_ctx);
    
        /* Session parameters */
        client_ctx.protocol_version = MQTT_VERSION_3_1_1;
        client_ctx.keepalive = CONFIG_MQTT_KEEPALIVE;
        client_ctx.broker = &broker_addr;
        client_ctx.evt_cb = mqtt_event_handler;
    
        /* Identity: client id only, no user name / password */
        client_ctx.client_id.utf8 = (uint8_t *)CONFIG_AWS_THING_NAME;
        client_ctx.client_id.size = strlen(CONFIG_AWS_THING_NAME);
        client_ctx.user_name = NULL;
        client_ctx.password = NULL;
    
        /* Packet buffers */
        client_ctx.tx_buf = tx_buffer;
        client_ctx.tx_buf_size = sizeof(tx_buffer);
        client_ctx.rx_buf = rx_buffer;
        client_ctx.rx_buf_size = sizeof(rx_buffer);
    
        /* TLS Config */
        client_ctx.transport.type = MQTT_TRANSPORT_SECURE;
        sec->hostname = CONFIG_AWS_ENDPOINT;
        sec->peer_verify = TLS_PEER_VERIFY_REQUIRED;
        sec->sec_tag_list = sec_tls_tags;
        sec->sec_tag_count = ARRAY_SIZE(sec_tls_tags);
    #if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
        sec->alpn_protocol_name_list = alpn_list;
        sec->alpn_protocol_name_count = ARRAY_SIZE(alpn_list);
    #endif
    }
    
    /*
     * Resolve the broker, initialise the client and try to connect, sleeping
     * for the delay returned by backoff_wait() after each failed attempt
     * (attempt indices 0..MAX_RETRIES).
     *
     * Returns 0 on success (mqtt_sock then holds the connection's socket), the
     * resolve_broker() error if resolution failed, or the error of the last
     * mqtt_connect() attempt.
     */
    int cel_aws_connect(void)
    {
        int ret = resolve_broker();
    
        if (ret != 0) {
            return ret;
        }
    
        cel_aws_client_init();
    
        int attempt = 0;
    
        while (attempt <= MAX_RETRIES) {
            ret = mqtt_connect(&client_ctx);
            if (ret == 0) {
                mqtt_sock = client_ctx.transport.tls.sock;
                LOG_INF("AWS MQTT Connected");
                return 0;
            }
    
            int delay = backoff_wait(attempt);
    
            LOG_ERR("MQTT connect failed: %d, retry in %d ms", ret, delay);
            k_msleep(delay);
            attempt++;
        }
    
        return ret;
    }
    
    /**
     * Close the MQTT session if one is open.
     *
     * mqtt_disconnect() sends DISCONNECT and tears down the transport itself,
     * so the socket must not be closed a second time here: the descriptor may
     * already have been reused. If the graceful disconnect fails (for example
     * because the link already dropped), mqtt_abort() is used to release the
     * client's transport instead.
     */
    void cel_aws_disconnect(void)
    {
        if (mqtt_sock < 0) {
            return;
        }

        int ret = mqtt_disconnect(&client_ctx);

        if (ret != 0) {
            LOG_ERR("MQTT disconnect failed: %d, aborting connection", ret);
            (void)mqtt_abort(&client_ctx);
        }

        mqtt_sock = -1;
        LOG_INF("AWS MQTT Disconnected");
    }
    
    /**
     * Send an MQTT SUBSCRIBE request for a single topic.
     *
     * @param topic NUL-terminated topic filter; must not be NULL.
     * @param qos   QoS level requested for the subscription.
     *
     * @return 0 when mqtt_subscribe() accepted the request, -EINVAL when
     *         topic is NULL, otherwise the error code from mqtt_subscribe().
     */
    int cel_aws_subscribe(const char *topic, enum mqtt_qos qos)
    {
        if (topic == NULL) {
            LOG_ERR("Subscribe failed: %d", -EINVAL);
            return -EINVAL;
        }

        /* MQTT does not allow a packet identifier of 0, so remap it */
        uint16_t msg_id = (uint16_t)sys_rand32_get();

        if (msg_id == 0U) {
            msg_id = 1U;
        }

        struct mqtt_topic t = {
            .topic.utf8 = (uint8_t *)topic,
            .topic.size = strlen(topic),
            .qos = qos
        };
        struct mqtt_subscription_list list = {
            .list = &t,
            .list_count = 1,
            .message_id = msg_id
        };

        int ret = mqtt_subscribe(&client_ctx, &list);

        if (ret == 0) {
            LOG_INF("Subscribed to: %s", topic);
        } else {
            LOG_ERR("Subscribe failed: %d", ret);
        }
        return ret;
    }
    
    /**
     * Publish a payload to a topic on the connected broker.
     *
     * @param topic       NUL-terminated topic name; must not be NULL.
     * @param payload     Bytes to publish; may be NULL only if payload_len is 0.
     * @param payload_len Number of payload bytes.
     * @param qos         QoS level for the message.
     *
     * @return 0 when mqtt_publish() succeeded, -EINVAL for invalid arguments,
     *         otherwise the error code from mqtt_publish().
     */
    int cel_aws_publish(const char *topic, const uint8_t *payload, size_t payload_len, enum mqtt_qos qos)
    {
        if (topic == NULL || (payload == NULL && payload_len > 0)) {
            LOG_ERR("Publish failed: %d", -EINVAL);
            return -EINVAL;
        }

        /* MQTT does not allow a packet identifier of 0 for QoS > 0 messages */
        uint16_t msg_id = (uint16_t)sys_rand32_get();

        if (msg_id == 0U) {
            msg_id = 1U;
        }

        struct mqtt_publish_param param = {
            .message.topic.topic.utf8 = (uint8_t *)topic,
            .message.topic.topic.size = strlen(topic),
            .message.topic.qos = qos,
            .message.payload.data = (uint8_t *)payload,
            .message.payload.len = payload_len,
            .message_id = msg_id,
            .retain_flag = 0
        };

        int ret = mqtt_publish(&client_ctx, &param);

        if (ret == 0) {
            /* payload_len is a size_t, so it needs %zu rather than %u */
            LOG_INF("Published %zu bytes to %s", payload_len, topic);
        } else {
            LOG_ERR("Publish failed: %d", ret);
        }
        return ret;
    }
    
    /**
     * Service the MQTT connection once.
     *
     * Waits up to timeout_ms for the MQTT socket to become readable, feeds any
     * incoming data to mqtt_input() (which dispatches to the event handler),
     * and then calls mqtt_live() once for keep-alive handling.
     *
     * @param timeout_ms Maximum time to block in poll(), in milliseconds.
     *
     * @return 0 on success, -1 if poll() failed, the socket reported an
     *         error/hang-up, or mqtt_input()/mqtt_live() returned an error
     *         (-EAGAIN from mqtt_live() is not treated as an error).
     */
    int cel_aws_process(int timeout_ms)
    {
        struct pollfd fds = { .fd = mqtt_sock, .events = POLLIN };
        int ret = poll(&fds, 1, timeout_ms);

        if (ret < 0) {
            LOG_ERR("MQTT poll failed: %d", errno);
            return -1;
        }

        if (ret > 0) {
            /* Drain pending data first so a final packet is not lost on hang-up */
            if ((fds.revents & POLLIN) && mqtt_input(&client_ctx) != 0) {
                return -1;
            }

            if (fds.revents & (POLLHUP | POLLERR | POLLNVAL)) {
                LOG_ERR("MQTT socket error");
                return -1;
            }
        }

        /* Call mqtt_live() exactly once and judge that single result */
        ret = mqtt_live(&client_ctx);
        if (ret != 0 && ret != -EAGAIN) {
            return -1;
        }

        return 0;
    }
    
    
    /* === Event Handlers === */
    
    static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt *evt)
    {
        switch (evt->type) {
        case MQTT_EVT_PUBLISH: {
            const struct mqtt_publish_param *p = &evt->param.publish;
            size_t received = 0;
    
            LOG_INF("RX on %.*s [%u bytes]", p->message.topic.topic.size,
                    p->message.topic.topic.utf8, p->message.payload.len);
    
            while (received < p->message.payload.len) {
                size_t to_read = MIN(p->message.payload.len - received, sizeof(app_buffer));
                int len = mqtt_read_publish_payload_blocking(&client_ctx, app_buffer, to_read);
                if (len <= 0) break;
                received += len;
    
                // if (user_rx_cb) {
                //     user_rx_cb((const char *)p->message.topic.topic.utf8, app_buffer, len);
                // }
            }
    
            if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
                struct mqtt_puback_param ack = { .message_id = p->message_id };
                mqtt_publish_qos1_ack(&client_ctx, &ack);
            }
            break;
        }
        case MQTT_EVT_CONNACK:
            LOG_INF("MQTT CONNACK received");
            break;
        case MQTT_EVT_DISCONNECT:
            LOG_INF("MQTT Disconnected");
            break;
        default:
            break;
        }
    }
    
    /*
     * Compute how long to wait before the next connection attempt.
     *
     * With CONFIG_AWS_EXPONENTIAL_BACKOFF the delay is a random value in
     * [0, min(BACKOFF_BASE_MS << attempt, BACKOFF_MAX_MS)], where a draw of 0
     * is replaced by BACKOFF_BASE_MS. Otherwise the constant BACKOFF_CONST_MS
     * is returned.
     *
     * @param attempt Zero-based number of the attempt that just failed.
     * @return Delay in milliseconds.
     */
    static int backoff_wait(uint32_t attempt)
    {
    #ifdef CONFIG_AWS_EXPONENTIAL_BACKOFF
        const uint32_t base_ms = (uint32_t)BACKOFF_BASE_MS;
        const uint32_t cap_ms = (uint32_t)BACKOFF_MAX_MS;
        uint32_t max_backoff = cap_ms;

        /*
         * Shift only when the result is known to stay within the cap. This
         * avoids the overflow (and undefined behaviour) that an unchecked
         * "base << attempt" has for large attempt values.
         */
        if (attempt < 32U && base_ms <= (cap_ms >> attempt)) {
            max_backoff = base_ms << attempt;
        }

        uint32_t delay = sys_rand32_get() % (max_backoff + 1U);

        return (int)(delay > 0U ? delay : base_ms);
    #else
        ARG_UNUSED(attempt);
        return BACKOFF_CONST_MS;
    #endif
    }
    
    
    /**
     * Callback for incoming messages: logs the topic and up to APP_BUF_SIZE
     * bytes of the payload as text.
     *
     * @param topic NUL-terminated topic name.
     * @param data  Payload bytes (not required to be NUL-terminated).
     * @param len   Number of bytes in data; anything beyond APP_BUF_SIZE is
     *              not logged.
     */
    void on_mqtt_rx(const char *topic, const uint8_t *data, size_t len)
    {
        char buf[APP_BUF_SIZE + 1] = {0};
        size_t copy = MIN(len, APP_BUF_SIZE);

        /* memcpy() must not be given a NULL source, even for zero bytes */
        if (data != NULL && copy > 0) {
            memcpy(buf, data, copy);
        } else {
            copy = 0;
        }

        /* The "%.*s" precision argument must be an int, not a size_t */
        LOG_INF("Received on %s: %.*s", topic, (int)copy, buf);
    }
    
    /*
     * Demo entry point: bring up LTE, connect to AWS IoT, subscribe, publish
     * five numbered messages and shut everything down again.
     *
     * After each publish, cel_aws_process() is called with the remaining
     * keep-alive time as timeout, so the loop blocks in poll() until the
     * broker sends something or that time elapses, and then sleeps two more
     * seconds. If processing reports a connection failure the publish loop is
     * abandoned instead of continuing on a dead link.
     *
     * `payload` is not declared in the code shown here; it is assumed to be a
     * string defined elsewhere in the file.
     */
    void main(void)
    {
        LOG_INF("Starting Cellular AWS Application");

        /* 1. Connect LTE */
        if (cel_lte_connect() != 0) {
            LOG_ERR("LTE failed");
            return;
        }

        /* 2. Connect to AWS */
        if (cel_aws_connect() != 0) {
            LOG_ERR("AWS connect failed");
            cel_lte_disconnect();
            return;
        }

        /* 3. Subscribe (failure is already logged by cel_aws_subscribe) */
        (void)cel_aws_subscribe(CONFIG_AWS_SUBSCRIBE_TOPIC, MQTT_QOS_1_AT_LEAST_ONCE);

        /* 4. Publish 5 messages */
        for (int i = 0; i < 5; i++) {
            char msg[64];

            snprintf(msg, sizeof(msg), "%s [%d]", payload, i + 1);
            (void)cel_aws_publish(CONFIG_AWS_PUBLISH_TOPIC, (uint8_t *)msg, strlen(msg),
                                  MQTT_QOS_1_AT_LEAST_ONCE);

            /* Process MQTT events */
            int timeout = mqtt_keepalive_time_left(&client_ctx);

            if (cel_aws_process(timeout > 0 ? timeout : 1000) != 0) {
                LOG_ERR("MQTT processing failed, stopping publish loop");
                break;
            }

            k_sleep(K_SECONDS(2));
        }

        /* 5. Cleanup */
        cel_aws_disconnect();
        cel_lte_disconnect();

        LOG_INF("Application done");
    }


    CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
    CONFIG_NET_UDP=y
    CONFIG_NET_TCP=y
    CONFIG_NET_IPV4=y
    # CONFIG_NET_IPV6=y
    
    # DNS
    CONFIG_DNS_RESOLVER=y
    CONFIG_DNS_RESOLVER_ADDITIONAL_BUF_CTR=2
    CONFIG_DNS_RESOLVER_MAX_SERVERS=1
    CONFIG_DNS_SERVER_IP_ADDRESSES=y
    CONFIG_DNS_SERVER1="8.8.8.8"
    CONFIG_NET_SOCKETS_DNS_TIMEOUT=5000
    
    CONFIG_JSON_LIBRARY=y
    
    # AWS IoT MQTT
    CONFIG_AWS_IOT_LOG_LEVEL_DBG=y
    CONFIG_AWS_TEST_SUITE_DQP=n
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=y
    CONFIG_MQTT_KEEPALIVE=600
    CONFIG_MQTT_LIB_TLS_USE_ALPN=y
    
    # TLS (nRF Security only)
    CONFIG_NRF_SECURITY=y
    CONFIG_MBEDTLS_TLS_LIBRARY=y
    CONFIG_MBEDTLS_LEGACY_CRYPTO_C=y
    CONFIG_MBEDTLS_ENABLE_HEAP=y
    CONFIG_MBEDTLS_HEAP_SIZE=4096
    CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=8192
    CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
    CONFIG_MBEDTLS_SERVER_NAME_INDICATION=y
    CONFIG_MBEDTLS_AES_ROM_TABLES=y
    CONFIG_MBEDTLS_TLS_VERSION_1_2=y
    CONFIG_MBEDTLS_MEMORY_DEBUG=y
    CONFIG_MBEDTLS_HAVE_TIME_DATE=y
    CONFIG_MBEDTLS_SSL_ALPN=y
    CONFIG_MBEDTLS_SSL_CLI_C=y
    CONFIG_MBEDTLS_X509_CRT_PARSE_C=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_ECDSA_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDH_ECDSA_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_PSK_ENABLED=y
    CONFIG_MBEDTLS_SSL_SRV_C=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_ENABLED=y
    CONFIG_MBEDTLS_CIPHER=y
    CONFIG_MBEDTLS_MD=y
    CONFIG_MBEDTLS_PK_C=y
    CONFIG_MBEDTLS_PK_PARSE_C=y
    CONFIG_MBEDTLS_PK_WRITE_C=y
    CONFIG_MBEDTLS_RSA_C=y
    CONFIG_MBEDTLS_PKCS1_V15=y
    CONFIG_MBEDTLS_ECP_C=y
    CONFIG_MBEDTLS_ECDSA_C=y
    CONFIG_MBEDTLS_ECDH_C=y
    CONFIG_MBEDTLS_DHM_C=y
    CONFIG_MBEDTLS_GCM_C=y
    CONFIG_MBEDTLS_SHA256_C=y
    CONFIG_MBEDTLS_X509_USE_C=y
    CONFIG_MBEDTLS_SSL_CONTEXT_SERIALIZATION=y
    CONFIG_MBEDTLS_SSL_PROTO_TLS1_2=y
    
    # LTE/Modem
    CONFIG_NRF_MODEM_LIB=y
    CONFIG_LTE_LINK_CONTROL=y
    
    # Sockets offload for modem (if using LTE for AWS)
    CONFIG_NET_NATIVE=n
    CONFIG_NET_SOCKETS_OFFLOAD=y
    


    Best regards,
    Milan

Reply
  • Hi Simon,

    Thanks for the clarification.

    We are using the nRF9151 with SDK 2.9.0.
    Our goal is to publish a 10 KB payload every second using LTE-M (or NB-IoT if possible).
    However, with our current implementation, even a small payload takes around 4–10 seconds per publish.

    Could you please guide us on what we should check or modify?
    Our code and project configuration are included below.

    /**
     * cel_aws.c
     * Cellular AWS IoT Driver
     */
    
    #include <zephyr/logging/log.h>
    #include <zephyr/net/socket.h>
    #include <zephyr/net/dns_resolve.h>
    #include <zephyr/net/mqtt.h>
    #include <zephyr/net/tls_credentials.h>
    #include <zephyr/random/random.h>
    #include <modem/lte_lc.h>
    #include <modem/nrf_modem_lib.h>
    #include <string.h>
    #include <stdio.h>
    
    LOG_MODULE_REGISTER(cel_aws, LOG_LEVEL_DBG);
    
    /* Configurable constants */
    /* NTP host name; not referenced by the code shown in this file */
    #define SNTP_SERVER "pool.ntp.org"
    /* Broker TCP port, converted to a string for getaddrinfo() in resolve_broker() */
    #define AWS_BROKER_PORT CONFIG_AWS_MQTT_PORT
    
    /* Sizes of the packet buffers handed to the MQTT client */
    #define MQTT_RX_BUF_SIZE (512)
    #define MQTT_TX_BUF_SIZE (512)
    /* Chunk size used when draining an incoming PUBLISH payload */
    #define APP_BUF_SIZE 256
    
    /* cel_aws_connect() loops from 0 to MAX_RETRIES inclusive */
    #define MAX_RETRIES 10
    /* Delay parameters (milliseconds) consumed by backoff_wait() */
    #define BACKOFF_BASE_MS 1000
    #define BACKOFF_MAX_MS 60000
    #define BACKOFF_CONST_MS 5000
    
    /* TLS */
    /* Security tag list passed to the MQTT TLS configuration */
    static const sec_tag_t sec_tls_tags[] = {212};
    /* ALPN protocol list, only built when connecting on port 443 without websockets */
    #if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
    static const char *const alpn_list[] = {"x-amzn-mqtt-ca"};
    #endif
    
    /* Static variables */
    /* The single MQTT client instance; note it has external linkage (no `static`) */
    struct mqtt_client client_ctx;
    /* Buffers assigned to client_ctx in cel_aws_client_init() */
    static uint8_t rx_buffer[MQTT_RX_BUF_SIZE];
    static uint8_t tx_buffer[MQTT_TX_BUF_SIZE];
    /* Scratch buffer mqtt_event_handler() reads incoming payload chunks into */
    static uint8_t app_buffer[APP_BUF_SIZE];
    /* Broker address filled in by resolve_broker() */
    static struct sockaddr_in broker_addr;
    
    // static cel_aws_rx_cb_t user_rx_cb = NULL;
    /* Given by lte_event_handler() on network registration, taken in cel_lte_connect() */
    static K_SEM_DEFINE(lte_ready, 0, 1);
    /* Socket of the open MQTT connection, or -1 when not connected */
    static int mqtt_sock = -1;
    
    /* Forward declarations */
    static void lte_event_handler(const struct lte_lc_evt *evt);
    static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt *evt);
    static int resolve_broker(void);
    static int backoff_wait(uint32_t attempt);
    
    /* === LTE Functions === */
    
    /**
     * Initialise the modem library, select the NB-IoT system mode and start an
     * asynchronous LTE connect, then block until lte_event_handler() signals
     * network registration through the lte_ready semaphore.
     *
     * @return 0 once registered, or the first non-zero error code returned by
     *         the modem / link-control calls.
     */
    int cel_lte_connect(void)
    {
        int rc = nrf_modem_lib_init();

        if (rc != 0) {
            LOG_ERR("Modem lib init failed: %d", rc);
            return rc;
        }

        rc = lte_lc_system_mode_set(LTE_LC_SYSTEM_MODE_NBIOT, LTE_LC_SYSTEM_MODE_PREFER_AUTO);
        if (rc != 0) {
            LOG_ERR("Failed to set NB-IoT mode: %d", rc);
            return rc;
        }

        rc = lte_lc_connect_async(lte_event_handler);
        if (rc != 0) {
            LOG_ERR("LTE connect async failed: %d", rc);
            return rc;
        }

        /* No timeout: waits indefinitely for the registration event */
        LOG_INF("Waiting for LTE connection...");
        k_sem_take(&lte_ready, K_FOREVER);
        LOG_INF("LTE Connected");

        return 0;
    }
    
    /**
     * Power off the LTE link and shut down the modem library.
     * Return values of both calls are intentionally not inspected.
     */
    void cel_lte_disconnect(void)
    {
        (void)lte_lc_power_off();
        (void)nrf_modem_lib_shutdown();

        LOG_INF("LTE disconnected and modem powered off");
    }
    
    /*
     * LTE link-control event callback.
     *
     * On a registration status of "home" or "roaming" it logs the status and
     * gives the lte_ready semaphore that cel_lte_connect() is waiting on.
     * RRC mode changes are only logged; all other events are ignored.
     */
    static void lte_event_handler(const struct lte_lc_evt *evt)
    {
        const char *where = NULL;

        switch (evt->type) {
        case LTE_LC_EVT_NW_REG_STATUS:
            if (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME) {
                where = "Home";
            } else if (evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_ROAMING) {
                where = "Roaming";
            }

            if (where != NULL) {
                LOG_INF("LTE Registered: %s", where);
                k_sem_give(&lte_ready);
            }
            break;
        case LTE_LC_EVT_RRC_UPDATE:
            if (evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED) {
                LOG_INF("RRC: %s", "Connected");
            } else {
                LOG_INF("RRC: %s", "Idle");
            }
            break;
        default:
            break;
        }
    }
    
    /* === AWS MQTT Functions === */
    
    /*
     * Look up CONFIG_AWS_ENDPOINT (IPv4, TCP) and store the first result in
     * broker_addr, then log the resolved address and port.
     *
     * @return 0 on success, -EINVAL if getaddrinfo() fails.
     */
    static int resolve_broker(void)
    {
        struct addrinfo hints = {
            .ai_socktype = SOCK_STREAM,
            .ai_family = AF_INET,
        };
        struct addrinfo *result;
        char service[6]; /* up to five digits plus the terminator */
        char printable[INET_ADDRSTRLEN];
        int rc;

        snprintf(service, sizeof(service), "%d", AWS_BROKER_PORT);

        rc = getaddrinfo(CONFIG_AWS_ENDPOINT, service, &hints, &result);
        if (rc != 0) {
            LOG_ERR("DNS resolve failed: %d", rc);
            return -EINVAL;
        }

        /* Copy the address out before the result list is released */
        memcpy(&broker_addr, result->ai_addr, sizeof(struct sockaddr_in));
        freeaddrinfo(result);

        inet_ntop(AF_INET, &broker_addr.sin_addr, printable, sizeof(printable));
        LOG_INF("Broker resolved: %s:%d", printable, ntohs(broker_addr.sin_port));

        return 0;
    }
    
    static void cel_aws_client_init(void)
    {
        mqtt_client_init(&client_ctx);
    
        client_ctx.broker = &broker_addr;
        client_ctx.evt_cb = mqtt_event_handler;
        client_ctx.client_id.utf8 = (uint8_t *)CONFIG_AWS_THING_NAME;
        client_ctx.client_id.size = strlen(CONFIG_AWS_THING_NAME);
        client_ctx.password = NULL;
        client_ctx.user_name = NULL;
        client_ctx.protocol_version = MQTT_VERSION_3_1_1;
        client_ctx.rx_buf = rx_buffer;
        client_ctx.rx_buf_size = sizeof(rx_buffer);
        client_ctx.tx_buf = tx_buffer;
        client_ctx.tx_buf_size = sizeof(tx_buffer);
        client_ctx.keepalive = CONFIG_MQTT_KEEPALIVE;
    
        /* TLS Config */
        client_ctx.transport.type = MQTT_TRANSPORT_SECURE;
        struct mqtt_sec_config *tls = &client_ctx.transport.tls.config;
        tls->peer_verify = TLS_PEER_VERIFY_REQUIRED;
        tls->sec_tag_list = sec_tls_tags;
        tls->sec_tag_count = ARRAY_SIZE(sec_tls_tags);
        tls->hostname = CONFIG_AWS_ENDPOINT;
    #if (CONFIG_AWS_MQTT_PORT == 443 && !defined(CONFIG_MQTT_LIB_WEBSOCKET))
        tls->alpn_protocol_name_list = alpn_list;
        tls->alpn_protocol_name_count = ARRAY_SIZE(alpn_list);
    #endif
    }
    
    /**
     * Resolve the broker, configure the MQTT client and open the connection.
     *
     * mqtt_connect() is tried up to MAX_RETRIES + 1 times, waiting
     * backoff_wait() milliseconds between attempts. No delay is inserted after
     * the final failed attempt.
     *
     * NOTE(review): success here only means mqtt_connect() returned 0;
     * presumably the CONNACK still has to be received through
     * cel_aws_process() before publish/subscribe are accepted - confirm
     * against the MQTT library documentation.
     *
     * @return 0 on success, the resolve_broker() error, or the error code of
     *         the last mqtt_connect() attempt.
     */
    int cel_aws_connect(void)
    {
        int ret = resolve_broker();

        if (ret != 0) {
            return ret;
        }

        cel_aws_client_init();

        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            ret = mqtt_connect(&client_ctx);
            if (ret == 0) {
                /* Remember the TLS socket so cel_aws_process() can poll it */
                mqtt_sock = client_ctx.transport.tls.sock;
                LOG_INF("AWS MQTT Connected");
                return 0;
            }

            if (attempt == MAX_RETRIES) {
                /* Out of attempts: report immediately instead of sleeping first */
                LOG_ERR("MQTT connect failed: %d, giving up after %d attempts",
                        ret, attempt + 1);
                break;
            }

            int delay = backoff_wait(attempt);

            LOG_ERR("MQTT connect failed: %d, retry in %d ms", ret, delay);
            k_msleep(delay);
        }

        return ret;
    }
    
    /**
     * Close the MQTT session if one is open.
     *
     * mqtt_disconnect() sends DISCONNECT and tears down the transport itself,
     * so the socket must not be closed a second time here: the descriptor may
     * already have been reused. If the graceful disconnect fails (for example
     * because the link already dropped), mqtt_abort() is used to release the
     * client's transport instead.
     */
    void cel_aws_disconnect(void)
    {
        if (mqtt_sock < 0) {
            return;
        }

        int ret = mqtt_disconnect(&client_ctx);

        if (ret != 0) {
            LOG_ERR("MQTT disconnect failed: %d, aborting connection", ret);
            (void)mqtt_abort(&client_ctx);
        }

        mqtt_sock = -1;
        LOG_INF("AWS MQTT Disconnected");
    }
    
    /**
     * Send an MQTT SUBSCRIBE request for a single topic.
     *
     * @param topic NUL-terminated topic filter; must not be NULL.
     * @param qos   QoS level requested for the subscription.
     *
     * @return 0 when mqtt_subscribe() accepted the request, -EINVAL when
     *         topic is NULL, otherwise the error code from mqtt_subscribe().
     */
    int cel_aws_subscribe(const char *topic, enum mqtt_qos qos)
    {
        if (topic == NULL) {
            LOG_ERR("Subscribe failed: %d", -EINVAL);
            return -EINVAL;
        }

        /* MQTT does not allow a packet identifier of 0, so remap it */
        uint16_t msg_id = (uint16_t)sys_rand32_get();

        if (msg_id == 0U) {
            msg_id = 1U;
        }

        struct mqtt_topic t = {
            .topic.utf8 = (uint8_t *)topic,
            .topic.size = strlen(topic),
            .qos = qos
        };
        struct mqtt_subscription_list list = {
            .list = &t,
            .list_count = 1,
            .message_id = msg_id
        };

        int ret = mqtt_subscribe(&client_ctx, &list);

        if (ret == 0) {
            LOG_INF("Subscribed to: %s", topic);
        } else {
            LOG_ERR("Subscribe failed: %d", ret);
        }
        return ret;
    }
    
    /**
     * Publish a payload to a topic on the connected broker.
     *
     * @param topic       NUL-terminated topic name; must not be NULL.
     * @param payload     Bytes to publish; may be NULL only if payload_len is 0.
     * @param payload_len Number of payload bytes.
     * @param qos         QoS level for the message.
     *
     * @return 0 when mqtt_publish() succeeded, -EINVAL for invalid arguments,
     *         otherwise the error code from mqtt_publish().
     */
    int cel_aws_publish(const char *topic, const uint8_t *payload, size_t payload_len, enum mqtt_qos qos)
    {
        if (topic == NULL || (payload == NULL && payload_len > 0)) {
            LOG_ERR("Publish failed: %d", -EINVAL);
            return -EINVAL;
        }

        /* MQTT does not allow a packet identifier of 0 for QoS > 0 messages */
        uint16_t msg_id = (uint16_t)sys_rand32_get();

        if (msg_id == 0U) {
            msg_id = 1U;
        }

        struct mqtt_publish_param param = {
            .message.topic.topic.utf8 = (uint8_t *)topic,
            .message.topic.topic.size = strlen(topic),
            .message.topic.qos = qos,
            .message.payload.data = (uint8_t *)payload,
            .message.payload.len = payload_len,
            .message_id = msg_id,
            .retain_flag = 0
        };

        int ret = mqtt_publish(&client_ctx, &param);

        if (ret == 0) {
            /* payload_len is a size_t, so it needs %zu rather than %u */
            LOG_INF("Published %zu bytes to %s", payload_len, topic);
        } else {
            LOG_ERR("Publish failed: %d", ret);
        }
        return ret;
    }
    
    /**
     * Service the MQTT connection once.
     *
     * Waits up to timeout_ms for the MQTT socket to become readable, feeds any
     * incoming data to mqtt_input() (which dispatches to the event handler),
     * and then calls mqtt_live() once for keep-alive handling.
     *
     * @param timeout_ms Maximum time to block in poll(), in milliseconds.
     *
     * @return 0 on success, -1 if poll() failed, the socket reported an
     *         error/hang-up, or mqtt_input()/mqtt_live() returned an error
     *         (-EAGAIN from mqtt_live() is not treated as an error).
     */
    int cel_aws_process(int timeout_ms)
    {
        struct pollfd fds = { .fd = mqtt_sock, .events = POLLIN };
        int ret = poll(&fds, 1, timeout_ms);

        if (ret < 0) {
            LOG_ERR("MQTT poll failed: %d", errno);
            return -1;
        }

        if (ret > 0) {
            /* Drain pending data first so a final packet is not lost on hang-up */
            if ((fds.revents & POLLIN) && mqtt_input(&client_ctx) != 0) {
                return -1;
            }

            if (fds.revents & (POLLHUP | POLLERR | POLLNVAL)) {
                LOG_ERR("MQTT socket error");
                return -1;
            }
        }

        /* Call mqtt_live() exactly once and judge that single result */
        ret = mqtt_live(&client_ctx);
        if (ret != 0 && ret != -EAGAIN) {
            return -1;
        }

        return 0;
    }
    
    
    /* === Event Handlers === */
    
    static void mqtt_event_handler(struct mqtt_client *client, const struct mqtt_evt *evt)
    {
        switch (evt->type) {
        case MQTT_EVT_PUBLISH: {
            const struct mqtt_publish_param *p = &evt->param.publish;
            size_t received = 0;
    
            LOG_INF("RX on %.*s [%u bytes]", p->message.topic.topic.size,
                    p->message.topic.topic.utf8, p->message.payload.len);
    
            while (received < p->message.payload.len) {
                size_t to_read = MIN(p->message.payload.len - received, sizeof(app_buffer));
                int len = mqtt_read_publish_payload_blocking(&client_ctx, app_buffer, to_read);
                if (len <= 0) break;
                received += len;
    
                // if (user_rx_cb) {
                //     user_rx_cb((const char *)p->message.topic.topic.utf8, app_buffer, len);
                // }
            }
    
            if (p->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
                struct mqtt_puback_param ack = { .message_id = p->message_id };
                mqtt_publish_qos1_ack(&client_ctx, &ack);
            }
            break;
        }
        case MQTT_EVT_CONNACK:
            LOG_INF("MQTT CONNACK received");
            break;
        case MQTT_EVT_DISCONNECT:
            LOG_INF("MQTT Disconnected");
            break;
        default:
            break;
        }
    }
    
    /*
     * Compute how long to wait before the next connection attempt.
     *
     * With CONFIG_AWS_EXPONENTIAL_BACKOFF the delay is a random value in
     * [0, min(BACKOFF_BASE_MS << attempt, BACKOFF_MAX_MS)], where a draw of 0
     * is replaced by BACKOFF_BASE_MS. Otherwise the constant BACKOFF_CONST_MS
     * is returned.
     *
     * @param attempt Zero-based number of the attempt that just failed.
     * @return Delay in milliseconds.
     */
    static int backoff_wait(uint32_t attempt)
    {
    #ifdef CONFIG_AWS_EXPONENTIAL_BACKOFF
        const uint32_t base_ms = (uint32_t)BACKOFF_BASE_MS;
        const uint32_t cap_ms = (uint32_t)BACKOFF_MAX_MS;
        uint32_t max_backoff = cap_ms;

        /*
         * Shift only when the result is known to stay within the cap. This
         * avoids the overflow (and undefined behaviour) that an unchecked
         * "base << attempt" has for large attempt values.
         */
        if (attempt < 32U && base_ms <= (cap_ms >> attempt)) {
            max_backoff = base_ms << attempt;
        }

        uint32_t delay = sys_rand32_get() % (max_backoff + 1U);

        return (int)(delay > 0U ? delay : base_ms);
    #else
        ARG_UNUSED(attempt);
        return BACKOFF_CONST_MS;
    #endif
    }
    
    
    /**
     * Callback for incoming messages: logs the topic and up to APP_BUF_SIZE
     * bytes of the payload as text.
     *
     * @param topic NUL-terminated topic name.
     * @param data  Payload bytes (not required to be NUL-terminated).
     * @param len   Number of bytes in data; anything beyond APP_BUF_SIZE is
     *              not logged.
     */
    void on_mqtt_rx(const char *topic, const uint8_t *data, size_t len)
    {
        char buf[APP_BUF_SIZE + 1] = {0};
        size_t copy = MIN(len, APP_BUF_SIZE);

        /* memcpy() must not be given a NULL source, even for zero bytes */
        if (data != NULL && copy > 0) {
            memcpy(buf, data, copy);
        } else {
            copy = 0;
        }

        /* The "%.*s" precision argument must be an int, not a size_t */
        LOG_INF("Received on %s: %.*s", topic, (int)copy, buf);
    }
    
    /*
     * Demo entry point: bring up LTE, connect to AWS IoT, subscribe, publish
     * five numbered messages and shut everything down again.
     *
     * After each publish, cel_aws_process() is called with the remaining
     * keep-alive time as timeout, so the loop blocks in poll() until the
     * broker sends something or that time elapses, and then sleeps two more
     * seconds. If processing reports a connection failure the publish loop is
     * abandoned instead of continuing on a dead link.
     *
     * `payload` is not declared in the code shown here; it is assumed to be a
     * string defined elsewhere in the file.
     */
    void main(void)
    {
        LOG_INF("Starting Cellular AWS Application");

        /* 1. Connect LTE */
        if (cel_lte_connect() != 0) {
            LOG_ERR("LTE failed");
            return;
        }

        /* 2. Connect to AWS */
        if (cel_aws_connect() != 0) {
            LOG_ERR("AWS connect failed");
            cel_lte_disconnect();
            return;
        }

        /* 3. Subscribe (failure is already logged by cel_aws_subscribe) */
        (void)cel_aws_subscribe(CONFIG_AWS_SUBSCRIBE_TOPIC, MQTT_QOS_1_AT_LEAST_ONCE);

        /* 4. Publish 5 messages */
        for (int i = 0; i < 5; i++) {
            char msg[64];

            snprintf(msg, sizeof(msg), "%s [%d]", payload, i + 1);
            (void)cel_aws_publish(CONFIG_AWS_PUBLISH_TOPIC, (uint8_t *)msg, strlen(msg),
                                  MQTT_QOS_1_AT_LEAST_ONCE);

            /* Process MQTT events */
            int timeout = mqtt_keepalive_time_left(&client_ctx);

            if (cel_aws_process(timeout > 0 ? timeout : 1000) != 0) {
                LOG_ERR("MQTT processing failed, stopping publish loop");
                break;
            }

            k_sleep(K_SECONDS(2));
        }

        /* 5. Cleanup */
        cel_aws_disconnect();
        cel_lte_disconnect();

        LOG_INF("Application done");
    }


    CONFIG_NET_SOCKETS_SOCKOPT_TLS=y
    CONFIG_NET_UDP=y
    CONFIG_NET_TCP=y
    CONFIG_NET_IPV4=y
    # CONFIG_NET_IPV6=y
    
    # DNS
    CONFIG_DNS_RESOLVER=y
    CONFIG_DNS_RESOLVER_ADDITIONAL_BUF_CTR=2
    CONFIG_DNS_RESOLVER_MAX_SERVERS=1
    CONFIG_DNS_SERVER_IP_ADDRESSES=y
    CONFIG_DNS_SERVER1="8.8.8.8"
    CONFIG_NET_SOCKETS_DNS_TIMEOUT=5000
    
    CONFIG_JSON_LIBRARY=y
    
    # AWS IoT MQTT
    CONFIG_AWS_IOT_LOG_LEVEL_DBG=y
    CONFIG_AWS_TEST_SUITE_DQP=n
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=y
    CONFIG_MQTT_KEEPALIVE=600
    CONFIG_MQTT_LIB_TLS_USE_ALPN=y
    
    # TLS (nRF Security only)
    CONFIG_NRF_SECURITY=y
    CONFIG_MBEDTLS_TLS_LIBRARY=y
    CONFIG_MBEDTLS_LEGACY_CRYPTO_C=y
    CONFIG_MBEDTLS_ENABLE_HEAP=y
    CONFIG_MBEDTLS_HEAP_SIZE=4096
    CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=8192
    CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
    CONFIG_MBEDTLS_SERVER_NAME_INDICATION=y
    CONFIG_MBEDTLS_AES_ROM_TABLES=y
    CONFIG_MBEDTLS_TLS_VERSION_1_2=y
    CONFIG_MBEDTLS_MEMORY_DEBUG=y
    CONFIG_MBEDTLS_HAVE_TIME_DATE=y
    CONFIG_MBEDTLS_SSL_ALPN=y
    CONFIG_MBEDTLS_SSL_CLI_C=y
    CONFIG_MBEDTLS_X509_CRT_PARSE_C=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_ECDSA_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDH_ECDSA_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_PSK_ENABLED=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_ECDHE_PSK_ENABLED=y
    CONFIG_MBEDTLS_SSL_SRV_C=y
    CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_ENABLED=y
    CONFIG_MBEDTLS_CIPHER=y
    CONFIG_MBEDTLS_MD=y
    CONFIG_MBEDTLS_PK_C=y
    CONFIG_MBEDTLS_PK_PARSE_C=y
    CONFIG_MBEDTLS_PK_WRITE_C=y
    CONFIG_MBEDTLS_RSA_C=y
    CONFIG_MBEDTLS_PKCS1_V15=y
    CONFIG_MBEDTLS_ECP_C=y
    CONFIG_MBEDTLS_ECDSA_C=y
    CONFIG_MBEDTLS_ECDH_C=y
    CONFIG_MBEDTLS_DHM_C=y
    CONFIG_MBEDTLS_GCM_C=y
    CONFIG_MBEDTLS_SHA256_C=y
    CONFIG_MBEDTLS_X509_USE_C=y
    CONFIG_MBEDTLS_SSL_CONTEXT_SERIALIZATION=y
    CONFIG_MBEDTLS_SSL_PROTO_TLS1_2=y
    
    # LTE/Modem
    CONFIG_NRF_MODEM_LIB=y
    CONFIG_LTE_LINK_CONTROL=y
    
    # Sockets offload for modem (if using LTE for AWS)
    CONFIG_NET_NATIVE=n
    CONFIG_NET_SOCKETS_OFFLOAD=y
    


    Best regards,
    Milan

Children
No Data
Related