import asyncio
import base64
import json
import os
import struct
import time
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
# ---------------------------------------------------------------------------
# DFU Configuration
# ---------------------------------------------------------------------------
DEVICE_ADDRESS = "D9:E1:E4:73:11:33" # Replace with your device's BLE address
FIRMWARE_FILE = "app_update.bin" # Firmware binary to upload
DEFAULT_CHUNK_SIZE = 200 # Reduced default chunk size
MAX_RETRIES = 3 # Number of retries for failed operations
# SMP Service and Characteristic UUIDs (MCUboot/Zephyr SMP over BLE)
SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
SMP_CHAR_UUID = "DA2E7828-FBCE-4E01-AE9E-261174997C48"
# ---------------------------------------------------------------------------
# SMP Command Definitions
# ---------------------------------------------------------------------------
# SMP protocol opcodes (group:id)
SMP_IMG_LIST = bytearray([0x00, 0x00, 0x00, 0x00]) # List images
SMP_IMG_UPLOAD = bytearray([0x00, 0x00, 0x00, 0x01]) # Upload image
SMP_IMG_CONFIRM = bytearray([0x00, 0x00, 0x00, 0x06]) # Confirm image
SMP_RESET = bytearray([0x00, 0x01, 0x00, 0x00]) # Reset device
# ---------------------------------------------------------------------------
# Helper: Build SMP Packet
# ---------------------------------------------------------------------------
def build_smp_packet(opcode: bytearray, data: bytes = b"", seq_num: int = 0) -> bytearray:
"""
Build an SMP packet with a 4-byte header, 2-byte length (little-endian),
1-byte flags, and 1-byte sequence number followed by the payload.
"""
header = bytearray(opcode) # 4 bytes (group and id)
length = len(data)
header.append(length & 0xFF) # length LSB
header.append((length >> 8) & 0xFF) # length MSB
header.append(0x00) # flags
header.append(seq_num & 0xFF) # sequence
return header + data
# ---------------------------------------------------------------------------
# BLE Device Discovery and Notifications
# ---------------------------------------------------------------------------
async def discover_device():
"""Scan for and return the BLE device with the given address."""
print("🔍 Scanning for BLE device...")
for retry in range(3): # Try up to 3 times
device = await BleakScanner.find_device_by_address(DEVICE_ADDRESS)
if device:
print(f"✅ Found device: {device.name or 'Unknown'} ({device.address})")
return device
print(f"⚠️ Device not found on attempt {retry+1}/3. Retrying...")
await asyncio.sleep(1)
print(f"❌ Device not found at address {DEVICE_ADDRESS} after multiple attempts.")
return None
# ---------------------------------------------------------------------------
# DFU Functions: Upload, Check, Confirm, Reset
# ---------------------------------------------------------------------------
async def calculate_image_hash(filename):
"""Calculate a simple hash of the firmware file for verification."""
if not os.path.exists(filename):
return None
# Simple CRC32-like hash (for demonstration)
hash_val = 0
with open(filename, 'rb') as f:
while chunk := f.read(4096):
for byte in chunk:
hash_val = ((hash_val << 8) | byte) ^ ((hash_val >> 24) & 0xFF)
return hash_val & 0xFFFFFFFF
# ---------------------------------------------------------------------------
# Main DFU Process - Simplified for more stability
# ---------------------------------------------------------------------------
async def main():
# Discover the device
device = await discover_device()
if not device:
return
# Track response states manually (without events to avoid loop issues)
responses = {}
def notification_handler(sender, data):
"""Handle incoming notifications from the device."""
print(f"📩 Received {len(data)} bytes from {sender}")
# Store raw response data for processing
op_group = data[0] if len(data) > 0 else None
op_id = data[3] if len(data) > 3 else None
responses[(op_group, op_id)] = data
# Simple logging of raw data for debugging
print(f"📝 Raw response: {data.hex()}")
try:
async with BleakClient(device, timeout=20.0) as client:
print("🔗 Connecting to device...")
# Enable notifications
await client.start_notify(SMP_CHAR_UUID, notification_handler)
print("🔔 Notifications enabled.")
# Get MTU if available (platform-dependent)
mtu_size = getattr(client, "mtu_size", 512)
print("mtu_size : ",mtu_size)
# Use a conservative chunk size (much smaller than MTU)
chunk_size = min(200, mtu_size - 80)
print(f"📊 Using chunk size: {chunk_size} bytes")
# Upload firmware
total_size = os.path.getsize(FIRMWARE_FILE)
if total_size == 0:
print("❌ Firmware file is empty!")
return
print(f"📦 Uploading firmware '{FIRMWARE_FILE}' ({total_size} bytes)")
# Calculate image hash for verification
image_hash = await calculate_image_hash(FIRMWARE_FILE)
print(f"🔐 Firmware hash: {image_hash:08x}")
# Track upload progress
offset = 0
seq_num = 0
start_time = time.time()
with open(FIRMWARE_FILE, "rb") as f:
while offset < total_size:
chunk = f.read(chunk_size)
if not chunk:
break
# For display purposes
percent_complete = (offset / total_size) * 100
elapsed_time = time.time() - start_time
speed = offset / elapsed_time if elapsed_time > 0 else 0
print(f"📤 Uploading: {percent_complete:.1f}% ({offset}/{total_size} bytes), "
f"{speed:.1f} B/s, chunk #{seq_num}")
# Base64 encode the chunk
b64_chunk = base64.b64encode(chunk).decode('utf-8')
# Create JSON payload with offset and data
chunk_payload = json.dumps({
"image": {
"data": b64_chunk,
"off": offset
}
}).encode("utf-8")
# Create SMP packet
packet = build_smp_packet(SMP_IMG_UPLOAD, chunk_payload, seq_num)
# Ensure packet size isn't too large
max_size = mtu_size - 3
if len(packet) > max_size:
print(f"⚠️ Packet size {len(packet)} exceeds {max_size} bytes; truncating.")
packet = packet[:max_size]
# Clear previous responses
responses.clear()
# Send the packet
await client.write_gatt_char(SMP_CHAR_UUID, packet)
# Wait briefly for response (simpler approach)
await asyncio.sleep(0.05)
# Move to next chunk
offset += len(chunk)
seq_num = (seq_num + 1) % 256
# Brief pause between chunks
await asyncio.sleep(0.05)
elapsed_time = time.time() - start_time
print(f"🎉 Firmware upload complete! Time: {elapsed_time:.1f}s, "
f"Speed: {total_size/elapsed_time:.1f} B/s")
# Pause to let device process the upload
print("⏳ Waiting for device to process upload...")
await asyncio.sleep(2)
# List images
print("📄 Checking image list on device...")
list_packet = build_smp_packet(SMP_IMG_LIST)
responses.clear()
await client.write_gatt_char(SMP_CHAR_UUID, list_packet)
await asyncio.sleep(1) # Wait for response
# Confirm firmware
print("🔄 Confirming new firmware...")
confirm_payload = json.dumps({}).encode("utf-8")
confirm_packet = build_smp_packet(SMP_IMG_CONFIRM, confirm_payload)
responses.clear()
await client.write_gatt_char(SMP_CHAR_UUID, confirm_packet)
await asyncio.sleep(1) # Wait for response
# Reset device
print("🔄 Resetting device...")
reset_packet = build_smp_packet(SMP_RESET)
responses.clear()
await client.write_gatt_char(SMP_CHAR_UUID, reset_packet)
await asyncio.sleep(1) # Brief wait before disconnect
print("🎉 DFU process completed!")
except Exception as e:
print(f"❌ DFU Error: {str(e)}")
import traceback
traceback.print_exc()
# ---------------------------------------------------------------------------
# Run the DFU Process
# ---------------------------------------------------------------------------
if __name__ == "__main__":
# Use asyncio.run() which creates a new event loop each time
asyncio.run(main())Hi Nordic Tech Team and All,
I am currently implementing BLE-based FOTA for the application core on an nRF5340 using MCUboot. I can successfully update the application core using the nRF Connect mobile app, but I am facing issues when attempting to perform the update via a Python script using Bleak.
Issue Details:
- I am able to establish a BLE connection and start the DFU process.
- The firmware upload appears to progress successfully, but after completion, the update does not take effect.
- The same firmware file updates successfully when using the nRF Connect mobile app.
- The device does not reboot into the new firmware after sending the reset command.
- I am not sure if the firmware confirmation step is being processed correctly.
My Setup:
- Hardware: nRF5340
- Bootloader: MCUboot
- BLE Library: Bleak (Python)
- Firmware Update Method: SMP over BLE
- MTU Size: Detected as 512 bytes
Python Script Overview:
- Scans for the BLE device and establishes a connection.
- Uploads the firmware in chunks using the SMP Image Upload command.
- Sends an SMP Image Confirm command.
- Sends a Reset command to reboot into the new firmware.
Observed Behavior:
- The firmware file is sent chunk by chunk, and I can see BLE notifications being received.
- After completing the upload, the device does not reboot into the new firmware.
- Manually rebooting the device does not seem to apply the update either.
- The same firmware file works perfectly when flashed using nRF Connect mobile.
I have attached my Python script below. Could you help me identify what might be going wrong? Are there any additional steps needed for proper firmware validation and activation?
Thanks in advance for your help!