217 lines
4.9 KiB
Go
217 lines
4.9 KiB
Go
package app
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"git.savin.nyc/alex/go-receipt-tracker/system"
|
|
)
|
|
|
|
const (
|
|
SetOptions byte = iota
|
|
OnSysInfoTick
|
|
OnStarted
|
|
OnStopped
|
|
OnWorkerConnected
|
|
OnWorkerDisconnected
|
|
// OnSysInfoTick
|
|
)
|
|
|
|
var (
|
|
// ErrInvalidConfigType indicates a different Type of config value was expected to what was received.
|
|
ErrInvalidConfigType = errors.New("invalid config type provided")
|
|
)
|
|
|
|
// Hook provides an interface of handlers for different events which occur
|
|
// during the lifecycle of the broker.
|
|
type Hook interface {
|
|
ID() string
|
|
Provides(b byte) bool
|
|
Init(config any) error
|
|
Stop() error
|
|
SetOpts(l *slog.Logger)
|
|
OnSysInfoTick(*system.Info)
|
|
OnStarted()
|
|
OnStopped()
|
|
OnWorkerConnected()
|
|
OnWorkerDisconnected()
|
|
// OnSysInfoTick(*system.Info)
|
|
}
|
|
|
|
// Hooks is a slice of Hook interfaces to be called in sequence.
|
|
type Hooks struct {
|
|
Log *slog.Logger // a logger for the hook (from the server)
|
|
internal atomic.Value // a slice of []Hook
|
|
wg sync.WaitGroup // a waitgroup for syncing hook shutdown
|
|
qty int64 // the number of hooks in use
|
|
sync.Mutex // a mutex for locking when adding hooks
|
|
}
|
|
|
|
// Len returns the number of hooks added.
|
|
func (h *Hooks) Len() int64 {
|
|
return atomic.LoadInt64(&h.qty)
|
|
}
|
|
|
|
// Provides returns true if any one hook provides any of the requested hook methods.
|
|
func (h *Hooks) Provides(b ...byte) bool {
|
|
for _, hook := range h.GetAll() {
|
|
for _, hb := range b {
|
|
if hook.Provides(hb) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// Add adds and initializes a new hook.
|
|
func (h *Hooks) Add(hook Hook, config any) error {
|
|
h.Lock()
|
|
defer h.Unlock()
|
|
|
|
err := hook.Init(config)
|
|
if err != nil {
|
|
return fmt.Errorf("failed initialising %s hook: %w", hook.ID(), err)
|
|
}
|
|
|
|
i, ok := h.internal.Load().([]Hook)
|
|
if !ok {
|
|
i = []Hook{}
|
|
}
|
|
|
|
i = append(i, hook)
|
|
h.internal.Store(i)
|
|
atomic.AddInt64(&h.qty, 1)
|
|
h.wg.Add(1)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetAll returns a slice of all the hooks.
|
|
func (h *Hooks) GetAll() []Hook {
|
|
i, ok := h.internal.Load().([]Hook)
|
|
if !ok {
|
|
return []Hook{}
|
|
}
|
|
|
|
return i
|
|
}
|
|
|
|
// Stop indicates all attached hooks to gracefully end.
|
|
func (h *Hooks) Stop() {
|
|
go func() {
|
|
for _, hook := range h.GetAll() {
|
|
h.Log.Info("stopping hook", "hook", hook.ID())
|
|
if err := hook.Stop(); err != nil {
|
|
h.Log.Debug("problem stopping hook", "error", err, "hook", hook.ID())
|
|
}
|
|
|
|
h.wg.Done()
|
|
}
|
|
}()
|
|
|
|
h.wg.Wait()
|
|
}
|
|
|
|
// OnSysInfoTick is called when the $SYS topic values are published out.
|
|
func (h *Hooks) OnSysInfoTick(sys *system.Info) {
|
|
for _, hook := range h.GetAll() {
|
|
if hook.Provides(OnSysInfoTick) {
|
|
hook.OnSysInfoTick(sys)
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnStarted is called when the server has successfully started.
|
|
func (h *Hooks) OnStarted() {
|
|
for _, hook := range h.GetAll() {
|
|
if hook.Provides(OnStarted) {
|
|
hook.OnStarted()
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnStopped is called when the server has successfully stopped.
|
|
func (h *Hooks) OnStopped() {
|
|
for _, hook := range h.GetAll() {
|
|
if hook.Provides(OnStopped) {
|
|
hook.OnStopped()
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnWorkerConnected is called when the worker has successfully connected.
|
|
func (h *Hooks) OnWorkerConnected() {
|
|
for _, hook := range h.GetAll() {
|
|
if hook.Provides(OnWorkerConnected) {
|
|
hook.OnWorkerConnected()
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnWorkerDisconnected is called when the worker has disconnected.
|
|
func (h *Hooks) OnWorkerDisconnected() {
|
|
for _, hook := range h.GetAll() {
|
|
if hook.Provides(OnWorkerDisconnected) {
|
|
hook.OnWorkerDisconnected()
|
|
}
|
|
}
|
|
}
|
|
|
|
// HookBase provides a set of default methods for each hook. It should be embedded in
|
|
// all hooks.
|
|
type HookBase struct {
|
|
Hook
|
|
Log *slog.Logger
|
|
}
|
|
|
|
// ID returns the ID of the hook.
|
|
func (h *HookBase) ID() string {
|
|
return "base"
|
|
}
|
|
|
|
// Provides indicates which methods a hook provides. The default is none - this method
|
|
// should be overridden by the embedding hook.
|
|
func (h *HookBase) Provides(b byte) bool {
|
|
return false
|
|
}
|
|
|
|
// Init performs any pre-start initializations for the hook, such as connecting to databases
|
|
// or opening files.
|
|
func (h *HookBase) Init(config any) error {
|
|
return nil
|
|
}
|
|
|
|
// SetOpts is called by the server to propagate internal values and generally should
|
|
// not be called manually.
|
|
func (h *HookBase) SetOpts(l *slog.Logger) {
|
|
h.Log = l
|
|
}
|
|
|
|
// Stop is called to gracefully shut down the hook.
|
|
func (h *HookBase) Stop() error {
|
|
return nil
|
|
}
|
|
|
|
// OnSysInfoTick is called when the server publishes system info.
|
|
func (h *HookBase) OnSysInfoTick(*system.Info) {}
|
|
|
|
// OnStarted is called when the server starts.
|
|
func (h *HookBase) OnStarted() {}
|
|
|
|
// OnStopped is called when the server stops.
|
|
func (h *HookBase) OnStopped() {}
|
|
|
|
// OnWorkerConnected is called when the worker connected.
|
|
func (h *HookBase) OnWorkerConnected() {}
|
|
|
|
// OnWorkerDisconnected is called when the worker disconnected.
|
|
func (h *HookBase) OnWorkerDisconnected() {}
|
|
|
|
// // OnSysInfoTick is called when the server publishes system info.
|
|
// func (h *HookBase) OnSysInfoTick(*system.Info) {}
|