Issue with BLE FOTA Process Using Python Script

import asyncio
import base64
import json
import os
import struct
import time
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError

# ---------------------------------------------------------------------------
# DFU Configuration
# ---------------------------------------------------------------------------
DEVICE_ADDRESS = "D9:E1:E4:73:11:33"  # Replace with your device's BLE address
FIRMWARE_FILE = "app_update.bin"      # Firmware binary to upload
DEFAULT_CHUNK_SIZE = 200             # Reduced default chunk size
MAX_RETRIES = 3                       # Number of retries for failed operations

# SMP Service and Characteristic UUIDs (MCUboot/Zephyr SMP over BLE)
SMP_SERVICE_UUID = "8D53DC1D-1DB7-4CD3-868B-8A527460AA84"
SMP_CHAR_UUID = "DA2E7828-FBCE-4E01-AE9E-261174997C48"

# ---------------------------------------------------------------------------
# SMP Command Definitions
# ---------------------------------------------------------------------------
# SMP protocol opcodes (group:id)
SMP_IMG_LIST = bytearray([0x00, 0x00, 0x00, 0x00])    # List images
SMP_IMG_UPLOAD = bytearray([0x00, 0x00, 0x00, 0x01])  # Upload image
SMP_IMG_CONFIRM = bytearray([0x00, 0x00, 0x00, 0x06]) # Confirm image
SMP_RESET = bytearray([0x00, 0x01, 0x00, 0x00])       # Reset device

# ---------------------------------------------------------------------------
# Helper: Build SMP Packet
# ---------------------------------------------------------------------------
def build_smp_packet(opcode: bytearray, data: bytes = b"", seq_num: int = 0) -> bytearray:
    """
    Build an SMP packet with a 4-byte header, 2-byte length (little-endian),
    1-byte flags, and 1-byte sequence number followed by the payload.
    """
    header = bytearray(opcode)  # 4 bytes (group and id)
    length = len(data)
    header.append(length & 0xFF)           # length LSB
    header.append((length >> 8) & 0xFF)    # length MSB
    header.append(0x00)                    # flags
    header.append(seq_num & 0xFF)          # sequence
    return header + data

# ---------------------------------------------------------------------------
# BLE Device Discovery and Notifications
# ---------------------------------------------------------------------------
async def discover_device():
    """Scan for and return the BLE device with the given address."""
    print("🔍 Scanning for BLE device...")
    
    for retry in range(3):  # Try up to 3 times
        device = await BleakScanner.find_device_by_address(DEVICE_ADDRESS)
        if device:
            print(f"✅ Found device: {device.name or 'Unknown'} ({device.address})")
            return device
        print(f"⚠️ Device not found on attempt {retry+1}/3. Retrying...")
        await asyncio.sleep(1)
        
    print(f"❌ Device not found at address {DEVICE_ADDRESS} after multiple attempts.")
    return None

# ---------------------------------------------------------------------------
# DFU Functions: Upload, Check, Confirm, Reset
# ---------------------------------------------------------------------------
async def calculate_image_hash(filename):
    """Calculate a simple hash of the firmware file for verification."""
    if not os.path.exists(filename):
        return None
        
    # Simple CRC32-like hash (for demonstration)
    hash_val = 0
    with open(filename, 'rb') as f:
        while chunk := f.read(4096):
            for byte in chunk:
                hash_val = ((hash_val << 8) | byte) ^ ((hash_val >> 24) & 0xFF)
                
    return hash_val & 0xFFFFFFFF

# ---------------------------------------------------------------------------
# Main DFU Process - Simplified for more stability
# ---------------------------------------------------------------------------
async def main():
    # Discover the device
    device = await discover_device()
    if not device:
        return
    
    # Track response states manually (without events to avoid loop issues)
    responses = {}
    
    def notification_handler(sender, data):
        """Handle incoming notifications from the device."""
        print(f"📩 Received {len(data)} bytes from {sender}")
        
        # Store raw response data for processing
        op_group = data[0] if len(data) > 0 else None
        op_id = data[3] if len(data) > 3 else None
        
        responses[(op_group, op_id)] = data
        
        # Simple logging of raw data for debugging
        print(f"📝 Raw response: {data.hex()}")
    
    try:
        async with BleakClient(device, timeout=20.0) as client:
            print("🔗 Connecting to device...")
            
            # Enable notifications
            await client.start_notify(SMP_CHAR_UUID, notification_handler)
            print("🔔 Notifications enabled.")
            
            # Get MTU if available (platform-dependent)
            mtu_size = getattr(client, "mtu_size", 512)
            print("mtu_size : ",mtu_size)
            # Use a conservative chunk size (much smaller than MTU)
            chunk_size = min(200, mtu_size - 80)
            print(f"📊 Using chunk size: {chunk_size} bytes")
            
            # Upload firmware
            total_size = os.path.getsize(FIRMWARE_FILE)
            if total_size == 0:
                print("❌ Firmware file is empty!")
                return
                
            print(f"📦 Uploading firmware '{FIRMWARE_FILE}' ({total_size} bytes)")
            
            # Calculate image hash for verification
            image_hash = await calculate_image_hash(FIRMWARE_FILE)
            print(f"🔐 Firmware hash: {image_hash:08x}")
            
            # Track upload progress
            offset = 0
            seq_num = 0
            start_time = time.time()
            
            with open(FIRMWARE_FILE, "rb") as f:
                while offset < total_size:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                        
                    # For display purposes
                    percent_complete = (offset / total_size) * 100
                    elapsed_time = time.time() - start_time
                    speed = offset / elapsed_time if elapsed_time > 0 else 0
                    
                    print(f"📤 Uploading: {percent_complete:.1f}% ({offset}/{total_size} bytes), "
                          f"{speed:.1f} B/s, chunk #{seq_num}")
                          
                    # Base64 encode the chunk
                    b64_chunk = base64.b64encode(chunk).decode('utf-8')
                    
                    # Create JSON payload with offset and data
                    chunk_payload = json.dumps({
                        "image": {
                            "data": b64_chunk,
                            "off": offset
                        }
                    }).encode("utf-8")
                    
                    # Create SMP packet
                    packet = build_smp_packet(SMP_IMG_UPLOAD, chunk_payload, seq_num)
                    
                    # Ensure packet size isn't too large
                    max_size = mtu_size - 3
                    if len(packet) > max_size:
                        print(f"⚠️ Packet size {len(packet)} exceeds {max_size} bytes; truncating.")
                        packet = packet[:max_size]
                    
                    # Clear previous responses
                    responses.clear()
                    
                    # Send the packet
                    await client.write_gatt_char(SMP_CHAR_UUID, packet)
                    
                    # Wait briefly for response (simpler approach)
                    await asyncio.sleep(0.05)
                    
                    # Move to next chunk
                    offset += len(chunk)
                    seq_num = (seq_num + 1) % 256
                    
                    # Brief pause between chunks
                    await asyncio.sleep(0.05)
            
            elapsed_time = time.time() - start_time
            print(f"🎉 Firmware upload complete! Time: {elapsed_time:.1f}s, "
                  f"Speed: {total_size/elapsed_time:.1f} B/s")
            
            # Pause to let device process the upload
            print("⏳ Waiting for device to process upload...")
            await asyncio.sleep(2)
            
            # List images
            print("📄 Checking image list on device...")
            list_packet = build_smp_packet(SMP_IMG_LIST)
            responses.clear()
            await client.write_gatt_char(SMP_CHAR_UUID, list_packet)
            await asyncio.sleep(1)  # Wait for response
            
            # Confirm firmware
            print("🔄 Confirming new firmware...")
            confirm_payload = json.dumps({}).encode("utf-8")
            confirm_packet = build_smp_packet(SMP_IMG_CONFIRM, confirm_payload)
            responses.clear()
            await client.write_gatt_char(SMP_CHAR_UUID, confirm_packet)
            await asyncio.sleep(1)  # Wait for response
            
            # Reset device
            print("🔄 Resetting device...")
            reset_packet = build_smp_packet(SMP_RESET)
            responses.clear()
            await client.write_gatt_char(SMP_CHAR_UUID, reset_packet)
            await asyncio.sleep(1)  # Brief wait before disconnect
            
            print("🎉 DFU process completed!")

    except Exception as e:
        print(f"❌ DFU Error: {str(e)}")
        import traceback
        traceback.print_exc()

# ---------------------------------------------------------------------------
# Run the DFU Process
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Use asyncio.run() which creates a new event loop each time
    asyncio.run(main())
Hi Nordic Tech Team and All,

I am currently implementing BLE-based FOTA for the application core on an nRF5340 using MCUboot. I can successfully update the application core using the nRF Connect mobile app, but I am facing issues when attempting to perform the update via a Python script using Bleak.

Issue Details:

  • I am able to establish a BLE connection and start the DFU process.
  • The firmware upload appears to progress successfully, but after completion, the update does not take effect.
  • The same firmware file updates successfully when using the nRF Connect mobile app.
  • The device does not reboot into the new firmware after sending the reset command.
  • I am not sure if the firmware confirmation step is being processed correctly.

My Setup:

  • Hardware: nRF5340
  • Bootloader: MCUboot
  • BLE Library: Bleak (Python)
  • Firmware Update Method: SMP over BLE
  • MTU Size: Detected as 512 bytes

Python Script Overview:

  • Scans for the BLE device and establishes a connection.
  • Uploads the firmware in chunks using the SMP Image Upload command.
  • Sends an SMP Image Confirm command.
  • Sends a Reset command to reboot into the new firmware.

Observed Behavior:

  1. The firmware file is sent chunk by chunk, and I can see BLE notifications being received.
  2. After completing the upload, the device does not reboot into the new firmware.
  3. Manually rebooting the device does not seem to apply the update either.
  4. The same firmware file works perfectly when flashed using nRF Connect mobile.

I have attached my Python script below. Could you help me identify what might be going wrong? Are there any additional steps needed for proper firmware validation and activation?

Thanks in advance for your help!

Related