Hi,
I'm trying to connect to the EMQX(what I made on GCP) using mqtt protocal. I'm using the sample code that nordic provides in lesson. Based on the Exercise4 code which is l4_el_sol.
I successfully connected with a test server which is mqtt.nordicsemi.academy. So I changed hostname to EMQX host IP. And it doesn't work. I just changed the client->username, password
# Logging CONFIG_LOG=y # Button and LED support CONFIG_DK_LIBRARY=y # Newlib CONFIG_NEWLIB_LIBC=y CONFIG_NEWLIB_LIBC_FLOAT_PRINTF=y # Networking CONFIG_NETWORKING=y CONFIG_NET_NATIVE=n CONFIG_NET_SOCKETS_OFFLOAD=y CONFIG_NET_SOCKETS=y CONFIG_POSIX_API=y # Memory CONFIG_MAIN_STACK_SIZE=4096 CONFIG_HEAP_MEM_POOL_SIZE=2048 # Modem library CONFIG_NRF_MODEM_LIB=y # LTE link control CONFIG_LTE_LINK_CONTROL=y CONFIG_LTE_NETWORK_MODE_LTE_M_NBIOT=y # MQTT # STEP 2.1 - Enable and configure the MQTT library CONFIG_MQTT_LIB=y CONFIG_MQTT_CLEAN_SESSION=y # Application # STEP 2.2 - Configure the broker name, TCP port, topic names, and message CONFIG_MQTT_PUB_TOPIC="zephyr/test" CONFIG_MQTT_SUB_TOPIC="devacademy/subscribe/topic" CONFIG_MQTT_BROKER_HOSTNAME="34.46.65.84" CONFIG_MQTT_BROKER_PORT=1883 #i2c configuration CONFIG_I2C=y # i2c function add CONFIG_I2C_NRFX=y CONFIG_NRFX_TWIM0=n CONFIG_NRFX_TWIM1=n CONFIG_NRFX_TWIM2=y CONFIG_NRFX_TWIM3=n CONFIG_PRINTK=y CONFIG_CONSOLE=y CONFIG_UART_CONSOLE=y CONFIG_SERIAL=y CONFIG_ASSERT=y CONFIG_I2C_LOG_LEVEL_DBG=y CONFIG_LOG_MODE_IMMEDIATE=y CONFIG_CBPRINTF_FP_SUPPORT=y # float %f CONFIG_MQTT_KEEPALIVE=120 CONFIG_MQTT_CLIENT_ID="JunyoungJeon" CONFIG_MQTT_MESSAGE_BUFFER_SIZE=1024 CONFIG_NET_IPV4=y
#include <stdio.h>
#include <ncs_version.h>
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/logging/log.h>
#include <modem/nrf_modem_lib.h>
#include <modem/lte_lc.h>
/* Include the header file for the MQTT Library*/
#include <zephyr/net/mqtt.h>
#include "mqtt_connection.h"
/* I2C Headers */
#include <zephyr/sys/printk.h>
#include <zephyr/drivers/i2c.h>
#define SEN66_NODE DT_NODELABEL(sen66)
#define Measure 100
#define REMEASURE 30
#define STARTING_TIME 60
#define WATING_TIME 100
#define Exucution_Time 1500
/* I2C Function */
int read_measurement(const struct i2c_dt_spec *dev, uint8_t *buf, size_t len)
{
int ret;
uint8_t cmd_read[] = {0x03, 0x00};
ret = i2c_write(dev->bus, cmd_read, sizeof(cmd_read), dev->addr);
if (ret != 0) {
printk("[SEN] Failed to send read command: %d\r\n", ret);
return ret;
}
k_sleep(K_SECONDS(2)); // 센서가 데이터 준비할 시간
ret = i2c_read(dev->bus, buf, len, dev->addr);
if (ret != 0) {
printk("[SEN] Failed to read measurement: %d\r\n", ret);
return ret;
}
k_sleep(K_SECONDS(2)); // 처리 여유 시간 (필요 없다면 줄일 수 있음)
return 0;
}
uint16_t read_u16(uint8_t msb, uint8_t lsb) {
return ((uint16_t)msb << 8) | lsb;
}
int16_t read_s16(uint8_t msb, uint8_t lsb) {
return (int16_t)(((uint16_t)msb << 8) | lsb); // - value possible
}
/* THE MQTT CONNECTION FOUNCTION */
static struct mqtt_client client;
/* File descriptor */
static struct pollfd fds;
static K_SEM_DEFINE(lte_connected, 0, 1);
LOG_MODULE_REGISTER(DRI, LOG_LEVEL_INF);
static void lte_handler(const struct lte_lc_evt *const evt)
{
switch (evt->type) {
case LTE_LC_EVT_NW_REG_STATUS:
if ((evt->nw_reg_status != LTE_LC_NW_REG_REGISTERED_HOME) &&
(evt->nw_reg_status != LTE_LC_NW_REG_REGISTERED_ROAMING)) {
break;
}
printk("Network registration status: %s",
evt->nw_reg_status == LTE_LC_NW_REG_REGISTERED_HOME ?
"Connected - home network" : "Connected - roaming");
k_sem_give(<e_connected);
break;
// case LTE_LC_EVT_RRC_UPDATE:
// static enum lte_lc_rrc_mode prev_rrc_mode = -1;
// if (prev_rrc_mode != evt->rrc_mode) {
// printk("[RRC] Mode changed: %s \r\n", evt->rrc_mode == LTE_LC_RRC_MODE_CONNECTED ? "Connected" : "Idle");
// prev_rrc_mode = evt->rrc_mode;
// }
// break;
default:
break;
}
}
static int modem_configure(void)
{
int err;
printk("Initializing modem library");
err = nrf_modem_lib_init();
if (err) {
LOG_ERR("Failed to initialize the modem library, error: %d \r\n", err);
return err;
}
printk("Connecting to LTE network\r\n");
err = lte_lc_connect_async(lte_handler);
if (err) {
LOG_ERR("Error in lte_lc_connect_async, error: %d", err);
return err;
}
k_sem_take(<e_connected, K_FOREVER);
printk("Connected to LTE network\r\n");
return 0;
}
int publish_sensor_data(float pm1, float pm25, float pm4, float pm10,
float rh, float temp, float voc, float nox, float co2)
{
char payload[128];
int len = snprintf(payload, sizeof(payload),
"{\"pm1\":%.2f,\"pm25\":%.2f,\"pm4\":%.2f,\"pm10\":%.2f,"
"\"rh\":%.2f,\"temp\":%.2f,\"voc\":%.2f,\"nox\":%.2f,\"co2\":%.2f}",
pm1, pm25, pm4, pm10,
rh, temp, voc, nox, co2);
return data_publish(&client,
MQTT_QOS_1_AT_LEAST_ONCE,
(const uint8_t *)payload,
len);
}
static void maintain_mqtt(void)
{
// 소켓에 들어온 데이터(응답)가 있는지 확인
int rc = poll(&fds, 1, 0);
if (rc < 0) {
LOG_ERR("poll 에러: %d", errno);
return;
}
// 응답이 있으면 가져와서 처리
if (rc > 0) {
mqtt_input(&client);
}
// PINGREQ 보내고 내부 재전송 등 유지 작업 수행
rc = mqtt_live(&client);
if (rc && rc != -EAGAIN) {
LOG_ERR("mqtt_live 에러: %d", rc);
}
}
/* MAIN CODE START POINT */
void main(void)
{
int err;
int ret;
float pm1_array[5];
float pm25_array[5];
float pm4_array[5];
float pm10_array[5];
float Humidity_array[5];
float Temp_array[5];
float VOC__array_Index[5];
float NOx__array_Index[5];
uint8_t co2_array[5];
uint8_t data_buf[30];
float sum = 0.0;
uint32_t connect_attempt = 0;
printk("[SEN] Sensor will be started \r\n");
k_sleep(K_MSEC(WATING_TIME));
const struct i2c_dt_spec sen66 = I2C_DT_SPEC_GET(SEN66_NODE);
if (!device_is_ready(sen66.bus)) {
LOG_ERR("[SEN] I2C bus %s is not ready!\r\n",sen66.bus->name);
} else {
printk("[SEN] I2C bus %s is ready \r\n", sen66.bus->name);
}
uint8_t cmd_start[] = {0x00, 0x21};
ret = i2c_write_dt(&sen66, cmd_start, sizeof(cmd_start));
if (ret == 0) {
printk("[SEN] I2C Measurement is starting \r\n");
} else {
printk("[SEN] I2C MEASUREMENT IS FAILED : %d \r\n", ret);
}
k_sleep(K_MSEC(STARTING_TIME));
//--------------------------------------------------------------------------//
err = modem_configure();
if (err) {
LOG_ERR("Failed to configure the modem");
}
err = client_init(&client);
if (err) {
LOG_ERR("Failed to initialize MQTT client: %d", err);
}
do_connect:
if (connect_attempt++ > 0) {
LOG_INF("Reconnecting in %d seconds...",
CONFIG_MQTT_RECONNECT_DELAY_S);
k_sleep(K_SECONDS(CONFIG_MQTT_RECONNECT_DELAY_S));
}
err = mqtt_connect(&client);
if (err) {
LOG_ERR("Error in mqtt_connect: %d", err);
goto do_connect;
}
err = fds_init(&client,&fds);
if (err) {
LOG_ERR("Error in fds_init: %d", err);
}
int i;
K_SEM_DEFINE(mqtt_connected, 0, 1);
void mqtt_evt_handler(struct mqtt_client *const c, const struct mqtt_evt *evt)
{
switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
LOG_ERR("MQTT connect failed: %d", evt->result);
break;
}
LOG_INF("MQTT connected successfully");
k_sem_give(&mqtt_connected); // 연결 성공 시 세마포어 give
break;
case MQTT_EVT_DISCONNECT:
LOG_INF("MQTT disconnected: %d", evt->result);
break;
default:
break;
}
}
while (1) {
sum=0.0;
for ( i = 0; i <5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
uint16_t pm1_raw = read_u16(data_buf[0], data_buf[1]);
float pm1 = pm1_raw / 10.0f;
pm1_array[i] = pm1;
sum += pm1;
printk("[SEN] pm1_array[%d]= %.2f ug/m3 \r\n", i , pm1_array[i] );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_pm1 = sum / 5.0f;
printk("[SEN] sum %.2f \n\r",sum);
printk("\r\n");
printk(" -- PM25_Array Values -- \r\n");
sum = 0.0f;
for ( i = 0; i <5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
uint16_t pm25_raw = read_u16(data_buf[3], data_buf[4]);
float pm25 = pm25_raw / 10.0f;
pm25_array[i] = pm25;
sum += pm25;
printk("[SEN] pm25_array[%d]= %.2f ug/m3 \r\n", i , pm25 );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_pm25 = sum /5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- PM4_Array Values -- \r\n");
sum = 0.0f;
for ( i = 0; i <5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
uint16_t pm4_raw = read_u16(data_buf[6], data_buf[7]);
float pm4 = pm4_raw / 10.0f;
pm4_array[i] = pm4;
sum += pm4;
printk("[SEN] pm4_array[%d]= %.2f ug/m3 \r\n", i , pm4 );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_pm4 = sum /5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- PM10_Array Values -- \r\n");
sum = 0.0f;
for ( i = 0; i <5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
uint16_t pm10_raw = read_u16(data_buf[9], data_buf[10]);
float pm10 = pm10_raw / 10.0f;
pm10_array[i] = pm10;
sum += pm10;
printk("[SEN] pm10_array[%d]= %.2f ug/m3 \r\n", i , pm10 );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_pm10 = sum /5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- Humidity_Array Values -- \r\n");
sum = 0.0f;
for(i=0; i<5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
int16_t Humidity_raw = read_s16(data_buf[12], data_buf[13]);
float Humidity = Humidity_raw / 100.0f;
Humidity_array[i] = Humidity;
sum += Humidity;
printk("[SEN] Humidity_array[%d]= %.2f ug/m3 \r\n", i ,Humidity );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_Humidity = sum /5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- Temp_Array Values -- \r\n");
sum = 0.0f;
for(i=0; i<5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
int16_t Temp_raw = read_s16(data_buf[15], data_buf[16]);
float Temp = Temp_raw / 200.0f;
Temp_array[i] = Temp;
sum += Temp;
printk("[SEN] Temp_array[%d]= %.2f T[C] \r\n", i , Temp );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_Temp = sum /5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- VOC_Array Values -- \r\n");
sum = 0.0f;
for(i=0; i<5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
int16_t VOC_Index_raw = read_u16(data_buf[18], data_buf[19]);
float VOC_Index = VOC_Index_raw / 10.0f;
VOC__array_Index[i] = VOC_Index;
sum += VOC_Index;
printk("[SEN] VOC_array[%d]= %.2f ug/m3 \r\n", i , VOC_Index );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_VOC_Index = sum /5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- Nox_Array Values -- \r\n");
sum = 0.0f;
for(i=0; i<5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
int16_t NOx_Index_raw = read_u16(data_buf[21], data_buf[22]);
float NOx_Index = NOx_Index_raw / 10.0f;
NOx__array_Index[i] = NOx_Index;
sum +=NOx_Index;
printk("[SEN] NOx_array[%d]= %.2f ug/m3 \r\n", i , NOx_Index );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
float avg_NOx_Index = sum / 5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- co2 values -- \r\n");
sum = 0.0f;
for(i=0; i<5; i++){
maintain_mqtt();
ret = read_measurement(&sen66, data_buf, sizeof(data_buf));
uint16_t co2 = read_u16(data_buf[24], data_buf[25]);
co2_array[i] = co2;
sum += co2;
printk("[SEN] co2_array[%d]= %u ppm \r\n", i , co2 );
k_sleep(K_MSEC(REMEASURE));
maintain_mqtt();
}
uint16_t avg_co2 = sum / 5.0f;
printk("[SEN] sum %.2f",sum);
printk("\r\n");
printk(" -- Average values -- \r\n");
k_sleep(K_SECONDS(5));
// 5. Print values
printk("[SEN] PM1.0 : %.2f ug/m3\r\n",avg_pm1);
printk("[SEN] PM2.5 : %.2f ug/m3\r\n",avg_pm25);
printk("[SEN] PM4.0 : %.2f ug/m3\r\n",avg_pm4);
printk("[SEN] PM10.0 : %.2f ug/m3\r\n",avg_pm10);
printk("[SEN] RH [%%] : %.2f ug/m3 \n",avg_Humidity);
printk("[SEN] T[C] : %.2f T[C]\r\n" ,avg_Temp);
printk("[SEN] VOC Index : %.2f ppb\r\n",avg_VOC_Index);
printk("[SEN] NOx Index : %.2f ppb\r\n",avg_NOx_Index );
printk("[SEN] CO2 : %u ppm\r\n",avg_co2);
printk("\r\n");
printk(" -- Measurements are done -- \r\n");
printk("\r\n");
maintain_mqtt();
// MQTT Sending
err = publish_sensor_data(avg_pm1,avg_pm25,avg_pm4,avg_pm10,avg_Humidity,avg_Temp,avg_VOC_Index,avg_NOx_Index,avg_co2);
if (err) printk("MQTT sending is failed: %d\n", err);
else printk("MQTT Sending is success\n");
maintain_mqtt();
}
printk("Disconnecting MQTT client");
err = mqtt_disconnect(&client, NULL);
if (err) {
LOG_ERR("Could not disconnect MQTT client: %d", err);
}
goto do_connect;
k_sleep(K_SECONDS(60));
}
-ps connection.c and connection.h is exactly same as a l4_le_sol sample file