220 lines
6.8 KiB
Go
220 lines
6.8 KiB
Go
// SPDX-License-Identifier: MIT
|
|
// SPDX-FileCopyrightText: 2023 mysubarumq
|
|
// SPDX-FileContributor: alex-savin
|
|
|
|
package app
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"runtime"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.savin.nyc/alex/mysubaru-mq/bus"
|
|
"git.savin.nyc/alex/mysubaru-mq/config"
|
|
"git.savin.nyc/alex/mysubaru-mq/listeners"
|
|
"git.savin.nyc/alex/mysubaru-mq/system"
|
|
"git.savin.nyc/alex/mysubaru-mq/workers"
|
|
)
|
|
|
|
// Application identity constants.
const (
	// AppVersion is the current application version.
	AppVersion = "1.0.1"

	// AppName is the short application name.
	AppName = "mysubarumq"
)
|
|
|
|
// Sentinel errors reported when a component is registered under an ID that is
// already in use.
var (
	// ErrWorkerIDExists is returned by AddWorker for a duplicate worker ID.
	ErrWorkerIDExists = errors.New("worker id already exists")

	// ErrListenerIDExists is returned by AddListener for a duplicate listener ID.
	ErrListenerIDExists = errors.New("listener id already exists")
)
|
|
|
|
type App struct {
|
|
Options *config.Config // configurable server options
|
|
Workers *workers.Workers // worker are network interfaces which listen for new connections
|
|
Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections
|
|
Info *system.Info // values about the server commonly known as $SYS topics
|
|
Log *slog.Logger // minimal no-alloc logger
|
|
bus *bus.Bus //
|
|
hooks *Hooks // hooks contains hooks for extra functionality such as auth and persistent storage
|
|
loop *loop // loop contains tickers for the system event loop
|
|
cancel context.CancelFunc //
|
|
// done chan bool // indicate that the server is ending
|
|
}
|
|
|
|
// loop groups the interval tickers that drive the system event loop.
type loop struct {
	// sysInfo fires whenever the system info should be refreshed and published.
	sysInfo *time.Ticker
}
|
|
// Info is a JSON-serialisable snapshot of runtime statistics.
//
// NOTE(review): App.Info is declared as *system.Info, and nothing visible in
// this file refers to this type - confirm whether it is still needed.
type Info struct {
	// AppVersion is the current version of the server.
	AppVersion string `json:"version"`

	// Started is the time the server started, in unix seconds.
	Started int64 `json:"started"`

	// MemoryAlloc is the memory currently allocated.
	MemoryAlloc int64 `json:"memory_alloc"`

	// Threads is the number of active goroutines, named "threads" for
	// platform ambiguity.
	Threads int64 `json:"threads"`

	// Time is the current time on the server.
	Time int64 `json:"time"`

	// Uptime is the number of seconds the server has been online.
	Uptime int64 `json:"uptime"`
}
|
|
|
|
// EstablishConnection establishes a new client when a listener accepts a new connection.
|
|
// func (a *App) EstablishConnection(listener string, c net.Conn) error {
|
|
// cl := a.NewClient(c, listener, "", false)
|
|
// return a.attachClient(cl, listener)
|
|
// }
|
|
|
|
// New returns a new instance of mochi mqtt broker. Optional parameters
|
|
// can be specified to override some default settings (see Options).
|
|
func New(opts *config.Config, bus *bus.Bus, logger *slog.Logger) *App {
|
|
if opts == nil {
|
|
slog.Error("empty config")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
var m runtime.MemStats
|
|
runtime.ReadMemStats(&m)
|
|
a := &App{
|
|
Options: opts,
|
|
Workers: workers.New(),
|
|
Listeners: listeners.New(),
|
|
Info: &system.Info{
|
|
Version: AppVersion,
|
|
Started: time.Now().Unix(),
|
|
MemoryAlloc: int64(m.HeapInuse),
|
|
Threads: int64(runtime.NumGoroutine()),
|
|
// Time: time.Now().Unix(),
|
|
// Uptime: time.Now().Unix() - atomic.LoadInt64(&a.Started),
|
|
},
|
|
Log: logger,
|
|
bus: bus,
|
|
hooks: &Hooks{
|
|
Log: logger,
|
|
},
|
|
loop: &loop{
|
|
sysInfo: time.NewTicker(time.Second * time.Duration(1)),
|
|
},
|
|
cancel: cancel,
|
|
// done: make(chan bool),
|
|
}
|
|
|
|
chAppWorker, err := a.bus.Subscribe("app:connected", "mysubarumq", a.Options.SubscriptionSize["app:connected"])
|
|
if err != nil {
|
|
a.Log.Error("couldn't subscribe to a channel", "error", err.Error())
|
|
}
|
|
|
|
go a.eventLoop(ctx, chAppWorker)
|
|
|
|
return a
|
|
}
|
|
|
|
// AddHook attaches a new Hook to the server. Ideally, this should be called
|
|
// before the server is started with s.Serve().
|
|
func (a *App) AddHook(hook Hook, config any) error {
|
|
nl := a.Log.With("hook", hook.ID())
|
|
hook.SetOpts(nl)
|
|
|
|
a.Log.Info("added hook", "hook", hook.ID())
|
|
return a.hooks.Add(hook, config)
|
|
}
|
|
|
|
// AddListener adds a new network listener to the server, for receiving incoming client connections.
|
|
func (a *App) AddWorker(w workers.Worker) error {
|
|
if _, ok := a.Workers.Get(w.ID()); ok {
|
|
return ErrWorkerIDExists
|
|
}
|
|
|
|
nl := a.Log.With(slog.String("worker", w.ID()))
|
|
err := w.Init(nl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
a.Workers.Add(w)
|
|
|
|
a.Log.Info("attached worker", "id", w.ID(), "type", w.Type())
|
|
return nil
|
|
}
|
|
|
|
// AddListener adds a new network listener to the server, for receiving incoming client connections.
|
|
func (a *App) AddListener(l listeners.Listener) error {
|
|
if _, ok := a.Listeners.Get(l.ID()); ok {
|
|
return ErrListenerIDExists
|
|
}
|
|
|
|
nl := a.Log.With(slog.String("listener", l.ID()))
|
|
err := l.Init(nl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
a.Listeners.Add(l)
|
|
|
|
a.Log.Info("attached listener", "id", l.ID(), "protocol", l.Protocol(), "address", l.Address())
|
|
return nil
|
|
}
|
|
|
|
// Serve starts the event loops responsible for establishing client connections
|
|
// on all attached listeners, publishing the system topics, and starting all hooks.
|
|
func (a *App) Serve() error {
|
|
a.Log.Info("mysubarumq starting", "version", AppVersion)
|
|
defer a.Log.Info("mysubarumq started")
|
|
|
|
a.Workers.ServeAll() // start listening on all workers.
|
|
a.Listeners.ServeAll() // start listening on all listeners.
|
|
a.publishSysTopics()
|
|
a.hooks.OnStarted()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close attempts to gracefully shut down the server, all listeners, clients, and stores.
|
|
func (a *App) Close() error {
|
|
// close(a.done)
|
|
a.cancel()
|
|
a.Log.Info("gracefully stopping mysubarumq")
|
|
a.Workers.CloseAll()
|
|
a.Listeners.CloseAll()
|
|
a.hooks.OnStopped()
|
|
a.hooks.Stop()
|
|
|
|
a.Log.Info("mysubarumq stopped")
|
|
return nil
|
|
}
|
|
|
|
// eventLoop loops forever
|
|
func (a *App) eventLoop(ctx context.Context, chAppWorker chan bus.Event) {
|
|
a.Log.Info("app communication event loop started")
|
|
defer a.Log.Info("app communication event loop halted")
|
|
|
|
for {
|
|
select {
|
|
case <-a.loop.sysInfo.C:
|
|
a.publishSysTopics()
|
|
case event := <-chAppWorker:
|
|
message := event.Data.(*bus.ConnectionStatus)
|
|
a.Log.Info("got a message from worker", "worker", message.WorkerID, "type", message.WorkerType, "isconnected", message.IsConnected)
|
|
if !message.IsConnected {
|
|
a.hooks.OnWorkerDisconnected()
|
|
a.Workers.Close(message.WorkerID)
|
|
}
|
|
a.hooks.OnWorkerConnected()
|
|
case <-ctx.Done():
|
|
a.Log.Info("stopping app communication event loop")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// publishSysTopics publishes the current values to the server $SYS topics.
|
|
// Due to the int to string conversions this method is not as cheap as
|
|
// some of the others so the publishing interval should be set appropriately.
|
|
func (a *App) publishSysTopics() {
|
|
var m runtime.MemStats
|
|
runtime.ReadMemStats(&m)
|
|
atomic.StoreInt64(&a.Info.MemoryAlloc, int64(m.HeapInuse))
|
|
atomic.StoreInt64(&a.Info.Threads, int64(runtime.NumGoroutine()))
|
|
atomic.StoreInt64(&a.Info.Time, time.Now().Unix())
|
|
atomic.StoreInt64(&a.Info.Uptime, time.Now().Unix()-atomic.LoadInt64(&a.Info.Started))
|
|
|
|
a.hooks.OnSysInfoTick(a.Info)
|
|
}
|