import asyncio import base64 import json import os import struct import time from bleak import BleakClient, BleakScanner from bleak.exc import BleakError # --------------------------------------------------------------------------- # DFU Configuration # --------------------------------------------------------------------------- DEVICE_ADDRESS = "D9:E1:E4:73:11:33" # Replace with your device's BLE address FIRMWARE_FILE = "app_update.bin" # Firmware binary to upload DEFAULT_CHUNK_SIZE = 200 # Reduced default chunk size MAX_RETRIES = 3 # Number of retries for failed operations # SMP Service and Characteristic UUIDs (MCUboot/Zephyr SMP over BLE) SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84" SMP_CHAR_UUID = "DA2E7828-FBCE-4E01-AE9E-261174997C48" # --------------------------------------------------------------------------- # SMP Command Definitions # --------------------------------------------------------------------------- # SMP protocol opcodes (group:id) SMP_IMG_LIST = bytearray([0x00, 0x00, 0x00, 0x00]) # List images SMP_IMG_UPLOAD = bytearray([0x00, 0x00, 0x00, 0x01]) # Upload image SMP_IMG_CONFIRM = bytearray([0x00, 0x00, 0x00, 0x06]) # Confirm image SMP_RESET = bytearray([0x00, 0x01, 0x00, 0x00]) # Reset device # --------------------------------------------------------------------------- # Helper: Build SMP Packet # --------------------------------------------------------------------------- def build_smp_packet(opcode: bytearray, data: bytes = b"", seq_num: int = 0) -> bytearray: """ Build an SMP packet with a 4-byte header, 2-byte length (little-endian), 1-byte flags, and 1-byte sequence number followed by the payload. """ header = bytearray(opcode) # 4 bytes (group and id) length = len(data) header.append(length & 0xFF) # length LSB header.append((length >> 8) & 0xFF) # length MSB header.append(0x00) # flags header.append(seq_num & 0xFF) # sequence return header + data # --------------------------------------------------------------------------- # BLE Device Discovery and Notifications # --------------------------------------------------------------------------- async def discover_device(): """Scan for and return the BLE device with the given address.""" print("🔍 Scanning for BLE device...") for retry in range(3): # Try up to 3 times device = await BleakScanner.find_device_by_address(DEVICE_ADDRESS) if device: print(f"✅ Found device: {device.name or 'Unknown'} ({device.address})") return device print(f"⚠️ Device not found on attempt {retry+1}/3. Retrying...") await asyncio.sleep(1) print(f"❌ Device not found at address {DEVICE_ADDRESS} after multiple attempts.") return None # --------------------------------------------------------------------------- # DFU Functions: Upload, Check, Confirm, Reset # --------------------------------------------------------------------------- async def calculate_image_hash(filename): """Calculate a simple hash of the firmware file for verification.""" if not os.path.exists(filename): return None # Simple CRC32-like hash (for demonstration) hash_val = 0 with open(filename, 'rb') as f: while chunk := f.read(4096): for byte in chunk: hash_val = ((hash_val << 8) | byte) ^ ((hash_val >> 24) & 0xFF) return hash_val & 0xFFFFFFFF # --------------------------------------------------------------------------- # Main DFU Process - Simplified for more stability # --------------------------------------------------------------------------- async def main(): # Discover the device device = await discover_device() if not device: return # Track response states manually (without events to avoid loop issues) responses = {} def notification_handler(sender, data): """Handle incoming notifications from the device.""" print(f"📩 Received {len(data)} bytes from {sender}") # Store raw response data for processing op_group = data[0] if len(data) > 0 else None op_id = data[3] if len(data) > 3 else None responses[(op_group, op_id)] = data # Simple logging of raw data for debugging print(f"📝 Raw response: {data.hex()}") try: async with BleakClient(device, timeout=20.0) as client: print("🔗 Connecting to device...") # Enable notifications await client.start_notify(SMP_CHAR_UUID, notification_handler) print("🔔 Notifications enabled.") # Get MTU if available (platform-dependent) mtu_size = getattr(client, "mtu_size", 512) print("mtu_size : ",mtu_size) # Use a conservative chunk size (much smaller than MTU) chunk_size = min(200, mtu_size - 80) print(f"📊 Using chunk size: {chunk_size} bytes") # Upload firmware total_size = os.path.getsize(FIRMWARE_FILE) if total_size == 0: print("❌ Firmware file is empty!") return print(f"📦 Uploading firmware '{FIRMWARE_FILE}' ({total_size} bytes)") # Calculate image hash for verification image_hash = await calculate_image_hash(FIRMWARE_FILE) print(f"🔐 Firmware hash: {image_hash:08x}") # Track upload progress offset = 0 seq_num = 0 start_time = time.time() with open(FIRMWARE_FILE, "rb") as f: while offset < total_size: chunk = f.read(chunk_size) if not chunk: break # For display purposes percent_complete = (offset / total_size) * 100 elapsed_time = time.time() - start_time speed = offset / elapsed_time if elapsed_time > 0 else 0 print(f"📤 Uploading: {percent_complete:.1f}% ({offset}/{total_size} bytes), " f"{speed:.1f} B/s, chunk #{seq_num}") # Base64 encode the chunk b64_chunk = base64.b64encode(chunk).decode('utf-8') # Create JSON payload with offset and data chunk_payload = json.dumps({ "image": { "data": b64_chunk, "off": offset } }).encode("utf-8") # Create SMP packet packet = build_smp_packet(SMP_IMG_UPLOAD, chunk_payload, seq_num) # Ensure packet size isn't too large max_size = mtu_size - 3 if len(packet) > max_size: print(f"⚠️ Packet size {len(packet)} exceeds {max_size} bytes; truncating.") packet = packet[:max_size] # Clear previous responses responses.clear() # Send the packet await client.write_gatt_char(SMP_CHAR_UUID, packet) # Wait briefly for response (simpler approach) await asyncio.sleep(0.05) # Move to next chunk offset += len(chunk) seq_num = (seq_num + 1) % 256 # Brief pause between chunks await asyncio.sleep(0.05) elapsed_time = time.time() - start_time print(f"🎉 Firmware upload complete! Time: {elapsed_time:.1f}s, " f"Speed: {total_size/elapsed_time:.1f} B/s") # Pause to let device process the upload print("⏳ Waiting for device to process upload...") await asyncio.sleep(2) # List images print("📄 Checking image list on device...") list_packet = build_smp_packet(SMP_IMG_LIST) responses.clear() await client.write_gatt_char(SMP_CHAR_UUID, list_packet) await asyncio.sleep(1) # Wait for response # Confirm firmware print("🔄 Confirming new firmware...") confirm_payload = json.dumps({}).encode("utf-8") confirm_packet = build_smp_packet(SMP_IMG_CONFIRM, confirm_payload) responses.clear() await client.write_gatt_char(SMP_CHAR_UUID, confirm_packet) await asyncio.sleep(1) # Wait for response # Reset device print("🔄 Resetting device...") reset_packet = build_smp_packet(SMP_RESET) responses.clear() await client.write_gatt_char(SMP_CHAR_UUID, reset_packet) await asyncio.sleep(1) # Brief wait before disconnect print("🎉 DFU process completed!") except Exception as e: print(f"❌ DFU Error: {str(e)}") import traceback traceback.print_exc() # --------------------------------------------------------------------------- # Run the DFU Process # --------------------------------------------------------------------------- if __name__ == "__main__": # Use asyncio.run() which creates a new event loop each time asyncio.run(main())Hi Nordic Tech Team and All,
I am currently implementing BLE-based FOTA for the application core on an nRF5340 using MCUboot. I can successfully update the application core using the nRF Connect mobile app, but I am facing issues when attempting to perform the update via a Python script using Bleak.
Issue Details:
- I am able to establish a BLE connection and start the DFU process.
- The firmware upload appears to progress successfully, but after completion, the update does not take effect.
- The same firmware file updates successfully when using the nRF Connect mobile app.
- The device does not reboot into the new firmware after sending the reset command.
- I am not sure if the firmware confirmation step is being processed correctly.
My Setup:
- Hardware: nRF5340
- Bootloader: MCUboot
- BLE Library: Bleak (Python)
- Firmware Update Method: SMP over BLE
- MTU Size: Detected as 512 bytes
Python Script Overview:
- Scans for the BLE device and establishes a connection.
- Uploads the firmware in chunks using the SMP Image Upload command.
- Sends an SMP Image Confirm command.
- Sends a Reset command to reboot into the new firmware.
Observed Behavior:
- The firmware file is sent chunk by chunk, and I can see BLE notifications being received.
- After completing the upload, the device does not reboot into the new firmware.
- Manually rebooting the device does not seem to apply the update either.
- The same firmware file works perfectly when flashed using nRF Connect mobile.
I have attached my Python script below. Could you help me identify what might be going wrong? Are there any additional steps needed for proper firmware validation and activation?
Thanks in advance for your help!