// Command main connects to an RTSP server, subscribes to its mono 8 kHz G.711
// audio track and records it into a 16-bit PCM WAV file. The RTSP URL and the
// output path are read from config.yaml in the working directory.
package main

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/bluenviron/gortsplib/v4"
	"github.com/bluenviron/gortsplib/v4/pkg/base"
	"github.com/bluenviron/gortsplib/v4/pkg/description"
	"github.com/bluenviron/gortsplib/v4/pkg/format"
	"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
	"github.com/pion/rtp"
	"gopkg.in/yaml.v3"
)

// Byte offsets of the two size fields inside the 44-byte WAV header written by
// writeWAVHeader. Both are patched once the final amount of audio is known.
const (
	wavRIFFSizeOffset = 4  // RIFF ChunkSize (36 + data size)
	wavDataSizeOffset = 40 // "data" Subchunk2Size
)

// Config holds the RTSP URL and the output file path.
type Config struct {
	RTSPURL    string `yaml:"rtsp_url"`
	OutputFile string `yaml:"output_file"`
}

// Event represents a lifecycle event (e.g. source ready, source error).
type Event struct {
	Type string
	Data any
}

// Path manages a stream, distributing packets to registered readers.
type Path struct {
	audioMedia  *description.Media
	audioFormat format.Format
	readers     map[string]chan *Data
	readerMu    sync.RWMutex
	eventChan   chan Event
}

// Data holds an RTP packet and the media it belongs to.
type Data struct {
	Media *description.Media
	Pkt   *rtp.Packet
}

// Recorder saves the audio packets of a Path into a WAV file.
//
// A Recorder contains a sync.Once and must not be copied; use it through the
// pointer returned by NewRecorder.
type Recorder struct {
	name       string
	path       *Path
	file       *os.File
	writer     *bufio.Writer
	isMuLaw    bool // true for μ-law, false for A-law
	dataChan   chan *Data
	terminate  chan struct{}
	done       chan struct{} // closed when the writer goroutine has finalized the file
	stopOnce   sync.Once     // makes Stop safe to call more than once
	started    bool          // set by Start; tells Stop whether a goroutine owns the file
	sampleRate int
	channels   int
	dataSize   uint32 // PCM bytes written after the header
	scratch    []byte // reusable buffer for the little-endian samples of one packet
}

// NewPath creates a stream hub that reports lifecycle events on eventChan.
func NewPath(eventChan chan Event) *Path {
	return &Path{
		readers:   make(map[string]chan *Data),
		eventChan: eventChan,
	}
}

// AddReader registers a reader (e.g. a recorder) under name. Packets are
// delivered on ch. Registering a second reader under the same name replaces
// the previous entry without closing its channel.
func (p *Path) AddReader(name string, ch chan *Data) {
	p.readerMu.Lock()
	defer p.readerMu.Unlock()
	p.readers[name] = ch
}

// RemoveReader unregisters the reader called name and closes its channel.
// It is a no-op when no such reader exists.
func (p *Path) RemoveReader(name string) {
	p.readerMu.Lock()
	defer p.readerMu.Unlock()
	if ch, ok := p.readers[name]; ok {
		// Closing under the write lock is safe: DistributeData only sends
		// while holding the read lock.
		close(ch)
		delete(p.readers, name)
	}
}

// DistributeData offers data to every registered reader without blocking.
// A reader whose channel is full misses the packet.
func (p *Path) DistributeData(data *Data) {
	p.readerMu.RLock()
	defer p.readerMu.RUnlock()
	for name, ch := range p.readers {
		select {
		case ch <- data:
		default:
			slog.Warn("Reader channel full, dropping packet", "reader", name)
		}
	}
}

// SourceReady stores the media description of the source and emits a
// "source_ready" event. The send blocks while the event channel is full.
func (p *Path) SourceReady(audioMedia *description.Media, audioFormat format.Format) {
	p.audioMedia = audioMedia
	p.audioFormat = audioFormat
	p.eventChan <- Event{Type: "source_ready", Data: audioMedia}
}

// NewRecorder creates outputFile (truncating it if it exists) and subscribes
// a new recorder to path. isMuLaw selects μ-law (true) or A-law (false)
// decoding. Call Start to begin writing and Stop to finalize the file.
func NewRecorder(path *Path, outputFile string, isMuLaw bool) (*Recorder, error) {
	file, err := os.Create(outputFile)
	if err != nil {
		return nil, fmt.Errorf("create file: %w", err)
	}
	r := &Recorder{
		name:       "recorder",
		path:       path,
		file:       file,
		writer:     bufio.NewWriter(file),
		isMuLaw:    isMuLaw,
		dataChan:   make(chan *Data, 100), // buffered so the RTSP callback never blocks
		terminate:  make(chan struct{}),
		done:       make(chan struct{}),
		sampleRate: 8000, // fixed for G.711
		channels:   1,    // mono
	}
	path.AddReader(r.name, r.dataChan)
	return r, nil
}

// writeWAVHeader writes the canonical 44-byte header of a 16-bit PCM WAV file
// through the buffered writer. dataSize is the number of PCM bytes that
// follow; pass 0 as a placeholder when it is not known yet.
func (r *Recorder) writeWAVHeader(dataSize uint32) error {
	header := make([]byte, 44)
	copy(header[0:4], "RIFF")
	binary.LittleEndian.PutUint32(header[4:8], 36+dataSize) // ChunkSize
	copy(header[8:12], "WAVE")
	copy(header[12:16], "fmt ")
	binary.LittleEndian.PutUint32(header[16:20], 16) // Subchunk1Size (16 for PCM)
	binary.LittleEndian.PutUint16(header[20:22], 1)  // AudioFormat (1 for PCM)
	binary.LittleEndian.PutUint16(header[22:24], uint16(r.channels))
	binary.LittleEndian.PutUint32(header[24:28], uint32(r.sampleRate))
	binary.LittleEndian.PutUint32(header[28:32], uint32(r.sampleRate*r.channels*2)) // ByteRate
	binary.LittleEndian.PutUint16(header[32:34], uint16(r.channels*2))              // BlockAlign
	binary.LittleEndian.PutUint16(header[34:36], 16)                                // BitsPerSample
	copy(header[36:40], "data")
	binary.LittleEndian.PutUint32(header[40:44], dataSize)
	_, err := r.writer.Write(header)
	return err
}

// muLawToPCM expands one G.711 μ-law byte to a 16-bit linear PCM sample
// using the standard G.711 expansion (range ±32124).
//
// The recorder itself decodes through the g711 package; this helper is a
// standalone per-sample decoder.
func muLawToPCM(sample byte) int16 {
	const bias = 0x84

	// μ-law bytes are transmitted with every bit inverted.
	u := ^sample
	exponent := uint(u>>4) & 0x07
	mantissa := int(u & 0x0F)
	magnitude := (((mantissa << 3) + bias) << exponent) - bias

	// After inversion a set top bit marks a negative sample.
	if u&0x80 != 0 {
		return int16(-magnitude)
	}
	return int16(magnitude)
}

// aLawToPCM expands one G.711 A-law byte to a 16-bit linear PCM sample
// using the standard G.711 expansion (range ±32256).
//
// The recorder itself decodes through the g711 package; this helper is a
// standalone per-sample decoder.
func aLawToPCM(sample byte) int16 {
	// A-law bytes are transmitted with the even bits inverted.
	a := sample ^ 0x55
	exponent := uint(a>>4) & 0x07
	mantissa := int(a & 0x0F)

	magnitude := (mantissa << 4) + 8
	if exponent > 0 {
		// Segments above the first carry an implicit leading one.
		magnitude = (magnitude + 0x100) << (exponent - 1)
	}

	// In A-law a set top bit marks a positive sample.
	if a&0x80 != 0 {
		return int16(magnitude)
	}
	return int16(-magnitude)
}

// Start writes a placeholder WAV header and launches the goroutine that
// decodes incoming packets and appends them to the file. Calling Start more
// than once has no further effect. Start and Stop must not be called
// concurrently with each other.
func (r *Recorder) Start() {
	if r.started {
		return
	}
	// The sizes are unknown for now; finalize patches them at the end.
	if err := r.writeWAVHeader(0); err != nil {
		slog.Error("Write header error", "error", err)
	}
	r.started = true
	go r.run()
}

// run is the writer goroutine: it consumes packets until the data channel is
// closed or termination is requested, then finalizes the file.
func (r *Recorder) run() {
	defer close(r.done)
	defer r.finalize()

	for {
		select {
		case data, ok := <-r.dataChan:
			if !ok {
				// Channel closed by RemoveReader: everything buffered has
				// already been delivered.
				return
			}
			r.writePacket(data)
		case <-r.terminate:
			r.drain()
			return
		}
	}
}

// drain writes the packets still buffered in the data channel without
// blocking, so audio received just before Stop is not lost.
func (r *Recorder) drain() {
	for {
		select {
		case data, ok := <-r.dataChan:
			if !ok {
				return
			}
			r.writePacket(data)
		default:
			return
		}
	}
}

// writePacket decodes the G.711 payload of one audio packet and appends the
// samples to the file as little-endian 16-bit PCM. Non-audio and incomplete
// items are ignored.
func (r *Recorder) writePacket(data *Data) {
	if data == nil || data.Media == nil || data.Pkt == nil {
		return
	}
	if data.Media.Type != "audio" {
		return
	}

	// The g711 decoders expand each payload byte to one 16-bit big-endian
	// LPCM sample (see the mediacommon g711 documentation), so the output
	// must not be run through a G.711 decoder a second time.
	var pcm []byte
	if r.isMuLaw {
		var mu g711.Mulaw
		mu.Unmarshal(data.Pkt.Payload)
		pcm = []byte(mu)
	} else {
		var al g711.Alaw
		al.Unmarshal(data.Pkt.Payload)
		pcm = []byte(al)
	}

	// WAV stores samples little-endian: swap the two bytes of every sample.
	size := len(pcm) &^ 1
	if cap(r.scratch) < size {
		r.scratch = make([]byte, size)
	}
	out := r.scratch[:size]
	for i := 0; i < size; i += 2 {
		out[i], out[i+1] = pcm[i+1], pcm[i]
	}

	n, err := r.writer.Write(out)
	r.dataSize += uint32(n)
	if err != nil {
		slog.Error("Write error", "error", err)
	}
}

// finalize flushes buffered audio, patches the two size fields of the WAV
// header with the real data size and closes the file.
func (r *Recorder) finalize() {
	defer func() {
		if err := r.file.Close(); err != nil {
			slog.Error("Close error", "error", err)
		}
	}()

	if err := r.writer.Flush(); err != nil {
		slog.Error("Flush error", "error", err)
	}

	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], 36+r.dataSize)
	if _, err := r.file.WriteAt(buf[:], wavRIFFSizeOffset); err != nil {
		slog.Error("Update WAV header error", "field", "ChunkSize", "error", err)
		return
	}
	binary.LittleEndian.PutUint32(buf[:], r.dataSize)
	if _, err := r.file.WriteAt(buf[:], wavDataSizeOffset); err != nil {
		slog.Error("Update WAV header error", "field", "Subchunk2Size", "error", err)
	}
}

// Stop unsubscribes the recorder from its path and blocks until all buffered
// audio has been written and the WAV file has been finalized and closed.
// It is safe to call Stop more than once.
func (r *Recorder) Stop() {
	r.stopOnce.Do(func() {
		r.path.RemoveReader(r.name)
		close(r.terminate)

		if !r.started {
			// No writer goroutine owns the file; release it here.
			if err := r.file.Close(); err != nil {
				slog.Error("Close error", "error", err)
			}
			return
		}
		<-r.done
	})
}

func main() {
	os.Exit(run())
}

// run executes the program and returns the process exit code. Keeping the
// logic out of main lets deferred cleanup (recorder, RTSP client, event
// logger) run before the process exits.
func run() int {
	// Initialize slog (default handler for structured logging)
	slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})))

	// Step 1: Parse config
	configFile, err := os.ReadFile("config.yaml")
	if err != nil {
		slog.Error("Read config", "error", err)
		return 1
	}
	var conf Config
	if err := yaml.Unmarshal(configFile, &conf); err != nil {
		slog.Error("Unmarshal config", "error", err)
		return 1
	}
	if conf.RTSPURL == "" || conf.OutputFile == "" {
		slog.Error("RTSP URL or output file missing")
		return 1
	}

	// Step 2: Set up event bus
	eventChan := make(chan Event, 10)
	eventsDone := make(chan struct{})
	go func() {
		defer close(eventsDone)
		for ev := range eventChan {
			slog.Info("Event", "type", ev.Type, "data", ev.Data)
		}
	}()
	// Registered first so it runs last: by then nothing can emit events.
	defer func() {
		close(eventChan)
		<-eventsDone
	}()

	// Step 3: Create path (stream hub)
	path := NewPath(eventChan)

	// Step 4: Set up RTSP client
	u, err := base.ParseURL(conf.RTSPURL)
	if err != nil {
		slog.Error("Failed to parse RTSP URL", "error", err, "url", conf.RTSPURL)
		return 1
	}
	c := gortsplib.Client{
		Scheme: u.Scheme,
		Host:   u.Host,
	}

	// Connect to the server
	if err := c.Start2(); err != nil {
		slog.Error("Failed to connect to RTSP server", "error", err)
		return 1
	}
	defer c.Close()

	// Find available medias
	desc, _, err := c.Describe(u)
	if err != nil {
		slog.Error("Failed to describe RTSP stream", "error", err)
		return 1
	}

	// Find the G711 media and format
	var audioFormat *format.G711
	audioMedia := desc.FindFormat(&audioFormat)
	if audioMedia == nil {
		slog.Error("G711 media not found in RTSP stream")
		return 1
	}
	if audioFormat.SampleRate != 8000 || audioFormat.ChannelCount != 1 {
		slog.Error("Audio format is not mono 8000Hz G711")
		return 1
	}
	if _, err := c.Setup(desc.BaseURL, audioMedia, 0, 0); err != nil {
		slog.Error("Failed to setup RTSP media", "error", err)
		return 1
	}

	// Step 5: Set up recorder (choose μ-law or A-law based on format)
	recorder, err := NewRecorder(path, conf.OutputFile, audioFormat.MULaw)
	if err != nil {
		slog.Error("Create recorder", "error", err)
		return 1
	}
	recorder.Start()
	// Stop blocks until the WAV header carries the final sizes.
	defer recorder.Stop()

	// Step 6: Signal source ready
	path.SourceReady(audioMedia, audioFormat)

	// Step 7: Read RTP packets and distribute
	c.OnPacketRTP(audioMedia, audioFormat, func(pkt *rtp.Packet) {
		// NOTE(review): the packet is consumed later on the recorder
		// goroutine; confirm gortsplib does not reuse pkt's buffers after
		// this callback returns, otherwise it must be cloned here.
		path.DistributeData(&Data{Media: audioMedia, Pkt: pkt})
	})
	if _, err := c.Play(nil); err != nil {
		slog.Error("Play failed", "error", err)
		eventChan <- Event{Type: "source_error", Data: err}
		return 1
	}

	// Step 8: Wait for errors or interruption
	waitErr := make(chan error, 1)
	go func() { waitErr <- c.Wait() }()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	select {
	case clientErr := <-waitErr:
		if clientErr != nil {
			slog.Error("RTSP client error", "error", clientErr)
			return 1
		}
	case sig := <-sigChan:
		slog.Info("Interrupted, shutting down", "signal", sig.String())
	}
	return 0
}