Connecting to device using pc-ble-driver-py causes disconnect after 60 seconds

In order to test my custom BLE protocol on my device I am attempting to use the pc-ble-driver-py Python module (either 0.11.4 or latest version) to connect to my BLE peripheral running with a modified version of the app-template from SDK v12.3.0 on nRF52832.

After scanning for, connecting to, and discovering MTU and services I have my Python script do a busy wait.

On both the device and in my Python script I observe a disconnect after ~63 seconds.

If I connect to my device using nRFConnect PC tool, I don't observe this problem and am able to stay connected indefinitely to my device.

One theory is that my BLE peripheral is failing the BLE connection parameter update sequence but I am not setting the "disconnect on failure" flag when I initialize this (see code below). Is there some other way that I might be triggering a disconnect somehow?

#define FIRST_CONN_PARAMS_UPDATE_DELAY   APP_TIMER_TICKS(5000, APP_TIMER_PRESCALER) /**< Time from initiating event (connect or start of notification) to first time sd_ble_gap_conn_param_update is called (5 seconds). */
#define NEXT_CONN_PARAMS_UPDATE_DELAY    APP_TIMER_TICKS(30000, APP_TIMER_PRESCALER)/**< Time between each call to sd_ble_gap_conn_param_update after the first call (30 seconds). */
#define MAX_CONN_PARAMS_UPDATE_COUNT     3                                          /**< Number of attempts before giving up the connection parameter negotiation. */

/**@brief Function for initializing the Connection Parameters module.
 */
static void BLE_ConnectionParamsInit(void)
{
    uint32_t               err_code;
    ble_conn_params_init_t cp_init;

    memset(&cp_init, 0, sizeof(cp_init));

    cp_init.p_conn_params                  = NULL;
    cp_init.first_conn_params_update_delay = FIRST_CONN_PARAMS_UPDATE_DELAY;
    cp_init.next_conn_params_update_delay  = NEXT_CONN_PARAMS_UPDATE_DELAY;
    cp_init.max_conn_params_update_count   = MAX_CONN_PARAMS_UPDATE_COUNT;
    cp_init.start_on_notify_cccd_handle    = BLE_GATT_HANDLE_INVALID;
    cp_init.disconnect_on_fail             = false;
    cp_init.evt_handler                    = BLE_OnConnectionParamsEvent;
    cp_init.error_handler                  = BLE_OnConnectionParamsError;

    err_code = ble_conn_params_init(&cp_init);
    APP_ERROR_CHECK(err_code);
}

/**@brief Function for handling a Connection Parameters error.
 *
 * @param[in] nrf_error  Error code containing information about what went wrong.
 */
static void BLE_OnConnectionParamsError(uint32_t nrf_error)
{
    APP_ERROR_HANDLER(nrf_error);
}

/**@brief Function for handling the Connection Parameters Module.
 *
 * @details This function will be called for all events in the Connection Parameters Module which
 *          are passed to the application.
 *          @note All this function does is to disconnect. This could have been done by simply
 *                setting the disconnect_on_fail config parameter, but instead we use the event
 *                handler mechanism to demonstrate its use.
 *
 * @param[in] p_evt  Event received from the Connection Parameters Module.
 */
static void BLE_OnConnectionParamsEvent(ble_conn_params_evt_t * p_evt)
{
    uint32_t err_code;

    if (p_evt->evt_type == BLE_CONN_PARAMS_EVT_FAILED)
    {
        DBG_PrintLn(DBG_APPLICATION, "Connection Parameter interval unacceptable");
        err_code = sd_ble_gap_disconnect(sConnectionHandle, BLE_HCI_CONN_INTERVAL_UNACCEPTABLE);
        APP_ERROR_CHECK(err_code);
    }
}

Parents
No Data
Reply
  • controller.py
    import re
    import time
    try:
        import Queue
    except ImportError:
        import queue as Queue
    import logging
    logger = logging.getLogger(__name__)
    
    from pc_ble_driver_py import config
    
    
    config.__conn_ic_id__ = "NRF52"
    nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    
    # These imports must come after config import and assigning of __conn_ic_id__
    from pc_ble_driver_py.ble_driver import (
        BLEDriver,
        BLEAdvData,
        BLEEvtID,
        BLEEnableParams,
        BLEGapTimeoutSrc,
        BLEGattStatusCode,
        BLEUUID,
        BLEUUIDBase)
    if nrf_sd_ble_api_ver >= 5:
        from pc_ble_driver_py.ble_driver import (
            BLEConfig,
            BLEConfigConnGatt)
        CFG_TAG = 1
    from pc_ble_driver_py.ble_adapter import BLEAdapter
    from pc_ble_driver_py.exceptions import NordicSemiException
    from pc_ble_driver_py.observers  import BLEAdapterObserver, BLEDriverObserver, get_addr_str
    
    
    CONNECTIONS = 1
    
    
    def get_input_as_list(data):
        if isinstance(data, str):
            return map(ord, data)
        else:
            return list(data)
    
    
    class Central(BLEDriverObserver):
        def __init__(self, serial_port):
            super(Central, self).__init__()
            self.adapter = BLEAdapter(BLEDriver(serial_port=serial_port,
                                                auto_flash=True))
            self.connect_filter = None
            self.conn_q = Queue.Queue()
            self.disc_q = Queue.Queue()
            self.scan_q = Queue.Queue()
            self.debug = False
            self.peripherals = dict()
            self.dev_name_to_address = dict()
            self.scan_data = dict()
            self.scan_filter = None
            self.adapter.driver.observer_register(self)
    
    
        def open(self, vs_uuid_count=1):
            self.adapter.driver.open()
            if nrf_sd_ble_api_ver >= 5:
                self.adapter.default_mtu = 250
                gatt_cfg = BLEConfigConnGatt()
                gatt_cfg.att_mtu = self.adapter.default_mtu
                gatt_cfg.tag = CFG_TAG
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
                self.adapter.driver.ble_enable()
            elif nrf_sd_ble_api_ver >= 3:
                ble_enable_params = BLEEnableParams(vs_uuid_count      = vs_uuid_count,
                                                    service_changed    = False,
                                                    periph_conn_count  = 0,
                                                    central_conn_count = CONNECTIONS,
                                                    central_sec_count  = CONNECTIONS)
                ble_enable_params.att_mtu = 50
                self.adapter.driver.ble_enable(ble_enable_params)
    
    
        def close(self):
            self.adapter.driver.close()
    
    
        def add_custom_uuid(self, custom_uuid):
            # Add big-endian reversed version of custom_uuid
            uuid_base = BLEUUIDBase(custom_uuid[::-1])
            self.adapter.driver.ble_vs_uuid_add(uuid_base)
            return uuid_base
    
    
        def connect(self, dev_name_or_address, timeout=20):
            if self.scan_filter is None:
                short_scan = True
                self.scan_start(dev_name_or_address)
            else:
                short_scan = False
            self.connect_filter = dev_name_or_address
            try:
                peer_addr = self.scan_q.get(timeout=timeout)
            except Queue.Empty:
                peer_addr = None
            if short_scan:
                self.scan_stop()
            if peer_addr:
                self.connect_filter = None
                conn_handle = self.__connect_and_verify(peer_addr, conn_timeout=timeout)
                if conn_handle is not None:
                    start_time = time.time()
                    logger.info('Discovering services and MTU')
                    # Perform MTU exchange if applicable
                    if nrf_sd_ble_api_ver >= 5:
                        att_mtu = self.adapter.att_mtu_exchange(conn_handle, mtu=self.adapter.default_mtu)
                    elif nrf_sd_ble_api_ver >= 3:
                        att_mtu = self.adapter.att_mtu_exchange(conn_handle)
                    else:
                        att_mtu = 23
                    self.adapter.service_discovery(conn_handle)
                    self.peripherals[conn_handle] = Peripheral(self, conn_handle, att_mtu, peer_addr, start_time)
                    logger.info('Connection complete for "{}"'.format(dev_name_or_address))
                    return self.peripherals[conn_handle]
            else:
                logger.error('Peripheral name or address "{}" not found'.format(dev_name_or_address))
            return None
    
    
        def get_scan_data(self, dev_name_or_address, timeout=10):
            if dev_name_or_address in self.dev_name_to_address:
                address = self.dev_name_to_address[dev_name_or_address]
            else:
                address = dev_name_or_address
            # FIXME: Replace this if with a busy wait loop up to timeout?
            if address not in self.scan_data:
                return None
            try:
                return self.scan_data[address].get(timeout=timeout)
            except Queue.Empty:
                pass
            return None
    
    
        def scan_start(self, scan_filter=None):
            if scan_filter:
                logger.info('Scanning for names or addresses that match {!r}'.format(scan_filter))
            else:
                logger.info('Scanning for all peripherals')
            if scan_filter != self.scan_filter:
                self.dev_name_to_address.clear()
            self.scan_filter = scan_filter
            self.adapter.driver.ble_gap_scan_start()
    
    
        def scan_stop(self):
            try:
                self.adapter.driver.ble_gap_scan_stop()
            except NordicSemiException:
                pass
    
    
        def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
            super(Central, self).on_gap_evt_connected(ble_driver, conn_handle, peer_addr, role, conn_params)
            self.conn_q.put(conn_handle)
    
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            super(Central, self).on_gap_evt_disconnected(ble_driver, conn_handle, reason)
            if conn_handle in self.peripherals:
                self.peripherals[conn_handle].remote_close(reason)
            else:
                logger.info('Connection closed: {}'.format(reason))
            self.disc_q.put(conn_handle)
    
    
        def on_gap_evt_timeout(self, ble_driver, conn_handle, src):
            super(Central, self).on_gap_evt_timeout(ble_driver, conn_handle, src)
            if src == BLEGapTimeoutSrc.scan:
                if self.debug:
                    logger.debug('Restarting scan')
                ble_driver.ble_gap_scan_start()
    
    
        def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
            super(Central, self).on_gap_evt_adv_report(ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data)
            dev_name_list = None
            address  = get_addr_str(peer_addr)
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
            elif BLEAdvData.Types.manufacturer_specific_data in adv_data.records:
                if address in self.scan_data:
                    data = adv_data.records[BLEAdvData.Types.manufacturer_specific_data]
                    if self.debug:
                        logger.debug("SCN: {} {}".format(address, "".join("{:02x}".format(c) for c in data)))
                    self.scan_data[address].put(bytearray(data))
                return
            else:
                return
    
            dev_name = "".join(chr(e) for e in dev_name_list)
            if self.scan_filter is None or re.search(self.scan_filter, dev_name) or re.search(self.scan_filter, address):
                # Allow collecting scan data from this address
                if address not in self.scan_data:
                    # Add device name to address lookup
                    if len(dev_name) > 1:
                        self.dev_name_to_address[dev_name] = address
                    self.scan_data[address] = Queue.Queue()
                if self.debug:
                    logger.debug('Adv: address: 0x{}, rssi: {}, device_name: {}'.format(address,
                                                                                 rssi,
                                                                                 dev_name))
            # Are we waiting to connect to this peripheral? Then return peer_addr
            if self.connect_filter and (re.search(self.connect_filter, dev_name) or
                                        re.search(self.connect_filter, address)):
                self.scan_q.put(peer_addr)
    
    
        def __connect_and_verify(self, peer_addr, conn_timeout=20, disc_timeout=1):
            address = get_addr_str(peer_addr)
            retries = 1
            while retries <= 5:
                # Attempt connection
                logger.info('Connecting to "{}" (attempt {})'.format(address, retries))
                if nrf_sd_ble_api_ver >= 5:
                    self.adapter.connect(peer_addr, tag=CFG_TAG)
                else:
                    self.adapter.connect(peer_addr)
                try:
                    conn_handle = self.conn_q.get(timeout=conn_timeout)
                except Queue.Empty:
                    logger.error('Unable to connect to "{}" in {}s'.format(address, conn_timeout))
                    retries += 1
                    continue
                
                # Verify connection (monitor for disconnects)
                disc_handle = None
                while conn_handle != disc_handle:
                    try:
                        disc_handle = self.disc_q.get(timeout=disc_timeout)
                    except Queue.Empty:
                        # Connection is stable if no disconnect events occurred within timeout
                        break
                if conn_handle == disc_handle:
                    retries += 1
                    # Wait 1 second between attempts
                    time.sleep(1)
                    continue
                else:
                    # Connection is stable
                    break
            else:
                raise RuntimeError('Unable to establish stable connection to "{}"'.format(address))
            return conn_handle
    
    
    class Peripheral(BLEAdapterObserver):
        def __init__(self, central, conn_handle, att_mtu, peer_addr, start_time):
            self.central = central
            self.conn_handle = conn_handle
            self.att_mtu = att_mtu
            self.peer_addr = peer_addr
            self.disconnect_reason = None
            self.notifications = dict()
            self.start_time = start_time
            self.central.adapter.observer_register(self)
    
    
        def is_connected(self):
            return self.conn_handle is not None
    
    
        def disconnect(self):
            if self.conn_handle is not None:
                logger.info('Requesting disconnect...')
                self.central.adapter.disconnect(self.conn_handle)
    
    
        def remote_close(self, disconnect_reason):
            elapsed_time = time.time() - self.start_time
            logger.info('Connection closed after {} seconds with reason {}'.format(elapsed_time, disconnect_reason))
            self.central.adapter.observer_unregister(self)
            self.disconnect_reason = None
            self.conn_handle = None
    
    
        def enable_notification(self, uuid, base=None):
            self.notifications[uuid] = Queue.Queue()
            self.central.adapter.enable_notification(self.conn_handle, BLEUUID(uuid, base=base))
    
    
        def disable_notification(self, uuid, base=None):
            self.central.adapter.disable_notification(self.conn_handle, BLEUUID(uuid, base=base))
            del self.notifications[uuid.value]
    
    
        def get_notification(self, uuid, timeout=10):
            if uuid not in self.notifications:
                raise RuntimeError("Notification {:02X} is not enabled".format(uuid))
            try:
                return self.notifications[uuid].get(timeout=timeout)
            except Queue.Empty:
                pass
            return None
    
    
        def read(self, uuid, base=None):
            # Use read_req to get first few bytes (offset=0)
            status, data = self._read_req(uuid, base=base)
            if not status:
                return None
            # If data filled maximum size allowed there might be more data
            elif len(data) == (self.att_mtu - 1):
                # Read more while status is OK and length is not 0
                message = []
                while status and len(data):
                    if data:
                        message.extend(data)
                    status, data = self._read_req(uuid, len(message), base=base)
            # Otherwise, just return the short data received
            else:
                return bytearray(data)
            return bytearray(message)
    
    
        def write(self, uuid, data, base=None, no_response=False):
            # Use write command if data is within att_mtu - 3 bytes of overhead
            if len(data) <= (self.att_mtu - 3):
                if no_response:
                    self._write_req(uuid, data, base=base)
                else:
                    self._write_cmd(uuid, data, base=base)
            else:
                # Due to overhead, delayed writes are 5 bytes less than att_mtu
                step = self.att_mtu - 5
                for i in range(0, len(data), step):
                    self._write_prep(uuid, data[i:i+step], i, base=base)
                self._write_exec()
    
    
        def print_db(self):
            for s in self.central.adapter.db_conns[self.conn_handle].services:
                print(s)
                for ch in s.chars:
                    print(ch)
                    for d in ch.descs:
                        print(d)
                print("================================================================================")
    
    
        def on_indication(self, ble_adapter, conn_handle, uuid, data):
            if self.central.debug:
                logger.debug('Indication: {}, {} = {}'.format(conn_handle, uuid, "".join("{:02x}".format(c) for c in data)))
    
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if self.central.debug:
                logger.debug('Notification: {}, {} = {}'.format(conn_handle, uuid, "".join("{:02x}".format(c) for c in data)))
            if uuid.value not in self.notifications:
                self.notifications[uuid.value] = Queue.Queue()
            self.notifications[uuid.value].put(bytearray(data))
    
    
        def on_conn_param_update_request(self, ble_adapter, conn_handle, conn_params):
            if self.central.debug:
                elapsed_time = time.time() - self.start_time
                logger.debug('Connection parameters updated at {} seconds'.format(elapsed_time))
    
    
        def _read_req(self, uuid, offset=0, base=None):
            status, data = self.central.adapter.read_req(self.conn_handle, BLEUUID(uuid, base=base), offset=offset)
            return status == BLEGattStatusCode.success, data
    
    
        def _write_cmd(self, uuid, data, base=None):
            self.central.adapter.write_cmd(self.conn_handle, BLEUUID(uuid, base=base), get_input_as_list(data))
    
    
        def _write_req(self, uuid, data, base=None):
            self.central.adapter.write_req(self.conn_handle, BLEUUID(uuid, base=base), get_input_as_list(data))
    
    
        def _write_prep(self, uuid, data, offset, base=None):
            self.central.adapter.write_prep(self.conn_handle, BLEUUID(uuid, base=base), get_input_as_list(data), offset)
    
    
        def _write_exec(self):
            self.central.adapter.write_exec(self.conn_handle)
    
    nrftester.py
    import sys
    import time
    import logging
    logger = logging.getLogger(__name__)
    logging.basicConfig(filename="debug.log",
                        level=logging.DEBUG,
                        format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    
    # Local imports
    import controller
    
    TARGET_ADDRESS = "E8:FB:13:55:02:24"
    
    def main(serial_port):
        logger.debug('Serial port used: {}'.format(serial_port))
        central = controller.Central(serial_port)
        central.open()
        peripheral = central.connect(TARGET_ADDRESS)
        if peripheral:
            time.sleep(10)
            while peripheral.is_connected():
                time.sleep(10)
            peripheral.disconnect()
            time.sleep(3)
        logger.info('Closing central')
        central.close()
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print("{} <serial_port>".format(sys.argv[0]))
            sys.exit(1)
        main(sys.argv[1])
        quit()
    

    Attached is the Python script which demonstrates the issue. Review the debug.log file produced.

    python nrftester.py COM4

    Using a nrf52832 development board as the central and probably peripheral. You will likely see what I see which is this line in the debug.log file:
    2020-02-14 12:58:11.706 INFO controller - remote_close: Connection closed after 64.07205009460449 seconds with reason BLEHci.remote_user_terminated_connection

    I believe that the Peripheral is initiating the close, but am unsure why. I have all of my sd_gap_disconnect code paths printing debug statements and I am not hitting any of them.

Children
Related