Files
go-receipt-tracker/bus/bus.go
2025-02-03 17:50:47 -05:00

91 lines
2.2 KiB
Go

package bus
import (
"fmt"
"sync"
"log/slog"
"git.savin.nyc/alex/go-receipt-tracker/config"
)
// Bus is an in-process publish/subscribe hub whose event channels are keyed
// by name. Construct it with New; the zero value has a nil channel map.
type Bus struct {
	// opts is the bus configuration; Subscribe reads opts.SubscriptionSize.
	opts *config.Bus
	// log receives the bus's diagnostic messages.
	log *slog.Logger

	// mu is held by Subscribe and Unsubscribe while they access channels.
	mu sync.Mutex
	// channels maps a channel name to its event channel.
	channels map[string]*eventChannel
}
// New returns a Bus that uses the given options and logger and starts out
// with no channels registered.
func New(opts *config.Bus, log *slog.Logger) *Bus {
	return &Bus{
		opts:     opts,
		log:      log,
		channels: map[string]*eventChannel{},
	}
}
// Subscribe registers subName on channelName and returns the Go channel on
// which that subscriber receives events. The event channel is created on
// first use.
//
// opts.SubscriptionSize[channelName] is passed to addSubscriber (presumably
// the subscriber queue's buffer size; confirm against eventChannel). If
// addSubscriber fails, its error is returned unchanged together with a nil
// channel, and a channel that did not exist before this call is not
// registered.
func (b *Bus) Subscribe(channelName, subName string) (chan Event, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	ch, exists := b.channels[channelName]
	if !exists {
		ch = newEventChannel()
	}

	if err := ch.addSubscriber(subName, b.opts.SubscriptionSize[channelName]); err != nil {
		return nil, err
	}

	// Register a new channel only once it has a subscriber, so a failed
	// subscription never leaves an empty channel behind. Unsubscribe removes
	// channels with no subscribers, and Publish treats a registered channel
	// as existing.
	if !exists {
		b.channels[channelName] = ch
		b.log.Debug("created a new channel", "channel", channelName)
	}
	b.log.Debug("added a new subscriber to a channel", "channel", channelName, "subscriber", subName)

	return ch.subscribers[subName], nil
}
// Unsubscribe detaches subName from channelName. An unknown channel name is
// ignored. When the channel is left without subscribers it is dropped from
// the bus.
func (b *Bus) Unsubscribe(channelName, subName string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if ch, found := b.channels[channelName]; found {
		ch.delSubscriber(subName)
		if len(ch.subscribers) == 0 {
			// That was the last subscriber: forget the channel itself.
			delete(b.channels, channelName)
		}
	}
}
// Publish wraps payload in an Event and delivers it to every subscriber of
// channelName.
//
// It returns an error when no channel with that name is registered. A
// subscriber whose buffered queue is full is skipped: the event is dropped
// for that subscriber and the drop is logged. A subscriber with an unbuffered
// queue receives the event through a blocking send.
func (b *Bus) Publish(channelName string, payload any) error {
	// Snapshot the subscribers under the lock. Subscribe and Unsubscribe
	// update the registry while holding b.mu, and an unsynchronised map read
	// that overlaps a map write is a fatal runtime error. The lock is released
	// before sending so that a blocking send cannot stall Subscribe and
	// Unsubscribe.
	b.mu.Lock()
	channel, ok := b.channels[channelName]
	if !ok {
		b.mu.Unlock()
		return fmt.Errorf("channel %s don't exists", channelName)
	}
	subscribers := make(map[string]chan Event, len(channel.subscribers))
	for subName, subscriber := range channel.subscribers {
		subscribers[subName] = subscriber
	}
	b.mu.Unlock()

	e := Event{
		ChannelName: channelName,
		Payload:     payload,
	}

	// NOTE(review): if delSubscriber closes the subscriber's channel, an
	// Unsubscribe running concurrently with this loop could still make a send
	// panic. Confirm against eventChannel.
	for subName, subscriber := range subscribers {
		b.log.Debug("some channel info", "channel", channelName, "cap", cap(subscriber), "len", len(subscriber))

		if cap(subscriber) == 0 {
			// Unbuffered queue: hand the event over synchronously.
			subscriber <- e
		} else {
			// Buffered queue: attempt the send without blocking. Checking
			// len against cap first would be racy, because another publisher
			// could fill the queue between the check and the send.
			select {
			case subscriber <- e:
			default:
				b.log.Info("channel for subscriber is full, not publishing new messages", "channel", channelName, "subscriber", subName)
				continue
			}
		}

		b.log.Debug("published a new message to a channel", "channel", channelName, "subscriber", subName)
	}

	return nil
}