diff --git a/config.yaml b/config.yaml index f4370d1..e532c63 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,5 @@ rtsp_url: "rtsp://10.10.10.104:8554/union" output_file: "output.wav" -record_segment_duration: "30m" # Duration for each recording segment \ No newline at end of file +record_segment_duration: "30m" # Duration for each recording segment +vad_mode: 1 +frame_ms: 10 \ No newline at end of file diff --git a/go.mod b/go.mod index 2ffcfd2..14650e9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25 require ( github.com/bluenviron/gortsplib/v4 v4.16.2 github.com/bluenviron/mediacommon/v2 v2.4.1 + github.com/maxhawkins/go-webrtcvad v0.0.0-20210121163624-be60036f3083 github.com/pion/rtp v1.8.21 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index e3f622c..f38aca9 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/maxhawkins/go-webrtcvad v0.0.0-20210121163624-be60036f3083 h1:0JDcvP4R28p6+u8VIHCwYx7UwiHZ074INz3C397oc9s= +github.com/maxhawkins/go-webrtcvad v0.0.0-20210121163624-be60036f3083/go.mod h1:YdrZ05xnooeP54y7m+/UvI23O1Td46PjWkLJu1VLObM= github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= diff --git a/main.go b/main.go index 728fa20..4db1705 100644 --- a/main.go +++ b/main.go @@ -16,15 +16,18 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" + "github.com/maxhawkins/go-webrtcvad" 
"github.com/pion/rtp" "gopkg.in/yaml.v3" ) -// Config holds RTSP URL, output file path, and rotation settings +// Config holds RTSP URL, output file path, rotation settings, and VAD settings type Config struct { RTSPURL string `yaml:"rtsp_url"` OutputFile string `yaml:"output_file"` - RecordSegmentDuration string `yaml:"record_segment_duration"` // Added + RecordSegmentDuration string `yaml:"record_segment_duration"` + VADMode int `yaml:"vad_mode"` + FrameMs int `yaml:"frame_ms"` } // Event represents lifecycle events (e.g., source ready, recording started) @@ -66,6 +69,19 @@ type Recorder struct { fileMu sync.Mutex // Protects file operations // Added } +// VADReader detects speech in audio packets using WebRTC VAD +type VADReader struct { + name string + path *Path + isMuLaw bool + dataChan chan *Data + terminate chan struct{} + vad *webrtcvad.VAD + sampleRate int + frameMs int + segmentStart time.Time // Tracks start of current audio/silence segment +} + // NewPath creates a stream hub func NewPath(eventChan chan Event) *Path { return &Path{ @@ -341,6 +357,135 @@ func (r *Recorder) Stop() { close(r.terminate) } +// NewVADReader creates a VAD reader that subscribes to the path +func NewVADReader(path *Path, isMuLaw bool, vadMode, frameMs int) (*VADReader, error) { + vad, err := webrtcvad.New() + if err != nil { + return nil, fmt.Errorf("create VAD: %w", err) + } + if err := vad.SetMode(vadMode); err != nil { + return nil, fmt.Errorf("set VAD mode %d: %w", vadMode, err) + } + if frameMs != 10 && frameMs != 20 && frameMs != 30 { + return nil, fmt.Errorf("invalid frame duration %dms; must be 10, 20, or 30", frameMs) + } + sampleRate := 8000 + frameSamples := sampleRate * frameMs / 1000 + frameBytes := frameSamples * 2 // 16-bit PCM + if !vad.ValidRateAndFrameLength(sampleRate, frameBytes) { + return nil, fmt.Errorf("invalid VAD parameters: sample_rate=%d, frame_bytes=%d", sampleRate, frameBytes) + } + + r := &VADReader{ + name: "vad_reader", + path: path, + isMuLaw: 
isMuLaw, + dataChan: make(chan *Data, 100), // Buffered channel + terminate: make(chan struct{}), + vad: vad, + sampleRate: sampleRate, + frameMs: frameMs, + segmentStart: time.Now(), + } + path.AddReader(r.name, r.dataChan) + return r, nil +} + +// Start runs the VAD reader's async processing loop +func (r *VADReader) Start() { + go func() { + var pcmBuffer []byte + var isSilent = true + var audioStart time.Time // Added to track audio duration + const minSilenceDuration = 4 * time.Second // Changed from 2s + const maxSilenceDuration = 6 * time.Second // Added + + frameSamples := r.sampleRate * r.frameMs / 1000 + frameBytes := frameSamples * 2 // 16-bit PCM + for { + select { + case data := <-r.dataChan: + if data.Media.Type == "audio" { + // Unmarshal G.711 payload + var samples []byte + var err error + if r.isMuLaw { + var mu g711.Mulaw + err = mu.Unmarshal(data.Pkt.Payload) + samples = []byte(mu) + } else { + var al g711.Alaw + err = al.Unmarshal(data.Pkt.Payload) + samples = []byte(al) + } + if err != nil { + slog.Error("Unmarshal audio error", "error", err) + continue + } + // Convert to 16-bit PCM + for _, sample := range samples { + var pcmSample int16 + if r.isMuLaw { + pcmSample = muLawToPCM(sample) + } else { + pcmSample = aLawToPCM(sample) + } + var buf [2]byte + binary.LittleEndian.PutUint16(buf[:], uint16(pcmSample)) + pcmBuffer = append(pcmBuffer, buf[:]...) 
+ } + + // Process VAD frames + for len(pcmBuffer) >= frameBytes { + frame := pcmBuffer[:frameBytes] + pcmBuffer = pcmBuffer[frameBytes:] + + active, err := r.vad.Process(r.sampleRate, frame) + if err != nil { + slog.Warn("VAD processing error", "error", err) + continue + } + + now := time.Now() + if active && isSilent { + slog.Info("Speech detected", "timestamp", now.Format("2006-01-02 15:04:05")) + r.path.eventChan <- Event{Type: "speech_detected", Data: now} + isSilent = false + r.segmentStart = time.Time{} // Modified: clear so silence is timed from its first silent frame, not from speech onset + audioStart = now // Added + } else if !active && !isSilent { + if r.segmentStart.IsZero() { + r.segmentStart = now + } else if now.Sub(r.segmentStart) >= minSilenceDuration && now.Sub(r.segmentStart) <= maxSilenceDuration { + var audioDurationMs int64 + if !audioStart.IsZero() { + audioDurationMs = now.Sub(audioStart).Milliseconds() // Added + } + slog.Info("Silence detected", + "timestamp", now.Format("2006-01-02 15:04:05"), + "silence_duration_ms", now.Sub(r.segmentStart).Milliseconds(), + "audio_duration_ms", audioDurationMs) // Added + r.path.eventChan <- Event{Type: "silence_detected", Data: now} + isSilent = true + r.segmentStart = time.Time{} // Modified: reset to allow new silence detection + audioStart = time.Time{} // Added + } + } + } + } + case <-r.terminate: + return + } + } + }() +} + +// Stop terminates the VAD reader +func (r *VADReader) Stop() { + r.path.RemoveReader(r.name) + close(r.terminate) +} + func main() { // Initialize slog (default handler for structured logging) slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))) @@ -361,6 +506,7 @@ func main() { os.Exit(1) } + // Parse segment duration (default to 5 minutes) segmentDur := 5 * time.Minute if conf.RecordSegmentDuration != "" { dur, err := time.ParseDuration(conf.RecordSegmentDuration) @@ -371,6 +517,24 @@ func main() { segmentDur = dur } + // Parse VAD settings (default: mode=3, frame_ms=20) + vadMode := 3 + frameMs := 20 + if 
conf.VADMode != 0 { + if conf.VADMode < 0 || conf.VADMode > 3 { + slog.Error("Invalid vad_mode", "value", conf.VADMode, "allowed", "0-3") + os.Exit(1) + } + vadMode = conf.VADMode + } + if conf.FrameMs != 0 { + if conf.FrameMs != 10 && conf.FrameMs != 20 && conf.FrameMs != 30 { + slog.Error("Invalid frame_ms", "value", conf.FrameMs, "allowed", "10, 20, 30") + os.Exit(1) + } + frameMs = conf.FrameMs + } + // Step 2: Set up event bus eventChan := make(chan Event, 10) go func() { @@ -436,10 +600,18 @@ func main() { } recorder.Start() - // Step 6: Signal source ready + // Step 6: Set up VAD reader + vadReader, err := NewVADReader(path, isMuLaw, vadMode, frameMs) + if err != nil { + slog.Error("Create VAD reader", "error", err) + os.Exit(1) + } + vadReader.Start() + + // Step 7: Signal source ready path.SourceReady(audioMedia, audioFormat) - // Step 7: Read RTP packets and distribute + // Step 8: Read RTP packets and distribute go func() { c.OnPacketRTP(audioMedia, audioFormat, func(pkt *rtp.Packet) { path.DistributeData(&Data{Media: audioMedia, Pkt: pkt})