This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

How to support MSG_WAITALL flag for recv() operation

Hi 

We need recv() to block until either the full amount of requested data can be returned or the operation times out. How can we achieve this?

I tried flag MSG_WAITALL but failed to compile with error 'MSG_WAITALL' undeclared;

I also tried the NRF_MSG_WAITALL flag, but it did not work: recv() still returned a smaller amount of data than requested for the first message.

Parents
  • Hello,

    this was fixed a long time ago, so recv should be blocking by default. If it isn't, you should consider updating your NCS repos, and also upgrading the firmware.

  • Hi

    Sorry I cannot find the answer from your link, which flag should I use, MSG_WAITALL or NRF_MSG_WAITALL?

    To obviate misunderstanding, let me explain more about my question:

    1. My TCP client's recv() blocks until the timeout if the TCP server does not send any data;

    2. Conversely, if the TCP server sends part of the requested data before recv() times out, the client returns that partial data immediately instead of waiting for the whole requested amount until the timeout. This is my issue: we need recv() to wait until either it times out or the client has received the complete requested data.

    SDK: NCS v1.2

    modem: mfw_nrf9160_1.1.1

  • Hi Hakon,

    I cannot agree with you.

    According to https://pubs.opengroup.org/onlinepubs/9699919799/functions/recv.html

    MSG_WAITALL
    On SOCK_STREAM sockets this requests that the function block until the full amount of data can be returned. 
    

    If your NRF_MSG_WAITALL is the equivalent of MSG_WAITALL, the expected behavior is: block until the full amount of data can be returned, right?

  • It's blocking mode. I said it can be blocked if server did not send any data:

    1. My TCP client's recv() blocks until the timeout if the TCP server does not send any data;

    Could you try this on your side, as described in the previous post? It has been more than 10 days, and I just need your confirmation of whether the flag is supported or not.

  • Is there a chance that I can look at the code? I want to see how you are setting up the sockets.

  • First we open socket and connect to the server;

    Second, invoke the do_socket_receive() to receive data from server with timeout.

    /**
     * @brief Open a client socket and connect (TCP) or bind (UDP) it.
     *
     * On success the socket descriptor and protocol are stored in sc_client,
     * sc_client.running is set, and for TCP sc_client.connected is set too.
     *
     * @param type     Non-zero selects SOCK_STREAM/IPPROTO_TCP, zero selects
     *                 SOCK_DGRAM/IPPROTO_UDP.
     * @param url      Remote host: handed to parse_host_by_ipv4() when
     *                 check_for_ipv4() accepts it, otherwise to
     *                 parse_host_by_name().
     * @param dst_port Remote port, forwarded to the host parser.
     * @param src_port Local port used when binding a UDP socket.
     *
     * @return 0 on success; -1 if modem parameters cannot be read or the local
     *         IP address is missing/invalid; the host parser's error code;
     *         -EINVAL if the local address cannot be converted; or the negated
     *         errno of the failing socket()/connect()/bind() call.
     */
    static int do_socket_open(u16_t type, char* url, u16_t dst_port, u16_t src_port)
    {
    	int err;
    	int saved_errno;
    	int client_fd;
    	enum net_sock_type sock_type;
    	enum net_ip_protocol protocol;
    	/* Zero-initialised so no stale stack bytes reach bind() */
    	struct sockaddr_in local_addr = {0};
    	int addr_len;

    	sock_type = type ? SOCK_STREAM : SOCK_DGRAM;
    	protocol = type ? IPPROTO_TCP : IPPROTO_UDP;

    	LOG_DBG("%s:%d", log_strdup(url), dst_port);

    	err = modem_info_params_get(&modem_param);
    	if (err) {
    		LOG_ERR("Unable to obtain modem parameters (%d)", err);
    		return -1;
    	}
    	/* Check network connection status by checking local IP address */
    	addr_len = strlen(modem_param.network.ip_address.value_string);
    	if (addr_len == 0) {
    		LOG_ERR("LTE not connected yet");
    		return -1;
    	}
    	if (!check_for_ipv4(modem_param.network.ip_address.value_string,
    			addr_len)) {
    		LOG_ERR("Invalid local address");
    		return -1;
    	}

    	if (check_for_ipv4(url, strlen(url))) {
    		err = parse_host_by_ipv4(url, dst_port);
    	} else {
    		err = parse_host_by_name(url, dst_port, sock_type);
    	}
    	if (err) {
    		LOG_ERR("Parse failed: %d", err);
    		return err;
    	}

    	/*
    	 * Build the local address before creating the socket, so that a
    	 * conversion failure cannot leave an open descriptor behind.
    	 */
    	local_addr.sin_family = AF_INET;
    	local_addr.sin_port = htons(src_port);

    	/* NOTE inet_pton() returns 1 as success */
    	if (inet_pton(AF_INET, modem_param.network.ip_address.value_string,
    		&local_addr.sin_addr.s_addr) != 1) {
    		LOG_ERR("Parse local IP address failed: %d", -errno);
    		return -EINVAL;
    	}

    	client_fd = socket(AF_INET, sock_type, protocol);
    	if (client_fd < 0) {
    		/* Capture errno before logging can overwrite it */
    		saved_errno = errno;
    		LOG_ERR("socket() failed: %d", -saved_errno);
    		return -saved_errno;
    	}

    	sc_client.sock = client_fd;
    	sc_client.protocol = protocol;

    	LOG_DBG("Socket opened");

    	if (protocol == IPPROTO_TCP) {
    		err = connect(client_fd, (struct sockaddr *)&sc_client.remote,
    				  sizeof(struct sockaddr_in));
    		if (err < 0) {
    			/*
    			 * Save errno first: logging and do_socket_close()
    			 * may both change it before we return it.
    			 */
    			saved_errno = errno;
    			LOG_ERR("connect err: %d errno: %d", err, saved_errno);
    			do_socket_close();
    			return -saved_errno;
    		}
    		sc_client.connected = true;
    	} else { /* UDP: bind() once, then share send() and recv() */
    		err = bind(client_fd, (struct sockaddr *)&local_addr,
    			   sizeof(local_addr));
    		if (err) {
    			saved_errno = errno;
    			LOG_ERR("bind err: %d errno: %d", err, saved_errno);
    			do_socket_close();
    			return -saved_errno;
    		}
    	}

    	sc_client.running = true;
    	LOG_INF("Socket connection service started");
    	return err;
    }
    
    
    
    /**
     * @brief Receive up to @p size bytes from the client socket and report them.
     *
     * Sets SO_RCVTIMEO from @p timeout_ms, performs a single recv() (TCP) or
     * recvfrom() (UDP) call, and reports the outcome through
     * sc_client.callback as a "+MSCRECV: ..." line followed by the payload
     * rendered by int_to_hexstring(). If GPS is running it is stopped for the
     * duration of the call and restarted before returning.
     *
     * NOTE(review): NRF_MSG_WAITALL is passed to recv()/recvfrom(); whether the
     * socket layer in use honours that flag is not visible here, so a short
     * read (fewer than @p size bytes) must be expected - confirm against the
     * SDK's socket documentation.
     *
     * @param size          Number of bytes requested; must not exceed
     *                      NET_IPV4_MTU.
     * @param timeout_ms    Receive timeout in milliseconds.
     * @param crc32_enabled When true, the CRC32 of the payload is included in
     *                      the "+MSCRECV" response.
     *
     * @return 0 when data was received, on timeout (EAGAIN/ETIMEDOUT/EINVAL)
     *         and when recv() returned 0; -EINVAL if @p size is too large;
     *         -ENOPROTOOPT if a TCP socket is not connected; the setsockopt()
     *         result if it fails; otherwise the negated errno of the failed
     *         receive (the socket is closed in that case).
     */
    static int do_socket_receive(u16_t size, u16_t timeout_ms, bool crc32_enabled)
    {
    	int ret;
    	int saved_errno;
    	/* One spare byte so a full NET_IPV4_MTU read can be NUL-terminated */
    	u8_t data[NET_IPV4_MTU + 1] = {0};
    	char hexstr[2*NET_IPV4_MTU+4];
    	socklen_t sockaddr_len = sizeof(struct sockaddr_in);
    	int sock = sc_client.sock;
    	struct timeval tmo = {
    		.tv_sec = (timeout_ms / 1000),
    		.tv_usec = (timeout_ms % 1000) * 1000,
    	};
    	bool gps_is_active;
    	u32_t crc32;

    	if (size > NET_IPV4_MTU) {
    		LOG_ERR("socket receive err: requested size %d exceeded data buffer(%d)", size, NET_IPV4_MTU);
    		return -EINVAL;
    	}

    	gps_is_active = gps_is_running();
    	if (gps_is_active) {
    		/* Best effort: the result was never acted upon */
    		(void)gps_control_stop();
    	}

    	ret = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
    			&tmo, sizeof(struct timeval));
    	if (ret < 0) {
    		LOG_ERR("setsockopt() error: %d", -errno);
    		goto error;
    	}

    	if (sc_client.protocol == IPPROTO_TCP) {
    		if (!sc_client.connected) {
    			LOG_ERR("Socket receive error: TCP client has not connected to server");
    			ret = -ENOPROTOOPT;
    			goto error;
    		}
    		ret = recv(sock, data, size, NRF_MSG_WAITALL);
    	} else {
    		ret = recvfrom(sock, data, size, NRF_MSG_WAITALL,
    			(struct sockaddr *)&sc_client.remote, &sockaddr_len);
    	}

    	if (ret < 0) {
    		/*
    		 * Save errno immediately: logging and do_socket_close() may
    		 * overwrite it before it is tested or returned.
    		 */
    		saved_errno = errno;
    		LOG_WRN("recv() error: %d", -saved_errno);
    		if (saved_errno != EAGAIN && saved_errno != ETIMEDOUT &&
    		    saved_errno != EINVAL) {
    			do_socket_close();
    			ret = -saved_errno;
    		} else {
    			/* Timeout-like errors are reported as "no data" */
    			sprintf(buf, "+MSCRECV: 0\r\n");
    			sc_client.callback(buf);
    			ret = 0;
    		}
    	} else if (ret == 0) {
    		/**
    		 * When a stream socket peer has performed an orderly shutdown,
    		 * the return value will be 0 (the traditional "end-of-file")
    		 * The value 0 may also be returned if the requested number of
    		 * bytes to receive from a stream socket was 0
    		 * In both cases, treat as normal shutdown by remote
    		 */
    		LOG_WRN("recv() return 0");
    		sprintf(buf, "+MSCRECV: 0\r\n");
    		sc_client.callback(buf);
    	} else {
    		/* ret <= NET_IPV4_MTU, so this stays inside data[] */
    		data[ret] = '\0';
    		LOG_INF("MSCRECV: %d, data[%s]", ret, log_strdup(data));
    		if (crc32_enabled) {
    			crc32 = crc32_ieee(data, ret);
    			LOG_DBG("CRC32:0x%04X", crc32);
    			/*
    			 * NOTE(review): "%04X" pads to only 4 digits while the
    			 * no-CRC branch emits 8 - confirm the intended width.
    			 */
    			sprintf(buf, "+MSCRECV: %d,\"%04X\",", ret, crc32);
    		} else {
    			sprintf(buf, "+MSCRECV: %d,\"00000000\",", ret);
    		}

    		/*
    		 * Pass the real capacity of hexstr; presumably the last
    		 * argument is the output buffer size - verify against
    		 * int_to_hexstring().
    		 */
    		int_to_hexstring(data, hexstr, ret, sizeof(hexstr));
    		sc_client.callback(buf);
    		sc_client.callback(hexstr);
    		ret = 0;
    	}

    error:
    	if (gps_is_active) {
    		gps_control_start();
    	}
    	return ret;
    }

Reply
  • First we open socket and connect to the server;

    Second, invoke the do_socket_receive() to receive data from server with timeout.

    /**
     * @brief Open a client socket and connect (TCP) or bind (UDP) it.
     *
     * On success the socket descriptor and protocol are stored in sc_client,
     * sc_client.running is set, and for TCP sc_client.connected is set too.
     *
     * @param type     Non-zero selects SOCK_STREAM/IPPROTO_TCP, zero selects
     *                 SOCK_DGRAM/IPPROTO_UDP.
     * @param url      Remote host: handed to parse_host_by_ipv4() when
     *                 check_for_ipv4() accepts it, otherwise to
     *                 parse_host_by_name().
     * @param dst_port Remote port, forwarded to the host parser.
     * @param src_port Local port used when binding a UDP socket.
     *
     * @return 0 on success; -1 if modem parameters cannot be read or the local
     *         IP address is missing/invalid; the host parser's error code;
     *         -EINVAL if the local address cannot be converted; or the negated
     *         errno of the failing socket()/connect()/bind() call.
     */
    static int do_socket_open(u16_t type, char* url, u16_t dst_port, u16_t src_port)
    {
    	int err;
    	int saved_errno;
    	int client_fd;
    	enum net_sock_type sock_type;
    	enum net_ip_protocol protocol;
    	/* Zero-initialised so no stale stack bytes reach bind() */
    	struct sockaddr_in local_addr = {0};
    	int addr_len;

    	sock_type = type ? SOCK_STREAM : SOCK_DGRAM;
    	protocol = type ? IPPROTO_TCP : IPPROTO_UDP;

    	LOG_DBG("%s:%d", log_strdup(url), dst_port);

    	err = modem_info_params_get(&modem_param);
    	if (err) {
    		LOG_ERR("Unable to obtain modem parameters (%d)", err);
    		return -1;
    	}
    	/* Check network connection status by checking local IP address */
    	addr_len = strlen(modem_param.network.ip_address.value_string);
    	if (addr_len == 0) {
    		LOG_ERR("LTE not connected yet");
    		return -1;
    	}
    	if (!check_for_ipv4(modem_param.network.ip_address.value_string,
    			addr_len)) {
    		LOG_ERR("Invalid local address");
    		return -1;
    	}

    	if (check_for_ipv4(url, strlen(url))) {
    		err = parse_host_by_ipv4(url, dst_port);
    	} else {
    		err = parse_host_by_name(url, dst_port, sock_type);
    	}
    	if (err) {
    		LOG_ERR("Parse failed: %d", err);
    		return err;
    	}

    	/*
    	 * Build the local address before creating the socket, so that a
    	 * conversion failure cannot leave an open descriptor behind.
    	 */
    	local_addr.sin_family = AF_INET;
    	local_addr.sin_port = htons(src_port);

    	/* NOTE inet_pton() returns 1 as success */
    	if (inet_pton(AF_INET, modem_param.network.ip_address.value_string,
    		&local_addr.sin_addr.s_addr) != 1) {
    		LOG_ERR("Parse local IP address failed: %d", -errno);
    		return -EINVAL;
    	}

    	client_fd = socket(AF_INET, sock_type, protocol);
    	if (client_fd < 0) {
    		/* Capture errno before logging can overwrite it */
    		saved_errno = errno;
    		LOG_ERR("socket() failed: %d", -saved_errno);
    		return -saved_errno;
    	}

    	sc_client.sock = client_fd;
    	sc_client.protocol = protocol;

    	LOG_DBG("Socket opened");

    	if (protocol == IPPROTO_TCP) {
    		err = connect(client_fd, (struct sockaddr *)&sc_client.remote,
    				  sizeof(struct sockaddr_in));
    		if (err < 0) {
    			/*
    			 * Save errno first: logging and do_socket_close()
    			 * may both change it before we return it.
    			 */
    			saved_errno = errno;
    			LOG_ERR("connect err: %d errno: %d", err, saved_errno);
    			do_socket_close();
    			return -saved_errno;
    		}
    		sc_client.connected = true;
    	} else { /* UDP: bind() once, then share send() and recv() */
    		err = bind(client_fd, (struct sockaddr *)&local_addr,
    			   sizeof(local_addr));
    		if (err) {
    			saved_errno = errno;
    			LOG_ERR("bind err: %d errno: %d", err, saved_errno);
    			do_socket_close();
    			return -saved_errno;
    		}
    	}

    	sc_client.running = true;
    	LOG_INF("Socket connection service started");
    	return err;
    }
    
    
    
    /**
     * @brief Receive up to @p size bytes from the client socket and report them.
     *
     * Sets SO_RCVTIMEO from @p timeout_ms, performs a single recv() (TCP) or
     * recvfrom() (UDP) call, and reports the outcome through
     * sc_client.callback as a "+MSCRECV: ..." line followed by the payload
     * rendered by int_to_hexstring(). If GPS is running it is stopped for the
     * duration of the call and restarted before returning.
     *
     * NOTE(review): NRF_MSG_WAITALL is passed to recv()/recvfrom(); whether the
     * socket layer in use honours that flag is not visible here, so a short
     * read (fewer than @p size bytes) must be expected - confirm against the
     * SDK's socket documentation.
     *
     * @param size          Number of bytes requested; must not exceed
     *                      NET_IPV4_MTU.
     * @param timeout_ms    Receive timeout in milliseconds.
     * @param crc32_enabled When true, the CRC32 of the payload is included in
     *                      the "+MSCRECV" response.
     *
     * @return 0 when data was received, on timeout (EAGAIN/ETIMEDOUT/EINVAL)
     *         and when recv() returned 0; -EINVAL if @p size is too large;
     *         -ENOPROTOOPT if a TCP socket is not connected; the setsockopt()
     *         result if it fails; otherwise the negated errno of the failed
     *         receive (the socket is closed in that case).
     */
    static int do_socket_receive(u16_t size, u16_t timeout_ms, bool crc32_enabled)
    {
    	int ret;
    	int saved_errno;
    	/* One spare byte so a full NET_IPV4_MTU read can be NUL-terminated */
    	u8_t data[NET_IPV4_MTU + 1] = {0};
    	char hexstr[2*NET_IPV4_MTU+4];
    	socklen_t sockaddr_len = sizeof(struct sockaddr_in);
    	int sock = sc_client.sock;
    	struct timeval tmo = {
    		.tv_sec = (timeout_ms / 1000),
    		.tv_usec = (timeout_ms % 1000) * 1000,
    	};
    	bool gps_is_active;
    	u32_t crc32;

    	if (size > NET_IPV4_MTU) {
    		LOG_ERR("socket receive err: requested size %d exceeded data buffer(%d)", size, NET_IPV4_MTU);
    		return -EINVAL;
    	}

    	gps_is_active = gps_is_running();
    	if (gps_is_active) {
    		/* Best effort: the result was never acted upon */
    		(void)gps_control_stop();
    	}

    	ret = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO,
    			&tmo, sizeof(struct timeval));
    	if (ret < 0) {
    		LOG_ERR("setsockopt() error: %d", -errno);
    		goto error;
    	}

    	if (sc_client.protocol == IPPROTO_TCP) {
    		if (!sc_client.connected) {
    			LOG_ERR("Socket receive error: TCP client has not connected to server");
    			ret = -ENOPROTOOPT;
    			goto error;
    		}
    		ret = recv(sock, data, size, NRF_MSG_WAITALL);
    	} else {
    		ret = recvfrom(sock, data, size, NRF_MSG_WAITALL,
    			(struct sockaddr *)&sc_client.remote, &sockaddr_len);
    	}

    	if (ret < 0) {
    		/*
    		 * Save errno immediately: logging and do_socket_close() may
    		 * overwrite it before it is tested or returned.
    		 */
    		saved_errno = errno;
    		LOG_WRN("recv() error: %d", -saved_errno);
    		if (saved_errno != EAGAIN && saved_errno != ETIMEDOUT &&
    		    saved_errno != EINVAL) {
    			do_socket_close();
    			ret = -saved_errno;
    		} else {
    			/* Timeout-like errors are reported as "no data" */
    			sprintf(buf, "+MSCRECV: 0\r\n");
    			sc_client.callback(buf);
    			ret = 0;
    		}
    	} else if (ret == 0) {
    		/**
    		 * When a stream socket peer has performed an orderly shutdown,
    		 * the return value will be 0 (the traditional "end-of-file")
    		 * The value 0 may also be returned if the requested number of
    		 * bytes to receive from a stream socket was 0
    		 * In both cases, treat as normal shutdown by remote
    		 */
    		LOG_WRN("recv() return 0");
    		sprintf(buf, "+MSCRECV: 0\r\n");
    		sc_client.callback(buf);
    	} else {
    		/* ret <= NET_IPV4_MTU, so this stays inside data[] */
    		data[ret] = '\0';
    		LOG_INF("MSCRECV: %d, data[%s]", ret, log_strdup(data));
    		if (crc32_enabled) {
    			crc32 = crc32_ieee(data, ret);
    			LOG_DBG("CRC32:0x%04X", crc32);
    			/*
    			 * NOTE(review): "%04X" pads to only 4 digits while the
    			 * no-CRC branch emits 8 - confirm the intended width.
    			 */
    			sprintf(buf, "+MSCRECV: %d,\"%04X\",", ret, crc32);
    		} else {
    			sprintf(buf, "+MSCRECV: %d,\"00000000\",", ret);
    		}

    		/*
    		 * Pass the real capacity of hexstr; presumably the last
    		 * argument is the output buffer size - verify against
    		 * int_to_hexstring().
    		 */
    		int_to_hexstring(data, hexstr, ret, sizeof(hexstr));
    		sc_client.callback(buf);
    		sc_client.callback(hexstr);
    		ret = 0;
    	}

    error:
    	if (gps_is_active) {
    		gps_control_start();
    	}
    	return ret;
    }

Children
Related