419 lines
12 KiB
Go
419 lines
12 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"html"
|
|
"os"
|
|
"os/signal"
|
|
"regexp"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.savin.nyc/alex/go-npfd-printer/parser"
|
|
"git.savin.nyc/alex/go-npfd-printer/printer"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/jackc/pgx/v4"
|
|
logrus "github.com/sirupsen/logrus"
|
|
"github.com/spf13/viper"
|
|
"google.golang.org/api/gmail/v1"
|
|
)
|
|
|
|
var log = logrus.New()
|
|
var config Config
|
|
var conn *pgx.Conn
|
|
var prntr *printer.Escpos
|
|
var readerP *bufio.Reader
|
|
var writerP *bufio.Writer
|
|
var readWriter *bufio.ReadWriter
|
|
var ctx context.Context
|
|
|
|
// Config .
|
|
type Config struct {
|
|
MQTT struct {
|
|
Host string `json:"host"`
|
|
Port int `json:"port"`
|
|
Username string `json:"username"`
|
|
Password string `json:"password"`
|
|
ClientId string `json:"clientid"`
|
|
Retained bool `json:"retained"`
|
|
Topic string `json:"topic"`
|
|
} `json:"mqtt"`
|
|
Postgres struct {
|
|
Host string `json:"host"`
|
|
Port int `json:"port"`
|
|
Username string `json:"username"`
|
|
Password string `json:"password"`
|
|
Database string `json:"database"`
|
|
} `json:"postgres"`
|
|
Update time.Duration `json:"update"`
|
|
Printer string `json:"printer"`
|
|
Log struct {
|
|
Level string `json:"level"`
|
|
} `json:"log"`
|
|
}
|
|
|
|
// gMSG .
|
|
type gMSG struct {
|
|
gmailID string
|
|
date string // retrieved from message header
|
|
snippet string
|
|
}
|
|
|
|
// mqttConnect .
|
|
func mqttConnect(wg *sync.WaitGroup) mqtt.Client {
|
|
opts := mqtt.NewClientOptions()
|
|
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", config.MQTT.Host, config.MQTT.Port))
|
|
opts.SetUsername(config.MQTT.Username)
|
|
opts.SetPassword(config.MQTT.Password)
|
|
opts.SetClientID(config.MQTT.ClientId)
|
|
// opts.OnConnect = func(m mqtt.Client) {
|
|
// log.Printf("MQTT Client is connected to a MQTT Broker\n")
|
|
// starter(wg, m)
|
|
// }
|
|
opts.OnConnect = func(m mqtt.Client) {
|
|
log.Printf("MQTT Client is connected to a MQTT Broker\n")
|
|
starter(wg, m)
|
|
if token := m.Subscribe(config.MQTT.Topic, byte(0), onMessageReceived); token.Wait() && token.Error() != nil {
|
|
panic(token.Error())
|
|
}
|
|
}
|
|
client := mqtt.NewClient(opts)
|
|
token := client.Connect()
|
|
for !token.WaitTimeout(3 * time.Second) {
|
|
}
|
|
if err := token.Error(); err != nil {
|
|
log.Fatalf("Couldn't connect to a MQTT Broker (%s)\n", err.Error())
|
|
os.Exit(1)
|
|
}
|
|
|
|
return client
|
|
}
|
|
|
|
// onMessageReceived .
|
|
func onMessageReceived(m mqtt.Client, message mqtt.Message) {
|
|
log.Printf("TOPIC: %s\n", message.Topic())
|
|
log.Printf("MSG: %s\n", message.Payload())
|
|
re := regexp.MustCompile(`^\{\"time\"\:(?P<time>[0-9]*)(?:\,\"value\"\:)?(?P<value>[0-5]+)?\}$`)
|
|
if re.Match([]byte(message.Payload())) {
|
|
|
|
groups := re.SubexpNames()
|
|
result := re.FindAllStringSubmatch(string(message.Payload()), -1)
|
|
|
|
rt := map[string]string{}
|
|
for i, n := range result[0] {
|
|
rt[groups[i]] = n
|
|
}
|
|
log.Printf("PARESED MESSAGE: %+v\n", rt)
|
|
if rt["value"] == "0" {
|
|
log.Printf("LET'S PRINT IT\n")
|
|
toPrinter()
|
|
}
|
|
}
|
|
}
|
|
|
|
// starter .
|
|
func starter(wg *sync.WaitGroup, m mqtt.Client) {
|
|
|
|
log.Println("Executing a starter func connection...")
|
|
|
|
wg.Add(1)
|
|
go scanner(wg, m)
|
|
}
|
|
|
|
// reverse .
|
|
func reverse(msgs []gMSG) []gMSG {
|
|
newMsgs := make([]gMSG, 0, len(msgs))
|
|
for i := len(msgs) - 1; i >= 0; i-- {
|
|
newMsgs = append(newMsgs, msgs[i])
|
|
}
|
|
|
|
return newMsgs
|
|
}
|
|
|
|
// scanner .
|
|
func scanner(wg *sync.WaitGroup, m mqtt.Client) {
|
|
defer wg.Done()
|
|
|
|
srv := parser.Client()
|
|
|
|
ticker := time.NewTicker(180 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
log.Println("Setup GMail message scanner...")
|
|
for {
|
|
select {
|
|
|
|
case <-ticker.C:
|
|
log.Print("Updating messages...\n")
|
|
msgs := []gMSG{}
|
|
pageToken := ""
|
|
for {
|
|
req := srv.Users.Messages.List("me").LabelIds("UNREAD").Q("is:unread").Q("from:messaging@iamresponding.com")
|
|
// req := srv.Users.Messages.List("me").Q("from:messaging@iamresponding.com")
|
|
if pageToken != "" {
|
|
req.PageToken(pageToken)
|
|
}
|
|
r, err := req.Do()
|
|
if err != nil {
|
|
log.Fatalf("Unable to retrieve messages: %v", err)
|
|
}
|
|
|
|
log.Printf("Processing %v messages...\n", len(r.Messages))
|
|
for _, m := range r.Messages {
|
|
msg, err := srv.Users.Messages.Get("me", m.Id).Format("full").Do()
|
|
if err != nil {
|
|
log.Fatalf("Unable to retrieve message %v: %v", m.Id, err)
|
|
}
|
|
date := ""
|
|
for _, h := range msg.Payload.Headers {
|
|
if h.Name == "Date" {
|
|
date = h.Value
|
|
}
|
|
// break
|
|
}
|
|
|
|
msgs = append(msgs, gMSG{
|
|
gmailID: msg.Id,
|
|
date: date,
|
|
snippet: html.UnescapeString(msg.Snippet),
|
|
})
|
|
}
|
|
|
|
if r.NextPageToken == "" {
|
|
break
|
|
}
|
|
pageToken = r.NextPageToken
|
|
}
|
|
|
|
msgs = reverse(msgs)
|
|
|
|
count, deleted := 0, 0
|
|
for _, m := range msgs {
|
|
count++
|
|
|
|
re := regexp.MustCompile(`^(?P<address>[^\s\[\[].*)\s\[\[(?P<city>[^\]\]].*)\]\]\s\((?P<type>[^\)].*)\)\s\-\s(?P<description>.*?)(?P<phone>\(\d{3}\)\s\d{3}\-\d{4})?\s?(?:F\d{9})?\s?(?:\d{7})?\s?(?P<time>\d{2}\:\d{2})?$`)
|
|
if re.Match([]byte(m.snippet)) {
|
|
|
|
groups := re.SubexpNames()
|
|
result := re.FindAllStringSubmatch(m.snippet, -1)
|
|
|
|
rt := map[string]string{}
|
|
for i, n := range result[0] {
|
|
rt[groups[i]] = n
|
|
}
|
|
// fmt.Printf("%v >> %+v\n", m.gmailID, rt)
|
|
|
|
_, err := srv.Users.Messages.Modify("me", m.gmailID, &gmail.ModifyMessageRequest{RemoveLabelIds: []string{"UNREAD"}}).Do()
|
|
if err != nil {
|
|
log.Fatalf("unable to modify message %v: %v", m.gmailID, err)
|
|
}
|
|
// log.Printf("Modified message %v.\n", msg)
|
|
|
|
var mutual_aid bool
|
|
re := regexp.MustCompile(`^([\*]+).*$`)
|
|
if re.Match([]byte(rt["address"])) {
|
|
mutual_aid = true
|
|
} else {
|
|
mutual_aid = false
|
|
}
|
|
saveCallData(m.gmailID, rt["address"], rt["city"], rt["type"], rt["description"], rt["phone"], m.snippet, m.date, mutual_aid)
|
|
} else {
|
|
log.Printf("Couldn't parse a Gmail Message... (%s)\n", m.gmailID)
|
|
_, err := srv.Users.Messages.Modify("me", m.gmailID, &gmail.ModifyMessageRequest{RemoveLabelIds: []string{"UNREAD"}, AddLabelIds: []string{"Label_2438996450476414779"}}).Do()
|
|
if err != nil {
|
|
log.Fatalf("unable to modify message %v: %v", m.gmailID, err)
|
|
}
|
|
}
|
|
}
|
|
log.Printf("Done. %v messages processed, %v deleted\n", count, deleted)
|
|
|
|
case <-ctx.Done():
|
|
log.Print("Caller has told us to stop\n")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func saveCallData(gid, iaddress, icity, itype, idescription, iphone, snippet, itime string, imutual_iad bool) error {
|
|
_, err := conn.Exec(context.Background(), "INSERT INTO incidents(gmessage_id, address, city, type, description, phone, mutual_aid, snippet, time, created_on) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", gid, iaddress, icity, itype, idescription, iphone, imutual_iad, snippet, itime, time.Now())
|
|
return err
|
|
}
|
|
|
|
func toPrinter() {
|
|
|
|
fmt.Print("I'm in the Printer\n")
|
|
var address, city, itype, description, phone, itime string
|
|
// var mutual_aid bool
|
|
// var created_on time.Time
|
|
var err error
|
|
// err = conn.QueryRow(context.Background(), "SELECT address, city, type, description, phone, time FROM incidents WHERE inc_id=$1", 566).Scan(&address, &city, &itype, &description, &phone, &itime)
|
|
err = conn.QueryRow(context.Background(), "SELECT address, city, type, description, phone, time FROM incidents ORDER BY created_on DESC LIMIT 1").Scan(&address, &city, &itype, &description, &phone, &itime)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
prntr.Init()
|
|
prntr.SetSmooth(1)
|
|
prntr.SetFontSize(2, 3)
|
|
prntr.SetFont("A")
|
|
prntr.Formfeed()
|
|
prntr.Write(" NEW PROVIDENCE FIRE DEPT")
|
|
prntr.Formfeed()
|
|
prntr.SetFont("B")
|
|
prntr.Write(itime)
|
|
prntr.Formfeed()
|
|
if len(address) > 0 {
|
|
prntr.Write("--- ADDRESS ------------------------------")
|
|
prntr.SetFont("A")
|
|
prntr.Write(string(address))
|
|
prntr.Formfeed()
|
|
} else {
|
|
fmt.Print("There is no address value\n")
|
|
}
|
|
if len(city) > 0 {
|
|
prntr.SetFont("B")
|
|
prntr.Write("--- CITY ---------------------------------")
|
|
prntr.SetFont("A")
|
|
prntr.Write(string(city))
|
|
prntr.Formfeed()
|
|
} else {
|
|
fmt.Print("There is no city value\n")
|
|
}
|
|
if len(itype) > 0 {
|
|
prntr.SetFont("B")
|
|
prntr.Write("--- TYPE ---------------------------------")
|
|
prntr.SetFont("A")
|
|
prntr.Write(string(itype))
|
|
prntr.Formfeed()
|
|
} else {
|
|
fmt.Print("There is no type value\n")
|
|
}
|
|
if len(description) > 0 {
|
|
prntr.SetFont("B")
|
|
prntr.Write("--- DESCRIPTION --------------------------")
|
|
prntr.SetFont("A")
|
|
prntr.Write(string(description))
|
|
prntr.Formfeed()
|
|
} else {
|
|
fmt.Print("There is no description value\n")
|
|
}
|
|
if len(phone) > 0 {
|
|
prntr.SetFont("B")
|
|
prntr.Write("--- PHONE --------------------------------")
|
|
prntr.SetFont("A")
|
|
prntr.Write(string(phone))
|
|
prntr.Formfeed()
|
|
} else {
|
|
fmt.Print("There is no phone value\n")
|
|
}
|
|
// if value, ok := rt["time"]; ok {
|
|
// prntr.Write("Time: " + value)
|
|
// prntr.Formfeed()
|
|
// } else {
|
|
// fmt.Print("There is no time value\n")
|
|
// }
|
|
prntr.FormfeedN(3)
|
|
// prntr.Cut()
|
|
prntr.End()
|
|
writerP.Flush()
|
|
readWriter.Flush()
|
|
log.Print("Printed message.\n")
|
|
}
|
|
|
|
func main() {
|
|
viper.SetConfigName("config") // name of config file (without extension)
|
|
viper.SetConfigType("yml") // REQUIRED if the config file does not have the extension in the name
|
|
// viper.AddConfigPath("/data/") //
|
|
viper.AddConfigPath(".") // optionally look for config in the working directory
|
|
|
|
err := viper.ReadInConfig() // Find and read the config file
|
|
if err != nil { // Handle errors reading the config file
|
|
log.Fatalf("Fatal error config file: %s \n", err.Error())
|
|
os.Exit(1)
|
|
}
|
|
|
|
// viper.WatchConfig()
|
|
// viper.OnConfigChange(func(e fsnotify.Event) {
|
|
// log.Println("Config file changed:", e.Name)
|
|
// })
|
|
|
|
if err := viper.Unmarshal(&config); err != nil {
|
|
log.Fatalf("Fatal error config file: %s \n", err.Error())
|
|
os.Exit(1)
|
|
}
|
|
|
|
logLevel, err := logrus.ParseLevel(viper.GetString("log.level"))
|
|
if err != nil {
|
|
logLevel = logrus.DebugLevel
|
|
}
|
|
|
|
switch viper.GetString("log.level") {
|
|
case "debug":
|
|
log.SetLevel(logrus.DebugLevel)
|
|
case "info":
|
|
log.SetLevel(logrus.InfoLevel)
|
|
case "warn":
|
|
log.SetLevel(logrus.WarnLevel)
|
|
case "error":
|
|
log.SetLevel(logrus.ErrorLevel)
|
|
default:
|
|
log.SetLevel(logrus.DebugLevel)
|
|
log.Warnf("Invalid log level supplied: '%s'", logLevel)
|
|
}
|
|
|
|
customFormatter := new(logrus.TextFormatter)
|
|
customFormatter.TimestampFormat = "2006-01-02 15:04:05"
|
|
customFormatter.FullTimestamp = true
|
|
log.SetFormatter(customFormatter)
|
|
log.Printf("CONFIG: %+v", config)
|
|
|
|
// create a context that we can cancel
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
conn, err = pgx.Connect(context.Background(), fmt.Sprintf("postgres://%s:%s@%s:%d/%s", config.Postgres.Username, config.Postgres.Password, config.Postgres.Host, config.Postgres.Port, config.Postgres.Database))
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
log.Println("Setup PostgreSQL connection...")
|
|
defer conn.Close(context.Background())
|
|
|
|
/// PRINTER START
|
|
printerFile, err := os.OpenFile(config.Printer, os.O_RDWR, 0)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
defer printerFile.Close()
|
|
|
|
readerP = bufio.NewReader(printerFile)
|
|
writerP = bufio.NewWriter(printerFile)
|
|
readWriter = bufio.NewReadWriter(readerP, writerP)
|
|
prntr = printer.New(readWriter)
|
|
/// PRINTER END
|
|
|
|
// a WaitGroup for the goroutines to tell us they've stopped
|
|
wg := sync.WaitGroup{}
|
|
|
|
mqttClient := mqttConnect(&wg)
|
|
defer mqttClient.Disconnect(250)
|
|
|
|
// listen for C-c
|
|
c := make(chan os.Signal, 1)
|
|
signal.Notify(c, os.Interrupt)
|
|
<-c
|
|
log.Println("Main: received CTRL-c - shutting down")
|
|
|
|
// tell the goroutines to stop
|
|
log.Println("Main: telling goroutines to stop")
|
|
cancel()
|
|
|
|
// and wait for them both to reply back
|
|
wg.Wait()
|
|
log.Println("Main: all goroutines have told us they've finished")
|
|
}
|