Macro controlling multiple devices as a global macro?

Hi,

I'm new to all this, I'm using the nRF Connect app and it's amazing!

I'd like to know if there's a way to:
1. create a global macro controlling multiple devices?
2. store values through macros, with a public scope that can be accessed from the other device's macro?
3. run multiple macros at the same time?

My goal is to read the HR from a Polar H10 and control the speed of my FTMS treadmill based on that.

Any tips, links to guides, or even suggestions for other apps would be much appreciated!

Thanks!

Parents
  • Anyway, I solved it with Python + bleak, in case someone stumbles upon this in the future. It's a prototype, but it works as expected.

    import asyncio
    from bleak import BleakClient, BleakError
    import keyboard
    import sys
    
    # --- Heart-rate monitor ---
    HR_MONITOR_ADDR = 'DB:60:75:A4:26:EB' # Polar H10
    # Heart Rate service (0x180D) -> Heart Rate Measurement characteristic (0x2A37).
    # This is the characteristic the script subscribes to for notifications.
    HR_MONITOR_READ_ID = '00002a37-0000-1000-8000-00805f9b34fb'

    # --- Treadmill ---
    TREADMILL_ADDR = '85:83:D6:04:14:0F' # Treadmill with FitShow chip (FS-DBB45F)
    # Fitness Machine service (0x1826) -> Fitness Machine Control Point characteristic (0x2AD9).
    # Every treadmill command in this script is written to this characteristic.
    TREADMILL_WRITE_ID = '00002ad9-0000-1000-8000-00805f9b34fb'

    # Heart-rate thresholds in beats per minute. Above the max the speed mode
    # switches to 'Slow Tempo'; below the min it switches back to 'Normal Tempo';
    # in between, the current mode is kept.
    bpm_max_threshold = 145
    bpm_min_threshold = 110

    # Raw payloads written to the control point characteristic.
    commands = {
        'Start': bytearray.fromhex('07'),
        # NOTE(review): the FTMS spec appears to define opcode 0x08 with a one-byte
        # parameter (stop/pause); this treadmill reportedly accepts the bare opcode - confirm.
        'Stop': bytearray.fromhex('08'),

        # 0x02 followed by a little-endian 16-bit value (0x0136 = 310, 0x02EE = 750);
        # presumably the target speed in 0.01 km/h units, matching the figures below.
        'Slow Tempo': bytearray.fromhex('023601'),   # 3.1 km/h
        'Normal Tempo': bytearray.fromhex('02EE02')  # 7.5 km/h
    }


    # Mutable state shared with the heart-rate notification handler.
    speed_mode = 'Normal Tempo'  # key into `commands` for the currently selected speed
    last_speed_value = None  # last payload recorded as sent to the treadmill; None until the first write
    
    def _parse_heart_rate(data):
        """Extract the heart rate (BPM) from a Heart Rate Measurement payload.

        Byte 0 of the payload holds the flags. Bit 0 selects the value format:
        when clear, the heart rate is a uint8 in byte 1; when set, it is a
        little-endian uint16 in bytes 1-2.

        Returns:
            The heart rate as an int, or None if the payload is too short to
            contain the value announced by the flags.
        """
        if len(data) < 2:
            return None
        if data[0] & 0x01:
            if len(data) < 3:
                return None
            return int.from_bytes(data[1:3], 'little')
        return data[1]


    async def handle_device_1(hr_monitor, treadmill):
        """Drive the treadmill speed from heart-rate notifications until Esc is pressed.

        Starts the treadmill, waits for its countdown, subscribes to heart-rate
        notifications and switches between the 'Slow Tempo' and 'Normal Tempo'
        commands based on the BPM thresholds. On exit (normal, error or
        cancellation) it makes a best-effort attempt to stop the belt and
        disconnect both devices.

        Args:
            hr_monitor: client for the heart-rate monitor.
            treadmill: client for the treadmill.
        """
        async def notification_handler(sender, data):
            global last_speed_value
            global speed_mode

            bpm = _parse_heart_rate(data)
            if bpm is None:
                print(f'>> Error: Malformed heart rate data: {data.hex()}')
                return
            print(f'BPM: {bpm}')

            if bpm == 0: # HR belt is off
                print('Polar H10 is off.')
                return

            # Hysteresis: between the two thresholds the current mode is kept.
            if bpm > bpm_max_threshold:
                speed_mode = 'Slow Tempo'
            elif bpm < bpm_min_threshold:
                speed_mode = 'Normal Tempo'

            speed_value = commands.get(speed_mode, commands['Normal Tempo'])
            print(f'Current Speed: {speed_mode}')

            # Only write when the target speed actually changes.
            if speed_value != last_speed_value:
                sent = await write_to_device(treadmill, speed_value)
                # An explicit False from the helper means the write did not go
                # through; leave the cache untouched so the next notification
                # retries. Any other result (including None) counts as sent.
                if sent is False:
                    return
                last_speed_value = speed_value
                print(f'>> SPEED SET: {speed_mode} ({speed_value.hex()})')

        started = False  # Start command was accepted
        stopped = False  # Stop command was accepted
        try:
            # start treadmill
            await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Start'])
            started = True
            print('>> Starting treadmill in 5...')

            # initial treadmill countdown
            await asyncio.sleep(5.1)

            # subscribe to HR notifications
            await hr_monitor.start_notify(HR_MONITOR_READ_ID, notification_handler)
            print('>> Waiting for Polar HR data...')

            # stop on Esc
            while not keyboard.is_pressed('esc'):
                await asyncio.sleep(0.1)

            await hr_monitor.stop_notify(HR_MONITOR_READ_ID)
            await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Stop'])
            stopped = True

            print('Stopping notifications and disconnecting...')

        except BleakError as e:
            # Raised by a GATT write/notify call on either device.
            print(f'>> Error: Bluetooth operation failed: {e}')
        except Exception as e:
            print(f'>> Unexpected error: {e}')
        finally:
            # Safety: never leave the belt running if we bailed out early.
            if started and not stopped:
                try:
                    await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Stop'])
                except Exception as e:
                    print(f'>> Error: Failed to stop treadmill: {e}')
            # Disconnect each device independently so one failure does not
            # prevent the other from being released.
            for client in (hr_monitor, treadmill):
                try:
                    await client.disconnect()
                except Exception as e:
                    print(f'>> Error: Failed to disconnect: {e}')
            print('>> Devices disconnected.')
    
    async def write_to_device(client, value):
        """Write *value* to the treadmill control point, reporting success.

        Best-effort: problems are printed instead of raised, so a failed write
        does not abort the caller.

        Args:
            client: treadmill client exposing `is_connected` and `write_gatt_char`.
            value: payload bytes to write.

        Returns:
            True if the write completed, False if the client was not connected
            or the write raised.
        """
        try:
            if not client.is_connected:  # check treadmill connection
                print('>> Error: Treadmill not connected.')
                return False
            await client.write_gatt_char(TREADMILL_WRITE_ID, value)
        except Exception as e:
            print(f'>> Error: Failed writing characteristic: {e}')
            return False
        return True
    
    async def main():
        """Enter both BLE client contexts and run the heart-rate control loop.

        The heart-rate monitor client is entered first, then the treadmill
        client; both contexts are exited when the control loop returns.
        """
        async with BleakClient(HR_MONITOR_ADDR) as hr_monitor:
            async with BleakClient(TREADMILL_ADDR) as treadmill:
                await handle_device_1(hr_monitor, treadmill)
    
    
    if __name__ == '__main__':
        # Script entry point: run the async main and translate failures into
        # a message plus a process exit status.
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('>> Interrupted')
            sys.exit()
        except Exception as e:
            print(f'>> Unexpected error: {e}')
            # Non-zero status so a calling shell or script can tell the run failed.
            sys.exit(1)

Reply
  • Anyway, I solved it with Python + bleak, in case someone stumbles upon this in the future. It's a prototype, but it works as expected.

    import asyncio
    from bleak import BleakClient, BleakError
    import keyboard
    import sys
    
    # --- Heart-rate monitor ---
    HR_MONITOR_ADDR = 'DB:60:75:A4:26:EB' # Polar H10
    # Heart Rate service (0x180D) -> Heart Rate Measurement characteristic (0x2A37).
    # This is the characteristic the script subscribes to for notifications.
    HR_MONITOR_READ_ID = '00002a37-0000-1000-8000-00805f9b34fb'

    # --- Treadmill ---
    TREADMILL_ADDR = '85:83:D6:04:14:0F' # Treadmill with FitShow chip (FS-DBB45F)
    # Fitness Machine service (0x1826) -> Fitness Machine Control Point characteristic (0x2AD9).
    # Every treadmill command in this script is written to this characteristic.
    TREADMILL_WRITE_ID = '00002ad9-0000-1000-8000-00805f9b34fb'

    # Heart-rate thresholds in beats per minute. Above the max the speed mode
    # switches to 'Slow Tempo'; below the min it switches back to 'Normal Tempo';
    # in between, the current mode is kept.
    bpm_max_threshold = 145
    bpm_min_threshold = 110

    # Raw payloads written to the control point characteristic.
    commands = {
        'Start': bytearray.fromhex('07'),
        # NOTE(review): the FTMS spec appears to define opcode 0x08 with a one-byte
        # parameter (stop/pause); this treadmill reportedly accepts the bare opcode - confirm.
        'Stop': bytearray.fromhex('08'),

        # 0x02 followed by a little-endian 16-bit value (0x0136 = 310, 0x02EE = 750);
        # presumably the target speed in 0.01 km/h units, matching the figures below.
        'Slow Tempo': bytearray.fromhex('023601'),   # 3.1 km/h
        'Normal Tempo': bytearray.fromhex('02EE02')  # 7.5 km/h
    }


    # Mutable state shared with the heart-rate notification handler.
    speed_mode = 'Normal Tempo'  # key into `commands` for the currently selected speed
    last_speed_value = None  # last payload recorded as sent to the treadmill; None until the first write
    
    def _parse_heart_rate(data):
        """Extract the heart rate (BPM) from a Heart Rate Measurement payload.

        Byte 0 of the payload holds the flags. Bit 0 selects the value format:
        when clear, the heart rate is a uint8 in byte 1; when set, it is a
        little-endian uint16 in bytes 1-2.

        Returns:
            The heart rate as an int, or None if the payload is too short to
            contain the value announced by the flags.
        """
        if len(data) < 2:
            return None
        if data[0] & 0x01:
            if len(data) < 3:
                return None
            return int.from_bytes(data[1:3], 'little')
        return data[1]


    async def handle_device_1(hr_monitor, treadmill):
        """Drive the treadmill speed from heart-rate notifications until Esc is pressed.

        Starts the treadmill, waits for its countdown, subscribes to heart-rate
        notifications and switches between the 'Slow Tempo' and 'Normal Tempo'
        commands based on the BPM thresholds. On exit (normal, error or
        cancellation) it makes a best-effort attempt to stop the belt and
        disconnect both devices.

        Args:
            hr_monitor: client for the heart-rate monitor.
            treadmill: client for the treadmill.
        """
        async def notification_handler(sender, data):
            global last_speed_value
            global speed_mode

            bpm = _parse_heart_rate(data)
            if bpm is None:
                print(f'>> Error: Malformed heart rate data: {data.hex()}')
                return
            print(f'BPM: {bpm}')

            if bpm == 0: # HR belt is off
                print('Polar H10 is off.')
                return

            # Hysteresis: between the two thresholds the current mode is kept.
            if bpm > bpm_max_threshold:
                speed_mode = 'Slow Tempo'
            elif bpm < bpm_min_threshold:
                speed_mode = 'Normal Tempo'

            speed_value = commands.get(speed_mode, commands['Normal Tempo'])
            print(f'Current Speed: {speed_mode}')

            # Only write when the target speed actually changes.
            if speed_value != last_speed_value:
                sent = await write_to_device(treadmill, speed_value)
                # An explicit False from the helper means the write did not go
                # through; leave the cache untouched so the next notification
                # retries. Any other result (including None) counts as sent.
                if sent is False:
                    return
                last_speed_value = speed_value
                print(f'>> SPEED SET: {speed_mode} ({speed_value.hex()})')

        started = False  # Start command was accepted
        stopped = False  # Stop command was accepted
        try:
            # start treadmill
            await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Start'])
            started = True
            print('>> Starting treadmill in 5...')

            # initial treadmill countdown
            await asyncio.sleep(5.1)

            # subscribe to HR notifications
            await hr_monitor.start_notify(HR_MONITOR_READ_ID, notification_handler)
            print('>> Waiting for Polar HR data...')

            # stop on Esc
            while not keyboard.is_pressed('esc'):
                await asyncio.sleep(0.1)

            await hr_monitor.stop_notify(HR_MONITOR_READ_ID)
            await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Stop'])
            stopped = True

            print('Stopping notifications and disconnecting...')

        except BleakError as e:
            # Raised by a GATT write/notify call on either device.
            print(f'>> Error: Bluetooth operation failed: {e}')
        except Exception as e:
            print(f'>> Unexpected error: {e}')
        finally:
            # Safety: never leave the belt running if we bailed out early.
            if started and not stopped:
                try:
                    await treadmill.write_gatt_char(TREADMILL_WRITE_ID, commands['Stop'])
                except Exception as e:
                    print(f'>> Error: Failed to stop treadmill: {e}')
            # Disconnect each device independently so one failure does not
            # prevent the other from being released.
            for client in (hr_monitor, treadmill):
                try:
                    await client.disconnect()
                except Exception as e:
                    print(f'>> Error: Failed to disconnect: {e}')
            print('>> Devices disconnected.')
    
    async def write_to_device(client, value):
        """Write *value* to the treadmill control point, reporting success.

        Best-effort: problems are printed instead of raised, so a failed write
        does not abort the caller.

        Args:
            client: treadmill client exposing `is_connected` and `write_gatt_char`.
            value: payload bytes to write.

        Returns:
            True if the write completed, False if the client was not connected
            or the write raised.
        """
        try:
            if not client.is_connected:  # check treadmill connection
                print('>> Error: Treadmill not connected.')
                return False
            await client.write_gatt_char(TREADMILL_WRITE_ID, value)
        except Exception as e:
            print(f'>> Error: Failed writing characteristic: {e}')
            return False
        return True
    
    async def main():
        """Enter both BLE client contexts and run the heart-rate control loop.

        The heart-rate monitor client is entered first, then the treadmill
        client; both contexts are exited when the control loop returns.
        """
        async with BleakClient(HR_MONITOR_ADDR) as hr_monitor:
            async with BleakClient(TREADMILL_ADDR) as treadmill:
                await handle_device_1(hr_monitor, treadmill)
    
    
    if __name__ == '__main__':
        # Script entry point: run the async main and translate failures into
        # a message plus a process exit status.
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('>> Interrupted')
            sys.exit()
        except Exception as e:
            print(f'>> Unexpected error: {e}')
            # Non-zero status so a calling shell or script can tell the run failed.
            sys.exit(1)

Children
No Data
Related