Macro controlling multiple devices as a global macro?

Hi,

I'm new to all this, I'm using the nRF Connect app and it's amazing!

I'd like to know if there's a way to:
1. create a global macro controlling multiple devices?
2. store values through macros, with a public scope that can be accessed from the other device's macro?
3. run multiple macros at the same time?

My goal is to read the HR from a Polar H10 and control the speed of my FTMS treadmill based on that.

Any tips, leading to guides or other apps even would be much appreaciated!

Thanks!

  • Hi

    As of version 4.4.0 of the nRF Connect for Mobile app it's possible to automate some operations using macros at least by recording or importing them from an CML file. Read more on Macros here: https://github.com/NordicSemiconductor/Android-nRF-Connect/blob/main/documentation/Macros/README.md 

    Best regards,

    Simon

  • Anyway, I solved it with Python + bleak if someone will stumble upon this in the future. It's a prototype, but it works as expected.

    import asyncio
    from bleak import BleakClient, BleakError
    import keyboard
    import sys
    
    HR_MONITOR_ADDR = 'DB:60:75:A4:26:EB' # Polar H10
    HR_MONITOR_READ_ID = '00002a37-0000-1000-8000-00805f9b34fb' # Characteristics: Heart Rate (0x1801) -> Heart Rate MEasurement (0x2A37)
    
    TREADMILL_ADDR = '85:83:D6:04:14:0F' # Treadmill with FitShow chip (FS-DBB45F)
    TREADMILL_WRITE_ID = '00002ad9-0000-1000-8000-00805f9b34fb' # Characteristics: Fitness Machine (0x1826) -> Fitness Machine Control Point (0x2AD9)
    
    bpm_max_threshold = 145
    bpm_min_threshold = 110
    
    commands = {
        'Start': bytearray.fromhex('07'),
        'Stop': bytearray.fromhex('08'),
    
        'Slow Tempo': bytearray.fromhex('023601'),   # 3.1 km/h
        'Normal Tempo': bytearray.fromhex('02EE02')  # 7.5 km/h
    }
    
    
    speed_mode = 'Normal Tempo'
    last_speed_value = None
    
    async def handle_device_1(hr_monitor, treadmill):
        global last_speed_value
        try:
            async def notification_handler(sender, data):
                global last_speed_value
                global speed_mode
                
                hex_data = data.hex()
                # print(f'\nPolar H10 data -> {sender}: {data} (Hex: {hex_data})')
    
                bpm_hex = hex_data[2:4] # second byte of data
                bpm = int(bpm_hex, 16)
                print(f'BPM: {bpm}')
    
                if bpm == 0: # HR belt is off
                    print('Polar H10 is off.')
                    return
    
                if bpm > bpm_max_threshold:
                    speed_mode = 'Slow Tempo'
                elif bpm < bpm_min_threshold:
                    speed_mode = 'Normal Tempo'
    
                speed_value = commands.get(speed_mode, commands['Normal Tempo'])
                print(f'Current Speed: {speed_mode}')
    
                if speed_value != last_speed_value:
                    await write_to_device(treadmill, speed_value)
                    last_speed_value = speed_value
                    print(f'>> SPEED SET: {speed_mode} ({speed_value.hex()})')
    
            # start treadmill
            await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Start'])
            print('>> Starting treadmill in 5...')
            
            # initial treadmill countdown
            await asyncio.sleep(5.1)
    
            # subscribe to HR notifications
            await hr_monitor.start_notify(HR_MONITOR_READ_ID, notification_handler)
            print('>> Waiting for Polar HR data...')
    
            # stop on Esc
            while not keyboard.is_pressed('esc'):
                await asyncio.sleep(0.1)
            
            await hr_monitor.stop_notify(HR_MONITOR_READ_ID)
            await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Stop'])
    
            print('Stopping notifications and disconnecting...')
            
        except BleakError as e:
            print(f'>> Error: Polar HR Monitor failed to connect: {e}')
        except Exception as e:
            print(f'>> Unexpected error: {e}')
        finally:
            await hr_monitor.disconnect()
            await treadmill.disconnect()
            print('>> Devices disconnected.')
    
    async def write_to_device(client, value):
        try:
            if client.is_connected:  # check treadmill connection
                await client.write_gatt_char(TREADMILL_WRITE_ID, value)
            else:
                print('>> Error: Treadmill not connected.')
        except Exception as e:
            print(f'>> Error: Failed writing characteristic: {e}')
    
    async def main():
        async with BleakClient(HR_MONITOR_ADDR) as hr_monitor, BleakClient(TREADMILL_ADDR) as treadmill:
            await handle_device_1(hr_monitor, treadmill)
    
    
    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('>> Interrupted'); sys.exit()
        except Exception as e:
            print(f'>> Unexpected error: {e}'); sys.exit()

Related