nRF52840 Dongle TX and RX with python

Hi, Im trying to write a simply python script to allow connect, write data, read data and disconnect from my targeted bluetooth device using nRF52840-Dongle.

Basically I had successfully perform the connect and disconnect feature. The only thing which I lack is write to my target bluetooth device and read respond from it.

I can't find any information about the python api regarding Nordic UART to transmit and receive data and had been struggling with this issue for quite sometime. It will be great if someone able to provide me some guidance on how shall python api for the write and read feature.

Basically what I required is to write data using the UUID:1401 and read the respond using UUID: 1403 as highlighted below.

The data to write will be in hexadecimal format where sample (05 00 10 20) when i write into the tab.

I'm attach in here my python script which I found in the web where we can utilize  "self.adapter.write_req(new_conn, self.NUS_TX_UUID, byte_array)" to write command but it seems like it does not write successfully. So I'm not sure if my UUID base and TX UUID are setup correct?

import pdb
import sys
import time
import logging
from queue import Queue, Empty
from pc_ble_driver_py.observers import *

TARGET_DEV_NAME = "ABCD"
CONNECTIONS = 1
CFG_TAG = 1

global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams, BLEConfigConnGap, BLEGapConnParams
from pc_ble_driver_py import config
config.__conn_ic_id__ = "NRF52"
# noinspection PyUnresolvedReferences
from pc_ble_driver_py.ble_driver import (
        BLEDriver,
    BLEAdvData,
    BLEEvtID,
    BLEEnableParams,
    BLEGapTimeoutSrc,
    BLEUUID,
    BLEUUIDBase,
    BLEGapScanParams,
    BLEGapConnParams,
    BLEConfigCommon,
    BLEConfig,
    BLEConfigConnGatt,
    BLEConfigConnGap,
)


def init(conn_ic_id):
    # noinspection PyGlobalUndefined
    global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEGapScanParams, BLEConfigConnGap, BLEGapConnParams
    from pc_ble_driver_py import config

    config.__conn_ic_id__ = conn_ic_id
    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_driver import (
        BLEDriver,
        BLEAdvData,
        BLEEvtID,
        BLEEnableParams,
        BLEGapTimeoutSrc,
        BLEUUID,
        BLEUUIDBase,
        BLEGapScanParams,
        BLEGapConnParams,
        BLEConfigCommon,
        BLEConfig,
        BLEConfigConnGatt,
        BLEConfigConnGap,
    )

    # noinspection PyUnresolvedReferences
    from pc_ble_driver_py.ble_adapter import BLEAdapter

    global nrf_sd_ble_api_ver
    nrf_sd_ble_api_ver = config.sd_api_ver_get()


class NusCollector(BLEDriverObserver, BLEAdapterObserver):
    
    def __init__(self, adapter):
        super(NusCollector, self).__init__()
        self.first = 1
        self.adapter = adapter
        self.conn_q = Queue()
        self.adapter.observer_register(self)
        self.adapter.driver.observer_register(self)
        self.adapter.default_mtu = 247
        global BASE_UUID, NUS_TX_UUID
        
        self.BASE_UUID   = BLEUUIDBase([0x00, 0x41], 0x01) # 0x02 is the type BLE_UUID_TYPE_VENDOR_BEGIN 
        self.NUS_TX_UUID = BLEUUID(0x0001, self.BASE_UUID)
        
    def open(self):
        self.adapter.driver.open()
        if config.__conn_ic_id__ == "NRF51":
            self.adapter.driver.ble_enable(
                BLEEnableParams(
                    vs_uuid_count=1,
                    service_changed=0,
                    periph_conn_count=0,
                    central_conn_count=1,
                    central_sec_count=0,
                )
            )
        elif config.__conn_ic_id__ == "NRF52":
            gatt_cfg = BLEConfigConnGatt()
            gatt_cfg.att_mtu = self.adapter.default_mtu
            gatt_cfg.tag = CFG_TAG
            self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)

            conn_cfg = BLEConfigConnGap()
            conn_cfg.conn_count = 1
            conn_cfg.event_length = 320
            self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, conn_cfg)

            self.adapter.driver.ble_enable()
            self.adapter.driver.ble_vs_uuid_add(self.BASE_UUID)



    def close(self):
        self.adapter.driver.close()

    def connect_and_discover(self):
        byte_array = bytearray(b'\x05\x00\x10\x20')
        scan_duration = 5
        scan_params = BLEGapScanParams(interval_ms=200, window_ms=150, timeout_s=scan_duration)

        self.adapter.driver.ble_gap_scan_start(scan_params=scan_params)

        try:
            
            new_conn = self.conn_q.get(timeout=scan_duration)
            self.adapter.service_discovery(new_conn)
            print("Connection Established.")
            #self.adapter.driver.ble_gap_rssi_start(new_conn, 0, 0)
            self.adapter.write_req(new_conn, self.NUS_TX_UUID, byte_array)
            return new_conn
        except Empty:
            print("No device advertising with name ABCD found.")
            return None


    def on_gattc_evt_exchange_mtu_rsp(self, ble_driver, conn_handle, status, att_mtu):
        print("ATT MTU updated to {}".format(att_mtu))
    
    def on_gap_evt_data_length_update(
        self, ble_driver, conn_handle, data_length_params
    ):
        print("Max rx octets: {}".format(data_length_params.max_rx_octets))
        print("Max tx octets: {}".format(data_length_params.max_tx_octets))
        print("Max rx time: {}".format(data_length_params.max_rx_time_us))
        print("Max tx time: {}".format(data_length_params.max_tx_time_us))

    def on_gatts_evt_exchange_mtu_request(self, ble_driver, conn_handle, client_mtu):
        print("Client requesting to update ATT MTU to {} bytes".format(client_mtu))
    def on_gap_evt_connected(
        self, ble_driver, conn_handle, peer_addr, role, conn_params
    ):
        print("New connection:1 {}".format(conn_handle))
        self.conn_q.put(conn_handle)

    def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
        print("Disconnected: {} {}".format(conn_handle, reason))

    def on_gap_evt_adv_report(
        self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data
    ):
        conn_params = BLEGapConnParams(min_conn_interval_ms=7.5, max_conn_interval_ms=7.5, conn_sup_timeout_ms=4000, slave_latency=0)
        
        if BLEAdvData.Types.complete_local_name in adv_data.records:
            dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]

        elif BLEAdvData.Types.short_local_name in adv_data.records:
            dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]

        else:
            return

        dev_name = "".join(chr(e) for e in dev_name_list)
        address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
        print(
            "Received advertisment report, address: 0x{}, device_name: {}".format(
                address_string, dev_name
            )
        )

        if dev_name == TARGET_DEV_NAME:
            self.adapter.connect(peer_addr, conn_params = conn_params, tag=CFG_TAG)

    def on_notification(self, ble_adapter, conn_handle, uuid, data):
        if len(data) > 32:
            data = "({}...)".format(data[0:10])
        print("Connection: {}, {} = {}".format(conn_handle, uuid, data)


def main(selected_serial_port):
    print("Serial port used: {}".format(selected_serial_port))
    driver = BLEDriver(
        serial_port=selected_serial_port, auto_flash=False, baud_rate=1000000, log_severity_level="info"
    )

    adapter = BLEAdapter(driver)
    collector = NusCollector(adapter)
    collector.open()
    conn = collector.connect_and_discover()


def item_choose(item_list):
    for i, it in enumerate(item_list):
        print("\t{} : {}".format(i, it))
    print(" ")

    while True:
        try:
            choice = int(input("Enter your choice: "))
            if (choice >= 0) and (choice < len(item_list)):
                break
        except Exception:
            pass
        print("\tTry again...")
    return choice


if __name__ == "__main__":
    logging.basicConfig(
        level="INFO",
       format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",
    )
    serial_port = None
    if len(sys.argv) < 2:
        print("Please specify connectivity IC identifier (NRF51, NRF52)")
        exit(1)
    init(sys.argv[1])
    if len(sys.argv) == 3:
        serial_port = sys.argv[2]
    else:
        descs = BLEDriver.enum_serial_ports()
        choices = ["{}: {}".format(d.port, d.serial_number) for d in descs]
        choice = item_choose(choices)
        print("Choices".format(choices))
        serial_port = descs[choice].port
    main(serial_port)
    quit()

 

Related