initial commit
This commit is contained in:
418
main.go
Normal file
418
main.go
Normal file
@ -0,0 +1,418 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"html"
|
||||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.savin.nyc/alex/go-npfd-printer/parser"
|
||||
"git.savin.nyc/alex/go-npfd-printer/printer"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/jackc/pgx/v4"
|
||||
logrus "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/viper"
|
||||
"google.golang.org/api/gmail/v1"
|
||||
)
|
||||
|
||||
var log = logrus.New()
|
||||
var config Config
|
||||
var conn *pgx.Conn
|
||||
var prntr *printer.Escpos
|
||||
var readerP *bufio.Reader
|
||||
var writerP *bufio.Writer
|
||||
var readWriter *bufio.ReadWriter
|
||||
var ctx context.Context
|
||||
|
||||
// Config .
|
||||
type Config struct {
|
||||
MQTT struct {
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
ClientId string `json:"clientid"`
|
||||
Retained bool `json:"retained"`
|
||||
Topic string `json:"topic"`
|
||||
} `json:"mqtt"`
|
||||
Postgres struct {
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
Database string `json:"database"`
|
||||
} `json:"postgres"`
|
||||
Update time.Duration `json:"update"`
|
||||
Printer string `json:"printer"`
|
||||
Log struct {
|
||||
Level string `json:"level"`
|
||||
} `json:"log"`
|
||||
}
|
||||
|
||||
// gMSG .
|
||||
type gMSG struct {
|
||||
gmailID string
|
||||
date string // retrieved from message header
|
||||
snippet string
|
||||
}
|
||||
|
||||
// mqttConnect .
|
||||
func mqttConnect(wg *sync.WaitGroup) mqtt.Client {
|
||||
opts := mqtt.NewClientOptions()
|
||||
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", config.MQTT.Host, config.MQTT.Port))
|
||||
opts.SetUsername(config.MQTT.Username)
|
||||
opts.SetPassword(config.MQTT.Password)
|
||||
opts.SetClientID(config.MQTT.ClientId)
|
||||
// opts.OnConnect = func(m mqtt.Client) {
|
||||
// log.Printf("MQTT Client is connected to a MQTT Broker\n")
|
||||
// starter(wg, m)
|
||||
// }
|
||||
opts.OnConnect = func(m mqtt.Client) {
|
||||
log.Printf("MQTT Client is connected to a MQTT Broker\n")
|
||||
starter(wg, m)
|
||||
if token := m.Subscribe(config.MQTT.Topic, byte(0), onMessageReceived); token.Wait() && token.Error() != nil {
|
||||
panic(token.Error())
|
||||
}
|
||||
}
|
||||
client := mqtt.NewClient(opts)
|
||||
token := client.Connect()
|
||||
for !token.WaitTimeout(3 * time.Second) {
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
log.Fatalf("Couldn't connect to a MQTT Broker (%s)\n", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return client
|
||||
}
|
||||
|
||||
// onMessageReceived .
|
||||
func onMessageReceived(m mqtt.Client, message mqtt.Message) {
|
||||
log.Printf("TOPIC: %s\n", message.Topic())
|
||||
log.Printf("MSG: %s\n", message.Payload())
|
||||
re := regexp.MustCompile(`^\{\"time\"\:(?P<time>[0-9]*)(?:\,\"value\"\:)?(?P<value>[0-5]+)?\}$`)
|
||||
if re.Match([]byte(message.Payload())) {
|
||||
|
||||
groups := re.SubexpNames()
|
||||
result := re.FindAllStringSubmatch(string(message.Payload()), -1)
|
||||
|
||||
rt := map[string]string{}
|
||||
for i, n := range result[0] {
|
||||
rt[groups[i]] = n
|
||||
}
|
||||
log.Printf("PARESED MESSAGE: %+v\n", rt)
|
||||
if rt["value"] == "0" {
|
||||
log.Printf("LET'S PRINT IT\n")
|
||||
toPrinter()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// starter .
|
||||
func starter(wg *sync.WaitGroup, m mqtt.Client) {
|
||||
|
||||
log.Println("Executing a starter func connection...")
|
||||
|
||||
wg.Add(1)
|
||||
go scanner(wg, m)
|
||||
}
|
||||
|
||||
// reverse .
|
||||
func reverse(msgs []gMSG) []gMSG {
|
||||
newMsgs := make([]gMSG, 0, len(msgs))
|
||||
for i := len(msgs) - 1; i >= 0; i-- {
|
||||
newMsgs = append(newMsgs, msgs[i])
|
||||
}
|
||||
|
||||
return newMsgs
|
||||
}
|
||||
|
||||
// scanner .
|
||||
func scanner(wg *sync.WaitGroup, m mqtt.Client) {
|
||||
defer wg.Done()
|
||||
|
||||
srv := parser.Client()
|
||||
|
||||
ticker := time.NewTicker(180 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
log.Println("Setup GMail message scanner...")
|
||||
for {
|
||||
select {
|
||||
|
||||
case <-ticker.C:
|
||||
log.Print("Updating messages...\n")
|
||||
msgs := []gMSG{}
|
||||
pageToken := ""
|
||||
for {
|
||||
req := srv.Users.Messages.List("me").LabelIds("UNREAD").Q("is:unread").Q("from:messaging@iamresponding.com")
|
||||
// req := srv.Users.Messages.List("me").Q("from:messaging@iamresponding.com")
|
||||
if pageToken != "" {
|
||||
req.PageToken(pageToken)
|
||||
}
|
||||
r, err := req.Do()
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to retrieve messages: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("Processing %v messages...\n", len(r.Messages))
|
||||
for _, m := range r.Messages {
|
||||
msg, err := srv.Users.Messages.Get("me", m.Id).Format("full").Do()
|
||||
if err != nil {
|
||||
log.Fatalf("Unable to retrieve message %v: %v", m.Id, err)
|
||||
}
|
||||
date := ""
|
||||
for _, h := range msg.Payload.Headers {
|
||||
if h.Name == "Date" {
|
||||
date = h.Value
|
||||
}
|
||||
// break
|
||||
}
|
||||
|
||||
msgs = append(msgs, gMSG{
|
||||
gmailID: msg.Id,
|
||||
date: date,
|
||||
snippet: html.UnescapeString(msg.Snippet),
|
||||
})
|
||||
}
|
||||
|
||||
if r.NextPageToken == "" {
|
||||
break
|
||||
}
|
||||
pageToken = r.NextPageToken
|
||||
}
|
||||
|
||||
msgs = reverse(msgs)
|
||||
|
||||
count, deleted := 0, 0
|
||||
for _, m := range msgs {
|
||||
count++
|
||||
|
||||
re := regexp.MustCompile(`^(?P<address>[^\s\[\[].*)\s\[\[(?P<city>[^\]\]].*)\]\]\s\((?P<type>[^\)].*)\)\s\-\s(?P<description>.*?)(?P<phone>\(\d{3}\)\s\d{3}\-\d{4})?\s?(?:F\d{9})?\s?(?:\d{7})?\s?(?P<time>\d{2}\:\d{2})?$`)
|
||||
if re.Match([]byte(m.snippet)) {
|
||||
|
||||
groups := re.SubexpNames()
|
||||
result := re.FindAllStringSubmatch(m.snippet, -1)
|
||||
|
||||
rt := map[string]string{}
|
||||
for i, n := range result[0] {
|
||||
rt[groups[i]] = n
|
||||
}
|
||||
// fmt.Printf("%v >> %+v\n", m.gmailID, rt)
|
||||
|
||||
_, err := srv.Users.Messages.Modify("me", m.gmailID, &gmail.ModifyMessageRequest{RemoveLabelIds: []string{"UNREAD"}}).Do()
|
||||
if err != nil {
|
||||
log.Fatalf("unable to modify message %v: %v", m.gmailID, err)
|
||||
}
|
||||
// log.Printf("Modified message %v.\n", msg)
|
||||
|
||||
var mutual_aid bool
|
||||
re := regexp.MustCompile(`^([\*]+).*$`)
|
||||
if re.Match([]byte(rt["address"])) {
|
||||
mutual_aid = true
|
||||
} else {
|
||||
mutual_aid = false
|
||||
}
|
||||
saveCallData(m.gmailID, rt["address"], rt["city"], rt["type"], rt["description"], rt["phone"], m.snippet, m.date, mutual_aid)
|
||||
} else {
|
||||
log.Printf("Couldn't parse a Gmail Message... (%s)\n", m.gmailID)
|
||||
_, err := srv.Users.Messages.Modify("me", m.gmailID, &gmail.ModifyMessageRequest{RemoveLabelIds: []string{"UNREAD"}, AddLabelIds: []string{"Label_2438996450476414779"}}).Do()
|
||||
if err != nil {
|
||||
log.Fatalf("unable to modify message %v: %v", m.gmailID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Printf("Done. %v messages processed, %v deleted\n", count, deleted)
|
||||
|
||||
case <-ctx.Done():
|
||||
log.Print("Caller has told us to stop\n")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func saveCallData(gid, iaddress, icity, itype, idescription, iphone, snippet, itime string, imutual_iad bool) error {
|
||||
_, err := conn.Exec(context.Background(), "INSERT INTO incidents(gmessage_id, address, city, type, description, phone, mutual_aid, snippet, time, created_on) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", gid, iaddress, icity, itype, idescription, iphone, imutual_iad, snippet, itime, time.Now())
|
||||
return err
|
||||
}
|
||||
|
||||
func toPrinter() {
|
||||
|
||||
fmt.Print("I'm in the Printer\n")
|
||||
var address, city, itype, description, phone, itime string
|
||||
// var mutual_aid bool
|
||||
// var created_on time.Time
|
||||
var err error
|
||||
// err = conn.QueryRow(context.Background(), "SELECT address, city, type, description, phone, time FROM incidents WHERE inc_id=$1", 566).Scan(&address, &city, &itype, &description, &phone, &itime)
|
||||
err = conn.QueryRow(context.Background(), "SELECT address, city, type, description, phone, time FROM incidents ORDER BY created_on DESC LIMIT 1").Scan(&address, &city, &itype, &description, &phone, &itime)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
prntr.Init()
|
||||
prntr.SetSmooth(1)
|
||||
prntr.SetFontSize(2, 3)
|
||||
prntr.SetFont("A")
|
||||
prntr.Formfeed()
|
||||
prntr.Write(" NEW PROVIDENCE FIRE DEPT")
|
||||
prntr.Formfeed()
|
||||
prntr.SetFont("B")
|
||||
prntr.Write(itime)
|
||||
prntr.Formfeed()
|
||||
if len(address) > 0 {
|
||||
prntr.Write("--- ADDRESS ------------------------------")
|
||||
prntr.SetFont("A")
|
||||
prntr.Write(string(address))
|
||||
prntr.Formfeed()
|
||||
} else {
|
||||
fmt.Print("There is no address value\n")
|
||||
}
|
||||
if len(city) > 0 {
|
||||
prntr.SetFont("B")
|
||||
prntr.Write("--- CITY ---------------------------------")
|
||||
prntr.SetFont("A")
|
||||
prntr.Write(string(city))
|
||||
prntr.Formfeed()
|
||||
} else {
|
||||
fmt.Print("There is no city value\n")
|
||||
}
|
||||
if len(itype) > 0 {
|
||||
prntr.SetFont("B")
|
||||
prntr.Write("--- TYPE ---------------------------------")
|
||||
prntr.SetFont("A")
|
||||
prntr.Write(string(itype))
|
||||
prntr.Formfeed()
|
||||
} else {
|
||||
fmt.Print("There is no type value\n")
|
||||
}
|
||||
if len(description) > 0 {
|
||||
prntr.SetFont("B")
|
||||
prntr.Write("--- DESCRIPTION --------------------------")
|
||||
prntr.SetFont("A")
|
||||
prntr.Write(string(description))
|
||||
prntr.Formfeed()
|
||||
} else {
|
||||
fmt.Print("There is no description value\n")
|
||||
}
|
||||
if len(phone) > 0 {
|
||||
prntr.SetFont("B")
|
||||
prntr.Write("--- PHONE --------------------------------")
|
||||
prntr.SetFont("A")
|
||||
prntr.Write(string(phone))
|
||||
prntr.Formfeed()
|
||||
} else {
|
||||
fmt.Print("There is no phone value\n")
|
||||
}
|
||||
// if value, ok := rt["time"]; ok {
|
||||
// prntr.Write("Time: " + value)
|
||||
// prntr.Formfeed()
|
||||
// } else {
|
||||
// fmt.Print("There is no time value\n")
|
||||
// }
|
||||
prntr.FormfeedN(3)
|
||||
// prntr.Cut()
|
||||
prntr.End()
|
||||
writerP.Flush()
|
||||
readWriter.Flush()
|
||||
log.Print("Printed message.\n")
|
||||
}
|
||||
|
||||
func main() {
|
||||
viper.SetConfigName("config") // name of config file (without extension)
|
||||
viper.SetConfigType("yml") // REQUIRED if the config file does not have the extension in the name
|
||||
// viper.AddConfigPath("/data/") //
|
||||
viper.AddConfigPath(".") // optionally look for config in the working directory
|
||||
|
||||
err := viper.ReadInConfig() // Find and read the config file
|
||||
if err != nil { // Handle errors reading the config file
|
||||
log.Fatalf("Fatal error config file: %s \n", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// viper.WatchConfig()
|
||||
// viper.OnConfigChange(func(e fsnotify.Event) {
|
||||
// log.Println("Config file changed:", e.Name)
|
||||
// })
|
||||
|
||||
if err := viper.Unmarshal(&config); err != nil {
|
||||
log.Fatalf("Fatal error config file: %s \n", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
logLevel, err := logrus.ParseLevel(viper.GetString("log.level"))
|
||||
if err != nil {
|
||||
logLevel = logrus.DebugLevel
|
||||
}
|
||||
|
||||
switch viper.GetString("log.level") {
|
||||
case "debug":
|
||||
log.SetLevel(logrus.DebugLevel)
|
||||
case "info":
|
||||
log.SetLevel(logrus.InfoLevel)
|
||||
case "warn":
|
||||
log.SetLevel(logrus.WarnLevel)
|
||||
case "error":
|
||||
log.SetLevel(logrus.ErrorLevel)
|
||||
default:
|
||||
log.SetLevel(logrus.DebugLevel)
|
||||
log.Warnf("Invalid log level supplied: '%s'", logLevel)
|
||||
}
|
||||
|
||||
customFormatter := new(logrus.TextFormatter)
|
||||
customFormatter.TimestampFormat = "2006-01-02 15:04:05"
|
||||
customFormatter.FullTimestamp = true
|
||||
log.SetFormatter(customFormatter)
|
||||
log.Printf("CONFIG: %+v", config)
|
||||
|
||||
// create a context that we can cancel
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
conn, err = pgx.Connect(context.Background(), fmt.Sprintf("postgres://%s:%s@%s:%d/%s", config.Postgres.Username, config.Postgres.Password, config.Postgres.Host, config.Postgres.Port, config.Postgres.Database))
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Println("Setup PostgreSQL connection...")
|
||||
defer conn.Close(context.Background())
|
||||
|
||||
/// PRINTER START
|
||||
printerFile, err := os.OpenFile(config.Printer, os.O_RDWR, 0)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer printerFile.Close()
|
||||
|
||||
readerP = bufio.NewReader(printerFile)
|
||||
writerP = bufio.NewWriter(printerFile)
|
||||
readWriter = bufio.NewReadWriter(readerP, writerP)
|
||||
prntr = printer.New(readWriter)
|
||||
/// PRINTER END
|
||||
|
||||
// a WaitGroup for the goroutines to tell us they've stopped
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
mqttClient := mqttConnect(&wg)
|
||||
defer mqttClient.Disconnect(250)
|
||||
|
||||
// listen for C-c
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, os.Interrupt)
|
||||
<-c
|
||||
log.Println("Main: received CTRL-c - shutting down")
|
||||
|
||||
// tell the goroutines to stop
|
||||
log.Println("Main: telling goroutines to stop")
|
||||
cancel()
|
||||
|
||||
// and wait for them both to reply back
|
||||
wg.Wait()
|
||||
log.Println("Main: all goroutines have told us they've finished")
|
||||
}
|
Reference in New Issue
Block a user