Hi,
I'm trying to connect to an EMQX broker (which I set up on GCP) using the MQTT protocol. I'm using the sample code that Nordic provides in the lesson. My code is based on the Exercise 4 code, which is l4_e1_sol.
I connected successfully to the test server, mqtt.nordicsemi.academy. Then I changed the hostname to the EMQX host IP, and now it doesn't work. The only other thing I changed was client->username and the password.
# Logging
CONFIG_LOG=y
# Button and LED support
CONFIG_DK_LIBRARY=y
# Newlib
CONFIG_NEWLIB_LIBC=y
CONFIG_NEWLIB_LIBC_FLOAT_PRINTF=y
# Networking
CONFIG_NETWORKING=y
CONFIG_NET_NATIVE=n
CONFIG_NET_SOCKETS_OFFLOAD=y
CONFIG_NET_SOCKETS=y
CONFIG_POSIX_API=y
# Memory
CONFIG_MAIN_STACK_SIZE=4096
CONFIG_HEAP_MEM_POOL_SIZE=2048
# Modem library
CONFIG_NRF_MODEM_LIB=y
# LTE link control
CONFIG_LTE_LINK_CONTROL=y
CONFIG_LTE_NETWORK_MODE_LTE_M_NBIOT=y
# MQTT
# STEP 2.1 - Enable and configure the MQTT library
CONFIG_MQTT_LIB=y
CONFIG_MQTT_CLEAN_SESSION=y
# Application
# STEP 2.2 - Configure the broker name, TCP port, topic names, and message
CONFIG_MQTT_PUB_TOPIC="zephyr/test"
CONFIG_MQTT_SUB_TOPIC="devacademy/subscribe/topic"
CONFIG_MQTT_BROKER_HOSTNAME="34.46.65.84"
CONFIG_MQTT_BROKER_PORT=1883
#i2c configuration
CONFIG_I2C=y
# i2c function add
CONFIG_I2C_NRFX=y
CONFIG_NRFX_TWIM0=n
CONFIG_NRFX_TWIM1=n
CONFIG_NRFX_TWIM2=y
CONFIG_NRFX_TWIM3=n
CONFIG_PRINTK=y
CONFIG_CONSOLE=y
CONFIG_UART_CONSOLE=y
CONFIG_SERIAL=y
CONFIG_ASSERT=y
CONFIG_I2C_LOG_LEVEL_DBG=y
CONFIG_LOG_MODE_IMMEDIATE=y
CONFIG_CBPRINTF_FP_SUPPORT=y
# float %f
CONFIG_MQTT_KEEPALIVE=120
CONFIG_MQTT_CLIENT_ID="JunyoungJeon"
CONFIG_MQTT_MESSAGE_BUFFER_SIZE=1024
CONFIG_NET_IPV4=y
#include <stdio.h> #include <ncs_version.h> #include <zephyr/kernel.h> #include <zephyr/net/socket.h> #include <zephyr/logging/log.h> #include <modem/nrf_modem_lib.h> #include <modem/lte_lc.h> /* Include the header file for the MQTT Library*/ #include <zephyr/net/mqtt.h> #include "mqtt_connection.h" /* I2C Headers */ #include <zephyr/sys/printk.h> #include <zephyr/drivers/i2c.h> #define SEN66_NODE DT_NODELABEL(sen66) #define Measure 100 #define REMEASURE 30 #define STARTING_TIME 60 #define WATING_TIME 100 #define Exucution_Time 1500 /* I2C Function */ int read_measurement(const struct i2c_dt_spec *dev, uint8_t *buf, size_t len) { int ret; uint8_t cmd_read[] = {0x03, 0x00}; ret = i2c_write(dev->bus, cmd_read, sizeof(cmd_read), dev->addr); if (ret != 0) { printk("[SEN] Failed to send read command: %d\r\n", ret); return ret; } k_sleep(K_SECONDS(2)); // 센서가 데이터 준비할 시간 ret = i2c_read(dev->bus, buf, len, dev->addr); if (ret != 0) { printk("[SEN] Failed to read measurement: %d\r\n", ret); return ret; } k_sleep(K_SECONDS(2)); // 처리 여유 시간 (필요 없다면 줄일 수 있음) return 0; } uint16_t read_u16(uint8_t msb, uint8_t lsb) { return ((uint16_t)msb << 8) | lsb; } int16_t read_s16(uint8_t msb, uint8_t lsb) { return (int16_t)(((uint16_t)msb << 8) | lsb); // - value possible } /* THE MQTT CONNECTION FOUNCTION */ static struct mqtt_client client; /* File descriptor */ static struct pollfd fds; static K_SEM_DEFINE(lte_connected, 0, 1); LOG_MODULE_REGISTER(DRI, LOG_LEVEL_INF); static void lte_handler(const struct lte_lc_evt *const evt) { switch (evt->type) { case LTE_LC_EVT_NW_REG_STATUS: if ((evt->nw_reg_status != LTE_LC_NW_REG_REGISTERED_HOME) && (evt->nw_reg_status != LTE_LC_NW_REG_REGISTERED_ROAMING)) { break; } printk("Network registration status: %s", evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME ? 
"Connected - home network" : "Connected - roaming"); k_sem_give(<e_connected); break; // case LTE_LC_EVT_RRC_UPDATE: // static enum lte_lc_rrc_mode prev_rrc_mode = -1; // if (prev_rrc_mode != evt->rrc_mode) { // printk("[RRC] Mode changed: %s \r\n", evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED ? "Connected" : "Idle"); // prev_rrc_mode = evt->rrc_mode; // } // break; default: break; } } static int modem_configure(void) { int err; printk("Initializing modem library"); err = nrf_modem_lib_init(); if (err) { LOG_ERR("Failed to initialize the modem library, error: %d \r\n", err); return err; } printk("Connecting to LTE network\r\n"); err = lte_lc_connect_async(lte_handler); if (err) { LOG_ERR("Error in lte_lc_connect_async, error: %d", err); return err; } k_sem_take(<e_connected, K_FOREVER); printk("Connected to LTE network\r\n"); return 0; } int publish_sensor_data(float pm1, float pm25, float pm4, float pm10, float rh, float temp, float voc, float nox, float co2) { char payload[128]; int len = snprintf(payload, sizeof(payload), "{\"pm1\":%.2f,\"pm25\":%.2f,\"pm4\":%.2f,\"pm10\":%.2f," "\"rh\":%.2f,\"temp\":%.2f,\"voc\":%.2f,\"nox\":%.2f,\"co2\":%.2f}", pm1, pm25, pm4, pm10, rh, temp, voc, nox, co2); return data_publish(&client, MQTT_QOS_1_AT_LEAST_ONCE, (const uint8_t *)payload, len); } static void maintain_mqtt(void) { // 소켓에 들어온 데이터(응답)가 있는지 확인 int rc = poll(&fds, 1, 0); if (rc < 0) { LOG_ERR("poll 에러: %d", errno); return; } // 응답이 있으면 가져와서 처리 if (rc > 0) { mqtt_input(&client); } // PINGREQ 보내고 내부 재전송 등 유지 작업 수행 rc = mqtt_live(&client); if (rc && rc != -EAGAIN) { LOG_ERR("mqtt_live 에러: %d", rc); } } /* MAIN CODE START POINT */ void main(void) { int err; int ret; float pm1_array[5]; float pm25_array[5]; float pm4_array[5]; float pm10_array[5]; float Humidity_array[5]; float Temp_array[5]; float VOC__array_Index[5]; float NOx__array_Index[5]; uint8_t co2_array[5]; uint8_t data_buf[30]; float sum = 0.0; uint32_t connect_attempt = 0; printk("[SEN] Sensor will be started 
\r\n"); k_sleep(K_MSEC(WATING_TIME)); const struct i2c_dt_spec sen66 = I2C_DT_SPEC_GET(SEN66_NODE); if (!device_is_ready(sen66.bus)) { LOG_ERR("[SEN] I2C bus %s is not ready!\r\n",sen66.bus->name); } else { printk("[SEN] I2C bus %s is ready \r\n", sen66.bus->name); } uint8_t cmd_start[] = {0x00, 0x21}; ret = i2c_write_dt(&sen66, cmd_start, sizeof(cmd_start)); if (ret == 0) { printk("[SEN] I2C Measurement is starting \r\n"); } else { printk("[SEN] I2C MEASUREMENT IS FAILED : %d \r\n", ret); } k_sleep(K_MSEC(STARTING_TIME)); //--------------------------------------------------------------------------// err = modem_configure(); if (err) { LOG_ERR("Failed to configure the modem"); } err = client_init(&client); if (err) { LOG_ERR("Failed to initialize MQTT client: %d", err); } do_connect: if (connect_attempt++ > 0) { LOG_INF("Reconnecting in %d seconds...", CONFIG_MQTT_RECONNECT_DELAY_S); k_sleep(K_SECONDS(CONFIG_MQTT_RECONNECT_DELAY_S)); } err = mqtt_connect(&client); if (err) { LOG_ERR("Error in mqtt_connect: %d", err); goto do_connect; } err = fds_init(&client,&fds); if (err) { LOG_ERR("Error in fds_init: %d", err); } int i; K_SEM_DEFINE(mqtt_connected, 0, 1); void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt) { switch (evt->type) { case MQTT_EVT_CONNACK: if (evt->result != 0) { LOG_ERR("MQTT connect failed: %d", evt->result); break; } LOG_INF("MQTT connected successfully"); k_sem_give(&mqtt_connected); // 연결 성공 시 세마포어 give break; case MQTT_EVT_DISCONNECT: LOG_INF("MQTT disconnected: %d", evt->result); break; default: break; } } while (1) { sum=0.0; for ( i = 0; i <5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); uint16_t pm1_raw = read_u16(data_buf[0], data_buf[1]); float pm1 = pm1_raw / 10.0f; pm1_array[i] = pm1; sum += pm1; printk("[SEN] pm1_array[%d]= %.2f ug/m3 \r\n", i , pm1_array[i] ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_pm1 = sum / 5.0f; printk("[SEN] sum %.2f \n\r",sum); 
printk("\r\n"); printk(" -- PM25_Array Values -- \r\n"); sum = 0.0f; for ( i = 0; i <5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); uint16_t pm25_raw = read_u16(data_buf[3], data_buf[4]); float pm25 = pm25_raw / 10.0f; pm25_array[i] = pm25; sum += pm25; printk("[SEN] pm25_array[%d]= %.2f ug/m3 \r\n", i , pm25 ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_pm25 = sum /5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- PM4_Array Values -- \r\n"); sum = 0.0f; for ( i = 0; i <5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); uint16_t pm4_raw = read_u16(data_buf[6], data_buf[7]); float pm4 = pm4_raw / 10.0f; pm4_array[i] = pm4; sum += pm4; printk("[SEN] pm4_array[%d]= %.2f ug/m3 \r\n", i , pm4 ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_pm4 = sum /5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- PM10_Array Values -- \r\n"); sum = 0.0f; for ( i = 0; i <5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); uint16_t pm10_raw = read_u16(data_buf[9], data_buf[10]); float pm10 = pm10_raw / 10.0f; pm10_array[i] = pm10; sum += pm10; printk("[SEN] pm10_array[%d]= %.2f ug/m3 \r\n", i , pm10 ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_pm10 = sum /5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- Humidity_Array Values -- \r\n"); sum = 0.0f; for(i=0; i<5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); int16_t Humidity_raw = read_s16(data_buf[12], data_buf[13]); float Humidity = Humidity_raw / 100.0f; Humidity_array[i] = Humidity; sum += Humidity; printk("[SEN] Humidity_array[%d]= %.2f ug/m3 \r\n", i ,Humidity ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_Humidity = sum /5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- Temp_Array Values -- \r\n"); sum = 0.0f; for(i=0; i<5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, 
data_buf, sizeof(data_buf)); int16_t Temp_raw = read_s16(data_buf[15], data_buf[16]); float Temp = Temp_raw / 200.0f; Temp_array[i] = Temp; sum += Temp; printk("[SEN] Temp_array[%d]= %.2f T[C] \r\n", i , Temp ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_Temp = sum /5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- VOC_Array Values -- \r\n"); sum = 0.0f; for(i=0; i<5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); int16_t VOC_Index_raw = read_u16(data_buf[18], data_buf[19]); float VOC_Index = VOC_Index_raw / 10.0f; VOC__array_Index[i] = VOC_Index; sum += VOC_Index; printk("[SEN] VOC_array[%d]= %.2f ug/m3 \r\n", i , VOC_Index ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_VOC_Index = sum /5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- Nox_Array Values -- \r\n"); sum = 0.0f; for(i=0; i<5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); int16_t NOx_Index_raw = read_u16(data_buf[21], data_buf[22]); float NOx_Index = NOx_Index_raw / 10.0f; NOx__array_Index[i] = NOx_Index; sum +=NOx_Index; printk("[SEN] NOx_array[%d]= %.2f ug/m3 \r\n", i , NOx_Index ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } float avg_NOx_Index = sum / 5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- co2 values -- \r\n"); sum = 0.0f; for(i=0; i<5; i++){ maintain_mqtt(); ret = read_measurement(&sen66, data_buf, sizeof(data_buf)); uint16_t co2 = read_u16(data_buf[24], data_buf[25]); co2_array[i] = co2; sum += co2; printk("[SEN] co2_array[%d]= %u ppm \r\n", i , co2 ); k_sleep(K_MSEC(REMEASURE)); maintain_mqtt(); } uint16_t avg_co2 = sum / 5.0f; printk("[SEN] sum %.2f",sum); printk("\r\n"); printk(" -- Average values -- \r\n"); k_sleep(K_SECONDS(5)); // 5. 
Print values printk("[SEN] PM1.0 : %.2f ug/m3\r\n",avg_pm1); printk("[SEN] PM2.5 : %.2f ug/m3\r\n",avg_pm25); printk("[SEN] PM4.0 : %.2f ug/m3\r\n",avg_pm4); printk("[SEN] PM10.0 : %.2f ug/m3\r\n",avg_pm10); printk("[SEN] RH [%%] : %.2f ug/m3 \n",avg_Humidity); printk("[SEN] T[C] : %.2f T[C]\r\n" ,avg_Temp); printk("[SEN] VOC Index : %.2f ppb\r\n",avg_VOC_Index); printk("[SEN] NOx Index : %.2f ppb\r\n",avg_NOx_Index ); printk("[SEN] CO2 : %u ppm\r\n",avg_co2); printk("\r\n"); printk(" -- Measurements are done -- \r\n"); printk("\r\n"); maintain_mqtt(); // MQTT Sending err = publish_sensor_data(avg_pm1,avg_pm25,avg_pm4,avg_pm10,avg_Humidity,avg_Temp,avg_VOC_Index,avg_NOx_Index,avg_co2); if (err) printk("MQTT sending is failed: %d\n", err); else printk("MQTT Sending is success\n"); maintain_mqtt(); } printk("Disconnecting MQTT client"); err = mqtt_disconnect(&client, NULL); if (err) { LOG_ERR("Could not disconnect MQTT client: %d", err); } goto do_connect; k_sleep(K_SECONDS(60)); }
P.S. connection.c and connection.h are exactly the same as in the l4_e1_sol sample.