// go-two-tone-detector-service/main.go — Go source (listing metadata: 637 lines, 18 KiB).

package main
import (
	"bufio"
	"encoding/binary"
	"fmt"
	"log/slog"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/bluenviron/gortsplib/v4"
	"github.com/bluenviron/gortsplib/v4/pkg/base"
	"github.com/bluenviron/gortsplib/v4/pkg/description"
	"github.com/bluenviron/gortsplib/v4/pkg/format"
	"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
	"github.com/maxhawkins/go-webrtcvad"
	"github.com/pion/rtp"
	"gopkg.in/yaml.v3"
)
// Config is the service configuration decoded from config.yaml.
//
//   - RTSPURL (rtsp_url): RTSP source to pull audio from; required.
//   - OutputFile (output_file): base WAV path; each segment gets a timestamp
//     inserted before the extension; required.
//   - RecordSegmentDuration (record_segment_duration): time.ParseDuration
//     string controlling file rotation; empty selects main's default.
//   - VADMode (vad_mode): WebRTC VAD aggressiveness mode.
//   - FrameMs (frame_ms): VAD frame length in milliseconds.
type Config struct {
	RTSPURL               string `yaml:"rtsp_url"`
	OutputFile            string `yaml:"output_file"`
	RecordSegmentDuration string `yaml:"record_segment_duration"`
	VADMode               int    `yaml:"vad_mode"`
	FrameMs               int    `yaml:"frame_ms"`
}
// Event is a lifecycle notification published on the path's event channel,
// e.g. "source_ready", "speech_detected", "silence_detected",
// "segment_complete" or "source_error".
type Event struct {
	Type string // event kind
	Data any    // kind-specific payload (media, timestamp, file name, error)
}
// Path is the stream hub: it remembers the source's audio media and format
// and fans every packet out to the channels of the registered readers.
// readers is guarded by readerMu; eventChan carries lifecycle events.
type Path struct {
	audioMedia  *description.Media
	audioFormat format.Format
	readers     map[string]chan *Data
	readerMu    sync.RWMutex
	eventChan   chan Event
}
// Data pairs one received RTP packet with the media description it arrived
// on, so readers can filter by media type.
type Data struct {
	Media *description.Media // media the packet belongs to
	Pkt   *rtp.Packet        // the RTP packet itself
}
// Recorder writes the path's audio, decoded to 16-bit PCM, into timestamped
// WAV files and switches to a new file every segmentDur.
type Recorder struct {
	name          string        // reader name registered on path
	path          *Path         // hub it subscribes to and publishes events on
	file          *os.File      // current segment file
	writer        *bufio.Writer // buffers PCM writes to file
	isMuLaw       bool          // true for μ-law, false for A-law
	dataChan      chan *Data    // packets delivered by Path.DistributeData
	terminate     chan struct{} // closed by Stop
	sampleRate    int           // sample rate written to the WAV header
	channels      int           // channel count written to the WAV header
	outputBase    string        // base filename (without timestamp)
	segmentDur    time.Duration // how long a segment lasts before rotation
	segmentStart  time.Time     // when the current segment began
	totalDataSize uint32        // PCM bytes written to the current segment
	fileMu        sync.Mutex    // held around writes, rotation and finalization
}
// VADReader runs WebRTC voice-activity detection over the path's audio and
// reports speech/silence transitions as events.
type VADReader struct {
	name         string         // reader name registered on path
	path         *Path          // hub it subscribes to and publishes events on
	isMuLaw      bool           // true for μ-law, false for A-law
	dataChan     chan *Data     // packets delivered by Path.DistributeData
	terminate    chan struct{}  // closed by Stop
	vad          *webrtcvad.VAD // detector instance
	sampleRate   int            // PCM sample rate fed to the detector
	frameMs      int            // detector frame length in milliseconds
	segmentStart time.Time      // start of the current audio/silence segment
}
// NewPath returns a hub with no readers that publishes lifecycle events on
// eventChan.
func NewPath(eventChan chan Event) *Path {
	p := &Path{eventChan: eventChan}
	p.readers = make(map[string]chan *Data)
	return p
}
// AddReader subscribes ch under name so that DistributeData delivers packets
// to it. Registering an existing name replaces its channel without closing
// the old one.
func (p *Path) AddReader(name string, ch chan *Data) {
	mu := &p.readerMu
	mu.Lock()
	defer mu.Unlock()
	p.readers[name] = ch
}
// RemoveReader unsubscribes the reader registered as name and closes its
// channel, so a goroutine receiving from it observes the close. Unknown
// names are ignored.
func (p *Path) RemoveReader(name string) {
	p.readerMu.Lock()
	defer p.readerMu.Unlock()
	ch, found := p.readers[name]
	if !found {
		return
	}
	close(ch)
	delete(p.readers, name)
}
// DistributeData hands data to every registered reader without blocking.
// A reader whose channel buffer is full misses this packet, and a warning
// naming that reader is logged so slow consumers can be identified. The read
// lock is held for the whole fan-out, so RemoveReader cannot close a channel
// while a send to it is in progress.
func (p *Path) DistributeData(data *Data) {
	p.readerMu.RLock()
	defer p.readerMu.RUnlock()
	for name, ch := range p.readers {
		select {
		case ch <- data:
		default:
			slog.Warn("Reader channel full, dropping packet", "reader", name)
		}
	}
}
// SourceReady records the source's audio media and format, then publishes a
// "source_ready" event carrying the media. The send blocks while eventChan
// has no free buffer slot and no receiver.
func (p *Path) SourceReady(audioMedia *description.Media, audioFormat format.Format) {
	p.audioMedia, p.audioFormat = audioMedia, audioFormat
	ready := Event{Type: "source_ready", Data: audioMedia}
	p.eventChan <- ready
}
// NewRecorder opens the first segment file and subscribes a recorder to
// path under the name "recorder".
//
// The file name is outputFile with the current time inserted before its
// extension (see formatFileName). The recorder assumes mono 8 kHz G.711
// input, delivered through a channel buffered to 100 packets. isMuLaw
// selects μ-law (true) or A-law decoding, and segmentDur is the rotation
// period. The WAV header is written later, by Start.
func NewRecorder(path *Path, outputFile string, isMuLaw bool, segmentDur time.Duration) (*Recorder, error) {
	firstName := formatFileName(outputFile, time.Now())
	f, err := os.Create(firstName)
	if err != nil {
		return nil, fmt.Errorf("create file %s: %w", firstName, err)
	}
	rec := &Recorder{
		name:         "recorder",
		path:         path,
		file:         f,
		writer:       bufio.NewWriter(f),
		isMuLaw:      isMuLaw,
		dataChan:     make(chan *Data, 100),
		terminate:    make(chan struct{}),
		sampleRate:   8000, // G.711 rate
		channels:     1,
		outputBase:   outputFile,
		segmentDur:   segmentDur,
		segmentStart: time.Now(),
	}
	path.AddReader(rec.name, rec.dataChan)
	return rec, nil
}
// formatFileName inserts "_YYYY-MM-DD_HH-MM-SS" (taken from t) between the
// stem of baseName and its extension. For example, "output.wav" becomes
// "output_2025-08-19_10-24-00.wav". A name without an extension just gets
// the suffix appended.
func formatFileName(baseName string, t time.Time) string {
	ext := filepath.Ext(baseName)
	stem := strings.TrimSuffix(baseName, ext)
	stamp := t.Format("2006-01-02_15-04-05")
	return stem + "_" + stamp + ext
}
// writeWAVHeader emits the 44-byte canonical RIFF/WAVE header for 16-bit
// linear PCM at r.sampleRate and r.channels into r.writer. dataSize is the
// PCM payload length in bytes; the RIFF chunk size is 36+dataSize. All
// fields are little-endian.
func (r *Recorder) writeWAVHeader(dataSize uint32) error {
	le := binary.LittleEndian
	blockAlign := r.channels * 2 // bytes per frame at 16 bits per sample

	hdr := make([]byte, 0, 44)
	hdr = append(hdr, "RIFF"...)
	hdr = le.AppendUint32(hdr, 36+dataSize)
	hdr = append(hdr, "WAVE"...)
	hdr = append(hdr, "fmt "...)
	hdr = le.AppendUint32(hdr, 16) // fmt chunk size for PCM
	hdr = le.AppendUint16(hdr, 1)  // audio format: PCM
	hdr = le.AppendUint16(hdr, uint16(r.channels))
	hdr = le.AppendUint32(hdr, uint32(r.sampleRate))
	hdr = le.AppendUint32(hdr, uint32(r.sampleRate*blockAlign)) // byte rate
	hdr = le.AppendUint16(hdr, uint16(blockAlign))
	hdr = le.AppendUint16(hdr, 16) // bits per sample
	hdr = append(hdr, "data"...)
	hdr = le.AppendUint32(hdr, dataSize)

	_, err := r.writer.Write(hdr)
	return err
}
// rotateFile finishes the current WAV segment and continues recording into
// a new file named formatFileName(outputBase, now).
//
// The new file is created before the current one is touched. If creation
// fails, or the timestamped name equals the live file's name, the recorder
// keeps appending to the current file. An equal name would mean os.Create
// truncates the file being written. In both cases segmentStart is reset, so
// the next attempt comes one segment later instead of on every packet.
//
// The old segment is then finalized: buffered PCM is flushed, the RIFF
// chunk size (offset 4) and data chunk size (offset 40) are patched, and
// the file is closed. The first failure among those steps is returned, but
// only after the switch to the new file, so one bad segment does not stop
// recording. On success a "segment_complete" event carrying the finished
// file's name is published.
//
// Once terminate is closed this is a no-op. Finalizing the last file is then
// Stop's job, and a file opened here would never be closed.
func (r *Recorder) rotateFile() error {
	r.fileMu.Lock()
	defer r.fileMu.Unlock()

	select {
	case <-r.terminate:
		return nil
	default:
	}

	now := time.Now()
	fileName := formatFileName(r.outputBase, now)
	oldName := r.file.Name()
	if fileName == oldName {
		r.segmentStart = now
		return fmt.Errorf("new segment file %s would overwrite the current one", fileName)
	}
	newFile, err := os.Create(fileName)
	if err != nil {
		r.segmentStart = now
		return fmt.Errorf("create new file %s: %w", fileName, err)
	}

	// Finalize the old segment, remembering only the first failure.
	var finalizeErr error
	keepFirst := func(err error) {
		if finalizeErr == nil {
			finalizeErr = err
		}
	}
	if err := r.writer.Flush(); err != nil {
		keepFirst(fmt.Errorf("flush file: %w", err))
	}
	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
	if _, err := r.file.WriteAt(buf[:], 4); err != nil {
		keepFirst(fmt.Errorf("patch RIFF size: %w", err))
	}
	binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
	if _, err := r.file.WriteAt(buf[:], 40); err != nil {
		keepFirst(fmt.Errorf("patch data size: %w", err))
	}
	if err := r.file.Close(); err != nil {
		keepFirst(fmt.Errorf("close file: %w", err))
	}

	// Switch to the new segment with a placeholder header.
	r.file = newFile
	r.writer = bufio.NewWriter(newFile)
	r.totalDataSize = 0
	r.segmentStart = now
	if err := r.writeWAVHeader(0); err != nil {
		keepFirst(fmt.Errorf("write new header: %w", err))
		return finalizeErr
	}
	slog.Info("Rotated to new file", "filename", fileName)
	r.path.eventChan <- Event{Type: "segment_complete", Data: oldName}
	return finalizeErr
}
// muLawToPCM expands one ITU-T G.711 μ-law code to a signed 16-bit linear
// PCM sample.
//
// μ-law codes are sent bit-inverted. After inversion, bit 7 is the sign
// (set means negative), bits 6-4 are the segment (exponent) and bits 3-0
// the quantization step. The magnitude is rebuilt as
// ((step<<3) + 0x84) << segment, minus the 0x84 bias added by the encoder.
// Results lie in [-32124, 32124], so the conversion to int16 cannot
// overflow. Both zero codes, 0xFF and 0x7F, decode to 0.
func muLawToPCM(sample byte) int16 {
	const bias = 0x84

	u := ^sample
	segment := (u >> 4) & 0x07
	magnitude := ((int(u&0x0F) << 3) + bias) << segment
	if u&0x80 != 0 {
		return int16(bias - magnitude)
	}
	return int16(magnitude - bias)
}
// aLawToPCM expands one ITU-T G.711 A-law code to a signed 16-bit linear
// PCM sample.
//
// A-law codes are sent with their even bits inverted, so the code is first
// XORed with 0x55. Then bit 7 is the sign (set means positive), bits 6-4 are
// the segment and bits 3-0 the quantization step. Segment 0 is linear
// (step<<4 + 8). Segment 1 adds 0x108. Higher segments add 0x108 and are
// then shifted left by segment-1. Results lie in [-32256, 32256], so the
// conversion to int16 cannot overflow.
func aLawToPCM(sample byte) int16 {
	a := sample ^ 0x55

	segment := int(a>>4) & 0x07
	magnitude := int(a&0x0F) << 4
	switch segment {
	case 0:
		magnitude += 8
	case 1:
		magnitude += 0x108
	default:
		magnitude = (magnitude + 0x108) << (segment - 1)
	}
	if a&0x80 != 0 {
		return int16(magnitude)
	}
	return int16(-magnitude)
}
// Start writes a placeholder WAV header to the current segment file. It then
// launches the goroutine that decodes incoming G.711 packets to 16-bit
// little-endian PCM and appends them to that file.
//
// When segmentDur is positive, the file is rotated via rotateFile once
// segmentDur of wall-clock time has passed since the segment started. A
// non-positive segmentDur disables rotation.
//
// The goroutine exits in any of these cases:
//   - Stop closes terminate;
//   - Path.RemoveReader closes dataChan;
//   - a write finds that Stop has already finalized the file.
//
// Finalization itself is done synchronously by Stop.
func (r *Recorder) Start() {
	// The data size is not known yet; it is patched in place later.
	if err := r.writeWAVHeader(0); err != nil {
		slog.Error("Write header error", "error", err)
	}
	r.totalDataSize = 0
	go r.recordLoop()
}

// recordLoop consumes packets until the recorder is stopped or its channel
// is closed.
func (r *Recorder) recordLoop() {
	for {
		select {
		case <-r.terminate:
			return
		case data, ok := <-r.dataChan:
			if !ok {
				// Stop closes dataChan (via RemoveReader) before terminate;
				// continuing would dereference a nil *Data below.
				return
			}
			if data.Media.Type != "audio" {
				continue
			}
			if r.segmentDur > 0 && time.Since(r.segmentStart) >= r.segmentDur {
				if err := r.rotateFile(); err != nil {
					slog.Error("File rotation error", "error", err)
				}
			}
			if !r.appendPCM(data.Pkt.Payload) {
				return
			}
		}
	}
}

// appendPCM decodes one G.711 RTP payload and appends it to the current
// segment as 16-bit little-endian PCM. It returns false once Stop has
// finalized the file, which tells recordLoop to exit.
func (r *Recorder) appendPCM(payload []byte) bool {
	// NOTE(review): if g711.Mulaw/Alaw.Unmarshal already expands G.711 to
	// 16-bit LPCM (rather than copying the codes), the per-byte
	// muLawToPCM/aLawToPCM below decodes twice. Confirm against the
	// mediacommon v2 g711 docs.
	var samples []byte
	if r.isMuLaw {
		var mu g711.Mulaw
		mu.Unmarshal(payload)
		samples = []byte(mu)
	} else {
		var al g711.Alaw
		al.Unmarshal(payload)
		samples = []byte(al)
	}

	// Decode outside the lock; only the file write needs it.
	pcm := make([]byte, 0, 2*len(samples))
	for _, s := range samples {
		var v int16
		if r.isMuLaw {
			v = muLawToPCM(s)
		} else {
			v = aLawToPCM(s)
		}
		pcm = binary.LittleEndian.AppendUint16(pcm, uint16(v))
	}

	r.fileMu.Lock()
	defer r.fileMu.Unlock()
	select {
	case <-r.terminate:
		return false // Stop closed terminate under fileMu and finalized the file.
	default:
	}
	n, err := r.writer.Write(pcm)
	r.totalDataSize += uint32(n) // count only bytes actually accepted
	if err != nil {
		slog.Error("Write error", "error", err)
	}
	return true
}

// Stop unsubscribes the recorder from its path and finalizes the current
// file. It stops the recording goroutine, flushes buffered PCM, patches the
// RIFF (offset 4) and data (offset 40) chunk sizes in the header, and closes
// the file.
//
// Everything after unsubscribing runs under fileMu, so it is serialized with
// in-flight writes and rotations. Stop returns only after the file has been
// finalized, so callers may exit the process right afterwards. A second call
// is a no-op.
func (r *Recorder) Stop() {
	r.path.RemoveReader(r.name)

	r.fileMu.Lock()
	defer r.fileMu.Unlock()
	select {
	case <-r.terminate:
		return // already stopped
	default:
	}
	close(r.terminate)

	if err := r.writer.Flush(); err != nil {
		slog.Error("Flush error", "error", err)
	}
	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], 36+r.totalDataSize)
	if _, err := r.file.WriteAt(buf[:], 4); err != nil {
		slog.Error("WAV header update error", "error", err)
	}
	binary.LittleEndian.PutUint32(buf[:], r.totalDataSize)
	if _, err := r.file.WriteAt(buf[:], 40); err != nil {
		slog.Error("WAV header update error", "error", err)
	}
	if err := r.file.Close(); err != nil {
		slog.Error("Close error", "error", err)
	}
}
// NewVADReader builds a WebRTC VAD with the given aggressiveness mode and
// subscribes it to path under the name "vad_reader". The audio is assumed
// to be 8 kHz G.711 (μ-law when isMuLaw), and packets arrive through a
// channel buffered to 100 packets.
//
// It fails if the detector cannot be created, the mode is rejected, or
// frameMs is not 10, 20 or 30. It also fails if the detector does not accept
// the resulting rate and frame length.
func NewVADReader(path *Path, isMuLaw bool, vadMode, frameMs int) (*VADReader, error) {
	detector, err := webrtcvad.New()
	if err != nil {
		return nil, fmt.Errorf("create VAD: %w", err)
	}
	if err = detector.SetMode(vadMode); err != nil {
		return nil, fmt.Errorf("set VAD mode %d: %w", vadMode, err)
	}
	switch frameMs {
	case 10, 20, 30:
	default:
		return nil, fmt.Errorf("invalid frame duration %dms; must be 10, 20, or 30", frameMs)
	}

	const rate = 8000 // G.711 sample rate
	frameBytes := 2 * (rate * frameMs / 1000) // 16-bit PCM bytes per frame
	// NOTE(review): confirm whether go-webrtcvad's ValidRateAndFrameLength
	// expects the frame length in bytes (as passed here) or in samples.
	if !detector.ValidRateAndFrameLength(rate, frameBytes) {
		return nil, fmt.Errorf("invalid VAD parameters: sample_rate=%d, frame_bytes=%d", rate, frameBytes)
	}

	reader := &VADReader{
		name:         "vad_reader",
		path:         path,
		isMuLaw:      isMuLaw,
		dataChan:     make(chan *Data, 100),
		terminate:    make(chan struct{}),
		vad:          detector,
		sampleRate:   rate,
		frameMs:      frameMs,
		segmentStart: time.Now(),
	}
	path.AddReader(reader.name, reader.dataChan)
	return reader, nil
}
// Start launches the goroutine that turns incoming G.711 packets into
// 16-bit little-endian PCM and runs the WebRTC VAD on consecutive frames of
// frameMs milliseconds. State changes are published on the path's event
// channel:
//
//   - "speech_detected" on the first active frame while in the silent state;
//   - "silence_detected" once at least minSilenceDuration of consecutive
//     inactive frames has followed speech. Any active frame restarts that run.
//
// Silence is measured in audio time (inactive frames × frameMs), not wall
// time, so bursty packet delivery does not skew it. The goroutine exits when
// Stop closes terminate or when the reader channel is closed.
func (r *VADReader) Start() {
	go func() {
		const minSilenceDuration = 4 * time.Second
		frameBytes := r.sampleRate * r.frameMs / 1000 * 2 // 16-bit PCM
		frameDur := time.Duration(r.frameMs) * time.Millisecond

		var (
			pcmBuffer  []byte
			isSilent   = true
			audioStart time.Time     // wall-clock time the current speech began
			silentSpan time.Duration // audio time of trailing inactive frames during speech
		)
		for {
			var data *Data
			select {
			case <-r.terminate:
				return
			case d, ok := <-r.dataChan:
				if !ok {
					// Stop closes dataChan (via RemoveReader) before terminate;
					// continuing would dereference a nil *Data.
					return
				}
				data = d
			}
			if data.Media.Type != "audio" {
				continue
			}

			// NOTE(review): if g711.Mulaw/Alaw.Unmarshal already expands G.711
			// to 16-bit LPCM, the per-byte conversion below decodes twice.
			// Confirm against the mediacommon v2 g711 docs.
			var samples []byte
			if r.isMuLaw {
				var mu g711.Mulaw
				mu.Unmarshal(data.Pkt.Payload)
				samples = []byte(mu)
			} else {
				var al g711.Alaw
				al.Unmarshal(data.Pkt.Payload)
				samples = []byte(al)
			}
			for _, s := range samples {
				var v int16
				if r.isMuLaw {
					v = muLawToPCM(s)
				} else {
					v = aLawToPCM(s)
				}
				pcmBuffer = binary.LittleEndian.AppendUint16(pcmBuffer, uint16(v))
			}

			// Feed every complete frame; a partial tail waits for the next packet.
			for len(pcmBuffer) >= frameBytes {
				frame := pcmBuffer[:frameBytes]
				pcmBuffer = pcmBuffer[frameBytes:]
				active, err := r.vad.Process(r.sampleRate, frame)
				if err != nil {
					slog.Warn("VAD processing error", "error", err)
					continue
				}
				now := time.Now()
				switch {
				case active:
					silentSpan = 0 // speech interrupts any silence run
					if isSilent {
						slog.Info("Speech detected", "timestamp", now.Format("2006-01-02 15:04:05"))
						r.path.eventChan <- Event{Type: "speech_detected", Data: now}
						isSilent = false
						r.segmentStart = now
						audioStart = now
					}
				case !isSilent:
					silentSpan += frameDur
					if silentSpan < minSilenceDuration {
						continue
					}
					slog.Info("Silence detected",
						"timestamp", now.Format("2006-01-02 15:04:05"),
						"silence_duration_ms", silentSpan.Milliseconds(),
						"audio_duration_ms", now.Sub(audioStart).Milliseconds())
					r.path.eventChan <- Event{Type: "silence_detected", Data: now}
					isSilent = true
					silentSpan = 0
					r.segmentStart = now
					audioStart = time.Time{}
				}
			}
		}
	}()
}
// Stop unsubscribes the VAD reader from its path, which closes its data
// channel, and signals its goroutine to exit by closing terminate. It does
// not wait for the goroutine to return. Calling Stop again is a no-op
// instead of panicking on a double close. Concurrent calls are not
// supported.
func (r *VADReader) Stop() {
	r.path.RemoveReader(r.name)
	select {
	case <-r.terminate:
		// Already stopped.
	default:
		close(r.terminate)
	}
}
// main loads config.yaml, connects to the configured RTSP source, and feeds
// its mono 8 kHz G.711 audio track to a rotating WAV recorder and a VAD
// reader. It runs until the RTSP session ends or the process receives
// SIGINT/SIGTERM. In both cases the client is closed and both readers are
// stopped before exiting, so the current WAV file is finalized. Startup
// failures log the error, release whatever was already started, and exit
// with status 1.
func main() {
	slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})))

	// cleanup holds shutdown actions for resources started so far. fatal
	// runs them in reverse order because os.Exit skips deferred calls.
	var cleanup []func()
	fatal := func(msg string, args ...any) {
		slog.Error(msg, args...)
		for i := len(cleanup) - 1; i >= 0; i-- {
			cleanup[i]()
		}
		os.Exit(1)
	}

	// Step 1: Parse config
	configFile, err := os.ReadFile("config.yaml")
	if err != nil {
		fatal("Read config", "error", err)
	}
	var conf Config
	if err := yaml.Unmarshal(configFile, &conf); err != nil {
		fatal("Unmarshal config", "error", err)
	}
	if conf.RTSPURL == "" || conf.OutputFile == "" {
		fatal("RTSP URL or output file missing")
	}

	// Segment duration (default 5 minutes). File names have one-second
	// resolution, so shorter (or non-positive) durations would make segment
	// names collide or rotate on every packet.
	segmentDur := 5 * time.Minute
	if conf.RecordSegmentDuration != "" {
		dur, err := time.ParseDuration(conf.RecordSegmentDuration)
		if err != nil {
			fatal("Invalid record_segment_duration", "value", conf.RecordSegmentDuration, "error", err)
		}
		if dur < time.Second {
			fatal("Invalid record_segment_duration", "value", conf.RecordSegmentDuration, "error", "must be at least 1s")
		}
		segmentDur = dur
	}

	// VAD settings (defaults: mode 3, 20 ms frames). Mode 0 is valid but is
	// also Config's zero value, so check the raw YAML for an explicit key.
	vadMode := 3
	frameMs := 20
	var rawConf map[string]any
	_ = yaml.Unmarshal(configFile, &rawConf) // same document already decoded successfully above
	if v, ok := rawConf["vad_mode"]; (ok && v != nil) || conf.VADMode != 0 {
		if conf.VADMode < 0 || conf.VADMode > 3 {
			fatal("Invalid vad_mode", "value", conf.VADMode, "allowed", "0-3")
		}
		vadMode = conf.VADMode
	}
	if conf.FrameMs != 0 {
		if conf.FrameMs != 10 && conf.FrameMs != 20 && conf.FrameMs != 30 {
			fatal("Invalid frame_ms", "value", conf.FrameMs, "allowed", "10, 20, 30")
		}
		frameMs = conf.FrameMs
	}

	// Step 2: Set up event bus. eventChan is never closed: readers may still
	// send while shutting down, and a send on a closed channel panics.
	eventChan := make(chan Event, 10)
	go func() {
		for ev := range eventChan {
			slog.Info("Event", "type", ev.Type, "data", ev.Data)
		}
	}()

	// Step 3: Create path (stream hub)
	path := NewPath(eventChan)

	// Step 4: Connect the RTSP client and set up the G.711 track
	u, err := base.ParseURL(conf.RTSPURL)
	if err != nil {
		fatal("Failed to parse RTSP URL", "error", err, "url", conf.RTSPURL)
	}
	c := gortsplib.Client{
		Scheme: u.Scheme,
		Host:   u.Host,
	}
	if err := c.Start2(); err != nil {
		fatal("Failed to connect to RTSP server", "error", err)
	}
	cleanup = append(cleanup, c.Close)

	desc, _, err := c.Describe(u)
	if err != nil {
		fatal("Failed to describe RTSP stream", "error", err)
	}
	var audioFormat *format.G711
	audioMedia := desc.FindFormat(&audioFormat)
	if audioMedia == nil {
		fatal("G711 media not found in RTSP stream")
	}
	if audioFormat.SampleRate != 8000 || audioFormat.ChannelCount != 1 {
		fatal("Audio format is not mono 8000Hz G711")
	}
	if _, err := c.Setup(desc.BaseURL, audioMedia, 0, 0); err != nil {
		fatal("Failed to setup RTSP media", "error", err)
	}

	// Step 5: Set up recorder (μ-law or A-law, depending on the format)
	isMuLaw := audioFormat.MULaw
	recorder, err := NewRecorder(path, conf.OutputFile, isMuLaw, segmentDur)
	if err != nil {
		fatal("Create recorder", "error", err)
	}
	recorder.Start()
	cleanup = append(cleanup, recorder.Stop)

	// Step 6: Set up VAD reader
	vadReader, err := NewVADReader(path, isMuLaw, vadMode, frameMs)
	if err != nil {
		fatal("Create VAD reader", "error", err)
	}
	vadReader.Start()
	cleanup = append(cleanup, vadReader.Stop)

	// Step 7: Signal source ready
	path.SourceReady(audioMedia, audioFormat)

	// Step 8: Start playback; every RTP packet is fanned out to the readers
	c.OnPacketRTP(audioMedia, audioFormat, func(pkt *rtp.Packet) {
		path.DistributeData(&Data{Media: audioMedia, Pkt: pkt})
	})
	if _, err := c.Play(nil); err != nil {
		eventChan <- Event{Type: "source_error", Data: err}
		fatal("Play failed", "error", err)
	}

	// Step 9: Wait for the RTSP session to end or for SIGINT/SIGTERM
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	waitErr := make(chan error, 1)
	go func() { waitErr <- c.Wait() }()

	exitCode := 0
	select {
	case sig := <-sigCh:
		slog.Info("Shutting down", "signal", sig.String())
	case err := <-waitErr:
		if err != nil {
			slog.Error("RTSP client error", "error", err)
			exitCode = 1
		}
	}

	// Close the client first so no packets arrive while the readers finalize.
	c.Close()
	vadReader.Stop()
	recorder.Stop()
	if exitCode != 0 {
		os.Exit(exitCode)
	}
}