This commit is contained in:
2026-05-02 23:00:15 -04:00
parent 9801871c8a
commit db5a00895c
3 changed files with 92 additions and 52 deletions
+78 -42
View File
@@ -1,53 +1,89 @@
import time import asyncio
import json import json
import socket import time
from typing import Optional
GOVEE_API_BASE_URL = "https://openapi.api.govee.com/router/api/v1/" GOVEE_LAN_PORT = 4003
INTER_SEGMENT_DELAY = 0.1
class GoveeProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.transport = None
self._pending: asyncio.Future | None = None
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
if self._pending and not self._pending.done():
self._pending.set_result(json.loads(data.decode()))
def error_received(self, exc):
if self._pending and not self._pending.done():
self._pending.set_exception(exc)
class GoveeApi: class GoveeApi:
device_ip = "" def __init__(self, device_ip: str, retries: int = 3, retry_delay: float = 0.05):
self.device_ip = device_ip
self.retries = retries
self.retry_delay = retry_delay
self._protocol: Optional[GoveeProtocol] = None
def __init__(self, device_ip): async def connect(self):
self.device_ip = device_ip loop = asyncio.get_running_loop()
_, self._protocol = await loop.create_datagram_endpoint(
GoveeProtocol,
remote_addr=(self.device_ip, GOVEE_LAN_PORT)
)
def send_scene(self, scene_code): async def close(self):
segments = scene_code.split(",") if self._protocol and self._protocol.transport:
for segment in segments: self._protocol.transport.close()
payload = {
"msg": { async def __aenter__(self):
"cmd": "ptReal", await self.connect()
"data": { return self
"command": [segment]
} async def __aexit__(self, *args):
await self.close()
async def send_scene(self, scene_code: str):
segments = scene_code.split(",")
for i, segment in enumerate(segments):
payload = {"msg": {"cmd": "ptReal", "data": {"command": [segment]}}}
await self._send(payload)
if i < len(segments) - 1:
await asyncio.sleep(INTER_SEGMENT_DELAY)
async def set_color(self, r: int, g: int, b: int, kelvin: int = 0):
payload = {
"msg": {
"cmd": "colorwc",
"data": {
"color": {"r": r, "g": g, "b": b},
"colorTemInKelvin": kelvin
}
}
} }
} await self._send(payload)
self.send_over_socket(payload)
def set_to_original_color(self): async def _send(self, payload: dict) -> Optional[dict]:
payload = { if not self._protocol:
"msg": { raise RuntimeError("Not connected — use async with or call connect() first")
"cmd": "colorwc",
"data": {
"color": {"r": 255, "g": 0, "b": 0},
"colorTemInKelvin": 0
}
}
}
self.send_over_socket(payload)
def send_over_socket(self, payload, retries=2, delay=0.1): data = json.dumps(payload).encode()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) loop = asyncio.get_running_loop()
sock.settimeout(1)
data = json.dumps(payload).encode()
for attempt in range(retries): for attempt in range(self.retries):
sock.sendto(data, (self.device_ip, 4003)) future: asyncio.Future = loop.create_future()
try: self._protocol._pending = future
response, _ = sock.recvfrom(1024) self._protocol.transport.sendto(data)
print(f"Got response on attempt {attempt + 1}")
break # stop retrying once we get an ack
except socket.timeout:
if attempt < retries - 1:
time.sleep(delay)
sock.close() try:
return await asyncio.wait_for(future, timeout=1.0)
except asyncio.TimeoutError:
if attempt < self.retries - 1:
await asyncio.sleep(self.retry_delay)
print(f"Warning: no response after {self.retries} attempts")
return None
+7 -7
View File
@@ -2,7 +2,7 @@ import time
import requests import requests
import os import os
import govee import govee
import pygame import asyncio
from time import sleep from time import sleep
from PIL import Image, ImageDraw, ImageFont from PIL import Image, ImageDraw, ImageFont
from rgbmatrix import RGBMatrix, RGBMatrixOptions, graphics from rgbmatrix import RGBMatrix, RGBMatrixOptions, graphics
@@ -282,7 +282,7 @@ def draw_all_games(canvas, games, start_index):
canvas.SetPixel(offset + 63, row, 40, 40, 40) canvas.SetPixel(offset + 63, row, 40, 40, 40)
# --- Main loop --- # --- Main loop ---
def run(): async def run():
global canvas global canvas
games = [] games = []
prev_scores = {} prev_scores = {}
@@ -292,10 +292,10 @@ def run():
last_switch = time.time() last_switch = time.time()
while True: while True:
govee_api.send_scene(GOVEE_AWS) async with govee_api:
play_goal_celebration() await govee.send_scene(GOVEE_AWS)
sleep(10) play_goal_celebration()
govee_api.set_to_original_color() await govee_api.set_color(255,0,0)
# while True: # while True:
# now = time.time() # now = time.time()
@@ -335,4 +335,4 @@ def run():
# time.sleep(0.03) # time.sleep(0.03)
if __name__ == "__main__": if __name__ == "__main__":
run() asyncio.run(run())
+4
View File
@@ -0,0 +1,4 @@
import govee
govee_api = govee.GoveeApi(device_ip="172.16.0.15")
govee_api.send_scene("owABAgT/AGQMACr//+YAADb//8k=,o//QAAIDAwAAAAAAAAAAAAAAAI4=,MwUKdwAAAAAAAAAAAAAAAAAAAEs=")