first commit
This commit is contained in:
60
watcher/watcher.go
Normal file
60
watcher/watcher.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.savin.nyc/alex/go-iar-notificator/bus"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
const (
|
||||
EventNewMP3 = "new_mp3" // Event name for new MP3 detection
|
||||
)
|
||||
|
||||
func WatchDirectory(ctx context.Context, dirToWatch string, eb *bus.CustomEventBus, logger *slog.Logger) {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create directory watcher", "error", err)
|
||||
return
|
||||
}
|
||||
defer watcher.Close()
|
||||
|
||||
// Ensure the directory exists
|
||||
if err := os.MkdirAll(dirToWatch, os.ModePerm); err != nil {
|
||||
logger.Error("Failed to create watched directory", "error", err, "dir", dirToWatch)
|
||||
return
|
||||
}
|
||||
|
||||
if err := watcher.Add(dirToWatch); err != nil {
|
||||
logger.Error("Failed to add watch on directory", "error", err, "dir", dirToWatch)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("Watching directory for MP3 files", "dir", dirToWatch)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info("Stopping directory watcher")
|
||||
return
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// Trigger on file creation or write (e.g., when saved)
|
||||
if (event.Op&fsnotify.Create == fsnotify.Create || event.Op&fsnotify.Write == fsnotify.Write) &&
|
||||
filepath.Ext(event.Name) == ".mp3" {
|
||||
logger.Info("Detected new or saved MP3 file", "path", event.Name)
|
||||
eb.Publish(EventNewMP3, event.Name)
|
||||
}
|
||||
case err, ok := <-watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
logger.Error("Directory watcher error", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
195
watcher/watcher_test.go
Normal file
195
watcher/watcher_test.go
Normal file
@@ -0,0 +1,195 @@
|
||||
package watcher_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.savin.nyc/alex/go-iar-notificator/bus"
|
||||
"git.savin.nyc/alex/go-iar-notificator/watcher"
|
||||
)
|
||||
|
||||
func TestWatchDirectory_NewMP3File(t *testing.T) {
|
||||
// Create a temporary directory
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create event bus
|
||||
eb := bus.NewCustomEventBus()
|
||||
|
||||
// Channel to capture published events
|
||||
eventReceived := make(chan string, 1)
|
||||
|
||||
// Subscribe to new_mp3 event
|
||||
eb.Subscribe("new_mp3", "test_handler", func(data any) {
|
||||
if path, ok := data.(string); ok {
|
||||
eventReceived <- path
|
||||
}
|
||||
})
|
||||
|
||||
// Create logger (discard output)
|
||||
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
|
||||
|
||||
// Start watcher in a goroutine
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go watcher.WatchDirectory(ctx, tempDir, eb, logger)
|
||||
|
||||
// Give watcher time to start
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Create a new MP3 file
|
||||
mp3File := filepath.Join(tempDir, "test.mp3")
|
||||
err := os.WriteFile(mp3File, []byte("fake mp3 data"), 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create MP3 file: %v", err)
|
||||
}
|
||||
|
||||
// Wait for the event
|
||||
select {
|
||||
case receivedPath := <-eventReceived:
|
||||
if receivedPath != mp3File {
|
||||
t.Errorf("Expected path %s, got %s", mp3File, receivedPath)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Event was not received within timeout")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestWatchDirectory_NonMP3File(t *testing.T) {
|
||||
// Create a temporary directory
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create event bus
|
||||
eb := bus.NewCustomEventBus()
|
||||
|
||||
// Channel to capture published events
|
||||
eventReceived := make(chan string, 1)
|
||||
|
||||
// Subscribe to new_mp3 event
|
||||
eb.Subscribe("new_mp3", "test_handler", func(data any) {
|
||||
if path, ok := data.(string); ok {
|
||||
eventReceived <- path
|
||||
}
|
||||
})
|
||||
|
||||
// Create logger
|
||||
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
|
||||
|
||||
// Start watcher
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go watcher.WatchDirectory(ctx, tempDir, eb, logger)
|
||||
|
||||
// Give watcher time to start
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Create a non-MP3 file
|
||||
txtFile := filepath.Join(tempDir, "test.txt")
|
||||
err := os.WriteFile(txtFile, []byte("text data"), 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create text file: %v", err)
|
||||
}
|
||||
|
||||
// Wait a bit and check no event was received
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
select {
|
||||
case <-eventReceived:
|
||||
t.Error("Event should not be received for non-MP3 file")
|
||||
default:
|
||||
// Expected: no event
|
||||
}
|
||||
|
||||
// Clean up
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestWatchDirectory_DirectoryCreation(t *testing.T) {
|
||||
// Use a non-existent directory path
|
||||
tempDir := t.TempDir()
|
||||
nonExistentDir := filepath.Join(tempDir, "new_dir")
|
||||
|
||||
// Create event bus
|
||||
eb := bus.NewCustomEventBus()
|
||||
|
||||
// Create logger
|
||||
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
|
||||
|
||||
// Start watcher (should create the directory)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go watcher.WatchDirectory(ctx, nonExistentDir, eb, logger)
|
||||
|
||||
// Give time for directory creation
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Check if directory was created
|
||||
if _, err := os.Stat(nonExistentDir); os.IsNotExist(err) {
|
||||
t.Error("Directory should have been created")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestWatchDirectory_WriteEvent(t *testing.T) {
|
||||
// Create a temporary directory
|
||||
tempDir := t.TempDir()
|
||||
|
||||
// Create event bus
|
||||
eb := bus.NewCustomEventBus()
|
||||
|
||||
// Channel to capture published events
|
||||
eventReceived := make(chan string, 1)
|
||||
|
||||
// Subscribe to new_mp3 event
|
||||
eb.Subscribe("new_mp3", "test_handler", func(data any) {
|
||||
if path, ok := data.(string); ok {
|
||||
eventReceived <- path
|
||||
}
|
||||
})
|
||||
|
||||
// Create logger
|
||||
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
|
||||
|
||||
// Start watcher
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go watcher.WatchDirectory(ctx, tempDir, eb, logger)
|
||||
|
||||
// Give watcher time to start
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Create and then write to an MP3 file (simulating save)
|
||||
mp3File := filepath.Join(tempDir, "test.mp3")
|
||||
err := os.WriteFile(mp3File, []byte("initial data"), 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create MP3 file: %v", err)
|
||||
}
|
||||
|
||||
// Wait for initial event
|
||||
select {
|
||||
case <-eventReceived:
|
||||
// Expected
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Error("Initial event was not received")
|
||||
}
|
||||
|
||||
// Now write again (should trigger again)
|
||||
err = os.WriteFile(mp3File, []byte("updated data"), 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to update MP3 file: %v", err)
|
||||
}
|
||||
|
||||
// Wait for second event
|
||||
select {
|
||||
case <-eventReceived:
|
||||
// Expected
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Error("Second event was not received")
|
||||
}
|
||||
|
||||
// Clean up
|
||||
cancel()
|
||||
}
|
Reference in New Issue
Block a user