mirror of
				https://github.com/Wan-Video/Wan2.1.git
				synced 2025-11-04 06:15:17 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			261 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			261 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
Notification sounds for Wan2GP video generation application
 | 
						|
Pure Python audio notification system with multiple backend support
 | 
						|
"""
 | 
						|
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import threading
 | 
						|
import time
 | 
						|
import numpy as np
 | 
						|
 | 
						|
 | 
						|
def _apply_adsr_envelope(wave_data, sustain_level=0.85):
    """Multiply *wave_data* by an attack/decay/sustain/release envelope.

    The attack covers the first 20% of the samples (cubic rise from 0 to 1),
    the decay the next 10% (linear fall from 1 to *sustain_level*), and the
    release the last 50% (exponential fall starting at *sustain_level*).
    Everything in between is held at *sustain_level*.
    """
    length = len(wave_data)
    attack_len = int(0.2 * length)
    decay_len = int(0.1 * length)
    release_len = int(0.5 * length)

    # Start from the sustain level (not 1.0) so the stretch between the end
    # of the decay and the start of the release joins both neighbours without
    # an amplitude jump.
    envelope = np.ones(length) * sustain_level

    if attack_len > 0:
        envelope[:attack_len] = np.power(np.linspace(0, 1, attack_len), 3)

    if decay_len > 0:
        decay_end = attack_len + decay_len
        envelope[attack_len:decay_end] = np.linspace(1, sustain_level, decay_len)

    if release_len > 0:
        fade = np.exp(-4 * np.linspace(0, 1, release_len))
        envelope[length - release_len:] = sustain_level * fade

    return wave_data * envelope


def _moving_average_lowpass(signal):
    """Smooth *signal* with an odd-length moving average; length is preserved.

    The window is 0.1% of the signal length (at least 3 samples, bumped to
    the next odd number). The signal is edge-padded by half a window on each
    side, so a 'valid' convolution returns exactly ``len(signal)`` samples.
    """
    window_size = max(3, int(len(signal) * 0.001))
    if window_size % 2 == 0:
        window_size += 1

    half_window = window_size // 2
    kernel = np.ones(window_size) / window_size
    padded = np.pad(signal, half_window, mode='edge')
    # len(padded) - window_size + 1 == len(signal) because window_size is odd.
    return np.convolve(padded, kernel, mode='valid')


def generate_notification_beep(volume=50, sample_rate=44100):
    """Generate a C major chord notification sound.

    Args:
        volume: Loudness in percent. Values are clamped to the range 0-100;
            a clamped value of 0 yields an empty array.
        sample_rate: Number of samples per second of the generated signal.

    Returns:
        A 1-D float numpy array holding 0.8 seconds of audio
        (``int(sample_rate * 0.8)`` samples) whose absolute peak does not
        exceed 0.85, or an empty array when the clamped volume is 0 or the
        sample rate is too small to produce a single sample.
    """
    # Clamp first so that negative volumes are treated exactly like 0.
    volume = max(0, min(100, volume))
    if volume == 0:
        return np.array([])

    duration = 0.8
    num_samples = int(sample_rate * duration)
    if num_samples <= 0:
        return np.array([])

    # Volume curve mapping: 25%->50%, 50%->75%, 75%->100%, 100%->105%
    if volume <= 25:
        volume_mapped = (volume / 25.0) * 0.5
    elif volume <= 50:
        volume_mapped = 0.5 + ((volume - 25) / 25.0) * 0.25
    elif volume <= 75:
        volume_mapped = 0.75 + ((volume - 50) / 25.0) * 0.25
    else:
        volume_mapped = 1.0 + ((volume - 75) / 25.0) * 0.05

    # C major chord frequencies
    freq_c = 261.63  # C4
    freq_e = 329.63  # E4
    freq_g = 392.00  # G4

    t = np.linspace(0, duration, num_samples, False)

    # Chord components, each with its own weight
    wave = (
        np.sin(freq_c * 2 * np.pi * t) * 0.4
        + np.sin(freq_e * 2 * np.pi * t) * 0.3
        + np.sin(freq_g * 2 * np.pi * t) * 0.2
    )

    # Scale the raw chord so that its peak sits at 0.8
    max_amplitude = np.max(np.abs(wave))
    if max_amplitude > 0:
        wave = wave / max_amplitude * 0.8

    wave = _apply_adsr_envelope(wave)
    wave = _moving_average_lowpass(wave)

    # Reverb: mix in a single delayed, attenuated copy of the signal
    delay_samples = int(0.12 * sample_rate)
    # delay_samples must be positive, otherwise wave[:-0] would be empty.
    if len(wave) > sample_rate // 4 and delay_samples > 0:
        reverb = np.zeros_like(wave)
        reverb[delay_samples:] = wave[:-delay_samples] * 0.08
        wave = wave + reverb

    # Apply volume first, then normalize to prevent clipping
    wave = wave * volume_mapped * 0.5

    # Final normalization with safety margin: only rescale when the peak
    # exceeds 0.85
    max_amplitude = np.max(np.abs(wave))
    if max_amplitude > 0.85:
        wave = wave / max_amplitude * 0.85

    return wave
 | 
						|
 | 
						|
 | 
						|
def play_audio_with_pygame(audio_data, sample_rate=44100):
    """Play audio using pygame backend.

    Args:
        audio_data: Numpy array of float samples. A 1-D array is duplicated
            into two identical channels; any other shape is passed on as is.
            Samples are expected in [-1.0, 1.0]; values outside that range
            are clipped.
        sample_rate: Playback frequency handed to the pygame mixer.

    Returns:
        True once the sound has been started and its duration waited for,
        False if pygame cannot be imported or any other error occurs (the
        error is printed).
    """
    try:
        import pygame

        mixer_settings = {
            "frequency": sample_rate,
            "size": -16,
            "channels": 2,
            "buffer": 1024,
        }

        # Initialize pygame mixer only if not already initialized
        if not pygame.mixer.get_init():
            pygame.mixer.pre_init(**mixer_settings)
            pygame.mixer.init()
        else:
            # Reinitialize only when frequency or channel count differ
            current_freq, _current_size, current_channels = pygame.mixer.get_init()
            if current_freq != sample_rate or current_channels != 2:
                pygame.mixer.quit()
                pygame.mixer.pre_init(**mixer_settings)
                pygame.mixer.init()

        # Clip before converting: out-of-range floats would otherwise wrap
        # around in the int16 conversion instead of saturating.
        audio_int16 = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)

        # Convert mono to stereo
        if len(audio_int16.shape) == 1:
            stereo_data = np.column_stack((audio_int16, audio_int16))
        else:
            stereo_data = audio_int16

        sound = pygame.sndarray.make_sound(stereo_data)
        sound.play()
        # Block for the length of the clip plus a 100 ms margin
        pygame.time.wait(int(len(audio_data) / sample_rate * 1000) + 100)
        # Don't quit mixer - this can interfere with Gradio server
        return True

    except ImportError:
        return False
    except Exception as e:
        print(f"Pygame error: {e}")
        return False
 | 
						|
 | 
						|
 | 
						|
def play_audio_with_sounddevice(audio_data, sample_rate=44100):
    """Play audio through the sounddevice package and wait until it finishes.

    Returns True when playback completed, False when sounddevice cannot be
    imported or when any other error is raised (that error is printed).
    """
    try:
        import sounddevice as sd

        sd.play(audio_data, sample_rate)
        # sd.wait() blocks until the playback started above is done
        sd.wait()
    except ImportError:
        return False
    except Exception as e:
        print(f"Sounddevice error: {e}")
        return False
    else:
        return True
 | 
						|
 | 
						|
 | 
						|
def play_audio_with_winsound(audio_data, sample_rate=44100):
    """Play audio using winsound backend (Windows only).

    The samples are written to a temporary mono 16-bit WAV file, played
    synchronously with ``winsound.PlaySound`` and the file is removed
    afterwards.

    Args:
        audio_data: Numpy array of float samples, expected in [-1.0, 1.0];
            values outside that range are clipped.
        sample_rate: Frame rate written to the WAV header.

    Returns:
        True after the sound was played, False on non-Windows platforms,
        when a required module cannot be imported, or on any other error
        (the error is printed).
    """
    if sys.platform != "win32":
        return False

    try:
        import winsound
        import wave
        import tempfile
        import uuid

        temp_filename = os.path.join(
            tempfile.gettempdir(), f"notification_{uuid.uuid4().hex}.wav"
        )

        try:
            # Clip before converting: out-of-range floats would otherwise
            # wrap around in the int16 conversion. '<i2' = little-endian
            # 16-bit, the sample layout used for 16-bit WAV data.
            audio_int16 = (np.clip(audio_data, -1.0, 1.0) * 32767).astype('<i2')

            with wave.open(temp_filename, 'wb') as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_int16.tobytes())

            winsound.PlaySound(temp_filename, winsound.SND_FILENAME)

        finally:
            # Clean up temp file; retry a few times in case removal fails
            for _ in range(3):
                try:
                    if os.path.exists(temp_filename):
                        os.unlink(temp_filename)
                    break
                except OSError:
                    time.sleep(0.1)

        return True

    except ImportError:
        return False
    except Exception as e:
        print(f"Winsound error: {e}")
        return False
 | 
						|
 | 
						|
 | 
						|
def play_notification_sound(volume=50):
    """Play notification sound with specified volume.

    The sound is generated with ``generate_notification_beep`` and handed to
    each audio backend in turn (pygame, sounddevice, winsound) until one of
    them reports success. If none does, a terminal bell character is printed.

    Args:
        volume: Loudness in percent. A value of 0 or below is treated as
            muted and nothing is played.
    """
    # Negative volumes are clamped to 0 by the generator, so treat them as
    # muted right away instead of playing a silent clip.
    if volume <= 0:
        return

    audio_data = generate_notification_beep(volume=volume)

    if len(audio_data) == 0:
        return

    # Try audio backends in order
    audio_backends = (
        play_audio_with_pygame,
        play_audio_with_sounddevice,
        play_audio_with_winsound,
    )

    for backend in audio_backends:
        try:
            if backend(audio_data):
                return
        except Exception as e:
            # Report the failure instead of dropping it silently, then fall
            # through to the next backend.
            print(f"{backend.__name__} failed: {e}")

    # Fallback: terminal beep
    print("All audio backends failed, using terminal beep")
    print('\a')
 | 
						|
 | 
						|
 | 
						|
def play_notification_async(volume=50):
    """Start a daemon thread that plays the notification sound.

    The call returns as soon as the thread has been started. Any exception
    raised while playing is caught inside the thread and printed.
    """
    def play_sound():
        # Runs in the background thread; errors must not escape it.
        try:
            play_notification_sound(volume)
        except Exception as e:
            print(f"Error playing notification sound: {e}")

    threading.Thread(target=play_sound, daemon=True).start()
 | 
						|
 | 
						|
 | 
						|
def notify_video_completion(video_path=None, volume=50):
    """Signal a finished video generation by playing the notification sound.

    The sound is played without blocking via ``play_notification_async``.
    ``video_path`` is accepted but not used by this function.
    """
    play_notification_async(volume=volume)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # Manual smoke test: play the notification at several volume levels,
    # pausing two seconds after each one.
    print("Testing notification sounds with different volumes...")
    print("Auto-detecting available audio backends...")

    for level in (25, 50, 75, 100):
        print(f"Testing volume {level}%:")
        play_notification_sound(level)
        time.sleep(2)

    print("Test completed!")