// SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2023 mysubarumq // SPDX-FileContributor: alex-savin package workers import ( "context" "fmt" "log/slog" "sync" "sync/atomic" "time" "git.savin.nyc/alex/mysubaru-mq/bus" "git.savin.nyc/alex/mysubaru-mq/config" mqtt "github.com/eclipse/paho.mqtt.golang" ) const ( ServiceAvailable = `{"value":"online"}` ServiceUnavailable = `{"value":"offline"}` ) // UPSClient is a client that connects to the APCUPSd server by establishing tcp connection type MQTTClient struct { sync.RWMutex id string // the internal id of the listener bus *bus.Bus // the internal bus for the interapp communication config *config.Config // configuration values for the listener mqtt mqtt.Client // cancel context.CancelFunc // end uint32 // ensure the close methods are only called once log *slog.Logger // server logger mqttHandlers mqttHandlers // } type mqttHandlers struct { publish mqtt.MessageHandler // connected mqtt.OnConnectHandler // disconnected mqtt.ConnectionLostHandler // } // NewUPSClient initialises and returns a UPS client func NewMQTTClient(id string, bus *bus.Bus, config *config.Config) *MQTTClient { if config == nil { slog.Error("") } return &MQTTClient{ id: id, bus: bus, config: config, } } // ID returns the id of the listener. func (w *MQTTClient) ID() string { return w.id } // ID returns the id of the listener. func (w *MQTTClient) Type() string { return "mqtt-client" } // Init . 
func (w *MQTTClient) Init(log *slog.Logger) error { w.log = log opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", w.config.MQTT.Host, w.config.MQTT.Port)) opts.SetUsername(w.config.MQTT.Username) opts.SetPassword(w.config.MQTT.Password) opts.SetClientID(w.config.MQTT.ClientId) opts.SetWill(w.config.MQTT.Topic+"/status", ServiceUnavailable, 0, true) opts.OnConnect = func(m mqtt.Client) { w.log.Debug("mqtt client is connected to a mqtt server") m.Publish(w.config.MQTT.Topic+"/status", 0, w.config.MQTT.Retained, ServiceAvailable) } w.mqtt = mqtt.NewClient(opts) token := w.mqtt.Connect() for !token.WaitTimeout(1 * time.Second) { } if err := token.Error(); err != nil { w.log.Error("couldn't connect to a mqtt server", "error", err.Error()) return err } w.mqttHandlers.publish = func(client mqtt.Client, msg mqtt.Message) { var messages []*bus.Message w.log.Debug("received mqtt message", "topic", msg.Topic(), "payload", msg.Payload()) switch msg.Topic() { case w.config.Hassio.Topics.Status: // Subscribing for the topic to resend auto discovery topics after Home Assistant restarted and got a status "online" messages = append(messages, &bus.Message{ Topic: msg.Topic(), Payload: string(msg.Payload()), }) w.bus.Publish("hassio:status", messages) case "mysubarumq/4S4BTGPD0P3199198/lock/set": messages = append(messages, &bus.Message{ Topic: msg.Topic(), Payload: string(msg.Payload()), }) w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:lock", messages) case "mysubarumq/4S4BTGPD0P3199198/ignition/set": messages = append(messages, &bus.Message{ Topic: msg.Topic(), Payload: string(msg.Payload()), }) w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:ignition", messages) } } w.mqttHandlers.connected = func(client mqtt.Client) { w.log.Debug("mqtt client is connected") } w.mqttHandlers.disconnected = func(client mqtt.Client, err error) { w.log.Debug("mqtt lost connection", "error", err) } return nil } // OneTime . 
func (w *MQTTClient) OneTime() error {
	return nil
}

// Serve subscribes to the "mqtt:subscribe" and "mqtt:publish" bus channels
// and starts the event loop that relays their events to the MQTT client.
// It returns immediately if the client has already been closed.
//
// A failed bus subscription is logged and Serve carries on with the
// remaining channel (presumably the failed channel is nil and is therefore
// never selected by the event loop — verify against bus.Subscribe).
func (w *MQTTClient) Serve() {
	if atomic.LoadUint32(&w.end) == 1 {
		return
	}

	chMQTTSubscribeCommand, err := w.bus.Subscribe("mqtt:subscribe", "command", w.config.SubscriptionSize["mqtt:subscribe"])
	if err != nil {
		w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:subscribe", "error", err.Error())
	}

	chMQTTPublishStatus, err := w.bus.Subscribe("mqtt:publish", "status", w.config.SubscriptionSize["mqtt:publish"])
	if err != nil {
		w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:publish", "error", err.Error())
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Close reads w.cancel while holding the mutex, so publish it under the
	// same lock.
	w.Lock()
	w.cancel = cancel
	w.Unlock()

	go w.eventLoop(ctx, chMQTTPublishStatus, chMQTTSubscribeCommand)

	if atomic.LoadUint32(&w.end) == 0 {
		// One-shot check: report on the bus if the broker connection is
		// not up at the time Serve is called.
		go func() {
			if w.mqtt.IsConnected() {
				return
			}
			w.bus.Publish("app:connected", &bus.ConnectionStatus{
				WorkerID:    w.ID(),
				WorkerType:  w.Type(),
				IsConnected: false,
			})
			w.log.Warn("mqtt client is disconnected")
		}()
	}
}

// Close stops the event loop, publishes the offline status and disconnects
// from the MQTT broker. Only the first call has any effect.
func (w *MQTTClient) Close() { w.Lock() defer w.Unlock() if atomic.CompareAndSwapUint32(&w.end, 0, 1) { w.cancel() w.bus.Unsubscribe("mqtt:publish", "status") w.mqtt.Publish(w.config.MQTT.Topic+"/status", 0, true, ServiceUnavailable) w.mqtt.Disconnect(250) w.log.Info("disconnected from mqtt server") } } // eventLoop loops forever func (w *MQTTClient) eventLoop(ctx context.Context, chMQTTPublishStatus, chMQTTSubscribeCommand chan bus.Event) { w.log.Debug("mqtt communication event loop started") defer w.log.Debug("mqtt communication event loop halted") for { select { case event := <-chMQTTPublishStatus: for _, message := range event.Data.([]*bus.Message) { w.log.Debug("publishing mqtt message", "topic", message.Topic, "qos", message.QOS, "retained", message.Retained, "payload", message.Payload) w.mqtt.Publish(message.Topic, message.QOS, message.Retained, message.Payload) } case event := <-chMQTTSubscribeCommand: for _, message := range event.Data.([]*bus.Message) { w.log.Debug("subscribing to a topic", "topic", message.Topic, "qos", message.QOS) // w.mqtt.SubscribeMultiple() w.mqtt.Subscribe(message.Topic, message.QOS, w.mqttHandlers.publish) } case <-ctx.Done(): w.log.Info("stopping mqtt communication event loop") return } } }