first commit
This commit is contained in:
207
workers/mqtt.go
Normal file
207
workers/mqtt.go
Normal file
@ -0,0 +1,207 @@
|
||||
// SPDX-License-Identifier: MIT
|
||||
// SPDX-FileCopyrightText: 2023 mysubarumq
|
||||
// SPDX-FileContributor: alex-savin
|
||||
|
||||
package workers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.savin.nyc/alex/mysubaru-mq/bus"
|
||||
"git.savin.nyc/alex/mysubaru-mq/config"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
const (
|
||||
ServiceAvailable = `{"value":"online"}`
|
||||
ServiceUnavailable = `{"value":"offline"}`
|
||||
)
|
||||
|
||||
// UPSClient is a client that connects to the APCUPSd server by establishing tcp connection
|
||||
type MQTTClient struct {
|
||||
sync.RWMutex
|
||||
id string // the internal id of the listener
|
||||
bus *bus.Bus // the internal bus for the interapp communication
|
||||
config *config.Config // configuration values for the listener
|
||||
mqtt mqtt.Client //
|
||||
cancel context.CancelFunc //
|
||||
end uint32 // ensure the close methods are only called once
|
||||
log *slog.Logger // server logger
|
||||
mqttHandlers mqttHandlers //
|
||||
}
|
||||
|
||||
type mqttHandlers struct {
|
||||
publish mqtt.MessageHandler //
|
||||
connected mqtt.OnConnectHandler //
|
||||
disconnected mqtt.ConnectionLostHandler //
|
||||
}
|
||||
|
||||
// NewUPSClient initialises and returns a UPS client
|
||||
func NewMQTTClient(id string, bus *bus.Bus, config *config.Config) *MQTTClient {
|
||||
if config == nil {
|
||||
slog.Error("")
|
||||
}
|
||||
|
||||
return &MQTTClient{
|
||||
id: id,
|
||||
bus: bus,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// ID returns the id of the listener.
|
||||
func (w *MQTTClient) ID() string {
|
||||
return w.id
|
||||
}
|
||||
|
||||
// ID returns the id of the listener.
|
||||
func (w *MQTTClient) Type() string {
|
||||
return "mqtt-client"
|
||||
}
|
||||
|
||||
// Init .
|
||||
func (w *MQTTClient) Init(log *slog.Logger) error {
|
||||
w.log = log
|
||||
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", w.config.MQTT.Host, w.config.MQTT.Port))
|
||||
opts.SetUsername(w.config.MQTT.Username)
|
||||
opts.SetPassword(w.config.MQTT.Password)
|
||||
opts.SetClientID(w.config.MQTT.ClientId)
|
||||
opts.SetWill(w.config.MQTT.Topic+"/status", ServiceUnavailable, 0, true)
|
||||
opts.OnConnect = func(m mqtt.Client) {
|
||||
w.log.Debug("mqtt client is connected to a mqtt server")
|
||||
m.Publish(w.config.MQTT.Topic+"/status", 0, w.config.MQTT.Retained, ServiceAvailable)
|
||||
}
|
||||
|
||||
w.mqtt = mqtt.NewClient(opts)
|
||||
token := w.mqtt.Connect()
|
||||
for !token.WaitTimeout(1 * time.Second) {
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
w.log.Error("couldn't connect to a mqtt server", "error", err.Error())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
w.mqttHandlers.publish = func(client mqtt.Client, msg mqtt.Message) {
|
||||
var messages []*bus.Message
|
||||
|
||||
w.log.Debug("received mqtt message", "topic", msg.Topic(), "payload", msg.Payload())
|
||||
switch msg.Topic() {
|
||||
case w.config.Hassio.Topics.Status:
|
||||
// Subscribing for the topic to resend auto discovery topics after Home Assistant restarted and got a status "online"
|
||||
messages = append(messages, &bus.Message{
|
||||
Topic: msg.Topic(),
|
||||
Payload: string(msg.Payload()),
|
||||
})
|
||||
w.bus.Publish("hassio:status", messages)
|
||||
case "mysubarumq/4S4BTGPD0P3199198/lock/set":
|
||||
messages = append(messages, &bus.Message{
|
||||
Topic: msg.Topic(),
|
||||
Payload: string(msg.Payload()),
|
||||
})
|
||||
w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:lock", messages)
|
||||
case "mysubarumq/4S4BTGPD0P3199198/ignition/set":
|
||||
messages = append(messages, &bus.Message{
|
||||
Topic: msg.Topic(),
|
||||
Payload: string(msg.Payload()),
|
||||
})
|
||||
w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:ignition", messages)
|
||||
}
|
||||
}
|
||||
|
||||
w.mqttHandlers.connected = func(client mqtt.Client) {
|
||||
w.log.Debug("mqtt client is connected")
|
||||
}
|
||||
|
||||
w.mqttHandlers.disconnected = func(client mqtt.Client, err error) {
|
||||
w.log.Debug("mqtt lost connection", "error", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OneTime .
|
||||
func (w *MQTTClient) OneTime() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serve starts waiting for new TCP connections, and calls the establish
|
||||
// connection callback for any received.
|
||||
func (w *MQTTClient) Serve() {
|
||||
if atomic.LoadUint32(&w.end) == 1 {
|
||||
return
|
||||
}
|
||||
|
||||
chMQTTSubscribeCommand, err := w.bus.Subscribe("mqtt:subscribe", "command", w.config.SubscriptionSize["mqtt:subscribe"])
|
||||
if err != nil {
|
||||
w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:subscribe", "error", err.Error())
|
||||
}
|
||||
|
||||
chMQTTPublishStatus, err := w.bus.Subscribe("mqtt:publish", "status", w.config.SubscriptionSize["mqtt:publish"])
|
||||
if err != nil {
|
||||
w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:subscribe", "error", err.Error())
|
||||
}
|
||||
|
||||
// ctx is used only by tests.
|
||||
// ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
w.cancel = cancel
|
||||
|
||||
go w.eventLoop(ctx, chMQTTPublishStatus, chMQTTSubscribeCommand)
|
||||
|
||||
if atomic.LoadUint32(&w.end) == 0 {
|
||||
go func() {
|
||||
if !w.mqtt.IsConnected() {
|
||||
w.bus.Publish("app:connected", &bus.ConnectionStatus{WorkerID: w.ID(), WorkerType: w.Type(), IsConnected: false})
|
||||
w.log.Warn("mqtt client is disconnected")
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the listener and any client connections.
|
||||
func (w *MQTTClient) Close() {
|
||||
w.Lock()
|
||||
defer w.Unlock()
|
||||
|
||||
if atomic.CompareAndSwapUint32(&w.end, 0, 1) {
|
||||
w.cancel()
|
||||
w.bus.Unsubscribe("mqtt:publish", "status")
|
||||
w.mqtt.Publish(w.config.MQTT.Topic+"/status", 0, true, ServiceUnavailable)
|
||||
w.mqtt.Disconnect(250)
|
||||
w.log.Info("disconnected from mqtt server")
|
||||
}
|
||||
}
|
||||
|
||||
// eventLoop loops forever
|
||||
func (w *MQTTClient) eventLoop(ctx context.Context, chMQTTPublishStatus, chMQTTSubscribeCommand chan bus.Event) {
|
||||
w.log.Debug("mqtt communication event loop started")
|
||||
defer w.log.Debug("mqtt communication event loop halted")
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-chMQTTPublishStatus:
|
||||
for _, message := range event.Data.([]*bus.Message) {
|
||||
w.log.Debug("publishing mqtt message", "topic", message.Topic, "qos", message.QOS, "retained", message.Retained, "payload", message.Payload)
|
||||
w.mqtt.Publish(message.Topic, message.QOS, message.Retained, message.Payload)
|
||||
}
|
||||
case event := <-chMQTTSubscribeCommand:
|
||||
for _, message := range event.Data.([]*bus.Message) {
|
||||
w.log.Debug("subscribing to a topic", "topic", message.Topic, "qos", message.QOS)
|
||||
// w.mqtt.SubscribeMultiple()
|
||||
w.mqtt.Subscribe(message.Topic, message.QOS, w.mqttHandlers.publish)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
w.log.Info("stopping mqtt communication event loop")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user