From db5a00895c37b29a7abdb216cf01968b6919ae86 Mon Sep 17 00:00:00 2001 From: Alex Frantz Date: Sat, 2 May 2026 23:00:15 -0400 Subject: [PATCH] async --- govee.py | 126 ++++++++++++++++++++++++++++++++------------------ scoreboard.py | 14 +++--- test.py | 4 ++ 3 files changed, 92 insertions(+), 52 deletions(-) create mode 100644 test.py diff --git a/govee.py b/govee.py index 6fa2163..ac5d6fc 100644 --- a/govee.py +++ b/govee.py @@ -1,53 +1,89 @@ -import time +import asyncio import json -import socket +import time +from typing import Optional -GOVEE_API_BASE_URL = "https://openapi.api.govee.com/router/api/v1/" +GOVEE_LAN_PORT = 4003 +INTER_SEGMENT_DELAY = 0.1 + +class GoveeProtocol(asyncio.DatagramProtocol): + def __init__(self): + self.transport = None + self._pending: asyncio.Future | None = None + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + if self._pending and not self._pending.done(): + self._pending.set_result(json.loads(data.decode())) + + def error_received(self, exc): + if self._pending and not self._pending.done(): + self._pending.set_exception(exc) class GoveeApi: - device_ip = "" + def __init__(self, device_ip: str, retries: int = 3, retry_delay: float = 0.05): + self.device_ip = device_ip + self.retries = retries + self.retry_delay = retry_delay + self._protocol: Optional[GoveeProtocol] = None - def __init__(self, device_ip): - self.device_ip = device_ip + async def connect(self): + loop = asyncio.get_running_loop() + _, self._protocol = await loop.create_datagram_endpoint( + GoveeProtocol, + remote_addr=(self.device_ip, GOVEE_LAN_PORT) + ) - def send_scene(self, scene_code): - segments = scene_code.split(",") - for segment in segments: - payload = { - "msg": { - "cmd": "ptReal", - "data": { - "command": [segment] - } + async def close(self): + if self._protocol and self._protocol.transport: + self._protocol.transport.close() + + async def __aenter__(self): + await self.connect() + return self + + async def __aexit__(self, *args): + await self.close() + + async def send_scene(self, scene_code: str): + segments = scene_code.split(",") + for i, segment in enumerate(segments): + payload = {"msg": {"cmd": "ptReal", "data": {"command": [segment]}}} + await self._send(payload) + if i < len(segments) - 1: + await asyncio.sleep(INTER_SEGMENT_DELAY) + + async def set_color(self, r: int, g: int, b: int, kelvin: int = 0): + payload = { + "msg": { + "cmd": "colorwc", + "data": { + "color": {"r": r, "g": g, "b": b}, + "colorTemInKelvin": kelvin + } + } } - } - self.send_over_socket(payload) + await self._send(payload) - def set_to_original_color(self): - payload = { - "msg": { - "cmd": "colorwc", - "data": { - "color": {"r": 255, "g": 0, "b": 0}, - "colorTemInKelvin": 0 - } - } - } - self.send_over_socket(payload) - - def send_over_socket(self, payload, retries=2, delay=0.1): - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.settimeout(1) - data = json.dumps(payload).encode() - - for attempt in range(retries): - sock.sendto(data, (self.device_ip, 4003)) - try: - response, _ = sock.recvfrom(1024) - print(f"Got response on attempt {attempt + 1}") - break # stop retrying once we get an ack - except socket.timeout: - if attempt < retries - 1: - time.sleep(delay) - - sock.close() + async def _send(self, payload: dict) -> Optional[dict]: + if not self._protocol: + raise RuntimeError("Not connected — use async with or call connect() first") + + data = json.dumps(payload).encode() + loop = asyncio.get_running_loop() + + for attempt in range(self.retries): + future: asyncio.Future = loop.create_future() + self._protocol._pending = future + self._protocol.transport.sendto(data) + + try: + return await asyncio.wait_for(future, timeout=1.0) + except asyncio.TimeoutError: + if attempt < self.retries - 1: + await asyncio.sleep(self.retry_delay) + + print(f"Warning: no response after {self.retries} attempts") + return None \ No newline at end of file diff --git a/scoreboard.py b/scoreboard.py index 1d01069..985b66c 100644 --- a/scoreboard.py +++ b/scoreboard.py @@ -2,7 +2,7 @@ import time import requests import os import govee -import pygame +import asyncio from time import sleep from PIL import Image, ImageDraw, ImageFont from rgbmatrix import RGBMatrix, RGBMatrixOptions, graphics @@ -282,7 +282,7 @@ def draw_all_games(canvas, games, start_index): canvas.SetPixel(offset + 63, row, 40, 40, 40) # --- Main loop --- -def run(): +async def run(): global canvas games = [] prev_scores = {} @@ -292,10 +292,10 @@ def run(): last_switch = time.time() while True: - govee_api.send_scene(GOVEE_AWS) - play_goal_celebration() - sleep(10) - govee_api.set_to_original_color() + async with govee_api: + await govee.send_scene(GOVEE_AWS) + play_goal_celebration() + await govee_api.set_color(255,0,0) # while True: # now = time.time() @@ -335,4 +335,4 @@ def run(): # time.sleep(0.03) if __name__ == "__main__": - run() \ No newline at end of file + asyncio.run(run()) \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..ee5acd3 --- /dev/null +++ b/test.py @@ -0,0 +1,4 @@ +import govee + +govee_api = govee.GoveeApi(device_ip="172.16.0.15") +govee_api.send_scene("owABAgT/AGQMACr//+YAADb//8k=,o//QAAIDAwAAAAAAAAAAAAAAAI4=,MwUKdwAAAAAAAAAAAAAAAAAAAEs=") \ No newline at end of file