Files
2025-05-21 13:51:10 -04:00

219 lines
5.2 KiB
Go

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mysubarumq
// SPDX-FileContributor: alex-savin
package consul
import (
"bytes"
"errors"
"fmt"
"net"
"os"
app "git.savin.nyc/alex/mysubaru-mq/app"
"github.com/hashicorp/consul/api"
)
// Options contains the configuration of the Consul hook: where the Consul
// agent is reachable and how this service is announced to it.
type Options struct {
// Hostname is the host part of the Consul API address.
Hostname string
// Port is the port part of the Consul API address.
Port int // Consul's conventional HTTP API port is 8500
// DC is passed to the Consul client as the datacenter.
DC string
// ServiceID is the ID of the registered service; it is also the last
// segment of the KV key written on start.
ServiceID string
// NodeName is used as the name of the registered service and as part of
// the KV key services/<NodeName>/members/<ServiceID>.
NodeName string
// Tags are attached to the service registration.
Tags []string
// LocalHost is the address advertised for this service and the host used
// in the HTTP health check URL.
LocalHost string
// LocalPort is the port advertised for this service and the port used in
// the HTTP health check URL.
LocalPort int
}
// Hook is an application hook that registers this service with Consul when
// the application has started and deregisters it when the application stops.
type Hook struct {
app.HookBase
// config holds the options supplied to Init.
config *Options
// consul holds the Consul client state created by Init.
consul *consul
}
// consul groups the Consul API handles used by Hook.
type consul struct {
// client is the Consul API client created in Init.
client *api.Client
// registration is the service definition built in Init and submitted in
// OnStarted.
registration *api.AgentServiceRegistration
// lock is the lock on the member KV key; it is set in OnStarted and
// released in OnStopped.
lock *api.Lock
}
// ID reports the fixed identifier under which this hook is known.
func (h *Hook) ID() string {
	const id = "consul"
	return id
}
// Provides reports whether this hook implements the hook method identified
// by b. Only the started and stopped events are handled.
func (h *Hook) Provides(b byte) bool {
	supported := []byte{
		app.OnStarted,
		app.OnStopped,
	}
	return bytes.IndexByte(supported, b) >= 0
}
// Init validates the supplied configuration, creates the Consul API client and
// prepares the service registration that OnStarted later submits.
//
// config must be nil or a *Options; any other type yields
// app.ErrInvalidConfigType. A nil config (untyped nil or a nil *Options) is
// replaced by an empty Options value. A zero Port falls back to 8500.
// An error is also returned when the Consul client cannot be created.
func (h *Hook) Init(config any) error {
	// defaultPort is used when Options.Port is left at its zero value.
	const defaultPort = 8500

	opts, ok := config.(*Options)
	if !ok && config != nil {
		return app.ErrInvalidConfigType
	}
	if opts == nil {
		// Covers both an untyped nil and a typed nil *Options; the latter
		// passes the type assertion and would be dereferenced below.
		opts = new(Options)
	}
	h.config = opts
	h.consul = &consul{}

	port := opts.Port
	if port == 0 {
		port = defaultPort
	}
	conf := api.Config{
		Address:    fmt.Sprintf("%s:%d", opts.Hostname, port),
		Scheme:     "https",
		Datacenter: opts.DC,
	}
	client, err := api.NewClient(&conf)
	if err != nil {
		return fmt.Errorf("creating consul client: %w", err)
	}
	h.consul.client = client

	// The network details are informational only, so a lookup failure is
	// logged and must not fail Init.
	address := getHostname()
	ip, err := externalIP()
	if err != nil {
		h.Log.Debug("cannot determine external ip", "error", err)
	}
	h.Log.Debug("network information", "ip", ip, "address", address)

	h.consul.registration = &api.AgentServiceRegistration{
		ID:      opts.ServiceID, // ServiceID
		Name:    opts.NodeName,  // Server Name
		Address: opts.LocalHost,
		Port:    opts.LocalPort,
		Tags:    opts.Tags,
		Checks: []*api.AgentServiceCheck{
			// A TTL check (currently disabled) would let Consul remove the
			// service when the process dies without deregistering itself:
			// the check must be refreshed before its timeout, otherwise it
			// turns critical, and after DeregisterCriticalServiceAfter the
			// Consul reaper removes the service.
			//
			// For more info, read https://www.consul.io/api/agent/check.html#deregistercriticalserviceafter.
			// {
			// 	CheckID:                        computeServiceTTLCheckId(serviceID),
			// 	TTL:                            "10s",
			// 	DeregisterCriticalServiceAfter: "10s",
			// },

			// The HTTP check confirms that the service's /healthcheck
			// endpoint is reachable from the Consul agent. If the check
			// fails, Consul considers the service unhealthy.
			// NOTE(review): the timeout is longer than the interval — confirm
			// this is intended.
			{
				HTTP:     fmt.Sprintf("http://%s:%d/healthcheck", opts.LocalHost, opts.LocalPort),
				Interval: "10s",
				Timeout:  "30s",
			},
		},
	}
	return nil
}
// OnStarted registers the service with the Consul agent, writes the member key
// services/<NodeName>/members/<ServiceID> to the KV store and then acquires a
// Consul lock on that key.
//
// The hook method has no error result, so every failure is logged. A failed
// registration or KV write does not stop the remaining steps; a failure to
// create the lock does, because there is nothing to acquire.
func (h *Hook) OnStarted() {
	if err := h.consul.client.Agent().ServiceRegister(h.consul.registration); err != nil {
		h.Log.Error("failed to register service at consul", "error", err)
	}

	path := "services/" + h.config.NodeName + "/members/" + h.config.ServiceID
	pair := &api.KVPair{
		Key:   path,
		Value: []byte("1000"),
		// Mark the key as a lock key so the lock below does not conflict
		// with the pre-existing entry.
		Flags: api.LockFlagValue,
	}
	if _, err := h.consul.client.KV().Put(pair, nil); err != nil {
		h.Log.Error("cannot put the consul kv", "error", err)
	}

	lock, err := h.consul.client.LockKey(path)
	if err != nil {
		h.Log.Error("cannot lock the consul kv", "error", err)
		// Without a lock object there is nothing to acquire; calling Lock
		// on a nil *api.Lock would panic.
		return
	}
	h.consul.lock = lock

	// A nil stop channel means the acquisition attempt cannot be aborted.
	if _, err := lock.Lock(nil); err != nil {
		h.Log.Error("cannot acquire the consul lock", "error", err)
	}
}
// OnStopped undoes the work of OnStarted: it deregisters the service from the
// Consul agent, releases the lock on the member key and deletes the key
// services/<NodeName>/members/<ServiceID> from the KV store.
//
// The hook method has no error result, so every failure is logged and the
// remaining cleanup steps still run.
func (h *Hook) OnStopped() {
	if err := h.consul.client.Agent().ServiceDeregister(h.consul.registration.ID); err != nil {
		h.Log.Error("failed to deregister service at consul", "error", err)
	}

	// The lock is only set once OnStarted has created it; unlocking a nil
	// *api.Lock would panic.
	if h.consul.lock != nil {
		if err := h.consul.lock.Unlock(); err != nil {
			h.Log.Error("cannot unlock the consul kv", "error", err)
		}
	}

	path := "services/" + h.config.NodeName + "/members/" + h.config.ServiceID
	if _, err := h.consul.client.KV().Delete(path, nil); err != nil {
		h.Log.Error("cannot delete the consul kv", "error", err)
	}
}
// getHostname returns the host name reported by the operating system.
func getHostname() (hostname string) {
	// The error is deliberately dropped: callers only get whatever name
	// os.Hostname produced.
	name, _ := os.Hostname()
	return name
}
// externalIP returns the first IPv4 address found on a network interface that
// is up and is not a loopback interface. It returns an error when the
// interfaces or their addresses cannot be listed, or when no such address
// exists.
func externalIP() (string, error) {
	interfaces, err := net.Interfaces()
	if err != nil {
		return "", err
	}

	for _, nic := range interfaces {
		isDown := nic.Flags&net.FlagUp == 0
		isLoopback := nic.Flags&net.FlagLoopback != 0
		if isDown || isLoopback {
			continue
		}

		addresses, err := nic.Addrs()
		if err != nil {
			return "", err
		}

		for _, address := range addresses {
			var candidate net.IP
			if ipNet, ok := address.(*net.IPNet); ok {
				candidate = ipNet.IP
			} else if ipAddr, ok := address.(*net.IPAddr); ok {
				candidate = ipAddr.IP
			}
			if candidate == nil || candidate.IsLoopback() {
				continue
			}
			// To4 yields nil for anything that is not an IPv4 address.
			if v4 := candidate.To4(); v4 != nil {
				return v4.String(), nil
			}
		}
	}

	return "", errors.New("are you connected to the network?")
}