Problem -116 error using mqtt protocal to connect EMQX broker server(GCP) in nRF9151 board

Hi,

I'm trying to connect to the EMQX(what I made on GCP) using mqtt protocal. I'm using the sample code that nordic provides in lesson. Based on the Exercise4 code which is l4_el_sol.

I successfully connected with a test server which is mqtt.nordicsemi.academy. So I changed hostname to EMQX host IP. And it doesn't work. I just changed the client->username, password

# Logging
CONFIG_LOG=y
 
# Button and LED support
CONFIG_DK_LIBRARY=y
 
# Newlib
CONFIG_NEWLIB_LIBC=y
CONFIG_NEWLIB_LIBC_FLOAT_PRINTF=y
# Networking
CONFIG_NETWORKING=y
CONFIG_NET_NATIVE=n
CONFIG_NET_SOCKETS_OFFLOAD=y
CONFIG_NET_SOCKETS=y
CONFIG_POSIX_API=y
 
# Memory
CONFIG_MAIN_STACK_SIZE=4096
CONFIG_HEAP_MEM_POOL_SIZE=2048
 
# Modem library
CONFIG_NRF_MODEM_LIB=y
 
# LTE link control
CONFIG_LTE_LINK_CONTROL=y
CONFIG_LTE_NETWORK_MODE_LTE_M_NBIOT=y
 
# MQTT
# STEP 2.1 - Enable and configure the MQTT library
CONFIG_MQTT_LIB=y
CONFIG_MQTT_CLEAN_SESSION=y
 
# Application
# STEP 2.2 - Configure the broker name, TCP port, topic names, and message
CONFIG_MQTT_PUB_TOPIC="zephyr/test"
CONFIG_MQTT_SUB_TOPIC="devacademy/subscribe/topic"
CONFIG_MQTT_BROKER_HOSTNAME="34.46.65.84"
CONFIG_MQTT_BROKER_PORT=1883
 
 
 
#i2c configuration
CONFIG_I2C=y            # i2c function add
CONFIG_I2C_NRFX=y
 
CONFIG_NRFX_TWIM0=n
CONFIG_NRFX_TWIM1=n
CONFIG_NRFX_TWIM2=y
CONFIG_NRFX_TWIM3=n
 
CONFIG_PRINTK=y      
CONFIG_CONSOLE=y          
CONFIG_UART_CONSOLE=y
CONFIG_SERIAL=y          
 
 
 
CONFIG_ASSERT=y
CONFIG_I2C_LOG_LEVEL_DBG=y
CONFIG_LOG_MODE_IMMEDIATE=y
 
 
CONFIG_CBPRINTF_FP_SUPPORT=y   # float %f
CONFIG_MQTT_KEEPALIVE=120
 
CONFIG_MQTT_CLIENT_ID="JunyoungJeon"
CONFIG_MQTT_MESSAGE_BUFFER_SIZE=1024
 
CONFIG_NET_IPV4=y
#include <stdio.h>
#include <ncs_version.h>
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
 
#include <zephyr/logging/log.h>
#include <modem/nrf_modem_lib.h>
#include <modem/lte_lc.h>
/* Include the header file for the MQTT Library*/
#include <zephyr/net/mqtt.h>
 
#include "mqtt_connection.h"
 
/* I2C Headers */
#include <zephyr/sys/printk.h>
#include <zephyr/drivers/i2c.h>
 
 
#define SEN66_NODE DT_NODELABEL(sen66)
#define Measure 100
#define REMEASURE 30
#define STARTING_TIME 60
#define WATING_TIME 100
#define Exucution_Time 1500
 
 
 
 
/* I2C Function */
 
int read_measurement(const struct i2c_dt_spec *dev, uint8_t *buf, size_t len)
{
    int ret;
    uint8_t cmd_read[] = {0x03, 0x00};
 
    ret = i2c_write(dev->bus, cmd_read, sizeof(cmd_read), dev->addr);
    if (ret != 0) {
        printk("[SEN] Failed to send read command: %d\r\n", ret);
        return ret;
    }
 
    k_sleep(K_SECONDS(2));  // 센서가 데이터 준비할 시간
 
    ret = i2c_read(dev->bus, buf, len, dev->addr);
    if (ret != 0) {
        printk("[SEN] Failed to read measurement: %d\r\n", ret);
        return ret;
    }
 
    k_sleep(K_SECONDS(2));  // 처리 여유 시간 (필요 없다면 줄일 수 있음)
     
    return 0;  
}
 
 
uint16_t read_u16(uint8_t msb, uint8_t lsb) {
    return ((uint16_t)msb << 8) | lsb;
}
 
int16_t read_s16(uint8_t msb, uint8_t lsb) {
    return (int16_t)(((uint16_t)msb << 8) | lsb);  // - value possible
}
 
 
 
/* THE MQTT CONNECTION FOUNCTION */
static struct mqtt_client client;
 
/* File descriptor */
static struct pollfd fds;
 
static K_SEM_DEFINE(lte_connected, 0, 1);
 
LOG_MODULE_REGISTER(DRI, LOG_LEVEL_INF);
 
static void lte_handler(const struct lte_lc_evt *const evt)
{
     switch (evt->type) {
     case LTE_LC_EVT_NW_REG_STATUS:
        if ((evt->nw_reg_status != LTE_LC_NW_REG_REGISTERED_HOME) &&
            (evt->nw_reg_status != LTE_LC_NW_REG_REGISTERED_ROAMING)) {
            break;
        }
        printk("Network registration status: %s",
                evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME ?
                "Connected - home network" : "Connected - roaming");
        k_sem_give(&lte_connected);
        break;
    // case LTE_LC_EVT_RRC_UPDATE:
    //   static enum lte_lc_rrc_mode prev_rrc_mode = -1;
 
    //     if (prev_rrc_mode != evt->rrc_mode) {
    //     printk("[RRC] Mode changed: %s \r\n", evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED ? "Connected" : "Idle");
    //     prev_rrc_mode = evt->rrc_mode;
    // }
    // break;
     default:
             break;
     }
}
 
static int modem_configure(void)
{
    int err;
 
    printk("Initializing modem library");
    err = nrf_modem_lib_init();
    if (err) {
        LOG_ERR("Failed to initialize the modem library, error: %d \r\n", err);
        return err;
    }
   
    printk("Connecting to LTE network\r\n");
    err = lte_lc_connect_async(lte_handler);
    if (err) {
        LOG_ERR("Error in lte_lc_connect_async, error: %d", err);
        return err;
    }
 
    k_sem_take(&lte_connected, K_FOREVER);
    printk("Connected to LTE network\r\n");
 
    return 0;
}
 
int publish_sensor_data(float pm1, float pm25, float pm4, float pm10,
                        float rh, float temp, float voc, float nox, float co2)
{
    char payload[128];
    int len = snprintf(payload, sizeof(payload),
        "{\"pm1\":%.2f,\"pm25\":%.2f,\"pm4\":%.2f,\"pm10\":%.2f,"
        "\"rh\":%.2f,\"temp\":%.2f,\"voc\":%.2f,\"nox\":%.2f,\"co2\":%.2f}",
        pm1, pm25, pm4, pm10,
        rh, temp, voc, nox, co2);
 
    return data_publish(&client,
                        MQTT_QOS_1_AT_LEAST_ONCE,
                        (const uint8_t *)payload,
                        len);
}
 
static void maintain_mqtt(void)
{
    // 소켓에 들어온 데이터(응답)가 있는지 확인
    int rc = poll(&fds, 1, 0);
    if (rc < 0) {
        LOG_ERR("poll 에러: %d", errno);
        return;
    }
    // 응답이 있으면 가져와서 처리
    if (rc > 0) {
        mqtt_input(&client);
    }
    // PINGREQ 보내고 내부 재전송 등 유지 작업 수행
    rc = mqtt_live(&client);
    if (rc && rc != -EAGAIN) {
        LOG_ERR("mqtt_live 에러: %d", rc);
    }
}
 
 
 
/* MAIN CODE START POINT */
 
void main(void)
{
    int err;
    int ret;
    float pm1_array[5];
    float pm25_array[5];
    float pm4_array[5];
    float pm10_array[5];
    float Humidity_array[5];
    float Temp_array[5];
    float VOC__array_Index[5];
    float NOx__array_Index[5];
    uint8_t co2_array[5];
    uint8_t data_buf[30];
    float sum = 0.0;
    uint32_t connect_attempt = 0;
 
 
    printk("[SEN] Sensor will be started \r\n");
 
    k_sleep(K_MSEC(WATING_TIME));
 
    const struct i2c_dt_spec sen66 = I2C_DT_SPEC_GET(SEN66_NODE);
 
    if (!device_is_ready(sen66.bus)) {
    LOG_ERR("[SEN] I2C bus %s is not ready!\r\n",sen66.bus->name);
   
    } else {
    printk("[SEN] I2C bus %s is ready \r\n", sen66.bus->name);
    }
 
    uint8_t cmd_start[] = {0x00, 0x21};
    ret = i2c_write_dt(&sen66, cmd_start, sizeof(cmd_start));
    if (ret == 0) {
        printk("[SEN] I2C Measurement is starting \r\n");
    } else {
        printk("[SEN] I2C MEASUREMENT IS FAILED : %d \r\n", ret);
       
    }
 
    k_sleep(K_MSEC(STARTING_TIME));
 
 
 
    //--------------------------------------------------------------------------//
 
 
    err = modem_configure();
    if (err) {
        LOG_ERR("Failed to configure the modem");
       
    }
 
    err = client_init(&client);
    if (err) {
        LOG_ERR("Failed to initialize MQTT client: %d", err);
       
    }
 
do_connect:
    if (connect_attempt++ > 0) {
        LOG_INF("Reconnecting in %d seconds...",
            CONFIG_MQTT_RECONNECT_DELAY_S);
        k_sleep(K_SECONDS(CONFIG_MQTT_RECONNECT_DELAY_S));
    }
 
    err = mqtt_connect(&client);
    if (err) {
        LOG_ERR("Error in mqtt_connect: %d", err);
        goto do_connect;
    }
 
    err = fds_init(&client,&fds);
    if (err) {
        LOG_ERR("Error in fds_init: %d", err);
    }
 
  int i;
   
 
 
 
  K_SEM_DEFINE(mqtt_connected, 0, 1);
 
void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt)
{
    switch (evt->type) {
    case MQTT_EVT_CONNACK:
        if (evt->result != 0) {
            LOG_ERR("MQTT connect failed: %d", evt->result);
            break;
        }
        LOG_INF("MQTT connected successfully");
        k_sem_give(&mqtt_connected); // 연결 성공 시 세마포어 give
        break;
 
    case MQTT_EVT_DISCONNECT:
        LOG_INF("MQTT disconnected: %d", evt->result);
        break;
 
    default:
        break;
    }
}
 
 
 
 
 
 
 
   
 
while (1) {
    sum=0.0;
 
    for ( i = 0; i <5; i++){
   
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    uint16_t pm1_raw = read_u16(data_buf[0], data_buf[1]);
    float pm1 = pm1_raw / 10.0f;
    pm1_array[i] = pm1;
    sum += pm1;
    printk("[SEN] pm1_array[%d]= %.2f ug/m3 \r\n", i , pm1_array[i] );
    k_sleep(K_MSEC(REMEASURE));
    maintain_mqtt();
 
 }
    float avg_pm1 = sum / 5.0f;
    printk("[SEN] sum %.2f \n\r",sum);
    printk("\r\n");
    printk(" -- PM25_Array Values -- \r\n");
    sum = 0.0f;
 
 
    for ( i = 0; i <5; i++){
    maintain_mqtt();
     
     ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
     uint16_t pm25_raw = read_u16(data_buf[3], data_buf[4]);
    float pm25 = pm25_raw / 10.0f;
    pm25_array[i] = pm25;
    sum += pm25;
    printk("[SEN] pm25_array[%d]= %.2f ug/m3 \r\n", i , pm25 );
    k_sleep(K_MSEC(REMEASURE));
    maintain_mqtt();
}
   
    float avg_pm25 = sum /5.0f;
    printk("[SEN] sum %.2f",sum);
   
    printk("\r\n");
    printk(" -- PM4_Array Values -- \r\n");
    sum = 0.0f;
for ( i = 0; i <5; i++){
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    uint16_t pm4_raw = read_u16(data_buf[6], data_buf[7]);
    float pm4 = pm4_raw / 10.0f;
    pm4_array[i] = pm4;
    sum += pm4;
    printk("[SEN] pm4_array[%d]= %.2f ug/m3 \r\n", i , pm4 );
    k_sleep(K_MSEC(REMEASURE));
   maintain_mqtt();
 
}
   
    float avg_pm4 = sum /5.0f;
    printk("[SEN] sum %.2f",sum);
 
    printk("\r\n");
    printk(" -- PM10_Array Values -- \r\n");
    sum = 0.0f;
for ( i = 0; i <5; i++){
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    uint16_t pm10_raw = read_u16(data_buf[9], data_buf[10]);
    float pm10 = pm10_raw / 10.0f;
    pm10_array[i] = pm10;
    sum += pm10;
    printk("[SEN] pm10_array[%d]= %.2f ug/m3 \r\n", i , pm10 );
    k_sleep(K_MSEC(REMEASURE));
    maintain_mqtt();
}
 
    float avg_pm10 = sum /5.0f;
    printk("[SEN] sum %.2f",sum);
   
    printk("\r\n");
    printk(" -- Humidity_Array Values -- \r\n");
    sum = 0.0f;
for(i=0; i<5; i++){
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    int16_t Humidity_raw = read_s16(data_buf[12], data_buf[13]);
    float Humidity = Humidity_raw / 100.0f;
    Humidity_array[i] = Humidity;
    sum += Humidity;
    printk("[SEN] Humidity_array[%d]= %.2f ug/m3 \r\n", i ,Humidity  );
    k_sleep(K_MSEC(REMEASURE));
    maintain_mqtt();
}
 
    float avg_Humidity = sum /5.0f;
    printk("[SEN] sum %.2f",sum);
   
    printk("\r\n");
    printk(" -- Temp_Array Values -- \r\n");
    sum = 0.0f;
for(i=0; i<5; i++){
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    int16_t Temp_raw = read_s16(data_buf[15], data_buf[16]);
    float Temp = Temp_raw  / 200.0f;
    Temp_array[i] = Temp;
    sum += Temp;
    printk("[SEN] Temp_array[%d]= %.2f T[C] \r\n", i , Temp );
    k_sleep(K_MSEC(REMEASURE));
    maintain_mqtt();
    }
 
    float avg_Temp = sum /5.0f;
    printk("[SEN] sum %.2f",sum);
   
    printk("\r\n");
    printk(" -- VOC_Array Values -- \r\n");
    sum = 0.0f;
for(i=0; i<5; i++){
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    int16_t VOC_Index_raw = read_u16(data_buf[18], data_buf[19]);
    float VOC_Index = VOC_Index_raw  / 10.0f;
    VOC__array_Index[i] = VOC_Index;
    sum += VOC_Index;
    printk("[SEN] VOC_array[%d]= %.2f ug/m3 \r\n", i , VOC_Index  );
    k_sleep(K_MSEC(REMEASURE));
   maintain_mqtt();
}
    float avg_VOC_Index = sum /5.0f;
    printk("[SEN] sum %.2f",sum);
 
    printk("\r\n");
    printk(" -- Nox_Array Values -- \r\n");
    sum = 0.0f;    
for(i=0; i<5; i++){
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    int16_t NOx_Index_raw = read_u16(data_buf[21], data_buf[22]);
    float NOx_Index = NOx_Index_raw  / 10.0f;
    NOx__array_Index[i] = NOx_Index;
    sum +=NOx_Index;
    printk("[SEN] NOx_array[%d]= %.2f ug/m3 \r\n", i , NOx_Index );
    k_sleep(K_MSEC(REMEASURE));
   maintain_mqtt();
}
    float avg_NOx_Index = sum / 5.0f;
    printk("[SEN] sum %.2f",sum);
 
    printk("\r\n");
    printk(" -- co2 values -- \r\n");
    sum = 0.0f;
for(i=0; i<5; i++){
    maintain_mqtt();
    ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
 
    uint16_t co2 = read_u16(data_buf[24], data_buf[25]);
    co2_array[i] =  co2;
    sum += co2;
    printk("[SEN] co2_array[%d]= %u ppm \r\n", i , co2  );
    k_sleep(K_MSEC(REMEASURE));
   
    maintain_mqtt();
}
 
    uint16_t avg_co2 = sum / 5.0f;
    printk("[SEN] sum %.2f",sum);
    printk("\r\n");
    printk(" -- Average values -- \r\n");
 
    k_sleep(K_SECONDS(5));
 
     // 5. Print values
    printk("[SEN] PM1.0     : %.2f ug/m3\r\n",avg_pm1);
    printk("[SEN] PM2.5     : %.2f ug/m3\r\n",avg_pm25);
    printk("[SEN] PM4.0     : %.2f ug/m3\r\n",avg_pm4);
    printk("[SEN] PM10.0    : %.2f ug/m3\r\n",avg_pm10);
    printk("[SEN] RH [%%]   : %.2f ug/m3  \n",avg_Humidity);
    printk("[SEN] T[C]      : %.2f T[C]\r\n" ,avg_Temp);
    printk("[SEN] VOC Index : %.2f ppb\r\n",avg_VOC_Index);
    printk("[SEN] NOx Index : %.2f ppb\r\n",avg_NOx_Index );
    printk("[SEN] CO2       : %u   ppm\r\n",avg_co2);  
 
    printk("\r\n");
    printk(" -- Measurements are done -- \r\n");
    printk("\r\n");
   
 
    maintain_mqtt();
 
 
        // MQTT Sending
    err = publish_sensor_data(avg_pm1,avg_pm25,avg_pm4,avg_pm10,avg_Humidity,avg_Temp,avg_VOC_Index,avg_NOx_Index,avg_co2);
    if (err) printk("MQTT sending is failed: %d\n", err);
    else printk("MQTT Sending is success\n");
 
 
       
    maintain_mqtt();
 
   
    }
 
    printk("Disconnecting MQTT client");
 
    err = mqtt_disconnect(&client, NULL);
    if (err) {
        LOG_ERR("Could not disconnect MQTT client: %d", err);
    }
    goto do_connect;
 
    k_sleep(K_SECONDS(60));
}
if you can find what problem is, please let me know. 

-ps connection.c and connection.h is exactly same as a l4_le_sol sample file

Parents Reply Children
Related