Hello!
I am doing some development with pc-ble-driver-py and ran in to some unexpected behaviours when reading characteristics. I have been modifying the nus_collector.py script that was given as an example on this post. I am also using a PCA10028 to scan for devices.
Firstly, when scanning for devices, they last few characters for some devices are missing or corrupted. For example, when receiving the advertising name of a device with the name of "TestDev", the received name is captured as follows:
TesÝgSt -> [84, 101, 115, 221, 103, 83, 116] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝ -> [84, 101, 115, 221, 221, 221, 0] TesÝÝÝ -> [84, 101, 115, 221, 221, 221, 0] TesÝwar -> [84, 101, 115, 221, 119, 97, 114] TesÝÝÝ5 -> [84, 101, 115, 221, 221, 221, 53] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝI -> [84, 101, 115, 221, 221, 221, 73] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221]
where the expected output should be:
TestDev -> [84, 101, 115, 116, 68, 101, 118]
Upon using nRF Connect (either PC or mobile versions), the correct device name is displayed:

This end of device name string corruption does not just happen for this particular device - I have tested scanning other devices where I see a similar behaviour with differing amounts of corruption (e.g. some devices only have 1 character that is corrupted at the end of the string). However, not all of the devices I have scanned display the corruption; some devices have their advertising names being received correctly.
Secondly, when receiving heart rate notifications from a connected device, the data is tens of thousands of values in length. Is there any documentation on how to convert this data so it can be interpreted in beats per minute (bpm)? Note: the heart rate is correctly displayed on nRF Connect (both PC and mobile).
Here is the modified nus_collector.py script that I have been using to test:
import sys
import time
import queue
import logging
from enum import Enum
from functools import wraps
CONN_IC_ID = 'NRF51'
from pc_ble_driver_py import config
config.__conn_ic_id__ = CONN_IC_ID
from pc_ble_driver_py.ble_driver import BLEUUID, BLEUUIDBase, BLEDriverObserver, BLEDriver, BLEEnableParams, BLEAdvData, BLEGapTimeoutSrc
from pc_ble_driver_py.ble_adapter import BLEAdapterObserver, BLEAdapter
TARGET_DEV_NAME = "TestDev"
TARGET_DEV_MAC_ADDRESS = 'DEA01D552902'
CONNECTIONS = 1
def init(conn_ic_id):
global nrf_sd_ble_api_ver
nrf_sd_ble_api_ver = config.sd_api_ver_get()
class NUSCollector(BLEDriverObserver, BLEAdapterObserver):
def __init__(self, adapter):
super(NUSCollector, self).__init__()
self.adapter = adapter
self.conn_q = queue.Queue()
self.adapter.observer_register(self)
self.adapter.driver.observer_register(self)
global GFIT_CS_BASE_UUID, HRS_UUID, DEVICE_NAME_UUID, MANUFACTURER_NAME_UUID
global FIRMWARE_REVISION_UUID, SOFTWARE_REVISION_UUID, FITNESS_MACHINE_FEATURES_UUID
global FITNESS_MACHINE_STATUS_UUID, TREADMILL_DATA_UUID, CROSS_TRAINER_DATA_UUID
global STEP_CLIMBER_DATA_UUID, ROWER_DATA_UUID, INDOOR_BIKE_DATA_UUID
global SUPPORTED_INCLINATION_RANGE_UUID, SUPPORTED_RESISTANCE_LEVEL_RANGE_UUID
global SUPPORTED_HEART_RATE_RANGE_UUID, SUPPORTED_POWER_RANGE_UUID
global GFIT_CS_CONTROL_POINT_UUID, GFIT_CS_CUSTOM_DATA_UUID
HRS_UUID = BLEUUID(BLEUUID.Standard.heart_rate)
DEVICE_NAME_UUID = BLEUUID(0x2A00)
MANUFACTURER_NAME_UUID = BLEUUID(0x2A29)
FIRMWARE_REVISION_UUID = BLEUUID(0x2A26)
SOFTWARE_REVISION_UUID = BLEUUID(0x2A28)
FITNESS_MACHINE_FEATURES_UUID = BLEUUID(0x2ACC)
FITNESS_MACHINE_STATUS_UUID = BLEUUID(0x2ADA)
TREADMILL_DATA_UUID = BLEUUID(0x2ACD)
CROSS_TRAINER_DATA_UUID = BLEUUID(0x2ACE)
STEP_CLIMBER_DATA_UUID = BLEUUID(0x2ACF)
ROWER_DATA_UUID = BLEUUID(0x2AD1)
INDOOR_BIKE_DATA_UUID = BLEUUID(0x2AD2)
SUPPORTED_INCLINATION_RANGE_UUID = BLEUUID(0x2AD5)
SUPPORTED_RESISTANCE_LEVEL_RANGE_UUID = BLEUUID(0x2AD6)
SUPPORTED_HEART_RATE_RANGE_UUID = BLEUUID(0x2AD7)
SUPPORTED_POWER_RANGE_UUID = BLEUUID(0x2AD8)
def open(self):
self.adapter.driver.open()
ble_enable_params = BLEEnableParams(vs_uuid_count = 1,
service_changed = False,
periph_conn_count = 0,
central_conn_count = CONNECTIONS,
central_sec_count = CONNECTIONS)
if nrf_sd_ble_api_ver >= 3:
print("Enabling larger ATT MTUs")
ble_enable_params.att_mtu = 50
self.adapter.driver.ble_enable(ble_enable_params)
def close(self):
self.adapter.driver.close()
def connect_and_discover(self):
self.adapter.driver.ble_gap_scan_start()
new_conn = self.conn_q.get(timeout = 60)
if nrf_sd_ble_api_ver >= 3:
att_mtu = self.adapter.att_mtu_exchange(new_conn)
self.adapter.service_discovery(new_conn)
status, data = self.adapter.read_req(new_conn, DEVICE_NAME_UUID)
data = ''.join([chr(c) for c in data])
print('Device Name = {} status = {}'.format(data, status))
self.adapter.enable_notification(new_conn, HRS_UUID)
return new_conn
def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
print('New connection: {}'.format(conn_handle))
self.conn_q.put(conn_handle)
def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
print('Disconnected: {} {}'.format(conn_handle, reason))
def on_gap_evt_timeout(self, ble_driver, conn_handle, src):
if src == BLEGapTimeoutSrc.scan:
ble_driver.ble_gap_scan_start()
def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
dev_name_list = None
if BLEAdvData.Types.complete_local_name in adv_data.records:
dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
elif BLEAdvData.Types.short_local_name in adv_data.records:
dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
else:
return
dev_name = ''.join(chr(e) for e in dev_name_list)
print('{} -> {}'.format(dev_name, dev_name_list))
address_string = ''.join("{0:02X}".format(b) for b in peer_addr.addr)
# print('Received advertisement report, address: 0x{}, device_name: {}'.format(address_string,
# dev_name))
if (dev_name in TARGET_DEV_NAME):
self.adapter.connect(peer_addr)
def on_notification(self, ble_adapter, conn_handle, uuid, data):
if (uuid.value.value == BLEUUID.Standard.heart_rate.value):
if (len(data) > 32):
print('Heart Rate Received: {}...'.format(data[:10]))
print('Data Length: {}'.format(len(data)))
else:
print('Heart Rate Received: {}'.format(data))
else:
print('Connection: {}, {} = {}'.format(conn_handle, uuid, data))
def on_gattc_evt_read_rsp(self, ble_driver, conn_handle, status, error_handle, attr_handle, offset, data):
print('Read Response event - Connection: {}, {} = {}'.format(conn_handle, attr_handle, data))
def on_att_mtu_exchanged(self, ble_driver, conn_handle, att_mtu):
print('ATT MTU exchanged: conn_handle={} att_mtu={}'.format(conn_handle, att_mtu))
def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, **kwargs):
print('ATT MTU exchange response: conn_handle={}'.format(conn_handle))
def main(serial_port):
print('Serial port used: {}'.format(serial_port))
driver = BLEDriver(serial_port=serial_port, auto_flash=True)
adapter = BLEAdapter(driver)
collector = NUSCollector(adapter)
collector.open()
global name
for i in range(CONNECTIONS):
conn_handle = collector.connect_and_discover()
time.sleep(10)
print('Closing')
collector.close()
def item_choose(item_list):
for i, it in enumerate(item_list):
print('\t{} : {}'.format(i, it))
print(' ')
while True:
try:
choice = int(input('Enter your choice: '))
if ((choice >= 0) and (choice < len(item_list))):
break
except Exception:
pass
print ('\tTry again...')
return choice
if __name__ == "__main__":
serial_port = None
if len(sys.argv) < 2:
print("Please specify connectivity IC identifier (NRF51, NRF52)")
exit(1)
init(sys.argv[1])
if len(sys.argv) == 3:
serial_port = sys.argv[2]
else:
descs = list(BLEDriver.enum_serial_ports())
choices = ['{}: {}'.format(d.port, d.serial_number) for d in descs]
choice = item_choose(choices)
serial_port = descs[choice].port
main(serial_port)
quit()
Any help with either of these issues would be greatly appreciated!
Regards,
Scott