import time from enum import Enum from typing import Dict import paho.mqtt.client as mqtt import json import threading # initial scene from heater.modules.heater import Heater from heater.modules.watcher import Watcher scene = "up-dark" class Position(Enum): UP = 1 DOWN = 2 HALF = 3 class Fyrtur: def __init__(self, topic, set_topic, top, bottom): self.topic = topic self.set_topic = set_topic self.top = top self.bottom = bottom self.current_position = 100 self.wanted_position = 100 def update_position(self, payload): self.current_position = payload["position"] def needs_publish(self): return self.wanted_position != self.current_position def payload(self): payload = {"position": self.wanted_position} return json.dumps(payload) class FyrturWatcher: def __init__(self, fyrturs: Dict[str, Fyrtur]): self.fyrturs = fyrturs def get_topics(self): return [fyrtur.topic for fyrtur in self.fyrturs.values()] def update_position(self, topic, payload): for fyrtur in self.fyrturs.values(): if fyrtur.topic == topic: fyrtur.update_position(payload) return def update(self, name, position : Position): fyrtur: Fyrtur = self.fyrturs.get(name) if position == Position.UP: fyrtur.wanted_position = fyrtur.top elif position == Position.DOWN: fyrtur.wanted_position = fyrtur.bottom elif position == Position.HALF: fyrtur.wanted_position = round((fyrtur.top - fyrtur.bottom) / 2) def publish(self, client): for fyrtur in self.fyrturs.values(): if fyrtur.needs_publish(): client.publish(fyrtur.set_topic, fyrtur.payload()) time.sleep(2) watcher = FyrturWatcher({ "office1": Fyrtur(topic="zigbee2mqtt/fyrtur1", set_topic="zigbee2mqtt/fyrtur1/set", top=100, bottom=16), "office2": Fyrtur(topic="zigbee2mqtt/fyrtur4", set_topic="zigbee2mqtt/fyrtur4/set", top=100, bottom=22), "bedroom": Fyrtur(topic="zigbee2mqtt/fyrtur2", set_topic="zigbee2mqtt/fyrtur2/set", top=100, bottom=16), }) # The callback for when the client receives a CONNACK response from the server. def on_connect(client, _userdata, _flags, rc): print("Connected with result code " + str(rc)) threading.Thread(target=loop_thread, args=(client,), daemon=True).start() # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe("control/lights/set") for topic in watcher.get_topics(): client.subscribe(topic) # watcher.pull_values(client) # The callback for when a PUBLISH message is received from the server. def on_message(client, _userdata, msg): global scene (topic, payload) = parse_message(msg) if topic == "control/lights/set": print("set scene %s -> %s" % (scene, payload['scene'])) scene = payload['scene'] update_scene(client) else: print("got %s" % topic) watcher.update_position(topic, payload) def parse_message(msg): m_decode = str(msg.payload.decode("utf-8", "ignore")) payload = json.loads(m_decode) # decode json data return msg.topic, payload def update_scene(client): if scene in ["night", "down"]: watcher.update("office1", Position.DOWN) watcher.update("office2", Position.DOWN) watcher.update("bedroom", Position.DOWN) elif scene in ["default", "up-bright", "up-dark"]: watcher.update("office1", Position.UP) watcher.update("office2", Position.UP) watcher.update("bedroom", Position.UP) elif scene in ["half"]: watcher.update("office1", Position.HALF) watcher.update("office2", Position.HALF) watcher.update("bedroom", Position.HALF) else: watcher.update("office1", Position.DOWN) watcher.update("office2", Position.DOWN) watcher.update("bedroom", Position.DOWN) watcher.publish(client) def loop_thread(client): while True: watcher.publish(client) time.sleep(120) if __name__ == "__main__": mqttClient = mqtt.Client() mqttClient.on_connect = on_connect mqttClient.on_message = on_message mqttClient.username_pw_set("homeassistant", password="password") mqttClient.connect("pepe.private", 1883, 60) # Blocking call that processes network traffic, dispatches callbacks and # handles reconnecting. # Other loop*() functions are available that give a threaded interface and a # manual interface. mqttClient.loop_forever()