// SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2023 mysubarumq // SPDX-FileContributor: alex-savin package consul import ( "bytes" "errors" "fmt" "net" "os" app "git.savin.nyc/alex/mysubaru-mq/app" "github.com/hashicorp/consul/api" ) // Options contains the configuration/rules data for the auth ledger. type Options struct { Hostname string Port int // Default port 8500 DC string ServiceID string NodeName string Tags []string LocalHost string LocalPort int } // AllowHook is an authentication hook which allows connection access // for all users and read and write access to all topics. type Hook struct { app.HookBase config *Options consul *consul } type consul struct { client *api.Client registration *api.AgentServiceRegistration lock *api.Lock } // ID returns the ID of the hook. func (h *Hook) ID() string { return "consul" } // Provides indicates which hook methods this hook provides. func (h *Hook) Provides(b byte) bool { return bytes.Contains([]byte{ app.OnStarted, app.OnStopped, }, []byte{b}) } func (h *Hook) Init(config any) error { if _, ok := config.(*Options); !ok && config != nil { return app.ErrInvalidConfigType } if config == nil { config = new(Options) } h.config = config.(*Options) h.consul = &consul{} conf := api.Config{ Address: fmt.Sprintf("%s:%d", h.config.Hostname, h.config.Port), Scheme: "https", Datacenter: h.config.DC, } client, err := api.NewClient(&conf) if err != nil { return err } h.consul.client = client address := getHostname() ip, _ := externalIP() h.Log.Debug("network information", "ip", ip, "address", address) service := &api.AgentServiceRegistration{ ID: h.config.ServiceID, // ServiceID Name: h.config.NodeName, // Server Name Address: h.config.LocalHost, Port: h.config.LocalPort, Tags: h.config.Tags, Checks: []*api.AgentServiceCheck{ // If fabio doesn't exit cleanly, it doesn't auto-deregister itself // from Consul. In order to address this, we introduce a TTL check // to confirm that the fabio instance is alive and able to route // this service. // // The TTL check must be refreshed before its timeout is crossed, // otherwise the check fails. If the check fails, Consul considers // this service to have become unhealthy. If the check is failing // (critical) for the DeregisterCriticalServiceAfter duration, the // Consul reaper will remove it from Consul. // // For more info, read https://www.consul.io/api/agent/check.html#deregistercriticalserviceafter. // { // CheckID: computeServiceTTLCheckId(serviceID), // TTL: "10s", // DeregisterCriticalServiceAfter: "10s", // }, // HTTP check is meant to confirm fabio health endpoint is // reachable from the Consul agent. If the check fails, Consul // considers this service to have become unhealthy. { HTTP: fmt.Sprintf("http://%s:%d/healthcheck", h.config.LocalHost, h.config.LocalPort), Interval: "10s", Timeout: "30s", }, }, } h.consul.registration = service return nil } // OnStarted returns . func (h *Hook) OnStarted() { err := h.consul.client.Agent().ServiceRegister(h.consul.registration) if err != nil { h.Log.Error("failed to register service at consul") } path := "services/" + h.config.NodeName + "/members/" + h.config.ServiceID p := &api.KVPair{ Key: path, Value: []byte("1000"), Flags: api.LockFlagValue, } _, err = h.consul.client.KV().Put(p, nil) if err != nil { h.Log.Error("cannot put the consul kv", "error", err) } h.consul.client.LockOpts(&api.LockOptions{ Key: path, }) h.consul.lock, err = h.consul.client.LockKey(path) if err != nil { h.Log.Error("cannot lock the consul kv", "error", err) } _, err = h.consul.lock.Lock(nil) if err != nil { h.Log.Error("cannot acquire the consul lock", "error", err) } } // OnStopped . func (h *Hook) OnStopped() { err := h.consul.client.Agent().ServiceDeregister(h.consul.registration.ID) if err != nil { h.Log.Error("failed to deregister service at consul") } err = h.consul.lock.Unlock() if err != nil { h.Log.Error("cannot unlock the consul kv", "error", err) } path := "services/" + h.config.NodeName + "/members/" + h.config.ServiceID _, err = h.consul.client.KV().Delete(path, nil) if err != nil { h.Log.Error("cannot delete the consul kv", "error", err) } } func getHostname() (hostname string) { hostname, _ = os.Hostname() return } func externalIP() (string, error) { ifaces, err := net.Interfaces() if err != nil { return "", err } for _, iface := range ifaces { if iface.Flags&net.FlagUp == 0 { continue // interface down } if iface.Flags&net.FlagLoopback != 0 { continue // loopback interface } addrs, err := iface.Addrs() if err != nil { return "", err } for _, addr := range addrs { var ip net.IP switch v := addr.(type) { case *net.IPNet: ip = v.IP case *net.IPAddr: ip = v.IP } if ip == nil || ip.IsLoopback() { continue } ip = ip.To4() if ip == nil { continue // not an ipv4 address } return ip.String(), nil } } return "", errors.New("are you connected to the network?") }