This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

Bonding with pc-ble-driver-py

Hello

System:

Windows 10

nRF Connect v2.6.0 and Bluetooth Low Energy v2.2.0

Python 2.7.13

After some research I didn't find any examples of bonding with pc-ble-driver-py so I decided to ask here.

When I'm trying to do bond using nRF Connect it always done succsessful, I just checking all checkboxes and type passcode after request.

But when I'm trying to do that with pc-ble-driver-py it won't work.

I'm using authenticate() method to do bonding:

res = self.adapter.authenticate(conn_handle, bond     = True,
                                             mitm     = True,
                                             oob      = True,
                                             lesc     = True,
                                             keypress = True)

But in some time after code executed it just will crash.

When I'm trying to pass more arguments to this method I will get the next message: No handlers could be found for logger "pc_ble_driver_py.ble_driver"

To be clear:

I'm trying to bond just after the device is connected(also tested with delay = 3s after connection, nothing changed)

Connection params are exactly the same as in heart_rate_collector.py

Firmware in the dongle: connectivity_1.2.3_usb_with_s132_3.1.hex

Is there a working examples of bonding with pc-ble-driver-py? It would be helpful in solving my problem.

Parents
  • Hi,

    Do you get any error codes when it crash? Have you seen this thread? Unfortunately, we do not have any examples of this in the Python version.

    Best regards,
    Jørgen

  • Can you post the code you are using, for reproducing and debugging this?

  • Sure:

    import sys
    import time
    import Queue
    import logging
    
    from pc_ble_driver_py.observers     import *
    
    TARGET_DEV_ID = "E364492DC7B3"
    CONNECTIONS   = 1
    
    def init(conn_ic_id):
        global BLEDriver,        \
               BLEAdvData,       \
               BLEEvtID,         \
               BLEAdapter,       \
               BLEEnableParams,  \
               BLEGapTimeoutSrc, \
               BLEGapIOCaps,     \
               BLEUUID,          \
               BLEUUIDBase,      \
               BLEGapSecKDist,   \
               BLEGapSecParams,  \
               BLEGapSecStatus
               
        from pc_ble_driver_py import config
        config.__conn_ic_id__ = conn_ic_id
    
        from pc_ble_driver_py.ble_driver    import BLEDriver,        \
                                                   BLEAdvData,       \
                                                   BLEEvtID,         \
                                                   BLEEnableParams,  \
                                                   BLEGapTimeoutSrc, \
                                                   BLEGapIOCaps,     \
                                                   BLEUUID,          \
                                                   BLEUUIDBase,      \
                                                   BLEGapSecKDist,   \
                                                   BLEGapSecParams,  \
                                                   BLEGapSecStatus
                                                   
        from pc_ble_driver_py.ble_adapter   import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter    = adapter
            self.conn_q     = Queue.Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
    
    
        def open(self):
            self.adapter.driver.open()
    
            ble_enable_params = BLEEnableParams(vs_uuid_count      = 1,
                                                service_changed    = False,
                                                periph_conn_count  = 0,
                                                central_conn_count = CONNECTIONS,
                                                central_sec_count  = CONNECTIONS)
            if nrf_sd_ble_api_ver >= 3:
                print("Enabling larger ATT MTUs")
                ble_enable_params.att_mtu = 50
    
            self.adapter.driver.ble_enable(ble_enable_params)
    
    
        def close(self):
            self.adapter.driver.close()
    
    
        def connect_and_discover(self):
            self.adapter.driver.ble_gap_scan_start()
            new_conn = self.conn_q.get(timeout = 60)
    
            if nrf_sd_ble_api_ver >= 3:
                att_mtu = self.adapter.att_mtu_exchange(new_conn)
    
            self.adapter.service_discovery(new_conn)
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.battery_level))
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.heart_rate))
            return new_conn
    
    
        def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
            print('New connection: {}'.format(conn_handle))
            #self.conn_q.put(conn_handle)
            
            self.authenticate(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print('Disconnected: {} {}'.format(conn_handle, reason))
    
    
        def on_gap_evt_timeout(self, ble_driver, conn_handle, src):
            if src == BLEGapTimeoutSrc.scan:
                ble_driver.ble_gap_scan_start()
    
    
        def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
            dev_name_list = None
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name        = "".join(chr(e) for e in dev_name_list)
            address_string  = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print('Received advertisment report, address: 0x{}, device_name: {}'.format(address_string,
                                                                                        dev_name))
    
            if (address_string == TARGET_DEV_ID):
                self.adapter.connect(peer_addr)
    
        def authenticate(self, conn_handle):
            adapter    = self.adapter
            driver     = self.adapter.driver
            
            kdist_own   = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
                                         
            kdist_peer  = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
            
            sec_params  = BLEGapSecParams(bond          = True,
                                          mitm          = True,
                                          lesc          = True,
                                          oob           = True,
                                          keypress      = True,
                                          io_caps       = BLEGapIOCaps.keyboard_display,
                                          min_key_size  = 7,
                                          max_key_size  = 16,
                                          kdist_own     = kdist_own,
                                          kdist_peer    = kdist_peer)
            
            # Authenticate
            driver.ble_gap_authenticate(conn_handle, sec_params)
            
            # Reply after event
            adapter.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_sec_params_request)
            driver.ble_gap_sec_params_reply(conn_handle,
                                            BLEGapSecStatus.success,
                                            sec_params=None)
            # Save keys on success
            result = self.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_auth_status)
            if result['auth_status'] ==  BLEGapSecStatus.success:
                adapter.db_conns[conn_handle]._keyset = BLEGapSecKeyset.from_c(self.driver._keyset)
                
            return result['auth_status']
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            print('Connection: {}, {} = {}'.format(conn_handle, uuid, data))
    
    
        def on_att_mtu_exchanged(self, ble_driver, conn_handle, att_mtu):
            print('ATT MTU exchanged: conn_handle={} att_mtu={}'.format(conn_handle, att_mtu))
    
    
        def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, **kwargs):
            print('ATT MTU exchange response: conn_handle={}'.format(conn_handle))
        
    
    def main(serial_port):
        print('Serial port used: {}'.format(serial_port))
        driver    = BLEDriver(serial_port=serial_port, auto_flash=False)
        adapter   = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        for i in xrange(CONNECTIONS):
            conn_handle = collector.connect_and_discover()
    
        time.sleep(30)
        print('Closing')
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print('\t{} : {}'.format(i, it))
        print(' ')
    
        while True:
            try:
                choice = int(raw_input('Enter your choice: '))
                if ((choice >= 0) and (choice < len(item_list))):
                    break
            except Exception:
                pass
            print ('\tTry again...')
        return choice
    
    
    if __name__ == "__main__":
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs       = BLEDriver.enum_serial_ports()
            choices     = ['{}: {}'.format(d.port, d.serial_number) for d in descs]
            choice      = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

    I resolved the problem when I couldn't pass more agruments to the authenticate method, but still can't pair. Also I don't know kdist_own and kdist_peer parameters that used in nRF Connect by default, may it can help to avoid crashes if I put it correctly.

    EDIT: It just slightly edited heart_rate_collector.py but with authenticate() method that is called by on_gap_evt_connected()

Reply
  • Sure:

    import sys
    import time
    import Queue
    import logging
    
    from pc_ble_driver_py.observers     import *
    
    TARGET_DEV_ID = "E364492DC7B3"
    CONNECTIONS   = 1
    
    def init(conn_ic_id):
        global BLEDriver,        \
               BLEAdvData,       \
               BLEEvtID,         \
               BLEAdapter,       \
               BLEEnableParams,  \
               BLEGapTimeoutSrc, \
               BLEGapIOCaps,     \
               BLEUUID,          \
               BLEUUIDBase,      \
               BLEGapSecKDist,   \
               BLEGapSecParams,  \
               BLEGapSecStatus
               
        from pc_ble_driver_py import config
        config.__conn_ic_id__ = conn_ic_id
    
        from pc_ble_driver_py.ble_driver    import BLEDriver,        \
                                                   BLEAdvData,       \
                                                   BLEEvtID,         \
                                                   BLEEnableParams,  \
                                                   BLEGapTimeoutSrc, \
                                                   BLEGapIOCaps,     \
                                                   BLEUUID,          \
                                                   BLEUUIDBase,      \
                                                   BLEGapSecKDist,   \
                                                   BLEGapSecParams,  \
                                                   BLEGapSecStatus
                                                   
        from pc_ble_driver_py.ble_adapter   import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter    = adapter
            self.conn_q     = Queue.Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
    
    
        def open(self):
            self.adapter.driver.open()
    
            ble_enable_params = BLEEnableParams(vs_uuid_count      = 1,
                                                service_changed    = False,
                                                periph_conn_count  = 0,
                                                central_conn_count = CONNECTIONS,
                                                central_sec_count  = CONNECTIONS)
            if nrf_sd_ble_api_ver >= 3:
                print("Enabling larger ATT MTUs")
                ble_enable_params.att_mtu = 50
    
            self.adapter.driver.ble_enable(ble_enable_params)
    
    
        def close(self):
            self.adapter.driver.close()
    
    
        def connect_and_discover(self):
            self.adapter.driver.ble_gap_scan_start()
            new_conn = self.conn_q.get(timeout = 60)
    
            if nrf_sd_ble_api_ver >= 3:
                att_mtu = self.adapter.att_mtu_exchange(new_conn)
    
            self.adapter.service_discovery(new_conn)
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.battery_level))
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.heart_rate))
            return new_conn
    
    
        def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
            print('New connection: {}'.format(conn_handle))
            #self.conn_q.put(conn_handle)
            
            self.authenticate(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print('Disconnected: {} {}'.format(conn_handle, reason))
    
    
        def on_gap_evt_timeout(self, ble_driver, conn_handle, src):
            if src == BLEGapTimeoutSrc.scan:
                ble_driver.ble_gap_scan_start()
    
    
        def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
            dev_name_list = None
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name        = "".join(chr(e) for e in dev_name_list)
            address_string  = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print('Received advertisment report, address: 0x{}, device_name: {}'.format(address_string,
                                                                                        dev_name))
    
            if (address_string == TARGET_DEV_ID):
                self.adapter.connect(peer_addr)
    
        def authenticate(self, conn_handle):
            adapter    = self.adapter
            driver     = self.adapter.driver
            
            kdist_own   = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
                                         
            kdist_peer  = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
            
            sec_params  = BLEGapSecParams(bond          = True,
                                          mitm          = True,
                                          lesc          = True,
                                          oob           = True,
                                          keypress      = True,
                                          io_caps       = BLEGapIOCaps.keyboard_display,
                                          min_key_size  = 7,
                                          max_key_size  = 16,
                                          kdist_own     = kdist_own,
                                          kdist_peer    = kdist_peer)
            
            # Authenticate
            driver.ble_gap_authenticate(conn_handle, sec_params)
            
            # Reply after event
            adapter.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_sec_params_request)
            driver.ble_gap_sec_params_reply(conn_handle,
                                            BLEGapSecStatus.success,
                                            sec_params=None)
            # Save keys on success
            result = self.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_auth_status)
            if result['auth_status'] ==  BLEGapSecStatus.success:
                adapter.db_conns[conn_handle]._keyset = BLEGapSecKeyset.from_c(self.driver._keyset)
                
            return result['auth_status']
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            print('Connection: {}, {} = {}'.format(conn_handle, uuid, data))
    
    
        def on_att_mtu_exchanged(self, ble_driver, conn_handle, att_mtu):
            print('ATT MTU exchanged: conn_handle={} att_mtu={}'.format(conn_handle, att_mtu))
    
    
        def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, **kwargs):
            print('ATT MTU exchange response: conn_handle={}'.format(conn_handle))
        
    
    def main(serial_port):
        print('Serial port used: {}'.format(serial_port))
        driver    = BLEDriver(serial_port=serial_port, auto_flash=False)
        adapter   = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        for i in xrange(CONNECTIONS):
            conn_handle = collector.connect_and_discover()
    
        time.sleep(30)
        print('Closing')
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print('\t{} : {}'.format(i, it))
        print(' ')
    
        while True:
            try:
                choice = int(raw_input('Enter your choice: '))
                if ((choice >= 0) and (choice < len(item_list))):
                    break
            except Exception:
                pass
            print ('\tTry again...')
        return choice
    
    
    if __name__ == "__main__":
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs       = BLEDriver.enum_serial_ports()
            choices     = ['{}: {}'.format(d.port, d.serial_number) for d in descs]
            choice      = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

    I resolved the problem when I couldn't pass more agruments to the authenticate method, but still can't pair. Also I don't know kdist_own and kdist_peer parameters that used in nRF Connect by default, may it can help to avoid crashes if I put it correctly.

    EDIT: It just slightly edited heart_rate_collector.py but with authenticate() method that is called by on_gap_evt_connected()

Children
  • Well, now I think I resolved the problem with crushes by moving authentication method into the main function instead of event handler.

  • But I still can't perform bond.

    As you will see in the logs below, I'm facing BLE_GAP_SEC_STATUS_CONFIRM_VALUE error, but ble_gap_authenticate(), ble_gap_sec_params_reply() and sd_ble_gap_auth_key_reply() has done succsessfully(I was checking their output from ble_driver.py when they called a softdevice api functions)

    Here is the log from the console output with debug log:

    Auth 1
    Auth completed
    DEBUG:pc_ble_driver_py.observers:evt> sec_params_request conn(0)
    peer_params(Security Parameters bond(1) mitm(1) lesc(0) keypress(0) io_caps(BLEGapIOCaps.display_only) oob(0) max_key_size(16) min_key_size(0) kdist_own(enc(1) id(1) sign(0) link(0)) kdist_peer(enc(1) id(1) sign(0) link(0)))
    Sec params replying done
    DEBUG:pc_ble_driver_py.observers:evt> auth_key_request conn(0)
    key_type(1)
    Key reply finished with code: 0
    DEBUG:pc_ble_driver_py.observers:evt> auth_status conn(0)
    error_src(1)
    bonded(0)
    sm1_levels(<pc_ble_driver_sd_api_v3.ble_gap_sec_levels_t; proxy of <Swig Object of type 'ble_gap_sec_levels_t *' at 0x01ADF1D0> >)
    sm2_levels(<pc_ble_driver_sd_api_v3.ble_gap_sec_levels_t; proxy of <Swig Object of type 'ble_gap_sec_levels_t *' at 0x01ADF2D8> >)
    kdist_own(enc(0) id(1) sign(1) link(1))
    kdist_peer(enc(0) id(1) sign(1) link(1))
    auth_status(BLEGapSecStatus.confirm_value)
    Result: 1
    Pairing done

    And the code I'm using:

    def authenticate(self, conn_handle):
            
            adapter    = self.adapter                # adapter
            driver     = self.adapter.driver         # driver
            sd_driver  = BLESoftDevice               # softdevice api
            
            kdist_own   = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
                                         
            kdist_peer  = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
            
            sec_params  = BLEGapSecParams(bond          = True,
                                          mitm          = True,
                                          lesc          = True,
                                          oob           = True,
                                          keypress      = False,
                                          io_caps       = BLEGapIOCaps.keyboard_display,
                                          min_key_size  = 7,
                                          max_key_size  = 16,
                                          kdist_own     = kdist_own,
                                          kdist_peer    = kdist_peer)
            
            print("Auth 1")
            driver.ble_gap_authenticate(conn_handle, sec_params)
            
            print("Auth completed")
            # Send sec params
            adapter.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_sec_params_request)
            driver.ble_gap_sec_params_reply(conn_handle,
                                            BLEGapSecStatus.success,
                                            None)
                                                         
            print("Sec params replying done")
          
            # Send passcode
            adapter.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_auth_key_request)
            
            key = util.list_to_uint8_array([0x30,0x30,0x30,0x30,0x30,0x30]).cast()
            res = sd_driver.sd_ble_gap_auth_key_reply(driver.rpc_adapter,
                                                      conn_handle,
                                                      0x01,     # PASSCODE
                                                      key)      # ASCII: 000000
            
            print("Key reply finished with code: {}".format(res))       
                    
            # Get pairing results and save key
            result = adapter.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_auth_status)
            
            print("Result: {}".format(result['error_src']))
            if result['auth_status'] ==  BLEGapSecStatus.success:
                print("Success")
                adapter.db_conns[conn_handle]._keyset = BLEGapSecKeyset.from_c(sd_driver._keyset)
                
            print("Pairing done")

  • I solved the problem above, what I had to do is authenticate with display_only iocaps.

  • Thanks .  I'm attempting to do a project that uses a passcode as well.  Do you mind sharing your working example?  I've looked elsewhere but I'm finding it surprisingly hard to find any Python examples.

  • Hi there, I don't have any code or devices right now, so I writed something may help you

    I didn't tested bonding with passcode, so it probably will not work without some changes

    import sys
    import time
    import Queue
    import logging
    
    from pc_ble_driver_py.observers     import *
    
    TARGET_DEV_ID = "000"       #Put your target MAC here (in format like ABCDABCDABCD not AB:CD:AB:CD:AB:CD)
    CONNECTIONS   = 1
    
    def init(conn_ic_id):
        global BLEDriver,        \
               BLEAdvData,       \
               BLEEvtID,         \
               BLEAdapter,       \
               BLEEnableParams,  \
               BLEGapTimeoutSrc, \
               BLEGapIOCaps,     \
               BLEUUID,          \
               BLEUUIDBase,      \
               BLEGapSecKDist,   \
               BLEGapSecParams,  \
               BLEGapSecStatus
               
        from pc_ble_driver_py import config
        config.__conn_ic_id__ = conn_ic_id
    
        from pc_ble_driver_py.ble_driver    import BLEDriver,        \
                                                   BLEAdvData,       \
                                                   BLEEvtID,         \
                                                   BLEEnableParams,  \
                                                   BLEGapTimeoutSrc, \
                                                   BLEGapIOCaps,     \
                                                   BLEUUID,          \
                                                   BLEUUIDBase,      \
                                                   BLEGapSecKDist,   \
                                                   BLEGapSecParams,  \
                                                   BLEGapSecStatus
                                                   
        from pc_ble_driver_py.ble_adapter   import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter    = adapter
            self.conn_q     = Queue.Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            
            self.keyset = None       # There we will store keys after bonding
            self.conn_handle = None  # Connection handler
    
    
        def open(self):
            self.adapter.driver.open()
    
            ble_enable_params = BLEEnableParams(vs_uuid_count      = 1,
                                                service_changed    = False,
                                                periph_conn_count  = 0,
                                                central_conn_count = CONNECTIONS,
                                                central_sec_count  = CONNECTIONS)
            if nrf_sd_ble_api_ver >= 3:
                print("Enabling larger ATT MTUs")
                ble_enable_params.att_mtu = 50
    
            self.adapter.driver.ble_enable(ble_enable_params)
    
    
        def close(self):
            self.adapter.driver.close()
    
    
        def connect_and_discover(self):
            self.adapter.driver.ble_gap_scan_start()
            new_conn = self.conn_q.get(timeout = 60)
    
            if nrf_sd_ble_api_ver >= 3:
                att_mtu = self.adapter.att_mtu_exchange(new_conn)
    
            self.adapter.service_discovery(new_conn)
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.battery_level))
            #self.adapter.enable_notification(new_conn, BLEUUID(BLEUUID.Standard.heart_rate))
            return new_conn
    
        # Don't remember exactly on which event we have to reply after encryption (1 or 2)
        # 1
        #def on_gap_evt_sec_params_request(self, ble_driver, conn_handle, peer_params):
        #    pass
         
        # 2
        #def on_gap_evt_sec_info_request(self, ble_driver, conn_handle, peer_addr, master_id, enc_info, id_info, sign_info):
        #    pass
    
        def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
            print('New connection: {}'.format(conn_handle))
            self.conn_q.put(conn_handle)
            
            self.conn_handle = conn_handle
            
            if self.keyset is not None:
                keys    = self.keyset
                
                ediv    = keys.enc_key.master_id.ediv
                rand    = keys.enc_key.master_id.rand
                
                ltk     = keys.enc_key.enc_info.ltk
                auth    = keys.enc_key.enc_info.auth
                lesc    = keys.enc_key.enc_info.auth
                ltk_len = keys.enc_key.enc_info.ltk_len
            
                self.adapter.encrypt(conn_handle,
                                     ediv=ediv,
                                     rand=rand,
                                     ltk=ltk,
                                     auth=auth,
                                     lesc=lesc,
                                     ltk_len=ltk_len)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print('Disconnected: {} {}'.format(conn_handle, reason))
            
            self.conn_handle = None
    
    
        def on_gap_evt_timeout(self, ble_driver, conn_handle, src):
            if src == BLEGapTimeoutSrc.scan:
                ble_driver.ble_gap_scan_start()
    
    
        def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
            dev_name_list = None
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name        = "".join(chr(e) for e in dev_name_list)
            address_string  = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print('Received advertisment report, address: 0x{}, device_name: {}'.format(address_string,
                                                                                        dev_name))
    
            if (address_string == TARGET_DEV_ID):
                self.adapter.connect(peer_addr)
    
        def authenticate(self):
            """Do bonding
            
            :return: Success status (True/False)
            """
            if self.conn_handle is None:
                return False
        
            adapter    = self.adapter
            driver     = self.adapter.driver
            
            kdist_own   = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
                                         
            kdist_peer  = BLEGapSecKDist(enc  = 1,
                                         id   = 1,
                                         sign = 0,
                                         link = 0)
            
            sec_params  = BLEGapSecParams(bond          = True,
                                          mitm          = True,
                                          lesc          = True,
                                          oob           = True,
                                          keypress      = True,
                                          io_caps       = BLEGapIOCaps.keyboard_display,
                                          min_key_size  = 7,
                                          max_key_size  = 16,
                                          kdist_own     = kdist_own,
                                          kdist_peer    = kdist_peer)
            
            # Authenticate
            driver.ble_gap_authenticate(conn_handle, sec_params)
            
            # Reply after event
            adapter.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_sec_params_request)
            driver.ble_gap_sec_params_reply(conn_handle,
                                            BLEGapSecStatus.success,
                                            sec_params=None)
            # Save keys on success
            result = self.evt_sync[conn_handle].wait(evt = BLEEvtID.gap_evt_auth_status)
            if result['auth_status'] ==  BLEGapSecStatus.success:
                self.keyset = adapter.db_conns[conn_handle]._keyset
                return True
               
            return False
    
        def save_keys(self):
            """Saving keys into the file
            """
            
            # Save self.keyset
            
            pass
            
        def load_keys(self):
            """Load keys from the file
            """
            
            # Load into self.keyset from file
            
            adapter.db_conns[conn_handle]._keyset = self.keyset # And load it into the driver
            
            pass
         
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            print('Connection: {}, {} = {}'.format(conn_handle, uuid, data))
    
    
        def on_att_mtu_exchanged(self, ble_driver, conn_handle, att_mtu):
            print('ATT MTU exchanged: conn_handle={} att_mtu={}'.format(conn_handle, att_mtu))
    
    
        def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, **kwargs):
            print('ATT MTU exchange response: conn_handle={}'.format(conn_handle))
        
    
    def main(serial_port):
        print('Serial port used: {}'.format(serial_port))
        driver    = BLEDriver(serial_port=serial_port, auto_flash=False)
        adapter   = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        for i in xrange(CONNECTIONS):
            conn_handle = collector.connect_and_discover()  # Connect
            collector.authenticate()                        # Do bonding (Note: don't use this method in any event handler defined in pc_ble_driver_py.observers
                                                            # such as on_connected(), so that program will not hang)
            
            collector.save_keys()                           # Save keys so in the next program's run we don't have to do bonding again
            #collector.load_keys()
            
            # wait for the device ready to connect
            # wait_ms(5000)
            
            collector.connect_and_discover()                # Connect
    
        time.sleep(30)
        print('Closing')
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print('\t{} : {}'.format(i, it))
        print(' ')
    
        while True:
            try:
                choice = int(raw_input('Enter your choice: '))
                if ((choice >= 0) and (choice < len(item_list))):
                    break
            except Exception:
                pass
            print ('\tTry again...')
        return choice
    
    
    if __name__ == "__main__":
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs       = BLEDriver.enum_serial_ports()
            choices     = ['{}: {}'.format(d.port, d.serial_number) for d in descs]
            choice      = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

Related