Hi,
I have a service that receives an uint8_t and it looks as shown below in the nRF Connect app
I try to configure it via the PC-BLE framework by using an nRF52840 dongle, but I get the error "Invalid argument type" every time when I want to schedule a write request:
What is wrong here and how can I fix this issue?
I use the following code (I can not add the code as code block, so I have to paste it...)
import time
import logging
import struct
from queue import Queue, Empty
from pc_ble_driver_py.observers import *
CONNECTIONS = 2
CFG_TAG = 1
def init():
# noinspection PyGlobalUndefined
global config, BLEDriver, BLEAdvData, BLEEvtID, BLEAdapter, BLEEnableParams, BLEGapTimeoutSrc, BLEUUID, BLEUUIDBase, BLEConfigCommon, BLEConfig, BLEConfigConnGatt, BLEConfigConnGap, BLEGapScanParams, BLEConfigGapRoleCount
from pc_ble_driver_py import config
config.__conn_ic_id__ = "NRF52"
# noinspection PyUnresolvedReferences
from pc_ble_driver_py.ble_driver import (
BLEDriver,
BLEAdvData,
BLEEvtID,
BLEEnableParams,
BLEGapTimeoutSrc,
BLEUUID,
BLEUUIDBase,
BLEGapScanParams,
BLEConfigCommon,
BLEConfig,
BLEConfigConnGatt,
BLEConfigConnGap,
BLEConfigGapRoleCount,
)
# noinspection PyUnresolvedReferences
from pc_ble_driver_py.ble_adapter import BLEAdapter
global nrf_sd_ble_api_ver
nrf_sd_ble_api_ver = config.sd_api_ver_get()
class Collector(BLEDriverObserver, BLEAdapterObserver):
def __init__(self, adapter):
self.adapter = adapter
self.conn_q = Queue()
self.conn = 0
self.adapter.observer_register(self)
self.adapter.driver.observer_register(self)
self.tl_heartbeat = BLEUUIDBase([0x02, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xB0, 0xA7, 0x74, 0x4C])
def open(self):
self.adapter.driver.open()
gatt_cfg = BLEConfigConnGatt()
gatt_cfg.att_mtu = self.adapter.default_mtu
gatt_cfg.tag = CFG_TAG
self.adapter.driver.ble_cfg_set(BLEConfig.conn_gatt, gatt_cfg)
gap_cfg = BLEConfigConnGap()
gap_cfg.conn_count = CONNECTIONS
gap_cfg.event_length = 6
self.adapter.driver.ble_cfg_set(BLEConfig.conn_gap, gap_cfg)
gap_role_cfg = BLEConfigGapRoleCount()
gap_role_cfg.central_role_count = CONNECTIONS
gap_role_cfg.periph_role_count = 0
gap_role_cfg.central_sec_count = 0
self.adapter.driver.ble_cfg_set(BLEConfig.role_count, gap_role_cfg)
self.adapter.driver.ble_enable()
def close(self):
self.adapter.driver.close()
def connect_and_discover(self):
scan_duration = 5
params = BLEGapScanParams(interval_ms = 200, window_ms = 150, timeout_s = scan_duration)
self.adapter.driver.ble_gap_scan_start(scan_params = params)
try:
self.conn = self.conn_q.get(timeout = scan_duration)
self.adapter.service_discovery(self.conn)
self.adapter.write_req(self.conn, self.service, struct.pack('b', 20))
return self.conn
except Empty:
return None
def on_gap_evt_connected(self, ble_driver, conn_handle, peer_addr, role, conn_params):
print("New connection: {}".format(conn_handle))
self.conn_q.put(conn_handle)
def on_gap_evt_disconnected(self, ble_driver, conn_handle, reason):
print("Disconnected: {} {}".format(conn_handle, reason))
def on_gap_evt_adv_report(self, ble_driver, conn_handle, peer_addr, rssi, adv_type, adv_data):
if BLEAdvData.Types.complete_local_name in adv_data.records:
dev_name_list = adv_data.records[BLEAdvData.Types.complete_local_name]
elif BLEAdvData.Types.short_local_name in adv_data.records:
dev_name_list = adv_data.records[BLEAdvData.Types.short_local_name]
else:
return
dev_name = "".join(chr(e) for e in dev_name_list)
address_string = "".join("{0:02X}".format(b) for b in peer_addr.addr)
print("Received advertisment report, address: 0x{}, device_name: {}".format(address_string, dev_name))
if "tl_" in dev_name:
self.adapter.connect(peer_addr, tag=CFG_TAG)
def main(selected_serial_port):
print("Serial port used: {}".format(selected_serial_port))
driver = BLEDriver(serial_port=selected_serial_port, auto_flash=False, baud_rate=1000000, log_severity_level="info")
adapter = BLEAdapter(driver)
collector = Collector(adapter)
collector.open()
connections = 0
while connections < CONNECTIONS:
conn = collector.connect_and_discover()
if conn is not None:
connections = conn + 1
print("Connection {0}/{1}".format(connections, CONNECTIONS))
if conn is not None:
time.sleep(10)
collector.close()
if(__name__ == "__main__"):
logging.basicConfig(level="WARNING",format="%(asctime)s [%(thread)d/%(threadName)s] %(message)s",)
init()
serial_port = "COM14"
main(serial_port)
quit()