package main import ( "flag" "fmt" "log" "log/slog" "math" "os" "os/signal" "sync" "syscall" "time" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/gortsplib/v4/pkg/url" "github.com/go-audio/audio" "github.com/go-audio/wav" "github.com/pion/rtp" ) // Command-line flags var ( rtspURL = flag.String("rtsp", "", "RTSP stream URL (e.g., rtsp://localhost:8554/stream)") minAms = flag.Int("minA", 1000, "Minimum Tone A duration (ms)") minBms = flag.Int("minB", 3000, "Minimum Tone B duration (ms)") gapMaxMs = flag.Int("gap", 5000, "Max gap between A and B (ms)") winMs = flag.Int("win", 100, "Window size (ms)") hopMs = flag.Int("hop", 50, "Hop size (ms)") ratioThresh = flag.Float64("ratio", 0.65, "Power ratio threshold for tone detection") rmsThresh = flag.Float64("rms", 300.0, "Minimum RMS for valid signal") silenceThresh = flag.Float64("silenceThresh", 100.0, "RMS threshold for silence detection") silenceDurMs = flag.Int("silenceDur", 5000, "Duration of silence (ms) to stop recording") ) // decodeMuLaw converts a mu-law encoded byte slice to PCM (int16) samples func decodeMuLaw(data []byte) ([]int16, error) { const ( bias = 0x84 clip = 32635 ) // Mu-law decoding table (simplified from standard mu-law algorithm) muLawTable := [256]int16{ -32124, -31100, -30076, -29052, -28028, -27004, -25980, -24956, -23932, -22908, -21884, -20860, -19836, -18812, -17788, -16764, -15996, -15484, -14972, -14460, -13948, -13436, -12924, -12412, -11900, -11388, -10876, -10364, -9852, -9340, -8828, -8316, -7932, -7676, -7420, -7164, -6908, -6652, -6396, -6140, -5884, -5628, -5372, -5116, -4860, -4604, -4348, -4092, -3900, -3772, -3644, -3516, -3388, -3260, -3132, -3004, -2876, -2748, -2620, -2492, -2364, -2236, -2108, -1980, -1884, -1820, -1756, -1692, -1628, -1564, -1500, -1436, -1372, -1308, -1244, -1180, -1116, -1052, -988, -924, -876, -844, -812, -780, -748, -716, -684, -652, -620, -588, -556, -524, -492, -460, -428, -396, -372, -356, -340, -324, -308, -292, -276, -260, -244, -228, -212, -196, -180, -164, -148, -132, -120, -112, -104, -96, -88, -80, -72, -64, -56, -48, -40, -32, -24, -16, -8, 0, 32124, 31100, 30076, 29052, 28028, 27004, 25980, 24956, 23932, 22908, 21884, 20860, 19836, 18812, 17788, 16764, 15996, 15484, 14972, 14460, 13948, 13436, 12924, 12412, 11900, 11388, 10876, 10364, 9852, 9340, 8828, 8316, 7932, 7676, 7420, 7164, 6908, 6652, 6396, 6140, 5884, 5628, 5372, 5116, 4860, 4604, 4348, 4092, 3900, 3772, 3644, 3516, 3388, 3260, 3132, 3004, 2876, 2748, 2620, 2492, 2364, 2236, 2108, 1980, 1884, 1820, 1756, 1692, 1628, 1564, 1500, 1436, 1372, 1308, 1244, 1180, 1116, 1052, 988, 924, 876, 844, 812, 780, 748, 716, 684, 652, 620, 588, 556, 524, 492, 460, 428, 396, 372, 356, 340, 324, 308, 292, 276, 260, 244, 228, 212, 196, 180, 164, 148, 132, 120, 112, 104, 96, 88, 80, 72, 64, 56, 48, 40, 32, 24, 16, 8, 0, } pcm := make([]int16, len(data)) for i, b := range data { pcm[i] = muLawTable[b] } return pcm, nil } // Goertzel struct for frequency detection type goertzel struct { N int fs float64 k int coeff float64 } func newGoertzel(targetHz float64, fs float64, N int) *goertzel { g := &goertzel{N: N, fs: fs} g.k = int(0.5 + (float64(g.N)*targetHz)/fs) omega := (2.0 * math.Pi * float64(g.k)) / float64(g.N) g.coeff = 2.0 * math.Cos(omega) return g } func (g *goertzel) Power(x []float64) float64 { var s0, s1, s2 float64 for i := 0; i < g.N; i++ { s0 = x[i] + g.coeff*s1 - s2 s2 = s1 s1 = s0 } omega := (2.0 * math.Pi * float64(g.k)) / float64(g.N) real := s1 - s2*math.Cos(omega) imag := s2 * math.Sin(omega) return real*real + imag*imag } func windowHann(x []float64) { n := float64(len(x)) for i := range x { x[i] *= 0.5 * (1.0 - math.Cos(2.0*math.Pi*float64(i)/(n-1.0))) } } func pcmToFloat(buf []int16, N int) []float64 { out := make([]float64, N) for i := 0; i < N && i < len(buf); i++ { out[i] = float64(buf[i]) } return out } func rmsPCM(buf []int16) float64 { var s float64 for _, v := range buf { f := float64(v) s += f * f } if len(buf) == 0 { return 0 } return math.Sqrt(s / float64(len(buf))) } // twoToneDetector for detecting tone sequences type twoToneDetector struct { fs int winN int hopN int ratioThresh float64 rmsThresh float64 minAms int minBms int gapMaxMs int freqs []float64 gzBank []*goertzel inA bool aFreq float64 aAccumMs int aStart time.Time waitingB bool bFreq float64 bAccumMs int bStart time.Time bEnd time.Time gapRemainMs int recording bool silenceAccumMs int recordBuf []int16 } func newTwoToneDetector(fs, winN, hopN int, ratioThresh, rmsThresh float64, minAms, minBms, gapMaxMs int) *twoToneDetector { freqs := make([]float64, 0) for f := 300.0; f <= 3000.0; f += 10.0 { freqs = append(freqs, f) } gzBank := make([]*goertzel, len(freqs)) for i, f := range freqs { gzBank[i] = newGoertzel(f, float64(fs), winN) } return &twoToneDetector{ fs: fs, winN: winN, hopN: hopN, ratioThresh: ratioThresh, rmsThresh: rmsThresh, minAms: minAms, minBms: minBms, gapMaxMs: gapMaxMs, freqs: freqs, gzBank: gzBank, } } func (d *twoToneDetector) stepWindow(pcms []int16, t0 time.Time) (event string, aFreq, aDur, bFreq, bDur float64) { xi := pcmToFloat(pcms, d.winN) windowHann(xi) var total float64 for _, v := range xi { total += v * v } r := rmsPCM(pcms) hopDur := time.Millisecond * time.Duration(int(float64(d.hopN)*1000.0/float64(d.fs))) now := t0 if r < d.rmsThresh { slog.Debug("Window RMS below threshold, resetting", "at", now.Format(time.RFC3339), "RMS", r, "threshold", d.rmsThresh) d.reset() return "", 0, 0, 0, 0 } // Find frequency with highest power bestIdx := -1 bestPow := 0.0 for i, gz := range d.gzBank { p := gz.Power(xi) if p > bestPow { bestPow = p bestIdx = i } } ratio := bestPow / (total + 1e-12) if ratio < d.ratioThresh { slog.Debug("Window ratio below threshold, resetting", "at", now.Format(time.RFC3339), "ratio", ratio, "threshold", d.ratioThresh) d.reset() return "", 0, 0, 0, 0 } freq := d.freqs[bestIdx] if !d.inA && !d.waitingB && !d.recording { // Looking for Tone A d.inA = true d.aFreq = freq d.aAccumMs = int(hopDur.Milliseconds()) d.aStart = now } else if d.inA && !d.waitingB { // Confirming Tone A if math.Abs(freq-d.aFreq) <= 10.0 { d.aAccumMs += int(hopDur.Milliseconds()) if d.aAccumMs >= d.minAms { d.inA = false d.waitingB = true d.gapRemainMs = d.gapMaxMs } } else { slog.Debug("Window freq differs from Tone A, resetting A", "at", now.Format(time.RFC3339), "freq (Hz)", freq, "Tone A (Hz)", d.aFreq) d.reset() } } else if d.waitingB { d.gapRemainMs -= int(hopDur.Milliseconds()) if d.gapRemainMs <= 0 { slog.Debug("Window at %s: gap exceeded %d ms, resetting", now.Format(time.RFC3339), d.gapMaxMs) d.reset() } else if math.Abs(freq-d.aFreq) > 10.0 { // Check for Tone B if d.bAccumMs == 0 { d.bFreq = freq d.bStart = now } else if math.Abs(freq-d.bFreq) > 10.0 { slog.Debug("Window freq differs from Tone B, resetting B", "at", now.Format(time.RFC3339), "freq (Hz)", freq, "Tone B (Hz)", d.bFreq) d.bFreq = freq d.bAccumMs = 0 d.bStart = now } d.bAccumMs += int(hopDur.Milliseconds()) d.bEnd = now.Add(hopDur) if d.bAccumMs >= d.minBms { event = "TWO_TONE_DETECTED" d.recording = true d.silenceAccumMs = 0 d.recordBuf = make([]int16, 0) slog.Info("Two-tone detected, starting recording", "at", now.Format(time.RFC3339)) return event, d.aFreq, float64(d.aAccumMs), d.bFreq, float64(d.bAccumMs) } } } return "", 0, 0, 0, 0 } func (d *twoToneDetector) reset() { d.inA = false d.aFreq = 0 d.aAccumMs = 0 d.aStart = time.Time{} d.waitingB = false d.bFreq = 0 d.bAccumMs = 0 d.bStart = time.Time{} d.bEnd = time.Time{} d.gapRemainMs = 0 } func saveToWAV(data []int16, sampleRate int) { filename := fmt.Sprintf("dispatch_%d.wav", time.Now().Unix()) file, err := os.Create(filename) if err != nil { slog.Error("Error creating WAV file", "error", err) return } defer file.Close() enc := wav.NewEncoder(file, sampleRate, 16, 1, 1) // PCM format (AudioFormat=1) intData := make([]int, len(data)) for i, v := range data { intData[i] = int(v) } buf := &audio.IntBuffer{ Data: intData, Format: &audio.Format{SampleRate: sampleRate, NumChannels: 1}, SourceBitDepth: 16, } if err := enc.Write(buf); err != nil { slog.Error("Error writing WAV file", "error", err) return } if err := enc.Close(); err != nil { slog.Error("Error closing WAV file", "error", err) return } slog.Info("Saved recording", "fimename", filename, "samples", len(data), "seconds", float64(len(data))/float64(sampleRate)) } func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) slog.SetDefault(logger) flag.Parse() if *rtspURL == "" { log.Fatal("RTSP URL is required (use -rtsp flag)") } const fs = 8000 winN := int(float64(fs) * float64(*winMs) / 1000.0) hopN := int(float64(fs) * float64(*hopMs) / 1000.0) if winN <= 0 || hopN <= 0 || hopN > winN { log.Fatalf("Invalid window/hop: winN=%d, hopN=%d", winN, hopN) } det := newTwoToneDetector(fs, winN, hopN, *ratioThresh, *rmsThresh, *minAms, *minBms, *gapMaxMs) client := gortsplib.Client{} u, err := url.Parse(*rtspURL) if err != nil { log.Fatalf("Invalid RTSP URL: %v", err) } // Convert *url.URL to *base.URL baseURL, err := base.ParseURL(*rtspURL) if err != nil { log.Fatalf("Invalid base URL: %v", err) } // Set up signal handling sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) var wg sync.WaitGroup wg.Add(1) // Connect to RTSP stream if err := client.Start(u.Scheme, u.Host); err != nil { log.Fatalf("Failed to connect to RTSP stream: %v", err) } defer client.Close() // Find G711 mu-law track session, _, err := client.Describe(u) if err != nil { log.Fatalf("Failed to describe RTSP stream: %v", err) } var g711Track *format.G711 var selectedMedia *description.Media for _, media := range session.Medias { if g711, ok := media.Formats[0].(*format.G711); ok && g711.MULaw && g711.SampleRate == fs && g711.ChannelCount == 1 { g711Track = g711 selectedMedia = media break } } if g711Track == nil || selectedMedia == nil { log.Fatal("No mono 8kHz G711 mu-law track found") } // Set up the track _, err = client.Setup(baseURL, selectedMedia, 0, 0) if err != nil { log.Fatalf("Failed to setup RTSP stream: %v", err) } // Set up RTP packet handler pcmBuf := make([]int16, 0, 8192) startTime := time.Now() sampleCount := 0 client.OnPacketRTP(selectedMedia, g711Track, func(pkt *rtp.Packet) { // Decode G711 mu-law to PCM pcm, err := decodeMuLaw(pkt.Payload) if err != nil { slog.Error("Error decoding mu-law", "error", err) return } pcmBuf = append(pcmBuf, pcm...) sampleCount += len(pcm) // Process in chunks of ~100ms (800 samples at 8kHz) chunkSize := winN for len(pcmBuf) >= chunkSize { chunk := pcmBuf[:chunkSize] pcmBuf = pcmBuf[chunkSize:] chunkTime := startTime.Add(time.Duration(sampleCount-len(pcmBuf)-chunkSize) * time.Second / time.Duration(fs)) // Process windows for tone detection for offset := 0; offset <= len(chunk)-winN; offset += hopN { win := chunk[offset:min(offset+winN, len(chunk))] t := chunkTime.Add(time.Duration(offset) * time.Second / time.Duration(fs)) event, aFreq, aDur, bFreq, bDur := det.stepWindow(win, t) if event != "" { slog.Info("Detected two-tone sequence:\n") slog.Info("Tone A", "Hz", aFreq, "duration (ms)", aDur) slog.Info("Tone B", "Hz", bFreq, "duration (ms)", bDur) det.reset() } } // Recording logic if det.recording { if !det.bEnd.IsZero() && !chunkTime.Before(det.bEnd.Add(100*time.Millisecond)) { r := rmsPCM(chunk) slog.Info("chunk", "at", chunkTime.Format(time.RFC3339), "RMS", r, "silenceThresh", *silenceThresh, "silenceAccum (ms)", det.silenceAccumMs, "record buffer (samples)", len(det.recordBuf)) if r < *silenceThresh { det.silenceAccumMs += int(time.Duration(len(chunk)) * time.Second / time.Duration(fs) / time.Millisecond) if det.silenceAccumMs >= *silenceDurMs && len(det.recordBuf) > 0 { slog.Info("Silence detected, saving recording") saveToWAV(det.recordBuf, fs) det.recording = false det.silenceAccumMs = 0 det.recordBuf = nil det.reset() } else { det.recordBuf = append(det.recordBuf, chunk...) } } else { det.silenceAccumMs = 0 det.recordBuf = append(det.recordBuf, chunk...) } } } } }) // Start reading the stream slog.Info("Processing RTSP stream...") if _, err := client.Play(nil); err != nil { log.Fatalf("Failed to play RTSP stream: %v", err) } // Wait for signal go func() { <-sigChan slog.Info("Received interrupt, stopping...") if det.recording && len(det.recordBuf) > 0 { slog.Info("Saving final recording") saveToWAV(det.recordBuf, fs) } client.Close() wg.Done() }() wg.Wait() slog.Info("Program terminated") } func min(a, b int) int { if a < b { return a } return b }