Hello!
I am doing some development with pc-ble-driver-py and ran in to some unexpected behaviours when reading characteristics. I have been modifying the nus_collector.py script that was given as an example on this post. I am also using a PCA10028 to scan for devices.
Firstly, when scanning for devices, they last few characters for some devices are missing or corrupted. For example, when receiving the advertising name of a device with the name of "TestDev", the received name is captured as follows:
TesÝgSt -> [84, 101, 115, 221, 103, 83, 116] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝ -> [84, 101, 115, 221, 221, 221, 0] TesÝÝÝ -> [84, 101, 115, 221, 221, 221, 0] TesÝwar -> [84, 101, 115, 221, 119, 97, 114] TesÝÝÝ5 -> [84, 101, 115, 221, 221, 221, 53] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝI -> [84, 101, 115, 221, 221, 221, 73] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221] TesÝÝÝÝ -> [84, 101, 115, 221, 221, 221, 221]
where the expected output should be:
TestDev -> [84, 101, 115, 116, 68, 101, 118]
Upon using nRF Connect (either PC or mobile versions), the correct device name is displayed:
This end of device name string corruption does not just happen for this particular device - I have tested scanning other devices where I see a similar behaviour with differing amounts of corruption (e.g. some devices only have 1 character that is corrupted at the end of the string). However, not all of the devices I have scanned display the corruption; some devices have their advertising names being received correctly.
Secondly, when receiving heart rate notifications from a connected device, the data is tens of thousands of values in length. Is there any documentation on how to convert this data so it can be interpreted in beats per minute (bpm)? Note: the heart rate is correctly displayed on nRF Connect (both PC and mobile).
Here is the modified nus_collector.py script that I have been using to test:
import sys import time import queue import logging from enum import Enum from functools import wraps CONN_IC_ID = 'NRF51' from pc_ble_driver_py import config config.__conn_ic_id__ = CONN_IC_ID from pc_ble_driver_py.ble_driver import BLEUUID, BLEUUIDBase, BLEDriverObserver, BLEDriver, BLEEnableParams, BLEAdvData, BLEGapTimeoutSrc from pc_ble_driver_py.ble_adapter import BLEAdapterObserver, BLEAdapter TARGET_DEV_NAME = "TestDev" TARGET_DEV_MAC_ADDRESS = 'DEA01D552902' CONNECTIONS = 1 def init(conn_ic_id): global nrf_sd_ble_api_ver nrf_sd_ble_api_ver = config.sd_api_ver_get() class NUSCollector(BLEDriverObserver, BLEAdapterObserver): def __init__(self, adapter): super(NUSCollector, self).__init__() self.adapter = adapter self.conn_q = queue.Queue() self.adapter.observer_register(self) self.adapter.driver.observer_register(self) global GFIT_CS_BASE_UUID, HRS_UUID, DEVICE_NAME_UUID, MANUFACTURER_NAME_UUID global FIRMWARE_REVISION_UUID, SOFTWARE_REVISION_UUID, FITNESS_MACHINE_FEATURES_UUID global FITNESS_MACHINE_STATUS_UUID, TREADMILL_DATA_UUID, CROSS_TRAINER_DATA_UUID global STEP_CLIMBER_DATA_UUID, ROWER_DATA_UUID, INDOOR_BIKE_DATA_UUID global SUPPORTED_INCLINATION_RANGE_UUID, SUPPORTED_RESISTANCE_LEVEL_RANGE_UUID global SUPPORTED_HEART_RATE_RANGE_UUID, SUPPORTED_POWER_RANGE_UUID global GFIT_CS_CONTROL_POINT_UUID, GFIT_CS_CUSTOM_DATA_UUID HRS_UUID = BLEUUID(BLEUUID.Standard.heart_rate) DEVICE_NAME_UUID = BLEUUID(0x2A00) MANUFACTURER_NAME_UUID = BLEUUID(0x2A29) FIRMWARE_REVISION_UUID = BLEUUID(0x2A26) SOFTWARE_REVISION_UUID = BLEUUID(0x2A28) FITNESS_MACHINE_FEATURES_UUID = BLEUUID(0x2ACC) FITNESS_MACHINE_STATUS_UUID = BLEUUID(0x2ADA) TREADMILL_DATA_UUID = BLEUUID(0x2ACD) CROSS_TRAINER_DATA_UUID = BLEUUID(0x2ACE) STEP_CLIMBER_DATA_UUID = BLEUUID(0x2ACF) ROWER_DATA_UUID = BLEUUID(0x2AD1) INDOOR_BIKE_DATA_UUID = BLEUUID(0x2AD2) SUPPORTED_INCLINATION_RANGE_UUID = BLEUUID(0x2AD5) SUPPORTED_RESISTANCE_LEVEL_RANGE_UUID = BLEUUID(0x2AD6) SUPPORTED_HEART_RATE_RANGE_UUID = BLEUUID(0x2AD7) SUPPORTED_POWER_RANGE_UUID = BLEUUID(0x2AD8) def open(self): self.adapter.driver.open() ble_enable_params = BLEEnableParams(vs_uuid_count = 1, service_changed = False, periph_conn_count = 0, central_conn_count = CONNECTIONS, central_sec_count = CONNECTIONS) if nrf_sd_ble_api_ver >= 3: print("Enabling larger ATT MTUs") ble_enable_params.att_mtu = 50 self.adapter.driver.ble_enable(ble_enable_params) def close(self): self.adapter.driver.close() def connect_and_discover(self): self.adapter.driver.ble_gap_scan_start() new_conn = self.conn_q.get(timeout = 60) if nrf_sd_ble_api_ver >= 3: att_mtu = self.adapter.att_mtu_exchange(new_conn) self.adapter.service_discovery(new_conn) status, data = self.adapter.read_req(new_conn, DEVICE_NAME_UUID) data = ''.join([chr(c) for c in data]) print('Device Name = {} status = {}'.format(data, status)) self.adapter.enable_notification(new_conn, HRS_UUID) return new_conn def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params): print('New connection: {}'.format(conn_handle)) self.conn_q.put(conn_handle) def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason): print('Disconnected: {} {}'.format(conn_handle, reason)) def on_gap_evt_timeout(self, ble_driver, conn_handle, src): if src == BLEGapTimeoutSrc.scan: ble_driver.ble_gap_scan_start() def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data): dev_name_list = None if BLEAdvData.Types.complete_local_name in adv_data.records: dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name] elif BLEAdvData.Types.short_local_name in adv_data.records: dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name] else: return dev_name = ''.join(chr(e) for e in dev_name_list) print('{} -> {}'.format(dev_name, dev_name_list)) address_string = ''.join("{0:02X}".format(b) for b in peer_addr.addr) # print('Received advertisement report, address: 0x{}, device_name: {}'.format(address_string, # dev_name)) if (dev_name in TARGET_DEV_NAME): self.adapter.connect(peer_addr) def on_notification(self, ble_adapter, conn_handle, uuid, data): if (uuid.value.value == BLEUUID.Standard.heart_rate.value): if (len(data) > 32): print('Heart Rate Received: {}...'.format(data[:10])) print('Data Length: {}'.format(len(data))) else: print('Heart Rate Received: {}'.format(data)) else: print('Connection: {}, {} = {}'.format(conn_handle, uuid, data)) def on_gattc_evt_read_rsp(self, ble_driver, conn_handle, status, error_handle, attr_handle, offset, data): print('Read Response event - Connection: {}, {} = {}'.format(conn_handle, attr_handle, data)) def on_att_mtu_exchanged(self, ble_driver, conn_handle, att_mtu): print('ATT MTU exchanged: conn_handle={} att_mtu={}'.format(conn_handle, att_mtu)) def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, **kwargs): print('ATT MTU exchange response: conn_handle={}'.format(conn_handle)) def main(serial_port): print('Serial port used: {}'.format(serial_port)) driver = BLEDriver(serial_port=serial_port, auto_flash=True) adapter = BLEAdapter(driver) collector = NUSCollector(adapter) collector.open() global name for i in range(CONNECTIONS): conn_handle = collector.connect_and_discover() time.sleep(10) print('Closing') collector.close() def item_choose(item_list): for i, it in enumerate(item_list): print('\t{} : {}'.format(i, it)) print(' ') while True: try: choice = int(input('Enter your choice: ')) if ((choice >= 0) and (choice < len(item_list))): break except Exception: pass print ('\tTry again...') return choice if __name__ == "__main__": serial_port = None if len(sys.argv) < 2: print("Please specify connectivity IC identifier (NRF51, NRF52)") exit(1) init(sys.argv[1]) if len(sys.argv) == 3: serial_port = sys.argv[2] else: descs = list(BLEDriver.enum_serial_ports()) choices = ['{}: {}'.format(d.port, d.serial_number) for d in descs] choice = item_choose(choices) serial_port = descs[choice].port main(serial_port) quit()
Any help with either of these issues would be greatly appreciated!
Regards,
Scott