From 265c142edb8e3ecf61a7a593b828a79fd441d3bc Mon Sep 17 00:00:00 2001 From: Alex Savin Date: Tue, 19 Aug 2025 10:58:34 -0400 Subject: [PATCH] Add record segment duration to configuration and implement file rotation in recorder --- config.yaml | 1 + main.go | 148 +++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 118 insertions(+), 31 deletions(-) diff --git a/config.yaml b/config.yaml index da76979..f4370d1 100644 --- a/config.yaml +++ b/config.yaml @@ -1,2 +1,3 @@ rtsp_url: "rtsp://10.10.10.104:8554/union" output_file: "output.wav" +record_segment_duration: "30m" # Duration for each recording segment \ No newline at end of file diff --git a/main.go b/main.go index df0a2a4..728fa20 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,10 @@ import ( "fmt" "log/slog" "os" + "path/filepath" + "strings" "sync" + "time" "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" @@ -17,10 +20,11 @@ import ( "gopkg.in/yaml.v3" ) -// Config holds RTSP URL and output file path +// Config holds RTSP URL, output file path, and rotation settings type Config struct { - RTSPURL string `yaml:"rtsp_url"` - OutputFile string `yaml:"output_file"` + RTSPURL string `yaml:"rtsp_url"` + OutputFile string `yaml:"output_file"` + RecordSegmentDuration string `yaml:"record_segment_duration"` // Added } // Event represents lifecycle events (e.g., source ready, recording started) @@ -44,17 +48,22 @@ type Data struct { Pkt *rtp.Packet } -// Recorder saves packets to a file +// Recorder saves packets to a file with rotation type Recorder struct { - name string - path *Path - file *os.File - writer *bufio.Writer - isMuLaw bool // true for μ-law, false for A-law - dataChan chan *Data - terminate chan struct{} - sampleRate int - channels int + name string + path *Path + file *os.File + writer *bufio.Writer + isMuLaw bool // true for μ-law, false for A-law + dataChan chan *Data + terminate chan struct{} + sampleRate int + channels int + outputBase string // Base filename (without timestamp) // Added + segmentDur time.Duration // Added + segmentStart time.Time // Added + totalDataSize uint32 // Added (moved from local variable) + fileMu sync.Mutex // Protects file operations // Added } // NewPath creates a stream hub @@ -103,26 +112,38 @@ func (p *Path) SourceReady(audioMedia *description.Media, audioFormat format.For } // NewRecorder creates a recorder that subscribes to the path -func NewRecorder(path *Path, outputFile string, isMuLaw bool) (*Recorder, error) { - file, err := os.Create(outputFile) +func NewRecorder(path *Path, outputFile string, isMuLaw bool, segmentDur time.Duration) (*Recorder, error) { + // Initialize with a timestamped filename + fileName := formatFileName(outputFile, time.Now()) // Added + file, err := os.Create(fileName) // Modified if err != nil { - return nil, fmt.Errorf("create file: %w", err) + return nil, fmt.Errorf("create file %s: %w", fileName, err) // Modified } r := &Recorder{ - name: "recorder", - path: path, - file: file, - writer: bufio.NewWriter(file), - isMuLaw: isMuLaw, - dataChan: make(chan *Data, 100), // Buffered channel - terminate: make(chan struct{}), - sampleRate: 8000, // Fixed for G.711 - channels: 1, // Mono + name: "recorder", + path: path, + file: file, + writer: bufio.NewWriter(file), + isMuLaw: isMuLaw, + dataChan: make(chan *Data, 100), // Buffered channel + terminate: make(chan struct{}), + sampleRate: 8000, // Fixed for G.711 + channels: 1, // Mono + outputBase: outputFile, // Added + segmentDur: segmentDur, // Added + segmentStart: time.Now(), // Added } path.AddReader(r.name, r.dataChan) return r, nil } +// formatFileName generates a timestamped filename (e.g., output_2025-08-19_10-24-00.wav) +func formatFileName(baseName string, t time.Time) string { + ext := filepath.Ext(baseName) + name := strings.TrimSuffix(baseName, ext) + return fmt.Sprintf("%s_%s%s", name, t.Format("2006-01-02_15-04-05"), ext) +} + // writeWAVHeader writes a basic WAV header for PCM audio func (r *Recorder) writeWAVHeader(dataSize uint32) error { header := make([]byte, 44) @@ -143,6 +164,50 @@ func (r *Recorder) writeWAVHeader(dataSize uint32) error { return err } +// rotateFile closes the current file and opens a new one +func (r *Recorder) rotateFile() error { + r.fileMu.Lock() + defer r.fileMu.Unlock() + + // Flush and close current file + if err := r.writer.Flush(); err != nil { + return fmt.Errorf("flush file: %w", err) + } + if _, err := r.file.Seek(4, 0); err != nil { + return fmt.Errorf("seek file: %w", err) + } + var buf [4]byte + binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize) + r.file.Write(buf[:]) + if _, err := r.file.Seek(40, 0); err != nil { + return fmt.Errorf("seek file: %w", err) + } + binary.LittleEndian.PutUint32(buf[:], r.totalDataSize) + r.file.Write(buf[:]) + if err := r.file.Close(); err != nil { + return fmt.Errorf("close file: %w", err) + } + + // Open new file + fileName := formatFileName(r.outputBase, time.Now()) + file, err := os.Create(fileName) + if err != nil { + return fmt.Errorf("create new file %s: %w", fileName, err) + } + r.file = file + r.writer = bufio.NewWriter(file) + r.totalDataSize = 0 + r.segmentStart = time.Now() + + // Write new WAV header + if err := r.writeWAVHeader(0); err != nil { + return fmt.Errorf("write new header: %w", err) + } + slog.Info("Rotated to new file", "filename", fileName) + r.path.eventChan <- Event{Type: "segment_complete", Data: fileName} + return nil +} + // muLawToPCM converts μ-law samples to 16-bit PCM func muLawToPCM(sample byte) int16 { const muLawBias = 33 @@ -193,7 +258,7 @@ func (r *Recorder) Start() { if err := r.writeWAVHeader(0); err != nil { slog.Error("Write header error", "error", err) } - var totalDataSize uint32 + r.totalDataSize = 0 // Moved from local variable to struct field go func() { defer r.file.Close() @@ -202,6 +267,13 @@ func (r *Recorder) Start() { select { case data := <-r.dataChan: if data.Media.Type == "audio" { + // Check for rotation + if time.Since(r.segmentStart) >= r.segmentDur { // Added + if err := r.rotateFile(); err != nil { + slog.Error("File rotation error", "error", err) + } + } + // Unmarshal G.711 payload var samples []byte var err error @@ -219,6 +291,7 @@ func (r *Recorder) Start() { continue } // Convert to 16-bit PCM and write + r.fileMu.Lock() // Added for _, sample := range samples { var pcmSample int16 if r.isMuLaw { @@ -232,11 +305,13 @@ func (r *Recorder) Start() { if err != nil { slog.Error("Write error", "error", err) } - totalDataSize += 2 + r.totalDataSize += 2 } + r.fileMu.Unlock() // Added } case <-r.terminate: - // Update WAV header with actual data size + // Update WAV header with final data size + r.fileMu.Lock() // Added if err := r.writer.Flush(); err != nil { slog.Error("Flush error", "error", err) } @@ -245,14 +320,15 @@ func (r *Recorder) Start() { return } var buf [4]byte - binary.LittleEndian.PutUint32(buf[:], 36+totalDataSize) + binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize) r.file.Write(buf[:]) if _, err := r.file.Seek(40, 0); err != nil { slog.Error("Seek error", "error", err) return } - binary.LittleEndian.PutUint32(buf[:], totalDataSize) + binary.LittleEndian.PutUint32(buf[:], r.totalDataSize) r.file.Write(buf[:]) + r.fileMu.Unlock() // Added return } } @@ -285,6 +361,16 @@ func main() { os.Exit(1) } + segmentDur := 5 * time.Minute + if conf.RecordSegmentDuration != "" { + dur, err := time.ParseDuration(conf.RecordSegmentDuration) + if err != nil { + slog.Error("Invalid record_segment_duration", "value", conf.RecordSegmentDuration, "error", err) + os.Exit(1) + } + segmentDur = dur + } + // Step 2: Set up event bus eventChan := make(chan Event, 10) go func() { @@ -343,7 +429,7 @@ func main() { // Step 5: Set up recorder (choose μ-law or A-law based on format) isMuLaw := audioFormat.MULaw - recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw) + recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw, segmentDur) // Modified: added segmentDur if err != nil { slog.Error("Create recorder", "error", err) os.Exit(1)