MQTT connection dropped with LTE-M

Hi all,
I'm experiencing issues with MQTT client connection on an nRF9160 board via LTE-M.
Some context: I'm trying to connect to an MQTT broker via LTE-M (currently public test brokers such as HiveMQ or Mosquitto) with a one-hour keepalive timeout. For LTE-M connectivity, eDRX is enabled (eDRX checks every 60 seconds). As in the examples, a thread polls the socket for POLLIN, POLLERR, POLLNVAL and POLLHUP events.
At app startup everything is fine: I'm able to send and receive MQTT messages. But after some time of inactivity (device and modem idle for, say, 10 minutes), I can still publish messages from the device (QoS 0) without errors, but they never reach the broker. Furthermore, none of the incoming messages are received by the device.
It seems the connection is dropped at some point, but polling does not detect it.
Am I missing something? Do I need to set a specific configuration on the socket?
Thanks
Parents
  • Hello, 

    What version of the nRF Connect SDK are you working with? What modem FW are you running on your device? I was not able to open the modem trace you provided. 

    Are you running our MQTT sample or your own custom code? What value is your MQTT keep alive configured to? E.g. 

    CONFIG_MQTT_KEEPALIVE=1200
    Kind regards,
    Øyvind
  • Hi, thank you for your quick response!

    I'm currently running nRF Connect SDK version 2.6.1 with Zephyr version 2.6.0. Modem firmware has been updated to 1.3.6.

    I'm currently using my custom code, which is based on the examples. Basically, there is one thread that is constantly polling the socket, and another one that is publishing messages.

    /**
     * MQTT connection management thread.
     *
     * Runs a state machine on the shared `context`:
     *  - STATE_INIT:    wait for the LTE-M link, resolve the broker address,
     *                   build the client id / topic names and configure the
     *                   MQTT client.
     *  - STATE_CONNECT: call mqtt_connect() and, on success, register the client
     *                   socket for poll().
     *  - STATE_IDLE:    poll() the socket until data arrives or the keepalive
     *                   deadline expires, send keepalive pings with mqtt_live()
     *                   and feed incoming data to mqtt_input().
     * Failures while connected call end_communication(), are reported through
     * error_mng_send_error() and restart the machine from STATE_INIT. Setup
     * failures in STATE_INIT are logged and retried after a delay.
     *
     * NOTE(review): poll() can only report a dead connection if a FIN/RST (or a
     * local stack error) actually reaches the device. On cellular networks the
     * operator's NAT/firewall commonly discards idle TCP mappings silently after
     * a few minutes. With a 3600 s keepalive nothing is sent for up to an hour,
     * so such a drop goes unnoticed while QoS 0 publishes still "succeed"
     * locally. The MQTT keepalive likely needs to be shorter than the operator's
     * idle timeout -- confirm that value with the network operator.
     *
     * @param unused1 Unused thread argument.
     * @param unused2 Unused thread argument.
     * @param unused3 Unused thread argument.
     */
    static void polling_thread( void* unused1, void* unused2, void* unused3 ){
        /* Delay before retrying a failed setup/connection step */
        enum { RETRY_DELAY_MS = 2000 };

        (void)unused1;
        (void)unused2;
        (void)unused3;

        /* While termination has not been requested */
        while( 1 ){
            /* Handle state */
            switch(context.state){
                case STATE_INIT:
                {
                    struct addrinfo *broker_address_result = NULL;
                    struct addrinfo *broker_address_lookup;
                    struct addrinfo broker_hints = {
                        .ai_family = AF_INET,
                        .ai_socktype = SOCK_STREAM
                    };
                    /* Static: the MQTT client keeps a pointer to this buffer and
                     * mqtt_connect() reads it later, in STATE_CONNECT, after this
                     * block's scope has ended. */
                    static char client_id[128];
                    char device_imei[30] = { 0 };
                    int err;

                    /* Wait for LTE-M network connection.
                     * Calls with side effects are kept OUT of __ASSERT(): when
                     * CONFIG_ASSERT is disabled __ASSERT() expands to nothing and
                     * the call itself would be dropped. */
                    ltem_mng_wait_for_connection(&net_connected_sem);
                    err = k_sem_take(&net_connected_sem, K_FOREVER);
                    if( err != 0 ){
                        LOG_ERR("Failed to acquire connected semaphore: %d", err);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }

                    /* Initialize client instance */
                    mqtt_client_init(&context.client);

                    /* Resolve MQTT broker hostname */
                    err = getaddrinfo(MQTT_BROKER_HOST, NULL, &broker_hints, &broker_address_result);
                    if( err != 0 ){
                        LOG_ERR("Failed to resolve MQTT broker hostname: %d", err);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }

                    /* Look for the first IPv4 broker address */
                    broker_address_lookup = broker_address_result;
                    while( broker_address_lookup != NULL ){
                        /* Found IPv4 address */
                        if( broker_address_lookup->ai_addrlen == sizeof(struct sockaddr_in) ){
                            struct sockaddr_in *broker4 = ((struct sockaddr_in *)&context.broker);
                            char ipv4_addr[NET_IPV4_ADDR_LEN];

                            broker4->sin_addr.s_addr = ((struct sockaddr_in *)broker_address_lookup->ai_addr)->sin_addr.s_addr;
                            broker4->sin_family = AF_INET;
                            broker4->sin_port = htons(MQTT_BROKER_PORT);

                            inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr));
                            LOG_DBG("Broker IPv4 Address found %s", (char *)(ipv4_addr));
                            break;
                        }
                        else{
                            LOG_DBG("ai_addrlen = %u should be %u or %u",
                                (unsigned int)broker_address_lookup->ai_addrlen,
                                (unsigned int)sizeof(struct sockaddr_in),
                                (unsigned int)sizeof(struct sockaddr_in6));
                        }

                        broker_address_lookup = broker_address_lookup->ai_next;
                    }

                    /* Without an IPv4 address context.broker is not set: retry */
                    if( broker_address_lookup == NULL ){
                        LOG_ERR("No IPv4 address found for MQTT broker");
                        freeaddrinfo(broker_address_result);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }

                    /* Free the address result struct (the address was copied) */
                    freeaddrinfo(broker_address_result);

                    /* Retrieve device IMEI to use it as client id */
                    err = ltem_mng_get_device_imei(device_imei, sizeof(device_imei));
                    if( err != 0 ){
                        LOG_ERR("Failed to get device IMEI: %d", err);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }
                    /* snprintf() always NUL-terminates within the given size */
                    snprintf(client_id, sizeof(client_id), "rsibridge_%s", device_imei);

                    /* Construct every topic names */
                    for(uint32_t idx = 0; idx < TOPIC_NB; idx++){
                        snprintf(topic_name[idx], sizeof(topic_name[idx]), topic_format[idx], device_imei);
                    }

                    /* MQTT client configuration */
                    context.client.broker = &context.broker;
                    context.client.evt_cb = mqtt_evt_handler;
                    context.client.client_id.utf8 = (const uint8_t *)client_id;
                    context.client.client_id.size = strlen(client_id);
                    context.client.password = NULL;
                    context.client.user_name = NULL;
                    context.client.protocol_version = MQTT_VERSION_3_1_1;
                    context.client.keepalive = config_mng_get_mqtt_keepalive_timeout_s();

                    /* Set up will topic */
                    will_topic.topic.utf8 = topic_name[TOPIC_DT_CONNECTION];
                    will_topic.topic.size = strlen(topic_name[TOPIC_DT_CONNECTION]);
                    will_topic.qos = 1;
                    context.client.will_topic = &will_topic;
                    context.client.will_message = &will_message;

                    /* MQTT buffers configuration */
                    context.client.rx_buf = context.client_rx_buffer;
                    context.client.rx_buf_size = sizeof(context.client_rx_buffer);
                    context.client.tx_buf = context.client_tx_buffer;
                    context.client.tx_buf_size = sizeof(context.client_tx_buffer);

                    /* Not using TLS for now */
                    context.client.transport.type = MQTT_TRANSPORT_NON_SECURE;

                    /* Set state */
                    context.state = STATE_CONNECT;
                }
                break;

                case STATE_CONNECT:
                {
                    /* Try to connnect to MQTT broker */
                    int32_t err = mqtt_connect(&context.client);
                    if( err ){
                        LOG_ERR("Error in mqtt_connect: %d", err);

                        /* Indicate error occured */
                        error_mng_send_error(ERROR_BROKER_CONNECTION_FAILED, err);

                        LOG_WRN("Retrying connection in 2seconds..");
                        k_msleep(RETRY_DELAY_MS);

                        /* Set state */
                        context.state = STATE_INIT;
                    }
                    else{
                        /* Get MQTT socket to periodically poll it */
                        if( context.client.transport.type == MQTT_TRANSPORT_NON_SECURE ){
                            context.socket_fds.fd = context.client.transport.tcp.sock;
                        }
                        else{
                            __ASSERT_NO_MSG(0); /* TLS not implemented now */
                        }
                        /* POLLERR/POLLHUP/POLLNVAL need not be requested: poll()
                         * always reports them. */
                        context.socket_fds.events = POLLIN;

                        /* Set state */
                        context.state = STATE_IDLE;
                    }
                }
                break;

                case STATE_IDLE:
                {
                    int32_t error = 0;

                    /* Wait for incoming data, at most until the keepalive deadline */
                    error = poll(&context.socket_fds, 1, mqtt_keepalive_time_left(&context.client));
                    if( error < 0 ){
                        /* Capture errno before any other call can overwrite it */
                        const int poll_errno = errno;

                        LOG_ERR("Polling error: %d", poll_errno);

                        /* End communication */
                        end_communication();

                        /* Indicate error occured */
                        error_mng_send_error(ERROR_POLLING_FAILED, -poll_errno);

                        /* Restart; revents is not meaningful after a failed poll() */
                        context.state = STATE_INIT;
                        break;
                    }

                    /* Ping MQTT broker to keep connection alive (-EAGAIN: not due yet) */
                    error = mqtt_live(&context.client);
                    if( (error != 0) && (error != -EAGAIN) ) {
                        LOG_ERR("mqtt_live error %d", error);

                        /* End communication */
                        end_communication();

                        /* Indicate error occured */
                        error_mng_send_error(ERROR_BROKER_PING_FAILED, error);

                        /* Restart; the connection is gone, do not touch it again */
                        context.state = STATE_INIT;
                        break;
                    }

                    /* If socket poll gave POLLIN event, run MQTT */
                    if( (context.socket_fds.revents & POLLIN) == POLLIN ){
                        error = mqtt_input(&context.client);
                        if( error != 0 ){
                            LOG_ERR("mqtt_input error: %d", error);

                            /* End communication */
                            end_communication();

                            /* Indicate error occured */
                            error_mng_send_error(ERROR_MQTT_PROCESS_FAILED, error);

                            /* Restart; the connection is gone, do not touch it again */
                            context.state = STATE_INIT;
                            break;
                        }
                    }

                    /* Socket error, peer hang-up or invalid descriptor.
                     * POLLHUP must be handled as well: poll() reports it even
                     * though only POLLIN was requested. If it were ignored and
                     * arrived without POLLIN, poll() would return immediately on
                     * every iteration and the thread would never reconnect. */
                    if( (context.socket_fds.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 ){
                        LOG_ERR("Error during polling");

                        /* End communication */
                        end_communication();

                        /* Indicate error occured; the last mqtt_* result is
                         * unrelated to this condition, so report "not connected" */
                        error_mng_send_error(ERROR_BROKER_CONNECTION_TERMINATED, -ENOTCONN);

                        /* Set state */
                        context.state = STATE_INIT;
                    }
                }
                break;

                default:
                {
                    /* Nothing to do here */
                }
                break;
            }

            /* Yield to let other threads run */
            k_yield();
        }
    }

    Here is the polling thread. The other thread just makes "mqtt_publish" calls, nothing fancy.

    MQTT keepalive has been set to 3600 seconds.

Reply
  • Hi, thank you for your quick response!

    I'm currently running nRF Connect SDK version 2.6.1 with Zephyr version 2.6.0. Modem firmware has been updated to 1.3.6.

    I'm currently using my custom code, which is based on the examples. Basically, there is one thread that is constantly polling the socket, and another one that is publishing messages.

    /**
     * MQTT connection management thread.
     *
     * Runs a state machine on the shared `context`:
     *  - STATE_INIT:    wait for the LTE-M link, resolve the broker address,
     *                   build the client id / topic names and configure the
     *                   MQTT client.
     *  - STATE_CONNECT: call mqtt_connect() and, on success, register the client
     *                   socket for poll().
     *  - STATE_IDLE:    poll() the socket until data arrives or the keepalive
     *                   deadline expires, send keepalive pings with mqtt_live()
     *                   and feed incoming data to mqtt_input().
     * Failures while connected call end_communication(), are reported through
     * error_mng_send_error() and restart the machine from STATE_INIT. Setup
     * failures in STATE_INIT are logged and retried after a delay.
     *
     * NOTE(review): poll() can only report a dead connection if a FIN/RST (or a
     * local stack error) actually reaches the device. On cellular networks the
     * operator's NAT/firewall commonly discards idle TCP mappings silently after
     * a few minutes. With a 3600 s keepalive nothing is sent for up to an hour,
     * so such a drop goes unnoticed while QoS 0 publishes still "succeed"
     * locally. The MQTT keepalive likely needs to be shorter than the operator's
     * idle timeout -- confirm that value with the network operator.
     *
     * @param unused1 Unused thread argument.
     * @param unused2 Unused thread argument.
     * @param unused3 Unused thread argument.
     */
    static void polling_thread( void* unused1, void* unused2, void* unused3 ){
        /* Delay before retrying a failed setup/connection step */
        enum { RETRY_DELAY_MS = 2000 };

        (void)unused1;
        (void)unused2;
        (void)unused3;

        /* While termination has not been requested */
        while( 1 ){
            /* Handle state */
            switch(context.state){
                case STATE_INIT:
                {
                    struct addrinfo *broker_address_result = NULL;
                    struct addrinfo *broker_address_lookup;
                    struct addrinfo broker_hints = {
                        .ai_family = AF_INET,
                        .ai_socktype = SOCK_STREAM
                    };
                    /* Static: the MQTT client keeps a pointer to this buffer and
                     * mqtt_connect() reads it later, in STATE_CONNECT, after this
                     * block's scope has ended. */
                    static char client_id[128];
                    char device_imei[30] = { 0 };
                    int err;

                    /* Wait for LTE-M network connection.
                     * Calls with side effects are kept OUT of __ASSERT(): when
                     * CONFIG_ASSERT is disabled __ASSERT() expands to nothing and
                     * the call itself would be dropped. */
                    ltem_mng_wait_for_connection(&net_connected_sem);
                    err = k_sem_take(&net_connected_sem, K_FOREVER);
                    if( err != 0 ){
                        LOG_ERR("Failed to acquire connected semaphore: %d", err);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }

                    /* Initialize client instance */
                    mqtt_client_init(&context.client);

                    /* Resolve MQTT broker hostname */
                    err = getaddrinfo(MQTT_BROKER_HOST, NULL, &broker_hints, &broker_address_result);
                    if( err != 0 ){
                        LOG_ERR("Failed to resolve MQTT broker hostname: %d", err);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }

                    /* Look for the first IPv4 broker address */
                    broker_address_lookup = broker_address_result;
                    while( broker_address_lookup != NULL ){
                        /* Found IPv4 address */
                        if( broker_address_lookup->ai_addrlen == sizeof(struct sockaddr_in) ){
                            struct sockaddr_in *broker4 = ((struct sockaddr_in *)&context.broker);
                            char ipv4_addr[NET_IPV4_ADDR_LEN];

                            broker4->sin_addr.s_addr = ((struct sockaddr_in *)broker_address_lookup->ai_addr)->sin_addr.s_addr;
                            broker4->sin_family = AF_INET;
                            broker4->sin_port = htons(MQTT_BROKER_PORT);

                            inet_ntop(AF_INET, &broker4->sin_addr.s_addr, ipv4_addr, sizeof(ipv4_addr));
                            LOG_DBG("Broker IPv4 Address found %s", (char *)(ipv4_addr));
                            break;
                        }
                        else{
                            LOG_DBG("ai_addrlen = %u should be %u or %u",
                                (unsigned int)broker_address_lookup->ai_addrlen,
                                (unsigned int)sizeof(struct sockaddr_in),
                                (unsigned int)sizeof(struct sockaddr_in6));
                        }

                        broker_address_lookup = broker_address_lookup->ai_next;
                    }

                    /* Without an IPv4 address context.broker is not set: retry */
                    if( broker_address_lookup == NULL ){
                        LOG_ERR("No IPv4 address found for MQTT broker");
                        freeaddrinfo(broker_address_result);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }

                    /* Free the address result struct (the address was copied) */
                    freeaddrinfo(broker_address_result);

                    /* Retrieve device IMEI to use it as client id */
                    err = ltem_mng_get_device_imei(device_imei, sizeof(device_imei));
                    if( err != 0 ){
                        LOG_ERR("Failed to get device IMEI: %d", err);
                        k_msleep(RETRY_DELAY_MS);
                        break;
                    }
                    /* snprintf() always NUL-terminates within the given size */
                    snprintf(client_id, sizeof(client_id), "rsibridge_%s", device_imei);

                    /* Construct every topic names */
                    for(uint32_t idx = 0; idx < TOPIC_NB; idx++){
                        snprintf(topic_name[idx], sizeof(topic_name[idx]), topic_format[idx], device_imei);
                    }

                    /* MQTT client configuration */
                    context.client.broker = &context.broker;
                    context.client.evt_cb = mqtt_evt_handler;
                    context.client.client_id.utf8 = (const uint8_t *)client_id;
                    context.client.client_id.size = strlen(client_id);
                    context.client.password = NULL;
                    context.client.user_name = NULL;
                    context.client.protocol_version = MQTT_VERSION_3_1_1;
                    context.client.keepalive = config_mng_get_mqtt_keepalive_timeout_s();

                    /* Set up will topic */
                    will_topic.topic.utf8 = topic_name[TOPIC_DT_CONNECTION];
                    will_topic.topic.size = strlen(topic_name[TOPIC_DT_CONNECTION]);
                    will_topic.qos = 1;
                    context.client.will_topic = &will_topic;
                    context.client.will_message = &will_message;

                    /* MQTT buffers configuration */
                    context.client.rx_buf = context.client_rx_buffer;
                    context.client.rx_buf_size = sizeof(context.client_rx_buffer);
                    context.client.tx_buf = context.client_tx_buffer;
                    context.client.tx_buf_size = sizeof(context.client_tx_buffer);

                    /* Not using TLS for now */
                    context.client.transport.type = MQTT_TRANSPORT_NON_SECURE;

                    /* Set state */
                    context.state = STATE_CONNECT;
                }
                break;

                case STATE_CONNECT:
                {
                    /* Try to connnect to MQTT broker */
                    int32_t err = mqtt_connect(&context.client);
                    if( err ){
                        LOG_ERR("Error in mqtt_connect: %d", err);

                        /* Indicate error occured */
                        error_mng_send_error(ERROR_BROKER_CONNECTION_FAILED, err);

                        LOG_WRN("Retrying connection in 2seconds..");
                        k_msleep(RETRY_DELAY_MS);

                        /* Set state */
                        context.state = STATE_INIT;
                    }
                    else{
                        /* Get MQTT socket to periodically poll it */
                        if( context.client.transport.type == MQTT_TRANSPORT_NON_SECURE ){
                            context.socket_fds.fd = context.client.transport.tcp.sock;
                        }
                        else{
                            __ASSERT_NO_MSG(0); /* TLS not implemented now */
                        }
                        /* POLLERR/POLLHUP/POLLNVAL need not be requested: poll()
                         * always reports them. */
                        context.socket_fds.events = POLLIN;

                        /* Set state */
                        context.state = STATE_IDLE;
                    }
                }
                break;

                case STATE_IDLE:
                {
                    int32_t error = 0;

                    /* Wait for incoming data, at most until the keepalive deadline */
                    error = poll(&context.socket_fds, 1, mqtt_keepalive_time_left(&context.client));
                    if( error < 0 ){
                        /* Capture errno before any other call can overwrite it */
                        const int poll_errno = errno;

                        LOG_ERR("Polling error: %d", poll_errno);

                        /* End communication */
                        end_communication();

                        /* Indicate error occured */
                        error_mng_send_error(ERROR_POLLING_FAILED, -poll_errno);

                        /* Restart; revents is not meaningful after a failed poll() */
                        context.state = STATE_INIT;
                        break;
                    }

                    /* Ping MQTT broker to keep connection alive (-EAGAIN: not due yet) */
                    error = mqtt_live(&context.client);
                    if( (error != 0) && (error != -EAGAIN) ) {
                        LOG_ERR("mqtt_live error %d", error);

                        /* End communication */
                        end_communication();

                        /* Indicate error occured */
                        error_mng_send_error(ERROR_BROKER_PING_FAILED, error);

                        /* Restart; the connection is gone, do not touch it again */
                        context.state = STATE_INIT;
                        break;
                    }

                    /* If socket poll gave POLLIN event, run MQTT */
                    if( (context.socket_fds.revents & POLLIN) == POLLIN ){
                        error = mqtt_input(&context.client);
                        if( error != 0 ){
                            LOG_ERR("mqtt_input error: %d", error);

                            /* End communication */
                            end_communication();

                            /* Indicate error occured */
                            error_mng_send_error(ERROR_MQTT_PROCESS_FAILED, error);

                            /* Restart; the connection is gone, do not touch it again */
                            context.state = STATE_INIT;
                            break;
                        }
                    }

                    /* Socket error, peer hang-up or invalid descriptor.
                     * POLLHUP must be handled as well: poll() reports it even
                     * though only POLLIN was requested. If it were ignored and
                     * arrived without POLLIN, poll() would return immediately on
                     * every iteration and the thread would never reconnect. */
                    if( (context.socket_fds.revents & (POLLERR | POLLHUP | POLLNVAL)) != 0 ){
                        LOG_ERR("Error during polling");

                        /* End communication */
                        end_communication();

                        /* Indicate error occured; the last mqtt_* result is
                         * unrelated to this condition, so report "not connected" */
                        error_mng_send_error(ERROR_BROKER_CONNECTION_TERMINATED, -ENOTCONN);

                        /* Set state */
                        context.state = STATE_INIT;
                    }
                }
                break;

                default:
                {
                    /* Nothing to do here */
                }
                break;
            }

            /* Yield to let other threads run */
            k_yield();
        }
    }

    Here is the polling thread. The other thread just makes "mqtt_publish" calls, nothing fancy.

    MQTT keepalive has been set to 3600 seconds.

Children
Related