1 Commits

2 changed files with 118 additions and 31 deletions

View File

@ -1,2 +1,3 @@
rtsp_url: "rtsp://10.10.10.104:8554/union"
output_file: "output.wav"
record_segment_duration: "30m" # Duration for each recording segment

148
main.go
View File

@ -6,7 +6,10 @@ import (
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
@ -17,10 +20,11 @@ import (
"gopkg.in/yaml.v3"
)
// Config holds RTSP URL and output file path
// Config holds RTSP URL, output file path, and rotation settings
type Config struct {
RTSPURL string `yaml:"rtsp_url"`
OutputFile string `yaml:"output_file"`
RTSPURL string `yaml:"rtsp_url"`
OutputFile string `yaml:"output_file"`
RecordSegmentDuration string `yaml:"record_segment_duration"` // Added
}
// Event represents lifecycle events (e.g., source ready, recording started)
@ -44,17 +48,22 @@ type Data struct {
Pkt *rtp.Packet
}
// Recorder saves packets to a file
// Recorder saves packets to a file with rotation
type Recorder struct {
name string
path *Path
file *os.File
writer *bufio.Writer
isMuLaw bool // true for μ-law, false for A-law
dataChan chan *Data
terminate chan struct{}
sampleRate int
channels int
name string
path *Path
file *os.File
writer *bufio.Writer
isMuLaw bool // true for μ-law, false for A-law
dataChan chan *Data
terminate chan struct{}
sampleRate int
channels int
outputBase string // Base filename (without timestamp) // Added
segmentDur time.Duration // Added
segmentStart time.Time // Added
totalDataSize uint32 // Added (moved from local variable)
fileMu sync.Mutex // Protects file operations // Added
}
// NewPath creates a stream hub
@ -103,26 +112,38 @@ func (p *Path) SourceReady(audioMedia *description.Media, audioFormat format.For
}
// NewRecorder creates a recorder that subscribes to the path
func NewRecorder(path *Path, outputFile string, isMuLaw bool) (*Recorder, error) {
file, err := os.Create(outputFile)
func NewRecorder(path *Path, outputFile string, isMuLaw bool, segmentDur time.Duration) (*Recorder, error) {
// Initialize with a timestamped filename
fileName := formatFileName(outputFile, time.Now()) // Added
file, err := os.Create(fileName) // Modified
if err != nil {
return nil, fmt.Errorf("create file: %w", err)
return nil, fmt.Errorf("create file %s: %w", fileName, err) // Modified
}
r := &Recorder{
name: "recorder",
path: path,
file: file,
writer: bufio.NewWriter(file),
isMuLaw: isMuLaw,
dataChan: make(chan *Data, 100), // Buffered channel
terminate: make(chan struct{}),
sampleRate: 8000, // Fixed for G.711
channels: 1, // Mono
name: "recorder",
path: path,
file: file,
writer: bufio.NewWriter(file),
isMuLaw: isMuLaw,
dataChan: make(chan *Data, 100), // Buffered channel
terminate: make(chan struct{}),
sampleRate: 8000, // Fixed for G.711
channels: 1, // Mono
outputBase: outputFile, // Added
segmentDur: segmentDur, // Added
segmentStart: time.Now(), // Added
}
path.AddReader(r.name, r.dataChan)
return r, nil
}
// formatFileName generates a timestamped filename (e.g., output_2025-08-19_10-24-00.wav)
func formatFileName(baseName string, t time.Time) string {
ext := filepath.Ext(baseName)
name := strings.TrimSuffix(baseName, ext)
return fmt.Sprintf("%s_%s%s", name, t.Format("2006-01-02_15-04-05"), ext)
}
// writeWAVHeader writes a basic WAV header for PCM audio
func (r *Recorder) writeWAVHeader(dataSize uint32) error {
header := make([]byte, 44)
@ -143,6 +164,50 @@ func (r *Recorder) writeWAVHeader(dataSize uint32) error {
return err
}
// rotateFile closes the current file and opens a new one
func (r *Recorder) rotateFile() error {
r.fileMu.Lock()
defer r.fileMu.Unlock()
// Flush and close current file
if err := r.writer.Flush(); err != nil {
return fmt.Errorf("flush file: %w", err)
}
if _, err := r.file.Seek(4, 0); err != nil {
return fmt.Errorf("seek file: %w", err)
}
var buf [4]byte
binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
r.file.Write(buf[:])
if _, err := r.file.Seek(40, 0); err != nil {
return fmt.Errorf("seek file: %w", err)
}
binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
r.file.Write(buf[:])
if err := r.file.Close(); err != nil {
return fmt.Errorf("close file: %w", err)
}
// Open new file
fileName := formatFileName(r.outputBase, time.Now())
file, err := os.Create(fileName)
if err != nil {
return fmt.Errorf("create new file %s: %w", fileName, err)
}
r.file = file
r.writer = bufio.NewWriter(file)
r.totalDataSize = 0
r.segmentStart = time.Now()
// Write new WAV header
if err := r.writeWAVHeader(0); err != nil {
return fmt.Errorf("write new header: %w", err)
}
slog.Info("Rotated to new file", "filename", fileName)
r.path.eventChan <- Event{Type: "segment_complete", Data: fileName}
return nil
}
// muLawToPCM converts μ-law samples to 16-bit PCM
func muLawToPCM(sample byte) int16 {
const muLawBias = 33
@ -193,7 +258,7 @@ func (r *Recorder) Start() {
if err := r.writeWAVHeader(0); err != nil {
slog.Error("Write header error", "error", err)
}
var totalDataSize uint32
r.totalDataSize = 0 // Moved from local variable to struct field
go func() {
defer r.file.Close()
@ -202,6 +267,13 @@ func (r *Recorder) Start() {
select {
case data := <-r.dataChan:
if data.Media.Type == "audio" {
// Check for rotation
if time.Since(r.segmentStart) >= r.segmentDur { // Added
if err := r.rotateFile(); err != nil {
slog.Error("File rotation error", "error", err)
}
}
// Unmarshal G.711 payload
var samples []byte
var err error
@ -219,6 +291,7 @@ func (r *Recorder) Start() {
continue
}
// Convert to 16-bit PCM and write
r.fileMu.Lock() // Added
for _, sample := range samples {
var pcmSample int16
if r.isMuLaw {
@ -232,11 +305,13 @@ func (r *Recorder) Start() {
if err != nil {
slog.Error("Write error", "error", err)
}
totalDataSize += 2
r.totalDataSize += 2
}
r.fileMu.Unlock() // Added
}
case <-r.terminate:
// Update WAV header with actual data size
// Update WAV header with final data size
r.fileMu.Lock() // Added
if err := r.writer.Flush(); err != nil {
slog.Error("Flush error", "error", err)
}
@ -245,14 +320,15 @@ func (r *Recorder) Start() {
return
}
var buf [4]byte
binary.LittleEndian.PutUint32(buf[:], 36+totalDataSize)
binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
r.file.Write(buf[:])
if _, err := r.file.Seek(40, 0); err != nil {
slog.Error("Seek error", "error", err)
return
}
binary.LittleEndian.PutUint32(buf[:], totalDataSize)
binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
r.file.Write(buf[:])
r.fileMu.Unlock() // Added
return
}
}
@ -285,6 +361,16 @@ func main() {
os.Exit(1)
}
segmentDur := 5 * time.Minute
if conf.RecordSegmentDuration != "" {
dur, err := time.ParseDuration(conf.RecordSegmentDuration)
if err != nil {
slog.Error("Invalid record_segment_duration", "value", conf.RecordSegmentDuration, "error", err)
os.Exit(1)
}
segmentDur = dur
}
// Step 2: Set up event bus
eventChan := make(chan Event, 10)
go func() {
@ -343,7 +429,7 @@ func main() {
// Step 5: Set up recorder (choose μ-law or A-law based on format)
isMuLaw := audioFormat.MULaw
recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw)
recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw, segmentDur) // Modified: added segmentDur
if err != nil {
slog.Error("Create recorder", "error", err)
os.Exit(1)