Files
go-npfd-printer/main.go
2025-02-03 16:58:00 -05:00

419 lines
12 KiB
Go

package main
import (
"bufio"
"context"
"fmt"
"html"
"os"
"os/signal"
"regexp"
"sync"
"time"
"git.savin.nyc/alex/go-npfd-printer/parser"
"git.savin.nyc/alex/go-npfd-printer/printer"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/jackc/pgx/v4"
logrus "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"google.golang.org/api/gmail/v1"
)
var log = logrus.New()
var config Config
var conn *pgx.Conn
var prntr *printer.Escpos
var readerP *bufio.Reader
var writerP *bufio.Writer
var readWriter *bufio.ReadWriter
var ctx context.Context
// Config .
type Config struct {
MQTT struct {
Host string `json:"host"`
Port int `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
ClientId string `json:"clientid"`
Retained bool `json:"retained"`
Topic string `json:"topic"`
} `json:"mqtt"`
Postgres struct {
Host string `json:"host"`
Port int `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Database string `json:"database"`
} `json:"postgres"`
Update time.Duration `json:"update"`
Printer string `json:"printer"`
Log struct {
Level string `json:"level"`
} `json:"log"`
}
// gMSG .
type gMSG struct {
gmailID string
date string // retrieved from message header
snippet string
}
// mqttConnect .
func mqttConnect(wg *sync.WaitGroup) mqtt.Client {
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", config.MQTT.Host, config.MQTT.Port))
opts.SetUsername(config.MQTT.Username)
opts.SetPassword(config.MQTT.Password)
opts.SetClientID(config.MQTT.ClientId)
// opts.OnConnect = func(m mqtt.Client) {
// log.Printf("MQTT Client is connected to a MQTT Broker\n")
// starter(wg, m)
// }
opts.OnConnect = func(m mqtt.Client) {
log.Printf("MQTT Client is connected to a MQTT Broker\n")
starter(wg, m)
if token := m.Subscribe(config.MQTT.Topic, byte(0), onMessageReceived); token.Wait() && token.Error() != nil {
panic(token.Error())
}
}
client := mqtt.NewClient(opts)
token := client.Connect()
for !token.WaitTimeout(3 * time.Second) {
}
if err := token.Error(); err != nil {
log.Fatalf("Couldn't connect to a MQTT Broker (%s)\n", err.Error())
os.Exit(1)
}
return client
}
// onMessageReceived .
func onMessageReceived(m mqtt.Client, message mqtt.Message) {
log.Printf("TOPIC: %s\n", message.Topic())
log.Printf("MSG: %s\n", message.Payload())
re := regexp.MustCompile(`^\{\"time\"\:(?P<time>[0-9]*)(?:\,\"value\"\:)?(?P<value>[0-5]+)?\}$`)
if re.Match([]byte(message.Payload())) {
groups := re.SubexpNames()
result := re.FindAllStringSubmatch(string(message.Payload()), -1)
rt := map[string]string{}
for i, n := range result[0] {
rt[groups[i]] = n
}
log.Printf("PARESED MESSAGE: %+v\n", rt)
if rt["value"] == "0" {
log.Printf("LET'S PRINT IT\n")
toPrinter()
}
}
}
// starter .
func starter(wg *sync.WaitGroup, m mqtt.Client) {
log.Println("Executing a starter func connection...")
wg.Add(1)
go scanner(wg, m)
}
// reverse .
func reverse(msgs []gMSG) []gMSG {
newMsgs := make([]gMSG, 0, len(msgs))
for i := len(msgs) - 1; i >= 0; i-- {
newMsgs = append(newMsgs, msgs[i])
}
return newMsgs
}
// scanner .
func scanner(wg *sync.WaitGroup, m mqtt.Client) {
defer wg.Done()
srv := parser.Client()
ticker := time.NewTicker(180 * time.Second)
defer ticker.Stop()
log.Println("Setup GMail message scanner...")
for {
select {
case <-ticker.C:
log.Print("Updating messages...\n")
msgs := []gMSG{}
pageToken := ""
for {
req := srv.Users.Messages.List("me").LabelIds("UNREAD").Q("is:unread").Q("from:messaging@iamresponding.com")
// req := srv.Users.Messages.List("me").Q("from:messaging@iamresponding.com")
if pageToken != "" {
req.PageToken(pageToken)
}
r, err := req.Do()
if err != nil {
log.Fatalf("Unable to retrieve messages: %v", err)
}
log.Printf("Processing %v messages...\n", len(r.Messages))
for _, m := range r.Messages {
msg, err := srv.Users.Messages.Get("me", m.Id).Format("full").Do()
if err != nil {
log.Fatalf("Unable to retrieve message %v: %v", m.Id, err)
}
date := ""
for _, h := range msg.Payload.Headers {
if h.Name == "Date" {
date = h.Value
}
// break
}
msgs = append(msgs, gMSG{
gmailID: msg.Id,
date: date,
snippet: html.UnescapeString(msg.Snippet),
})
}
if r.NextPageToken == "" {
break
}
pageToken = r.NextPageToken
}
msgs = reverse(msgs)
count, deleted := 0, 0
for _, m := range msgs {
count++
re := regexp.MustCompile(`^(?P<address>[^\s\[\[].*)\s\[\[(?P<city>[^\]\]].*)\]\]\s\((?P<type>[^\)].*)\)\s\-\s(?P<description>.*?)(?P<phone>\(\d{3}\)\s\d{3}\-\d{4})?\s?(?:F\d{9})?\s?(?:\d{7})?\s?(?P<time>\d{2}\:\d{2})?$`)
if re.Match([]byte(m.snippet)) {
groups := re.SubexpNames()
result := re.FindAllStringSubmatch(m.snippet, -1)
rt := map[string]string{}
for i, n := range result[0] {
rt[groups[i]] = n
}
// fmt.Printf("%v >> %+v\n", m.gmailID, rt)
_, err := srv.Users.Messages.Modify("me", m.gmailID, &gmail.ModifyMessageRequest{RemoveLabelIds: []string{"UNREAD"}}).Do()
if err != nil {
log.Fatalf("unable to modify message %v: %v", m.gmailID, err)
}
// log.Printf("Modified message %v.\n", msg)
var mutual_aid bool
re := regexp.MustCompile(`^([\*]+).*$`)
if re.Match([]byte(rt["address"])) {
mutual_aid = true
} else {
mutual_aid = false
}
saveCallData(m.gmailID, rt["address"], rt["city"], rt["type"], rt["description"], rt["phone"], m.snippet, m.date, mutual_aid)
} else {
log.Printf("Couldn't parse a Gmail Message... (%s)\n", m.gmailID)
_, err := srv.Users.Messages.Modify("me", m.gmailID, &gmail.ModifyMessageRequest{RemoveLabelIds: []string{"UNREAD"}, AddLabelIds: []string{"Label_2438996450476414779"}}).Do()
if err != nil {
log.Fatalf("unable to modify message %v: %v", m.gmailID, err)
}
}
}
log.Printf("Done. %v messages processed, %v deleted\n", count, deleted)
case <-ctx.Done():
log.Print("Caller has told us to stop\n")
return
}
}
}
func saveCallData(gid, iaddress, icity, itype, idescription, iphone, snippet, itime string, imutual_iad bool) error {
_, err := conn.Exec(context.Background(), "INSERT INTO incidents(gmessage_id, address, city, type, description, phone, mutual_aid, snippet, time, created_on) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", gid, iaddress, icity, itype, idescription, iphone, imutual_iad, snippet, itime, time.Now())
return err
}
func toPrinter() {
fmt.Print("I'm in the Printer\n")
var address, city, itype, description, phone, itime string
// var mutual_aid bool
// var created_on time.Time
var err error
// err = conn.QueryRow(context.Background(), "SELECT address, city, type, description, phone, time FROM incidents WHERE inc_id=$1", 566).Scan(&address, &city, &itype, &description, &phone, &itime)
err = conn.QueryRow(context.Background(), "SELECT address, city, type, description, phone, time FROM incidents ORDER BY created_on DESC LIMIT 1").Scan(&address, &city, &itype, &description, &phone, &itime)
if err != nil {
fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
os.Exit(1)
}
prntr.Init()
prntr.SetSmooth(1)
prntr.SetFontSize(2, 3)
prntr.SetFont("A")
prntr.Formfeed()
prntr.Write(" NEW PROVIDENCE FIRE DEPT")
prntr.Formfeed()
prntr.SetFont("B")
prntr.Write(itime)
prntr.Formfeed()
if len(address) > 0 {
prntr.Write("--- ADDRESS ------------------------------")
prntr.SetFont("A")
prntr.Write(string(address))
prntr.Formfeed()
} else {
fmt.Print("There is no address value\n")
}
if len(city) > 0 {
prntr.SetFont("B")
prntr.Write("--- CITY ---------------------------------")
prntr.SetFont("A")
prntr.Write(string(city))
prntr.Formfeed()
} else {
fmt.Print("There is no city value\n")
}
if len(itype) > 0 {
prntr.SetFont("B")
prntr.Write("--- TYPE ---------------------------------")
prntr.SetFont("A")
prntr.Write(string(itype))
prntr.Formfeed()
} else {
fmt.Print("There is no type value\n")
}
if len(description) > 0 {
prntr.SetFont("B")
prntr.Write("--- DESCRIPTION --------------------------")
prntr.SetFont("A")
prntr.Write(string(description))
prntr.Formfeed()
} else {
fmt.Print("There is no description value\n")
}
if len(phone) > 0 {
prntr.SetFont("B")
prntr.Write("--- PHONE --------------------------------")
prntr.SetFont("A")
prntr.Write(string(phone))
prntr.Formfeed()
} else {
fmt.Print("There is no phone value\n")
}
// if value, ok := rt["time"]; ok {
// prntr.Write("Time: " + value)
// prntr.Formfeed()
// } else {
// fmt.Print("There is no time value\n")
// }
prntr.FormfeedN(3)
// prntr.Cut()
prntr.End()
writerP.Flush()
readWriter.Flush()
log.Print("Printed message.\n")
}
func main() {
viper.SetConfigName("config") // name of config file (without extension)
viper.SetConfigType("yml") // REQUIRED if the config file does not have the extension in the name
// viper.AddConfigPath("/data/") //
viper.AddConfigPath(".") // optionally look for config in the working directory
err := viper.ReadInConfig() // Find and read the config file
if err != nil { // Handle errors reading the config file
log.Fatalf("Fatal error config file: %s \n", err.Error())
os.Exit(1)
}
// viper.WatchConfig()
// viper.OnConfigChange(func(e fsnotify.Event) {
// log.Println("Config file changed:", e.Name)
// })
if err := viper.Unmarshal(&config); err != nil {
log.Fatalf("Fatal error config file: %s \n", err.Error())
os.Exit(1)
}
logLevel, err := logrus.ParseLevel(viper.GetString("log.level"))
if err != nil {
logLevel = logrus.DebugLevel
}
switch viper.GetString("log.level") {
case "debug":
log.SetLevel(logrus.DebugLevel)
case "info":
log.SetLevel(logrus.InfoLevel)
case "warn":
log.SetLevel(logrus.WarnLevel)
case "error":
log.SetLevel(logrus.ErrorLevel)
default:
log.SetLevel(logrus.DebugLevel)
log.Warnf("Invalid log level supplied: '%s'", logLevel)
}
customFormatter := new(logrus.TextFormatter)
customFormatter.TimestampFormat = "2006-01-02 15:04:05"
customFormatter.FullTimestamp = true
log.SetFormatter(customFormatter)
log.Printf("CONFIG: %+v", config)
// create a context that we can cancel
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
conn, err = pgx.Connect(context.Background(), fmt.Sprintf("postgres://%s:%s@%s:%d/%s", config.Postgres.Username, config.Postgres.Password, config.Postgres.Host, config.Postgres.Port, config.Postgres.Database))
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
log.Println("Setup PostgreSQL connection...")
defer conn.Close(context.Background())
/// PRINTER START
printerFile, err := os.OpenFile(config.Printer, os.O_RDWR, 0)
if err != nil {
panic(err)
}
defer printerFile.Close()
readerP = bufio.NewReader(printerFile)
writerP = bufio.NewWriter(printerFile)
readWriter = bufio.NewReadWriter(readerP, writerP)
prntr = printer.New(readWriter)
/// PRINTER END
// a WaitGroup for the goroutines to tell us they've stopped
wg := sync.WaitGroup{}
mqttClient := mqttConnect(&wg)
defer mqttClient.Disconnect(250)
// listen for C-c
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
log.Println("Main: received CTRL-c - shutting down")
// tell the goroutines to stop
log.Println("Main: telling goroutines to stop")
cancel()
// and wait for them both to reply back
wg.Wait()
log.Println("Main: all goroutines have told us they've finished")
}