Bonding with pc-ble-driver-py

Hello

System:

Windows 10

nRF Connect v2.6.0 and Bluetooth Low Energy v2.2.0

Python 2.7.13

After some research I couldn't find any examples of bonding with pc-ble-driver-py, so I decided to ask here.

When I bond using nRF Connect, it always succeeds: I just check all the checkboxes and type the passcode when requested.

But when I try to do the same with pc-ble-driver-py, it doesn't work.

I'm using the authenticate() method to bond:

res = self.adapter.authenticate(conn_handle, bond     = True,
                                             mitm     = True,
                                             oob      = True,
                                             lesc     = True,
                                             keypress = True)

But some time after this code is executed, the program just crashes.

When I try to pass more arguments to this method, I get the following message: No handlers could be found for logger "pc_ble_driver_py.ble_driver"
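
That message comes from Python 2's logging module: the library logged a record (quite possibly the real error), but no handler was configured to display it. A minimal sketch of making such records visible, assuming only the logger name quoted in the message above; the record logged here is a placeholder, not actual driver output:

```python
import logging

# Attach a handler to the root logger so that records from library
# loggers are printed (to stderr) instead of being dropped.
logging.basicConfig(level=logging.DEBUG,
                    format="%(name)s %(levelname)s: %(message)s")

# Logger name copied from the warning text. The message is only a
# placeholder to show that records from this logger are now displayed.
log = logging.getLogger("pc_ble_driver_py.ble_driver")
log.error("placeholder record, now visible")
```

With this at the top of the script, whatever the driver was trying to report should appear in the console instead of the "No handlers" notice.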

To be clear:

I'm trying to bond right after the device is connected (also tested with a 3 s delay after connection; nothing changed)

Connection params are exactly the same as in heart_rate_collector.py

Firmware in the dongle: connectivity_1.2.3_usb_with_s132_3.1.hex

Are there any working examples of bonding with pc-ble-driver-py? That would help me solve my problem.

  • Hi,

    Do you get any error codes when it crashes? Have you seen this thread? Unfortunately, we do not have any examples of this in the Python version.

    Best regards,
    Jørgen

  • Hello,

    I don't get any errors when it crashes, but I found that the program crashes while executing sd_ble_gap_sec_params_reply()

  • Can you post the code you are using, for reproducing and debugging this?

  • Sure:

    import sys
    import time
    import Queue
    import logging
    
    from pc_ble_driver_py.observers     import *
    
    TARGET_DEV_ID = "E364492DC7B3"
    CONNECTIONS   = 1
    
    def init(conn_ic_id):
        global BLEDriver,        \
               BLEAdvData,       \
               BLEEvtID,         \
               BLEAdapter,       \
               BLEEnableParams,  \
               BLEGapTimeoutSrc, \
               BLEGapIOCaps,     \
               BLEUUID,          \
               BLEUUIDBase,      \
               BLEGapSecKDist,   \
               BLEGapSecParams,  \
               BLEGapSecStatus
               
        from pc_ble_driver_py import config
        config.__conn_ic_id__ = conn_ic_id
    
        from pc_ble_driver_py.ble_driver    import BLEDriver,        \
                                                   BLEAdvData,       \
                                                   BLEEvtID,         \
                                                   BLEEnableParams,  \
                                                   BLEGapTimeoutSrc, \
                                                   BLEGapIOCaps,     \
                                                   BLEUUID,          \
                                                   BLEUUIDBase,      \
                                                   BLEGapSecKDist,   \
                                                   BLEGapSecParams,  \
                                                   BLEGapSecStatus
                                                   
        from pc_ble_driver_py.ble_adapter   import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter    = adapter
            self.conn_q     = Queue.Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
    
    
        def open(self):
            self.adapter.driver.open()
    
            ble_enable_params = BLEEnableParams(vs_uuid_count      = 1,
                                                service_changed    = False,
                                                periph_conn_count  = 0,
                                                central_conn_count = CONNECTIONS,
                                                central_sec_count  = CONNECTIONS)
            if nrf_sd_ble_api_ver >= 3:
                print("Enabling larger ATT MTUs")
                ble_enable_params.att_mtu = 50
    
            self.adapter.driver.ble_enable(ble_enable_params)
    
    
        def close(self):
            self.adapter.driver.close()
    
    
        def connect_and_discover(self):
            self.adapter.driver.ble_gap_scan_start()
            new_conn = self.conn_q.get(timeout = 60)
    
            if nrf_sd_ble_api_ver >= 3:
                att_mtu = self.adapter.att_mtu_exchange(new_conn)
    
            self.adapter.service_discovery(new_conn)
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.battery_level))
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.heart_rate))
            return new_conn
    
    
        def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
            print('New connection: {}'.format(conn_handle))
            #self.conn_q.put(conn_handle)
            
            self.authenticate(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print('Disconnected: {} {}'.format(conn_handle, reason))
    
    
        def on_gap_evt_timeout(self, ble_driver, conn_handle, src):
            if src == BLEGapTimeoutSrc.scan:
                ble_driver.ble_gap_scan_start()
    
    
        def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
            dev_name_list = None
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name        = "".join(chr(e) for e in dev_name_list)
            address_string  = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print('Received advertisment report, address: 0x{}, device_name: {}'.format(address_string,
                                                                                        dev_name))
    
            if (address_string == TARGET_DEV_ID):
                self.adapter.connect(peer_addr)
    
        def authenticate(self, conn_handle):
            adapter    = self.adapter
            driver     = self.adapter.driver
            
            kdist_own   = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
                                         
            kdist_peer  = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
            
            sec_params  = BLEGapSecParams(bond          = True,
                                          mitm          = True,
                                          lesc          = True,
                                          oob           = True,
                                          keypress      = True,
                                          io_caps       = BLEGapIOCaps.keyboard_display,
                                          min_key_size  = 7,
                                          max_key_size  = 16,
                                          kdist_own     = kdist_own,
                                          kdist_peer    = kdist_peer)
            
            # Authenticate
            driver.ble_gap_authenticate(conn_handle, sec_params)
            
            # Reply after event
            adapter.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_sec_params_request)
            driver.ble_gap_sec_params_reply(conn_handle,
                                            BLEGapSecStatus.success,
                                            sec_params=None)
            # Save keys on success
            result = self.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_auth_status)
            if result['auth_status'] ==  BLEGapSecStatus.success:
                adapter.db_conns[conn_handle]._keyset = BLEGapSecKeyset.from_c(self.driver._keyset)
                
            return result['auth_status']
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            print('Connection: {}, {} = {}'.format(conn_handle, uuid, data))
    
    
        def on_att_mtu_exchanged(self, ble_driver, conn_handle, att_mtu):
            print('ATT MTU exchanged: conn_handle={} att_mtu={}'.format(conn_handle, att_mtu))
    
    
        def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, **kwargs):
            print('ATT MTU exchange response: conn_handle={}'.format(conn_handle))
        
    
    def main(serial_port):
        print('Serial port used: {}'.format(serial_port))
        driver    = BLEDriver(serial_port=serial_port, auto_flash=False)
        adapter   = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        for i in xrange(CONNECTIONS):
            conn_handle = collector.connect_and_discover()
    
        time.sleep(30)
        print('Closing')
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print('\t{} : {}'.format(i, it))
        print(' ')
    
        while True:
            try:
                choice = int(raw_input('Enter your choice: '))
                if ((choice >= 0) and (choice < len(item_list))):
                    break
            except Exception:
                pass
            print ('\tTry again...')
        return choice
    
    
    if __name__ == "__main__":
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs       = BLEDriver.enum_serial_ports()
            choices     = ['{}: {}'.format(d.port, d.serial_number) for d in descs]
            choice      = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

    I resolved the problem of not being able to pass more arguments to the authenticate method, but I still can't pair. Also, I don't know which kdist_own and kdist_peer parameters nRF Connect uses by default; setting them correctly might help avoid the crashes.

    EDIT: This is just a slightly edited heart_rate_collector.py, with an authenticate() method that is called from on_gap_evt_connected()

  • Well, now I think I resolved the problem with the crashes by moving the authentication call into the main function instead of the event handler.
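
A likely reason this helps, assuming the driver delivers events one at a time from its own thread: a handler that blocks while waiting for the next event keeps that next event from ever being delivered. The sketch below imitates the pattern with plain threads and no BLE hardware. `event_thread` and `sec_request` are hypothetical stand-ins, not part of pc-ble-driver-py; `conn_q` mirrors the queue already present in the script above.

```python
import threading
try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2, as in the script above

conn_q = queue.Queue()              # hands the connection handle to main()
sec_request = threading.Event()     # stand-in for a later driver event


def on_connected(conn_handle):
    # The handler only records the handle and returns immediately,
    # so the event thread is free to deliver further events.
    conn_q.put(conn_handle)


def event_thread():
    # Hypothetical stand-in for the driver's thread: events arrive in order,
    # and the second one is reached only after the first handler returns.
    on_connected(0)
    sec_request.set()


def main():
    t = threading.Thread(target=event_thread)
    t.start()
    conn_handle = conn_q.get(timeout=5)    # blocking wait happens here...
    got_request = sec_request.wait(5)      # ...and here, on the main thread
    t.join()
    return conn_handle, got_request
```

In the original script this would correspond to restoring the commented-out `self.conn_q.put(conn_handle)` in `on_gap_evt_connected()` and calling `authenticate()` from `main()` after `connect_and_discover()` returns.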
