From b61c4b59ec26d2ad0d41225a6fc9e9a48771a957 Mon Sep 17 00:00:00 2001 From: Alex Savin Date: Wed, 21 May 2025 13:51:10 -0400 Subject: [PATCH] first commit --- .github/workflows/CI.yml | 144 +++++++++ .gitignore | 53 ++++ Dockerfile | 15 + Makefile | 65 +++++ README.md | 3 + app/app.go | 219 ++++++++++++++ app/hooks.go | 220 ++++++++++++++ bus/bus.go | 118 ++++++++ bus/bus_test.go | 499 ++++++++++++++++++++++++++++++++ bus/messages.go | 18 ++ config.sample.yaml | 43 +++ config/config.go | 119 ++++++++ go.mod | 52 ++++ hooks/debug/debug.go | 71 +++++ hooks/registry/consul/consul.go | 218 ++++++++++++++ listeners/http_stats.go | 123 ++++++++ listeners/listeners.go | 126 ++++++++ listeners/mock.go | 102 +++++++ main.go | 133 +++++++++ system/system.go | 31 ++ workers/mqtt.go | 207 +++++++++++++ workers/mysubaru.go | 389 +++++++++++++++++++++++++ workers/workers.go | 129 +++++++++ 23 files changed, 3097 insertions(+) create mode 100644 .github/workflows/CI.yml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Makefile create mode 100644 README.md create mode 100644 app/app.go create mode 100644 app/hooks.go create mode 100644 bus/bus.go create mode 100644 bus/bus_test.go create mode 100644 bus/messages.go create mode 100644 config.sample.yaml create mode 100644 config/config.go create mode 100644 go.mod create mode 100644 hooks/debug/debug.go create mode 100644 hooks/registry/consul/consul.go create mode 100644 listeners/http_stats.go create mode 100644 listeners/listeners.go create mode 100644 listeners/mock.go create mode 100644 main.go create mode 100644 system/system.go create mode 100644 workers/mqtt.go create mode 100644 workers/mysubaru.go create mode 100644 workers/workers.go diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml new file mode 100644 index 0000000..5aa8f34 --- /dev/null +++ b/.github/workflows/CI.yml @@ -0,0 +1,144 @@ +name: Build and Push Docker Image + +on: + push: + branches: + - develop + paths-ignore: + - 'README.md' + pull_request: + branches: + - develop + paths-ignore: + - 'README.md' + workflow_dispatch: + branches: + - develop + paths-ignore: + - 'README.md' + +jobs: + + testing: + runs-on: ubuntu-latest + strategy: + matrix: + go-version: + - 1.24.x + os: + - ubuntu-latest + steps: + - uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go-version }} + - uses: actions/checkout@v4 + - run: | + go get -u . + go test ./. + + build-and-push: + runs-on: ubuntu-latest + needs: [ testing ] + steps: + + - name: Extract Version + id: version_step + run: | + echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT + echo REPO_VERSION=${GITHUB_REF_NAME#v} >> $GITHUB_OUTPUT + echo RELEASE_DATE=$(date --rfc-3339=date) >> ${GITHUB_ENV} + echo COMMIT_SHA_SHORT=$(echo ${{ github.sha }} | cut -c1-10) >> $GITHUB_OUTPUT + + - name: Dump GitHub context + env: + GITHUB_CONTEXT: ${{ toJson(github) }} + run: echo "$GITHUB_CONTEXT" + + - name: Dump job context + env: + JOB_CONTEXT: ${{ toJson(job) }} + run: echo "$JOB_CONTEXT" + + - name: Dump steps context + env: + STEPS_CONTEXT: ${{ toJson(steps) }} + run: echo "$STEPS_CONTEXT" + # echo "##[set-output name=version;]VERSION=${GITHUB_REF#$"refs/tags/v"}" + # echo "##[set-output name=version_tag;]$GITHUB_REPOSITORY:${GITHUB_REF#$"refs/tags/v"}" + # echo "##[set-output name=latest_tag;]$GITHUB_REPOSITORY:latest" + # - name: Print Version + # run: | + # echo ${{steps.version_step.outputs.version_tag}} + # echo ${{steps.version_step.outputs.latest_tag}} + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker BuildX + uses: docker/setup-buildx-action@v3 + with: + platforms: | + linux/amd64 + + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + registry: docker.savin.nyc + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + # - name: PrepareReg Names + # id: read-docker-image-identifiers + # run: | + # echo VERSION_TAG=$(echo ${{ steps.version_step.outputs.version_tag }} | tr '[:upper:]' '[:lower:]') >> $GITHUB_ENV + # echo LASTEST_TAG=$(echo ${{ steps.version_step.outputs.latest_tag }} | tr '[:upper:]' '[:lower:]') >> $GITHUB_ENV + + - name: Build and Push + id: docker_build + uses: docker/build-push-action@v6 + with: + # context: . + # file: ./Dockerfile + platforms: | + linux/amd64 + push: true + tags: | + docker.savin.nyc/${{ github.event.repository.name }}:nightly + docker.savin.nyc/${{ github.event.repository.name }}:${{ steps.version_step.outputs.COMMIT_SHA_SHORT }} +# ${{ env.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}:${{ env.DOCKER_LATEST }} +# latest +# build-args: | +# docker.savin.nyc/${{ github.event.repository.name }}:latest +# build-args: | +# ${{steps.version_step.outputs.version}} +# docker.savin.nyc/${{ github.event.repository.name }}:${{ env.RELEASE_VERSION }} +# tags: | +# ${{env.VERSION_TAG}} +# ${{env.LASTEST_TAG}} + +# - name: Auto Execute a Workflow +# run: | +# echo "URL: http://10.10.11.26:8090/api/v1/repos/${{ github.event.repository.owner.login }}/${{ github.event.repository.name }}/actions/workflows/CD.yml/dispatches" +# curl -X 'POST' \ +# 'http://10.10.11.26:8090/api/v1/repos/${{ github.event.repository.owner.login }}/${{ github.event.repository.name }}/actions/workflows/CD.yml/dispatches' \ +# -H 'Accept: application/json' \ +# -H 'Content-Type: application/json' \ +# -H 'Authorization: Basic ${{ secrets.AUTH_BASIC }}' \ +# -H 'Authorization: token ${{ secrets.GHA_TOKEN }}' \ +# -d '{"ref":"${{ github.ref }}","inputs":{"version":"${{ steps.version_step.outputs.COMMIT_SHA_SHORT }}","env":"${{ github.ref_name}}"}}' +# shell: bash +# with: +# debug: true + +# - name: Auto Deploy (Dev) +## if: ${{ contains(github.ref, 'develop') and env.AUTO_DEPLOY_DEV == 'true' }} +# id: auto-deploy-dev +# uses: https://git.savin.nyc/gh-actions/auto-exec-workflow@v1 +# with: +# GITEA_TOKEN: ${{ secrets.GHA_TOKEN }} +# Version: ${{ steps.version.output.updated-version }} + + - name: Output Summary + id: output-summary + shell: bash + run: | + echo "CI pipeline has been compiled for ${{ github.repository }} with a new version ${{ steps.vars.outputs.COMMIT_SHORT_SHA }}" >> $GITHUB_STEP_SUMMARY diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ec9c431 --- /dev/null +++ b/.gitignore @@ -0,0 +1,53 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so +*.json +*.code-workspace + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gobus.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof + +coverage.out +coverage.txt + +.future + +mysubarumq + +.config.yaml +config.json +config.yaml +config.yml + +.options.json +options.json +options.yaml +options.yml + +.credentials.yaml + +*.go_ + +.vscode + +go.sum + +.backup diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..825eca8 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM golang:1.24.3-alpine AS builder + +WORKDIR /app +COPY ./ /app + +RUN apk add git && \ + go build -v -o mysubarumq . + +FROM alpine:3.21.3 + +COPY --from=builder /app/mysubarumq /usr/local/bin/mysubarumq + +RUN apk add --no-cache tzdata + +ENTRYPOINT ["mysubarumq"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4ef5664 --- /dev/null +++ b/Makefile @@ -0,0 +1,65 @@ +VERSION := $(shell git describe --tags) +BUILD := $(shell git rev-parse --short HEAD) +PROJECTNAME := $(shell basename "$(PWD)") +BINARY_NAME := "mysubarumq" + +# Go related variables. +GOOS := darwin +GOBASE := $(shell pwd) +GOBIN := $(GOBASE)/bin +GOFILES := $(wildcard *.go) + +# Redirect error output to a file, so we can show it in development mode. +STDERR := /tmp/.$(PROJECTNAME)-stderr.txt + +.PHONY: help + +all: help + +help: + @echo "" + @echo " Choose a command run in "$(PROJECTNAME)":" + @echo "" + @sed -n 's/^##//p' $< | column -t -s ':' | sed -e 's/^/ /' + @echo "" + +## archive: Archive binary to a .zip file +archive: + @echo " > Archiving a file..." + @zip $(PROJECTNAME).zip $(PROJECTNAME) + +## install: Install missing dependencies. Runs `go get` internally. e.g; make install get=github.com/foo/bar +install: go-get + +## compile: Compile the binary. +compile: + @-touch $(STDERR) + @-rm $(STDERR) + @-$(MAKE) -s go-compile 2> $(STDERR) + @cat $(STDERR) | sed -e '1s/.*/\nError:\n/' | sed 's/make\[.*/ /' | sed "/^/s/^/ /" 1>&2 + +go-compile: go-get go-test go-build + +go-build: + @echo " > Building binary..." + @GOPATH=$(GOPATH) GOBIN=$(GOBIN) GOOS=$(GOOS) go build $(LDFLAGS) -mod=mod -o ${BINARY_NAME} $(GOFILES) + GOARCH=amd64 GOOS=darwin go build -o ${BINARY_NAME}-darwin . + GOARCH=amd64 GOOS=linux go build -o ${BINARY_NAME}-linux main.go + GOARCH=amd64 GOOS=window go build -o ${BINARY_NAME}-windows main.go + +go-get: + @echo " > Checking if there is any missing dependencies..." + @GOPATH=$(GOPATH) GOBIN=$(GOBIN) go get -u $(get) + +go-test: + @GOPATH=$(GOPATH) GOBIN=$(GOBIN) go test ./... + +go-test_coverage: + @GOPATH=$(GOPATH) GOBIN=$(GOBIN) go test ./... -coverprofile=coverage.out + +go-clean: + @echo " > Cleaning build cache" + @GOPATH=$(GOPATH) GOBIN=$(GOBIN) go clean + rm ${BINARY_NAME}-darwin + rm ${BINARY_NAME}-linux + rm ${BINARY_NAME}-windows diff --git a/README.md b/README.md new file mode 100644 index 0000000..ef104b8 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# My Subaru Integration (MQTT) + +## About diff --git a/app/app.go b/app/app.go new file mode 100644 index 0000000..75f5345 --- /dev/null +++ b/app/app.go @@ -0,0 +1,219 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package app + +import ( + "context" + "errors" + "log/slog" + "runtime" + "sync/atomic" + "time" + + "git.savin.nyc/alex/mysubaru-mq/bus" + "git.savin.nyc/alex/mysubaru-mq/config" + "git.savin.nyc/alex/mysubaru-mq/listeners" + "git.savin.nyc/alex/mysubaru-mq/system" + "git.savin.nyc/alex/mysubaru-mq/workers" +) + +const ( + AppVersion = "1.0.1" // the current application version. + AppName = "mysubarumq" // the short app name +) + +var ( + ErrWorkerIDExists = errors.New("worker id already exists") + ErrListenerIDExists = errors.New("listener id already exists") +) + +type App struct { + Options *config.Config // configurable server options + Workers *workers.Workers // worker are network interfaces which listen for new connections + Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections + Info *system.Info // values about the server commonly known as $SYS topics + Log *slog.Logger // minimal no-alloc logger + bus *bus.Bus // + hooks *Hooks // hooks contains hooks for extra functionality such as auth and persistent storage + loop *loop // loop contains tickers for the system event loop + cancel context.CancelFunc // + // done chan bool // indicate that the server is ending +} + +// loop contains interval tickers for the system events loop. +type loop struct { + sysInfo *time.Ticker // interval ticker for sending updating $SYS topics +} +type Info struct { + AppVersion string `json:"version"` // the current version of the server + Started int64 `json:"started"` // the time the server started in unix seconds + MemoryAlloc int64 `json:"memory_alloc"` // memory currently allocated + Threads int64 `json:"threads"` // number of active goroutines, named as threads for platform ambiguity + Time int64 `json:"time"` // current time on the server + Uptime int64 `json:"uptime"` // the number of seconds the server has been online +} + +// EstablishConnection establishes a new client when a listener accepts a new connection. +// func (a *App) EstablishConnection(listener string, c net.Conn) error { +// cl := a.NewClient(c, listener, "", false) +// return a.attachClient(cl, listener) +// } + +// New returns a new instance of mochi mqtt broker. Optional parameters +// can be specified to override some default settings (see Options). +func New(opts *config.Config, bus *bus.Bus, logger *slog.Logger) *App { + if opts == nil { + slog.Error("empty config") + } + + ctx, cancel := context.WithCancel(context.Background()) + + var m runtime.MemStats + runtime.ReadMemStats(&m) + a := &App{ + Options: opts, + Workers: workers.New(), + Listeners: listeners.New(), + Info: &system.Info{ + Version: AppVersion, + Started: time.Now().Unix(), + MemoryAlloc: int64(m.HeapInuse), + Threads: int64(runtime.NumGoroutine()), + // Time: time.Now().Unix(), + // Uptime: time.Now().Unix() - atomic.LoadInt64(&a.Started), + }, + Log: logger, + bus: bus, + hooks: &Hooks{ + Log: logger, + }, + loop: &loop{ + sysInfo: time.NewTicker(time.Second * time.Duration(1)), + }, + cancel: cancel, + // done: make(chan bool), + } + + chAppWorker, err := a.bus.Subscribe("app:connected", "mysubarumq", a.Options.SubscriptionSize["app:connected"]) + if err != nil { + a.Log.Error("couldn't subscribe to a channel", "error", err.Error()) + } + + go a.eventLoop(ctx, chAppWorker) + + return a +} + +// AddHook attaches a new Hook to the server. Ideally, this should be called +// before the server is started with s.Serve(). +func (a *App) AddHook(hook Hook, config any) error { + nl := a.Log.With("hook", hook.ID()) + hook.SetOpts(nl) + + a.Log.Info("added hook", "hook", hook.ID()) + return a.hooks.Add(hook, config) +} + +// AddListener adds a new network listener to the server, for receiving incoming client connections. +func (a *App) AddWorker(w workers.Worker) error { + if _, ok := a.Workers.Get(w.ID()); ok { + return ErrWorkerIDExists + } + + nl := a.Log.With(slog.String("worker", w.ID())) + err := w.Init(nl) + if err != nil { + return err + } + + a.Workers.Add(w) + + a.Log.Info("attached worker", "id", w.ID(), "type", w.Type()) + return nil +} + +// AddListener adds a new network listener to the server, for receiving incoming client connections. +func (a *App) AddListener(l listeners.Listener) error { + if _, ok := a.Listeners.Get(l.ID()); ok { + return ErrListenerIDExists + } + + nl := a.Log.With(slog.String("listener", l.ID())) + err := l.Init(nl) + if err != nil { + return err + } + + a.Listeners.Add(l) + + a.Log.Info("attached listener", "id", l.ID(), "protocol", l.Protocol(), "address", l.Address()) + return nil +} + +// Serve starts the event loops responsible for establishing client connections +// on all attached listeners, publishing the system topics, and starting all hooks. +func (a *App) Serve() error { + a.Log.Info("mysubarumq starting", "version", AppVersion) + defer a.Log.Info("mysubarumq started") + + a.Workers.ServeAll() // start listening on all workers. + a.Listeners.ServeAll() // start listening on all listeners. + a.publishSysTopics() + a.hooks.OnStarted() + + return nil +} + +// Close attempts to gracefully shut down the server, all listeners, clients, and stores. +func (a *App) Close() error { + // close(a.done) + a.cancel() + a.Log.Info("gracefully stopping mysubarumq") + a.Workers.CloseAll() + a.Listeners.CloseAll() + a.hooks.OnStopped() + a.hooks.Stop() + + a.Log.Info("mysubarumq stopped") + return nil +} + +// eventLoop loops forever +func (a *App) eventLoop(ctx context.Context, chAppWorker chan bus.Event) { + a.Log.Info("app communication event loop started") + defer a.Log.Info("app communication event loop halted") + + for { + select { + case <-a.loop.sysInfo.C: + a.publishSysTopics() + case event := <-chAppWorker: + message := event.Data.(*bus.ConnectionStatus) + a.Log.Info("got a message from worker", "worker", message.WorkerID, "type", message.WorkerType, "isconnected", message.IsConnected) + if !message.IsConnected { + a.hooks.OnWorkerDisconnected() + a.Workers.Close(message.WorkerID) + } + a.hooks.OnWorkerConnected() + case <-ctx.Done(): + a.Log.Info("stopping app communication event loop") + return + } + } +} + +// publishSysTopics publishes the current values to the server $SYS topics. +// Due to the int to string conversions this method is not as cheap as +// some of the others so the publishing interval should be set appropriately. +func (a *App) publishSysTopics() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + atomic.StoreInt64(&a.Info.MemoryAlloc, int64(m.HeapInuse)) + atomic.StoreInt64(&a.Info.Threads, int64(runtime.NumGoroutine())) + atomic.StoreInt64(&a.Info.Time, time.Now().Unix()) + atomic.StoreInt64(&a.Info.Uptime, time.Now().Unix()-atomic.LoadInt64(&a.Info.Started)) + + a.hooks.OnSysInfoTick(a.Info) +} diff --git a/app/hooks.go b/app/hooks.go new file mode 100644 index 0000000..948ccb2 --- /dev/null +++ b/app/hooks.go @@ -0,0 +1,220 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package app + +import ( + "errors" + "fmt" + "log/slog" + "sync" + "sync/atomic" + + "git.savin.nyc/alex/mysubaru-mq/system" +) + +const ( + SetOptions byte = iota + OnSysInfoTick + OnStarted + OnStopped + OnWorkerConnected + OnWorkerDisconnected + // OnSysInfoTick +) + +var ( + // ErrInvalidConfigType indicates a different Type of config value was expected to what was received. + ErrInvalidConfigType = errors.New("invalid config type provided") +) + +// Hook provides an interface of handlers for different events which occur +// during the lifecycle of the broker. +type Hook interface { + ID() string + Provides(b byte) bool + Init(config any) error + Stop() error + SetOpts(l *slog.Logger) + OnSysInfoTick(*system.Info) + OnStarted() + OnStopped() + OnWorkerConnected() + OnWorkerDisconnected() + // OnSysInfoTick(*system.Info) +} + +// Hooks is a slice of Hook interfaces to be called in sequence. +type Hooks struct { + Log *slog.Logger // a logger for the hook (from the server) + internal atomic.Value // a slice of []Hook + wg sync.WaitGroup // a waitgroup for syncing hook shutdown + qty int64 // the number of hooks in use + sync.Mutex // a mutex for locking when adding hooks +} + +// Len returns the number of hooks added. +func (h *Hooks) Len() int64 { + return atomic.LoadInt64(&h.qty) +} + +// Provides returns true if any one hook provides any of the requested hook methods. +func (h *Hooks) Provides(b ...byte) bool { + for _, hook := range h.GetAll() { + for _, hb := range b { + if hook.Provides(hb) { + return true + } + } + } + + return false +} + +// Add adds and initializes a new hook. +func (h *Hooks) Add(hook Hook, config any) error { + h.Lock() + defer h.Unlock() + + err := hook.Init(config) + if err != nil { + return fmt.Errorf("failed initialising %s hook: %w", hook.ID(), err) + } + + i, ok := h.internal.Load().([]Hook) + if !ok { + i = []Hook{} + } + + i = append(i, hook) + h.internal.Store(i) + atomic.AddInt64(&h.qty, 1) + h.wg.Add(1) + + return nil +} + +// GetAll returns a slice of all the hooks. +func (h *Hooks) GetAll() []Hook { + i, ok := h.internal.Load().([]Hook) + if !ok { + return []Hook{} + } + + return i +} + +// Stop indicates all attached hooks to gracefully end. +func (h *Hooks) Stop() { + go func() { + for _, hook := range h.GetAll() { + h.Log.Info("stopping hook", "hook", hook.ID()) + if err := hook.Stop(); err != nil { + h.Log.Debug("problem stopping hook", "error", err, "hook", hook.ID()) + } + + h.wg.Done() + } + }() + + h.wg.Wait() +} + +// OnSysInfoTick is called when the $SYS topic values are published out. +func (h *Hooks) OnSysInfoTick(sys *system.Info) { + for _, hook := range h.GetAll() { + if hook.Provides(OnSysInfoTick) { + hook.OnSysInfoTick(sys) + } + } +} + +// OnStarted is called when the server has successfully started. +func (h *Hooks) OnStarted() { + for _, hook := range h.GetAll() { + if hook.Provides(OnStarted) { + hook.OnStarted() + } + } +} + +// OnStopped is called when the server has successfully stopped. +func (h *Hooks) OnStopped() { + for _, hook := range h.GetAll() { + if hook.Provides(OnStopped) { + hook.OnStopped() + } + } +} + +// OnWorkerConnected is called when the worker has successfully connected. +func (h *Hooks) OnWorkerConnected() { + for _, hook := range h.GetAll() { + if hook.Provides(OnWorkerConnected) { + hook.OnWorkerConnected() + } + } +} + +// OnWorkerDisconnected is called when the worker has disconnected. +func (h *Hooks) OnWorkerDisconnected() { + for _, hook := range h.GetAll() { + if hook.Provides(OnWorkerDisconnected) { + hook.OnWorkerDisconnected() + } + } +} + +// HookBase provides a set of default methods for each hook. It should be embedded in +// all hooks. +type HookBase struct { + Hook + Log *slog.Logger +} + +// ID returns the ID of the hook. +func (h *HookBase) ID() string { + return "base" +} + +// Provides indicates which methods a hook provides. The default is none - this method +// should be overridden by the embedding hook. +func (h *HookBase) Provides(b byte) bool { + return false +} + +// Init performs any pre-start initializations for the hook, such as connecting to databases +// or opening files. +func (h *HookBase) Init(config any) error { + return nil +} + +// SetOpts is called by the server to propagate internal values and generally should +// not be called manually. +func (h *HookBase) SetOpts(l *slog.Logger) { + h.Log = l +} + +// Stop is called to gracefully shut down the hook. +func (h *HookBase) Stop() error { + return nil +} + +// OnSysInfoTick is called when the server publishes system info. +func (h *HookBase) OnSysInfoTick(*system.Info) {} + +// OnStarted is called when the server starts. +func (h *HookBase) OnStarted() {} + +// OnStopped is called when the server stops. +func (h *HookBase) OnStopped() {} + +// OnWorkerConnected is called when the worker connected. +func (h *HookBase) OnWorkerConnected() {} + +// OnWorkerDisconnected is called when the worker disconnected. +func (h *HookBase) OnWorkerDisconnected() {} + +// // OnSysInfoTick is called when the server publishes system info. +// func (h *HookBase) OnSysInfoTick(*system.Info) {} diff --git a/bus/bus.go b/bus/bus.go new file mode 100644 index 0000000..bce5abe --- /dev/null +++ b/bus/bus.go @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package bus + +import ( + "fmt" + "sync" + + "log/slog" +) + +// Event is used to transport user data over bus. +type Event struct { + Data interface{} + ChannelName string +} + +// eventChannel stores subscriptions for channels. +type eventChannel struct { + subscribers map[string]chan Event +} + +// newEventChannel creates new eventChannel. +func newEventChannel() *eventChannel { + return &eventChannel{ + subscribers: make(map[string]chan Event), + } +} + +// addSubscriber adds new subscriber. +func (ch *eventChannel) addSubscriber(subName string, size int) error { + if _, ok := ch.subscribers[subName]; ok { + return fmt.Errorf("subscriber %s already exists", subName) + } + + ch.subscribers[subName] = make(chan Event, size) + + return nil +} + +// delSubscriber removes subscriber. +func (ch *eventChannel) delSubscriber(subName string) { + delete(ch.subscribers, subName) +} + +// Bus should be created by New(). +type Bus struct { + mu sync.Mutex + channels map[string]*eventChannel +} + +// New creates new Bus instance. +func New() *Bus { + b := &Bus{ + channels: make(map[string]*eventChannel), + } + + return b +} + +// Subscribe adds new subscriber to channel. +func (b *Bus) Subscribe(channelName, subName string, size int) (chan Event, error) { + b.mu.Lock() + defer b.mu.Unlock() + + if _, ok := b.channels[channelName]; !ok { + b.channels[channelName] = newEventChannel() + } + + ch := b.channels[channelName] + + if err := ch.addSubscriber(subName, size); err != nil { + return nil, err + } + + return ch.subscribers[subName], nil + +} + +// Unsubscribe removes subscriber from channel. +// If this is last subscriber channel will be removed. +func (b *Bus) Unsubscribe(channelName, subName string) { + b.mu.Lock() + defer b.mu.Unlock() + + channel, ok := b.channels[channelName] + if !ok { + return + } + channel.delSubscriber(subName) + + if len(channel.subscribers) == 0 { + delete(b.channels, channelName) + } +} + +// Publish data to channel. +func (b *Bus) Publish(channelName string, data interface{}) error { + channel, ok := b.channels[channelName] + if !ok { + return fmt.Errorf("channel %s don't exists", channelName) + } + e := Event{ + ChannelName: channelName, + Data: data, + } + + for subName, subscriber := range channel.subscribers { + if cap(subscriber) > 0 && len(subscriber) >= cap(subscriber) { + slog.Info("channel %s for subscriber %s is full, not publishing new messages", channelName, subName) + continue + } + subscriber <- e + } + return nil +} diff --git a/bus/bus_test.go b/bus/bus_test.go new file mode 100644 index 0000000..31e0dcb --- /dev/null +++ b/bus/bus_test.go @@ -0,0 +1,499 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package bus + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestNewEventChannel(t *testing.T) { + ch := newEventChannel() + require.Equal(t, 0, len(ch.subscribers)) +} + +func TestAddSubscriber(t *testing.T) { + tests := []struct { + inputEventChannel *eventChannel + inputSubscriptionName string + inputSubscriptionSize int + expectedError error + expectedSubscribers int + expectedSubscriptionSize int + }{ + { + inputEventChannel: &eventChannel{ + subscribers: make(map[string]chan Event), + }, + inputSubscriptionName: "test", + inputSubscriptionSize: 1024, + expectedSubscribers: 1, + expectedSubscriptionSize: 1024, + }, + { + inputEventChannel: &eventChannel{ + subscribers: map[string]chan Event{ + "existing": make(chan Event), + }, + }, + inputSubscriptionName: "test", + inputSubscriptionSize: 1024, + expectedSubscribers: 2, + expectedSubscriptionSize: 1024, + }, + { + inputEventChannel: &eventChannel{ + subscribers: map[string]chan Event{ + "existing": make(chan Event), + }, + }, + inputSubscriptionName: "test", + inputSubscriptionSize: 0, + expectedSubscribers: 2, + expectedSubscriptionSize: 0, + }, + { + inputEventChannel: &eventChannel{ + subscribers: map[string]chan Event{ + "test": make(chan Event), + }, + }, + inputSubscriptionName: "test", + inputSubscriptionSize: 1024, + expectedSubscribers: 1, + expectedSubscriptionSize: 0, + expectedError: errors.New("subscriber test already exists"), + }, + } + + for _, test := range tests { + err := test.inputEventChannel.addSubscriber(test.inputSubscriptionName, test.inputSubscriptionSize) + require.Equal(t, test.expectedError, err) + require.Equal(t, test.expectedSubscribers, len(test.inputEventChannel.subscribers)) + require.Equal(t, test.expectedSubscriptionSize, cap(test.inputEventChannel.subscribers[test.inputSubscriptionName])) + } +} + +func TestDelSubscriber(t *testing.T) { + tests := []struct { + inputEventChannel *eventChannel + inputSubscriptionName string + expectedSubscribers int + expectedSubscriptionSize int + }{ + { + inputEventChannel: &eventChannel{ + subscribers: make(map[string]chan Event), + }, + inputSubscriptionName: "test", + expectedSubscribers: 0, + }, + { + inputEventChannel: &eventChannel{ + subscribers: map[string]chan Event{ + "existing": make(chan Event), + }, + }, + inputSubscriptionName: "test", + expectedSubscribers: 1, + }, + { + inputEventChannel: &eventChannel{ + subscribers: map[string]chan Event{ + "test": make(chan Event), + }, + }, + inputSubscriptionName: "test", + expectedSubscribers: 0, + }, + } + for _, test := range tests { + test.inputEventChannel.delSubscriber(test.inputSubscriptionName) + require.Equal(t, test.expectedSubscribers, len(test.inputEventChannel.subscribers)) + } + +} + +func TestNew(t *testing.T) { + b := New() + require.Equal(t, 0, len(b.channels)) +} + +func TestSubscribe(t *testing.T) { + tests := []struct { + inputBus *Bus + inputChannelName string + inputSubscriptionName string + inputSubscriptionSize int + expectedError error + expectedChannels int + expectedSubscribers int + expectedSubscriptionSize int + }{ + { + inputBus: New(), + inputChannelName: "test", + inputSubscriptionName: "test", + inputSubscriptionSize: 1024, + expectedChannels: 1, + expectedSubscribers: 1, + expectedSubscriptionSize: 1024, + }, + { + inputBus: &Bus{ + channels: map[string]*eventChannel{ + "existing": newEventChannel(), + }, + }, + inputChannelName: "test", + inputSubscriptionName: "test", + inputSubscriptionSize: 1024, + expectedChannels: 2, + expectedSubscribers: 1, + expectedSubscriptionSize: 1024, + }, + { + inputBus: &Bus{ + channels: map[string]*eventChannel{ + "test": { + subscribers: map[string]chan Event{ + "existing": make(chan Event), + }, + }, + }, + }, + inputChannelName: "test", + inputSubscriptionName: "test", + inputSubscriptionSize: 1024, + expectedChannels: 1, + expectedSubscribers: 2, + expectedSubscriptionSize: 1024, + }, + { + inputBus: &Bus{ + channels: map[string]*eventChannel{ + "test": { + subscribers: map[string]chan Event{ + "test": make(chan Event), + }, + }, + }, + }, + inputChannelName: "test", + inputSubscriptionName: "test", + inputSubscriptionSize: 1024, + expectedChannels: 1, + expectedSubscribers: 1, + expectedSubscriptionSize: 0, + expectedError: errors.New("subscriber test already exists"), + }, + } + for _, test := range tests { + ch, err := test.inputBus.Subscribe(test.inputChannelName, test.inputSubscriptionName, test.inputSubscriptionSize) + require.Equal(t, test.expectedError, err) + require.Equal(t, test.expectedChannels, len(test.inputBus.channels)) + require.Equal(t, test.expectedSubscribers, len(test.inputBus.channels[test.inputChannelName].subscribers)) + require.Equal(t, test.expectedSubscriptionSize, cap(ch)) + } +} + +func TestUnsubscribe(t *testing.T) { + tests := []struct { + inputBus *Bus + inputChannelName string + inputSubscriptionName string + expectedChannels int + expectedSubscribers int + }{ + { + inputBus: New(), + inputChannelName: "test", + inputSubscriptionName: "test", + expectedChannels: 0, + expectedSubscribers: 0, + }, + { + inputBus: &Bus{ + channels: map[string]*eventChannel{ + "existing": newEventChannel(), + }, + }, + inputChannelName: "test", + inputSubscriptionName: "test", + expectedChannels: 1, + expectedSubscribers: 0, + }, + { + inputBus: &Bus{ + channels: map[string]*eventChannel{ + "test": { + subscribers: map[string]chan Event{ + "existing": make(chan Event), + }, + }, + }, + }, + inputChannelName: "test", + inputSubscriptionName: "test", + expectedChannels: 1, + expectedSubscribers: 1, + }, + { + inputBus: &Bus{ + channels: map[string]*eventChannel{ + "test": { + subscribers: map[string]chan Event{ + "test": make(chan Event), + }, + }, + }, + }, + inputChannelName: "test", + inputSubscriptionName: "test", + expectedChannels: 0, + expectedSubscribers: 0, + }, + { + inputBus: &Bus{ + channels: map[string]*eventChannel{ + "test": { + subscribers: map[string]chan Event{ + "existing": make(chan Event), + "test": make(chan Event), + }, + }, + }, + }, + inputChannelName: "test", + inputSubscriptionName: "test", + expectedChannels: 1, + expectedSubscribers: 1, + }, + } + for _, test := range tests { + test.inputBus.Unsubscribe(test.inputChannelName, test.inputSubscriptionName) + require.Equal(t, test.expectedChannels, len(test.inputBus.channels)) + if test.expectedSubscribers > 0 { + require.Equal(t, test.expectedSubscribers, len(test.inputBus.channels[test.inputChannelName].subscribers)) + } + } +} + +func TestPublish(t *testing.T) { + tests := []struct { + inputSubscribers func(*Bus, chan map[string]int) + inputPublish func(*Bus) + expectedOutput map[string]int + }{ + { + inputSubscribers: func(b *Bus, output chan map[string]int) { + s1, err := b.Subscribe("test", "s1", 1024) + require.Nil(t, err) + go func() { + r := make(map[string]int) + for { + select { + case e := <-s1: + require.Equal(t, "test", e.ChannelName) + r["s1"]++ + case <-time.After(5 * time.Millisecond): + output <- r + return + } + } + }() + }, + inputPublish: func(b *Bus) { + err := b.Publish("test", "test") + require.Nil(t, err) + }, + expectedOutput: map[string]int{"s1": 1}, + }, + { + inputSubscribers: func(b *Bus, output chan map[string]int) { + s1, err := b.Subscribe("test", "s1", 1024) + require.Nil(t, err) + go func() { + r := make(map[string]int) + for { + select { + case e := <-s1: + require.Equal(t, "test", e.ChannelName) + r["s1"]++ + case <-time.After(5 * time.Millisecond): + output <- r + return + } + } + }() + }, + inputPublish: func(b *Bus) { + err := b.Publish("test", "test") + require.Nil(t, err) + err = b.Publish("test", "test") + require.Nil(t, err) + }, + expectedOutput: map[string]int{"s1": 2}, + }, + { + inputSubscribers: func(b *Bus, output chan map[string]int) { + s1, err := b.Subscribe("test", "s1", 1024) + require.Nil(t, err) + s2, err := b.Subscribe("test", "s2", 1024) + require.Nil(t, err) + go func() { + r := make(map[string]int) + for { + select { + case e := <-s1: + require.Equal(t, "test", e.ChannelName) + r["s1"]++ + case e := <-s2: + require.Equal(t, "test", e.ChannelName) + r["s2"]++ + case <-time.After(5 * time.Millisecond): + output <- r + return + } + } + }() + }, + inputPublish: func(b *Bus) { + err := b.Publish("test", "test") + require.Nil(t, err) + err = b.Publish("test", "test") + require.Nil(t, err) + }, + expectedOutput: map[string]int{"s1": 2, "s2": 2}, + }, + { + inputSubscribers: func(b *Bus, output chan map[string]int) { + s1, err := b.Subscribe("test", "s1", 1024) + require.Nil(t, err) + s2, err := b.Subscribe("test2", "s2", 1024) + require.Nil(t, err) + go func() { + r := make(map[string]int) + for { + select { + case e := <-s1: + require.Equal(t, "test", e.ChannelName) + r["s1"]++ + case e := <-s2: + require.Equal(t, "test2", e.ChannelName) + r["s2"]++ + case <-time.After(5 * time.Millisecond): + output <- r + return + } + } + }() + }, + inputPublish: func(b *Bus) { + err := b.Publish("test", "test") + require.Nil(t, err) + err = b.Publish("test", "test") + require.Nil(t, err) + }, + expectedOutput: map[string]int{"s1": 2}, + }, + { + inputSubscribers: func(b *Bus, output chan map[string]int) { + s1, err := b.Subscribe("test", "s1", 1024) + require.Nil(t, err) + s2, err := b.Subscribe("test", "s2", 1) + require.Nil(t, err) + go func() { + r := make(map[string]int) + e := <-s2 + require.Equal(t, "test", e.ChannelName) + r["s2"]++ + for { + select { + case e := <-s1: + require.Equal(t, "test", e.ChannelName) + r["s1"]++ + case <-time.After(5 * time.Millisecond): + output <- r + return + } + } + }() + }, + inputPublish: func(b *Bus) { + err := b.Publish("test", "test") + require.Nil(t, err) + err = b.Publish("test", "test") + require.Nil(t, err) + }, + expectedOutput: map[string]int{"s1": 2, "s2": 1}, + }, + { + inputSubscribers: func(b *Bus, output chan map[string]int) { + ch, err := b.Subscribe("test", "s1", 1024) + require.Nil(t, err) + go func() { + var mu sync.Mutex + r := make(map[string]int) + var wg sync.WaitGroup + wg.Add(3) + w := func(r map[string]int, n string) { + e := <-ch + require.Equal(t, "test", e.ChannelName) + mu.Lock() + defer mu.Unlock() + r[n]++ + wg.Done() + } + go w(r, "s1") + go w(r, "s2") + go w(r, "s3") + wg.Wait() + output <- r + }() + }, + inputPublish: func(b *Bus) { + go func(b *Bus) { + err := b.Publish("test", "test") + require.Nil(t, err) + err = b.Publish("test", "test") + require.Nil(t, err) + err = b.Publish("test", "test") + require.Nil(t, err) + }(b) + }, + expectedOutput: map[string]int{"s1": 1, "s2": 1, "s3": 1}, + }, + { + inputSubscribers: func(b *Bus, output chan map[string]int) { + go func() { + r := make(map[string]int) + output <- r + }() + }, + inputPublish: func(b *Bus) { + err := b.Publish("test", "test") + require.Equal(t, errors.New("channel test don't exists"), err) + }, + expectedOutput: map[string]int{}, + }, + } + + for _, test := range tests { + b := New() + ch := make(chan map[string]int) + test.inputSubscribers(b, ch) + test.inputPublish(b) + + received := <-ch + + require.Equal(t, test.expectedOutput, received) + + } +} diff --git a/bus/messages.go b/bus/messages.go new file mode 100644 index 0000000..7ce4a11 --- /dev/null +++ b/bus/messages.go @@ -0,0 +1,18 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package bus + +type Message struct { + Topic string + QOS byte + Retained bool + Payload interface{} +} + +type ConnectionStatus struct { + WorkerID string + WorkerType string + IsConnected bool +} diff --git a/config.sample.yaml b/config.sample.yaml new file mode 100644 index 0000000..6d09127 --- /dev/null +++ b/config.sample.yaml @@ -0,0 +1,43 @@ +--- +mqtt: + host: 127.0.0.1 + port: 1883 + username: username + password: password + client_id: mysubarumq + retained: true +mysubaru: + credentials: + username: user@name.com + password: password + pin: "1234" + device_id: MY-SUBARU-DEVICE-ID + device_name: Hassio Golang Integration + region: USA +hassio: + auto_discovery: true + topics: + discovery: "homeassistant" + status: "homeassistant/status" +listeners: + stats: + port: 8889 +consul: + host: consul.savin.nyc + port: 443 + datacenter: dc1 + token: CONSUL-TOKEN-HERE + interface: + lan: "" + wan: "" +subscription_size: + "mqtt:publish": 1024 + "mqtt:subscribe": 1024 + "app:connected": 1024 + "hassio:status": 1024 +timezone: America/New_York +logging: + level: INFO + output: JSON + source: false +share_deidentified_data: true diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..0b26fc2 --- /dev/null +++ b/config/config.go @@ -0,0 +1,119 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package config + +import ( + "log/slog" + "os" + "strings" + + ms "git.savin.nyc/alex/mysubaru/config" + "github.com/spf13/viper" +) + +// Config . +type Config struct { + MQTT MQTT `json:"mqtt" yaml:"mqtt"` + MySubaru ms.MySubaru `json:"mysubaru" yaml:"mysubaru"` + Hassio Hassio `json:"hassio" yaml:"hassio"` + Listeners map[string]*Listener `json:"listeners" yaml:"listeners"` + Consul *Consul `json:"consul,omitempty" yaml:"consul,omitempty"` + SubscriptionSize map[string]int `json:"subscription_size" yaml:"subscription_size"` + Timezone string `json:"timezone" yaml:"timezone"` + Logging *Logging `json:"logging" yaml:"logging"` + ShareDeidentifiedData bool `json:"share_deidentified_data,omitempty" yaml:"share_deidentified_data,omitempty"` +} + +// MQTT . +type MQTT struct { + Host string `json:"host" yaml:"host"` + Port int `json:"port" yaml:"port"` + Username string `json:"username" yaml:"username"` + Password string `json:"password" yaml:"password"` + ClientId string `json:"client_id,omitempty" yaml:"client_id,omitempty"` + Retained bool `json:"retained" yaml:"retained"` + Topic string `json:"topic" yaml:"topic"` +} + +type Hassio struct { + AutoDiscovery bool `json:"auto_discovery" yaml:"auto_discovery"` + Topics struct { + Discovery string `json:"discovery" yaml:"discovery"` + Status string `json:"status" yaml:"status"` + } `json:"topics" yaml:"topics"` +} + +// Listeners . +type Listener struct { + ID string + Port int `json:"port" yaml:"port"` + Cert string `json:"cert,omitempty" yaml:"cert,omitempty"` + PKey string `json:"pkey,omitempty" yaml:"pkey,omitempty"` +} + +// Consul . +type Consul struct { + Host string `json:"host" yaml:"host"` + Port int `json:"port,omitempty" yaml:"port,omitempty"` + DataCenter string `json:"datacenter,omitempty" yaml:"datacenter,omitempty"` + Token string `json:"token,omitempty" yaml:"token,omitempty"` + Interfaces Interface `json:"interfaces,omitempty" yaml:"interfaces,omitempty"` +} + +type Interface struct { + WAN string `json:"wan,omitempty" yaml:"wan,omitempty"` + LAN string `json:"lan,omitempty" yaml:"lan,omitempty"` +} + +// Logging . +type Logging struct { + Level string `json:"level" yaml:"level"` + Output string `json:"output" yaml:"output"` + Source bool `json:"source,omitempty" yaml:"source,omitempty"` +} + +// New . +func New() (*Config, error) { + viper.SetConfigName("config") // name of config file (without extension) + viper.SetConfigType("yaml") // REQUIRED if the config file does not have the extension in the name + viper.AddConfigPath(".") // optionally look for config in the working directory + viper.AddConfigPath("/etc/mysubarumq") // optionally look for config in the working directory + + viper.AutomaticEnv() + + viper.SetDefault("MQTT.ClientId", "mysubarumq") + viper.SetDefault("MQTT.Retained", true) + viper.SetDefault("MQTT.Topic", "mysubarumq") + viper.SetDefault("Timezone", "America/New_York") + viper.SetDefault("MySubaru.AutoReconnect", true) + viper.SetDefault("MySubaru.Region", "USA") + viper.SetDefault("Hassio.AutoDiscovery", true) + viper.SetDefault("Logging.Level", "INFO") + viper.SetDefault("Logging.Output", "TEXT") + viper.SetDefault("Logging.Source", "false") + + err := viper.ReadInConfig() // Find and read the config file + if err != nil { // Handle errors reading the config file + slog.Error("cannot open config file", "error", err.Error()) + os.Exit(1) + } + + // viper.WatchConfig() + // viper.OnConfigChange(func(e fsnotify.Event) { + // log.Println("Config file changed:", e.Name) + // }) + + var config Config + if err := viper.Unmarshal(&config); err != nil { + slog.Error("cannot unmarshal config file", "error", err.Error()) + os.Exit(1) + } + + config.MQTT.Topic = strings.TrimSuffix(config.MQTT.Topic, "/") + + slog.Debug("CONFIG", "config", config) + + return &config, err +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6a4f172 --- /dev/null +++ b/go.mod @@ -0,0 +1,52 @@ +module git.savin.nyc/alex/mysubaru-mq + +go 1.24 + +replace github.com/armon/go-metrics => github.com/hashicorp/go-metrics v0.5.3 + +require ( + git.savin.nyc/alex/mysubaru v0.0.0-20250521052308-a803671a3ce2 + github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/hashicorp/consul/api v1.32.1 + github.com/spf13/viper v1.20.1 +) + +require ( + github.com/Jeffail/gabs/v2 v2.7.0 // indirect + github.com/armon/go-metrics v0.5.4 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/fatih/color v1.18.0 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-resty/resty/v2 v2.16.5 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/hashicorp/go-hclog v1.6.3 // indirect + github.com/hashicorp/go-immutable-radix v1.3.1 // indirect + github.com/hashicorp/go-metrics v0.5.4 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-rootcerts v1.0.2 // indirect + github.com/hashicorp/golang-lru v1.0.2 // indirect + github.com/hashicorp/serf v0.10.2 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/sagikazarmark/locafero v0.9.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.14.0 // indirect + github.com/spf13/cast v1.8.0 // indirect + github.com/spf13/pflag v1.0.6 // indirect + github.com/stretchr/testify v1.10.0 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sync v0.14.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.25.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/hooks/debug/debug.go b/hooks/debug/debug.go new file mode 100644 index 0000000..39762cd --- /dev/null +++ b/hooks/debug/debug.go @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package debug + +import ( + "log/slog" + + app "git.savin.nyc/alex/mysubaru-mq/app" +) + +// Options contains configuration settings for the debug output. +type Options struct { + // ShowPacketData bool // include decoded packet data (default false) + // ShowPings bool // show ping requests and responses (default false) + // ShowPasswords bool // show connecting user passwords (default false) +} + +// Hook is a debugging hook which logs additional low-level information from the server. +type Hook struct { + app.HookBase + Log *slog.Logger +} + +// ID returns the ID of the hook. +func (h *Hook) ID() string { + return "debug" +} + +// Provides indicates that this hook provides all methods. +func (h *Hook) Provides(b byte) bool { + return true +} + +// Init is called when the hook is initialized. +func (h *Hook) Init(config any) error { + if _, ok := config.(*Options); !ok && config != nil { + return app.ErrInvalidConfigType + } + + if config == nil { + config = new(Options) + } + + // h.config = config.(*Options) + + return nil +} + +// SetOpts is called when the hook receives inheritable server parameters. +func (h *Hook) SetOpts(l *slog.Logger) { + h.Log = l + h.Log.Debug("", "method", "SetOpts") +} + +// Stop is called when the hook is stopped. +func (h *Hook) Stop() error { + h.Log.Debug("", "method", "Stop") + return nil +} + +// OnStarted is called when the server starts. +func (h *Hook) OnStarted() { + h.Log.Debug("", "method", "OnStarted") +} + +// OnStopped is called when the server stops. +func (h *Hook) OnStopped() { + h.Log.Debug("", "method", "OnStopped") +} diff --git a/hooks/registry/consul/consul.go b/hooks/registry/consul/consul.go new file mode 100644 index 0000000..ea58288 --- /dev/null +++ b/hooks/registry/consul/consul.go @@ -0,0 +1,218 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package consul + +import ( + "bytes" + "errors" + "fmt" + "net" + "os" + + app "git.savin.nyc/alex/mysubaru-mq/app" + "github.com/hashicorp/consul/api" +) + +// Options contains the configuration/rules data for the auth ledger. +type Options struct { + Hostname string + Port int // Default port 8500 + DC string + ServiceID string + NodeName string + Tags []string + LocalHost string + LocalPort int +} + +// AllowHook is an authentication hook which allows connection access +// for all users and read and write access to all topics. +type Hook struct { + app.HookBase + config *Options + consul *consul +} + +type consul struct { + client *api.Client + registration *api.AgentServiceRegistration + lock *api.Lock +} + +// ID returns the ID of the hook. +func (h *Hook) ID() string { + return "consul" +} + +// Provides indicates which hook methods this hook provides. +func (h *Hook) Provides(b byte) bool { + return bytes.Contains([]byte{ + app.OnStarted, + app.OnStopped, + }, []byte{b}) +} + +func (h *Hook) Init(config any) error { + if _, ok := config.(*Options); !ok && config != nil { + return app.ErrInvalidConfigType + } + + if config == nil { + config = new(Options) + } + + h.config = config.(*Options) + h.consul = &consul{} + + conf := api.Config{ + Address: fmt.Sprintf("%s:%d", h.config.Hostname, h.config.Port), + Scheme: "https", + Datacenter: h.config.DC, + } + client, err := api.NewClient(&conf) + if err != nil { + return err + } + h.consul.client = client + + address := getHostname() + ip, _ := externalIP() + h.Log.Debug("network information", "ip", ip, "address", address) + + service := &api.AgentServiceRegistration{ + ID: h.config.ServiceID, // ServiceID + Name: h.config.NodeName, // Server Name + Address: h.config.LocalHost, + Port: h.config.LocalPort, + Tags: h.config.Tags, + Checks: []*api.AgentServiceCheck{ + // If fabio doesn't exit cleanly, it doesn't auto-deregister itself + // from Consul. In order to address this, we introduce a TTL check + // to confirm that the fabio instance is alive and able to route + // this service. + // + // The TTL check must be refreshed before its timeout is crossed, + // otherwise the check fails. If the check fails, Consul considers + // this service to have become unhealthy. If the check is failing + // (critical) for the DeregisterCriticalServiceAfter duration, the + // Consul reaper will remove it from Consul. + // + // For more info, read https://www.consul.io/api/agent/check.html#deregistercriticalserviceafter. + // { + // CheckID: computeServiceTTLCheckId(serviceID), + // TTL: "10s", + // DeregisterCriticalServiceAfter: "10s", + // }, + // HTTP check is meant to confirm fabio health endpoint is + // reachable from the Consul agent. If the check fails, Consul + // considers this service to have become unhealthy. + { + HTTP: fmt.Sprintf("http://%s:%d/healthcheck", h.config.LocalHost, h.config.LocalPort), + Interval: "10s", + Timeout: "30s", + }, + }, + } + + h.consul.registration = service + + return nil +} + +// OnStarted returns . +func (h *Hook) OnStarted() { + + err := h.consul.client.Agent().ServiceRegister(h.consul.registration) + if err != nil { + h.Log.Error("failed to register service at consul") + } + + path := "services/" + h.config.NodeName + "/members/" + h.config.ServiceID + p := &api.KVPair{ + Key: path, + Value: []byte("1000"), + Flags: api.LockFlagValue, + } + _, err = h.consul.client.KV().Put(p, nil) + if err != nil { + h.Log.Error("cannot put the consul kv", "error", err) + } + + h.consul.client.LockOpts(&api.LockOptions{ + Key: path, + }) + + h.consul.lock, err = h.consul.client.LockKey(path) + if err != nil { + h.Log.Error("cannot lock the consul kv", "error", err) + } + _, err = h.consul.lock.Lock(nil) + if err != nil { + h.Log.Error("cannot acquire the consul lock", "error", err) + } +} + +// OnStopped . +func (h *Hook) OnStopped() { + err := h.consul.client.Agent().ServiceDeregister(h.consul.registration.ID) + if err != nil { + h.Log.Error("failed to deregister service at consul") + } + + err = h.consul.lock.Unlock() + if err != nil { + h.Log.Error("cannot unlock the consul kv", "error", err) + } + + path := "services/" + h.config.NodeName + "/members/" + h.config.ServiceID + _, err = h.consul.client.KV().Delete(path, nil) + if err != nil { + h.Log.Error("cannot delete the consul kv", "error", err) + } +} + +func getHostname() (hostname string) { + hostname, _ = os.Hostname() + return +} + +func externalIP() (string, error) { + ifaces, err := net.Interfaces() + if err != nil { + return "", err + } + + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 { + continue // interface down + } + if iface.Flags&net.FlagLoopback != 0 { + continue // loopback interface + } + addrs, err := iface.Addrs() + if err != nil { + return "", err + } + + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + if ip == nil || ip.IsLoopback() { + continue + } + ip = ip.To4() + if ip == nil { + continue // not an ipv4 address + } + return ip.String(), nil + } + } + return "", errors.New("are you connected to the network?") +} diff --git a/listeners/http_stats.go b/listeners/http_stats.go new file mode 100644 index 0000000..6ba2183 --- /dev/null +++ b/listeners/http_stats.go @@ -0,0 +1,123 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package listeners + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "sync" + "sync/atomic" + "time" + + "git.savin.nyc/alex/mysubaru-mq/system" +) + +// HTTPStats is a listener for presenting the server $SYS stats on a JSON http endpoint. +type HTTPStats struct { + sync.RWMutex + id string // the internal id of the listener + address string // the network address to bind to + config *Config // configuration values for the listener + listen *http.Server // the http server + sysInfo *system.Info // pointers to the server data + log *slog.Logger // server logger + end uint32 // ensure the close methods are only called once +} + +// NewHTTPStats initialises and returns a new HTTP listener, listening on an address. +func NewHTTPStats(id, address string, config *Config, sysInfo *system.Info) *HTTPStats { + if config == nil { + config = new(Config) + } + return &HTTPStats{ + id: id, + address: address, + sysInfo: sysInfo, + config: config, + } +} + +// ID returns the id of the listener. +func (l *HTTPStats) ID() string { + return l.id +} + +// Address returns the address of the listener. +func (l *HTTPStats) Address() string { + return l.address +} + +// Protocol returns the address of the listener. +func (l *HTTPStats) Protocol() string { + if l.listen != nil && l.listen.TLSConfig != nil { + return "https" + } + + return "http" +} + +// Init initializes the listener. +func (l *HTTPStats) Init(log *slog.Logger) error { + l.log = log + mux := http.NewServeMux() + mux.HandleFunc("/", l.jsonHandler) + l.listen = &http.Server{ + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + Addr: l.address, + Handler: mux, + } + + if l.config.TLSConfig != nil { + l.listen.TLSConfig = l.config.TLSConfig + } + + return nil +} + +// Serve starts listening for new connections and serving responses. +func (l *HTTPStats) Serve() { + + var err error + if l.listen.TLSConfig != nil { + err = l.listen.ListenAndServeTLS("", "") + } else { + err = l.listen.ListenAndServe() + } + + // After the listener has been shutdown, no need to print the http.ErrServerClosed error. + if err != nil && atomic.LoadUint32(&l.end) == 0 { + l.log.Error("failed to serve.", "error", err, "listener", l.id) + } +} + +// Close closes the listener and any client connections. +func (l *HTTPStats) Close() { + l.Lock() + defer l.Unlock() + + if atomic.CompareAndSwapUint32(&l.end, 0, 1) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = l.listen.Shutdown(ctx) + } + + // closeClients(l.id) +} + +// jsonHandler is an HTTP handler which outputs the $SYS stats as JSON. +func (l *HTTPStats) jsonHandler(w http.ResponseWriter, req *http.Request) { + info := *l.sysInfo.Clone() + + out, err := json.MarshalIndent(info, "", "\t") + if err != nil { + _, _ = io.WriteString(w, err.Error()) + } + + _, _ = w.Write(out) +} diff --git a/listeners/listeners.go b/listeners/listeners.go new file mode 100644 index 0000000..28d44fd --- /dev/null +++ b/listeners/listeners.go @@ -0,0 +1,126 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package listeners + +import ( + "crypto/tls" + "sync" + + "log/slog" +) + +// Config contains configuration values for a listener. +type Config struct { + // TLSConfig is a tls.Config configuration to be used with the listener. + // See examples folder for basic and mutual-tls use. + TLSConfig *tls.Config +} + +// Listener is an interface for network listeners. A network listener listens +// for incoming client connections and adds them to the server. +type Listener interface { + Init(*slog.Logger) error // open the network address + Serve() // starting actively listening for new connections + ID() string // return the id of the listener + Address() string // the address of the listener + Protocol() string // the protocol in use by the listener + Close() // stop and close the listener +} + +// Listeners contains the network listeners for the broker. +type Listeners struct { + ClientsWg sync.WaitGroup // a waitgroup that waits for all clients in all listeners to finish. + internal map[string]Listener // a map of active listeners. + sync.RWMutex +} + +// New returns a new instance of Listeners. +func New() *Listeners { + return &Listeners{ + internal: map[string]Listener{}, + } +} + +// Add adds a new listener to the listeners map, keyed on id. +func (l *Listeners) Add(val Listener) { + l.Lock() + defer l.Unlock() + l.internal[val.ID()] = val +} + +// Get returns the value of a listener if it exists. +func (l *Listeners) Get(id string) (Listener, bool) { + l.RLock() + defer l.RUnlock() + val, ok := l.internal[id] + return val, ok +} + +// Len returns the length of the listeners map. +func (l *Listeners) Len() int { + l.RLock() + defer l.RUnlock() + return len(l.internal) +} + +// Delete removes a listener from the internal map. +func (l *Listeners) Delete(id string) { + l.Lock() + defer l.Unlock() + delete(l.internal, id) +} + +// Serve starts a listener serving from the internal map. +func (l *Listeners) Serve(id string) { + l.RLock() + defer l.RUnlock() + listener := l.internal[id] + + go func() { + listener.Serve() + }() +} + +// ServeAll starts all listeners serving from the internal map. +func (l *Listeners) ServeAll() { + l.RLock() + i := 0 + ids := make([]string, len(l.internal)) + for id := range l.internal { + ids[i] = id + i++ + } + l.RUnlock() + + for _, id := range ids { + l.Serve(id) + } +} + +// Close stops a listener from the internal map. +func (l *Listeners) Close(id string) { + l.RLock() + defer l.RUnlock() + if listener, ok := l.internal[id]; ok { + listener.Close() + } +} + +// CloseAll iterates and closes all registered listeners. +func (l *Listeners) CloseAll() { + l.RLock() + i := 0 + ids := make([]string, len(l.internal)) + for id := range l.internal { + ids[i] = id + i++ + } + l.RUnlock() + + for _, id := range ids { + l.Close(id) + } + l.ClientsWg.Wait() +} diff --git a/listeners/mock.go b/listeners/mock.go new file mode 100644 index 0000000..5459533 --- /dev/null +++ b/listeners/mock.go @@ -0,0 +1,102 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package listeners + +import ( + "fmt" + "net" + "sync" + + "log/slog" +) + +// MockEstablisher is a function signature which can be used in testing. +func MockEstablisher(id string, c net.Conn) error { + return nil +} + +// MockCloser is a function signature which can be used in testing. +func MockCloser(id string) {} + +// MockListener is a mock listener for establishing client connections. +type MockListener struct { + sync.RWMutex + id string // the id of the listener + address string // the network address the listener binds to + Config *Config // configuration for the listener + done chan bool // indicate the listener is done + Serving bool // indicate the listener is serving + Listening bool // indiciate the listener is listening + ErrListen bool // throw an error on listen +} + +// NewMockListener returns a new instance of MockListener. +func NewMockListener(id, address string) *MockListener { + return &MockListener{ + id: id, + address: address, + done: make(chan bool), + } +} + +// Serve serves the mock listener. +func (l *MockListener) Serve() { + l.Lock() + l.Serving = true + l.Unlock() + + for range l.done { + return + } +} + +// Init initializes the listener. +func (l *MockListener) Init(log *slog.Logger) error { + if l.ErrListen { + return fmt.Errorf("listen failure") + } + + l.Lock() + defer l.Unlock() + l.Listening = true + return nil +} + +// ID returns the id of the mock listener. +func (l *MockListener) ID() string { + return l.id +} + +// Address returns the address of the listener. +func (l *MockListener) Address() string { + return l.address +} + +// Protocol returns the address of the listener. +func (l *MockListener) Protocol() string { + return "mock" +} + +// Close closes the mock listener. +func (l *MockListener) Close() { + l.Lock() + defer l.Unlock() + l.Serving = false + close(l.done) +} + +// IsServing indicates whether the mock listener is serving. +func (l *MockListener) IsServing() bool { + l.Lock() + defer l.Unlock() + return l.Serving +} + +// IsListening indicates whether the mock listener is listening. +func (l *MockListener) IsListening() bool { + l.Lock() + defer l.Unlock() + return l.Listening +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..9a1da52 --- /dev/null +++ b/main.go @@ -0,0 +1,133 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package main + +import ( + "fmt" + "log/slog" + "os" + "os/signal" + "strconv" + "syscall" + + "git.savin.nyc/alex/mysubaru-mq/app" + "git.savin.nyc/alex/mysubaru-mq/bus" + "git.savin.nyc/alex/mysubaru-mq/config" + "git.savin.nyc/alex/mysubaru-mq/hooks/debug" + "git.savin.nyc/alex/mysubaru-mq/listeners" + "git.savin.nyc/alex/mysubaru-mq/workers" +) + +// var log = *slog.Logger +var cfg = &config.Config{} + +const ( + LoggingOutputJson = "JSON" + LoggingOutputText = "TEXT" +) + +func configureLogging(config *config.Logging) *slog.Logger { //nolint:unparam + if config == nil { + return nil + } + + var level slog.Level + if err := level.UnmarshalText([]byte(config.Level)); err != nil { + slog.Warn(err.Error()) + slog.Warn(fmt.Sprintf("logging level not recognized, defaulting to level %s", slog.LevelInfo.String())) + level = slog.LevelInfo + } + + var handler slog.Handler + switch config.Output { + case LoggingOutputJson: + handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{AddSource: cfg.Logging.Source, Level: level}) + case LoggingOutputText: + handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{AddSource: cfg.Logging.Source, Level: level}) + default: + handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{AddSource: cfg.Logging.Source, Level: level}) + } + + return slog.New(handler) +} + +func main() { + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigs + done <- true + }() + + var err error + cfg, err = config.New() + if err != nil { // Handle errors reading the config file + slog.Error("Fatal error config file", "error", err.Error()) + os.Exit(1) + } + + logger := configureLogging(cfg.Logging) + + evBus := bus.New() + + app := app.New(cfg, evBus, logger) + // app.Log = logger + + mqtt := workers.NewMQTTClient("mqtt", evBus, cfg) + err = app.AddWorker(mqtt) + if err != nil { + app.Log.Error(err.Error()) + os.Exit(1) + } + + mysubaru := workers.NewMySubaruClient("mysubaru", evBus, cfg) + err = app.AddWorker(mysubaru) + if err != nil { + app.Log.Error(err.Error()) + os.Exit(1) + } + + if len(cfg.Listeners) > 0 { + for name, opts := range cfg.Listeners { + listener := listeners.NewHTTPStats(name, ":"+strconv.Itoa(opts.Port), nil, app.Info) + err = app.AddListener(listener) + if err != nil { + app.Log.Error(err.Error()) + } + } + } + + err = app.AddHook(new(debug.Hook), nil) + if err != nil { + app.Log.Error(err.Error()) + } + + // err = app.AddHook(new(consul.Hook), &consul.Options{ + // Hostname: cfg.Consul.Host, + // Port: cfg.Consul.Port, + // DC: cfg.Consul.DataCenter, + // ServiceID: "srv-mysubaru-01", + // NodeName: "mysubaru", + // Tags: []string{"mysubaru", "hassio", "mqtt"}, + // LocalHost: cfg.Consul.Interfaces.WAN, + // LocalPort: cfg.Listeners["stats"].Port, + // }) + // if err != nil { + // app.Log.Error(err.Error()) + // } + + go func() { + err := app.Serve() + if err != nil { + app.Log.Error(err.Error()) + } + }() + + <-done + app.Log.Warn("caught signal, stopping...") + _ = app.Close() + app.Log.Info("main.go finished") +} diff --git a/system/system.go b/system/system.go new file mode 100644 index 0000000..7e9cf6d --- /dev/null +++ b/system/system.go @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package system + +import "sync/atomic" + +// Info contains atomic counters and values for various server statistics +type Info struct { + App string `json:"app"` + Version string `json:"version"` // the current version of the server + Started int64 `json:"started"` // the time the server started in unix seconds + Time int64 `json:"time"` // current time on the server + Uptime int64 `json:"uptime"` // the number of seconds the server has been online + MemoryAlloc int64 `json:"memory_alloc"` // memory currently allocated + Threads int64 `json:"threads"` // number of active goroutines, named as threads for platform ambiguity +} + +// Clone makes a copy of Info using atomic operation +func (i *Info) Clone() *Info { + return &Info{ + App: "mysubarumq", + Version: i.Version, + Started: atomic.LoadInt64(&i.Started), + Time: atomic.LoadInt64(&i.Time), + Uptime: atomic.LoadInt64(&i.Uptime), + MemoryAlloc: atomic.LoadInt64(&i.MemoryAlloc), + Threads: atomic.LoadInt64(&i.Threads), + } +} diff --git a/workers/mqtt.go b/workers/mqtt.go new file mode 100644 index 0000000..bde7e62 --- /dev/null +++ b/workers/mqtt.go @@ -0,0 +1,207 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package workers + +import ( + "context" + "fmt" + "log/slog" + "sync" + "sync/atomic" + "time" + + "git.savin.nyc/alex/mysubaru-mq/bus" + "git.savin.nyc/alex/mysubaru-mq/config" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +const ( + ServiceAvailable = `{"value":"online"}` + ServiceUnavailable = `{"value":"offline"}` +) + +// UPSClient is a client that connects to the APCUPSd server by establishing tcp connection +type MQTTClient struct { + sync.RWMutex + id string // the internal id of the listener + bus *bus.Bus // the internal bus for the interapp communication + config *config.Config // configuration values for the listener + mqtt mqtt.Client // + cancel context.CancelFunc // + end uint32 // ensure the close methods are only called once + log *slog.Logger // server logger + mqttHandlers mqttHandlers // +} + +type mqttHandlers struct { + publish mqtt.MessageHandler // + connected mqtt.OnConnectHandler // + disconnected mqtt.ConnectionLostHandler // +} + +// NewUPSClient initialises and returns a UPS client +func NewMQTTClient(id string, bus *bus.Bus, config *config.Config) *MQTTClient { + if config == nil { + slog.Error("") + } + + return &MQTTClient{ + id: id, + bus: bus, + config: config, + } +} + +// ID returns the id of the listener. +func (w *MQTTClient) ID() string { + return w.id +} + +// ID returns the id of the listener. +func (w *MQTTClient) Type() string { + return "mqtt-client" +} + +// Init . +func (w *MQTTClient) Init(log *slog.Logger) error { + w.log = log + + opts := mqtt.NewClientOptions() + opts.AddBroker(fmt.Sprintf("tcp://%s:%d", w.config.MQTT.Host, w.config.MQTT.Port)) + opts.SetUsername(w.config.MQTT.Username) + opts.SetPassword(w.config.MQTT.Password) + opts.SetClientID(w.config.MQTT.ClientId) + opts.SetWill(w.config.MQTT.Topic+"/status", ServiceUnavailable, 0, true) + opts.OnConnect = func(m mqtt.Client) { + w.log.Debug("mqtt client is connected to a mqtt server") + m.Publish(w.config.MQTT.Topic+"/status", 0, w.config.MQTT.Retained, ServiceAvailable) + } + + w.mqtt = mqtt.NewClient(opts) + token := w.mqtt.Connect() + for !token.WaitTimeout(1 * time.Second) { + } + if err := token.Error(); err != nil { + w.log.Error("couldn't connect to a mqtt server", "error", err.Error()) + + return err + } + + w.mqttHandlers.publish = func(client mqtt.Client, msg mqtt.Message) { + var messages []*bus.Message + + w.log.Debug("received mqtt message", "topic", msg.Topic(), "payload", msg.Payload()) + switch msg.Topic() { + case w.config.Hassio.Topics.Status: + // Subscribing for the topic to resend auto discovery topics after Home Assistant restarted and got a status "online" + messages = append(messages, &bus.Message{ + Topic: msg.Topic(), + Payload: string(msg.Payload()), + }) + w.bus.Publish("hassio:status", messages) + case "mysubarumq/4S4BTGPD0P3199198/lock/set": + messages = append(messages, &bus.Message{ + Topic: msg.Topic(), + Payload: string(msg.Payload()), + }) + w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:lock", messages) + case "mysubarumq/4S4BTGPD0P3199198/ignition/set": + messages = append(messages, &bus.Message{ + Topic: msg.Topic(), + Payload: string(msg.Payload()), + }) + w.bus.Publish("mysubarumq:4S4BTGPD0P3199198:ignition", messages) + } + } + + w.mqttHandlers.connected = func(client mqtt.Client) { + w.log.Debug("mqtt client is connected") + } + + w.mqttHandlers.disconnected = func(client mqtt.Client, err error) { + w.log.Debug("mqtt lost connection", "error", err) + } + + return nil +} + +// OneTime . +func (w *MQTTClient) OneTime() error { + return nil +} + +// Serve starts waiting for new TCP connections, and calls the establish +// connection callback for any received. +func (w *MQTTClient) Serve() { + if atomic.LoadUint32(&w.end) == 1 { + return + } + + chMQTTSubscribeCommand, err := w.bus.Subscribe("mqtt:subscribe", "command", w.config.SubscriptionSize["mqtt:subscribe"]) + if err != nil { + w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:subscribe", "error", err.Error()) + } + + chMQTTPublishStatus, err := w.bus.Subscribe("mqtt:publish", "status", w.config.SubscriptionSize["mqtt:publish"]) + if err != nil { + w.log.Error("couldn't subscribe to a channel", "channel", "mqtt:subscribe", "error", err.Error()) + } + + // ctx is used only by tests. + // ctx, ctxCancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + + w.cancel = cancel + + go w.eventLoop(ctx, chMQTTPublishStatus, chMQTTSubscribeCommand) + + if atomic.LoadUint32(&w.end) == 0 { + go func() { + if !w.mqtt.IsConnected() { + w.bus.Publish("app:connected", &bus.ConnectionStatus{WorkerID: w.ID(), WorkerType: w.Type(), IsConnected: false}) + w.log.Warn("mqtt client is disconnected") + } + }() + } +} + +// Close closes the listener and any client connections. +func (w *MQTTClient) Close() { + w.Lock() + defer w.Unlock() + + if atomic.CompareAndSwapUint32(&w.end, 0, 1) { + w.cancel() + w.bus.Unsubscribe("mqtt:publish", "status") + w.mqtt.Publish(w.config.MQTT.Topic+"/status", 0, true, ServiceUnavailable) + w.mqtt.Disconnect(250) + w.log.Info("disconnected from mqtt server") + } +} + +// eventLoop loops forever +func (w *MQTTClient) eventLoop(ctx context.Context, chMQTTPublishStatus, chMQTTSubscribeCommand chan bus.Event) { + w.log.Debug("mqtt communication event loop started") + defer w.log.Debug("mqtt communication event loop halted") + + for { + select { + case event := <-chMQTTPublishStatus: + for _, message := range event.Data.([]*bus.Message) { + w.log.Debug("publishing mqtt message", "topic", message.Topic, "qos", message.QOS, "retained", message.Retained, "payload", message.Payload) + w.mqtt.Publish(message.Topic, message.QOS, message.Retained, message.Payload) + } + case event := <-chMQTTSubscribeCommand: + for _, message := range event.Data.([]*bus.Message) { + w.log.Debug("subscribing to a topic", "topic", message.Topic, "qos", message.QOS) + // w.mqtt.SubscribeMultiple() + w.mqtt.Subscribe(message.Topic, message.QOS, w.mqttHandlers.publish) + } + case <-ctx.Done(): + w.log.Info("stopping mqtt communication event loop") + return + } + } +} diff --git a/workers/mysubaru.go b/workers/mysubaru.go new file mode 100644 index 0000000..7353d65 --- /dev/null +++ b/workers/mysubaru.go @@ -0,0 +1,389 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package workers + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "git.savin.nyc/alex/mysubaru" + "git.savin.nyc/alex/mysubaru-mq/bus" + "git.savin.nyc/alex/mysubaru-mq/config" +) + +// MySubaruClient is a client that connects to the MySubaru server by establishing tcp connection +type MySubaruClient struct { + sync.RWMutex + id string // the internal id of the listener + bus *bus.Bus // the internal bus for the interapp communication + config *config.Config // configuration values for the listener + mysubaru *mysubaru.Client // + cancel context.CancelFunc // + end uint32 // ensure the close methods are only called once + log *slog.Logger // server logger +} + +// NewMySubaruClient initialises and returns a MySubaru client +func NewMySubaruClient(id string, bus *bus.Bus, config *config.Config) *MySubaruClient { + if config == nil { + slog.Error("") + } + + s := &MySubaruClient{ + id: id, + bus: bus, + config: config, + } + + return s +} + +// ID returns the id of the listener. +func (s *MySubaruClient) ID() string { + return s.id +} + +// ID returns the id of the listener. +func (s *MySubaruClient) Type() string { + return "mysubaru-client" +} + +// Init . +func (s *MySubaruClient) Init(log *slog.Logger) error { + s.log = log + + mys, err := mysubaru.New(log, &s.config.MySubaru) + if err != nil { + s.log.Error("couldn't connect to MySubaru server", "error", err.Error()) + } + + s.mysubaru = mys + + return nil +} + +// OneTime . +func (s *MySubaruClient) OneTime() error { + if s.config.Hassio.AutoDiscovery { + time.Sleep(3 * time.Second) + vehicles := s.mysubaru.GetVehicles() + + for _, vehicle := range vehicles { + err := s.bus.Publish("mqtt:publish", s.mySubaruConfigToMQTTHassioConfig(vehicle)) + if err != nil { + s.log.Error("got an error from bus", "error", err.Error()) + + return err + } + err = s.bus.Publish("mqtt:publish", s.mySubaruStatusToMQTTMessage(vehicle)) + if err != nil { + s.log.Error("got an error from bus", "error", err.Error()) + + return err + } + } + } + + return nil +} + +// Serve starts waiting for new TCP connections, and calls the establish +// connection callback for any received. +func (s *MySubaruClient) Serve() { + if atomic.LoadUint32(&s.end) == 1 { + return + } + + var subs []*bus.Message + // Subscribing for the topic to resend auto discovery topics after Home Assistant restarted and got a status "online" + if s.config.Hassio.AutoDiscovery { + subs = append(subs, &bus.Message{ + Topic: s.config.Hassio.Topics.Status, + QOS: 0, + }) + } + // TODO: Go over MySubaru devices with switch and lock options + subs = append(subs, &bus.Message{ + Topic: "mysubarumq/4S4BTGPD0P3199198/lock/set", + QOS: 0, + }) + subs = append(subs, &bus.Message{ + Topic: "mysubarumq/4S4BTGPD0P3199198/ignition/set", + QOS: 0, + }) + s.bus.Publish("mqtt:subscribe", subs) + + tickerS := time.NewTicker(time.Duration(60) * time.Second) + tickerM := time.NewTicker(time.Duration(60) * time.Minute) + + chMQTTLockStatus, err := s.bus.Subscribe("mysubarumq:4S4BTGPD0P3199198:lock", "mysubarumq:4S4BTGPD0P3199198:lock", s.config.SubscriptionSize["device:95452:status"]) + if err != nil { + s.log.Error("couldn't subscribe to a channel", "channel", "mysubarumq:4S4BTGPD0P3199198:lock", "error", err.Error()) + } + + chMQTTIgnitionStatus, err := s.bus.Subscribe("mysubarumq:4S4BTGPD0P3199198:ignition", "mysubarumq:4S4BTGPD0P3199198:ignition", s.config.SubscriptionSize["device:95452:status"]) + if err != nil { + s.log.Error("couldn't subscribe to a channel", "channel", "mysubarumq:4S4BTGPD0P3199198:ignition", "error", err.Error()) + } + + chMQTTHassioStatus, err := s.bus.Subscribe("hassio:status", s.ID(), s.config.SubscriptionSize["hassio:status"]) + if err != nil { + s.log.Error("couldn't subscribe to a channel", "channel", "hassio:status", "error", err.Error()) + } + + // ctx is used only by tests. + // ctx, ctxCancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + + go s.eventLoop(ctx, chMQTTLockStatus, chMQTTIgnitionStatus, chMQTTHassioStatus) + + if atomic.LoadUint32(&s.end) == 0 { + go func() { + for { + select { + case <-tickerS.C: + vehicles := s.mysubaru.GetVehicles() + for _, vehicle := range vehicles { + err := s.bus.Publish("mqtt:publish", s.mySubaruStatusToMQTTMessage(vehicle)) + if err != nil { + s.log.Error("got an error from bus", "error", err.Error()) + } + } + case <-tickerM.C: + vehicles := s.mysubaru.GetVehicles() + for _, vehicle := range vehicles { + vehicle.GetLocation(true) + err := s.bus.Publish("mqtt:publish", s.mySubaruStatusToMQTTMessage(vehicle)) + if err != nil { + s.log.Error("got an error from bus", "error", err.Error()) + } + } + case <-ctx.Done(): + s.log.Info("stopping communication eventloop", "type", s.Type()) + return + } + } + }() + } +} + +// Close closes the listener and any client connections. +func (s *MySubaruClient) Close() { + s.Lock() + defer s.Unlock() + + if atomic.CompareAndSwapUint32(&s.end, 0, 1) { + s.cancel() + s.log.Info("disconnected from mysubaru server", "type", s.Type()) + } +} + +// eventLoop loops forever +func (s *MySubaruClient) eventLoop(ctx context.Context, chMQTTLockStatus, chMQTTIgnitionStatus, chMQTTHassioStatus chan bus.Event) { + s.log.Debug("mysubaru communication event loop started") + defer s.log.Debug("mysubaru communication event loop halted") + + for { + select { + case event := <-chMQTTLockStatus: + for _, message := range event.Data.([]*bus.Message) { + s.log.Debug("received a message with mysubary lock status", "topic", message.Topic, "payload", message.Payload) + if message.Payload == "LOCK" { + v := s.mysubaru.GetVehicleByVIN("4S4BTGPD0P3199198") + v.Lock() + var msgs []*bus.Message + msgs = s.messages("mysubarumq/4S4BTGPD0P3199198/lock", 0, true, "LOCK", msgs) + s.bus.Publish("mqtt:publish", msgs) + } + if message.Payload == "UNLOCK" { + v := s.mysubaru.GetVehicleByVIN("4S4BTGPD0P3199198") + v.Unlock() + var msgs []*bus.Message + msgs = s.messages("mysubarumq/4S4BTGPD0P3199198/lock", 0, true, "UNLOCK", msgs) + s.bus.Publish("mqtt:publish", msgs) + } + } + case event := <-chMQTTIgnitionStatus: + for _, message := range event.Data.([]*bus.Message) { + s.log.Debug("received a message with mysubary ignition status", "topic", message.Topic, "payload", message.Payload) + if message.Payload == "ON" { + v := s.mysubaru.GetVehicleByVIN("4S4BTGPD0P3199198") + v.EngineStart() + var msgs []*bus.Message + msgs = s.messages("mysubarumq/4S4BTGPD0P3199198/ignition", 0, true, "ON", msgs) + s.bus.Publish("mqtt:publish", msgs) + } + if message.Payload == "OFF" { + v := s.mysubaru.GetVehicleByVIN("4S4BTGPD0P3199198") + v.EngineStop() + var msgs []*bus.Message + msgs = s.messages("mysubarumq/4S4BTGPD0P3199198/ignition", 0, true, "OFF", msgs) + s.bus.Publish("mqtt:publish", msgs) + } + } + case event := <-chMQTTHassioStatus: + for _, message := range event.Data.([]*bus.Message) { + s.log.Info("received a message with hassio instance status", "topic", message.Topic, "payload", message.Payload) + if message.Payload == "online" { + s.OneTime() + } + } + case <-ctx.Done(): + s.log.Info("stopping mqtt communication event loop") + return + } + } +} + +// mySubaruConfigToMQTTHassioConfig . +func (s *MySubaruClient) mySubaruConfigToMQTTHassioConfig(v *mysubaru.Vehicle) []*bus.Message { + // { + // "~": "homeassistant/sensor/VIN_SENSOR_NAME", + // "name": null, + // "uniq_id": "4S4BTGPD0P3199198_SENSOR_NAME", + // "obj_id": "", + // "ic": "", + // "stat_t": "~/state", + // "json_attr_t": "~/state", + // "val_tpl": "{{value_json.value}}", + // "dev_cla": "", + // "stat_cla": "", + // "unit_of_meas": "", + // "en": true, + // "ent_cat": "", + // "ent_pic": "", + // "dev": { + // "ids": [ + // "4S4BTGPD0P3199198" + // ], + // "name": "Subaru Outback Touring TX (2023)", + // "mf": "Subaru", + // "mdl": "Outback Touring TX", + // "sw": "1.0", + // "hw": "PDL" + // }, + // "o": { + // "name": "MySubaruMQ", + // "sw": "1.0.0", + // "url": "https://www.github.com/alex-savin/" + // }, + // "avty": [ + // { + // "topic": "" + // } + // ] + // } + var hassioConfig = map[string]string{} + + // availability := `{"avty":["{"topic":"mysubaru/` + v.Vin + `"}"]}` + origin := `"o":{"name":"MySubaruMQ","sw":"1.0.1","url":"https://www.git.savin.nyc/alex/mysubaru-mq"},` + // availability := `"avty":[{"t":"musubarymq/status"}],"avty_t":"{{value_json.value}}",` + device := `"dev":{"ids":["` + v.Vin + `"],"name":"` + v.CarNickname + `","mf":"Subaru Corp.","mdl":"` + v.CarNickname + `","hw":"` + v.ModelCode + `"},` // TODO chnage model to the proper one + obj_id_prefix := strings.Replace(strings.ToLower(v.CarNickname), " ", "_", -1) + // topic := `"~":"mysubaru/` + v.Vin + `",` + topic := "" + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/odometer_km/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/odometer_km/config`] = `{` + device + origin + topic + `"name":"Odometer (km)","uniq_id":"` + v.Vin + `_odometer_km","obj_id":"` + obj_id_prefix + `_odometer_km","ic":"mdi:counter","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.odometer_km}}","unit_of_meas":"km"}` + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/odometer_mi/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/odometer_mi/config`] = `{` + device + origin + topic + `"name":"Odometer (mi)","uniq_id":"` + v.Vin + `_odometer_mi","obj_id":"` + obj_id_prefix + `_odometer_mi","ic":"mdi:counter","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.odometer_mi}}","unit_of_meas":"mi"}` + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/dist_to_empty_km/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/dist_to_empty_km/config`] = `{` + device + origin + topic + `"name":"Distance to Empty (km)","uniq_id":"` + v.Vin + `_dist_to_empty_km","obj_id":"` + obj_id_prefix + `_dist_to_empty_km","ic":"mdi:map-marker-distance","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.dist_to_empty_km}}","unit_of_meas":"km"}` + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/dist_to_empty_mi/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/dist_to_empty_mi/config`] = `{` + device + origin + topic + `"name":"Distance to Empty (mi)","uniq_id":"` + v.Vin + `_dist_to_empty_mi","obj_id":"` + obj_id_prefix + `_dist_to_empty_mi","ic":"mdi:map-marker-distance","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.dist_to_empty_mi}}","unit_of_meas":"mi"}` + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/dist_to_empty_pc/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/dist_to_empty_pc/config`] = `{` + device + origin + topic + `"name":"Gas Tank (%)","uniq_id":"` + v.Vin + `_dist_to_empty_pc","obj_id":"` + obj_id_prefix + `_dist_to_empty_pc","ic":"mdi:gauge","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.dist_to_empty_pc}}","unit_of_meas":"%"}` + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/consumption_us/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/consumption_us/config`] = `{` + device + origin + topic + `"name":"Consumption (MPG)","uniq_id":"` + v.Vin + `_consumption_us","obj_id":"` + obj_id_prefix + `_consumption_us","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.consumption_us}}","unit_of_meas":"MPG"}` + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/consumption_eu/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/consumption_eu/config`] = `{` + device + origin + topic + `"name":"Consumption (L/100km)","uniq_id":"` + v.Vin + `_consumption_eu","obj_id":"` + obj_id_prefix + `_consumption_eu","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.consumption_eu}}","unit_of_meas":"L100km"}` + // homeassistant/sensor/mysubaru/VIN-NUMBER-HERE/engine_state/consumption_eu/config + hassioConfig[s.config.Hassio.Topics.Discovery+`/sensor/`+v.Vin+`/engine_state/config`] = `{` + device + origin + topic + `"name":"Engine State","uniq_id":"` + v.Vin + `_engine_state","obj_id":"` + obj_id_prefix + `_engine_state","ic":"mdi:engine","stat_t":"mysubarumq/` + v.Vin + `/state","val_tpl":"{{value_json.engine_state}}"}` + hassioConfig[s.config.Hassio.Topics.Discovery+`/device_tracker/`+v.Vin+`/config`] = `{` + device + origin + `"name":"` + v.CarNickname + `","uniq_id":"` + v.Vin + `_device_tracker","obj_id":"` + obj_id_prefix + `","ic":"mdi:car-connected","json_attr_t":"mysubarumq/` + v.Vin + `/attr"}` + + topicState := `mysubarumq/` + v.Vin + `/ignition` + topicSet := `mysubarumq/` + v.Vin + `/ignition/set` + hassioConfig[s.config.Hassio.Topics.Discovery+`/switch/`+v.Vin+`/ignition/config`] = `{` + origin + device + `"name":"Ignition","cmd_t":"` + topicSet + `","stat_t":"` + topicState + `","name":null,"obj_id":"` + obj_id_prefix + `_ignition","ic":"mdi:engine","pl_off":"OFF","pl_on":"ON","uniq_id":"` + v.Vin + `_ignition"}` + + topicState = `mysubarumq/` + v.Vin + `/lock` + topicSet = `mysubarumq/` + v.Vin + `/lock/set` + hassioConfig[s.config.Hassio.Topics.Discovery+`/lock/`+v.Vin+`/config`] = `{` + origin + device + `"name":"Lock","cmd_t":"` + topicSet + `","stat_t":"` + topicState + `","name":null,"obj_id":"` + obj_id_prefix + `_lock","ic":"mdi:car-key","pl_unlk":"UNLOCK","pl_lock":"LOCK","stat_locked":"LOCK","stat_unlocked":"UNLOCK","uniq_id":"` + v.Vin + `_lock"}` + + hassioConfig[s.config.Hassio.Topics.Discovery+`/event/`+v.Vin+`/engine/config`] = `{` + device + origin + `"name":"Ignition Events","dev_cla":"button","evt_typ":["start","stop"],"uniq_id":"` + v.Vin + `_event_ignition","obj_id":"` + obj_id_prefix + `_event_ignition","ic":"mdi:engine","stat_t":"mysubarumq/` + v.Vin + `/event/ignition"}` + hassioConfig[s.config.Hassio.Topics.Discovery+`/event/`+v.Vin+`/lock/config`] = `{` + device + origin + `"name":"Lock Events","dev_cla":"button","evt_typ":["lock","unlock"],"uniq_id":"` + v.Vin + `_event_lock","obj_id":"` + obj_id_prefix + `_event_lock","ic":"mdi:car-key","stat_t":"mysubarumq/` + v.Vin + `/event/lock"}` + + // {"availability":[{"topic":"zigbee02/bridge/state"}],"availability_mode":"all","command_topic":"zigbee02/bridge/request/restart","device":{"hw_version":"zStack3x0 20230507","identifiers":["zigbee2mqtt_bridge_0x00124b00237e0682"],"manufacturer":"Zigbee2MQTT","model":"Bridge","name":"Zigbee2MQTT Bridge","sw_version":"1.35.1"},"device_class":"restart","name":"Restart","object_id":"zigbee2mqtt_bridge_restart","origin":{"name":"Zigbee2MQTT","sw":"1.35.1","url":"https://www.zigbee2mqtt.io"},"payload_press":"","unique_id":"bridge_0x00124b00237e0682_restart_zigbee02"} + // LOCK + // state_topic: "home-assistant/frontdoor/state" + // code_format: "^\\d{4}$" + // command_topic: "home-assistant/frontdoor/set" + // command_template: '{ "action": "{{ value }}", "code":"{{ code }}" }' + // payload_lock: "LOCK" + // payload_unlock: "UNLOCK" + // state_locked: "LOCK" + // state_unlocked: "UNLOCK" + // state_locking: "LOCKING" + // state_unlocking: "UNLOCKING" + // state_jammed: "MOTOR_JAMMED" + // state_ok: "MOTOR_OK" + // optimistic: false + // qos: 1 + // retain: true + // value_template: "{{ value.x }}" + + // mdi:tire + // mdi:car-door + // mdi:car-door-lock | mdi:car-door-lock-open | mdi:car-key + + var msgs []*bus.Message + for topic, payload := range hassioConfig { + msgs = s.messages(topic, 1, true, payload, msgs) + s.log.Debug("hassio mqtt configuration", "topic", string(topic), "payload", string(payload)) + } + + return msgs +} + +// mySubaruStatusToMQTTMessage . +func (s *MySubaruClient) mySubaruStatusToMQTTMessage(v *mysubaru.Vehicle) []*bus.Message { + var state = map[string]string{} + + state[`mysubarumq/`+v.Vin+`/state`] = `{"odometer_km":` + strconv.Itoa(v.Odometer.Kilometers) + `,"odometer_mi":` + strconv.Itoa(v.Odometer.Miles) + `,"dist_to_empty_km":` + strconv.Itoa(v.DistanceToEmpty.Kilometers) + `,"dist_to_empty_mi":` + strconv.Itoa(v.DistanceToEmpty.Miles) + `,"dist_to_empty_pc":` + strconv.Itoa(v.DistanceToEmpty.Percentage) + `,"consumption_us":` + fmt.Sprintf("%.2f", v.FuelConsumptionAvg.MPG) + `,"consumption_eu":` + fmt.Sprintf("%.2f", v.FuelConsumptionAvg.LP100Km) + `,"engine_state":"` + v.EngineState + `"}` + state[`mysubarumq/`+v.Vin+`/attr`] = `{"source_type":"gps","latitude":` + fmt.Sprintf("%.6f", v.GeoLocation.Latitude) + `,"longitude":` + fmt.Sprintf("%.6f", v.GeoLocation.Longitude) + `,"course":` + strconv.Itoa(v.GeoLocation.Heading) + `,"speed":` + fmt.Sprintf("%.2f", v.GeoLocation.Speed) + `,"friendly_name":"` + v.CarNickname + `"}` + + var msgs []*bus.Message + for topic, payload := range state { + msgs = s.messages(topic, 0, false, payload, msgs) + s.log.Debug("hassio mqtt configuration", "topic", string(topic), "payload", string(payload)) + } + + return msgs +} + +// messages . +func (s *MySubaruClient) messages(t string, q byte, r bool, p string, l []*bus.Message) []*bus.Message { + s.log.Debug("hassio mqtt configuration", "topic", string(t), "qos", q, "retained", r, "payload", string(p)) + m := bus.Message{ + Topic: t, + QOS: q, + Retained: r, + Payload: p, + } + + if l != nil { + l = append(l, &m) + return l + } else { + var l []*bus.Message + l = append(l, &m) + return l + } +} diff --git a/workers/workers.go b/workers/workers.go new file mode 100644 index 0000000..79ee10f --- /dev/null +++ b/workers/workers.go @@ -0,0 +1,129 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2023 mysubarumq +// SPDX-FileContributor: alex-savin + +package workers + +import ( + "sync" + + "log/slog" +) + +type Config struct { + // TLSConfig is a tls.Config configuration to be used with the listener. + // See examples folder for basic and mutual-tls use. + Test string +} + +// EstablishFn is a callback function for establishing new clients. +type EstablishFn func(id string) error + +// CloseFn is a callback function for closing all listener clients. +type CloseFn func(id string) + +// Worker . +type Worker interface { + ID() string // returns ID in string format + Type() string // returns the type of the worker + Init(*slog.Logger) error // + OneTime() error // + Serve() // starting actively listening for new connections + Close() // stop and close the worker +} + +// Workers contains the network workers for the app. +type Workers struct { + ClientsWg sync.WaitGroup // a waitgroup that waits for all clients in all workers to finish. + internal map[string]Worker // a map of active workers. + sync.RWMutex +} + +// New returns a new instance of Workers. +func New() *Workers { + return &Workers{ + internal: map[string]Worker{}, + } +} + +// Add adds a new worker to the workers map, keyed on id. +func (w *Workers) Add(val Worker) { + w.Lock() + defer w.Unlock() + w.internal[val.ID()] = val +} + +// Get returns the value of a worker if it exists. +func (w *Workers) Get(id string) (Worker, bool) { + w.RLock() + defer w.RUnlock() + val, ok := w.internal[id] + return val, ok +} + +// Len returns the length of the workers map. +func (w *Workers) Len() int { + w.RLock() + defer w.RUnlock() + return len(w.internal) +} + +// Delete removes a worker from the internal map. +func (w *Workers) Delete(id string) { + w.Lock() + defer w.Unlock() + delete(w.internal, id) +} + +// Serve starts a worker serving from the internal map. +func (w *Workers) Serve(id string) { + w.RLock() + defer w.RUnlock() + worker := w.internal[id] + + go func() { + worker.Serve() + worker.OneTime() + }() +} + +// ServeAll starts all workers serving from the internal map. +func (w *Workers) ServeAll() { + w.RLock() + i := 0 + ids := make([]string, len(w.internal)) + for id := range w.internal { + ids[i] = id + i++ + } + w.RUnlock() + + for _, id := range ids { + w.Serve(id) + } +} + +// Close stops a worker from the internal map. +func (w *Workers) Close(id string) { + w.RLock() + defer w.RUnlock() + if worker, ok := w.internal[id]; ok { + worker.Close() + } +} + +// CloseAll iterates and closes all registered workere. +func (w *Workers) CloseAll() { + w.RLock() + i := 0 + ids := make([]string, len(w.internal)) + for id := range w.internal { + ids[i] = id + i++ + } + w.RUnlock() + + for _, id := range ids { + w.Close(id) + } +}