This commit is contained in:
DeepBeepMeep 2025-06-12 10:00:51 +02:00
commit 8d68c8b8dc
3 changed files with 318 additions and 5 deletions

261
notification_sound.py Normal file
View File

@ -0,0 +1,261 @@
"""
Notification sounds for Wan2GP video generation application
Pure Python audio notification system with multiple backend support
"""
import os
import sys
import threading
import time
import numpy as np
def generate_notification_beep(volume=50, sample_rate=44100):
"""Generate pleasant C major chord notification sound"""
if volume == 0:
return np.array([])
volume = max(0, min(100, volume))
# Volume curve mapping: 25%->50%, 50%->75%, 75%->100%, 100%->105%
if volume <= 25:
volume_mapped = (volume / 25.0) * 0.5
elif volume <= 50:
volume_mapped = 0.5 + ((volume - 25) / 25.0) * 0.25
elif volume <= 75:
volume_mapped = 0.75 + ((volume - 50) / 25.0) * 0.25
else:
volume_mapped = 1.0 + ((volume - 75) / 25.0) * 0.05 # Only 5% boost instead of 15%
volume = volume_mapped
# C major chord frequencies
freq_c = 261.63 # C4
freq_e = 329.63 # E4
freq_g = 392.00 # G4
duration = 0.8
t = np.linspace(0, duration, int(sample_rate * duration), False)
# Generate chord components
wave_c = np.sin(freq_c * 2 * np.pi * t) * 0.4
wave_e = np.sin(freq_e * 2 * np.pi * t) * 0.3
wave_g = np.sin(freq_g * 2 * np.pi * t) * 0.2
wave = wave_c + wave_e + wave_g
# Prevent clipping
max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0:
wave = wave / max_amplitude * 0.8
# ADSR envelope
def apply_adsr_envelope(wave_data):
length = len(wave_data)
attack_time = int(0.2 * length)
decay_time = int(0.1 * length)
release_time = int(0.5 * length)
envelope = np.ones(length)
if attack_time > 0:
envelope[:attack_time] = np.power(np.linspace(0, 1, attack_time), 3)
if decay_time > 0:
start_idx = attack_time
end_idx = attack_time + decay_time
envelope[start_idx:end_idx] = np.linspace(1, 0.85, decay_time)
if release_time > 0:
start_idx = length - release_time
envelope[start_idx:] = 0.85 * np.exp(-4 * np.linspace(0, 1, release_time))
return wave_data * envelope
wave = apply_adsr_envelope(wave)
# Simple low-pass filter
def simple_lowpass_filter(signal, cutoff_ratio=0.8):
window_size = max(3, int(len(signal) * 0.001))
if window_size % 2 == 0:
window_size += 1
kernel = np.ones(window_size) / window_size
padded = np.pad(signal, window_size//2, mode='edge')
filtered = np.convolve(padded, kernel, mode='same')
return filtered[window_size//2:-window_size//2]
wave = simple_lowpass_filter(wave)
# Add reverb effect
if len(wave) > sample_rate // 4:
delay_samples = int(0.12 * sample_rate)
reverb = np.zeros_like(wave)
reverb[delay_samples:] = wave[:-delay_samples] * 0.08
wave = wave + reverb
# Apply volume first, then normalize to prevent clipping
wave = wave * volume * 0.5
# Final normalization with safety margin
max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0.85: # If approaching clipping threshold
wave = wave / max_amplitude * 0.85 # More conservative normalization
return wave
def play_audio_with_pygame(audio_data, sample_rate=44100):
"""Play audio using pygame backend"""
try:
import pygame
# Initialize pygame mixer only if not already initialized
if not pygame.mixer.get_init():
pygame.mixer.pre_init(frequency=sample_rate, size=-16, channels=2, buffer=1024)
pygame.mixer.init()
else:
# Reinitialize with new settings if needed
current_freq, current_size, current_channels = pygame.mixer.get_init()
if current_freq != sample_rate or current_channels != 2:
pygame.mixer.quit()
pygame.mixer.pre_init(frequency=sample_rate, size=-16, channels=2, buffer=1024)
pygame.mixer.init()
audio_int16 = (audio_data * 32767).astype(np.int16)
# Convert mono to stereo
if len(audio_int16.shape) == 1:
stereo_data = np.column_stack((audio_int16, audio_int16))
else:
stereo_data = audio_int16
sound = pygame.sndarray.make_sound(stereo_data)
sound.play()
pygame.time.wait(int(len(audio_data) / sample_rate * 1000) + 100)
# Don't quit mixer - this can interfere with Gradio server
# pygame.mixer.quit()
return True
except ImportError:
return False
except Exception as e:
print(f"Pygame error: {e}")
return False
def play_audio_with_sounddevice(audio_data, sample_rate=44100):
"""Play audio using sounddevice backend"""
try:
import sounddevice as sd
sd.play(audio_data, sample_rate)
sd.wait()
return True
except ImportError:
return False
except Exception as e:
print(f"Sounddevice error: {e}")
return False
def play_audio_with_winsound(audio_data, sample_rate=44100):
"""Play audio using winsound backend (Windows only)"""
if sys.platform != "win32":
return False
try:
import winsound
import wave
import tempfile
import uuid
temp_dir = tempfile.gettempdir()
temp_filename = os.path.join(temp_dir, f"notification_{uuid.uuid4().hex}.wav")
try:
with wave.open(temp_filename, 'w') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
audio_int16 = (audio_data * 32767).astype(np.int16)
wav_file.writeframes(audio_int16.tobytes())
winsound.PlaySound(temp_filename, winsound.SND_FILENAME)
finally:
# Clean up temp file
for _ in range(3):
try:
if os.path.exists(temp_filename):
os.unlink(temp_filename)
break
except:
time.sleep(0.1)
return True
except ImportError:
return False
except Exception as e:
print(f"Winsound error: {e}")
return False
def play_notification_sound(volume=50):
"""Play notification sound with specified volume"""
if volume == 0:
return
audio_data = generate_notification_beep(volume=volume)
if len(audio_data) == 0:
return
# Try audio backends in order
audio_backends = [
play_audio_with_pygame,
play_audio_with_sounddevice,
play_audio_with_winsound,
]
for backend in audio_backends:
try:
if backend(audio_data):
return
except Exception as e:
continue
# Fallback: terminal beep
print(f"All audio backends failed, using terminal beep")
print('\a')
def play_notification_async(volume=50):
"""Play notification sound asynchronously (non-blocking)"""
def play_sound():
try:
play_notification_sound(volume)
except Exception as e:
print(f"Error playing notification sound: {e}")
sound_thread = threading.Thread(target=play_sound, daemon=True)
sound_thread.start()
def notify_video_completion(video_path=None, volume=50):
"""Notify about completed video generation"""
play_notification_async(volume)
if __name__ == "__main__":
print("Testing notification sounds with different volumes...")
print("Auto-detecting available audio backends...")
volumes = [25, 50, 75, 100]
for vol in volumes:
print(f"Testing volume {vol}%:")
play_notification_sound(vol)
time.sleep(2)
print("Test completed!")

View File

@ -34,4 +34,5 @@ loguru
sentencepiece sentencepiece
av av
opencv-python opencv-python
pygame
# rembg==2.0.65 # rembg==2.0.65

61
wgp.py
View File

@ -14,6 +14,7 @@ import gradio as gr
import random import random
import json import json
import wan import wan
import notification_sound
from wan.configs import MAX_AREA_CONFIGS, WAN_CONFIGS, SUPPORTED_SIZES, VACE_SIZE_CONFIGS from wan.configs import MAX_AREA_CONFIGS, WAN_CONFIGS, SUPPORTED_SIZES, VACE_SIZE_CONFIGS
from wan.utils.utils import cache_video from wan.utils.utils import cache_video
from wan.modules.attention import get_attention_modes, get_supported_attention_modes from wan.modules.attention import get_attention_modes, get_supported_attention_modes
@ -2444,7 +2445,9 @@ def apply_changes( state,
UI_theme_choice = "default", UI_theme_choice = "default",
enhancer_enabled_choice = 0, enhancer_enabled_choice = 0,
fit_canvas_choice = 0, fit_canvas_choice = 0,
preload_in_VRAM_choice = 0 preload_in_VRAM_choice = 0,
notification_sound_enabled_choice = 1,
notification_sound_volume_choice = 50
): ):
if args.lock_config: if args.lock_config:
return return
@ -2469,7 +2472,9 @@ def apply_changes( state,
"UI_theme" : UI_theme_choice, "UI_theme" : UI_theme_choice,
"fit_canvas": fit_canvas_choice, "fit_canvas": fit_canvas_choice,
"enhancer_enabled" : enhancer_enabled_choice, "enhancer_enabled" : enhancer_enabled_choice,
"preload_in_VRAM" : preload_in_VRAM_choice "preload_in_VRAM" : preload_in_VRAM_choice,
"notification_sound_enabled" : notification_sound_enabled_choice,
"notification_sound_volume" : notification_sound_volume_choice
} }
if Path(server_config_filename).is_file(): if Path(server_config_filename).is_file():
@ -2506,7 +2511,7 @@ def apply_changes( state,
transformer_types = server_config["transformer_types"] transformer_types = server_config["transformer_types"]
model_filename = get_model_filename(get_model_type(state["model_filename"]), transformer_quantization, transformer_dtype_policy) model_filename = get_model_filename(get_model_type(state["model_filename"]), transformer_quantization, transformer_dtype_policy)
state["model_filename"] = model_filename state["model_filename"] = model_filename
if all(change in ["attention_mode", "vae_config", "boost", "save_path", "metadata_type", "clear_file_list", "fit_canvas"] for change in changes ): if all(change in ["attention_mode", "vae_config", "boost", "save_path", "metadata_type", "clear_file_list", "fit_canvas", "notification_sound_enabled", "notification_sound_volume"] for change in changes ):
model_choice = gr.Dropdown() model_choice = gr.Dropdown()
else: else:
reload_needed = True reload_needed = True
@ -2656,7 +2661,16 @@ def refresh_gallery(state): #, msg
for img_uri in list_uri: for img_uri in list_uri:
thumbnails += f'<TD><img src="{img_uri}" alt="Start" style="max-width:{thumbnail_size}; max-height:{thumbnail_size}; display: block; margin: auto; object-fit: contain;" /></TD>' thumbnails += f'<TD><img src="{img_uri}" alt="Start" style="max-width:{thumbnail_size}; max-height:{thumbnail_size}; display: block; margin: auto; object-fit: contain;" /></TD>'
html = "<STYLE> #PINFO, #PINFO th, #PINFO td {border: 1px solid #CCCCCC;background-color:#FFFFFF;}</STYLE><TABLE WIDTH=100% ID=PINFO ><TR><TD width=100%>" + prompt + "</TD>" + thumbnails + "</TR></TABLE>" # Get current theme from server config
current_theme = server_config.get("UI_theme", "default")
if current_theme == "gradio":
# Native Gradio theme (dark)
table_style = "border: 1px solid #444444; background-color: #2B2B2B; color: #FFFFFF; padding: 8px;"
else:
# Blue Sky Soft theme - match the input fields styling
table_style = "border: 1px solid #0a0f1e; background-color: #1e293b; color: #FFFFFF; padding: 8px;"
html = f"<TABLE WIDTH=100% ID=PINFO style='{table_style}'><TR><TD width=100% style='{table_style}'>" + prompt + "</TD>" + thumbnails + "</TR></TABLE>"
html_output = gr.HTML(html, visible= True) html_output = gr.HTML(html, visible= True)
return gr.Gallery(selected_index=choice, value = file_list), html_output, gr.Button(visible=False), gr.Button(visible=True), gr.Row(visible=True), update_queue_data(queue), gr.Button(interactive= abort_interactive), gr.Button(visible= onemorewindow_visible) return gr.Gallery(selected_index=choice, value = file_list), html_output, gr.Button(visible=False), gr.Button(visible=True), gr.Row(visible=True), update_queue_data(queue), gr.Button(interactive= abort_interactive), gr.Button(visible= onemorewindow_visible)
@ -3570,6 +3584,17 @@ def generate_video(
file_list.append(video_path) file_list.append(video_path)
file_settings_list.append(configs) file_settings_list.append(configs)
# Play notification sound for single video
try:
if server_config.get("notification_sound_enabled", 1):
volume = server_config.get("notification_sound_volume", 50)
notification_sound.notify_video_completion(
video_path=video_path,
volume=volume
)
except Exception as e:
print(f"Error playing notification sound for individual video: {e}")
send_cmd("output") send_cmd("output")
seed += 1 seed += 1
@ -3912,6 +3937,13 @@ def process_tasks(state):
status = f"Video generation was aborted. Total Generation Time: {end_time-start_time:.1f}s" status = f"Video generation was aborted. Total Generation Time: {end_time-start_time:.1f}s"
else: else:
status = f"Total Generation Time: {end_time-start_time:.1f}s" status = f"Total Generation Time: {end_time-start_time:.1f}s"
# Play notification sound when video generation completed successfully
try:
if server_config.get("notification_sound_enabled", 1):
volume = server_config.get("notification_sound_volume", 50)
notification_sound.notify_video_completion(volume=volume)
except Exception as e:
print(f"Error playing notification sound: {e}")
gen["status"] = status gen["status"] = status
gen["status_display"] = False gen["status_display"] = False
@ -5740,7 +5772,24 @@ def generate_configuration_tab(state, blocks, header, model_choice, prompt_enhan
) )
preload_in_VRAM_choice = gr.Slider(0, 40000, value=server_config.get("preload_in_VRAM", 0), step=100, label="Number of MB of Models that are Preloaded in VRAM (0 will use Profile default)") preload_in_VRAM_choice = gr.Slider(0, 40000, value=server_config.get("preload_in_VRAM", 0), step=100, label="Number of MB of Models that are Preloaded in VRAM (0 will use Profile default)")
with gr.Tab("Notifications"):
gr.Markdown("### Notification Settings")
notification_sound_enabled_choice = gr.Dropdown(
choices=[
("On", 1),
("Off", 0),
],
value=server_config.get("notification_sound_enabled", 1),
label="Notification Sound Enabled"
)
notification_sound_volume_choice = gr.Slider(
minimum=0,
maximum=100,
value=server_config.get("notification_sound_volume", 50),
step=5,
label="Notification Sound Volume (0 = silent, 100 = very loud)"
)
msg = gr.Markdown() msg = gr.Markdown()
@ -5767,7 +5816,9 @@ def generate_configuration_tab(state, blocks, header, model_choice, prompt_enhan
UI_theme_choice, UI_theme_choice,
enhancer_enabled_choice, enhancer_enabled_choice,
fit_canvas_choice, fit_canvas_choice,
preload_in_VRAM_choice preload_in_VRAM_choice,
notification_sound_enabled_choice,
notification_sound_volume_choice
], ],
outputs= [msg , header, model_choice, prompt_enhancer_row] outputs= [msg , header, model_choice, prompt_enhancer_row]
) )