commit cd6934049efd095d3597cd0a91378e2ea3b50e05 Author: Alex Savin Date: Fri Aug 15 11:30:28 2025 -0400 Add initial implementation of two-tone detector with Docker support diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..bcc0397 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,34 @@ +# builder +FROM golang:alpine AS builder + +WORKDIR /app +ADD . /app + +RUN CGO_ENABLED=0 go build -o two-tone-detector + +# runner +FROM alpine:latest + +WORKDIR /app +COPY --from=builder /app/two-tone-detector /app/two-tone-detector + +# Metadata params +ARG VERSION +ARG BUILD_DATE +ARG NAME +ARG VENDOR + +# Metadata +LABEL org.label-schema.build-date=$BUILD_DATE \ +org.label-schema.url="https://alex.savin.nyc" \ +org.label-schema.docker.schema-version="1.0" +# org.label-schema.name=$NAME \ +# org.label-schema.description="Example of multi-stage docker build" \ +# org.label-schema.vcs-url=https://github.com/alex-savin/$VCS_URL \ +# org.label-schema.vcs-ref=$VCS_REF \ +# org.label-schema.vendor=$VENDOR \ +# org.label-schema.version=$VERSION \ + +VOLUME /app/config + +CMD ["./two-tone-detector"] diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..480f141 --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module git.savin.nyc/alex/go-two-tone-detector-file + +go 1.25 + +require ( + github.com/bluenviron/gortsplib/v4 v4.16.2 + github.com/go-audio/audio v1.0.0 + github.com/go-audio/wav v1.1.0 + github.com/pion/rtp v1.8.21 +) + +require ( + github.com/bluenviron/mediacommon/v2 v2.4.1 // indirect + github.com/go-audio/riff v1.0.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/pion/logging v0.2.4 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.15 // indirect + github.com/pion/sdp/v3 v3.0.15 // indirect + github.com/pion/srtp/v3 v3.0.7 // indirect + github.com/pion/transport/v3 v3.0.7 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect +) diff --git a/main.go b/main.go new file mode 100644 index 0000000..42edb07 --- /dev/null +++ b/main.go @@ -0,0 +1,479 @@ +package main + +import ( + "flag" + "fmt" + "log" + "log/slog" + "math" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/gortsplib/v4/pkg/url" + "github.com/go-audio/audio" + "github.com/go-audio/wav" + "github.com/pion/rtp" +) + +// Command-line flags +var ( + rtspURL = flag.String("rtsp", "", "RTSP stream URL (e.g., rtsp://localhost:8554/stream)") + minAms = flag.Int("minA", 1000, "Minimum Tone A duration (ms)") + minBms = flag.Int("minB", 3000, "Minimum Tone B duration (ms)") + gapMaxMs = flag.Int("gap", 5000, "Max gap between A and B (ms)") + winMs = flag.Int("win", 100, "Window size (ms)") + hopMs = flag.Int("hop", 50, "Hop size (ms)") + ratioThresh = flag.Float64("ratio", 0.65, "Power ratio threshold for tone detection") + rmsThresh = flag.Float64("rms", 300.0, "Minimum RMS for valid signal") + silenceThresh = flag.Float64("silenceThresh", 100.0, "RMS threshold for silence detection") + silenceDurMs = flag.Int("silenceDur", 5000, "Duration of silence (ms) to stop recording") +) + +// decodeMuLaw converts a mu-law encoded byte slice to PCM (int16) samples +func decodeMuLaw(data []byte) ([]int16, error) { + const ( + bias = 0x84 + clip = 32635 + ) + // Mu-law decoding table (simplified from standard mu-law algorithm) + muLawTable := [256]int16{ + -32124, -31100, -30076, -29052, -28028, -27004, -25980, -24956, + -23932, -22908, -21884, -20860, -19836, -18812, -17788, -16764, + -15996, -15484, -14972, -14460, -13948, -13436, -12924, -12412, + -11900, -11388, -10876, -10364, -9852, -9340, -8828, -8316, + -7932, -7676, -7420, -7164, -6908, -6652, -6396, -6140, + -5884, -5628, -5372, -5116, -4860, -4604, -4348, -4092, + -3900, -3772, -3644, -3516, -3388, -3260, -3132, -3004, + -2876, -2748, -2620, -2492, -2364, -2236, -2108, -1980, + -1884, -1820, -1756, -1692, -1628, -1564, -1500, -1436, + -1372, -1308, -1244, -1180, -1116, -1052, -988, -924, + -876, -844, -812, -780, -748, -716, -684, -652, + -620, -588, -556, -524, -492, -460, -428, -396, + -372, -356, -340, -324, -308, -292, -276, -260, + -244, -228, -212, -196, -180, -164, -148, -132, + -120, -112, -104, -96, -88, -80, -72, -64, + -56, -48, -40, -32, -24, -16, -8, 0, + 32124, 31100, 30076, 29052, 28028, 27004, 25980, 24956, + 23932, 22908, 21884, 20860, 19836, 18812, 17788, 16764, + 15996, 15484, 14972, 14460, 13948, 13436, 12924, 12412, + 11900, 11388, 10876, 10364, 9852, 9340, 8828, 8316, + 7932, 7676, 7420, 7164, 6908, 6652, 6396, 6140, + 5884, 5628, 5372, 5116, 4860, 4604, 4348, 4092, + 3900, 3772, 3644, 3516, 3388, 3260, 3132, 3004, + 2876, 2748, 2620, 2492, 2364, 2236, 2108, 1980, + 1884, 1820, 1756, 1692, 1628, 1564, 1500, 1436, + 1372, 1308, 1244, 1180, 1116, 1052, 988, 924, + 876, 844, 812, 780, 748, 716, 684, 652, + 620, 588, 556, 524, 492, 460, 428, 396, + 372, 356, 340, 324, 308, 292, 276, 260, + 244, 228, 212, 196, 180, 164, 148, 132, + 120, 112, 104, 96, 88, 80, 72, 64, + 56, 48, 40, 32, 24, 16, 8, 0, + } + + pcm := make([]int16, len(data)) + for i, b := range data { + pcm[i] = muLawTable[b] + } + return pcm, nil +} + +// Goertzel struct for frequency detection +type goertzel struct { + N int + fs float64 + k int + coeff float64 +} + +func newGoertzel(targetHz float64, fs float64, N int) *goertzel { + g := &goertzel{N: N, fs: fs} + g.k = int(0.5 + (float64(g.N)*targetHz)/fs) + omega := (2.0 * math.Pi * float64(g.k)) / float64(g.N) + g.coeff = 2.0 * math.Cos(omega) + return g +} + +func (g *goertzel) Power(x []float64) float64 { + var s0, s1, s2 float64 + for i := 0; i < g.N; i++ { + s0 = x[i] + g.coeff*s1 - s2 + s2 = s1 + s1 = s0 + } + omega := (2.0 * math.Pi * float64(g.k)) / float64(g.N) + real := s1 - s2*math.Cos(omega) + imag := s2 * math.Sin(omega) + return real*real + imag*imag +} + +func windowHann(x []float64) { + n := float64(len(x)) + for i := range x { + x[i] *= 0.5 * (1.0 - math.Cos(2.0*math.Pi*float64(i)/(n-1.0))) + } +} + +func pcmToFloat(buf []int16, N int) []float64 { + out := make([]float64, N) + for i := 0; i < N && i < len(buf); i++ { + out[i] = float64(buf[i]) + } + return out +} + +func rmsPCM(buf []int16) float64 { + var s float64 + for _, v := range buf { + f := float64(v) + s += f * f + } + if len(buf) == 0 { + return 0 + } + return math.Sqrt(s / float64(len(buf))) +} + +// twoToneDetector for detecting tone sequences +type twoToneDetector struct { + fs int + winN int + hopN int + ratioThresh float64 + rmsThresh float64 + minAms int + minBms int + gapMaxMs int + freqs []float64 + gzBank []*goertzel + inA bool + aFreq float64 + aAccumMs int + aStart time.Time + waitingB bool + bFreq float64 + bAccumMs int + bStart time.Time + bEnd time.Time + gapRemainMs int + recording bool + silenceAccumMs int + recordBuf []int16 +} + +func newTwoToneDetector(fs, winN, hopN int, ratioThresh, rmsThresh float64, minAms, minBms, gapMaxMs int) *twoToneDetector { + freqs := make([]float64, 0) + for f := 300.0; f <= 3000.0; f += 10.0 { + freqs = append(freqs, f) + } + gzBank := make([]*goertzel, len(freqs)) + for i, f := range freqs { + gzBank[i] = newGoertzel(f, float64(fs), winN) + } + return &twoToneDetector{ + fs: fs, + winN: winN, + hopN: hopN, + ratioThresh: ratioThresh, + rmsThresh: rmsThresh, + minAms: minAms, + minBms: minBms, + gapMaxMs: gapMaxMs, + freqs: freqs, + gzBank: gzBank, + } +} + +func (d *twoToneDetector) stepWindow(pcms []int16, t0 time.Time) (event string, aFreq, aDur, bFreq, bDur float64) { + xi := pcmToFloat(pcms, d.winN) + windowHann(xi) + + var total float64 + for _, v := range xi { + total += v * v + } + + r := rmsPCM(pcms) + hopDur := time.Millisecond * time.Duration(int(float64(d.hopN)*1000.0/float64(d.fs))) + now := t0 + + if r < d.rmsThresh { + slog.Debug("Window at %s: RMS %.2f below threshold %.2f, resetting", now.Format(time.RFC3339), r, d.rmsThresh) + d.reset() + return "", 0, 0, 0, 0 + } + + // Find frequency with highest power + bestIdx := -1 + bestPow := 0.0 + for i, gz := range d.gzBank { + p := gz.Power(xi) + if p > bestPow { + bestPow = p + bestIdx = i + } + } + ratio := bestPow / (total + 1e-12) + if ratio < d.ratioThresh { + slog.Debug("Window at %s: ratio %.3f below threshold %.3f, resetting", now.Format(time.RFC3339), ratio, d.ratioThresh) + d.reset() + return "", 0, 0, 0, 0 + } + freq := d.freqs[bestIdx] + + if !d.inA && !d.waitingB && !d.recording { + // Looking for Tone A + d.inA = true + d.aFreq = freq + d.aAccumMs = int(hopDur.Milliseconds()) + d.aStart = now + } else if d.inA && !d.waitingB { + // Confirming Tone A + if math.Abs(freq-d.aFreq) <= 10.0 { + d.aAccumMs += int(hopDur.Milliseconds()) + if d.aAccumMs >= d.minAms { + d.inA = false + d.waitingB = true + d.gapRemainMs = d.gapMaxMs + } + } else { + slog.Debug("Window at %s: freq %.1f Hz differs from Tone A %.1f Hz, resetting", now.Format(time.RFC3339), freq, d.aFreq) + d.reset() + } + } else if d.waitingB { + d.gapRemainMs -= int(hopDur.Milliseconds()) + if d.gapRemainMs <= 0 { + slog.Debug("Window at %s: gap exceeded %d ms, resetting", now.Format(time.RFC3339), d.gapMaxMs) + d.reset() + } else if math.Abs(freq-d.aFreq) > 10.0 { + // Check for Tone B + if d.bAccumMs == 0 { + d.bFreq = freq + d.bStart = now + } else if math.Abs(freq-d.bFreq) > 10.0 { + slog.Debug("Window at %s: freq %.1f Hz differs from Tone B %.1f Hz, resetting B", now.Format(time.RFC3339), freq, d.bFreq) + d.bFreq = freq + d.bAccumMs = 0 + d.bStart = now + } + d.bAccumMs += int(hopDur.Milliseconds()) + d.bEnd = now.Add(hopDur) + if d.bAccumMs >= d.minBms { + event = "TWO_TONE_DETECTED" + d.recording = true + d.silenceAccumMs = 0 + d.recordBuf = make([]int16, 0) + slog.Info("Two-tone detected, starting recording", "at", now.Format(time.RFC3339)) + return event, d.aFreq, float64(d.aAccumMs), d.bFreq, float64(d.bAccumMs) + } + } + } + return "", 0, 0, 0, 0 +} + +func (d *twoToneDetector) reset() { + d.inA = false + d.aFreq = 0 + d.aAccumMs = 0 + d.aStart = time.Time{} + d.waitingB = false + d.bFreq = 0 + d.bAccumMs = 0 + d.bStart = time.Time{} + d.bEnd = time.Time{} + d.gapRemainMs = 0 +} + +func saveToWAV(data []int16, sampleRate int) { + filename := fmt.Sprintf("dispatch_%d.wav", time.Now().Unix()) + file, err := os.Create(filename) + if err != nil { + slog.Error("Error creating WAV file", "error", err) + return + } + defer file.Close() + + enc := wav.NewEncoder(file, sampleRate, 16, 1, 1) // PCM format (AudioFormat=1) + intData := make([]int, len(data)) + for i, v := range data { + intData[i] = int(v) + } + buf := &audio.IntBuffer{ + Data: intData, + Format: &audio.Format{SampleRate: sampleRate, NumChannels: 1}, + SourceBitDepth: 16, + } + if err := enc.Write(buf); err != nil { + slog.Error("Error writing WAV file", "error", err) + return + } + if err := enc.Close(); err != nil { + slog.Error("Error closing WAV file", "error", err) + return + } + slog.Info("Saved recording", "fimename", filename, "samples", len(data), "seconds", float64(len(data))/float64(sampleRate)) +} + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + slog.SetDefault(logger) + + flag.Parse() + if *rtspURL == "" { + log.Fatal("RTSP URL is required (use -rtsp flag)") + } + + const fs = 8000 + winN := int(float64(fs) * float64(*winMs) / 1000.0) + hopN := int(float64(fs) * float64(*hopMs) / 1000.0) + if winN <= 0 || hopN <= 0 || hopN > winN { + log.Fatalf("Invalid window/hop: winN=%d, hopN=%d", winN, hopN) + } + + det := newTwoToneDetector(fs, winN, hopN, *ratioThresh, *rmsThresh, *minAms, *minBms, *gapMaxMs) + client := gortsplib.Client{} + u, err := url.Parse(*rtspURL) + if err != nil { + log.Fatalf("Invalid RTSP URL: %v", err) + } + + // Convert *url.URL to *base.URL + baseURL, err := base.ParseURL(*rtspURL) + if err != nil { + log.Fatalf("Invalid base URL: %v", err) + } + + // Set up signal handling + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + var wg sync.WaitGroup + wg.Add(1) + + // Connect to RTSP stream + if err := client.Start(u.Scheme, u.Host); err != nil { + log.Fatalf("Failed to connect to RTSP stream: %v", err) + } + defer client.Close() + + // Find G711 mu-law track + session, _, err := client.Describe(u) + if err != nil { + log.Fatalf("Failed to describe RTSP stream: %v", err) + } + + var g711Track *format.G711 + var selectedMedia *description.Media + for _, media := range session.Medias { + if g711, ok := media.Formats[0].(*format.G711); ok && g711.MULaw && g711.SampleRate == fs && g711.ChannelCount == 1 { + g711Track = g711 + selectedMedia = media + break + } + } + if g711Track == nil || selectedMedia == nil { + log.Fatal("No mono 8kHz G711 mu-law track found") + } + + // Set up the track + _, err = client.Setup(baseURL, selectedMedia, 0, 0) + if err != nil { + log.Fatalf("Failed to setup RTSP stream: %v", err) + } + + // Set up RTP packet handler + pcmBuf := make([]int16, 0, 8192) + startTime := time.Now() + sampleCount := 0 + + client.OnPacketRTP(selectedMedia, g711Track, func(pkt *rtp.Packet) { + // Decode G711 mu-law to PCM + pcm, err := decodeMuLaw(pkt.Payload) + if err != nil { + slog.Error("Error decoding mu-law", "error", err) + return + } + pcmBuf = append(pcmBuf, pcm...) + sampleCount += len(pcm) + + // Process in chunks of ~100ms (800 samples at 8kHz) + chunkSize := winN + for len(pcmBuf) >= chunkSize { + chunk := pcmBuf[:chunkSize] + pcmBuf = pcmBuf[chunkSize:] + chunkTime := startTime.Add(time.Duration(sampleCount-len(pcmBuf)-chunkSize) * time.Second / time.Duration(fs)) + + // Process windows for tone detection + for offset := 0; offset <= len(chunk)-winN; offset += hopN { + win := chunk[offset:min(offset+winN, len(chunk))] + t := chunkTime.Add(time.Duration(offset) * time.Second / time.Duration(fs)) + event, aFreq, aDur, bFreq, bDur := det.stepWindow(win, t) + if event != "" { + slog.Info("Detected two-tone sequence:\n") + slog.Info("Tone A", "Hz", aFreq, "duration (ms)", aDur) + slog.Info("Tone B", "Hz", bFreq, "duration (ms)", bDur) + det.reset() + } + } + + // Recording logic + if det.recording { + if !det.bEnd.IsZero() && !chunkTime.Before(det.bEnd.Add(100*time.Millisecond)) { + r := rmsPCM(chunk) + slog.Info("Chunk at %s: RMS=%.2f, silenceThresh=%.2f, silenceAccum=%d ms, recordBuf=%d samples", + chunkTime.Format(time.RFC3339), r, *silenceThresh, det.silenceAccumMs, len(det.recordBuf)) + if r < *silenceThresh { + det.silenceAccumMs += int(time.Duration(len(chunk)) * time.Second / time.Duration(fs) / time.Millisecond) + if det.silenceAccumMs >= *silenceDurMs && len(det.recordBuf) > 0 { + slog.Info("Silence detected, saving recording") + saveToWAV(det.recordBuf, fs) + det.recording = false + det.silenceAccumMs = 0 + det.recordBuf = nil + det.reset() + } else { + det.recordBuf = append(det.recordBuf, chunk...) + } + } else { + det.silenceAccumMs = 0 + det.recordBuf = append(det.recordBuf, chunk...) + } + } + } + } + }) + + // Start reading the stream + slog.Info("Processing RTSP stream...") + if _, err := client.Play(nil); err != nil { + log.Fatalf("Failed to play RTSP stream: %v", err) + } + + // Wait for signal + go func() { + <-sigChan + slog.Info("Received interrupt, stopping...") + if det.recording && len(det.recordBuf) > 0 { + slog.Info("Saving final recording") + saveToWAV(det.recordBuf, fs) + } + client.Close() + wg.Done() + }() + + wg.Wait() + slog.Info("Program terminated") +} + +func min(a, b int) int { + if a < b { + return a + } + return b +}