Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
265c142edb |
@ -1,2 +1,3 @@
|
|||||||
rtsp_url: "rtsp://10.10.10.104:8554/union"
|
rtsp_url: "rtsp://10.10.10.104:8554/union"
|
||||||
output_file: "output.wav"
|
output_file: "output.wav"
|
||||||
|
record_segment_duration: "30m" # Duration for each recording segment
|
148
main.go
148
main.go
@ -6,7 +6,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/bluenviron/gortsplib/v4"
|
"github.com/bluenviron/gortsplib/v4"
|
||||||
"github.com/bluenviron/gortsplib/v4/pkg/base"
|
"github.com/bluenviron/gortsplib/v4/pkg/base"
|
||||||
@ -17,10 +20,11 @@ import (
|
|||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config holds RTSP URL and output file path
|
// Config holds RTSP URL, output file path, and rotation settings
|
||||||
type Config struct {
|
type Config struct {
|
||||||
RTSPURL string `yaml:"rtsp_url"`
|
RTSPURL string `yaml:"rtsp_url"`
|
||||||
OutputFile string `yaml:"output_file"`
|
OutputFile string `yaml:"output_file"`
|
||||||
|
RecordSegmentDuration string `yaml:"record_segment_duration"` // Added
|
||||||
}
|
}
|
||||||
|
|
||||||
// Event represents lifecycle events (e.g., source ready, recording started)
|
// Event represents lifecycle events (e.g., source ready, recording started)
|
||||||
@ -44,17 +48,22 @@ type Data struct {
|
|||||||
Pkt *rtp.Packet
|
Pkt *rtp.Packet
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recorder saves packets to a file
|
// Recorder saves packets to a file with rotation
|
||||||
type Recorder struct {
|
type Recorder struct {
|
||||||
name string
|
name string
|
||||||
path *Path
|
path *Path
|
||||||
file *os.File
|
file *os.File
|
||||||
writer *bufio.Writer
|
writer *bufio.Writer
|
||||||
isMuLaw bool // true for μ-law, false for A-law
|
isMuLaw bool // true for μ-law, false for A-law
|
||||||
dataChan chan *Data
|
dataChan chan *Data
|
||||||
terminate chan struct{}
|
terminate chan struct{}
|
||||||
sampleRate int
|
sampleRate int
|
||||||
channels int
|
channels int
|
||||||
|
outputBase string // Base filename (without timestamp) // Added
|
||||||
|
segmentDur time.Duration // Added
|
||||||
|
segmentStart time.Time // Added
|
||||||
|
totalDataSize uint32 // Added (moved from local variable)
|
||||||
|
fileMu sync.Mutex // Protects file operations // Added
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPath creates a stream hub
|
// NewPath creates a stream hub
|
||||||
@ -103,26 +112,38 @@ func (p *Path) SourceReady(audioMedia *description.Media, audioFormat format.For
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewRecorder creates a recorder that subscribes to the path
|
// NewRecorder creates a recorder that subscribes to the path
|
||||||
func NewRecorder(path *Path, outputFile string, isMuLaw bool) (*Recorder, error) {
|
func NewRecorder(path *Path, outputFile string, isMuLaw bool, segmentDur time.Duration) (*Recorder, error) {
|
||||||
file, err := os.Create(outputFile)
|
// Initialize with a timestamped filename
|
||||||
|
fileName := formatFileName(outputFile, time.Now()) // Added
|
||||||
|
file, err := os.Create(fileName) // Modified
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create file: %w", err)
|
return nil, fmt.Errorf("create file %s: %w", fileName, err) // Modified
|
||||||
}
|
}
|
||||||
r := &Recorder{
|
r := &Recorder{
|
||||||
name: "recorder",
|
name: "recorder",
|
||||||
path: path,
|
path: path,
|
||||||
file: file,
|
file: file,
|
||||||
writer: bufio.NewWriter(file),
|
writer: bufio.NewWriter(file),
|
||||||
isMuLaw: isMuLaw,
|
isMuLaw: isMuLaw,
|
||||||
dataChan: make(chan *Data, 100), // Buffered channel
|
dataChan: make(chan *Data, 100), // Buffered channel
|
||||||
terminate: make(chan struct{}),
|
terminate: make(chan struct{}),
|
||||||
sampleRate: 8000, // Fixed for G.711
|
sampleRate: 8000, // Fixed for G.711
|
||||||
channels: 1, // Mono
|
channels: 1, // Mono
|
||||||
|
outputBase: outputFile, // Added
|
||||||
|
segmentDur: segmentDur, // Added
|
||||||
|
segmentStart: time.Now(), // Added
|
||||||
}
|
}
|
||||||
path.AddReader(r.name, r.dataChan)
|
path.AddReader(r.name, r.dataChan)
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// formatFileName generates a timestamped filename (e.g., output_2025-08-19_10-24-00.wav)
|
||||||
|
func formatFileName(baseName string, t time.Time) string {
|
||||||
|
ext := filepath.Ext(baseName)
|
||||||
|
name := strings.TrimSuffix(baseName, ext)
|
||||||
|
return fmt.Sprintf("%s_%s%s", name, t.Format("2006-01-02_15-04-05"), ext)
|
||||||
|
}
|
||||||
|
|
||||||
// writeWAVHeader writes a basic WAV header for PCM audio
|
// writeWAVHeader writes a basic WAV header for PCM audio
|
||||||
func (r *Recorder) writeWAVHeader(dataSize uint32) error {
|
func (r *Recorder) writeWAVHeader(dataSize uint32) error {
|
||||||
header := make([]byte, 44)
|
header := make([]byte, 44)
|
||||||
@ -143,6 +164,50 @@ func (r *Recorder) writeWAVHeader(dataSize uint32) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rotateFile closes the current file and opens a new one
|
||||||
|
func (r *Recorder) rotateFile() error {
|
||||||
|
r.fileMu.Lock()
|
||||||
|
defer r.fileMu.Unlock()
|
||||||
|
|
||||||
|
// Flush and close current file
|
||||||
|
if err := r.writer.Flush(); err != nil {
|
||||||
|
return fmt.Errorf("flush file: %w", err)
|
||||||
|
}
|
||||||
|
if _, err := r.file.Seek(4, 0); err != nil {
|
||||||
|
return fmt.Errorf("seek file: %w", err)
|
||||||
|
}
|
||||||
|
var buf [4]byte
|
||||||
|
binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
|
||||||
|
r.file.Write(buf[:])
|
||||||
|
if _, err := r.file.Seek(40, 0); err != nil {
|
||||||
|
return fmt.Errorf("seek file: %w", err)
|
||||||
|
}
|
||||||
|
binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
|
||||||
|
r.file.Write(buf[:])
|
||||||
|
if err := r.file.Close(); err != nil {
|
||||||
|
return fmt.Errorf("close file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open new file
|
||||||
|
fileName := formatFileName(r.outputBase, time.Now())
|
||||||
|
file, err := os.Create(fileName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create new file %s: %w", fileName, err)
|
||||||
|
}
|
||||||
|
r.file = file
|
||||||
|
r.writer = bufio.NewWriter(file)
|
||||||
|
r.totalDataSize = 0
|
||||||
|
r.segmentStart = time.Now()
|
||||||
|
|
||||||
|
// Write new WAV header
|
||||||
|
if err := r.writeWAVHeader(0); err != nil {
|
||||||
|
return fmt.Errorf("write new header: %w", err)
|
||||||
|
}
|
||||||
|
slog.Info("Rotated to new file", "filename", fileName)
|
||||||
|
r.path.eventChan <- Event{Type: "segment_complete", Data: fileName}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// muLawToPCM converts μ-law samples to 16-bit PCM
|
// muLawToPCM converts μ-law samples to 16-bit PCM
|
||||||
func muLawToPCM(sample byte) int16 {
|
func muLawToPCM(sample byte) int16 {
|
||||||
const muLawBias = 33
|
const muLawBias = 33
|
||||||
@ -193,7 +258,7 @@ func (r *Recorder) Start() {
|
|||||||
if err := r.writeWAVHeader(0); err != nil {
|
if err := r.writeWAVHeader(0); err != nil {
|
||||||
slog.Error("Write header error", "error", err)
|
slog.Error("Write header error", "error", err)
|
||||||
}
|
}
|
||||||
var totalDataSize uint32
|
r.totalDataSize = 0 // Moved from local variable to struct field
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer r.file.Close()
|
defer r.file.Close()
|
||||||
@ -202,6 +267,13 @@ func (r *Recorder) Start() {
|
|||||||
select {
|
select {
|
||||||
case data := <-r.dataChan:
|
case data := <-r.dataChan:
|
||||||
if data.Media.Type == "audio" {
|
if data.Media.Type == "audio" {
|
||||||
|
// Check for rotation
|
||||||
|
if time.Since(r.segmentStart) >= r.segmentDur { // Added
|
||||||
|
if err := r.rotateFile(); err != nil {
|
||||||
|
slog.Error("File rotation error", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Unmarshal G.711 payload
|
// Unmarshal G.711 payload
|
||||||
var samples []byte
|
var samples []byte
|
||||||
var err error
|
var err error
|
||||||
@ -219,6 +291,7 @@ func (r *Recorder) Start() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Convert to 16-bit PCM and write
|
// Convert to 16-bit PCM and write
|
||||||
|
r.fileMu.Lock() // Added
|
||||||
for _, sample := range samples {
|
for _, sample := range samples {
|
||||||
var pcmSample int16
|
var pcmSample int16
|
||||||
if r.isMuLaw {
|
if r.isMuLaw {
|
||||||
@ -232,11 +305,13 @@ func (r *Recorder) Start() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("Write error", "error", err)
|
slog.Error("Write error", "error", err)
|
||||||
}
|
}
|
||||||
totalDataSize += 2
|
r.totalDataSize += 2
|
||||||
}
|
}
|
||||||
|
r.fileMu.Unlock() // Added
|
||||||
}
|
}
|
||||||
case <-r.terminate:
|
case <-r.terminate:
|
||||||
// Update WAV header with actual data size
|
// Update WAV header with final data size
|
||||||
|
r.fileMu.Lock() // Added
|
||||||
if err := r.writer.Flush(); err != nil {
|
if err := r.writer.Flush(); err != nil {
|
||||||
slog.Error("Flush error", "error", err)
|
slog.Error("Flush error", "error", err)
|
||||||
}
|
}
|
||||||
@ -245,14 +320,15 @@ func (r *Recorder) Start() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
var buf [4]byte
|
var buf [4]byte
|
||||||
binary.LittleEndian.PutUint32(buf[:], 36+totalDataSize)
|
binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
|
||||||
r.file.Write(buf[:])
|
r.file.Write(buf[:])
|
||||||
if _, err := r.file.Seek(40, 0); err != nil {
|
if _, err := r.file.Seek(40, 0); err != nil {
|
||||||
slog.Error("Seek error", "error", err)
|
slog.Error("Seek error", "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
binary.LittleEndian.PutUint32(buf[:], totalDataSize)
|
binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
|
||||||
r.file.Write(buf[:])
|
r.file.Write(buf[:])
|
||||||
|
r.fileMu.Unlock() // Added
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -285,6 +361,16 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
segmentDur := 5 * time.Minute
|
||||||
|
if conf.RecordSegmentDuration != "" {
|
||||||
|
dur, err := time.ParseDuration(conf.RecordSegmentDuration)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Invalid record_segment_duration", "value", conf.RecordSegmentDuration, "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
segmentDur = dur
|
||||||
|
}
|
||||||
|
|
||||||
// Step 2: Set up event bus
|
// Step 2: Set up event bus
|
||||||
eventChan := make(chan Event, 10)
|
eventChan := make(chan Event, 10)
|
||||||
go func() {
|
go func() {
|
||||||
@ -343,7 +429,7 @@ func main() {
|
|||||||
|
|
||||||
// Step 5: Set up recorder (choose μ-law or A-law based on format)
|
// Step 5: Set up recorder (choose μ-law or A-law based on format)
|
||||||
isMuLaw := audioFormat.MULaw
|
isMuLaw := audioFormat.MULaw
|
||||||
recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw)
|
recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw, segmentDur) // Modified: added segmentDur
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("Create recorder", "error", err)
|
slog.Error("Create recorder", "error", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
Reference in New Issue
Block a user