From f51749ec0cdce45a736fefac436ef216aed50fa5 Mon Sep 17 00:00:00 2001 From: Alex Savin Date: Tue, 19 Aug 2025 10:38:15 -0400 Subject: [PATCH] first commit --- config.yaml | 2 + go.mod | 22 +++ go.sum | 34 +++++ main.go | 378 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 436 insertions(+) create mode 100644 config.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..da76979 --- /dev/null +++ b/config.yaml @@ -0,0 +1,2 @@ +rtsp_url: "rtsp://10.10.10.104:8554/union" +output_file: "output.wav" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2ffcfd2 --- /dev/null +++ b/go.mod @@ -0,0 +1,22 @@ +module git.savin.nyc/alex/go-two-tone-detector + +go 1.25 + +require ( + github.com/bluenviron/gortsplib/v4 v4.16.2 + github.com/bluenviron/mediacommon/v2 v2.4.1 + github.com/pion/rtp v1.8.21 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/pion/logging v0.2.4 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.15 // indirect + github.com/pion/sdp/v3 v3.0.15 // indirect + github.com/pion/srtp/v3 v3.0.7 // indirect + github.com/pion/transport/v3 v3.0.7 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e3f622c --- /dev/null +++ b/go.sum @@ -0,0 +1,34 @@ +github.com/bluenviron/gortsplib/v4 v4.16.2 h1:10HaMsorjW13gscLp3R7Oj41ck2i1EHIUYCNWD2wpkI= +github.com/bluenviron/gortsplib/v4 v4.16.2/go.mod h1:Vm07yUMys9XKnuZJLfTT8zluAN2n9ZOtz40Xb8RKh+8= +github.com/bluenviron/mediacommon/v2 v2.4.1 h1:PsKrO/c7hDjXxiOGRUBsYtMGNb4lKWIFea6zcOchoVs= +github.com/bluenviron/mediacommon/v2 v2.4.1/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= +github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= +github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= +github.com/pion/rtp v1.8.21 h1:3yrOwmZFyUpcIosNcWRpQaU+UXIJ6yxLuJ8Bx0mw37Y= +github.com/pion/rtp v1.8.21/go.mod h1:bAu2UFKScgzyFqvUKmbvzSdPr+NGbZtv6UB2hesqXBk= +github.com/pion/sdp/v3 v3.0.15 h1:F0I1zds+K/+37ZrzdADmx2Q44OFDOPRLhPnNTaUX9hk= +github.com/pion/sdp/v3 v3.0.15/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/srtp/v3 v3.0.7 h1:QUElw0A/FUg3MP8/KNMZB3i0m8F9XeMnTum86F7S4bs= +github.com/pion/srtp/v3 v3.0.7/go.mod h1:qvnHeqbhT7kDdB+OGB05KA/P067G3mm7XBfLaLiaNF0= +github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0= +github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..df0a2a4 --- /dev/null +++ b/main.go @@ -0,0 +1,378 @@ +package main + +import ( + "bufio" + "encoding/binary" + "fmt" + "log/slog" + "os" + "sync" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" + "github.com/pion/rtp" + "gopkg.in/yaml.v3" +) + +// Config holds RTSP URL and output file path +type Config struct { + RTSPURL string `yaml:"rtsp_url"` + OutputFile string `yaml:"output_file"` +} + +// Event represents lifecycle events (e.g., source ready, recording started) +type Event struct { + Type string + Data interface{} +} + +// Path manages a stream, distributing packets to readers/recorders +type Path struct { + audioMedia *description.Media + audioFormat format.Format + readers map[string]chan *Data + readerMu sync.RWMutex + eventChan chan Event +} + +// Data holds an RTP packet and its media type +type Data struct { + Media *description.Media + Pkt *rtp.Packet +} + +// Recorder saves packets to a file +type Recorder struct { + name string + path *Path + file *os.File + writer *bufio.Writer + isMuLaw bool // true for μ-law, false for A-law + dataChan chan *Data + terminate chan struct{} + sampleRate int + channels int +} + +// NewPath creates a stream hub +func NewPath(eventChan chan Event) *Path { + return &Path{ + readers: make(map[string]chan *Data), + eventChan: eventChan, + } +} + +// AddReader adds a reader (e.g., recorder) with its own channel +func (p *Path) AddReader(name string, ch chan *Data) { + p.readerMu.Lock() + defer p.readerMu.Unlock() + p.readers[name] = ch +} + +// RemoveReader removes a reader +func (p *Path) RemoveReader(name string) { + p.readerMu.Lock() + defer p.readerMu.Unlock() + if ch, ok := p.readers[name]; ok { + close(ch) + delete(p.readers, name) + } +} + +// DistributeData sends packets to all readers +func (p *Path) DistributeData(data *Data) { + p.readerMu.RLock() + defer p.readerMu.RUnlock() + for _, ch := range p.readers { + select { + case ch <- data: + default: + slog.Warn("Reader channel full, dropping packet") + } + } +} + +// SourceReady sets up the path with media info +func (p *Path) SourceReady(audioMedia *description.Media, audioFormat format.Format) { + p.audioMedia = audioMedia + p.audioFormat = audioFormat + p.eventChan <- Event{Type: "source_ready", Data: audioMedia} +} + +// NewRecorder creates a recorder that subscribes to the path +func NewRecorder(path *Path, outputFile string, isMuLaw bool) (*Recorder, error) { + file, err := os.Create(outputFile) + if err != nil { + return nil, fmt.Errorf("create file: %w", err) + } + r := &Recorder{ + name: "recorder", + path: path, + file: file, + writer: bufio.NewWriter(file), + isMuLaw: isMuLaw, + dataChan: make(chan *Data, 100), // Buffered channel + terminate: make(chan struct{}), + sampleRate: 8000, // Fixed for G.711 + channels: 1, // Mono + } + path.AddReader(r.name, r.dataChan) + return r, nil +} + +// writeWAVHeader writes a basic WAV header for PCM audio +func (r *Recorder) writeWAVHeader(dataSize uint32) error { + header := make([]byte, 44) + copy(header[0:4], "RIFF") + binary.LittleEndian.PutUint32(header[4:8], 36+dataSize) // ChunkSize + copy(header[8:12], "WAVE") + copy(header[12:16], "fmt ") + binary.LittleEndian.PutUint32(header[16:20], 16) // Subchunk1Size (16 for PCM) + binary.LittleEndian.PutUint16(header[20:22], 1) // AudioFormat (1 for PCM) + binary.LittleEndian.PutUint16(header[22:24], uint16(r.channels)) + binary.LittleEndian.PutUint32(header[24:28], uint32(r.sampleRate)) + binary.LittleEndian.PutUint32(header[28:32], uint32(r.sampleRate*r.channels*2)) // ByteRate + binary.LittleEndian.PutUint16(header[32:34], uint16(r.channels*2)) // BlockAlign + binary.LittleEndian.PutUint16(header[34:36], 16) // BitsPerSample + copy(header[36:40], "data") + binary.LittleEndian.PutUint32(header[40:44], dataSize) + _, err := r.writer.Write(header) + return err +} + +// muLawToPCM converts μ-law samples to 16-bit PCM +func muLawToPCM(sample byte) int16 { + const muLawBias = 33 + const muLawMax = 32767 + var sign int + if sample&0x80 != 0 { + sign = -1 + sample = ^sample + } else { + sign = 1 + } + sample = sample & 0x7F + exponent := int(sample>>4) & 0x07 + mantissa := int(sample & 0x0F) + value := (mantissa << (exponent + 3)) + muLawBias + if exponent > 0 { + value += (1 << (exponent + 2)) + } + value -= muLawBias + return int16(sign * value * muLawMax / 8159) +} + +// aLawToPCM converts A-law samples to 16-bit PCM +func aLawToPCM(sample byte) int16 { + const aLawMax = 32767 + var sign int + if sample&0x80 != 0 { + sign = -1 + sample = ^sample + } else { + sign = 1 + } + sample = sample & 0x7F + exponent := int(sample>>4) & 0x07 + mantissa := int(sample & 0x0F) + value := 0 + if exponent == 0 { + value = mantissa + } else { + value = (mantissa << 4) + (1 << (exponent + 2)) + } + return int16(sign * value * aLawMax / 4096) +} + +// Start runs the recorder's async writing loop +func (r *Recorder) Start() { + // Placeholder for data size; update at end + if err := r.writeWAVHeader(0); err != nil { + slog.Error("Write header error", "error", err) + } + var totalDataSize uint32 + + go func() { + defer r.file.Close() + defer r.writer.Flush() + for { + select { + case data := <-r.dataChan: + if data.Media.Type == "audio" { + // Unmarshal G.711 payload + var samples []byte + var err error + if r.isMuLaw { + var mu g711.Mulaw + mu.Unmarshal(data.Pkt.Payload) + samples = []byte(mu) + } else { + var al g711.Alaw + al.Unmarshal(data.Pkt.Payload) + samples = []byte(al) + } + if err != nil { + slog.Error("Unmarshal audio error", "error", err) + continue + } + // Convert to 16-bit PCM and write + for _, sample := range samples { + var pcmSample int16 + if r.isMuLaw { + pcmSample = muLawToPCM(sample) + } else { + pcmSample = aLawToPCM(sample) + } + var buf [2]byte + binary.LittleEndian.PutUint16(buf[:], uint16(pcmSample)) + _, err := r.writer.Write(buf[:]) + if err != nil { + slog.Error("Write error", "error", err) + } + totalDataSize += 2 + } + } + case <-r.terminate: + // Update WAV header with actual data size + if err := r.writer.Flush(); err != nil { + slog.Error("Flush error", "error", err) + } + if _, err := r.file.Seek(4, 0); err != nil { + slog.Error("Seek error", "error", err) + return + } + var buf [4]byte + binary.LittleEndian.PutUint32(buf[:], 36+totalDataSize) + r.file.Write(buf[:]) + if _, err := r.file.Seek(40, 0); err != nil { + slog.Error("Seek error", "error", err) + return + } + binary.LittleEndian.PutUint32(buf[:], totalDataSize) + r.file.Write(buf[:]) + return + } + } + }() +} + +// Stop terminates the recorder +func (r *Recorder) Stop() { + r.path.RemoveReader(r.name) + close(r.terminate) +} + +func main() { + // Initialize slog (default handler for structured logging) + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))) + + // Step 1: Parse config + configFile, err := os.ReadFile("config.yaml") + if err != nil { + slog.Error("Read config", "error", err) + os.Exit(1) + } + var conf Config + if err := yaml.Unmarshal(configFile, &conf); err != nil { + slog.Error("Unmarshal config", "error", err) + os.Exit(1) + } + if conf.RTSPURL == "" || conf.OutputFile == "" { + slog.Error("RTSP URL or output file missing") + os.Exit(1) + } + + // Step 2: Set up event bus + eventChan := make(chan Event, 10) + go func() { + for ev := range eventChan { + slog.Info("Event", "type", ev.Type, "data", ev.Data) + } + }() + + // Step 3: Create path (stream hub) + path := NewPath(eventChan) + + // Step 4: Set up RTSP client (using provided connection logic) + u, err := base.ParseURL(conf.RTSPURL) + if err != nil { + slog.Error("Failed to parse RTSP URL", "error", err, "url", conf.RTSPURL) + os.Exit(1) + } + + c := gortsplib.Client{ + Scheme: u.Scheme, + Host: u.Host, + } + + // Connect to the server + err = c.Start2() + if err != nil { + slog.Error("Failed to connect to RTSP server", "error", err) + os.Exit(1) + } + defer c.Close() + + // Find available medias + desc, _, err := c.Describe(u) + if err != nil { + slog.Error("Failed to describe RTSP stream", "error", err) + os.Exit(1) + } + + // Find the G711 media and format + var audioFormat *format.G711 + audioMedia := desc.FindFormat(&audioFormat) + if audioMedia == nil { + slog.Error("G711 media not found in RTSP stream") + os.Exit(1) + } + if audioFormat.SampleRate != 8000 || audioFormat.ChannelCount != 1 { + slog.Error("Audio format is not mono 8000Hz G711") + os.Exit(1) + } + + _, err = c.Setup(desc.BaseURL, audioMedia, 0, 0) + if err != nil { + slog.Error("Failed to setup RTSP media", "error", err) + os.Exit(1) + } + + // Step 5: Set up recorder (choose μ-law or A-law based on format) + isMuLaw := audioFormat.MULaw + recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw) + if err != nil { + slog.Error("Create recorder", "error", err) + os.Exit(1) + } + recorder.Start() + + // Step 6: Signal source ready + path.SourceReady(audioMedia, audioFormat) + + // Step 7: Read RTP packets and distribute + go func() { + c.OnPacketRTP(audioMedia, audioFormat, func(pkt *rtp.Packet) { + path.DistributeData(&Data{Media: audioMedia, Pkt: pkt}) + }) + _, err := c.Play(nil) + if err != nil { + slog.Error("Play failed", "error", err) + eventChan <- Event{Type: "source_error", Data: err} + return + } + }() + + // Step 8: Wait for errors or interruption + err = c.Wait() + if err != nil { + slog.Error("RTSP client error", "error", err) + recorder.Stop() + c.Close() + close(eventChan) + os.Exit(1) + } +}