LOG freeze at mqtt

Using a NRF9160DK. SDK2.0.0. Modem 1.3.2.

I have a problem when setting up a new NRF board to run with MQTT with AWS. 

When the setup is running, I get LOG <inf> for modem connect and the broker IP and then when mqtt_connect is run there are no more LOG events. (And the callback function is not activated). I have tried setting in some printk, which prints fine after the mqtt_connect. The printk sees at is get CONACK and SUBACK and that I publish to the MQTT. AWS logs also show that the publish is received and a response is created and published. 

The code is borrowed from mqtt_simple. the mqtt_connect function returns errorcode 0.

The strange thing is I have 1 board running as should.

Parents Reply Children
  • I have written to you for how you want the code.

  • Excuse me? I'm not sure what you mean here.

    It would be easier for me to help you figure out what the issue is if you could please share your code.

    -Einar

  • Do you want the code pasted here or maybe a git link or some other way?

  • Pasted here is fine, I'm most interested in seeing your c files and your prj.conf file

  • It is at aws.c by the 536 the log stops. when mqtt_connect is called.

    The prj.conf file

    # Logging interface
    CONFIG_LOG=y
    
    # Stacks and heaps
    CONFIG_HEAP_MEM_POOL_SIZE=16384 
    CONFIG_MAIN_STACK_SIZE=16384
    CONFIG_SYSTEM_WORKQUEUE_STACK_SIZE=4096
    
    # UART interface
    CONFIG_SERIAL=y
    CONFIG_UART_INTERRUPT_DRIVEN=y
    
    # cJSON interface
    CONFIG_CJSON_LIB=y
    
    # AT host interface
    CONFIG_AT_HOST_LIBRARY=y
    
    # Date time interface
    CONFIG_DATE_TIME=y
    
    # Use GPIO
    CONFIG_GPIO=y
    CONFIG_GPIO_NRFX=y
    CONFIG_NRFX_GPIOTE=y
    
    # MQTT interface
    CONFIG_MQTT_LIB=y
    CONFIG_MQTT_LIB_TLS=y
    CONFIG_MQTT_CLEAN_SESSION=y
    
    # Network interface
    CONFIG_NETWORKING=y
    CONFIG_NET_SOCKETS=y
    CONFIG_NET_NATIVE=n
    
    # Library for the LTE network
    CONFIG_LTE_LINK_CONTROL=y
    CONFIG_LTE_AUTO_INIT_AND_CONNECT=n
    CONFIG_LTE_NETWORK_MODE_LTE_M=y
    CONFIG_MODEM_KEY_MGMT=y
    CONFIG_NRF_MODEM_LIB=y
    
    #Enable reboot from software
    CONFIG_REBOOT=y
    
    # newlibc
    CONFIG_NEWLIB_LIBC=y
    CONFIG_NEWLIB_LIBC_FLOAT_PRINTF=y
    
    # Use PWM
    CONFIG_PWM=y
    CONFIG_PWM_NRFX=y
    
    # use ADC
    CONFIG_ADC=y
    CONFIG_ADC_NRFX_SAADC=y
    
    # USE I2C
    CONFIG_I2C=y
    CONFIG_I2C_NRFX=y
    
    # WatchDog interface
    CONFIG_WATCHDOG=y
    CONFIG_WDT_DISABLE_AT_BOOT=n
    
    # Pin control
    CONFIG_PINCTRL=y
    

    The main file:

    #include <zephyr.h>
    #include <sys/printk.h>
    #include <kernel.h>
    #include <stdio.h>
    
    #include "modem.h"
    #include "scheduler.h"
    #include "https.h"
    #include "cloud.h"
    #include "aws.h"
    #include "watch.h"
    
    /**
     * @brief Setting up the initializing functions.
     */
     void configure_system_on_start_up(void) {
         // Set the modem client id.
        modem_set_client_id();
        
        // Initialize modem module.
        modem_init();
    
        // Initialize the cloud.
        cloud_init(modem_get_client_id());
    
        // Initialize the watch.
        watch_init();
    
        // Set time offset.
        watch_set_offset(1);
    
        // Initialize the scheduler.
        scheduler_init();
    
        // Connect the modem to LTE network.
        modem_connect();
    
        // Connect to AWS.
        aws_connect();
    
        // Synchronize the watch.
        watch_update();
        k_sleep(K_MSEC(500));
    
     }
    
    /**
     * @brief The application main entry.
     */
    void main(void) {
    
        // Initialize system.
        configure_system_on_start_up();
    
        // Enter main loop.
        while(1) {
    
            // Run client.
            /*CLI_RUN();
    
            // Update the outlet current.
            va_update_outlet_current();
    
            // Run through the outlets.
            for(uint8_t i = 0; i <= number_of_outlets; i++) {
    
                // Update state machine.
                state_update(i);
            }
    
            // Feed the dog.
            wdt_feed_dog();*/
        }
    }

    Modem.h

    #ifndef MODEM_H_
    #define MODEM_H_
    
    /**
     * @brief   Initialize the modem.
     */
    void modem_init(void);
    
    /**
     * @brief   Connect the modem to the LTE network.
     */
    void modem_connect(void);
    
    /**
     * @brief   Disconnect the modem from the LTE network.
     */
    void modem_disconnect(void);
    
    /**
     * @brief   Set the client id.
     */
    void modem_set_client_id(void);
    
    /**
     * @brief   Get the client ID.
     *
     * @return  The client ID.
     */
    const uint8_t* modem_get_client_id(void);
    
    #endif /* MODEM_H_ */

    The modem.c file:

    #include <stdio.h>
    #include <string.h>
    
    #include <zephyr.h>
    #include <logging/log.h>
    #include <modem/lte_lc.h>
    #include <modem/modem_key_mgmt.h>
    #include <nrf_modem_at.h>
    
    
    //! Register the modem module to the logging system.
    LOG_MODULE_REGISTER(Modem, 3);
    
    //! The modem connection status flag.
    static bool is_connected;
    
    //! An array holding the client id.
    static uint8_t client_id[20];
    
    void modem_init(void) {
    
        int error;
    
        // Turn off the psm mode.
        error = lte_lc_psm_req(false);
    
        if(error) {
    
            LOG_ERR("Failed to turn off PSM mode, error: %d.", error);
    
            return;
        }
    
        // Turn off the edrx mode.
        error = lte_lc_edrx_req(false);
    
        if(error) {
    
            LOG_ERR("Failed to turn off EDRX mode, error: %d.", error);
    
            return;
        }
    
        // Initialize the LTE network.
        error = lte_lc_init();
    
        if(error) {
    
            LOG_ERR("Failed to initialize the modem, error: %d.", error);
    
            return;
        }
    }
    
    void modem_set_client_id(void) {
    
        // Fetch the IMEI number from the modem.
        char imei_buffer[26];
        modem_write_at_command("AT+CGSN", imei_buffer, sizeof(imei_buffer));
    
        // Add null terminator to the IMEI number.
        imei_buffer[15] = '\0';
    
        // Create the client id.
        snprintf(client_id, sizeof(client_id), "acdc-%.*s", 15, imei_buffer);
    
        LOG_INF("Client ID: %s.", log_strdup(client_id));
    }
    
    const uint8_t* modem_get_client_id(void) {
    
        return client_id;
    }

    aws.h

    #ifndef AWS_H_
    #define AWS_H_
    
    #include <stdint.h>
    #include <stdbool.h>
    
    #include "software_settings.h"
    
    /**
     * @brief   Subscribe data used when packets are published on AWS.
     */
    typedef struct {
    
        bool is_subscribed;             //!< Flag indicating if the topic is subscribed.
        char* topic;                    //!< The subscribe/publish topic.
        char* failure_message;          //!< The payload message when failed on AWS.
        char* success_message;          //!< The payload message when succeeded on AWS.
        char* qos;                      //!< The Quality of Service.
    
    } aws_mqtt_status_t;
    
    //! Event identifiers used by the @ref aws_event_callback_t.
    typedef enum  {
    
        AWS_EVENT_NONE,                 //!< This ID is never used. Dummy value for completeness.
        AWS_EVENT_RESPONSE_READY,       //!< A complete AWS response packet is ready.
    
    } aws_event_t;
    
    /**
     * @brief   AWS publish information.
     */
    typedef struct {
    
        char* topic;                    //!< The topic that is published to.
        uint8_t* data;                  //!< The data to publish.
    
    } aws_publish_info_t;
    
    //! Cloud event callback function type.
    typedef void (*aws_event_callback_t)(const aws_event_t, const uint8_t* const);
    
    /**
     * @brief   Initialize the AWS and MQTT broker.
     *
     * @param   _client_id  The client id.
     * @param   _callback   Function to be called when a aws event is detected.
     */
    void aws_init(const uint8_t* const _client_id, const aws_event_callback_t _callback);
    
    /**
     * @brief   Connect to the AWS MQTT broker.
     */
    void aws_connect(void);
    
    /**
     * @brief   Disconnect from the AWS MQTT broker.
     */
    void aws_disconnect(void);
    
    /**
     * @brief   Verify if the AWS MQTT broker connection is requested.
     *
     * @retval  false   MQTT broker connection is not requested.
     * @retval  true    MQTT broker connection is requested.
     */
    bool aws_is_connection_requested(void);
    
    /**
     * @brief   Run for the AWS MQTT broker events and inputs.
     *          This function shall only be called inside a thread.
     */
    void aws_run(void);
    
    /**
     * @brief   Subscribe to the configured topic.
     *
     * @param   _subscribe_topic    The topic that shall be subscribed to.
     *
     * @retval  0           Success.
     * @retval  Negative    Error.
     */
    int16_t aws_subscribe(const char* const _subscribe_topic);
    
    /**
     * @brief   Publish data on the configured topic.
     *
     * @note    The functionality is handled through a work queue.
     *
     * @param   _topic      The topic that the data shall be published to.
     * @param   _data       The data that shall be published.
     */
    void aws_publish(const char* const _topic, const uint8_t* const _data);
    
    /**
     * @brief   Get the MQTT status.
     *
     * @param   _aws_mqtt_status    The MQTT status.
     */
    void aws_get_status(aws_mqtt_status_t* _aws_mqtt_status);
    
    #endif /* AWS_H_ */
    
    

    aws.c file: (notice defines for broker address and port)

    #include <stdio.h>
    
    #include <zephyr.h>
    #include <logging/log.h>
    #include <net/mqtt.h>
    #include <net/socket.h>
    #include <random/rand32.h>
    
    #include "aws.h"
    #include "watch.h"
    
    //! Register the AWS module to the logging system.
    LOG_MODULE_REGISTER(AWS, 3);
    
    //! The MQTT broker address details structure.
    static struct sockaddr_storage mqtt_broker;
    
    //! The MQTT polling file descriptor structure.
    static struct pollfd mqtt_pollfd;
    
    //! The MQTT client structure.
    static struct mqtt_client client;
    
    //! The MQTT publish mutex structure.
    static struct k_mutex aws_publish_mutex;
    
    //! The MQTT publish worker structure.
    static struct k_work aws_publish_work;
    
    //! The AWS callback function.
    static aws_event_callback_t aws_callback = NULL;
    
    //! The MQTT status when data is published on AWS.
    static aws_mqtt_status_t aws_mqtt_status;
    
    //! The MQTT client RX buffer.
    static uint8_t mqtt_rx_buffer[8192];
    
    //! The MQTT client TX buffer.
    static uint8_t mqtt_tx_buffer[8192];
    
    //! The MQTT payload buffer.
    static uint8_t mqtt_payload_buffer[8192];
    
    //! The MQTT broker connection requested flag.
    static atomic_t is_connect_requested;
    
    //! The AWS publish information.
    static aws_publish_info_t aws_publish_info;
    
    #define BROKER_HOSTNAME "xxxxxx"
    #define MQTT_BROKER_PORT 8883
    
    /**
     * @brief   Publish data to the AWS (Work).
     *
     * @param   work     The k_work structure.
     */
    static void aws_publish_work_handler(struct k_work* work) {
    
        ARG_UNUSED(work);
    
        // Lock the AWS publish mutex and has it succeeded?
        if(0 != k_mutex_lock(&aws_publish_mutex, K_MSEC(1000U))) {
    
            LOG_ERR("AWS publish mutex cannot be locked.");
    
            return;
        }
    
        // The parameters for the published message.
        struct mqtt_publish_param mqtt_param;
        memset(&mqtt_param, 0, sizeof(mqtt_param));
    
        // Fill out the parameters.
        mqtt_param.message.topic.qos = MQTT_QOS_1_AT_LEAST_ONCE;
        mqtt_param.message.topic.topic.utf8 = aws_publish_info.topic;
        mqtt_param.message.topic.topic.size = strlen(aws_publish_info.topic);
        mqtt_param.message.payload.data = aws_publish_info.data;
        mqtt_param.message.payload.len = strlen(aws_publish_info.data);
        mqtt_param.message_id = sys_rand32_get();
        mqtt_param.dup_flag = 0;
        mqtt_param.retain_flag = 0;
    
        LOG_INF("MQTT published to topic: %s, length: %u.", log_strdup(aws_publish_info.topic), strlen(aws_publish_info.data));
    
        // Publish the message to the topic.
        int16_t error = mqtt_publish(&client, &mqtt_param);
    
        if(0 > error) {
    
            LOG_ERR("Publish data failed: %d.", error);
        }
    
        // Unlock the AWS publish mutex.
        (void)k_mutex_unlock(&aws_publish_mutex);
    }
    
    /**
     * @brief   Get the published MQTT payload.
     *
     * @param   _c          The MQTT client.
     * @param   _length     The length of the payload.
     *
     * @retval  0           Success.
     * @retval  Negative    Error.
     */
    static int16_t aws_mqtt_publish_get_payload(struct mqtt_client* const _c, const size_t _length) {
    
        // Is the published payload too long?
        if(sizeof(mqtt_payload_buffer) < _length) {
    
            return -EMSGSIZE;
        }
    
        // Read all the published payload.
        return mqtt_readall_publish_payload(_c, mqtt_payload_buffer, _length);
    }
    
    /**
     * @brief   Handle the MQTT client events (Callback).
     *
     * @param   c       The MQTT client.
     * @param   evt     The MQTT event data.
     */
    static void aws_mqtt_event_handler(struct mqtt_client* const c, const struct mqtt_evt* evt) {
    
        int16_t error;
    
        // Reset the AWS event.
        uint8_t event = AWS_EVENT_NONE;
    
        // Go through the event type.
        switch(evt->type) {
    
            // Acknowledgement of connection request.
            case MQTT_EVT_CONNACK:
    
                // Has an error occurred?
                if(0 != evt->result) {
    
                    LOG_ERR("MQTT connection failed: %d.", evt->result);
    
                    break;
                }
    
                LOG_INF("MQTT client is connected.");
    
                // Subscribe the status topic to AWS.
                error = aws_subscribe(aws_mqtt_status.topic);
    
                // Has an error occurred?
                if(0 > error) {
    
                    LOG_ERR("Subscribe Status failed: %d.", error);
                }
    
                // Or has it succeeded?
                else {
    
                    LOG_INF("Subscribe Status succeeded.");
                }
    
                break;
    
            // Disconnection event.
            case MQTT_EVT_DISCONNECT:
    
                LOG_INF("MQTT client is disconnected.");
    
                atomic_set(&is_connect_requested, 0);
    
                break;
    
            // Publish event received when message is published on a topic client is subscribed to.
            case MQTT_EVT_PUBLISH: {
    
                // Read the parameters for the publish message.
                const struct mqtt_publish_param* p = &evt->param.publish;
    
                LOG_INF("MQTT PUBLISH result = %d, length = %d.", evt->result, p->message.payload.len);
    
                // Is QoS set to MQTT_QOS_1_AT_LEAST_ONCE?
                if(MQTT_QOS_1_AT_LEAST_ONCE == p->message.topic.qos) {
    
                    // Prepare the QoS1 ack message.
                    const struct mqtt_puback_param ack = {
    
                        .message_id = p->message_id
                    };
    
                    // Send the ack message.
                    error = mqtt_publish_qos1_ack(&client, &ack);
    
                    // Has an error occurred?
                    if(0 != error) {
    
                        LOG_ERR("MQTT publish QoS1 ack failed: %d.", error);
                    }
                }
    
                // Is QoS set to MQTT_QOS_2_EXACTLY_ONCE?
                else if(MQTT_QOS_2_EXACTLY_ONCE == p->message.topic.qos) {
    
                    // Prepare the QoS2 receive message.
                    const struct mqtt_pubrec_param receive = {
    
                        .message_id = p->message_id
                    };
    
                    // Send the receive message.
                    error = mqtt_publish_qos2_receive(&client, &receive);
    
                    // Has an error occurred?
                    if(0 != error) {
    
                        LOG_ERR("MQTT publish QoS2 receive failed: %d.", error);
                    }
                }
    
                // Read the published MQTT payload.
                error = aws_mqtt_publish_get_payload(c, p->message.payload.len);
    
                // Has an error occurred?
                if(0 != error) {
    
                    LOG_ERR("MQTT PUBLISH payload failed: %d.", error);
    
                    break;
                }
    
                // Has the payload been read successfully?
                LOG_INF("MQTT PUBLISH topic: %s.", log_strdup(p->message.topic.topic.utf8));
    
                // Has an AWS status topic occurred?
                if(0 == strcmp(p->message.topic.topic.utf8, aws_mqtt_status.topic)) {
    
                    // Set the AWS response ready event.
                    event = AWS_EVENT_RESPONSE_READY;
                }
    
                // Or is the response unknown?
                else {
    
                    LOG_INF("MQTT PUBLISH response is unknown.");
                }
    
                break;
            }
    
            // Acknowledgement for published message with QoS 1.
            case MQTT_EVT_PUBACK:
    
                // Has an error occurred?
                if(0 != evt->result) {
    
                    LOG_ERR("MQTT PUBACK received error: %d.", evt->result);
    
                    break;
                }
    
                LOG_INF("MQTT PUBACK received with id: %u.", evt->param.puback.message_id);
    
                break;
    
            // Reception confirmation for published message with QoS 2.
            case MQTT_EVT_PUBREC: {
    
                // Has an error occurred?
                if(0 != evt->result) {
    
                    LOG_ERR("MQTT QoS2 PUBREC received error: %d.", evt->result);
    
                    break;
                }
    
                // Read the parameters for the pubrec message.
                const struct mqtt_pubrec_param* p = &evt->param.pubrec;
    
                // Prepare the release message.
                const struct mqtt_pubrel_param release = {
    
                    .message_id = p->message_id
                };
    
                // Send the release message.
                error = mqtt_publish_qos2_release(&client, &release);
    
                // Has an error occurred?
                if(0 != error) {
    
                    LOG_ERR("MQTT publish QoS2 release failed: %d.", error);
    
                    break;
                }
    
                LOG_INF("MQTT PUBREC received with id: %u.", p->message_id);
    
                break;
            }
    
            // Release of published message with QoS 2.
            case MQTT_EVT_PUBREL: {
    
                // Has an error occurred?
                if(0 != evt->result) {
    
                    LOG_ERR("MQTT QoS2 PUBREL received error: %d.", evt->result);
    
                    break;
                }
    
                // Read the parameters for the pubrel message.
                const struct mqtt_pubrel_param* p = &evt->param.pubrel;
    
                // Prepare the complete message.
                const struct mqtt_pubcomp_param complete = {
    
                    .message_id = p->message_id
                };
    
                // Send the complete message.
                error = mqtt_publish_qos2_complete(&client, &complete);
    
                // Has an error occurred?
                if(0 != error) {
    
                    LOG_ERR("MQTT publish QoS2 complete failed: %d.", error);
    
                    break;
                }
    
                LOG_INF("MQTT PUBREL received with id: %u.", p->message_id);
    
                break;
            }
    
            // Confirmation to a publish release message with QoS 2.
            case MQTT_EVT_PUBCOMP: {
    
                // Has an error occurred?
                if(0 != evt->result) {
    
                    LOG_ERR("MQTT PUBCOMP received error: %d.", evt->result);
    
                    break;
                }
    
                LOG_INF("MQTT PUBCOMP received with id: %u.", evt->param.pubcomp.message_id);
    
                break;
            }
    
            // Acknowledgement to a subscribe request.
            case MQTT_EVT_SUBACK:
    
                // Has an error occurred?
                if(0 != evt->result) {
    
                    LOG_ERR("MQTT SUBACK received error: %d.", evt->result);
    
                    break;
                }
    
                LOG_INF("MQTT SUBACK received with id: %u.", evt->param.suback.message_id);
    
                aws_mqtt_status.is_subscribed = true;
    
                break;
    
            // Acknowledgment to a unsubscribe request.
            case MQTT_EVT_UNSUBACK:
    
                // Has an error occurred?
                if(0 != evt->result) {
    
                    LOG_ERR("MQTT UNSUBACK received error: %d.", evt->result);
    
                    break;
                }
    
                LOG_INF("MQTT UNSUBACK received with id: %u.", evt->param.unsuback.message_id);
    
                aws_mqtt_status.is_subscribed = false;
    
                break;
    
            // Ping Response from server.
            case MQTT_EVT_PINGRESP:
    
                break;
    
            default:
    
                break;
        }
    
        // Has a AWS callback been set?
        if((AWS_EVENT_NONE != event) && (NULL != aws_callback)) {
    
            // Set the callback function.
            aws_callback(event, mqtt_payload_buffer);
        }
    }
    
    /**
     * @brief   Initialize the MQTT broker structure.
     */
    static void aws_broker_init(void) {
    
        // Fill out the initial address info struct.
        struct addrinfo hints = {
    
            .ai_flags = 0,
            .ai_family = AF_INET,
            .ai_socktype = SOCK_STREAM,
            .ai_protocol = 0,
        };
    
        // The resulting address info struct.
        struct addrinfo* result;
        struct addrinfo* address;
    
        // Convert text strings representing hostnames/IP addresses into a linked list of struct addrinfo structure.
        int error = getaddrinfo(BROKER_HOSTNAME, NULL, &hints, &result);
    
        // Did an error occur?
        if(0 != error) {
    
            LOG_ERR("Getaddrinfo failed, error: %d.", errno);
    
            return;
        }
    
        // Store the resulting address info.
        address = result;
    
        // Look for the IPv4 Address of the broker.
        while(NULL != address) {
    
            // Is it an IPv4 Address?
            if(sizeof(struct sockaddr_in) == address->ai_addrlen) {
    
                // Initialize the broker structure.
                struct sockaddr_in* broker = ((struct sockaddr_in*)&mqtt_broker);
    
                // Fill out the socket IPv4 address for the broker.
                broker->sin_addr.s_addr = ((struct sockaddr_in*)address->ai_addr)->sin_addr.s_addr;
                broker->sin_family = AF_INET;
                broker->sin_port = htons(MQTT_BROKER_PORT);
    
                // An array holding the IP address.
                char ipv4_address[NET_IPV4_ADDR_LEN];
    
                // Convert the IP address from internal to numeric ASCII form.
                inet_ntop(AF_INET, &broker->sin_addr.s_addr, ipv4_address, sizeof(ipv4_address));
    
                LOG_INF("MQTT broker is connected with IP address: %s.", log_strdup(ipv4_address));
    
                break;
            }
    
            // Increment the address pointer.
            address = address->ai_next;
        }
    
        // Free the resulting memory struct.
        freeaddrinfo(result);
    }
    
    void aws_init(const uint8_t* const _client_id, const aws_event_callback_t _callback) {
    
        // Set the callback function.
        aws_callback = _callback;
    
        // Initialize the MQTT client instance.
        mqtt_client_init(&client);
    
        // MQTT client configuration.
        client.broker = &mqtt_broker;
        client.evt_cb = aws_mqtt_event_handler;
        client.client_id.utf8 = _client_id;
        client.client_id.size = strlen(client.client_id.utf8);
        client.password = NULL;
        client.user_name = NULL;
        client.protocol_version = MQTT_VERSION_3_1_1;
    
        // MQTT buffers configuration.
        client.rx_buf = mqtt_rx_buffer;
        client.rx_buf_size = sizeof(mqtt_rx_buffer);
        client.tx_buf = mqtt_tx_buffer;
        client.tx_buf_size = sizeof(mqtt_tx_buffer);
    
        // MQTT transport configuration.
        client.transport.type = MQTT_TRANSPORT_SECURE;
    
        // The security tag.
        static sec_tag_t sec_tag_list[] = { CC_MODEM_CERTIFICATE_SEC_TAG };
    
        // TLS configuration for secure MQTT transports.
        struct mqtt_sec_config* tls_config = &(client.transport).tls.config;
        tls_config->peer_verify = 2;
        tls_config->cipher_count = 0;
        tls_config->cipher_list = NULL;
        tls_config->sec_tag_count = ARRAY_SIZE(sec_tag_list);
        tls_config->sec_tag_list = sec_tag_list;
        tls_config->hostname = BROKER_HOSTNAME;
        tls_config->session_cache = TLS_SESSION_CACHE_DISABLED;
    
        // Fill out the MQTT status topic struct.
        aws_mqtt_status.is_subscribed = false;
        aws_mqtt_status.topic = (uint8_t*)_client_id;
        aws_mqtt_status.failure_message = "FAILURE";
        aws_mqtt_status.success_message = "SUCCESS";
        aws_mqtt_status.qos = "1";
    
        // Initialize a mutex.
        k_mutex_init(&aws_publish_mutex);
    
        // Initialize a work queue.
        k_work_init(&aws_publish_work, aws_publish_work_handler);
    }
    
    void aws_connect(void) {
    
        // Is the connection already requested?
        if(1 == atomic_get(&is_connect_requested)) {
    
            return;
        }
    
        // Initialize the MQTT broker.
        aws_broker_init();
    
        // Connect to the MQTT broker.
        int error = mqtt_connect(&client);
    
        // Has an error occurred?
        if(0 != error) {
    
            LOG_ERR("MQTT connection failed: %d.", error);
    
            return;
        }
    
        // Set the MQTT transport to secure.
        mqtt_pollfd.fd = client.transport.tls.sock;
        mqtt_pollfd.events = POLLIN;
    
        // The connection is now requested.
        atomic_set(&is_connect_requested, 1);
    }
    
    // cppcheck-suppress    unusedFunction
    void aws_disconnect(void) {
    
        // Is the connection not requested?
        if(0 == atomic_get(&is_connect_requested)) {
    
            return;
        }
    
        // Disconnect from the MQTT broker.
        int error = mqtt_disconnect(&client);
    
        // Has an error occurred?
        if(0 != error) {
    
            LOG_ERR("MQTT disconnection failed: %d.", error);
        }
    }
    
    bool aws_is_connection_requested(void) {
    
        return (bool)atomic_get(&is_connect_requested);
    }
    
    void aws_run(void) {
    
        // Wait on an event on the file descriptor.
        int error = poll(&mqtt_pollfd, 1,  mqtt_keepalive_time_left(&client));
    
        // Has an error occurred?
        if(0 > error) {
    
            LOG_ERR("MQTT poll failed: %d.", errno);
    
            return;
        }
    
        // Keep the connection alive by sending a ping request.
        error = mqtt_live(&client);
    
        // Has an error occurred?
        if((0 != error) && (-EAGAIN != error)) {
    
            LOG_ERR("MQTT live failed: %d.", error);
    
            return;
        }
    
        // Is there data to read (returned event)?
        if(POLLIN == (mqtt_pollfd.revents & POLLIN)) {
    
            // Receive an incoming MQTT packet. The registered callback will be called with the packet content.
            error = mqtt_input(&client);
    
            // Has an error occurred?
            if(0 != error) {
    
                LOG_ERR("MQTT input failed: %d.", error);
    
                return;
            }
        }
    
        // Does the returned event contain an error condition?
        if(POLLERR == (mqtt_pollfd.revents & POLLERR)) {
    
            LOG_ERR("MQTT failed with POLLERR.");
    
            return;
        }
    
        // Does the returned event contain an invalid request?
        if(POLLNVAL == (mqtt_pollfd.revents & POLLNVAL)) {
    
            LOG_ERR("MQTT failed with POLLNVAL.");
    
            return;
        }
    }
    
    int16_t aws_subscribe(const char* const _subscribe_topic) {
    
        // Is the topic already subscribed?
        if(true == aws_mqtt_status.is_subscribed) {
    
            return 0;
        }
    
        // Create the subscribed topic.
        struct mqtt_topic topic = {
    
            .topic = {
    
                .utf8 = _subscribe_topic,
                .size = strlen(_subscribe_topic)
            },
    
            .qos = MQTT_QOS_1_AT_LEAST_ONCE
        };
    
        // Add the topic to the subscription request.
        const struct mqtt_subscription_list subscription_list = {
    
            .list = &topic,
            .list_count = 1,
            .message_id = sys_rand32_get()
        };
    
        LOG_INF("MQTT subscribed to topic: %s", log_strdup(_subscribe_topic));
    
        // Subscribe to the topic.
        return mqtt_subscribe(&client, &subscription_list);
    }
    
    void aws_publish(const char* const _topic, const uint8_t* const _data) {
    
        // Lock the AWS publish mutex and has it succeeded?
        if(0 != k_mutex_lock(&aws_publish_mutex, K_MSEC(CC_WORK_MUTEX_LOCK_TIMEOUT_MS))) {
    
            LOG_ERR("AWS publish mutex cannot be locked.");
    
            return;
        }
    
        // Store the AWS publish information.
        aws_publish_info.topic = (char*)_topic;
        aws_publish_info.data = (uint8_t*)_data;
    
        // Unlock the AWS publish mutex.
        (void)k_mutex_unlock(&aws_publish_mutex);
    
        // Submit a work item to the system work queue.
        (void)k_work_submit(&aws_publish_work);
    }
    
    void aws_get_status(aws_mqtt_status_t* _aws_mqtt_status) {
    
        // Get the AWS MQTT status struct.
        memcpy(_aws_mqtt_status, &aws_mqtt_status, sizeof(aws_mqtt_status));
    }

    cloud.h

    #ifndef CLOUD_H_
    #define CLOUD_H_
    
    //! Cloud topic types.
    typedef enum  {
    
        CLOUD_TOPIC_TYPE_GET_PCC_LOAD,                  //!< The "Get PCC load" type.
        CLOUD_TOPIC_NUMBER_OF_TYPES                     //!< The number of types.
    
    } cloud_topic_type_t;
    
    //! Event identifiers used by the @ref cloud_event_callback_t.
    typedef enum  {
    
        CLOUD_EVENT_NONE,                               //!< This ID is never used. Dummy value for completeness.
        CLOUD_EVENT_GET_PCC_LOAD_PACKET_SUCCEEDED,      //!< A complete "Get PCC Load" packet has succeeded.
        CLOUD_EVENT_GET_PCC_LOAD_PACKET_FAILED          //!< A complete "Get PCC load" packet has failed.
    
    } cloud_event_t;
    
    /**
     * @brief   Cloud publish information.
     */
    typedef struct {
    
        char* topic;                                    //!< The topic that is published to.
        uint8_t* data;                                  //!< The data to publish.
    
    } cloud_publish_info_t;
    
    //! Event callback function type.
    typedef void (*cloud_event_callback_t)(const cloud_event_t);
    
    /**
     * @brief   Initialize the cloud.
     *
     * @param   _client_id      The client id.
     */
    void cloud_init(const uint8_t* const _client_id);
    
    /**
     * @brief   Set the callback function.
     *
     * @param   _callback   Function to be called when an event is detected.
     */
    void cloud_set_callback(const cloud_event_callback_t _callback);
    
    /**
     * @brief   Create a JSON object so it is ready for publishing.
     *
     * @note    The function acts as an interface handling all different object types defined by @ref cloud_topic_type_t.
     *          The data parameters consist of different data depending of the type (2 set of data arrays are needed for some types).
     *
     * @param   _type                   The packet type.
     * @param   _primary_data           The generic primary data. (Unused = NULL).
     * @param   _primary_data_length    The length of the primary data. (Unused = NULL).
     * @param   _secondary_data         The generic secondary data. (Unused = NULL).
     * @param   _secondary_data_length  The length of the secondary data. (Unused = NULL).
     */
    void cloud_create_object(const uint8_t _type,
                             const uint8_t** const _primary_data,
                             const uint16_t* const _primary_data_length,
                             const uint8_t** const _secondary_data,
                             const uint16_t* const _secondary_data_length);
    
    /**
     * @brief   Publish the stored data to the AWS topic.
     */
    void cloud_publish(void);
    
    #endif /* CLOUD_H_ */

    cloud.c:

    #include <string.h>
    #include <stdio.h>
    #include <stdlib.h>
    
    #include <zephyr.h>
    #include <logging/log.h>
    #include <cJSON.h>
    
    #include "cloud.h"
    #include "watch.h"
    #include "aws.h"
    
    //! Register the cloud module to the logging system.
    LOG_MODULE_REGISTER(CLOUD, 3);
    
    //! The callback function.
    static cloud_event_callback_t cloud_callback = NULL;
    
    //! A list of the topics that can be published to AWS MQTT.
    static const char* const topics[CLOUD_TOPIC_NUMBER_OF_TYPES] = { "acdc/get_pcc_load" };
    
    //! A list of the response types from AWS MQTT.
    static const char* const response_types[CLOUD_TOPIC_NUMBER_OF_TYPES] = { "getPCCLoad" };
    
    //! The cloud publish information.
    static cloud_publish_info_t cloud_publish_info;
    
    /**
     * @brief   Find the response type index from the type string.
     *
     * @param   _type       The type in string.
     *
     * @retval  0x00-0xFE   The type index.
     * @retval  0xFF        No type is found.
     */
    static uint8_t cloud_find_response_type_index(const char* const _type) {
    
        // Loop through the response types.
        for(uint16_t i = 0; i < sizeof(response_types); i++) {
    
            // Has the response type been found, then return the index.
            if(0 == strcmp(response_types[i], _type)) {
    
                return i;
            }
        }
    
        // If the response type is not found, return 0xFF.
        return 0xFF;
    }
    
    /**
     * @brief   Handle the PCC load result from AWS.
     *
     * @param   _aws_mqtt_status    The AWS MQTT status parameters.
     * @param   _results            The JSON object containing the results.
     *
     * @retval  false               An error has occurred.
     * @retval  true                No error has occurred.
     */
    static bool cloud_handle_pcc_load_result(const aws_mqtt_status_t _aws_mqtt_status, const cJSON* const _results) {
    
        // Loop through the results array.
        for(uint16_t i = 0; i < cJSON_GetArraySize(_results); i++) {
    
            // Get the next array item.
            cJSON* result = cJSON_GetArrayItem(_results, i);
    
            // Get the status code.
            cJSON* status_code = cJSON_GetObjectItem(result, "statusCode");
    
            // Has a success message been received?
            if(0 == strcmp(status_code->valuestring, _aws_mqtt_status.success_message)) {
    
                // Get the id and PCC Load.
                cJSON* id = cJSON_GetObjectItem(result, "id");
                cJSON* pcc_load = cJSON_GetObjectItem(result, "pccLoad");
    
                LOG_INF("PCC load Result(%s): %s.", log_strdup(id->valuestring), log_strdup(pcc_load->valuestring));
    
                // Set the PCC load.
                va_set_pcc_load((uint16_t)atoi(pcc_load->valuestring));
            }
    
            // Has a failure message been received?
            else {
    
                // Get the id and message.
                cJSON* id = cJSON_GetObjectItem(result, "id");
                cJSON* message = cJSON_GetObjectItem(result, "message");
    
                LOG_INF("PCC load Result(%s): %s", log_strdup(id->valuestring),
                        log_strdup(message->valuestring));
    
                return false;
            }
        }
    
        return true;
    }
    
    /**
     * @brief       Handle events from the AWS.
     *
     * @param       _aws_event      Event generated by AWS.
     * @param       _data           Received data.
     */
    static void cloud_event_handler(const aws_event_t _aws_event, const uint8_t* const _data) {
    
        // Reset the event.
        uint8_t event = CLOUD_EVENT_NONE;
    
        // Go through the event.
        switch(_aws_event) {
    
            case AWS_EVENT_RESPONSE_READY: {
    
                // Get the AWS MQTT status parameters.
                aws_mqtt_status_t aws_mqtt_status;
                aws_get_status(&aws_mqtt_status);
    
                // Create the JSON object from the received data.
                cJSON* response = cJSON_Parse(_data);
    
                // Fetch the packet type object from the response.
                cJSON* type = cJSON_GetObjectItem(response, "type");
    
                LOG_INF("AWS response type: %s.", log_strdup(type->valuestring));
    
                // Go through the response type.
                switch(cloud_find_response_type_index(type->valuestring)) {
    
                    case CLOUD_TOPIC_TYPE_GET_PCC_LOAD: {
    
                        // A "Get PCC load" packet has been received.
                        event = CLOUD_EVENT_GET_PCC_LOAD_PACKET_SUCCEEDED;
    
                        // Fetch the pccLoadResults from the response.
                        cJSON* pcc_load_results = cJSON_GetObjectItem(response, "pccLoadResults");
    
                        // Handle the PCC load result.
                        if(false == cloud_handle_pcc_load_result(aws_mqtt_status, pcc_load_results)) {
    
                            // An error has occurred.
                            event = CLOUD_EVENT_GET_PCC_LOAD_PACKET_FAILED;
                        }
    
                        break;
                    }
    
                    default:
    
                        break;
                }
    
                // Remove the cJSON object.
                cJSON_Delete(response);
    
                break;
            }
    
            default:
    
                break;
        }
    
        // Has a Cloud callback been set?
        if((CLOUD_EVENT_NONE != event) && (NULL != cloud_callback)) {
    
            // Set the callback function.
            cloud_callback(event);
        }
    }
    
    /**
     * @brief       Create "Add Response" JSON object.
     *
     * @param       _response       The final JSON object.
     */
    static void cloud_create_add_response(cJSON* _response) {
    
        // Get the AWS MQTT status parameters.
        aws_mqtt_status_t aws_mqtt_status;
        aws_get_status(&aws_mqtt_status);
    
        // Add the elements to the response object.
        cJSON_AddItemToObject(_response, "topic", cJSON_CreateString(aws_mqtt_status.topic));
        cJSON_AddItemToObject(_response, "failureMessage", cJSON_CreateString(aws_mqtt_status.failure_message));
        cJSON_AddItemToObject(_response, "successMessage", cJSON_CreateString(aws_mqtt_status.success_message));
        cJSON_AddItemToObject(_response, "qos", cJSON_CreateString(aws_mqtt_status.qos));
    }
    
    /**
     * @brief   Run the cloud functionality.
     *          This function shall only be called inside a thread.
     */
    static void cloud_run(void) {
    
        while(1) {
    
            // Wait until connection is requested.
            if(false == aws_is_connection_requested()) {
    
                k_sleep(K_MSEC(10));
    
                continue;
            }
    
            // Keep the AWS MQTT broker connection alive and polling for new events.
            aws_run();
        }
    }
    
    //! Statically define and initialize the cloud thread.
    K_THREAD_DEFINE(cloud_thread, 8192,
                    cloud_run, NULL, NULL, NULL,
                    K_LOWEST_APPLICATION_THREAD_PRIO, 0, 0);
    
    void cloud_init(const uint8_t* const _client_id) {
    
        // Initialize the AWS.
        aws_init(_client_id, cloud_event_handler);
    }
    
    // cppcheck-suppress    unusedFunction
    void cloud_set_callback(const cloud_event_callback_t _callback) {
    
        // Set the callback function.
        cloud_callback = _callback;
    }
    
    void cloud_create_object(const uint8_t _type,
                             const uint8_t** const _primary_data,
                             const uint16_t* const _primary_data_length,
                             const uint8_t** const _secondary_data,
                             const uint16_t* const _secondary_data_length) {
    
        // Get the current date and time.
        watch_t watch;
        bool watch_status = watch_get(&watch);
    
        // Is the watch valid?
        if(false == watch_status) {
            LOG_ERR("Watch is invalid.");
            return;
        }
    
        // Create a JSON object holding the complete event.
        cJSON* event_data = cJSON_CreateObject();
    
        // Go through the type of the cloud publish.
        switch(_type) {
    
            case CLOUD_TOPIC_TYPE_GET_PCC_LOAD: {
    
                // Add the Charging load id string to the final event object.
                cJSON_AddItemToObject(event_data, "chargingLoadId", cJSON_CreateString("9b605eca-b857-42d3-a824-ef5ca15e8a74"));
    
                break;
            }
    
            default:
    
                break;
        }
    
        // Create an object holding the response.
        cJSON* response = cJSON_CreateObject();
    
        // Add the response to the object.
        cloud_create_add_response(response);
    
        // Add the response object to the final event object.
        cJSON_AddItemToObject(event_data, "response", response);
    
        // Store the AWS publish data.
        cloud_publish_info.topic = (char*)topics[_type];
        cloud_publish_info.data = cJSON_Print(event_data);
    
        // Free the cJSON object.
        cJSON_free(event_data);
    
        // Remove the cJSON object.
        cJSON_Delete(event_data);
    }
    
    void cloud_publish(void) {
    
        // Publish the data to the AWS.
        aws_publish(cloud_publish_info.topic, cloud_publish_info.data);
    }

    schedular_h:

    #ifndef SCHEDULER_H_
    #define SCHEDULER_H_
    
    /**
     * @brief   Initialize the scheduler.
     */
    void scheduler_init(void);
    
    #endif /* SCHEDULER_H_ */

    schedular.c:

    #include <stdio.h>
    
    #include <zephyr.h>
    #include <logging/log.h>
    
    #include "scheduler.h"
    #include "watch.h"
    #include "cloud.h"
    
    //! Register the scheduler module to the logging system.
    LOG_MODULE_REGISTER(Scheduler, 3);
    
    /**
     * @brief   Handle the scheduler events (Callback).
     *
     * @param   event   The scheduler event.
     * @param   watch   The current date and time.
     */
    static void scheduler_event_handler(const scheduler_event_t event, const watch_t* const watch) {
    
        // Look through the event.
        switch(event) {
    
            // Event SCHEDULER_EVENT_TICK_OCCURRED occurred.
            case SCHEDULER_EVENT_TICK_OCCURRED: {
    
    
                // Create JSON object and get it published to the topic.
                cloud_create_object(CLOUD_TOPIC_TYPE_GET_PCC_LOAD, NULL, NULL, NULL, NULL);
                cloud_publish();
    
                break;
            }
    
            // Event SCHEDULER_EVENT_INVALID occurred.
            case SCHEDULER_EVENT_INVALID:
    
                LOG_ERR("Date & time is invalid.");
    
                break;
    
            default:
    
                break;
        }
    }
    
    void scheduler_init(void) {
    
        // Set the scheduler callback.
        watch_set_scheduler_callback(scheduler_event_handler);
    }

Related