208 lines
6.1 KiB
Go
208 lines
6.1 KiB
Go
// SPDX-License-Identifier: MIT
|
|
// SPDX-FileCopyrightText: 2023 mysubarumq
|
|
// SPDX-FileContributor: alex-savin
|
|
|
|
package workers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.savin.nyc/alex/mysubaru-mq/bus"
|
|
"git.savin.nyc/alex/mysubaru-mq/config"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
// Availability payloads published to the "<topic>/status" MQTT topic.
const (
	// ServiceAvailable is the payload announcing that the service is online.
	ServiceAvailable = `{"value":"online"}`

	// ServiceUnavailable is the payload announcing that the service is
	// offline; it is also used as the MQTT last-will message.
	ServiceUnavailable = `{"value":"offline"}`
)
|
|
|
|
// UPSClient is a client that connects to the APCUPSd server by establishing tcp connection
|
|
type MQTTClient struct {
|
|
sync.RWMutex
|
|
id string // the internal id of the listener
|
|
bus *bus.Bus // the internal bus for the interapp communication
|
|
config *config.Config // configuration values for the listener
|
|
mqtt mqtt.Client //
|
|
cancel context.CancelFunc //
|
|
end uint32 // ensure the close methods are only called once
|
|
log *slog.Logger // server logger
|
|
mqttHandlers mqttHandlers //
|
|
}
|
|
|
|
type mqttHandlers struct {
|
|
publish mqtt.MessageHandler //
|
|
connected mqtt.OnConnectHandler //
|
|
disconnected mqtt.ConnectionLostHandler //
|
|
}
|
|
|
|
// NewUPSClient initialises and returns a UPS client
|
|
func NewMQTTClient(id string, bus *bus.Bus, config *config.Config) *MQTTClient {
|
|
if config == nil {
|
|
slog.Error("")
|
|
}
|
|
|
|
return &MQTTClient{
|
|
id: id,
|
|
bus: bus,
|
|
config: config,
|
|
}
|
|
}
|
|
|
|
// ID returns the id of the listener.
|
|
func (w *MQTTClient) ID() string {
|
|
return w.id
|
|
}
|
|
|
|
// ID returns the id of the listener.
|
|
func (w *MQTTClient) Type() string {
|
|
return "mqtt-client"
|
|
}
|
|
|
|
// Init .
|
|
func (w *MQTTClient) Init(log *slog.Logger) error {
|
|
w.log = log
|
|
|
|
opts := mqtt.NewClientOptions()
|
|
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", w.config.MQTT.Host, w.config.MQTT.Port))
|
|
opts.SetUsername(w.config.MQTT.Username)
|
|
opts.SetPassword(w.config.MQTT.Password)
|
|
opts.SetClientID(w.config.MQTT.ClientId)
|
|
opts.SetWill(w.config.MQTT.Topic+"/status", ServiceUnavailable, 0, true)
|
|
opts.OnConnect = func(m mqtt.Client) {
|
|
w.log.Debug("mqtt client is connected to a mqtt server")
|
|
m.Publish(w.config.MQTT.Topic+"/status", 0, w.config.MQTT.Retained, ServiceAvailable)
|
|
}
|
|
|
|
w.mqtt = mqtt.NewClient(opts)
|
|
token := w.mqtt.Connect()
|
|
for !token.WaitTimeout(1 * time.Second) {
|
|
}
|
|
if err := token.Error(); err != nil {
|
|
w.log.Error("couldn't connect to a mqtt server", "error", err.Error())
|
|
|
|
return err
|
|
}
|
|
|
|
w.mqttHandlers.publish = func(client mqtt.Client, msg mqtt.Message) {
|
|
var messages []*bus.Message
|
|
|
|
w.log.Debug("received mqtt message", "topic", msg.Topic(), "payload", msg.Payload())
|
|
switch msg.Topic() {
|
|
case w.config.Hassio.Topics.Status:
|
|
// Subscribing for the topic to resend auto discovery topics after Home Assistant restarted and got a status "online"
|
|
messages = append(messages, &bus.Message{
|
|
Topic: msg.Topic(),
|
|
Payload: string(msg.Payload()),
|
|
})
|
|
w.bus.Publish("hassio:status", messages)
|
|
case "mysubarumq/4S4BTGPD0P3199198/lock/set":
|
|
messages = append(messages, &bus.Message{
|
|
Topic: msg.Topic(),
|
|
Payload: string(msg.Payload()),
|
|
})
|
|
w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:lock", messages)
|
|
case "mysubarumq/4S4BTGPD0P3199198/ignition/set":
|
|
messages = append(messages, &bus.Message{
|
|
Topic: msg.Topic(),
|
|
Payload: string(msg.Payload()),
|
|
})
|
|
w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:ignition", messages)
|
|
}
|
|
}
|
|
|
|
w.mqttHandlers.connected = func(client mqtt.Client) {
|
|
w.log.Debug("mqtt client is connected")
|
|
}
|
|
|
|
w.mqttHandlers.disconnected = func(client mqtt.Client, err error) {
|
|
w.log.Debug("mqtt lost connection", "error", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// OneTime .
|
|
func (w *MQTTClient) OneTime() error {
|
|
return nil
|
|
}
|
|
|
|
// Serve starts waiting for new TCP connections, and calls the establish
|
|
// connection callback for any received.
|
|
func (w *MQTTClient) Serve() {
|
|
if atomic.LoadUint32(&w.end) == 1 {
|
|
return
|
|
}
|
|
|
|
chMQTTSubscribeCommand, err := w.bus.Subscribe("mqtt:subscribe", "command", w.config.SubscriptionSize["mqtt:subscribe"])
|
|
if err != nil {
|
|
w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:subscribe", "error", err.Error())
|
|
}
|
|
|
|
chMQTTPublishStatus, err := w.bus.Subscribe("mqtt:publish", "status", w.config.SubscriptionSize["mqtt:publish"])
|
|
if err != nil {
|
|
w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:subscribe", "error", err.Error())
|
|
}
|
|
|
|
// ctx is used only by tests.
|
|
// ctx, ctxCancel := context.WithCancel(context.Background())
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
w.cancel = cancel
|
|
|
|
go w.eventLoop(ctx, chMQTTPublishStatus, chMQTTSubscribeCommand)
|
|
|
|
if atomic.LoadUint32(&w.end) == 0 {
|
|
go func() {
|
|
if !w.mqtt.IsConnected() {
|
|
w.bus.Publish("app:connected", &bus.ConnectionStatus{WorkerID: w.ID(), WorkerType: w.Type(), IsConnected: false})
|
|
w.log.Warn("mqtt client is disconnected")
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Close closes the listener and any client connections.
|
|
func (w *MQTTClient) Close() {
|
|
w.Lock()
|
|
defer w.Unlock()
|
|
|
|
if atomic.CompareAndSwapUint32(&w.end, 0, 1) {
|
|
w.cancel()
|
|
w.bus.Unsubscribe("mqtt:publish", "status")
|
|
w.mqtt.Publish(w.config.MQTT.Topic+"/status", 0, true, ServiceUnavailable)
|
|
w.mqtt.Disconnect(250)
|
|
w.log.Info("disconnected from mqtt server")
|
|
}
|
|
}
|
|
|
|
// eventLoop loops forever
|
|
func (w *MQTTClient) eventLoop(ctx context.Context, chMQTTPublishStatus, chMQTTSubscribeCommand chan bus.Event) {
|
|
w.log.Debug("mqtt communication event loop started")
|
|
defer w.log.Debug("mqtt communication event loop halted")
|
|
|
|
for {
|
|
select {
|
|
case event := <-chMQTTPublishStatus:
|
|
for _, message := range event.Data.([]*bus.Message) {
|
|
w.log.Debug("publishing mqtt message", "topic", message.Topic, "qos", message.QOS, "retained", message.Retained, "payload", message.Payload)
|
|
w.mqtt.Publish(message.Topic, message.QOS, message.Retained, message.Payload)
|
|
}
|
|
case event := <-chMQTTSubscribeCommand:
|
|
for _, message := range event.Data.([]*bus.Message) {
|
|
w.log.Debug("subscribing to a topic", "topic", message.Topic, "qos", message.QOS)
|
|
// w.mqtt.SubscribeMultiple()
|
|
w.mqtt.Subscribe(message.Topic, message.QOS, w.mqttHandlers.publish)
|
|
}
|
|
case <-ctx.Done():
|
|
w.log.Info("stopping mqtt communication event loop")
|
|
return
|
|
}
|
|
}
|
|
}
|