// SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2023 mysubarumq // SPDX-FileContributor: alex-savin package app import ( "context" "errors" "log/slog" "runtime" "sync/atomic" "time" "git.savin.nyc/alex/mysubaru-mq/bus" "git.savin.nyc/alex/mysubaru-mq/config" "git.savin.nyc/alex/mysubaru-mq/listeners" "git.savin.nyc/alex/mysubaru-mq/system" "git.savin.nyc/alex/mysubaru-mq/workers" ) const ( AppVersion = "1.0.1" // the current application version. AppName = "mysubarumq" // the short app name ) var ( ErrWorkerIDExists = errors.New("worker id already exists") ErrListenerIDExists = errors.New("listener id already exists") ) type App struct { Options *config.Config // configurable server options Workers *workers.Workers // worker are network interfaces which listen for new connections Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections Info *system.Info // values about the server commonly known as $SYS topics Log *slog.Logger // minimal no-alloc logger bus *bus.Bus // hooks *Hooks // hooks contains hooks for extra functionality such as auth and persistent storage loop *loop // loop contains tickers for the system event loop cancel context.CancelFunc // // done chan bool // indicate that the server is ending } // loop contains interval tickers for the system events loop. type loop struct { sysInfo *time.Ticker // interval ticker for sending updating $SYS topics } type Info struct { AppVersion string `json:"version"` // the current version of the server Started int64 `json:"started"` // the time the server started in unix seconds MemoryAlloc int64 `json:"memory_alloc"` // memory currently allocated Threads int64 `json:"threads"` // number of active goroutines, named as threads for platform ambiguity Time int64 `json:"time"` // current time on the server Uptime int64 `json:"uptime"` // the number of seconds the server has been online } // EstablishConnection establishes a new client when a listener accepts a new connection. // func (a *App) EstablishConnection(listener string, c net.Conn) error { // cl := a.NewClient(c, listener, "", false) // return a.attachClient(cl, listener) // } // New returns a new instance of mochi mqtt broker. Optional parameters // can be specified to override some default settings (see Options). func New(opts *config.Config, bus *bus.Bus, logger *slog.Logger) *App { if opts == nil { slog.Error("empty config") } ctx, cancel := context.WithCancel(context.Background()) var m runtime.MemStats runtime.ReadMemStats(&m) a := &App{ Options: opts, Workers: workers.New(), Listeners: listeners.New(), Info: &system.Info{ Version: AppVersion, Started: time.Now().Unix(), MemoryAlloc: int64(m.HeapInuse), Threads: int64(runtime.NumGoroutine()), // Time: time.Now().Unix(), // Uptime: time.Now().Unix() - atomic.LoadInt64(&a.Started), }, Log: logger, bus: bus, hooks: &Hooks{ Log: logger, }, loop: &loop{ sysInfo: time.NewTicker(time.Second * time.Duration(1)), }, cancel: cancel, // done: make(chan bool), } chAppWorker, err := a.bus.Subscribe("app:connected", "mysubarumq", a.Options.SubscriptionSize["app:connected"]) if err != nil { a.Log.Error("couldn't subscribe to a channel", "error", err.Error()) } go a.eventLoop(ctx, chAppWorker) return a } // AddHook attaches a new Hook to the server. Ideally, this should be called // before the server is started with s.Serve(). func (a *App) AddHook(hook Hook, config any) error { nl := a.Log.With("hook", hook.ID()) hook.SetOpts(nl) a.Log.Info("added hook", "hook", hook.ID()) return a.hooks.Add(hook, config) } // AddListener adds a new network listener to the server, for receiving incoming client connections. func (a *App) AddWorker(w workers.Worker) error { if _, ok := a.Workers.Get(w.ID()); ok { return ErrWorkerIDExists } nl := a.Log.With(slog.String("worker", w.ID())) err := w.Init(nl) if err != nil { return err } a.Workers.Add(w) a.Log.Info("attached worker", "id", w.ID(), "type", w.Type()) return nil } // AddListener adds a new network listener to the server, for receiving incoming client connections. func (a *App) AddListener(l listeners.Listener) error { if _, ok := a.Listeners.Get(l.ID()); ok { return ErrListenerIDExists } nl := a.Log.With(slog.String("listener", l.ID())) err := l.Init(nl) if err != nil { return err } a.Listeners.Add(l) a.Log.Info("attached listener", "id", l.ID(), "protocol", l.Protocol(), "address", l.Address()) return nil } // Serve starts the event loops responsible for establishing client connections // on all attached listeners, publishing the system topics, and starting all hooks. func (a *App) Serve() error { a.Log.Info("mysubarumq starting", "version", AppVersion) defer a.Log.Info("mysubarumq started") a.Workers.ServeAll() // start listening on all workers. a.Listeners.ServeAll() // start listening on all listeners. a.publishSysTopics() a.hooks.OnStarted() return nil } // Close attempts to gracefully shut down the server, all listeners, clients, and stores. func (a *App) Close() error { // close(a.done) a.cancel() a.Log.Info("gracefully stopping mysubarumq") a.Workers.CloseAll() a.Listeners.CloseAll() a.hooks.OnStopped() a.hooks.Stop() a.Log.Info("mysubarumq stopped") return nil } // eventLoop loops forever func (a *App) eventLoop(ctx context.Context, chAppWorker chan bus.Event) { a.Log.Info("app communication event loop started") defer a.Log.Info("app communication event loop halted") for { select { case <-a.loop.sysInfo.C: a.publishSysTopics() case event := <-chAppWorker: message := event.Data.(*bus.ConnectionStatus) a.Log.Info("got a message from worker", "worker", message.WorkerID, "type", message.WorkerType, "isconnected", message.IsConnected) if !message.IsConnected { a.hooks.OnWorkerDisconnected() a.Workers.Close(message.WorkerID) } a.hooks.OnWorkerConnected() case <-ctx.Done(): a.Log.Info("stopping app communication event loop") return } } } // publishSysTopics publishes the current values to the server $SYS topics. // Due to the int to string conversions this method is not as cheap as // some of the others so the publishing interval should be set appropriately. func (a *App) publishSysTopics() { var m runtime.MemStats runtime.ReadMemStats(&m) atomic.StoreInt64(&a.Info.MemoryAlloc, int64(m.HeapInuse)) atomic.StoreInt64(&a.Info.Threads, int64(runtime.NumGoroutine())) atomic.StoreInt64(&a.Info.Time, time.Now().Unix()) atomic.StoreInt64(&a.Info.Uptime, time.Now().Unix()-atomic.LoadInt64(&a.Info.Started)) a.hooks.OnSysInfoTick(a.Info) }