Issue with BLE FOTA on nRF5340 Using Python (No Notifications for Chunks)

Hi Nordic Tech Team,

I am implementing BLE-based FOTA for the application core on an nRF5340 using MCUboot. The update works successfully when using the nRF Connect mobile app, but when I attempt the update using my Python script with Bleak, the firmware does not apply after reboot.

Issue Details:

  • The firmware file is sent chunk by chunk over BLE.
  • I can see some notifications, but I am not receiving notifications for each chunk.
  • The device does not reboot into the new firmware after the upload.
  • Manually rebooting does not apply the update.
  • The same firmware file updates correctly using the nRF Connect mobile app.

My Setup:

  • Device: nRF5340
  • Bootloader: MCUboot
  • BLE Library: Bleak (Python)
  • Firmware Update Protocol: SMP over BLE
  • MTU Size: Detected as 512 bytes
  • Chunk Size: 200 bytes

Observations & Debugging Attempts:

  1. Notifications: Some notifications are received, but not consistently for every chunk.
  2. Timing: I added small delays (0.05s) between chunks, but it did not resolve the issue.
  3. Confirm Command: I send an SMP_IMG_CONFIRM command, but the update does not apply.
  4. Manual Reboot: Even after a manual reboot, the new firmware is not active.
  5. Logging: I have not seen any errors in my Python script, but I suspect the device is not fully processing the uploaded chunks.

Questions:

  • Is there a specific way I should wait for notifications before sending the next chunk?
  • Are there any known issue
    import asyncio
    import base64
    import json
    import os
    import struct
    import time
    from bleak import BleakClient, BleakScanner
    from bleak.exc import BleakError
    
    # ---------------------------------------------------------------------------
    # DFU Configuration
    # ---------------------------------------------------------------------------
    DEVICE_ADDRESS = "D9:E1:E4:73:11:33"  # Replace with your device's BLE address
    FIRMWARE_FILE = "app_update.bin"      # Firmware binary to upload
    DEFAULT_CHUNK_SIZE = 200             # Reduced default chunk size
    MAX_RETRIES = 3                       # Number of retries for failed operations
    
    # SMP Service and Characteristic UUIDs (MCUboot/Zephyr SMP over BLE)
    SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
    SMP_CHAR_UUID = "DA2E7828-FBCE-4E01-AE9E-261174997C48"
    
    # ---------------------------------------------------------------------------
    # SMP Command Definitions
    # ---------------------------------------------------------------------------
    # SMP protocol opcodes (group:id)
    SMP_IMG_LIST = bytearray([0x00, 0x00, 0x00, 0x00])    # List images
    SMP_IMG_UPLOAD = bytearray([0x00, 0x00, 0x00, 0x01])  # Upload image
    SMP_IMG_CONFIRM = bytearray([0x00, 0x00, 0x00, 0x06]) # Confirm image
    SMP_RESET = bytearray([0x00, 0x01, 0x00, 0x00])       # Reset device
    
    # ---------------------------------------------------------------------------
    # Helper: Build SMP Packet
    # ---------------------------------------------------------------------------
    def build_smp_packet(opcode: bytearray, data: bytes = b"", seq_num: int = 0) -> bytearray:
        """
        Build an SMP packet with a 4-byte header, 2-byte length (little-endian),
        1-byte flags, and 1-byte sequence number followed by the payload.
        """
        header = bytearray(opcode)  # 4 bytes (group and id)
        length = len(data)
        header.append(length & 0xFF)           # length LSB
        header.append((length >> 8) & 0xFF)    # length MSB
        header.append(0x00)                    # flags
        header.append(seq_num & 0xFF)          # sequence
        return header + data
    
    # ---------------------------------------------------------------------------
    # BLE Device Discovery and Notifications
    # ---------------------------------------------------------------------------
    async def discover_device():
        """Scan for and return the BLE device with the given address."""
        print("🔍 Scanning for BLE device...")
        
        for retry in range(3):  # Try up to 3 times
            device = await BleakScanner.find_device_by_address(DEVICE_ADDRESS)
            if device:
                print(f"✅ Found device: {device.name or 'Unknown'} ({device.address})")
                return device
            print(f"⚠️ Device not found on attempt {retry+1}/3. Retrying...")
            await asyncio.sleep(1)
            
        print(f"❌ Device not found at address {DEVICE_ADDRESS} after multiple attempts.")
        return None
    
    # ---------------------------------------------------------------------------
    # DFU Functions: Upload, Check, Confirm, Reset
    # ---------------------------------------------------------------------------
    async def calculate_image_hash(filename):
        """Calculate a simple hash of the firmware file for verification."""
        if not os.path.exists(filename):
            return None
            
        # Simple CRC32-like hash (for demonstration)
        hash_val = 0
        with open(filename, 'rb') as f:
            while chunk := f.read(4096):
                for byte in chunk:
                    hash_val = ((hash_val << 8) | byte) ^ ((hash_val >> 24) & 0xFF)
                    
        return hash_val & 0xFFFFFFFF
    
    # ---------------------------------------------------------------------------
    # Main DFU Process - Simplified for more stability
    # ---------------------------------------------------------------------------
    async def main():
        # Discover the device
        device = await discover_device()
        if not device:
            return
        
        # Track response states manually (without events to avoid loop issues)
        responses = {}
        
        def notification_handler(sender, data):
            """Handle incoming notifications from the device."""
            print(f"📩 Received {len(data)} bytes from {sender}")
            
            # Store raw response data for processing
            op_group = data[0] if len(data) > 0 else None
            op_id = data[3] if len(data) > 3 else None
            
            responses[(op_group, op_id)] = data
            
            # Simple logging of raw data for debugging
            print(f"📝 Raw response: {data.hex()}")
        
        try:
            async with BleakClient(device, timeout=20.0) as client:
                print("🔗 Connecting to device...")
                
                # Enable notifications
                await client.start_notify(SMP_CHAR_UUID, notification_handler)
                print("🔔 Notifications enabled.")
                
                # Get MTU if available (platform-dependent)
                mtu_size = getattr(client, "mtu_size", 512)
                print("mtu_size : ",mtu_size)
                # Use a conservative chunk size (much smaller than MTU)
                chunk_size = min(200, mtu_size - 80)
                print(f"📊 Using chunk size: {chunk_size} bytes")
                
                # Upload firmware
                total_size = os.path.getsize(FIRMWARE_FILE)
                if total_size == 0:
                    print("❌ Firmware file is empty!")
                    return
                    
                print(f"📦 Uploading firmware '{FIRMWARE_FILE}' ({total_size} bytes)")
                
                # Calculate image hash for verification
                image_hash = await calculate_image_hash(FIRMWARE_FILE)
                print(f"🔐 Firmware hash: {image_hash:08x}")
                
                # Track upload progress
                offset = 0
                seq_num = 0
                start_time = time.time()
                
                with open(FIRMWARE_FILE, "rb") as f:
                    while offset < total_size:
                        chunk = f.read(chunk_size)
                        if not chunk:
                            break
                            
                        # For display purposes
                        percent_complete = (offset / total_size) * 100
                        elapsed_time = time.time() - start_time
                        speed = offset / elapsed_time if elapsed_time > 0 else 0
                        
                        print(f"📤 Uploading: {percent_complete:.1f}% ({offset}/{total_size} bytes), "
                              f"{speed:.1f} B/s, chunk #{seq_num}")
                              
                        # Base64 encode the chunk
                        b64_chunk = base64.b64encode(chunk).decode('utf-8')
                        
                        # Create JSON payload with offset and data
                        chunk_payload = json.dumps({
                            "image": {
                                "data": b64_chunk,
                                "off": offset
                            }
                        }).encode("utf-8")
                        
                        # Create SMP packet
                        packet = build_smp_packet(SMP_IMG_UPLOAD, chunk_payload, seq_num)
                        
                        # Ensure packet size isn't too large
                        max_size = mtu_size - 3
                        if len(packet) > max_size:
                            print(f"⚠️ Packet size {len(packet)} exceeds {max_size} bytes; truncating.")
                            packet = packet[:max_size]
                        
                        # Clear previous responses
                        responses.clear()
                        
                        # Send the packet
                        await client.write_gatt_char(SMP_CHAR_UUID, packet)
                        
                        # Wait briefly for response (simpler approach)
                        await asyncio.sleep(0.05)
                        
                        # Move to next chunk
                        offset += len(chunk)
                        seq_num = (seq_num + 1) % 256
                        
                        # Brief pause between chunks
                        await asyncio.sleep(0.05)
                
                elapsed_time = time.time() - start_time
                print(f"🎉 Firmware upload complete! Time: {elapsed_time:.1f}s, "
                      f"Speed: {total_size/elapsed_time:.1f} B/s")
                
                # Pause to let device process the upload
                print("⏳ Waiting for device to process upload...")
                await asyncio.sleep(2)
                
                # List images
                print("📄 Checking image list on device...")
                list_packet = build_smp_packet(SMP_IMG_LIST)
                responses.clear()
                await client.write_gatt_char(SMP_CHAR_UUID, list_packet)
                await asyncio.sleep(1)  # Wait for response
                
                # Confirm firmware
                print("🔄 Confirming new firmware...")
                confirm_payload = json.dumps({}).encode("utf-8")
                confirm_packet = build_smp_packet(SMP_IMG_CONFIRM, confirm_payload)
                responses.clear()
                await client.write_gatt_char(SMP_CHAR_UUID, confirm_packet)
                await asyncio.sleep(1)  # Wait for response
                
                # Reset device
                print("🔄 Resetting device...")
                reset_packet = build_smp_packet(SMP_RESET)
                responses.clear()
                await client.write_gatt_char(SMP_CHAR_UUID, reset_packet)
                await asyncio.sleep(1)  # Brief wait before disconnect
                
                print("🎉 DFU process completed!")
    
        except Exception as e:
            print(f"❌ DFU Error: {str(e)}")
            import traceback
            traceback.print_exc()
    
    # ---------------------------------------------------------------------------
    # Run the DFU Process
    # ---------------------------------------------------------------------------
    if __name__ == "__main__":
        # Use asyncio.run() which creates a new event loop each time
        asyncio.run(main())
    s with MCUboot handling BLE DFU via Python?
  • How does the nRF Connect app differ in its handling of chunk uploads?
  • Do I need to send an extra "confirm" command before reset?

I have attached my Python script below for reference. Any guidance on debugging and resolving this issue would be greatly appreciated.

Thanks in advance for your help!

Best regards,

Parents Reply Children
No Data
Related