Problems to use MQTT with RabbitMQ broker

Hi everyone,

I´m work on a project were i have to stablish a mqtt connection with a RabbitMQ broker thats have user/pass auth and tls without certificate validate. My application works fine with a lot of others brokers (Nordic, Eclipse, Mosquitto and our own broker), but cant´t connect with broker of our customer. After analyze the rabbitmq server logs , seems that are some question about tls handshake timming and this makes 9160 close the connection early. I tried find some parameter were i can change the tls handshake timeout, but without success. Somone already see something like that?

Some help are welcome.

  • Hi,

    Is your issue similar to what was discussed here

    Have you considered reaching out to RabbitMQ Community on Discord?

    Best regards,
    Dejan

  • Hi Dejans,

    Other aditional information is that the servers of my customer works fine with yours own applications and devices and i don´t have access or any possibility to change anything in her. Only think that i can do now is search what in my application cause this behavior. We have tested this broker with a python and java script run on desktop and both works fine.

    Is a hard problem to solve because 9160 don´t log nothing about that. In 9160 log, we can see entire MQTT process occurs without any error, but none data appears on broker.

    The server log of rabbitmq show that the 9160 connect with broker and itself disconnect by  something like "tls handshake timeout". 

    My main question now is: why it´s work with python and java but not in 9160? (Maybe a more bigger latency introduced by mobile network?)

    This is the python scrip that i used to test and it work and can send data corretly for my customer´s broker:

    import socket
    import ssl
    import time
    import logging
    
    # logging config
    logging.basicConfig(
        filename="mqtt_test.log",
        level=logging.DEBUG,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )
    
    # Add a handker to show logs on terminal
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    console.setFormatter(formatter)
    logging.getLogger().addHandler(console)
    
    # MQTT broker config
    BROKER = "putyourbrokerurlhere"  # broker address. Changed do hide customer informations
    PORT = 8883  # Port of mqtt broker with tls
    CLIENT_ID = "mqtt_tester"
    USERNAME = "putyouruserhere"  # broker username. Changed do hide customer informations
    PASSWORD = "putyourpasshere"  # broker password. Changed do hide customer informations
    TOPIC = "test/mqtt"
    MESSAGE_TEMPLATE = "Teste de conexão MQTT - Contador: {}"
    QOS = 1  # QoS 1
    
    # Funtion to encode remain lenght
    def encode_remaining_length(length):
        encoded_bytes = bytearray()
        while True:
            encoded_byte = length % 128
            length //= 128
            # If still have data, define most significant bit = 1
            if length > 0:
                encoded_byte |= 128
            encoded_bytes.append(encoded_byte)
            if length == 0:
                break
        return encoded_bytes
    
    # Funtion to build CONNECT packet with auth
    def create_connect_packet(client_id, username, password):
        packet = bytearray()
        packet.extend(b"\x10")  # Type of packet CONNECT
        variable_header = b"\x00\x04MQTT\x04\xC2\x00<"  # Flags adjusted to auth
        payload = bytes([0, len(client_id)]) + client_id.encode()
        payload += bytes([0, len(username)]) + username.encode()
        payload += bytes([0, len(password)]) + password.encode()
        remaining_length = encode_remaining_length(len(variable_header) + len(payload))
        packet.extend(remaining_length)
        packet.extend(variable_header)
        packet.extend(payload)
        return packet
    
    # Funtion to build PUBLISH packer with QoS 1
    def create_publish_packet(topic, message):
        packet = bytearray()
        packet.extend(b"\x32")  # Type of packet PUBLISH with QoS 1
        topic_length = len(topic).to_bytes(2, "big")
        packet_id = (1).to_bytes(2, "big")  # Packet identifyer
        payload = topic_length + topic.encode() + packet_id + message.encode()
        remaining_length = encode_remaining_length(len(payload))
        packet.extend(remaining_length)
        packet.extend(payload)
        return packet
    
    # Funtion to build DISCONNECT packet
    def create_disconnect_packet():
        return b"\xe0\x00"  # Type of packet DISCONNECT
    
    def main():
        counter = 0
        while True:
            try:
                logging.debug("Tentando conectar ao broker via SSL/TLS...")
                with socket.create_connection((BROKER, PORT)) as sock:
                    with ssl.create_default_context().wrap_socket(sock, server_hostname=BROKER) as mqtt_socket:
                        logging.debug("Conexão segura estabelecida com o broker MQTT")
                        
                        # Send packet CONNECT with autenticação
                        connect_packet = create_connect_packet(CLIENT_ID, USERNAME, PASSWORD)
                        mqtt_socket.send(connect_packet)
                        logging.debug("Pacote CONNECT enviado com autenticação")
                        
                        # Create message with counter
                        counter += 1
                        message = MESSAGE_TEMPLATE.format(counter)
                        
                        # Send PUBLISH with QoS 1
                        publish_packet = create_publish_packet(TOPIC, message)
                        mqtt_socket.send(publish_packet)
                        logging.debug(f"Mensagem enviada: {message}")
                        
                        # Wait delivery confirmation (PUBACK)
                        puback = mqtt_socket.recv(4)
                        if puback and puback[0] == 0x40:
                            logging.debug(f"PUBACK recebido para a mensagem {counter}")
                        else:
                            logging.error(f"ERRO - Falha no recebimento do PUBACK para a mensagem {counter}")
                        
                        time.sleep(0.5)
                        # Send DISCONNECT
                        disconnect_packet = create_disconnect_packet()
                        mqtt_socket.send(disconnect_packet)
                        logging.debug("Desconectado do broker MQTT")
                        
                time.sleep(0.1)  
            
            except Exception as e:
                logging.error(f"ERRO - Exceção inesperada: {e}")
                time.sleep(0.1)  
    
    if __name__ == "__main__":
        main()
    

    So, is it for now.

    Best regards,

    Felipe

  • Hi Felipe,

    Thank you for additional information.

    Felipe Dantas said:
    My main question now is: why it´s work with python and java but not in 9160? (Maybe a more bigger latency introduced by mobile network?)

    The issue could potentially happen due to increased latency. We would need a log for further investigation of the error.

    I have found some similar issues online. You could look at the links below just to see if there is anything that might be causing the issue in your case. Here are some links:
    rabbitmq network troubleshooting network connectivity
    rabbitmq server issues
    handshake timeout error
    TLS authentication handshake timeout error
    log TLS handshake timeouts
    silent drop of TLS connection

    dejans said:
    Is your issue similar to what was discussed here

    Have you considered reaching out to RabbitMQ Community on Discord?

    Have you looked at these links?

    Are any of provided links useful in your case? 

    Best regards,
    Dejan

  • Hi Dejans,

    I couldn't read everything, but I think the main point is that the handshake timeout occurs on the 9160 side and not on the server side. As I mentioned, I don't have access to change or request changes to our customer's server implementation, since it is already in a large production environment and works normally for the applications that our customer already had. Our customer's IT maintainer only informed us that based on the server logs he concluded that our hardware does not wait long enough to complete the handshake process and when the server tries to continue the process, the hardware has already closed the connection on its own. It is worth mentioning that with the simcard of some mobile operators the 9160 can deposit the data on server, and from what we can see, these operators simply have more stable networks, with lower pings. I made a really weird code that assembles the MQTT packets with TLS in a raw way with posix and native sockets and it works in 100% of the time, but as I said, it is an absurd solution to use in a production environment. I believe that what would solve this type of solution in a true and definitive way would be something that allowed adjusting these parameters through the code natively (or something like that). One more time, some help to understand what are occurs here with this scenary are very helpfull.

    Best regards,

    Felipe

  • Hi Felipe,

    Which mobile network does your device connect to?
    Which SIM card do you use?
    Is this problem observed on a single device or on multiple devices?
    Do all affected devices connect to the same network?
    Do you have any devices that connect to the same network as affected devices, but without problems?

    Felipe Dantas said:
    The server log of rabbitmq show that the 9160 connect with broker and itself disconnect by  something like "tls handshake timeout". 

    Can you share your RabbitMQ server log and your application log?

    Best regards,
    Dejan

Related