package parser import ( "context" "errors" "io" "log/slog" "os" "sync/atomic" "git.savin.nyc/alex/go-receipt-tracker/bus" "git.savin.nyc/alex/go-receipt-tracker/config" "git.savin.nyc/alex/go-receipt-tracker/utils" "github.com/liushuangls/go-anthropic/v2" ) type Claude struct { id string model string opts *config.Parser client *anthropic.Client subscriptions map[string]chan bus.Event // bus *bus.Bus log *slog.Logger cancel context.CancelFunc // end uint32 // ensure the close methods are only called once } func NewClaude(cfg *config.Parser, bus *bus.Bus) *Claude { ai := &Claude{ id: cfg.Type, // "claude", model: cfg.Model, // "gpt-4o-mini" opts: cfg, bus: bus, } return ai } func (ai *Claude) ID() string { return ai.id } func (ai *Claude) Type() string { return "parser" } func (ai *Claude) Init(log *slog.Logger) error { ai.client = anthropic.NewClient(ai.opts.ApiKey) ai.log = log return nil } func (ai *Claude) Serve() { if atomic.LoadUint32(&ai.end) == 1 { return } ai.subscribe("parser:" + ai.ID()) ai.subscribe("parser:*") ctx, cancel := context.WithCancel(context.Background()) ai.cancel = cancel go ai.eventLoop(ctx) } func (ai *Claude) Stop() { } func (ai *Claude) subscribe(chn string) error { s, err := ai.bus.Subscribe(chn, ai.Type()+":"+ai.ID()) if err != nil { ai.log.Error("couldn't subscribe to a channel", "channel", chn, "error", err.Error()) return err } ai.subscriptions[chn] = s return nil } // eventLoop loops forever func (ai *Claude) eventLoop(ctx context.Context) { ai.log.Debug(ai.ID() + " communication event loop started") defer ai.log.Debug(ai.ID() + " communication event loop halted") for { for chn, ch := range ai.subscriptions { select { case event := <-ch: ai.log.Debug("got a new message to a channel", "channel", chn) for _, msg := range event.Payload.([]*bus.Message) { ai.log.Debug("publishing mqtt message", "message", msg) } case <-ctx.Done(): ai.log.Info("stopping " + ai.ID() + " communication event loop") return } } } } func (ai *Claude) Recognize(img string) (res string, err error) { image := "./" + img + ".jpg" imageMediaType := "image/jpeg" imageFile, err := os.Open(image) if err != nil { return "", err } imageData, err := io.ReadAll(imageFile) if err != nil { return "", err } resp, err := ai.client.CreateMessages(context.Background(), anthropic.MessagesRequest{ Model: anthropic.ModelClaude3Opus20240229, Messages: []anthropic.Message{ { Role: anthropic.RoleUser, Content: []anthropic.MessageContent{ anthropic.NewImageMessageContent(anthropic.MessageContentSource{ Type: "base64", MediaType: imageMediaType, Data: imageData, }), anthropic.NewTextMessageContent(config.Request), }, }, }, MaxTokens: 1000, }) if err != nil { var e *anthropic.APIError if errors.As(err, &e) { ai.log.Error("cannot recognize a receipt", "type", e.Type, "message", e.Message) } else { ai.log.Error("cannot recognize a receipt", "Messages error: %v\n", err) } return "", err } if !utils.IsJSON(*resp.Content[0].Text) { ai.log.Error("Claude returned not valid JSON", "%+v", resp.Content[0].Text) return "", err } ai.log.Debug("recognition output", "%+v", resp.Content[0].Text) return *resp.Content[0].Text, nil }