Add record segment duration to configuration and implement file rotation in recorder

This commit is contained in:
2025-08-19 10:58:34 -04:00
parent f51749ec0c
commit 265c142edb
2 changed files with 118 additions and 31 deletions

View File

@ -1,2 +1,3 @@
rtsp_url: "rtsp://10.10.10.104:8554/union" rtsp_url: "rtsp://10.10.10.104:8554/union"
output_file: "output.wav" output_file: "output.wav"
record_segment_duration: "30m" # Duration for each recording segment

148
main.go
View File

@ -6,7 +6,10 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"os" "os"
"path/filepath"
"strings"
"sync" "sync"
"time"
"github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/base"
@ -17,10 +20,11 @@ import (
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
// Config holds RTSP URL and output file path // Config holds RTSP URL, output file path, and rotation settings
type Config struct { type Config struct {
RTSPURL string `yaml:"rtsp_url"` RTSPURL string `yaml:"rtsp_url"`
OutputFile string `yaml:"output_file"` OutputFile string `yaml:"output_file"`
RecordSegmentDuration string `yaml:"record_segment_duration"` // Added
} }
// Event represents lifecycle events (e.g., source ready, recording started) // Event represents lifecycle events (e.g., source ready, recording started)
@ -44,17 +48,22 @@ type Data struct {
Pkt *rtp.Packet Pkt *rtp.Packet
} }
// Recorder saves packets to a file // Recorder saves packets to a file with rotation
type Recorder struct { type Recorder struct {
name string name string
path *Path path *Path
file *os.File file *os.File
writer *bufio.Writer writer *bufio.Writer
isMuLaw bool // true for μ-law, false for A-law isMuLaw bool // true for μ-law, false for A-law
dataChan chan *Data dataChan chan *Data
terminate chan struct{} terminate chan struct{}
sampleRate int sampleRate int
channels int channels int
outputBase string // Base filename (without timestamp) // Added
segmentDur time.Duration // Added
segmentStart time.Time // Added
totalDataSize uint32 // Added (moved from local variable)
fileMu sync.Mutex // Protects file operations // Added
} }
// NewPath creates a stream hub // NewPath creates a stream hub
@ -103,26 +112,38 @@ func (p *Path) SourceReady(audioMedia *description.Media, audioFormat format.For
} }
// NewRecorder creates a recorder that subscribes to the path // NewRecorder creates a recorder that subscribes to the path
func NewRecorder(path *Path, outputFile string, isMuLaw bool) (*Recorder, error) { func NewRecorder(path *Path, outputFile string, isMuLaw bool, segmentDur time.Duration) (*Recorder, error) {
file, err := os.Create(outputFile) // Initialize with a timestamped filename
fileName := formatFileName(outputFile, time.Now()) // Added
file, err := os.Create(fileName) // Modified
if err != nil { if err != nil {
return nil, fmt.Errorf("create file: %w", err) return nil, fmt.Errorf("create file %s: %w", fileName, err) // Modified
} }
r := &Recorder{ r := &Recorder{
name: "recorder", name: "recorder",
path: path, path: path,
file: file, file: file,
writer: bufio.NewWriter(file), writer: bufio.NewWriter(file),
isMuLaw: isMuLaw, isMuLaw: isMuLaw,
dataChan: make(chan *Data, 100), // Buffered channel dataChan: make(chan *Data, 100), // Buffered channel
terminate: make(chan struct{}), terminate: make(chan struct{}),
sampleRate: 8000, // Fixed for G.711 sampleRate: 8000, // Fixed for G.711
channels: 1, // Mono channels: 1, // Mono
outputBase: outputFile, // Added
segmentDur: segmentDur, // Added
segmentStart: time.Now(), // Added
} }
path.AddReader(r.name, r.dataChan) path.AddReader(r.name, r.dataChan)
return r, nil return r, nil
} }
// formatFileName generates a timestamped filename (e.g., output_2025-08-19_10-24-00.wav)
func formatFileName(baseName string, t time.Time) string {
ext := filepath.Ext(baseName)
name := strings.TrimSuffix(baseName, ext)
return fmt.Sprintf("%s_%s%s", name, t.Format("2006-01-02_15-04-05"), ext)
}
// writeWAVHeader writes a basic WAV header for PCM audio // writeWAVHeader writes a basic WAV header for PCM audio
func (r *Recorder) writeWAVHeader(dataSize uint32) error { func (r *Recorder) writeWAVHeader(dataSize uint32) error {
header := make([]byte, 44) header := make([]byte, 44)
@ -143,6 +164,50 @@ func (r *Recorder) writeWAVHeader(dataSize uint32) error {
return err return err
} }
// rotateFile closes the current file and opens a new one
func (r *Recorder) rotateFile() error {
r.fileMu.Lock()
defer r.fileMu.Unlock()
// Flush and close current file
if err := r.writer.Flush(); err != nil {
return fmt.Errorf("flush file: %w", err)
}
if _, err := r.file.Seek(4, 0); err != nil {
return fmt.Errorf("seek file: %w", err)
}
var buf [4]byte
binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
r.file.Write(buf[:])
if _, err := r.file.Seek(40, 0); err != nil {
return fmt.Errorf("seek file: %w", err)
}
binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
r.file.Write(buf[:])
if err := r.file.Close(); err != nil {
return fmt.Errorf("close file: %w", err)
}
// Open new file
fileName := formatFileName(r.outputBase, time.Now())
file, err := os.Create(fileName)
if err != nil {
return fmt.Errorf("create new file %s: %w", fileName, err)
}
r.file = file
r.writer = bufio.NewWriter(file)
r.totalDataSize = 0
r.segmentStart = time.Now()
// Write new WAV header
if err := r.writeWAVHeader(0); err != nil {
return fmt.Errorf("write new header: %w", err)
}
slog.Info("Rotated to new file", "filename", fileName)
r.path.eventChan <- Event{Type: "segment_complete", Data: fileName}
return nil
}
// muLawToPCM converts μ-law samples to 16-bit PCM // muLawToPCM converts μ-law samples to 16-bit PCM
func muLawToPCM(sample byte) int16 { func muLawToPCM(sample byte) int16 {
const muLawBias = 33 const muLawBias = 33
@ -193,7 +258,7 @@ func (r *Recorder) Start() {
if err := r.writeWAVHeader(0); err != nil { if err := r.writeWAVHeader(0); err != nil {
slog.Error("Write header error", "error", err) slog.Error("Write header error", "error", err)
} }
var totalDataSize uint32 r.totalDataSize = 0 // Moved from local variable to struct field
go func() { go func() {
defer r.file.Close() defer r.file.Close()
@ -202,6 +267,13 @@ func (r *Recorder) Start() {
select { select {
case data := <-r.dataChan: case data := <-r.dataChan:
if data.Media.Type == "audio" { if data.Media.Type == "audio" {
// Check for rotation
if time.Since(r.segmentStart) >= r.segmentDur { // Added
if err := r.rotateFile(); err != nil {
slog.Error("File rotation error", "error", err)
}
}
// Unmarshal G.711 payload // Unmarshal G.711 payload
var samples []byte var samples []byte
var err error var err error
@ -219,6 +291,7 @@ func (r *Recorder) Start() {
continue continue
} }
// Convert to 16-bit PCM and write // Convert to 16-bit PCM and write
r.fileMu.Lock() // Added
for _, sample := range samples { for _, sample := range samples {
var pcmSample int16 var pcmSample int16
if r.isMuLaw { if r.isMuLaw {
@ -232,11 +305,13 @@ func (r *Recorder) Start() {
if err != nil { if err != nil {
slog.Error("Write error", "error", err) slog.Error("Write error", "error", err)
} }
totalDataSize += 2 r.totalDataSize += 2
} }
r.fileMu.Unlock() // Added
} }
case <-r.terminate: case <-r.terminate:
// Update WAV header with actual data size // Update WAV header with final data size
r.fileMu.Lock() // Added
if err := r.writer.Flush(); err != nil { if err := r.writer.Flush(); err != nil {
slog.Error("Flush error", "error", err) slog.Error("Flush error", "error", err)
} }
@ -245,14 +320,15 @@ func (r *Recorder) Start() {
return return
} }
var buf [4]byte var buf [4]byte
binary.LittleEndian.PutUint32(buf[:], 36+totalDataSize) binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
r.file.Write(buf[:]) r.file.Write(buf[:])
if _, err := r.file.Seek(40, 0); err != nil { if _, err := r.file.Seek(40, 0); err != nil {
slog.Error("Seek error", "error", err) slog.Error("Seek error", "error", err)
return return
} }
binary.LittleEndian.PutUint32(buf[:], totalDataSize) binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
r.file.Write(buf[:]) r.file.Write(buf[:])
r.fileMu.Unlock() // Added
return return
} }
} }
@ -285,6 +361,16 @@ func main() {
os.Exit(1) os.Exit(1)
} }
segmentDur := 5 * time.Minute
if conf.RecordSegmentDuration != "" {
dur, err := time.ParseDuration(conf.RecordSegmentDuration)
if err != nil {
slog.Error("Invalid record_segment_duration", "value", conf.RecordSegmentDuration, "error", err)
os.Exit(1)
}
segmentDur = dur
}
// Step 2: Set up event bus // Step 2: Set up event bus
eventChan := make(chan Event, 10) eventChan := make(chan Event, 10)
go func() { go func() {
@ -343,7 +429,7 @@ func main() {
// Step 5: Set up recorder (choose μ-law or A-law based on format) // Step 5: Set up recorder (choose μ-law or A-law based on format)
isMuLaw := audioFormat.MULaw isMuLaw := audioFormat.MULaw
recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw) recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw, segmentDur) // Modified: added segmentDur
if err != nil { if err != nil {
slog.Error("Create recorder", "error", err) slog.Error("Create recorder", "error", err)
os.Exit(1) os.Exit(1)