Issue with BLE FOTA on nRF5340 Using Python (No Notifications for Chunks)

Hi Nordic Tech Team,

I am implementing BLE-based FOTA for the application core on an nRF5340 using MCUboot. The update works successfully when using the nRF Connect mobile app, but when I attempt the update using my Python script with Bleak, the firmware does not apply after reboot.

Issue Details:

  • The firmware file is sent chunk by chunk over BLE.
  • I can see some notifications, but I am not receiving notifications for each chunk.
  • The device does not reboot into the new firmware after the upload.
  • Manually rebooting does not apply the update.
  • The same firmware file updates correctly using the nRF Connect mobile app.

My Setup:

  • Device: nRF5340
  • Bootloader: MCUboot
  • BLE Library: Bleak (Python)
  • Firmware Update Protocol: SMP over BLE
  • MTU Size: Detected as 512 bytes
  • Chunk Size: 200 bytes

Observations & Debugging Attempts:

  1. Notifications: Some notifications are received, but not consistently for every chunk.
  2. Timing: I added small delays (0.05s) between chunks, but it did not resolve the issue.
  3. Confirm Command: I send an SMP_IMG_CONFIRM command, but the update does not apply.
  4. Manual Reboot: Even after a manual reboot, the new firmware is not active.
  5. Logging: I have not seen any errors in my Python script, but I suspect the device is not fully processing the uploaded chunks.

Questions:

  • Is there a specific way I should wait for notifications before sending the next chunk?
  • Are there any known issue
    import asyncio
    import base64
    import json
    import os
    import struct
    import time
    from bleak import BleakClient, BleakScanner
    from bleak.exc import BleakError
    
    # ---------------------------------------------------------------------------
    # DFU Configuration
    # ---------------------------------------------------------------------------
    DEVICE_ADDRESS = "D9:E1:E4:73:11:33"  # Replace with your device's BLE address
    FIRMWARE_FILE = "app_update.bin"      # Firmware binary to upload
    DEFAULT_CHUNK_SIZE = 200             # Reduced default chunk size
    MAX_RETRIES = 3                       # Number of retries for failed operations
    
    # SMP Service and Characteristic UUIDs (MCUboot/Zephyr SMP over BLE)
    SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
    SMP_CHAR_UUID = "DA2E7828-FBCE-4E01-AE9E-261174997C48"
    
    # ---------------------------------------------------------------------------
    # SMP Command Definitions
    # ---------------------------------------------------------------------------
    # SMP protocol opcodes (group:id)
    SMP_IMG_LIST = bytearray([0x00, 0x00, 0x00, 0x00])    # List images
    SMP_IMG_UPLOAD = bytearray([0x00, 0x00, 0x00, 0x01])  # Upload image
    SMP_IMG_CONFIRM = bytearray([0x00, 0x00, 0x00, 0x06]) # Confirm image
    SMP_RESET = bytearray([0x00, 0x01, 0x00, 0x00])       # Reset device
    
    # ---------------------------------------------------------------------------
    # Helper: Build SMP Packet
    # ---------------------------------------------------------------------------
    def build_smp_packet(opcode: bytearray, data: bytes = b"", seq_num: int = 0) -> bytearray:
        """
        Build an SMP packet with a 4-byte header, 2-byte length (little-endian),
        1-byte flags, and 1-byte sequence number followed by the payload.
        """
        header = bytearray(opcode)  # 4 bytes (group and id)
        length = len(data)
        header.append(length & 0xFF)           # length LSB
        header.append((length >> 8) & 0xFF)    # length MSB
        header.append(0x00)                    # flags
        header.append(seq_num & 0xFF)          # sequence
        return header + data
    
    # ---------------------------------------------------------------------------
    # BLE Device Discovery and Notifications
    # ---------------------------------------------------------------------------
    async def discover_device():
        """Scan for and return the BLE device with the given address."""
        print("🔍 Scanning for BLE device...")
        
        for retry in range(3):  # Try up to 3 times
            device = await BleakScanner.find_device_by_address(DEVICE_ADDRESS)
            if device:
                print(f"✅ Found device: {device.name or 'Unknown'} ({device.address})")
                return device
            print(f"⚠️ Device not found on attempt {retry+1}/3. Retrying...")
            await asyncio.sleep(1)
            
        print(f"❌ Device not found at address {DEVICE_ADDRESS} after multiple attempts.")
        return None
    
    # ---------------------------------------------------------------------------
    # DFU Functions: Upload, Check, Confirm, Reset
    # ---------------------------------------------------------------------------
    async def calculate_image_hash(filename):
        """Calculate a simple hash of the firmware file for verification."""
        if not os.path.exists(filename):
            return None
            
        # Simple CRC32-like hash (for demonstration)
        hash_val = 0
        with open(filename, 'rb') as f:
            while chunk := f.read(4096):
                for byte in chunk:
                    hash_val = ((hash_val << 8) | byte) ^ ((hash_val >> 24) & 0xFF)
                    
        return hash_val & 0xFFFFFFFF
    
    # ---------------------------------------------------------------------------
    # Main DFU Process - Simplified for more stability
    # ---------------------------------------------------------------------------
    async def main():
        # Discover the device
        device = await discover_device()
        if not device:
            return
        
        # Track response states manually (without events to avoid loop issues)
        responses = {}
        
        def notification_handler(sender, data):
            """Handle incoming notifications from the device."""
            print(f"📩 Received {len(data)} bytes from {sender}")
            
            # Store raw response data for processing
            op_group = data[0] if len(data) > 0 else None
            op_id = data[3] if len(data) > 3 else None
            
            responses[(op_group, op_id)] = data
            
            # Simple logging of raw data for debugging
            print(f"📝 Raw response: {data.hex()}")
        
        try:
            async with BleakClient(device, timeout=20.0) as client:
                print("🔗 Connecting to device...")
                
                # Enable notifications
                await client.start_notify(SMP_CHAR_UUID, notification_handler)
                print("🔔 Notifications enabled.")
                
                # Get MTU if available (platform-dependent)
                mtu_size = getattr(client, "mtu_size", 512)
                print("mtu_size : ",mtu_size)
                # Use a conservative chunk size (much smaller than MTU)
                chunk_size = min(200, mtu_size - 80)
                print(f"📊 Using chunk size: {chunk_size} bytes")
                
                # Upload firmware
                total_size = os.path.getsize(FIRMWARE_FILE)
                if total_size == 0:
                    print("❌ Firmware file is empty!")
                    return
                    
                print(f"📦 Uploading firmware '{FIRMWARE_FILE}' ({total_size} bytes)")
                
                # Calculate image hash for verification
                image_hash = await calculate_image_hash(FIRMWARE_FILE)
                print(f"🔐 Firmware hash: {image_hash:08x}")
                
                # Track upload progress
                offset = 0
                seq_num = 0
                start_time = time.time()
                
                with open(FIRMWARE_FILE, "rb") as f:
                    while offset < total_size:
                        chunk = f.read(chunk_size)
                        if not chunk:
                            break
                            
                        # For display purposes
                        percent_complete = (offset / total_size) * 100
                        elapsed_time = time.time() - start_time
                        speed = offset / elapsed_time if elapsed_time > 0 else 0
                        
                        print(f"📤 Uploading: {percent_complete:.1f}% ({offset}/{total_size} bytes), "
                              f"{speed:.1f} B/s, chunk #{seq_num}")
                              
                        # Base64 encode the chunk
                        b64_chunk = base64.b64encode(chunk).decode('utf-8')
                        
                        # Create JSON payload with offset and data
                        chunk_payload = json.dumps({
                            "image": {
                                "data": b64_chunk,
                                "off": offset
                            }
                        }).encode("utf-8")
                        
                        # Create SMP packet
                        packet = build_smp_packet(SMP_IMG_UPLOAD, chunk_payload, seq_num)
                        
                        # Ensure packet size isn't too large
                        max_size = mtu_size - 3
                        if len(packet) > max_size:
                            print(f"⚠️ Packet size {len(packet)} exceeds {max_size} bytes; truncating.")
                            packet = packet[:max_size]
                        
                        # Clear previous responses
                        responses.clear()
                        
                        # Send the packet
                        await client.write_gatt_char(SMP_CHAR_UUID, packet)
                        
                        # Wait briefly for response (simpler approach)
                        await asyncio.sleep(0.05)
                        
                        # Move to next chunk
                        offset += len(chunk)
                        seq_num = (seq_num + 1) % 256
                        
                        # Brief pause between chunks
                        await asyncio.sleep(0.05)
                
                elapsed_time = time.time() - start_time
                print(f"🎉 Firmware upload complete! Time: {elapsed_time:.1f}s, "
                      f"Speed: {total_size/elapsed_time:.1f} B/s")
                
                # Pause to let device process the upload
                print("⏳ Waiting for device to process upload...")
                await asyncio.sleep(2)
                
                # List images
                print("📄 Checking image list on device...")
                list_packet = build_smp_packet(SMP_IMG_LIST)
                responses.clear()
                await client.write_gatt_char(SMP_CHAR_UUID, list_packet)
                await asyncio.sleep(1)  # Wait for response
                
                # Confirm firmware
                print("🔄 Confirming new firmware...")
                confirm_payload = json.dumps({}).encode("utf-8")
                confirm_packet = build_smp_packet(SMP_IMG_CONFIRM, confirm_payload)
                responses.clear()
                await client.write_gatt_char(SMP_CHAR_UUID, confirm_packet)
                await asyncio.sleep(1)  # Wait for response
                
                # Reset device
                print("🔄 Resetting device...")
                reset_packet = build_smp_packet(SMP_RESET)
                responses.clear()
                await client.write_gatt_char(SMP_CHAR_UUID, reset_packet)
                await asyncio.sleep(1)  # Brief wait before disconnect
                
                print("🎉 DFU process completed!")
    
        except Exception as e:
            print(f"❌ DFU Error: {str(e)}")
            import traceback
            traceback.print_exc()
    
    # ---------------------------------------------------------------------------
    # Run the DFU Process
    # ---------------------------------------------------------------------------
    if __name__ == "__main__":
        # Use asyncio.run() which creates a new event loop each time
        asyncio.run(main())
    s with MCUboot handling BLE DFU via Python?
  • How does the nRF Connect app differ in its handling of chunk uploads?
  • Do I need to send an extra "confirm" command before reset?

I have attached my Python script below for reference. Any guidance on debugging and resolving this issue would be greatly appreciated.

Thanks in advance for your help!

Best regards,

Related