#!/usr/bin/env python3 """ BLE Debug for WellNuo WP sensors Test all commands """ import asyncio from bleak import BleakClient, BleakScanner # Sensor BLE UUIDs SERVICE_UUID = "4fafc201-1fb5-459e-8fcc-c5c9c331914b" CHAR_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8" DEVICE_PIN = "7856" response_data = None response_event = asyncio.Event() def notification_handler(sender, data): global response_data decoded = data.decode('utf-8', errors='replace') print(f" [NOTIFY] {decoded}") response_data = decoded response_event.set() async def send_and_wait(client, command, timeout=10): global response_data response_data = None response_event.clear() print(f"\n>>> Sending: {command}") await client.write_gatt_char(CHAR_UUID, command.encode('utf-8')) try: await asyncio.wait_for(response_event.wait(), timeout=timeout) return response_data except asyncio.TimeoutError: # Try reading try: data = await client.read_gatt_char(CHAR_UUID) decoded = data.decode('utf-8', errors='replace') print(f" [READ] {decoded}") return decoded except: print(f" [TIMEOUT] No response") return None async def main(): print("=" * 60) print("WellNuo Sensor BLE Debug Tool") print("=" * 60) print("\nScanning for sensors...") devices = await BleakScanner.discover(timeout=5.0) wp_device = None for d in devices: if d.name and d.name.startswith("WP_"): wp_device = d print(f" Found: {d.name} ({d.address})") break if not wp_device: print("No WP sensor found!") return print(f"\nConnecting to {wp_device.name}...") async with BleakClient(wp_device.address) as client: print("Connected!") # Start notifications await client.start_notify(CHAR_UUID, notification_handler) # Read initial status print("\n--- Reading current status ---") data = await client.read_gatt_char(CHAR_UUID) print(f"Status: {data.decode('utf-8', errors='replace')}") # Step 1: Unlock with PIN print("\n--- Step 1: Unlock with PIN ---") response = await send_and_wait(client, f"pin|{DEVICE_PIN}") if response and "ok" in response.lower(): print("āœ“ Sensor unlocked!") else: print("āœ— Unlock failed!") return await asyncio.sleep(1) # Step 2: Get WiFi list (multiple attempts with longer timeout) print("\n--- Step 2: WiFi Scan (30 sec timeout) ---") print("Note: Sensor needs time to scan 2.4GHz networks...") response = await send_and_wait(client, "W|list", timeout=30) if response: print(f"\nFull response: {response}") if "|W|list|" in response: parts = response.split("|W|list|") if len(parts) > 1: networks = parts[1].split("|") print("\nšŸ“¶ Available WiFi networks:") for i, net in enumerate(networks, 1): if "," in net: ssid, rssi = net.rsplit(",", 1) print(f" {i}. {ssid} (signal: {rssi})") else: print("No WiFi list received") # Try reading again print("\nReading characteristic again...") await asyncio.sleep(2) data = await client.read_gatt_char(CHAR_UUID) print(f"Status: {data.decode('utf-8', errors='replace')}") # Step 3: Try a status command print("\n--- Step 3: Status check ---") response = await send_and_wait(client, "status") await client.stop_notify(CHAR_UUID) print("\n" + "=" * 60) print("Debug session complete") print("=" * 60) if __name__ == "__main__": asyncio.run(main())