This post is older than 2 years and might not be relevant anymore
More Info: Consider searching for newer posts

Custom 128bit uuid operations (write and read) in pc-ble-driver-py

How to define four different 128 bit uuid service characteristic? write and read operations on 128 bit uuid
please give me some examples operations on 128bit uuid in python

Parents
  • Hi,

    I posted the instructions for adding and writing 128-bit UUIDs in pc-ble-driver-py a couple of years ago. The examples may have changed a bit since then, but the principle should be the same.

    Best regards,
    Jørgen

  • Hi,

    Attached an NUS example that works with the latest version of pc-ble-driver-py, that both supports both notifications for TX and also writes for RX.

    #
    # Copyright (c) 2016 Nordic Semiconductor ASA
    # All rights reserved.
    #
    # Redistribution and use in source and binary forms, with or without modification,
    # are permitted provided that the following conditions are met:
    #
    #   1. Redistributions of source code must retain the above copyright notice, this
    #   list of conditions and the following disclaimer.
    #
    #   2. Redistributions in binary form must reproduce the above copyright notice, this
    #   list of conditions and the following disclaimer in the documentation and/or
    #   other materials provided with the distribution.
    #
    #   3. Neither the name of Nordic Semiconductor ASA nor the names of other
    #   contributors to this software may be used to endorse or promote products
    #   derived from this software without specific prior written permission.
    #
    #   4. This software must only be used in or with a processor manufactured by Nordic
    #   Semiconductor ASA, or in or with a processor manufactured by a third party that
    #   is used in combination with a processor manufactured by Nordic Semiconductor.
    #
    #   5. Any software provided in binary or object form under this license must not be
    #   reverse engineered, decompiled, modified and/or disassembled.
    #
    # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
    # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    #
    
    import sys
    import time
    import logging
    from queue import Queue, Empty
    from pc_ble_driver_py.observers import *
    
    global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams
    from pc_ble_driver_py import config
    
    config.__conn_ic_id__ = "NRF52"
    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_driver import (
    	BLEDriver,
    	BLEAdvData,
    	BLEEvtID,
    	BLEEnableParams,
    	BLEGapTimeoutSrc,
    	BLEUUID,
    	BLEUUIDBase,
    	BLEGapScanParams,
    	BLEConfigCommon,
    	BLEConfig,
    	BLEConfigConnGatt,
    )
    
    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_adapter import BLEAdapter
    
    global nrf_sd_ble_api_ver
    nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    TARGET_DEV_NAME = "Nordic_UART"
    CONNECTIONS = 1
    CFG_TAG = 1
    
    
    def init(conn_ic_id):
        # noinspection PyGlobalUndefined
        global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams
        from pc_ble_driver_py import config
    
        config.__conn_ic_id__ = conn_ic_id
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_driver import (
            BLEDriver,
            BLEAdvData,
            BLEEvtID,
            BLEEnableParams,
            BLEGapTimeoutSrc,
            BLEUUID,
    		BLEUUIDBase,
            BLEGapScanParams,
            BLEConfigCommon,
            BLEConfig,
            BLEConfigConnGatt,
        )
    
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_adapter import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
    
        NUS_BASE_UUID   = BLEUUIDBase([0x6E, 0x40, 0x00, 0x00, 0xB5, 0xA3, 0xF3, 0x93, 
                                       0xE0, 0xA9, 0xE5, 0x0E, 0x24, 0xDC, 0xCA, 0x9E], 0x02) # 0x02 is the type BLE_UUID_TYPE_VENDOR_BEGIN 
        NUS_RX_UUID = BLEUUID(0x0002, NUS_BASE_UUID)
        NUS_TX_UUID = BLEUUID(0x0003, NUS_BASE_UUID)
    
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter = adapter
            self.conn_q = Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            self.adapter.default_mtu = 250
    		
            
    
        def open(self):
            self.adapter.driver.open()
            if config.__conn_ic_id__ == "NRF51":
                self.adapter.driver.ble_enable(
                    BLEEnableParams(
                        vs_uuid_count=10,
                        service_changed=0,
                        periph_conn_count=0,
                        central_conn_count=1,
                        central_sec_count=0,
                    )
                )
            elif config.__conn_ic_id__ == "NRF52":
                gatt_cfg = BLEConfigConnGatt()
                gatt_cfg.att_mtu = self.adapter.default_mtu
                gatt_cfg.tag = CFG_TAG
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
    
                self.adapter.driver.ble_enable()
    
            self.adapter.driver.ble_vs_uuid_add(HRCollector.NUS_BASE_UUID)
    
        def close(self):
            self.adapter.driver.close()
    
        def connect_and_discover(self):
            scan_duration = 180
            params = BLEGapScanParams(interval_ms=200, window_ms=150, timeout_s=scan_duration)
    
            self.adapter.driver.ble_gap_scan_start(scan_params=params)
    
            try:
                new_conn = self.conn_q.get(timeout=scan_duration)
                self.adapter.service_discovery(new_conn)
    
                self.adapter.enable_notification(new_conn, HRCollector.NUS_TX_UUID)
                return new_conn
            except Empty:
                print(f"No heart rate collector advertising with name {TARGET_DEV_NAME} found.")
                return None
    
        def on_gap_evt_connected(
            self, ble_driver, conn_handle, peer_addr, role, conn_params
        ):
            print("New connection: {}".format(conn_handle))
            self.conn_q.put(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print("Disconnected: {} {}".format(conn_handle, reason))
    
        def on_gap_evt_adv_report(
            self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data
        ):
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name = "".join(chr(e) for e in dev_name_list)
            address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print(
                "Received advertisment report, address: 0x{}, device_name: {}".format(
                    address_string, dev_name
                )
            )
    
            if dev_name == TARGET_DEV_NAME:
                self.adapter.connect(peer_addr, tag=CFG_TAG)
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if len(data) > 32:
                data = "({}...)".format(data[0:10])
            print("Connection: {}, {} = {}".format(conn_handle, uuid, data))
            print("NUS_RX: " + ''.join(map(chr,data)))
    
    
    def main(selected_serial_port):
        print("Serial port used: {}".format(selected_serial_port))
        driver = BLEDriver(
            serial_port=selected_serial_port, auto_flash=False, baud_rate=1000000, log_severity_level="info"
        )
    
        adapter = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        conn = collector.connect_and_discover()
    
        if conn is not None:
            #time.sleep(1000)
            while True:
                nus_string = input("NUS string: ").encode()
                adapter.write_req(conn, HRCollector.NUS_RX_UUID, nus_string + b'\n')
    
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print("\t{} : {}".format(i, it))
        print(" ")
    
        while True:
            try:
                choice = int(input("Enter your choice: "))
                if (choice >= 0) and (choice < len(item_list)):
                    break
            except Exception:
                pass
            print("\tTry again...")
        return choice
    
    
    if __name__ == "__main__":
        logging.basicConfig(
            level="DEBUG",
            format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",
        )
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs = BLEDriver.enum_serial_ports()
            choices = ["{}: {}".format(d.port, d.serial_number) for d in descs]
            choice = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

    Best regards,
    Jørgen

Reply
  • Hi,

    Attached an NUS example that works with the latest version of pc-ble-driver-py, that both supports both notifications for TX and also writes for RX.

    #
    # Copyright (c) 2016 Nordic Semiconductor ASA
    # All rights reserved.
    #
    # Redistribution and use in source and binary forms, with or without modification,
    # are permitted provided that the following conditions are met:
    #
    #   1. Redistributions of source code must retain the above copyright notice, this
    #   list of conditions and the following disclaimer.
    #
    #   2. Redistributions in binary form must reproduce the above copyright notice, this
    #   list of conditions and the following disclaimer in the documentation and/or
    #   other materials provided with the distribution.
    #
    #   3. Neither the name of Nordic Semiconductor ASA nor the names of other
    #   contributors to this software may be used to endorse or promote products
    #   derived from this software without specific prior written permission.
    #
    #   4. This software must only be used in or with a processor manufactured by Nordic
    #   Semiconductor ASA, or in or with a processor manufactured by a third party that
    #   is used in combination with a processor manufactured by Nordic Semiconductor.
    #
    #   5. Any software provided in binary or object form under this license must not be
    #   reverse engineered, decompiled, modified and/or disassembled.
    #
    # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
    # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    # ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    #
    
    import sys
    import time
    import logging
    from queue import Queue, Empty
    from pc_ble_driver_py.observers import *
    
    global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams
    from pc_ble_driver_py import config
    
    config.__conn_ic_id__ = "NRF52"
    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_driver import (
    	BLEDriver,
    	BLEAdvData,
    	BLEEvtID,
    	BLEEnableParams,
    	BLEGapTimeoutSrc,
    	BLEUUID,
    	BLEUUIDBase,
    	BLEGapScanParams,
    	BLEConfigCommon,
    	BLEConfig,
    	BLEConfigConnGatt,
    )
    
    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_adapter import BLEAdapter
    
    global nrf_sd_ble_api_ver
    nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    TARGET_DEV_NAME = "Nordic_UART"
    CONNECTIONS = 1
    CFG_TAG = 1
    
    
    def init(conn_ic_id):
        # noinspection PyGlobalUndefined
        global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams
        from pc_ble_driver_py import config
    
        config.__conn_ic_id__ = conn_ic_id
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_driver import (
            BLEDriver,
            BLEAdvData,
            BLEEvtID,
            BLEEnableParams,
            BLEGapTimeoutSrc,
            BLEUUID,
    		BLEUUIDBase,
            BLEGapScanParams,
            BLEConfigCommon,
            BLEConfig,
            BLEConfigConnGatt,
        )
    
        # noinspection PyUnresolvedReferences
        from pc_ble_driver_py.ble_adapter import BLEAdapter
    
        global nrf_sd_ble_api_ver
        nrf_sd_ble_api_ver = config.sd_api_ver_get()
    
    
    class HRCollector(BLEDriverObserver, BLEAdapterObserver):
    
        NUS_BASE_UUID   = BLEUUIDBase([0x6E, 0x40, 0x00, 0x00, 0xB5, 0xA3, 0xF3, 0x93, 
                                       0xE0, 0xA9, 0xE5, 0x0E, 0x24, 0xDC, 0xCA, 0x9E], 0x02) # 0x02 is the type BLE_UUID_TYPE_VENDOR_BEGIN 
        NUS_RX_UUID = BLEUUID(0x0002, NUS_BASE_UUID)
        NUS_TX_UUID = BLEUUID(0x0003, NUS_BASE_UUID)
    
        def __init__(self, adapter):
            super(HRCollector, self).__init__()
            self.adapter = adapter
            self.conn_q = Queue()
            self.adapter.observer_register(self)
            self.adapter.driver.observer_register(self)
            self.adapter.default_mtu = 250
    		
            
    
        def open(self):
            self.adapter.driver.open()
            if config.__conn_ic_id__ == "NRF51":
                self.adapter.driver.ble_enable(
                    BLEEnableParams(
                        vs_uuid_count=10,
                        service_changed=0,
                        periph_conn_count=0,
                        central_conn_count=1,
                        central_sec_count=0,
                    )
                )
            elif config.__conn_ic_id__ == "NRF52":
                gatt_cfg = BLEConfigConnGatt()
                gatt_cfg.att_mtu = self.adapter.default_mtu
                gatt_cfg.tag = CFG_TAG
                self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
    
                self.adapter.driver.ble_enable()
    
            self.adapter.driver.ble_vs_uuid_add(HRCollector.NUS_BASE_UUID)
    
        def close(self):
            self.adapter.driver.close()
    
        def connect_and_discover(self):
            scan_duration = 180
            params = BLEGapScanParams(interval_ms=200, window_ms=150, timeout_s=scan_duration)
    
            self.adapter.driver.ble_gap_scan_start(scan_params=params)
    
            try:
                new_conn = self.conn_q.get(timeout=scan_duration)
                self.adapter.service_discovery(new_conn)
    
                self.adapter.enable_notification(new_conn, HRCollector.NUS_TX_UUID)
                return new_conn
            except Empty:
                print(f"No heart rate collector advertising with name {TARGET_DEV_NAME} found.")
                return None
    
        def on_gap_evt_connected(
            self, ble_driver, conn_handle, peer_addr, role, conn_params
        ):
            print("New connection: {}".format(conn_handle))
            self.conn_q.put(conn_handle)
    
        def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
            print("Disconnected: {} {}".format(conn_handle, reason))
    
        def on_gap_evt_adv_report(
            self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data
        ):
            if BLEAdvData.Types.complete_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
    
            elif BLEAdvData.Types.short_local_name in adv_data.records:
                dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
    
            else:
                return
    
            dev_name = "".join(chr(e) for e in dev_name_list)
            address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
            print(
                "Received advertisment report, address: 0x{}, device_name: {}".format(
                    address_string, dev_name
                )
            )
    
            if dev_name == TARGET_DEV_NAME:
                self.adapter.connect(peer_addr, tag=CFG_TAG)
    
        def on_notification(self, ble_adapter, conn_handle, uuid, data):
            if len(data) > 32:
                data = "({}...)".format(data[0:10])
            print("Connection: {}, {} = {}".format(conn_handle, uuid, data))
            print("NUS_RX: " + ''.join(map(chr,data)))
    
    
    def main(selected_serial_port):
        print("Serial port used: {}".format(selected_serial_port))
        driver = BLEDriver(
            serial_port=selected_serial_port, auto_flash=False, baud_rate=1000000, log_severity_level="info"
        )
    
        adapter = BLEAdapter(driver)
        collector = HRCollector(adapter)
        collector.open()
        conn = collector.connect_and_discover()
    
        if conn is not None:
            #time.sleep(1000)
            while True:
                nus_string = input("NUS string: ").encode()
                adapter.write_req(conn, HRCollector.NUS_RX_UUID, nus_string + b'\n')
    
        collector.close()
    
    
    def item_choose(item_list):
        for i, it in enumerate(item_list):
            print("\t{} : {}".format(i, it))
        print(" ")
    
        while True:
            try:
                choice = int(input("Enter your choice: "))
                if (choice >= 0) and (choice < len(item_list)):
                    break
            except Exception:
                pass
            print("\tTry again...")
        return choice
    
    
    if __name__ == "__main__":
        logging.basicConfig(
            level="DEBUG",
            format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",
        )
        serial_port = None
        if len(sys.argv) < 2:
            print("Please specify connectivity IC identifier (NRF51, NRF52)")
            exit(1)
        init(sys.argv[1])
        if len(sys.argv) == 3:
            serial_port = sys.argv[2]
        else:
            descs = BLEDriver.enum_serial_ports()
            choices = ["{}: {}".format(d.port, d.serial_number) for d in descs]
            choice = item_choose(choices)
            serial_port = descs[choice].port
        main(serial_port)
        quit()
    

    Best regards,
    Jørgen

Children
No Data
Related