Handling 128-bit UUIDs with Python pc_ble_driver

Hi,

I´m developing a BLE-OTA driver for a BLE SoC and I try to use the nRF52840 dongle with my PC as the second device for the OTA mechanism by using Python and the pc_ble_driver.py. I have an OTA service with the UUID AABBA7B0000000000000000000000501 and the service contains two characteristics for the OTA control AABBA7B0000000000000000000000604 and the OTA data AABBA7B0000000000000000000000605.

What is the correct way to send data by using the Python driver? I don´t find any example with 128-bit UUIDs, so I got stuck in the discovery and handling of these UUIDs.

import sys
import time
import logging
from queue import Queue, Empty
from pc_ble_driver_py.observers import *

global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams
from pc_ble_driver_py import config

config.__conn_ic_id__ = "NRF52"

# noinspection PyUnresolvedReferences
from pc_ble_driver_py.ble_driver import (
	BLEDriver,
	BLEAdvData,
	BLEEvtID,
	BLEEnableParams,
	BLEGapTimeoutSrc,
	BLEUUID,
	BLEUUIDBase,
	BLEGapScanParams,
	BLEConfigCommon,
	BLEConfig,
	BLEConfigConnGatt,
)

# noinspection PyUnresolvedReferences
from pc_ble_driver_py.ble_adapter import BLEAdapter

global nrf_sd_ble_api_ver
nrf_sd_ble_api_ver = config.sd_api_ver_get()

class OTA(BLEDriverObserver, BLEAdapterObserver):
    OTA_BASE_UUID = BLEUUIDBase([0xAA, 0xBB, 0xA7, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x01], 0x02)
    OTA_Control_UUID = BLEUUIDBase([0xAA, 0xBB, 0xA7, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04], 0x02)
    OTA_Data_UUID = BLEUUIDBase([0xAA, 0xBB, 0xA7, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x05], 0x02)

    def __init__(self, adapter):
        super(OTA, self).__init__()
        self.adapter = adapter
        self.conn_q = Queue()
        self.adapter.observer_register(self)
        self.adapter.driver.observer_register(self)
        self.adapter.default_mtu = 250

    def open(self):
        self.adapter.driver.open()
        gatt_cfg = BLEConfigConnGatt()
        gatt_cfg.att_mtu = self.adapter.default_mtu
        gatt_cfg.tag = 1
        self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
        self.adapter.driver.ble_enable()
        self.adapter.driver.ble_vs_uuid_add(OTA.OTA_BASE_UUID)

    def close(self):
        self.adapter.driver.close()

    def connect_and_discover(self):
        scan_duration = 5
        params = BLEGapScanParams(interval_ms = 200, window_ms = 150, timeout_s = scan_duration)

        self.adapter.driver.ble_gap_scan_start(scan_params = params)

        try:
            new_conn = self.conn_q.get(timeout = scan_duration)
            self.adapter.service_discovery(new_conn)

            return new_conn
        except Empty:
            print("No advertising with name 'jerry3' found.")

            return None

    def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
        print("New connection: {}".format(conn_handle))
        self.conn_q.put(conn_handle)

    def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
        print("Disconnected: {} {}".format(conn_handle, reason))

    def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
        if(BLEAdvData.Types.complete_local_name in adv_data.records):
            dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
        elif(BLEAdvData.Types.short_local_name in adv_data.records):
            dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
        else:
            return

        dev_name = "".join(chr(e) for e in dev_name_list)
        address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
        print("Received advertisment report, address: 0x{}, device_name: {}".format(address_string, dev_name))

        if(dev_name == "jerry3"):
            self.adapter.connect(peer_addr, tag = 1)

    def on_notification(self, ble_adapter, conn_handle, uuid, data):
        if(len(data) > 32):
            data = "({}...)".format(data[0:10])
        print("Connection: {}, {} = {}".format(conn_handle, uuid, data))

if(__name__ == "__main__"):
    logging.basicConfig(level = "DEBUG", format = "%(asctime)s [%(thread)d/%(threadName)s] %(message)s")

    Driver = BLEDriver(serial_port = "COM9", auto_flash = False, baud_rate = 1000000, log_severity_level = "info")
    Adapter = BLEAdapter(Driver)
    OTA_Handler = OTA(Adapter)
    OTA_Handler.open()
    Connection = OTA_Handler.connect_and_discover()

    if(Connection is not None):
        time.sleep(10)

        while True:
            nus_string = input("NUS string: ").encode()
            Adapter.write_req(Connection, OTA.OTA_Control_UUID, b'a')

    OTA_Handler.close()
    quit()

I need to use only this service and the two characteristics because it should be a simple demonstration device for testing the OTA mechanism. Can somebody help or provide me some examples for the implementation?

Thanks and best regards

Parents
  • Hi Daniel, 

    You can have a look at this example from our coworker Vidar: 
    Adding custome characterstics using pc-ble-driver-py

    It's demonstrating a NUS central on PC and you can test it with a ble_app_uart example that is a NUS server/peripheral. 

  • Hi,

    I have changed the code to the following:

    import sys
    import time
    import logging
    from queue import Queue, Empty
    from pc_ble_driver_py.observers import *
    
    global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams
    from pc_ble_driver_py import config
    
    config.__conn_ic_id__ = "NRF52"
    
    from pc_ble_driver_py.ble_driver import (
    	BLEDriver,
    	BLEAdvData,
    	BLEEvtID,
    	BLEEnableParams,
    	BLEGapTimeoutSrc,
    	BLEUUID,
    	BLEUUIDBase,
    	BLEGapScanParams,
    	BLEConfigCommon,
    	BLEConfig,
    	BLEConfigConnGatt,
    )
    
    from pc_ble_driver_py.ble_adapter import BLEAdapter
    
    global nrf_sd_ble_api_ver
    nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    class OTA(BLEDriverObserver, BLEAdapterObserver):
        OTA_BASE_UUID       = BLEUUIDBase(
            [0x4C, 0x74, 0xA7, 0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x01],
            0x02
        )
        OTA_Control_UUID    = BLEUUIDBase(
            [0x4C, 0x74, 0xA7, 0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x04],
            0x02
        )
        OTA_Data_UUID       = BLEUUIDBase(
            [0x4C, 0x74, 0xA7, 0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x05], 
            0x02
        )
    
        def __init__(self, Adapter):
            super(OTA, self).__init__()
            self.adapter = Adapter
            self.conn_q = Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            self.adapter.default_mtu = 250
    
        def open(self):
            self.adapter.driver.open()
            gatt_cfg = BLEConfigConnGatt()
            gatt_cfg.att_mtu = self.adapter.default_mtu
            gatt_cfg.tag = 1
            self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
            self.adapter.driver.ble_enable()
            self.adapter.driver.ble_vs_uuid_add(OTA.OTA_BASE_UUID)
            self.adapter.driver.ble_vs_uuid_add(OTA.OTA_Control_UUID)
            self.adapter.driver.ble_vs_uuid_add(OTA.OTA_Data_UUID)
    
        def close(self):
            self.adapter.driver.close()
    
        def connect_and_discover(self):
            scan_duration = 5
            params = BLEGapScanParams(interval_ms = 200, window_ms = 150, timeout_s = scan_duration)
    
            self.adapter.driver.ble_gap_scan_start(scan_params = params)
    
            try:
                new_conn = self.conn_q.get(timeout = scan_duration)
                self.adapter.service_discovery(new_conn)
    
                return new_conn
            except Empty:
                print("No advertising with name 'jerry3' found.")
    
                return None
    
        def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
            print("New connection: {}".format(conn_handle))
            self.conn_q.put(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print("Disconnected: {} {}".format(conn_handle, reason))
    
        def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
            if(BLEAdvData.Types.complete_local_name in adv_data.records):
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
            elif(BLEAdvData.Types.short_local_name in adv_data.records):
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
            else:
                return
    
            dev_name = "".join(chr(e) for e in dev_name_list)
            address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print("Received advertisment report, address: 0x{}, device_name: {}".format(address_string, dev_name))
    
            if(dev_name == "jerry3"):
                self.adapter.connect(peer_addr, tag = 1)
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if(len(data) > 32):
                data = "({}...)".format(data[0:10])
            print("Connection: {}, {} = {}".format(conn_handle, uuid, data))
    
    if(__name__ == "__main__"):
        logging.basicConfig(level = "DEBUG", format = "%(asctime)s [%(thread)d/%(threadName)s] %(message)s")
    
        Driver = BLEDriver(serial_port = "COM9", auto_flash = False, baud_rate = 1000000, log_severity_level = "info")
        Adapter = BLEAdapter(Driver)
        OTA_Handler = OTA(Adapter)
        OTA_Handler.open()
        Connection = OTA_Handler.connect_and_discover()
    
        if(Connection is not None):
            while True:
                data = [1]
                Adapter.write_cmd(Connection, OTA.OTA_Control_UUID, data)
    
        OTA_Handler.close()
        quit()

    The application throws an exception with the message

    "assert isinstance(uuid, BLEUUID), "Invalid argument type" AssertionError: Invalid argument type"

    Where does the error come from?

  • Hi, 

    Have you tried to test with the example from Vidar ? 

    I'm not sure what the exception is about, but you may want to give the whole log so that I can find more information. 

Reply Children
  • Hi,

    yes, I have tried it and everything is okay. But I don´t know how to use this for my problem.
    The NUS example uses the following base UUID

    0x4C, 0x74, 0xA7, 0xB0, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x01


    And the BLEUUID class is generating the following UUIDs based on a base UUID and an offset

    6E400002-B5A3-F393-E0A9-E50E24DCCA9E
    6E400003-B5A3-F393-E0A9-E50E24DCCA9E


    But my UUID is entirely different. The UUID for the service is

    0x01, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xB0, 0xA7, 0x74, 0x4C
    and for each characteristic it is
    0x04, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xB0, 0xA7, 0x74, 0x4C
    0x05, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xB0, 0xA7, 0x74, 0x4C


    Can I handle this with an offset too? So I have to add
    0x03, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00


    as an offset?
    What is a good way to insert the correct UUIDs in this Python script?
  • Hi Daniel, 

    Is it possible to change the UUID of the characteristic and service in your application ?


    If you are freely to change them I would suggest to follow what we did with the NUS example to have the same UUID base for the characteristic and the service, the only difference is the offset at byte 3rd and 4th. 

    If it's not possible to change the UUIDs, then what you need to do is to define 3 different UUID base , and each of them would have byte 3rd and 4rd set to 0. In your case the 3rd and 4rd bytes are also 0 so you don't need to do anything. 

    For example you can define this:
    self.your_base1 = BLEUUIDBase([
    0x04, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xB0, 0xA7, 0x74, 0x4C
    ])

    self.your_base2 = BLEUUIDBase([
    0x05, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xB0, 0xA7, 0x74, 0x4C
    ])

      self.adapter.driver.ble_vs_uuid_add(self.your_base1)

     self.adapter.driver.ble_vs_uuid_add(self.your_base2)

    Then you can call:

    self.your_first_char = BLEUUID(0x0000, self.your_base1)
    self.your_second_char = BLEUUID(0x0000, self.your_base2)

    After that you can use your_first_char  and your_second_char  the same as what we did with the nus_rx and nus_tx. 

    In short the BLEUUID() is the combination of the 3rd and 4th bytes and the base_uuid. 

  • Hi  ,

    thanks for your response! I will change it :) 

Related