update govee class

This commit is contained in:
2026-05-02 23:12:12 -04:00
parent f84b52543a
commit 00fbcc1ce6
+34 -70
View File
@@ -1,53 +1,16 @@
import asyncio
import json
import time
from typing import Optional
import json
import socket
GOVEE_LAN_PORT = 4003
INTER_SEGMENT_DELAY = 0.1
class GoveeProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.transport = None
self._pending: asyncio.Future | None = None
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
if self._pending and not self._pending.done():
self._pending.set_result(json.loads(data.decode()))
def error_received(self, exc):
if self._pending and not self._pending.done():
self._pending.set_exception(exc)
GOVEE_API_BASE_URL = "https://openapi.api.govee.com/router/api/v1/"
class GoveeApi:
def __init__(self, device_ip: str, retries: int = 3, retry_delay: float = 0.05):
device_ip = ""
def __init__(self, device_ip):
self.device_ip = device_ip
self.retries = retries
self.retry_delay = retry_delay
self._protocol: Optional[GoveeProtocol] = None
async def connect(self):
loop = asyncio.get_running_loop()
_, self._protocol = await loop.create_datagram_endpoint(
GoveeProtocol,
remote_addr=(self.device_ip, GOVEE_LAN_PORT)
)
async def close(self):
if self._protocol and self._protocol.transport:
self._protocol.transport.close()
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, *args):
await self.close()
async def send_scene(self, scene_code: str):
def send_scene(self, scene_code):
power_payload = {
"msg": {
"cmd": "turn",
@@ -56,45 +19,46 @@ class GoveeApi:
}
}
}
self._send(power_payload)
self.send_over_socket(power_payload)
time.sleep(0.5)
segments = scene_code.split(",")
for i, segment in enumerate(segments):
payload = {"msg": {"cmd": "ptReal", "data": {"command": [segment]}}}
await self._send(payload)
if i < len(segments) - 1:
await asyncio.sleep(INTER_SEGMENT_DELAY)
for segment in segments:
payload = {
"msg": {
"cmd": "ptReal",
"data": {
"command": [segment]
}
}
}
self.send_over_socket(payload)
async def set_color(self, r: int, g: int, b: int, kelvin: int = 0):
def set_to_original_color(self):
payload = {
"msg": {
"cmd": "colorwc",
"data": {
"color": {"r": r, "g": g, "b": b},
"colorTemInKelvin": kelvin
"color": {"r": 255, "g": 0, "b": 0},
"colorTemInKelvin": 0
}
}
}
await self._send(payload)
async def _send(self, payload: dict) -> Optional[dict]:
if not self._protocol:
raise RuntimeError("Not connected — use async with or call connect() first")
self.send_over_socket(payload)
def send_over_socket(self, payload, retries=2, delay=0.1):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1)
data = json.dumps(payload).encode()
loop = asyncio.get_running_loop()
for attempt in range(self.retries):
future: asyncio.Future = loop.create_future()
self._protocol._pending = future
self._protocol.transport.sendto(data)
for attempt in range(retries):
sock.sendto(data, (self.device_ip, 4003))
try:
return await asyncio.wait_for(future, timeout=1.0)
except asyncio.TimeoutError:
if attempt < self.retries - 1:
await asyncio.sleep(self.retry_delay)
response, _ = sock.recvfrom(1024)
print(f"Got response on attempt {attempt + 1}")
break # stop retrying once we get an ack
except socket.timeout:
if attempt < retries - 1:
time.sleep(delay)
print(f"Warning: no response after {self.retries} attempts")
return None
sock.close()