nixos-config/mqtt/fyrtur.py

145 lines
4.4 KiB
Python
Raw Normal View History

2021-03-17 09:09:44 +01:00
import time
from enum import Enum
from typing import Dict
import paho.mqtt.client as mqtt
import json
import threading
scene = "up-dark"
class Position(Enum):
UP = 1
DOWN = 2
HALF = 3
class Fyrtur:
2021-03-20 13:28:40 +01:00
def __init__(self, topic, top, bottom):
2021-03-17 09:09:44 +01:00
self.topic = topic
self.top = top
self.bottom = bottom
self.current_position = 100
self.wanted_position = 100
def update_position(self, payload):
self.current_position = payload["position"]
def needs_publish(self):
return self.wanted_position != self.current_position
2021-03-20 13:28:40 +01:00
def topic_and_payload_for_set(self):
2021-03-17 09:09:44 +01:00
payload = {"position": self.wanted_position}
2021-03-20 13:28:40 +01:00
return ("%s/set" % self.topic), json.dumps(payload)
2021-03-17 09:09:44 +01:00
class FyrturWatcher:
def __init__(self, fyrturs: Dict[str, Fyrtur]):
self.fyrturs = fyrturs
def get_topics(self):
return [fyrtur.topic for fyrtur in self.fyrturs.values()]
def update_position(self, topic, payload):
for fyrtur in self.fyrturs.values():
if fyrtur.topic == topic:
fyrtur.update_position(payload)
return
2021-03-18 13:15:51 +01:00
def update(self, name, position: Position):
2021-03-17 09:09:44 +01:00
fyrtur: Fyrtur = self.fyrturs.get(name)
if position == Position.UP:
fyrtur.wanted_position = fyrtur.top
elif position == Position.DOWN:
fyrtur.wanted_position = fyrtur.bottom
elif position == Position.HALF:
2021-03-18 13:15:51 +01:00
fyrtur.wanted_position = round((fyrtur.top - fyrtur.bottom) / 2 + fyrtur.bottom)
2021-03-17 09:09:44 +01:00
def publish(self, client):
for fyrtur in self.fyrturs.values():
if fyrtur.needs_publish():
2021-03-20 13:28:40 +01:00
topic, payload = fyrtur.topic_and_payload_for_set()
client.publish(topic, payload)
2021-03-17 09:09:44 +01:00
time.sleep(2)
watcher = FyrturWatcher({
2021-03-20 13:28:40 +01:00
"office1": Fyrtur(topic="zigbee2mqtt/office_fyrtur_1", top=100, bottom=16),
"office2": Fyrtur(topic="zigbee2mqtt/office_fyrtur_2", top=100, bottom=22),
"bedroom": Fyrtur(topic="zigbee2mqtt/bedroom_fyrtur_1", top=100, bottom=16),
2021-03-17 09:09:44 +01:00
})
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, _userdata, _flags, rc):
print("Connected with result code " + str(rc))
threading.Thread(target=loop_thread, args=(client,), daemon=True).start()
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("control/lights/set")
for topic in watcher.get_topics():
client.subscribe(topic)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, _userdata, msg):
global scene
(topic, payload) = parse_message(msg)
if topic == "control/lights/set":
print("set scene %s -> %s" % (scene, payload['scene']))
scene = payload['scene']
update_scene(client)
else:
print("got %s" % topic)
watcher.update_position(topic, payload)
def parse_message(msg):
m_decode = str(msg.payload.decode("utf-8", "ignore"))
payload = json.loads(m_decode) # decode json data
return msg.topic, payload
def update_scene(client):
if scene in ["night", "down"]:
watcher.update("office1", Position.DOWN)
watcher.update("office2", Position.DOWN)
watcher.update("bedroom", Position.DOWN)
elif scene in ["default", "up-bright", "up-dark", "outside"]:
2021-03-17 09:09:44 +01:00
watcher.update("office1", Position.UP)
watcher.update("office2", Position.UP)
watcher.update("bedroom", Position.UP)
elif scene in ["half"]:
watcher.update("office1", Position.HALF)
watcher.update("office2", Position.HALF)
watcher.update("bedroom", Position.HALF)
else:
watcher.update("office1", Position.UP)
watcher.update("office2", Position.UP)
watcher.update("bedroom", Position.UP)
2021-03-17 09:09:44 +01:00
watcher.publish(client)
def loop_thread(client):
while True:
watcher.publish(client)
time.sleep(120)
if __name__ == "__main__":
mqttClient = mqtt.Client()
mqttClient.on_connect = on_connect
mqttClient.on_message = on_message
mqttClient.username_pw_set("homeassistant", password="password")
mqttClient.connect("pepe.private", 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
mqttClient.loop_forever()