alpha version
This commit is contained in:
90
bus/bus.go
Normal file
90
bus/bus.go
Normal file
@ -0,0 +1,90 @@
|
||||
package bus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"log/slog"
|
||||
|
||||
"github.com/alex-savin/go-receipt-tracker/config"
|
||||
)
|
||||
|
||||
// Bus should be created by New().
|
||||
type Bus struct {
|
||||
opts *config.Bus
|
||||
log *slog.Logger
|
||||
channels map[string]*eventChannel
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// New creates new Bus instance.
|
||||
func New(opts *config.Bus, log *slog.Logger) *Bus {
|
||||
b := &Bus{
|
||||
opts: opts,
|
||||
log: log,
|
||||
channels: make(map[string]*eventChannel),
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// Subscribe adds new subscriber to channel.
|
||||
func (b *Bus) Subscribe(channelName, subName string) (chan Event, error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if _, ok := b.channels[channelName]; !ok {
|
||||
b.channels[channelName] = newEventChannel()
|
||||
b.log.Debug("created a new channel", "channel", channelName)
|
||||
}
|
||||
|
||||
ch := b.channels[channelName]
|
||||
|
||||
if err := ch.addSubscriber(subName, b.opts.SubscriptionSize[channelName]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.log.Debug("added a new subscriber to a channel", "channel", channelName, "subscriber", subName)
|
||||
|
||||
return ch.subscribers[subName], nil
|
||||
}
|
||||
|
||||
// Unsubscribe removes subscriber from channel.
|
||||
// If this is last subscriber channel will be removed.
|
||||
func (b *Bus) Unsubscribe(channelName, subName string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
channel, ok := b.channels[channelName]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
channel.delSubscriber(subName)
|
||||
|
||||
if len(channel.subscribers) == 0 {
|
||||
delete(b.channels, channelName)
|
||||
}
|
||||
}
|
||||
|
||||
// Publish data to channel.
|
||||
func (b *Bus) Publish(channelName string, payload interface{}) error {
|
||||
channel, ok := b.channels[channelName]
|
||||
if !ok {
|
||||
return fmt.Errorf("channel %s don't exists", channelName)
|
||||
}
|
||||
e := Event{
|
||||
ChannelName: channelName,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
for subName, subscriber := range channel.subscribers {
|
||||
b.log.Debug("some channel info", "channel", channelName, "cap", cap(subscriber), "len", len(subscriber))
|
||||
if cap(subscriber) > 0 && len(subscriber) >= cap(subscriber) {
|
||||
b.log.Info("channel %s for subscriber %s is full, not publishing new messages", channelName, subName)
|
||||
continue
|
||||
}
|
||||
subscriber <- e
|
||||
b.log.Debug("published a new message to a channel", "channel", channelName, "subscriber", subName)
|
||||
// b.log.Debug("published a new message to a channel", "channel", channelName, "subscriber", subName, "payload", payload)
|
||||
}
|
||||
return nil
|
||||
}
|
495
bus/bus_test.go_
Normal file
495
bus/bus_test.go_
Normal file
@ -0,0 +1,495 @@
|
||||
package bus
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewEventChannel(t *testing.T) {
|
||||
ch := newEventChannel()
|
||||
require.Equal(t, 0, len(ch.subscribers))
|
||||
}
|
||||
|
||||
func TestAddSubscriber(t *testing.T) {
|
||||
tests := []struct {
|
||||
inputEventChannel *eventChannel
|
||||
inputSubscriptionName string
|
||||
inputSubscriptionSize int
|
||||
expectedError error
|
||||
expectedSubscribers int
|
||||
expectedSubscriptionSize int
|
||||
}{
|
||||
{
|
||||
inputEventChannel: &eventChannel{
|
||||
subscribers: make(map[string]chan Event),
|
||||
},
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 1024,
|
||||
expectedSubscribers: 1,
|
||||
expectedSubscriptionSize: 1024,
|
||||
},
|
||||
{
|
||||
inputEventChannel: &eventChannel{
|
||||
subscribers: map[string]chan Event{
|
||||
"existing": make(chan Event),
|
||||
},
|
||||
},
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 1024,
|
||||
expectedSubscribers: 2,
|
||||
expectedSubscriptionSize: 1024,
|
||||
},
|
||||
{
|
||||
inputEventChannel: &eventChannel{
|
||||
subscribers: map[string]chan Event{
|
||||
"existing": make(chan Event),
|
||||
},
|
||||
},
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 0,
|
||||
expectedSubscribers: 2,
|
||||
expectedSubscriptionSize: 0,
|
||||
},
|
||||
{
|
||||
inputEventChannel: &eventChannel{
|
||||
subscribers: map[string]chan Event{
|
||||
"test": make(chan Event),
|
||||
},
|
||||
},
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 1024,
|
||||
expectedSubscribers: 1,
|
||||
expectedSubscriptionSize: 0,
|
||||
expectedError: errors.New("subscriber test already exists"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
err := test.inputEventChannel.addSubscriber(test.inputSubscriptionName, test.inputSubscriptionSize)
|
||||
require.Equal(t, test.expectedError, err)
|
||||
require.Equal(t, test.expectedSubscribers, len(test.inputEventChannel.subscribers))
|
||||
require.Equal(t, test.expectedSubscriptionSize, cap(test.inputEventChannel.subscribers[test.inputSubscriptionName]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelSubscriber(t *testing.T) {
|
||||
tests := []struct {
|
||||
inputEventChannel *eventChannel
|
||||
inputSubscriptionName string
|
||||
expectedSubscribers int
|
||||
expectedSubscriptionSize int
|
||||
}{
|
||||
{
|
||||
inputEventChannel: &eventChannel{
|
||||
subscribers: make(map[string]chan Event),
|
||||
},
|
||||
inputSubscriptionName: "test",
|
||||
expectedSubscribers: 0,
|
||||
},
|
||||
{
|
||||
inputEventChannel: &eventChannel{
|
||||
subscribers: map[string]chan Event{
|
||||
"existing": make(chan Event),
|
||||
},
|
||||
},
|
||||
inputSubscriptionName: "test",
|
||||
expectedSubscribers: 1,
|
||||
},
|
||||
{
|
||||
inputEventChannel: &eventChannel{
|
||||
subscribers: map[string]chan Event{
|
||||
"test": make(chan Event),
|
||||
},
|
||||
},
|
||||
inputSubscriptionName: "test",
|
||||
expectedSubscribers: 0,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
test.inputEventChannel.delSubscriber(test.inputSubscriptionName)
|
||||
require.Equal(t, test.expectedSubscribers, len(test.inputEventChannel.subscribers))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
b := New()
|
||||
require.Equal(t, 0, len(b.channels))
|
||||
}
|
||||
|
||||
func TestSubscribe(t *testing.T) {
|
||||
tests := []struct {
|
||||
inputBus *Bus
|
||||
inputChannelName string
|
||||
inputSubscriptionName string
|
||||
inputSubscriptionSize int
|
||||
expectedError error
|
||||
expectedChannels int
|
||||
expectedSubscribers int
|
||||
expectedSubscriptionSize int
|
||||
}{
|
||||
{
|
||||
inputBus: New(),
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 1024,
|
||||
expectedChannels: 1,
|
||||
expectedSubscribers: 1,
|
||||
expectedSubscriptionSize: 1024,
|
||||
},
|
||||
{
|
||||
inputBus: &Bus{
|
||||
channels: map[string]*eventChannel{
|
||||
"existing": newEventChannel(),
|
||||
},
|
||||
},
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 1024,
|
||||
expectedChannels: 2,
|
||||
expectedSubscribers: 1,
|
||||
expectedSubscriptionSize: 1024,
|
||||
},
|
||||
{
|
||||
inputBus: &Bus{
|
||||
channels: map[string]*eventChannel{
|
||||
"test": {
|
||||
subscribers: map[string]chan Event{
|
||||
"existing": make(chan Event),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 1024,
|
||||
expectedChannels: 1,
|
||||
expectedSubscribers: 2,
|
||||
expectedSubscriptionSize: 1024,
|
||||
},
|
||||
{
|
||||
inputBus: &Bus{
|
||||
channels: map[string]*eventChannel{
|
||||
"test": {
|
||||
subscribers: map[string]chan Event{
|
||||
"test": make(chan Event),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
inputSubscriptionSize: 1024,
|
||||
expectedChannels: 1,
|
||||
expectedSubscribers: 1,
|
||||
expectedSubscriptionSize: 0,
|
||||
expectedError: errors.New("subscriber test already exists"),
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
ch, err := test.inputBus.Subscribe(test.inputChannelName, test.inputSubscriptionName, test.inputSubscriptionSize)
|
||||
require.Equal(t, test.expectedError, err)
|
||||
require.Equal(t, test.expectedChannels, len(test.inputBus.channels))
|
||||
require.Equal(t, test.expectedSubscribers, len(test.inputBus.channels[test.inputChannelName].subscribers))
|
||||
require.Equal(t, test.expectedSubscriptionSize, cap(ch))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsubscribe(t *testing.T) {
|
||||
tests := []struct {
|
||||
inputBus *Bus
|
||||
inputChannelName string
|
||||
inputSubscriptionName string
|
||||
expectedChannels int
|
||||
expectedSubscribers int
|
||||
}{
|
||||
{
|
||||
inputBus: New(),
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
expectedChannels: 0,
|
||||
expectedSubscribers: 0,
|
||||
},
|
||||
{
|
||||
inputBus: &Bus{
|
||||
channels: map[string]*eventChannel{
|
||||
"existing": newEventChannel(),
|
||||
},
|
||||
},
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
expectedChannels: 1,
|
||||
expectedSubscribers: 0,
|
||||
},
|
||||
{
|
||||
inputBus: &Bus{
|
||||
channels: map[string]*eventChannel{
|
||||
"test": {
|
||||
subscribers: map[string]chan Event{
|
||||
"existing": make(chan Event),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
expectedChannels: 1,
|
||||
expectedSubscribers: 1,
|
||||
},
|
||||
{
|
||||
inputBus: &Bus{
|
||||
channels: map[string]*eventChannel{
|
||||
"test": {
|
||||
subscribers: map[string]chan Event{
|
||||
"test": make(chan Event),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
expectedChannels: 0,
|
||||
expectedSubscribers: 0,
|
||||
},
|
||||
{
|
||||
inputBus: &Bus{
|
||||
channels: map[string]*eventChannel{
|
||||
"test": {
|
||||
subscribers: map[string]chan Event{
|
||||
"existing": make(chan Event),
|
||||
"test": make(chan Event),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
inputChannelName: "test",
|
||||
inputSubscriptionName: "test",
|
||||
expectedChannels: 1,
|
||||
expectedSubscribers: 1,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
test.inputBus.Unsubscribe(test.inputChannelName, test.inputSubscriptionName)
|
||||
require.Equal(t, test.expectedChannels, len(test.inputBus.channels))
|
||||
if test.expectedSubscribers > 0 {
|
||||
require.Equal(t, test.expectedSubscribers, len(test.inputBus.channels[test.inputChannelName].subscribers))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPublish(t *testing.T) {
|
||||
tests := []struct {
|
||||
inputSubscribers func(*Bus, chan map[string]int)
|
||||
inputPublish func(*Bus)
|
||||
expectedOutput map[string]int
|
||||
}{
|
||||
{
|
||||
inputSubscribers: func(b *Bus, output chan map[string]int) {
|
||||
s1, err := b.Subscribe("test", "s1", 1024)
|
||||
require.Nil(t, err)
|
||||
go func() {
|
||||
r := make(map[string]int)
|
||||
for {
|
||||
select {
|
||||
case e := <-s1:
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
r["s1"]++
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
output <- r
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
inputPublish: func(b *Bus) {
|
||||
err := b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
},
|
||||
expectedOutput: map[string]int{"s1": 1},
|
||||
},
|
||||
{
|
||||
inputSubscribers: func(b *Bus, output chan map[string]int) {
|
||||
s1, err := b.Subscribe("test", "s1", 1024)
|
||||
require.Nil(t, err)
|
||||
go func() {
|
||||
r := make(map[string]int)
|
||||
for {
|
||||
select {
|
||||
case e := <-s1:
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
r["s1"]++
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
output <- r
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
inputPublish: func(b *Bus) {
|
||||
err := b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
err = b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
},
|
||||
expectedOutput: map[string]int{"s1": 2},
|
||||
},
|
||||
{
|
||||
inputSubscribers: func(b *Bus, output chan map[string]int) {
|
||||
s1, err := b.Subscribe("test", "s1", 1024)
|
||||
require.Nil(t, err)
|
||||
s2, err := b.Subscribe("test", "s2", 1024)
|
||||
require.Nil(t, err)
|
||||
go func() {
|
||||
r := make(map[string]int)
|
||||
for {
|
||||
select {
|
||||
case e := <-s1:
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
r["s1"]++
|
||||
case e := <-s2:
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
r["s2"]++
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
output <- r
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
inputPublish: func(b *Bus) {
|
||||
err := b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
err = b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
},
|
||||
expectedOutput: map[string]int{"s1": 2, "s2": 2},
|
||||
},
|
||||
{
|
||||
inputSubscribers: func(b *Bus, output chan map[string]int) {
|
||||
s1, err := b.Subscribe("test", "s1", 1024)
|
||||
require.Nil(t, err)
|
||||
s2, err := b.Subscribe("test2", "s2", 1024)
|
||||
require.Nil(t, err)
|
||||
go func() {
|
||||
r := make(map[string]int)
|
||||
for {
|
||||
select {
|
||||
case e := <-s1:
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
r["s1"]++
|
||||
case e := <-s2:
|
||||
require.Equal(t, "test2", e.ChannelName)
|
||||
r["s2"]++
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
output <- r
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
inputPublish: func(b *Bus) {
|
||||
err := b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
err = b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
},
|
||||
expectedOutput: map[string]int{"s1": 2},
|
||||
},
|
||||
{
|
||||
inputSubscribers: func(b *Bus, output chan map[string]int) {
|
||||
s1, err := b.Subscribe("test", "s1", 1024)
|
||||
require.Nil(t, err)
|
||||
s2, err := b.Subscribe("test", "s2", 1)
|
||||
require.Nil(t, err)
|
||||
go func() {
|
||||
r := make(map[string]int)
|
||||
e := <-s2
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
r["s2"]++
|
||||
for {
|
||||
select {
|
||||
case e := <-s1:
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
r["s1"]++
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
output <- r
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
inputPublish: func(b *Bus) {
|
||||
err := b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
err = b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
},
|
||||
expectedOutput: map[string]int{"s1": 2, "s2": 1},
|
||||
},
|
||||
{
|
||||
inputSubscribers: func(b *Bus, output chan map[string]int) {
|
||||
ch, err := b.Subscribe("test", "s1", 1024)
|
||||
require.Nil(t, err)
|
||||
go func() {
|
||||
var mu sync.Mutex
|
||||
r := make(map[string]int)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
w := func(r map[string]int, n string) {
|
||||
e := <-ch
|
||||
require.Equal(t, "test", e.ChannelName)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
r[n]++
|
||||
wg.Done()
|
||||
}
|
||||
go w(r, "s1")
|
||||
go w(r, "s2")
|
||||
go w(r, "s3")
|
||||
wg.Wait()
|
||||
output <- r
|
||||
}()
|
||||
},
|
||||
inputPublish: func(b *Bus) {
|
||||
go func(b *Bus) {
|
||||
err := b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
err = b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
err = b.Publish("test", "test")
|
||||
require.Nil(t, err)
|
||||
}(b)
|
||||
},
|
||||
expectedOutput: map[string]int{"s1": 1, "s2": 1, "s3": 1},
|
||||
},
|
||||
{
|
||||
inputSubscribers: func(b *Bus, output chan map[string]int) {
|
||||
go func() {
|
||||
r := make(map[string]int)
|
||||
output <- r
|
||||
}()
|
||||
},
|
||||
inputPublish: func(b *Bus) {
|
||||
err := b.Publish("test", "test")
|
||||
require.Equal(t, errors.New("channel test don't exists"), err)
|
||||
},
|
||||
expectedOutput: map[string]int{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
b := New()
|
||||
ch := make(chan map[string]int)
|
||||
test.inputSubscribers(b, ch)
|
||||
test.inputPublish(b)
|
||||
|
||||
received := <-ch
|
||||
|
||||
require.Equal(t, test.expectedOutput, received)
|
||||
|
||||
}
|
||||
}
|
38
bus/events.go
Normal file
38
bus/events.go
Normal file
@ -0,0 +1,38 @@
|
||||
package bus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Event is used to transport user data over bus.
|
||||
type Event struct {
|
||||
ChannelName string
|
||||
Payload interface{}
|
||||
}
|
||||
|
||||
// eventChannel stores subscriptions for channels.
|
||||
type eventChannel struct {
|
||||
subscribers map[string]chan Event
|
||||
}
|
||||
|
||||
// newEventChannel creates new eventChannel.
|
||||
func newEventChannel() *eventChannel {
|
||||
return &eventChannel{
|
||||
subscribers: make(map[string]chan Event),
|
||||
}
|
||||
}
|
||||
|
||||
// addSubscriber adds new subscriber.
|
||||
func (ch *eventChannel) addSubscriber(subName string, size int) error {
|
||||
if _, ok := ch.subscribers[subName]; ok {
|
||||
return fmt.Errorf("subscriber %s already exists", subName)
|
||||
}
|
||||
ch.subscribers[subName] = make(chan Event, size)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// delSubscriber removes subscriber.
|
||||
func (ch *eventChannel) delSubscriber(subName string) {
|
||||
delete(ch.subscribers, subName)
|
||||
}
|
31
bus/messages.go
Normal file
31
bus/messages.go
Normal file
@ -0,0 +1,31 @@
|
||||
package bus
|
||||
|
||||
import (
|
||||
"github.com/alex-savin/go-receipt-tracker/models"
|
||||
tele "gopkg.in/telebot.v4"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
TBCmd *models.TelegramBotCommand
|
||||
TBParseMode string
|
||||
TbContext tele.Context //
|
||||
Text string //
|
||||
Image *Image //
|
||||
InlineKeyboard *tele.ReplyMarkup //
|
||||
ReplyType string //
|
||||
}
|
||||
|
||||
type Image struct {
|
||||
ID string //
|
||||
Filename string //
|
||||
Type string //
|
||||
Base64 string //
|
||||
Caption byte //
|
||||
Parsed map[string]string //
|
||||
}
|
||||
|
||||
type ConnectionStatus struct {
|
||||
WorkerID string
|
||||
WorkerType string
|
||||
IsConnected bool
|
||||
}
|
Reference in New Issue
Block a user