This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

Unable to change MTU size pc-ble-driver-py

Hello,

I am playing around with pc-ble-driver-py to make a connection towards a BLE peripheral. The peripheral is using a custom service very similar to the Nordic UART Service. I have managed to get it working but I am unable to exchange the MTU size and is stuck with the 23 bytes default MTU.

I have been looking into other code examples and this is what I'm trying to do:

CFG_TAG = 1
nrf_sd_ble_api_ver = config.sd_api_ver_get()
assert nrf_sd_ble_api_ver == 5

# Setup BLEDriver and BLEAdapter
driver = BLEDriver(
    serial_port='/dev/tty.usbmodem0006826680971',
    auto_flash=False,
    baud_rate=1000000,
    log_severity_level="info"
)
adapter = BLEAdapter(driver)

# Later in code
cfg = BLEConfigGapRoleCount(central_role_count=1, periph_role_count=0, central_sec_count=0)
cfg.conn_cfg_tag = CFG_TAG
self.adapter.driver.ble_cfg_set(BLEConfig.role_count, cfg)

cfg = BLEConfigConnGatt(att_mtu=Central.LOCAL_ATT_MTU) # Central.LOCAL_ATT_MTU = 247
cfg.conn_cfg_tag = CFG_TAG
self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, cfg)

cfg = BLEConfigConnGap(event_length=320)
cfg.conn_cfg_tag = CFG_TAG
self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, cfg)

self.adapter.driver.ble_enable()
self.adapter.driver.ble_vs_uuid_add(Central.BASE_UUID)


# Start scan, find device and connect
self.adapter.connect(peer_addr, tag=CFG_TAG)


logging.info('MTU Size: {}'.format(self.adapter.db_conns[self.conn_handle].att_mtu))    # Prints 23
if Central.LOCAL_ATT_MTU > ATT_MTU_DEFAULT:
    logging.info('Requesting MTU exchange')
    self.att_mtu = self.adapter.att_mtu_exchange(self.conn_handle, Central.LOCAL_ATT_MTU)

    max_data_length = 251
    data_length = self.att_mtu + 4
    if data_length > max_data_length:
        data_length = max_data_length
    self.adapter.data_length_update(self.conn_handle, data_length)

Here i get error message:

2020-11-11 10:03:09,265 root         INFO     Requesting MTU exchange
Traceback (most recent call last):
  File "/test/venv/lib/python3.7/site-packages/pc_ble_driver_py/ble_adapter.py", line 220, in att_mtu_exchange
    self.driver.ble_gattc_exchange_mtu_req(conn_handle, mtu)
  File "/test/venv/lib/python3.7/site-packages/pc_ble_driver_py/ble_driver.py", line 107, in wrapper
    error_code=err_code,
pc_ble_driver_py.exceptions.NordicSemiException: Failed to ble_gattc_exchange_mtu_req. Error code: NRF_ERROR_INVALID_PARAM

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/test/test.py", line 227, in <module>
    central.start()
  File "/test/test.py", line 133, in start
    self.att_mtu = self.adapter.att_mtu_exchange(self.conn_handle, Central.LOCAL_ATT_MTU)
  File "/test/venv/lib/python3.7/site-packages/pc_ble_driver_py/ble_adapter.py", line 225, in att_mtu_exchange
    "different config tags used in ble_cfg_set and connect.") from ex
pc_ble_driver_py.exceptions.NordicSemiException: MTU exchange request failed. Common causes are: missing att_mtu setting in ble_cfg_set, different config tags used in ble_cfg_set and connect.

Any suggestion how to proceed with this?

Parents
  • Hello, here is the full application.

    from queue import Queue, Empty
    import logging
    import struct
    
    from pc_ble_driver_py import config
    
    config.__conn_ic_id__ = 'NRF52'
    CFG_TAG = 1
    
    from pc_ble_driver_py.ble_driver import (
        BLEDriver,
        BLEEnableParams,
        BLEConfig,
        BLEConfigConnGatt,
        BLEConfigConnGap, BLEGapAddr, BLEGattcWriteParams, BLEGattWriteOperation, BLEGattExecWriteFlag, BLEEvtID,
        BLEGapScanParams, BLEGapConnParams, BLEConfigGapRoleCount, BLEConfigGatts, BLEGapDataLengthParams,
        BLEConfigConnGattc,
    )
    from pc_ble_driver_py.ble_adapter import BLEAdapter
    from pc_ble_driver_py.observers import BLEDriverObserver, BLEAdapterObserver
    from pc_ble_driver_py.ble_driver import (
        BLEAdvData,
        BLEUUID,
        BLEUUIDBase,
        BLEGattsCharMD,
        BLEGattCharProps,
        BLEGattsAttrMD,
        BLEGattsAttr,
        BLEGattsCharHandles,
        BLEGattHandle,
        BLEGattsHVXParams,
        driver,
    )
    
    from pc_ble_driver_py.ble_driver import BLEAdvData
    
    from pc_ble_driver_py.ble_driver import ATT_MTU_DEFAULT
    from pc_ble_driver_py import config
    global nrf_sd_ble_api_ver
    nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    logger = logging.getLogger(__name__)
    
    from abc import ABC, abstractmethod
    
    class ByteChannelException(Exception):
        pass
    
    
    class ByteChannel(ABC):
    
        @abstractmethod
        def write(self, data):
            pass
    
        @abstractmethod
        def read(self):
            pass
    
        @abstractmethod
        def close(self):
            pass
    
    class BLEAdvManufSpecificData:
    
        def __init__(self, data):
            self.data = data
    
    class Central(BLEDriverObserver, BLEAdapterObserver, ByteChannel):
    
        BASE_UUID = BLEUUIDBase([0xBA, 0xD3, 0xE9, 0x00, 0x43, 0x20, 0x4A, 0x8C, 0xB5, 0x7E, 0x46, 0x46, 0x48, 0x4B, 0x53, 0xA1])
        SERVICE_UUID = BLEUUID(0xE900, BASE_UUID)
        RX_CHAR_UUID = BLEUUID(0x0002)
        TX_CHAR_UUID = BLEUUID(0x0003)
    
        LOCAL_ATT_MTU = 23
    
        def __init__(self, adapter: BLEAdapter):
            super().__init__()
            self.adapter = adapter
            self.conn_q = Queue()
            self.notification_q = Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            self.conn_handle = None
            self.att_mtu = ATT_MTU_DEFAULT
            self.packet_size = self.att_mtu - 3
    
        def open(self):
            self.adapter.open()
    
            assert nrf_sd_ble_api_ver == 5
    
            cfg = BLEConfigGapRoleCount(central_role_count=1, periph_role_count=0, central_sec_count=0)
            cfg.conn_cfg_tag = CFG_TAG
            self.adapter.driver.ble_cfg_set(BLEConfig.role_count, cfg)
    
            #cfg = BLEConfigGatts(service_changed=1, attr_tab_size=1408)
            #cfg.conn_cfg_tag = CFG_TAG
            #self.adapter.driver.ble_cfg_set(BLEConfig.attr_tab_size, cfg)
    
            cfg = BLEConfigConnGatt(att_mtu=Central.LOCAL_ATT_MTU)
            cfg.conn_cfg_tag = CFG_TAG
            self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, cfg)
    
            #cfg = BLEConfigConnGattc(write_cmd_tx_queue_size=5)
            #cfg.conn_cfg_tag = CFG_TAG
            #self.adapter.driver.ble_cfg_set(BLEConfig.conn_gattc, cfg)
    
            #cfg = BLEConfigConnGap(event_length=320, conn_count=1)
            #cfg.conn_cfg_tag = CFG_TAG
            #self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, cfg)
    
            self.adapter.driver.ble_enable()
            self.adapter.driver.ble_vs_uuid_add(Central.BASE_UUID)
    
        def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, status, att_mtu):
            pass
    
        def on_gatts_evt_exchange_mtu_request(self, ble_driver, conn_handle, client_mtu):
            self.att_mtu = client_mtu
            self.packet_size = client_mtu - 3
            logging.info('New MTU: {}, packet size: {}'.format(self.att_mtu, self.packet_size))
    
        def stop(self):
            if self.conn_handle:
                self.adapter.driver.ble_gap_disconnect(self.conn_handle)
    
        def start(self):
            scan_duration = 5
            self.adapter.driver.ble_gap_scan_start()
    
            try:
                self.conn_handle = self.conn_q.get(timeout=scan_duration)
                self.adapter.service_discovery(self.conn_handle)
                tmp = self.adapter.db_conns[self.conn_handle]
    
                self.rx_char = tmp.get_char_value_handle(Central.RX_CHAR_UUID, Central.SERVICE_UUID)
                self.tx_char_cccd = tmp.get_cccd_handle(Central.TX_CHAR_UUID)
                self.tx_char = tmp.get_char_value_handle(Central.TX_CHAR_UUID, Central.SERVICE_UUID)
    
                if Central.LOCAL_ATT_MTU > self.att_mtu:
                    logging.info('Requesting MTU exchange')
                    self.att_mtu = self.adapter.att_mtu_exchange(self.conn_handle, Central.LOCAL_ATT_MTU)
    
                self.adapter.db_conns[self.conn_handle].att_mtu = self.att_mtu
    
                """
                max_data_length = 251
                data_length = self.att_mtu + 4
                if data_length > max_data_length:
                    data_length = max_data_length
                self.adapter.data_length_update(self.conn_handle, 251)
                """
                self.enable_notification(Central.TX_CHAR_UUID)
    
    
            except Empty:
                logger.info('Could not find BLE device')
    
        def write(self, data):
            payload = struct.pack('<i', len(data)) + data
            mtu = 20
            #mtu = self.packet_size
    
            written = 0
            for i in range(0, len(payload), mtu):
                chunk = payload[i:i + mtu]
                logging.info('Writing ({}) {}'.format(len(chunk), hex_rep(chunk)))
                status = self.adapter.write_req(
                    self.conn_handle,
                    None,
                    chunk,
                    self.rx_char
                )
    
        def read_byte_from_queue(self, timeout=None):
            byte = self.notification_q.get(True, timeout)
            if byte is None:
                raise ByteChannelException('Byte channel was closed')
            return byte
    
        def read(self):
            # tmp
            size_bytes = bytearray(4)
            size_bytes[0] = self.read_byte_from_queue()
            size_bytes[1] = self.read_byte_from_queue(0.2)
            size_bytes[2] = self.read_byte_from_queue(0.2)
            size_bytes[3] = self.read_byte_from_queue(0.2)
            size, = struct.unpack('<i', size_bytes)
            response = bytearray(size)
            for i in range(0, size):
                response[i] = self.read_byte_from_queue(0.2)
            return response
    
        def enable_notification(self, uuid):
            self.adapter.enable_notification(self.conn_handle, uuid)
            logger.info(f"Notification enabled.")
    
    
        def disable_notification(self, uuid):
            self.adapter.disable_notification(self.conn_handle, uuid)
            logger.info(f"Notification disabled.")
    
        def on_gap_evt_connected(
                self, ble_driver, conn_handle, peer_addr, role, conn_params
        ):
            self.conn_q.put(conn_handle)
            logger.info(f"(Central) New connection: {conn_handle}.")
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            logger.info(f"Disconnected: {conn_handle} {reason}.")
    
        def on_gap_evt_adv_report(
                self, ble_driver, conn_handle, peer_addr: BLEGapAddr, rssi, adv_type, adv_data: BLEAdvData
        ):
    
            if BLEAdvData.Types.manufacturer_specific_data in adv_data.records:
                manuf_data = adv_data.records[BLEAdvData.Types.manufacturer_specific_data]
                if manuf_data[0] == 0x2E and manuf_data[1] == 0x01:
                    logging.info('Found ASSA ABLOY Manufacturer specific data: {}'.format(manuf_data[2:]))
                    self.adapter.connect(
                        peer_addr,
                        scan_params=BLEGapScanParams(interval_ms=50, window_ms=50, timeout_s=10),
                        conn_params=BLEGapConnParams(
                            min_conn_interval_ms=15,
                            max_conn_interval_ms=30,
                            conn_sup_timeout_ms=4000,
                            slave_latency=0,
                        ), tag=CFG_TAG)
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if uuid.value == Central.TX_CHAR_UUID.value:
                logging.info('Received: {}'.format(hex_rep(data)))
                for i in data:
                    self.notification_q.put(i)
    
        def disconnect(self):
            self.adapter.disconnect(self.conn_handle)
            self.notification_q.put(None)
    
        def close(self):
            self.adapter.disconnect(self.conn_handle)
            self.notification_q.put(None)
    
    driver = BLEDriver(
        serial_port='/dev/tty.usbmodem0006826680971',
        auto_flash=False,
        baud_rate=1000000,
        log_severity_level="info"
    )
    adapter = BLEAdapter(driver)
    central = Central(adapter)
    central.open()
    central.start()
    
    central.write(bytearray.fromhex('53437632010000000000c71f416aca05c574080289510011dc024ee4d0d1a9a5ac82c047ea972dd41312'))
    central.write(bytearray.fromhex('0600afa3cac902ddf760267fff7a79f0cc94ab0bafd7a695de9451ff6cf6bd5024bb189bdc61b8c1350b42592af743fd15a8fa38343af0026ae2a1a2ac892e2ef5628cf8a03e07e2515331324c1305cca6341f27ddd807abc7b97d4a44440752329db98f6b13357b6f47213003d84f3c17d7a64f73334c79'))
    logging.info(central.read().hex())
    
    central.close()

    Quite much playground so far

  • Hi, sorry for not getting back to you on this one. I had some issues with running this application, are you still facing this issue, or did you manage to resolve it?

Reply Children
  • Hello, i haven't been working on this lately either but i gave it a try now. This still does not work, i have tried from both linux and mac. Now i am using an nRF52840 MDK USB Dongle. I have flashed the nRF connectivity firmware into it.

    If i however are using the nRF Connect (v3.6.1) and the BLE tools, i can write a large chunk to the characteristics. And i guess that in the end, these are using the same API towards the connectivity firmware?

    Packages received are bigger than 20.

  • Hello, i haven't been working on this lately either but i gave it a try now. This still does not work, i have tried from both linux and mac. Now i am using an nRF52840 MDK USB Dongle. I have flashed the nRF connectivity firmware into it.

    If i however are using the nRF Connect (v3.6.1) and the BLE tools, i can write a large chunk to the characteristics. And i guess that in the end, these are using the same API towards the connectivity firmware?

    Packages received are bigger than 20.

    Update:
    I digged into the code a bit, and since the peripheral is requesting the MTU exchange i looked into what happens here (ble_adapter.py).

        def on_gatts_evt_exchange_mtu_request(self, ble_driver, conn_handle, client_mtu):
            try:
                ble_driver.ble_gatts_exchange_mtu_reply(conn_handle, self.default_mtu)
            except NordicSemiException as ex:
                raise NordicSemiException(
                    "MTU exchange reply failed. Common causes are: "
                    "missing att_mtu setting in ble_cfg_set, "
                    "different config tags used in ble_cfg_set and adv_start.") from ex


    The ble_adapter.default_mtu is originating from ATT_MTU_DEFAULT which is originating from

        import pc_ble_driver_py.lib.nrf_ble_driver_sd_api_v5 as driver
    
        ATT_MTU_DEFAULT = driver.BLE_GATT_ATT_MTU_DEFAULT
        
        
        
        
        and then:
        
        BLE_GATT_ATT_MTU_DEFAULT = _nrf_ble_driver_sd_api_v5.BLE_GATT_ATT_MTU_DEFAULT
        
        in nrf_ble_driver_sd_api_v5.py
    This value is set to 23. I don't know what the API is to change this.
    If i do this:
            self.adapter = BLEAdapter(driver)
            self.adapter.default_mtu = 247

    It will work.

    What is the proper way to do this?

Related