import time from enum import Enum from typing import Dict import paho.mqtt.client as mqtt import json import threading scene = "up-dark" class Position(Enum): UP = 1 DOWN = 2 HALF = 3 class Fyrtur: def __init__(self, topic, top, bottom): self.topic = topic self.top = top self.bottom = bottom self.current_position = 100 self.wanted_position = 100 def update_position(self, payload): self.current_position = payload["position"] def needs_publish(self): return self.wanted_position != self.current_position def topic_and_payload_for_set(self): payload = {"position": self.wanted_position} return ("%s/set" % self.topic), json.dumps(payload) class FyrturWatcher: def __init__(self, fyrturs: Dict[str, Fyrtur]): self.fyrturs = fyrturs def get_topics(self): return [fyrtur.topic for fyrtur in self.fyrturs.values()] def update_position(self, topic, payload): for fyrtur in self.fyrturs.values(): if fyrtur.topic == topic: fyrtur.update_position(payload) return def update(self, name, position: Position): fyrtur: Fyrtur = self.fyrturs.get(name) if position == Position.UP: fyrtur.wanted_position = fyrtur.top elif position == Position.DOWN: fyrtur.wanted_position = fyrtur.bottom elif position == Position.HALF: fyrtur.wanted_position = round( (fyrtur.top - fyrtur.bottom) / 2 + fyrtur.bottom ) def publish(self, client): for fyrtur in self.fyrturs.values(): if fyrtur.needs_publish(): topic, payload = fyrtur.topic_and_payload_for_set() client.publish(topic, payload) time.sleep(2) watcher = FyrturWatcher( { "office1": Fyrtur(topic="zigbee2mqtt/office_fyrtur_1", top=100, bottom=16), "office2": Fyrtur(topic="zigbee2mqtt/office_fyrtur_2", top=100, bottom=22), "bedroom": Fyrtur(topic="zigbee2mqtt/bedroom_fyrtur_1", top=100, bottom=16), } ) # The callback for when the client receives a CONNACK response from the server. def on_connect(client, _userdata, _flags, rc): print("Connected with result code " + str(rc)) threading.Thread(target=loop_thread, args=(client,), daemon=True).start() # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe("control/lights/set") for topic in watcher.get_topics(): client.subscribe(topic) # The callback for when a PUBLISH message is received from the server. def on_message(client, _userdata, msg): global scene (topic, payload) = parse_message(msg) if topic == "control/lights/set": print("set scene %s -> %s" % (scene, payload["scene"])) scene = payload["scene"] update_scene(client) else: print("got %s" % topic) watcher.update_position(topic, payload) def parse_message(msg): m_decode = str(msg.payload.decode("utf-8", "ignore")) payload = json.loads(m_decode) # decode json data return msg.topic, payload def update_scene(client): if scene in ["night", "down"]: watcher.update("office1", Position.DOWN) watcher.update("office2", Position.DOWN) watcher.update("bedroom", Position.DOWN) elif scene in ["default", "up-bright", "up-dark", "outside"]: watcher.update("office1", Position.UP) watcher.update("office2", Position.UP) watcher.update("bedroom", Position.UP) elif scene in ["half"]: watcher.update("office1", Position.HALF) watcher.update("office2", Position.HALF) watcher.update("bedroom", Position.HALF) else: watcher.update("office1", Position.UP) watcher.update("office2", Position.UP) watcher.update("bedroom", Position.UP) watcher.publish(client) def loop_thread(client): while True: watcher.publish(client) time.sleep(120) if __name__ == "__main__": mqttClient = mqtt.Client() mqttClient.on_connect = on_connect mqttClient.on_message = on_message mqttClient.username_pw_set("homeassistant", password="password") mqttClient.connect("pepe.private", 1883, 60) # Blocking call that processes network traffic, dispatches callbacks and # handles reconnecting. # Other loop*() functions are available that give a threaded interface and a # manual interface. mqttClient.loop_forever()