pc-ble-driver-py: limited throughput and maximum number of peripherals

Hello,

For a bachelor project I'm currently testing the throughput of the nRF52840 Dongle with pc-ble-driver-py, and it appears that pc-ble-driver-py has a maximum notification data rate of roughly 700 bytes/s.

In my setup I have a peripheral which sends a notification to the nRF Dongle every 20 ms.
I used both blatann and a modified version of heart_rate_collector.py / nus_collector (on Windows 10) to enable notifications from the peripheral, and in both cases I can only handle a notification every 30 ms. (Rarely, the time between two consecutive notification events is roughly 15 ms.)

Setting the minimum connection interval to 7.5 ms did not improve the performance either.

With nRF Connect, on the other hand, I received all notifications without problems.

Is this issue caused by the Python binding? Is there anything I could do to increase the throughput?

Another question regarding the maximum number of peripherals: using blatann I cannot set the number of peripherals to more than 4. If I do, I get the error:
pc_ble_driver_py.exceptions.NordicSemiException: Failed to ble_enable. Error code: NrfError.no_mem

Is this also caused by the Python binding?

Best,
jaclim

Note about the data throughput: in blatann I noticed that the ble_event_handler handles the next ble_event every x * 15 ms (where x is an integer). However, increasing the notification frequency on the peripheral side (to 100 Hz) did not change the behaviour on the driver-py side.
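As a rough sanity check on these figures, the sketch below computes the notification data rate from a payload size and the observed interval. The 20-byte payload is an assumption (the default ATT MTU of 23 bytes minus the 3-byte ATT header); the actual payload of the peripheral is not stated above. `notification_rate` is a hypothetical helper, not part of pc-ble-driver-py.

```python
def notification_rate(payload_bytes, interval_s):
    """Return the data rate in bytes per second for one notification per interval."""
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    return payload_bytes / interval_s


# Assumed payload: default ATT MTU (23) minus the 3-byte ATT notification header.
ASSUMED_PAYLOAD_BYTES = 23 - 3

for interval_ms in (20, 30):
    rate = notification_rate(ASSUMED_PAYLOAD_BYTES, interval_ms / 1000)
    print("{} ms interval: {:.0f} bytes/s".format(interval_ms, rate))
```

Under that assumption, one notification every 30 ms gives about 667 bytes/s, which is in line with the roughly 700 bytes/s observed, while the intended 20 ms interval would give about 1000 bytes/s.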

  • Hello Jaclim,

    I recommend that you try installing pc-ble-driver-py with the latest pc-ble-driver version, because this release (link) addresses an issue with USB latencies we've seen on Windows. I managed to get around 700-800 kbps after I updated it on my Win 10 PC (note: not tested with the Python wrapper on top).

    Best regards,

    Vidar

  • Hello Vidar,

    Thanks for the reply! I'm trying to install pc-ble-driver-py with the latest driver version by following the "Building from source" guide on the pc-ble-driver-py GitHub page. However, I'm not sure exactly what I should do, or whether this is even the correct approach to install the newest driver. Could you explain what I have to do here?

    Best regards,
    jaclim

  • Hello Jaclim,

    It should be possible to build it with the latest driver, but the exact steps for it are not covered by the build instructions. However, the quickest way to try out the new library is to download the pre-compiled lib files from the release page (https://github.com/NordicSemiconductor/pc-ble-driver/releases) and simply replace the lib files in \Lib\site-packages\pc_ble_driver_py\lib\.

    Here is the content of \Lib\site-packages\pc_ble_driver_py\lib\ with the pc-ble-driver version 4.1.4 libraries (I renamed the files to 4_1_2 because of hardcoded references in the Python code):

    3157.lib.zip
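The manual replacement described above could be scripted roughly as follows. This is only a sketch: `replace_lib_files` is a hypothetical helper, both directories are passed in by the caller (the full install path is not given in this thread), and the backup directory name is an arbitrary choice.

```python
import shutil
from pathlib import Path


def replace_lib_files(new_lib_dir, installed_lib_dir):
    """Copy every file from new_lib_dir into installed_lib_dir.

    A file that already exists in installed_lib_dir is first copied to a
    sibling "lib_backup" directory so the original library can be restored.
    Returns the sorted list of file names that were copied.
    """
    src = Path(new_lib_dir)
    dst = Path(installed_lib_dir)
    backup = dst.parent / "lib_backup"  # assumed name, not a pc-ble-driver-py convention
    copied = []
    for item in sorted(src.iterdir()):
        if not item.is_file():
            continue
        target = dst / item.name
        if target.exists():
            backup.mkdir(exist_ok=True)
            shutil.copy2(target, backup / item.name)
        shutil.copy2(item, target)
        copied.append(item.name)
    return copied
```

The helper copies file names unchanged, so any renaming needed for the hardcoded references mentioned above has to be done before calling it.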

  • Hello Vidar,

    Thank you! The notification performance did indeed improve. I could reach a rate of 1600 bytes/second, which is still far from the 700 kbps. However, the time between two notification events seems to be rather inconsistent. Sometimes it is 30 ms and other times it is around 1 ms, both when my peripheral sends a notification every 10 ms and when it sends one every 20 ms. Is this the expected behaviour of the driver?
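One way to quantify this jitter is to record arrival times in the notification callback and evaluate them afterwards. The sketch below is an assumption about how that could be done; `NotificationStats` is a hypothetical class, not part of pc-ble-driver-py or blatann. For comparison of units, 1600 bytes/s corresponds to 12.8 kbps.

```python
import time


class NotificationStats:
    """Collects arrival times and payload sizes of received notifications."""

    def __init__(self, clock=time.perf_counter):
        self._clock = clock  # injectable so the class can be tested without waiting
        self._times = []
        self._sizes = []

    def record(self, data):
        """Call once per received notification with its payload."""
        self._times.append(self._clock())
        self._sizes.append(len(data))

    def intervals_ms(self):
        """Return the gaps between consecutive notifications in milliseconds."""
        return [(b - a) * 1000.0 for a, b in zip(self._times, self._times[1:])]

    def bytes_per_second(self):
        """Return the average data rate, or None if it cannot be computed.

        Only bytes received after the first timestamp are counted, so the
        result covers exactly the measured time span.
        """
        if len(self._times) < 2:
            return None
        elapsed = self._times[-1] - self._times[0]
        if elapsed <= 0:
            return None
        return sum(self._sizes[1:]) / elapsed
```

A `stats.record(data)` call could go into a callback such as `on_notification`, with `intervals_ms()` and `bytes_per_second()` evaluated after the test run so that the callback itself stays cheap.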

    I also tried to flash the nRF Dongle with the version 4.1.4 firmware provided in the hex folder, but it didn't work afterwards. nRF Connect tells me that the Dongle has not been programmed.

    Best,

    Jaclim

  • Hello Jaclim,

    No, that is not the expected behaviour. I've now tested the throughput with the Python wrapper, and I actually got better results this time (~1000 kbps). Attached below are the test script and FW if you want to try to replicate it on your setup.

    Nordic UART Service client python script

    1715.nus_central.py
    #
    # Copyright (c) 2016 Nordic Semiconductor ASA
    # All rights reserved.
    #
    # Redistribution and use in source and binary forms, with or without modification,
    # are permitted provided that the following conditions are met:
    #
    #   1. Redistributions of source code must retain the above copyright notice, this
    #   list of conditions and the following disclaimer.
    #
    #   2. Redistributions in binary form must reproduce the above copyright notice, this
    #   list of conditions and the following disclaimer in the documentation and/or
    #   other materials provided with the distribution.
    #
    #   3. Neither the name of Nordic Semiconductor ASA nor the names of other
    #   contributors to this software may be used to endorse or promote products
    #   derived from this software without specific prior written permission.
    #
    #   4. This software must only be used in or with a processor manufactured by Nordic
    #   Semiconductor ASA, or in or with a processor manufactured by a third party that
    #   is used in combination with a processor manufactured by Nordic Semiconductor.
    #
    #   5. Any software provided in binary or object form under this license must not be
    #   reverse engineered, decompiled, modified and/or disassembled.
    #
    # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
    # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    #
    
    import sys
    import time
    import logging
    from queue import Queue, Empty
    from pc_ble_driver_py.observers import *
    
    TARGET_DEV_NAME = "Nordic_UART"
    CONNECTIONS = 1
    CFG_TAG = 1
    
    
    def init(conn_ic_id):
        # noinspection PyGlobalUndefined
        global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams, BLEConfigConnGap, BLEGapConnParams
        from pc_ble_driver_py import config
    
        config.__conn_ic_id__ = conn_ic_id
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_driver import (
            BLEDriver,
            BLEAdvData,
            BLEEvtID,
            BLEEnableParams,
            BLEGapTimeoutSrc,
            BLEUUID,
            BLEUUIDBase,
            BLEGapScanParams,
            BLEGapConnParams,
            BLEConfigCommon,
            BLEConfig,
            BLEConfigConnGatt,
            BLEConfigConnGap,
        )
    
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_adapter import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    
    class NusCollector(BLEDriverObserver, BLEAdapterObserver):
        def __init__(self, adapter):
            super(NusCollector, self).__init__()
            self.first = 1
            self.adapter = adapter
            self.conn_q = Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            self.adapter.default_mtu = 247
            self.nus_base = BLEUUIDBase([
                0x6e, 0x40, 0x00, 0x00, 0xb5, 0xa3, 0xf3, 0x93, 0xe0, 0xa9, 0xe5, 0x0e, 0x24, 0xdc, 0xca, 0x9e
            ])
            self.nus_rx = BLEUUID(0x0002, self.nus_base)
            self.nus_tx = BLEUUID(0x0003, self.nus_base)
    
        def open(self):
            self.adapter.driver.open()
            if config.__conn_ic_id__ == "NRF51":
                self.adapter.driver.ble_enable(
                    BLEEnableParams(
                        vs_uuid_count=1,
                        service_changed=0,
                        periph_conn_count=0,
                        central_conn_count=1,
                        central_sec_count=0,
                    )
                )
            elif config.__conn_ic_id__ == "NRF52":
                gatt_cfg = BLEConfigConnGatt()
                gatt_cfg.att_mtu = self.adapter.default_mtu
                gatt_cfg.tag = CFG_TAG
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
    
                conn_cfg = BLEConfigConnGap()
                conn_cfg.conn_count = 1
                conn_cfg.event_length = 320
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, conn_cfg)
    
                self.adapter.driver.ble_enable()
                self.adapter.driver.ble_vs_uuid_add(self.nus_base)
    
    
    
        def close(self):
            self.adapter.driver.close()
    
        def connect_and_discover(self):
            scan_duration = 5
            scan_params = BLEGapScanParams(interval_ms=200, window_ms=150, timeout_s=scan_duration)
            message = "We are connected!\r\n"
    
            self.adapter.driver.ble_gap_scan_start(scan_params=scan_params)
    
            try:
                new_conn = self.conn_q.get(timeout=scan_duration)
                self.adapter.service_discovery(new_conn)
                self.adapter.enable_notification(new_conn, self.nus_tx)
                data = [ord(n) for n in list(message)]
                self.adapter.write_cmd(new_conn, self.nus_rx, data)
                return new_conn
            except Empty:
                print("No device advertising with name {} found.".format(TARGET_DEV_NAME))
                return None
    
        def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, status, att_mtu):
            print("ATT MTU updated to {}".format(att_mtu))
        
        def on_gap_evt_data_length_update(
            self, ble_driver, conn_handle, data_length_params
        ):
            print("Max rx octets: {}".format(data_length_params.max_rx_octets))
            print("Max tx octets: {}".format(data_length_params.max_tx_octets))
            print("Max rx time: {}".format(data_length_params.max_rx_time_us))
            print("Max tx time: {}".format(data_length_params.max_tx_time_us))
    
        def on_gatts_evt_exchange_mtu_request(self, ble_driver, conn_handle, client_mtu):
            print("Client requesting to update ATT MTU to {} bytes".format(client_mtu))

        def on_gap_evt_connected(
            self, ble_driver, conn_handle, peer_addr, role, conn_params
        ):
            print("New connection: {}".format(conn_handle))
            self.conn_q.put(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print("Disconnected: {} {}".format(conn_handle, reason))
    
        def on_gap_evt_adv_report(
            self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data
        ):
            conn_params = BLEGapConnParams(min_conn_interval_ms=7.5, max_conn_interval_ms=7.5, conn_sup_timeout_ms=4000, slave_latency=0)
            
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name = "".join(chr(e) for e in dev_name_list)
            address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print(
                "Received advertisement report, address: 0x{}, device_name: {}".format(
                    address_string, dev_name
                )
            )
    
            if dev_name == TARGET_DEV_NAME:
                self.adapter.connect(peer_addr, conn_params = conn_params, tag=CFG_TAG)
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if len(data) > 32:
                data = "({}...)".format(data[0:10])
            print("Connection: {}, {} = {}".format(conn_handle, uuid, data))
    
    
    def main(selected_serial_port):
        print("Serial port used: {}".format(selected_serial_port))
        driver = BLEDriver(
            serial_port=selected_serial_port, auto_flash=False, baud_rate=1000000, log_severity_level="info"
        )
    
        adapter = BLEAdapter(driver)
        collector = NusCollector(adapter)
        collector.open()
        conn = collector.connect_and_discover()
    
        while conn is not None:
            time.sleep(100)
    
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print("\t{} : {}".format(i, it))
        print(" ")
    
        while True:
            try:
                choice = int(input("Enter your choice: "))
                if (choice >= 0) and (choice < len(item_list)):
                    break
            except Exception:
                pass
            print("\tTry again...")
        return choice
    
    
    if __name__ == "__main__":
        #logging.basicConfig(
        #    level="DEBUG",
        #   format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",
        #)
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            sys.exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs = BLEDriver.enum_serial_ports()
            choices = ["{}: {}".format(d.port, d.serial_number) for d in descs]
            choice = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
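
To put the ~1000 kbps figure in terms of the script's settings, the sketch below estimates how many notifications per second, and per connection interval, that rate implies. It assumes every notification carries the full ATT payload (the 247-byte MTU set in the script minus the 3-byte ATT header) and that 1 kbps means 1000 bits per second; both are assumptions, and `notifications_needed` is a hypothetical helper.

```python
def notifications_needed(throughput_kbps, att_mtu, conn_interval_ms):
    """Return (notifications per second, notifications per connection interval)."""
    payload_bytes = att_mtu - 3  # ATT notification header: opcode (1) + handle (2)
    bytes_per_second = throughput_kbps * 1000 / 8
    per_second = bytes_per_second / payload_bytes
    per_interval = per_second * conn_interval_ms / 1000
    return per_second, per_interval


# Values taken from the script above: default_mtu = 247, connection interval 7.5 ms.
per_second, per_interval = notifications_needed(1000, 247, 7.5)
print("{:.0f} notifications/s, {:.1f} per connection interval".format(per_second, per_interval))
```

Under these assumptions, roughly four full-size notifications have to fit into every 7.5 ms connection interval, which is presumably why the script raises both the ATT MTU and the connection event length.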
    

    Modified Nordic UART peripheral FW to measure throughput

    5582.nrf5_sdk_17.0.2 - ble_app_uart_throughput.zip

    Resulting transfer speed displayed in Segger debug terminal:
