How to connect to a target BLE device using the pc_ble_driver_py APIs

Hi

I'm encountering an issue while working with the pc_ble_driver_py API and was hoping for your guidance.

My goal is to establish a connection between an nRF52840 dongle (as the central device) and a target BLE peripheral. I've modeled my script after the sample heart rate collector, but it appears I'm missing a command or section of code. Although the nRF52840 dongle successfully detects the target device, it fails to establish a connection. Here's my code:

import sys
import time
import logging
from queue import Queue, Empty
from pc_ble_driver_py.observers import *
from pc_ble_driver_py.exceptions import NordicSemiException

# Advertised local name that on_gap_evt_adv_report compares against.
TARGET_DEV_NAME = "EPI"
# Peer address as upper-case hex without separators; compared against the
# string built from peer_addr.addr in on_gap_evt_adv_report.
TARGET_ADDR_IMPL = "D22E497791AA"
# Not referenced by any code shown in this file.
CONNECTIONS = 1
# Configuration tag: assigned to the GATT config in open() and passed to
# adapter.connect().
CFG_TAG = 1

# Module-level placeholders. No function shown here declares them `global`,
# so assignments made inside functions do not update these names.
conn_implant = None
connections = 0

def init(conn_ic_id):
    """Select the connectivity IC and publish the driver API as module globals.

    ``config.__conn_ic_id__`` is assigned before ``ble_driver`` and
    ``ble_adapter`` are imported, which is why the imports live inside this
    function and must stay in this order (presumably those modules read the
    setting at import time -- confirm against the library).

    Args:
        conn_ic_id: Connectivity IC identifier string, stored verbatim in
            ``config.__conn_ic_id__``.
    """
    # noinspection PyGlobalUndefined
    global config, driver, nrf_sd_ble_api_ver
    global BLEDriver, BLEAdapter, BLEAdvData, BLEEvtID, BLEEnableParams
    global BLEGapTimeoutSrc, BLEGapScanParams, BLEGapConnParams
    global BLEUUID, BLEUUIDBase
    global BLEConfig, BLEConfigCommon, BLEConfigConnGatt
    global BLEConfigConnGap, BLEConfigGapRoleCount

    from pc_ble_driver_py import config

    # Must happen before the two imports below.
    config.__conn_ic_id__ = conn_ic_id

    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_driver import (
        BLEAdvData,
        BLEConfig,
        BLEConfigCommon,
        BLEConfigConnGap,
        BLEConfigConnGatt,
        BLEConfigGapRoleCount,
        BLEDriver,
        BLEEnableParams,
        BLEEvtID,
        BLEGapConnParams,
        BLEGapScanParams,
        BLEGapTimeoutSrc,
        BLEUUID,
        BLEUUIDBase,
        driver,
    )

    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_adapter import BLEAdapter

    nrf_sd_ble_api_ver = config.sd_api_ver_get()


class nRF52Dongle(BLEDriverObserver, BLEAdapterObserver):
    """Central-role helper: scan for one target peripheral, connect, discover.

    The instance registers itself as observer on both the adapter and its
    driver, so the ``on_*`` methods below are invoked as event callbacks.
    Connection handles reported by ``on_gap_evt_connected`` are passed to
    ``connect_and_discover`` through ``conn_q``.
    """

    # UUIDs of the characteristics that connect_and_discover looks for.
    # They were referenced but never defined, which raised AttributeError on
    # the first discovered characteristic. Set them (on the class or on an
    # instance) to the BLEUUID objects of the target device; while they are
    # None the corresponding match is simply skipped.
    STREAM_CHARACTERISTIC_UUID = None
    WEARABLE_POWER_LEVEL_CHARACTERISTIC_UUID = None

    # Extra seconds to keep waiting for the connected event after the scan
    # window. Without it, a device first seen near the end of the scan is
    # reported as "Not found" before the connection can complete.
    CONNECT_GRACE_S = 3

    def __init__(self, adapter):
        """Store the adapter, register as observer and set link defaults.

        Args:
            adapter: ``BLEAdapter`` wrapping the driver of the dongle.
        """
        super(nRF52Dongle, self).__init__()
        self.adapter = adapter
        self.conn_q = Queue()
        # Filled in by connect_and_discover when the matching
        # characteristics are found on the peer.
        self.conn_implant = None
        self.conn_wear = None
        self.attr_handle_power_level = None
        self.adapter.observer_register(self)
        self.adapter.driver.observer_register(self)
        self.adapter.default_mtu = 247
        self.adapter.default_dataLength = 251
        self.adapter.default_phy = driver.BLE_GAP_PHY_2MBPS

    def open(self):
        """Open the driver and enable the BLE stack for the configured IC.

        For "NRF51" the stack is enabled with explicit ``BLEEnableParams``;
        for "NRF52"/"NRF53" a GATT configuration tagged ``CFG_TAG`` is set
        first. Any other identifier leaves the stack not enabled.
        """
        self.adapter.driver.open()
        ic_id = config.__conn_ic_id__.upper()
        if ic_id == "NRF51":
            self.adapter.driver.ble_enable(
                BLEEnableParams(
                    vs_uuid_count=1,
                    service_changed=0,
                    periph_conn_count=0,
                    central_conn_count=1,
                    central_sec_count=0,
                )
            )
        elif ic_id in ("NRF52", "NRF53"):
            gatt_cfg = BLEConfigConnGatt()
            gatt_cfg.att_mtu = self.adapter.default_mtu
            # The same tag is passed to adapter.connect() in
            # on_gap_evt_adv_report.
            gatt_cfg.tag = CFG_TAG
            self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
            self.adapter.driver.ble_enable()

    def close(self):
        """Close the underlying driver."""
        print("collector.close()")
        self.adapter.driver.close()

    def connect_and_discover(self):
        """Scan, wait for a connection, then discover and negotiate the link.

        Returns:
            The connection handle on success, or ``None`` when no connected
            event arrived within the scan duration plus ``CONNECT_GRACE_S``.
        """
        scan_duration = 2
        params = BLEGapScanParams(
            interval_ms=200, window_ms=150, timeout_s=scan_duration
        )  # from EEG_Epiminder

        self.adapter.driver.ble_gap_scan_start(scan_params=params)

        # Only the queue wait can raise Empty, so only it sits in the try.
        try:
            new_conn = self.conn_q.get(
                timeout=scan_duration + self.CONNECT_GRACE_S
            )
        except Empty:
            print(f"Not found: {TARGET_DEV_NAME}.")
            return None

        self.adapter.service_discovery(new_conn)
        self.att_mtu = self.adapter.att_mtu_exchange(
            new_conn, self.adapter.default_mtu
        )
        print('MTU: ' + str(self.att_mtu))
        self.data_length = self.adapter.data_length_update(
            new_conn, self.adapter.default_dataLength
        )
        print('DataLength: ' + str(self.data_length))
        resp_phy = self.adapter.phy_update(
            new_conn, [self.adapter.default_phy, self.adapter.default_phy]
        )
        print('PHY: ' + str(resp_phy))

        self._record_gatt_table(new_conn)
        return new_conn

    def _record_gatt_table(self, conn_handle):
        """Print the discovered GATT table and remember the known characteristics."""
        stream_uuid = self.STREAM_CHARACTERISTIC_UUID
        power_uuid = self.WEARABLE_POWER_LEVEL_CHARACTERISTIC_UUID
        db_conn = self.adapter.db_conns[conn_handle]

        for s in db_conn.services:
            print(str(s))
            print("Service UUID = " + str(s.uuid) + ' ' + str(s.uuid.value))
            for c in s.chars:
                print(" Char UUID = " + str(c.uuid) + ' ' + str(c.uuid.value))
                if stream_uuid is not None and c.uuid == stream_uuid:
                    self.conn_implant = conn_handle
                elif power_uuid is not None and c.uuid == power_uuid:  # and setup_has_wearable
                    self.conn_wear = conn_handle
                    # NOTE(review): "+ 1" presumably steps from the
                    # declaration handle to the value handle -- confirm.
                    self.attr_handle_power_level = (
                        db_conn.get_char_handle(c.uuid) + 1
                    )

                for d in c.descs:
                    print(" Desc UUID = " + str(d.uuid) + ' ' + str(d.uuid.value))

    def on_gap_evt_connected(
        self, ble_driver, conn_handle, peer_addr, role, conn_params
    ):
        """Hand the new connection handle to connect_and_discover via the queue."""
        print("New connection: {}".format(conn_handle))
        self.conn_q.put(conn_handle)

    def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
        """Log a disconnection and its reason."""
        print("Disconnected: {} {}".format(conn_handle, reason))

    def on_gap_evt_adv_report(
        self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data
    ):
        """Log an advertising report and connect when it is the target device.

        Reports without a complete or short local name are ignored. A
        connection is requested only when both the name and the hex address
        string match ``TARGET_DEV_NAME`` / ``TARGET_ADDR_IMPL``.
        """
        records = adv_data.records
        if BLEAdvData.Types.complete_local_name in records:
            dev_name_list = records[BLEAdvData.Types.complete_local_name]
        elif BLEAdvData.Types.short_local_name in records:
            dev_name_list = records[BLEAdvData.Types.short_local_name]
        else:
            return

        dev_name = "".join(chr(e) for e in dev_name_list)
        address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
        print(
            "Received advertisment report, address: 0x{}, device_name: {}".format(
                address_string, dev_name
            )
        )
        print("Expecting: ", TARGET_DEV_NAME, TARGET_ADDR_IMPL)

        # address_string is upper-case hex; normalise the constant so a
        # lower-case TARGET_ADDR_IMPL still matches.
        if dev_name == TARGET_DEV_NAME and address_string == TARGET_ADDR_IMPL.upper():
            print ("Attempt connection: ", dev_name, address_string)
            conn_interval = 7.5
            conn_params = BLEGapConnParams(
                min_conn_interval_ms=conn_interval,
                max_conn_interval_ms=conn_interval,
                conn_sup_timeout_ms=4000,
                slave_latency=0,
            )
            # Connection request to TARGET_DEV_NAME, using the tag of the
            # configuration set in open().
            self.adapter.connect(peer_addr, conn_params=conn_params, tag=CFG_TAG)

    def on_notification(self, ble_adapter, conn_handle, uuid, data):
        """Print a received notification, abbreviating payloads over 32 items."""
        if len(data) > 32:
            data = "({}...)".format(data[0:10])
        print("Connection: {}, {} = {}".format(conn_handle, uuid, data))

def main(selected_serial_port):
    """Open the dongle on a serial port and try to connect to the target.

    The driver is always closed before returning, whether the connection
    succeeded, failed, or an exception was raised after ``open()``.

    Args:
        selected_serial_port: Serial port name passed to ``BLEDriver``.
    """
    print("Serial port used: {}".format(selected_serial_port))
    # Named ble_driver so it does not shadow the module-level `driver`
    # that init() imports and the collector class reads.
    ble_driver = BLEDriver(
        serial_port=selected_serial_port,
        auto_flash=False,
        baud_rate=1000000,
        log_severity_level="info",
    )

    adapter = BLEAdapter(ble_driver)
    collector = nRF52Dongle(adapter)
    collector.open()

    try:
        conn = collector.connect_and_discover()
        if conn is not None:
            print(conn)
            print("Connection 1/1")
    finally:
        # Previously the driver was closed only when no connection was made.
        collector.close()


logging.basicConfig(
    level="DEBUG",
    format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",
)

# Both arguments are read below, so both must be present: the original check
# (< 2) let sys.argv[2] raise IndexError when only the IC id was given.
if len(sys.argv) < 3:
    print(
        "Please specify connectivity IC identifier (NRF51, NRF52) "
        "and the serial port"
    )
    sys.exit(1)
init(sys.argv[1])

serial_port = sys.argv[2]


def run():
    """Run main() on the serial port taken from the command line."""
    main(serial_port)


run()

Your insights or suggestions would be greatly appreciated. Thank you!

Parents Reply Children
Related