This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

pc-ble-driver-py missing high-rate notifications

Hello,

whilst implementing my embedded device software, I'm developing a tool running on a Windows laptop to validate my work.

The idea is to use pc-ble-driver-py python bindings to implement simple interactions with the device over BLE using the services exposed.

The current setup is:

  • pc-ble-driver-py 0.15.0 (latest)
  • nRF52840 Dongle (PCA10059) running connectivity FW 4.1.2 SD API v5 (latest supported)

Most of the functionalities seem to be working fine.

But when I try to collect notifications from a certain rate on, looks like the system is not able to "catch-up".

To give some context - from the embedded device, I'm polling data and queuing notifications on certain char every 10ms. The connection interval is set 15ms.

The moment I enable the notifications on this char from Client side(after the wanted connection intervals are updated), I start receiving as a return value from sd_ble_gatts_hvx the error code NRF_ERROR_RESOURCES on the server side - it's not always the case, but very often, at least 10/20 times per second.

The only explanation I can give to this is that the Central (i.e. the python script) is not meeting all the connection events at fixed interval (15ms), and this way the notification queue on the Server is increasing till its limit.

I tested this scenario with different platforms (Windows UWP, Android) and this is the only case I'm seeing this behaviour.

What I find rather odd is that using exactly the same dongle (with same FW), but from the nRF Connect Desktop App GUI this is NOT happening - I can subscribe to the notifications and getting them at the rate I expect.

Adding a couple of graph to make things clearer:

Notifications Intervals (ms) from nRF Connect Desktop:

Notifications Intervals (ms) from pc-ble-driver-py:

It's clear that in the first case, two notifications are sometimes very close as the queuing rate is higher than the connection interval (i.e you get more than one notification for conn interval).

Instead in the second case, interval is never below 15ms, meaning that for sure notifications are lost (I imagine due to missed connection events...).

I tried to do the same but changing the dongle with a nRF52-DK. The result is that in this case the system crashes and disconnects after a couple of seconds from the enabling of notifications with this log:

2020-10-20 15:17:21,194 [21092/LogThread] h5_decode error, code: 0x802c, H5 error count: 1. raw packet: c0 f0 0e 02 00 02 39 00 00 00 00 00 00 00 1c 00 01 12 00 f7 03 71 03 d0 02 de 02 fe 02 0e c0 

As a quick workaround, If I try to queue notifications every 20ms (or anything above the connection interval), I don't see any problem and the system works flawlessly.

Can you help me understanding what's going on?

I'm evaluating this python libraries for the system testing framework of our devices - and of course we'll need to evaluate also the high-rate notifications.

I think reproducing the issue would be quite trivial, but maybe there is something I'm missing that can solve this straightaway.

Thanks!

Parents
  • Hello,

    I start receiving as a return value from sd_ble_gatts_hvx the error code NRF_ERROR_RESOURCES on the server side - it's not always the case, but very often, at least 10/20 times per second.

    An exempt from the sd_ble_gatts_hvx API Reference reads:

    NRF_ERROR_RESOURCES Too many notifications queued. Wait for a BLE_GATTS_EVT_HVN_TX_COMPLETE event and retry.

    I suspect in this case that you are indeed queueing notifications faster than you are able to send them, filling the available TX queue. My suspicion is strengthened when you say you are queueing a notification every 10 ms, with a connection interval of 15 ms. If you expect that some notifications might require a retransmit then we may also try to increase the TX queue to resolve the issue ( as long as notifications are being sent faster than they are queued on average ).

    The only explanation I can give to this is that the Central (i.e. the python script) is not meeting all the connection events at fixed interval (15ms), and this way the notification queue on the Server is increasing till its limit.

    This is a good consideration - and there are multiple reasons why this might be the case, for example if you are communicating in an environment with massive 2.4 GHz interference, or over a very long range. I would however first look at the connection parameters, and how often notifications are sent successfully.

    Are you familiar with the nRF Sniffer tool? It is a powerful tool when developing with BLE, which lets you monitor the on-air BLE traffic. You could use this to check whether or not one of your devices are skipping some connection intervals, for when we have exhausted the connection-parameter approach.

    that for sure notifications are lost

    BLE connections are loss-less - if a packet is not ACK'd, it is retransmitted.

    I have a feeling I might have misunderstood your situation and issue, in which case a sniffer trace from the nRF Sniffer would be very helpful to see the whole picture.
    Please do not hesitate to let me know if I have misunderstood your description, or if any part of my reply should be unclear.

    Looking forward to resolving this issue together!

    Best regards,
    Karl

  • Hi ,

    thanks for helping out!

    I suspect in this case that you are indeed queueing notifications faster than you are able to send them, filling the available TX queue. My suspicion is strengthened when you say you are queueing a notification every 10 ms, with a connection interval of 15 ms. If you expect that some notifications might require a retransmit then we may also try to increase the TX queue to resolve the issue ( as long as notifications are being sent faster than they are queued on average ).

    I'm not sure I follow here.
    There's an internal buffer for notifications on Server side (the embedded device).
    So if on the Server I'm queuing data every 10ms with an interval of 15ms I'm just expecting on the Client side to be notified about up to 2 notifications for each event, which should be acceptable.
    BTW packets are just around 20 bytes, whilst the MTU is set to 131 bytes (Data Length Extension 135 bytes).

    Of course when I get the error NRF_ERROR_RESOURCES I could just retry to send, but this is not removing the fundamental problem.

    This is a good consideration - and there are multiple reasons why this might be the case, for example if you are communicating in an environment with massive 2.4 GHz interference, or over a very long range. I would however first look at the connection parameters, and how often notifications are sent successfully.

    What I'm puzzled with is that exactly the same HW configuration and environment (but using nRF Connect Desktop as a notifications collector) is instead working without any problem. I'm transmitting from few centimetres, so I don't think the issue is there.
    I believe the intervals graph I attached in the original post are quite descriptive in this sense.


    This makes me think the issue is on the Client side rather on the way the Server is queuing notifications, as the other tests I did with different entities collecting the notifications are not having the same issue.

    Is it clearer now?

    BLE connections are loss-less - if a packet is not ACK'd, it is retransmitted.

    Is this true also on notifications? I know indications works differently. Please let me know if there's any documentation you suggest to understand these details at protocol level Slight smile

    As for the sniffer - that is definitely the next step for this.

    I'd just need a bit of time to set this up as I never did it, but probably can provide some more info.

    Is there any other quick tests you suggest to at least isolate the problem in a particular area?

    Thanks!

  • Hello again Davege,

    davege said:

    Unfortunately neither myself or my team at the moment has the capacity to move the testing framework implementation to pc-ble-driver, which I used in the past and I can confirm it works great.

    So hopefully I'll just wait here to understand if there's any movement.

    I totally understand, no worries at all. We will get to the bottom of this.

    This might seem trivial, but could you add the following modification to your code, to see if it allows you to receive multiple notification packets per connection event?
    If you are using the heart_rate_collector example, this code block should be inserted in the collector class's open function, in the NRF52 section.
    It should look like this:

        def open(self):
            self.adapter.driver.open()
            if config.__conn_ic_id__.upper() == "NRF51":
                self.adapter.driver.ble_enable(
                    BLEEnableParams(
                        vs_uuid_count=1,
                        service_changed=0,
                        periph_conn_count=0,
                        central_conn_count=1,
                        central_sec_count=0,
                    )
                )
            elif config.__conn_ic_id__.upper() == "NRF52":
                gatt_cfg = BLEConfigConnGatt()
                gatt_cfg.att_mtu = self.adapter.default_mtu
                gatt_cfg.tag = CFG_TAG
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
    
                conn_cfg = BLEConfigConnGap()
                conn_cfg.conn_count = 1
                conn_cfg.event_length = 320
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, conn_cfg)
    
                self.adapter.driver.ble_enable()

    You will also need to add the BLEConfigConnGap method to the list of globals, imported from driver, which should then look like this:
        global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEConfigConnGap, BLEGapScanParams
        from pc_ble_driver_py import config
    
        config.__conn_ic_id__ = conn_ic_id
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_driver import (
            BLEDriver,
            BLEAdvData,
            BLEEvtID,
            BLEEnableParams,
            BLEGapTimeoutSrc,
            BLEUUID,
            BLEGapScanParams,
            BLEConfigCommon,
            BLEConfig,
            BLEConfigConnGatt,
            BLEConfigConnGap,
        )



    The only difference here is that the default event_length parameter is not used, instead, the maximum event_length for the given connection interval is set explicitly.
    If you could test this with your own test-script and let me know if you succeed in getting multiple packets per event, that would be great.

    If it does not enable you to send multiple notifications per connection event, could you provide a trace of the test, with the code above added?

    I will continue to look into this to find the required event_length analytically - instead of just setting it to the maximum value.

    davege said:
    Do you believe that marking this ticket as private can be useful to push things a little bit?

    No, this will not affect the internal requests priority - since the only difference between private tickets and public ones is who may view it.
    Private tickets is of course only viewable by the tickets creator and the Technical Support staff here at Nordic.

    Looking forward to hearing the results of your test!

    Best regards,
    Karl

  • Hi ,

    I tried adding your changes but see no effects whatsoever.

    Did something change with the setup you have?

    I believe that if you find any kind of solution, that will work also on your end (using Nordic UART characteristic or similar).

    I'm a bit frustrated as now I'm at the point where it would be extremely useful to get the notifications running.

    Do you know if nRF Connect Desktop is using pc-ble-driver dll underneath for the connection to the dongle?
    If this is the case, it may be easy to spot the change in the way the API are invoked on two sides.

    Thanks,

    D

  • Hello Davege,

    davege said:
    I'm a bit frustrated as now I'm at the point where it would be extremely useful to get the notifications running.

    Yes, for course. I am sorry to hear that.

    davege said:

    I tried adding your changes but see no effects whatsoever.

    Did something change with the setup you have?

    Could you send a trace of the communication when you test it with the changes?
    With the changes mentioned in my last comment I am seeing multiple notifications per connection event.

    davege said:
    I believe that if you find any kind of solution, that will work also on your end (using Nordic UART characteristic or similar).

    Using the code from my last comment I am seeing multiple notification per connection event on my end.
    If it would be easier for you, I could send you the .py file and .hex file for the peripheral, for you to test this?
    If you are not seeing multiple notifications per event it would be really useful to see a sniffer trace.

    Best regards,
    Karl

  • Hi,

    it would be great to have those, if I can just use a nRF52-DK as a peripheral it is even better.

    I can first test and then providing a sniffing trace to you.

    Thanks

    D

  • Hello Davege,

    Attached you will find a zip file for the peripheral project to be used with SDK v.17.0.2. It is modified to send two notifications every connection event, with a connection interval of 15 ms.
    The attached python file is the file you should run for the central, with the pc-ble-driver-py python package installed.

    Please flash an nRF52 DK with the peripheral firmware, and run a central with the pc-ble-driver-py code, and take a trace of the communication that is happening.
    Ideally we should then see multiple notifications per connection event.

    Looking forward to hearing back from you,

    Best regards,
    Karl
    8255.__ble_app_hrs_multiple_notifications_pca10040.zip

    #
    # Copyright (c) 2016 Nordic Semiconductor ASA
    # All rights reserved.
    #
    # Redistribution and use in source and binary forms, with or without modification,
    # are permitted provided that the following conditions are met:
    #
    #   1. Redistributions of source code must retain the above copyright notice, this
    #   list of conditions and the following disclaimer.
    #
    #   2. Redistributions in binary form must reproduce the above copyright notice, this
    #   list of conditions and the following disclaimer in the documentation and/or
    #   other materials provided with the distribution.
    #
    #   3. Neither the name of Nordic Semiconductor ASA nor the names of other
    #   contributors to this software may be used to endorse or promote products
    #   derived from this software without specific prior written permission.
    #
    #   4. This software must only be used in or with a processor manufactured by Nordic
    #   Semiconductor ASA, or in or with a processor manufactured by a third party that
    #   is used in combination with a processor manufactured by Nordic Semiconductor.
    #
    #   5. Any software provided in binary or object form under this license must not be
    #   reverse engineered, decompiled, modified and/or disassembled.
    #
    # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
    # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    #
    
    import sys
    import time
    import logging
    from queue import Queue, Empty
    from pc_ble_driver_py.observers import *
    
    TARGET_DEV_NAME = "Nordic_HRM"
    CONNECTIONS = 1
    CFG_TAG = 1
    
    
    def init(conn_ic_id):
        # noinspection PyGlobalUndefined
        global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEConfigConnGap, BLEGapScanParams
        from pc_ble_driver_py import config
    
        config.__conn_ic_id__ = conn_ic_id
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_driver import (
            BLEDriver,
            BLEAdvData,
            BLEEvtID,
            BLEEnableParams,
            BLEGapTimeoutSrc,
            BLEUUID,
            BLEGapScanParams,
            BLEConfigCommon,
            BLEConfig,
            BLEConfigConnGatt,
            BLEConfigConnGap,
        )
    
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_adapter import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter = adapter
            self.conn_q = Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            self.adapter.default_mtu = 250
    
        def open(self):
            self.adapter.driver.open()
            if config.__conn_ic_id__.upper() == "NRF51":
                self.adapter.driver.ble_enable(
                    BLEEnableParams(
                        vs_uuid_count=1,
                        service_changed=0,
                        periph_conn_count=0,
                        central_conn_count=1,
                        central_sec_count=0,
                    )
                )
            elif config.__conn_ic_id__.upper() == "NRF52":
                gatt_cfg = BLEConfigConnGatt()
                gatt_cfg.att_mtu = self.adapter.default_mtu
                gatt_cfg.tag = CFG_TAG
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
    
                conn_cfg = BLEConfigConnGap()
                conn_cfg.conn_count = 1
                conn_cfg.event_length = 320
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, conn_cfg)
    
                self.adapter.driver.ble_enable()
    
        def close(self):
            self.adapter.driver.close()
    
        def connect_and_discover(self):
            scan_duration = 5
            params = BLEGapScanParams(interval_ms=200, window_ms=150, timeout_s=scan_duration)
    
            self.adapter.driver.ble_gap_scan_start(scan_params=params)
    
            try:
                new_conn = self.conn_q.get(timeout=scan_duration)
                self.adapter.service_discovery(new_conn)
    
                self.adapter.enable_notification(
                    new_conn, BLEUUID(BLEUUID.Standard.battery_level)
                )
    
                self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.heart_rate))
                return new_conn
            except Empty:
                print(f"No heart rate collector advertising with name {TARGET_DEV_NAME} found.")
                return None
    
        def on_gap_evt_connected(
            self, ble_driver, conn_handle, peer_addr, role, conn_params
        ):
            print("New connection: {}".format(conn_handle))
            self.conn_q.put(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print("Disconnected: {} {}".format(conn_handle, reason))
    
        def on_gap_evt_adv_report(
            self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data
        ):
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name = "".join(chr(e) for e in dev_name_list)
            address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print(
                "Received advertisment report, address: 0x{}, device_name: {}".format(
                    address_string, dev_name
                )
            )
    
            if dev_name == TARGET_DEV_NAME:
                self.adapter.connect(peer_addr, tag=CFG_TAG)
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if len(data) > 32:
                data = "({}...)".format(data[0:10])
            print("Connection: {}, {} = {}".format(conn_handle, uuid, data))
    
    
    def main(selected_serial_port):
        print("Serial port used: {}".format(selected_serial_port))
        driver = BLEDriver(
            serial_port=selected_serial_port, auto_flash=False, baud_rate=1000000, log_severity_level="debug"
        )
    
        adapter = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        conn = collector.connect_and_discover()
    
        if conn is not None:
            time.sleep(10)
    
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print("\t{} : {}".format(i, it))
        print(" ")
    
        while True:
            try:
                choice = int(input("Enter your choice: "))
                if (choice >= 0) and (choice < len(item_list)):
                    break
            except Exception:
                pass
            print("\tTry again...")
        return choice
    
    
    if __name__ == "__main__":
        logging.basicConfig(
            level="DEBUG",
            format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",
        )
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs = BLEDriver.enum_serial_ports()
            choices = ["{}: {}".format(d.port, d.serial_number) for d in descs]
            choice = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

Reply
  • Hello Davege,

    Attached you will find a zip file for the peripheral project to be used with SDK v.17.0.2. It is modified to send two notifications every connection event, with a connection interval of 15 ms.
    The attached python file is the file you should run for the central, with the pc-ble-driver-py python package installed.

    Please flash an nRF52 DK with the peripheral firmware, and run a central with the pc-ble-driver-py code, and take a trace of the communication that is happening.
    Ideally we should then see multiple notifications per connection event.

    Looking forward to hearing back from you,

    Best regards,
    Karl
    8255.__ble_app_hrs_multiple_notifications_pca10040.zip

    #
    # Copyright (c) 2016 Nordic Semiconductor ASA
    # All rights reserved.
    #
    # Redistribution and use in source and binary forms, with or without modification,
    # are permitted provided that the following conditions are met:
    #
    #   1. Redistributions of source code must retain the above copyright notice, this
    #   list of conditions and the following disclaimer.
    #
    #   2. Redistributions in binary form must reproduce the above copyright notice, this
    #   list of conditions and the following disclaimer in the documentation and/or
    #   other materials provided with the distribution.
    #
    #   3. Neither the name of Nordic Semiconductor ASA nor the names of other
    #   contributors to this software may be used to endorse or promote products
    #   derived from this software without specific prior written permission.
    #
    #   4. This software must only be used in or with a processor manufactured by Nordic
    #   Semiconductor ASA, or in or with a processor manufactured by a third party that
    #   is used in combination with a processor manufactured by Nordic Semiconductor.
    #
    #   5. Any software provided in binary or object form under this license must not be
    #   reverse engineered, decompiled, modified and/or disassembled.
    #
    # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
    # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    #
    
    import sys
    import time
    import logging
    from queue import Queue, Empty
    from pc_ble_driver_py.observers import *
    
    TARGET_DEV_NAME = "Nordic_HRM"
    CONNECTIONS = 1
    CFG_TAG = 1
    
    
    def init(conn_ic_id):
        # noinspection PyGlobalUndefined
        global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEConfigConnGap, BLEGapScanParams
        from pc_ble_driver_py import config
    
        config.__conn_ic_id__ = conn_ic_id
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_driver import (
            BLEDriver,
            BLEAdvData,
            BLEEvtID,
            BLEEnableParams,
            BLEGapTimeoutSrc,
            BLEUUID,
            BLEGapScanParams,
            BLEConfigCommon,
            BLEConfig,
            BLEConfigConnGatt,
            BLEConfigConnGap,
        )
    
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_adapter import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter = adapter
            self.conn_q = Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            self.adapter.default_mtu = 250
    
        def open(self):
            self.adapter.driver.open()
            if config.__conn_ic_id__.upper() == "NRF51":
                self.adapter.driver.ble_enable(
                    BLEEnableParams(
                        vs_uuid_count=1,
                        service_changed=0,
                        periph_conn_count=0,
                        central_conn_count=1,
                        central_sec_count=0,
                    )
                )
            elif config.__conn_ic_id__.upper() == "NRF52":
                gatt_cfg = BLEConfigConnGatt()
                gatt_cfg.att_mtu = self.adapter.default_mtu
                gatt_cfg.tag = CFG_TAG
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
    
                conn_cfg = BLEConfigConnGap()
                conn_cfg.conn_count = 1
                conn_cfg.event_length = 320
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, conn_cfg)
    
                self.adapter.driver.ble_enable()
    
        def close(self):
            self.adapter.driver.close()
    
        def connect_and_discover(self):
            scan_duration = 5
            params = BLEGapScanParams(interval_ms=200, window_ms=150, timeout_s=scan_duration)
    
            self.adapter.driver.ble_gap_scan_start(scan_params=params)
    
            try:
                new_conn = self.conn_q.get(timeout=scan_duration)
                self.adapter.service_discovery(new_conn)
    
                self.adapter.enable_notification(
                    new_conn, BLEUUID(BLEUUID.Standard.battery_level)
                )
    
                self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.heart_rate))
                return new_conn
            except Empty:
                print(f"No heart rate collector advertising with name {TARGET_DEV_NAME} found.")
                return None
    
        def on_gap_evt_connected(
            self, ble_driver, conn_handle, peer_addr, role, conn_params
        ):
            print("New connection: {}".format(conn_handle))
            self.conn_q.put(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print("Disconnected: {} {}".format(conn_handle, reason))
    
        def on_gap_evt_adv_report(
            self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data
        ):
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name = "".join(chr(e) for e in dev_name_list)
            address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print(
                "Received advertisment report, address: 0x{}, device_name: {}".format(
                    address_string, dev_name
                )
            )
    
            if dev_name == TARGET_DEV_NAME:
                self.adapter.connect(peer_addr, tag=CFG_TAG)
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if len(data) > 32:
                data = "({}...)".format(data[0:10])
            print("Connection: {}, {} = {}".format(conn_handle, uuid, data))
    
    
    def main(selected_serial_port):
        print("Serial port used: {}".format(selected_serial_port))
        driver = BLEDriver(
            serial_port=selected_serial_port, auto_flash=False, baud_rate=1000000, log_severity_level="debug"
        )
    
        adapter = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        conn = collector.connect_and_discover()
    
        if conn is not None:
            time.sleep(10)
    
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print("\t{} : {}".format(i, it))
        print(" ")
    
        while True:
            try:
                choice = int(input("Enter your choice: "))
                if (choice >= 0) and (choice < len(item_list)):
                    break
            except Exception:
                pass
            print("\tTry again...")
        return choice
    
    
    if __name__ == "__main__":
        logging.basicConfig(
            level="DEBUG",
            format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",
        )
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs = BLEDriver.enum_serial_ports()
            choices = ["{}: {}".format(d.port, d.serial_number) for d in descs]
            choice = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

Children
No Data
Related