the killer of seven

This commit is contained in:
deepbeepmeep 2025-08-21 00:48:29 +02:00
parent 51341ac1c1
commit 1c310f7c9d
23 changed files with 1294 additions and 504 deletions

View File

@ -20,6 +20,13 @@ WanGP supports the Wan (and derived models), Hunyuan Video and LTV Video models
**Follow DeepBeepMeep on Twitter/X to get the Latest News**: https://x.com/deepbeepmeep **Follow DeepBeepMeep on Twitter/X to get the Latest News**: https://x.com/deepbeepmeep
## 🔥 Latest Updates : ## 🔥 Latest Updates :
### August 21 2025: WanGP v8.0 - the killer of seven
- Qwen Image Edit : Flux Kontext challenger (prompt driven image edition). You should use it at high res (1080p) if you want to preserve identity of the original people / objects. It works with Qwen Lora Lightning 4 steps. I have also unlocked all the resolutions for Qwen models. Bonus Zone: support for multiple image compositions
- On demand Prompt Enhancer (need to be enabled in Configuration Tab) that you can use to Enhance a Text Prompt before starting a Generation. You can refine the Enhanced Prompt or change the original Prompt.
- Choice of a non censored Prompt Enhancer. Beware this is one is VRAM hungry and will require 12 GB of VRAM to work
- Memory Profile customizable per model : useful to set for instance Profile 3 (preload the model entirely in VRAM) with only Image Generation models, if you have 24 GB of VRAM. In that case Generation will be much faster because with Image generators (contrary to Video generators) as a lot of time is wasted in offloading
- Expert Guidance Mode: change the Guidance during the generation up to 2 times. Very useful with Wan 2.2 Ligthning to reduce the slow motion effect. The idea is to insert a CFG phase before the 2 accelerated phases that follow and have no Guidance. I have added the finetune *Wan2.2 Vace Lightning 3 Phases 14B* with a prebuilt configuration. Please note that it is a 8 steps process although the lora lightning is 4 steps. This expert guidance mode is also available with Wan 2.1.
### August 12 2025: WanGP v7.7777 - Lucky Day(s) ### August 12 2025: WanGP v7.7777 - Lucky Day(s)

View File

@ -5,7 +5,7 @@
"architecture" : "hunyuan", "architecture" : "hunyuan",
"description": "Probably the best text 2 video model available.", "description": "Probably the best text 2 video model available.",
"URLs": [ "URLs": [
"https://huggingface.co/DeepBeepMeep/HunyuanVideo/resolve/main/hunyuan_video_720_bf16.safetensors.safetensors", "https://huggingface.co/DeepBeepMeep/HunyuanVideo/resolve/main/hunyuan_video_720_bf16.safetensors",
"https://huggingface.co/DeepBeepMeep/HunyuanVideo/resolve/main/hunyuan_video_720_quanto_int8.safetensors" "https://huggingface.co/DeepBeepMeep/HunyuanVideo/resolve/main/hunyuan_video_720_quanto_int8.safetensors"
] ]
} }

View File

@ -2,12 +2,12 @@
"model": { "model": {
"name": "Qwen Image 20B", "name": "Qwen Image 20B",
"architecture": "qwen_image_20B", "architecture": "qwen_image_20B",
"description": "Qwen Image is generative model that will very high quality images. It is one of the few models capable to generate in the image very long texts.", "description": "Qwen Image is generative model that will generate very high quality images. It is one of the few models capable to generate in the image very long texts.",
"URLs": [ "URLs": [
"https://huggingface.co/DeepBeepMeep/Qwen_image/resolve/main/qwen_image_20B_bf16.safetensors", "https://huggingface.co/DeepBeepMeep/Qwen_image/resolve/main/qwen_image_20B_bf16.safetensors",
"https://huggingface.co/DeepBeepMeep/Qwen_image/resolve/main/qwen_image_20B_quanto_bf16_int8.safetensors" "https://huggingface.co/DeepBeepMeep/Qwen_image/resolve/main/qwen_image_20B_quanto_bf16_int8.safetensors"
], ],
"resolutions": [ ["1328x1328 (1:1)", "1328x1328"], "xresolutions": [ ["1328x1328 (1:1)", "1328x1328"],
["1664x928 (16:9)", "1664x928"], ["1664x928 (16:9)", "1664x928"],
["928x1664 (9:16)", "928x1664"], ["928x1664 (9:16)", "928x1664"],
["1472x1140 (4:3)", "1472x1140"], ["1472x1140 (4:3)", "1472x1140"],
@ -16,6 +16,6 @@
"image_outputs": true "image_outputs": true
}, },
"prompt": "draw a hat", "prompt": "draw a hat",
"resolution": "1280x720", "resolution": "1920x1088",
"batch_size": 1 "batch_size": 1
} }

View File

@ -0,0 +1,19 @@
{
"model": {
"name": "Qwen Image Edit 20B",
"architecture": "qwen_image_edit_20B",
"description": "Qwen Image Edit is generative model that will generate very high quality images. It can be used to edit a Subject or combine multiple Subjects. It is one of the few models capable to generate in the image very long texts.",
"URLs": [
"https://huggingface.co/DeepBeepMeep/Qwen_image/resolve/main/qwen_image_edit_20B_bf16.safetensors",
"https://huggingface.co/DeepBeepMeep/Qwen_image/resolve/main/qwen_image_edit_20B_quanto_bf16_int8.safetensors"
],
"attention": {
"<89": "sdpa"
},
"reference_image": true,
"image_outputs": true
},
"prompt": "draw a hat",
"resolution": "1280x720",
"batch_size": 1
}

View File

@ -0,0 +1,29 @@
{
"model": {
"name": "Wan2.2 Vace Lightning 3 Phases 14B",
"architecture": "vace_14B",
"modules": [
"vace_14B"
],
"description": "This finetune uses the Lightning 4 steps Loras Accelerator for Wan 2.2 but extend them to 8 steps in order to insert a CFG phase before the 2 accelerated phases with no Guidance. The ultimate goal is reduce the slow motion effect of these Loras Accelerators.",
"URLs": "t2v_2_2",
"URLs2": "t2v_2_2",
"loras": [
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/loras_accelerators/Wan2.2-Lightning_T2V-v1.1-A14B-4steps-lora_HIGH_fp16.safetensors",
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/loras_accelerators/Wan2.2-Lightning_T2V-v1.1-A14B-4steps-lora_LOW_fp16.safetensors"
],
"loras_multipliers": ["0;1;0", "0;0;1"],
"lock_guidance_phases": true,
"group": "wan2_2"
},
"num_inference_steps": 8,
"guidance_phases": 3.1,
"guidance_scale": 3.5,
"guidance2_scale": 1,
"guidance3_scale": 1,
"switch_threshold": 965,
"switch_threshold2": 800,
"model_switch_phase": 2,
"flow_shift": 3,
"sample_solver": "euler"
}

View File

@ -69,9 +69,25 @@ For instance if one adds a module *vace_14B* on top of a model with architecture
-*visible* : by default assumed to be true. If set to false the model will no longer be visible. This can be useful if you create a finetune to override a default model and hide it. -*visible* : by default assumed to be true. If set to false the model will no longer be visible. This can be useful if you create a finetune to override a default model and hide it.
-*image_outputs* : turn any model that generates a video into a model that generates images. In fact it will adapt the user interface for image generation and ask the model to generate a video with a single frame. -*image_outputs* : turn any model that generates a video into a model that generates images. In fact it will adapt the user interface for image generation and ask the model to generate a video with a single frame.
In order to favor reusability the properties of *URLs*, *modules*, *loras* and *preload_URLs* can contain instead of a list of URLs a single text which corresponds to the id of a finetune or default model to reuse. In order to favor reusability the properties of *URLs*, *modules*, *loras* and *preload_URLs* can contain instead of a list of URLs a single text which corresponds to the id of a finetune or default model to reuse. Instead of:
```
"URLs": [
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/wan2.2_text2video_14B_high_mbf16.safetensors",
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/wan2.2_text2video_14B_high_quanto_mbf16_int8.safetensors",
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/wan2.2_text2video_14B_high_quanto_mfp16_int8.safetensors"
],
"URLs2": [
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/wan2.2_text2video_14B_low_mbf16.safetensors",
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/wan2.2_text2video_14B_low_quanto_mbf16_int8.safetensors",
"https://huggingface.co/DeepBeepMeep/Wan2.2/resolve/main/wan2.2_text2video_14B_low_quanto_mfp16_int8.safetensors"
],
```
You can write:
```
"URLs": "t2v_2_2",
"URLs2": "t2v_2_2",
```
For example lets say you have defined a *t2v_fusionix.json* file which contains the URLs to download the finetune. In the *vace_fusionix.json* you can write « URLs » : « fusionix » to reuse automatically the URLS already defined in the correspond file.
Example of **model** subtree Example of **model** subtree
``` ```

View File

@ -7,18 +7,21 @@ Loras (Low-Rank Adaptations) allow you to customize video generation models by a
Loras are organized in different folders based on the model they're designed for: Loras are organized in different folders based on the model they're designed for:
### Wan Text-to-Video Models ### Wan Text-to-Video Models
- `loras/` - General t2v loras - `loras/` - General t2v loras for Wan 2.1 (t2v only) and for all Wan 2.2 models
Optional sub folders:
- `loras/1.3B/` - Loras specifically for 1.3B models - `loras/1.3B/` - Loras specifically for 1.3B models
- `loras/5B/` - Loras specifically for 1.3B models
- `loras/14B/` - Loras specifically for 14B models - `loras/14B/` - Loras specifically for 14B models
### Wan Image-to-Video Models ### Wan Image-to-Video Models
- `loras_i2v/` - Image-to-video loras - `loras_i2v/` - Image-to-video loras for Wan 2.1
### Other Models ### Other Models
- `loras_hunyuan/` - Hunyuan Video t2v loras - `loras_hunyuan/` - Hunyuan Video t2v loras
- `loras_hunyuan_i2v/` - Hunyuan Video i2v loras - `loras_hunyuan_i2v/` - Hunyuan Video i2v loras
- `loras_ltxv/` - LTX Video loras - `loras_ltxv/` - LTX Video loras
- `loras_flux/` - Flux loras - `loras_flux/` - Flux loras
- `loras_qwen/` - Qwen loras
## Custom Lora Directory ## Custom Lora Directory
@ -40,7 +43,9 @@ python wgp.py --lora-dir-hunyuan /path/to/hunyuan/loras --lora-dir-ltxv /path/to
2. Launch WanGP 2. Launch WanGP
3. In the Advanced Tab, select the "Loras" section 3. In the Advanced Tab, select the "Loras" section
4. Check the loras you want to activate 4. Check the loras you want to activate
5. Set multipliers for each lora (default is 1.0) 5. Set multipliers for each lora (default is 1.0 if multiplier is not mentioned)
If you store loras in the loras folder once WanGP has been launched, click the *Refresh* button at the top so that it can become selectable.
### Lora Multipliers ### Lora Multipliers
@ -53,7 +58,7 @@ Multipliers control the strength of each lora's effect:
- First lora: 1.2 strength - First lora: 1.2 strength
- Second lora: 0.8 strength - Second lora: 0.8 strength
#### Time-based Multipliers #### Time-based and Phase-based Multipliers
For dynamic effects over generation steps, use comma-separated values: For dynamic effects over generation steps, use comma-separated values:
``` ```
0.9,0.8,0.7 0.9,0.8,0.7
@ -75,7 +80,7 @@ Also with Wan 2.2, if you have two loras and you want the first one to be applie
1;0 0;1 1;0 0;1
``` ```
As usual, you can use any float for of multiplier and have a multiplier varries throughout one phase for one Lora: As usual, you can use any float for a multiplier and have a multiplier varries throughout one phase for one Lora:
``` ```
0.9,0.8;1.2,1.1,1 0.9,0.8;1.2,1.1,1
``` ```
@ -87,7 +92,31 @@ Here is another example for two loras:
0.5;0,0.7 0.5;0,0.7
``` ```
Note that the syntax for multipliers can also be used in a Finetune model definition file (except that each multiplier definition is a string in a json list) If one of several of your Lora multipliers are phased based (that is with a ";") and there are also Loras Multipliers that are only time based (don't have a ";" but have a ",") the time only multiplier will ignore the phases. For instance, let's assume we have a 6 steps denoising process in the following example:
```
1;0
0;1
0.8,0.7,0.5
```
Here the first lora will be as expected only used with the High Noise model and the second lora only used with the Low noise model. However for the third Lora: for steps 1-2 the multiplier will be (regadless of the phase) 0.8 then for steps 3-4 the multiplier will be 0.7 and finally for steps 5-6 the multiplier will be 0.5
You can use phased Lora multipliers even if have a single model (that is without any High / Low models) as Lora multiplier phases are aligned with Guidance phases. Let's assume you have defined 3 guidance phases (for instance guidance=3, then guidance=1.5 and at last guidance=1 ):
```
0;1;0
0;0;1
```
In that case no lora will be applied during the first phase when guidance is 3. Then the fist lora will be only used when guidance is 1.5 and the second lora only when guidance is 1.
Best of all you can combine 3 guidance phases with High / Low models. Let's take this practical example with *Lightning 4/8 steps loras accelerators for Wan 2.2* where we want to increase the motion by adding some guidance at the very beginning (in that case a first phase that lasts only 1 step should be sufficient):
```
Guidances: 3.5, 1 and 1
Model transition: Phase 2-3
Loras Multipliers: 0;1;0 0;0;1
```
Here during the first phase with guidance 3.5, the High model will be used but there won't be any lora at all. Then during phase 2 only the High lora will be used (which requires to set the guidance to 1). At last in phase 3 WanGP will switch to the Low model and then only the Low lora will be used.
*Note that the syntax for multipliers can also be used in a Finetune model definition file (except that each multiplier definition is a string in a json list)*
## Lora Presets ## Lora Presets
Lora Presets are combinations of loras with predefined multipliers and prompts. Lora Presets are combinations of loras with predefined multipliers and prompts.
@ -125,15 +154,22 @@ WanGP supports multiple lora formats:
## Loras Accelerators ## Loras Accelerators
Most Loras are used to apply a specific style or to alter the content of the output of the generated video. Most Loras are used to apply a specific style or to alter the content of the output of the generated video.
However some Loras have been designed to tranform a model into a distilled model which requires fewer steps to generate a video. However some Loras have been designed to tranform a model into a distilled model which requires fewer steps to generate a video.
Loras accelerators usually require to the set the Guidance to 1. Don't forget to do it as not only the quality of the generate video will be bad but it will two times slower.
You will find most *Loras Accelerators* here: You will find most *Loras Accelerators* below:
- Wan 2.1
https://huggingface.co/DeepBeepMeep/Wan2.1/tree/main/loras_accelerators https://huggingface.co/DeepBeepMeep/Wan2.1/tree/main/loras_accelerators
- Wan 2.2
https://huggingface.co/DeepBeepMeep/Wan2.2/tree/main/loras_accelerators
- Qwen:
https://huggingface.co/DeepBeepMeep/Qwen_image/tree/main/loras_accelerators
### Setup Instructions ### Setup Instructions
1. Download the Lora 1. Download the Lora
2. Place it in your `loras/` directory if it is a t2v lora or in the `loras_i2v/` directory if it isa i2v lora 2. Place it in your `loras/` directory if it is a t2v lora or in the `loras_i2v/` directory if it isa i2v lora
## FusioniX (or FusionX) Lora ## FusioniX (or FusionX) Lora for Wan 2.1 / Wan 2.2
If you need just one Lora accelerator use this one. It is a combination of multiple Loras acelerators (including Causvid below) and style loras. It will not only accelerate the video generation but it will also improve the quality. There are two versions of this lora whether you use it for t2v or i2v If you need just one Lora accelerator use this one. It is a combination of multiple Loras acelerators (including Causvid below) and style loras. It will not only accelerate the video generation but it will also improve the quality. There are two versions of this lora whether you use it for t2v or i2v
### Usage ### Usage
@ -148,8 +184,8 @@ If you need just one Lora accelerator use this one. It is a combination of multi
5. Set generation steps from 8-10 5. Set generation steps from 8-10
6. Generate! 6. Generate!
## Safe-Forcing lightx2v Lora (Video Generation Accelerator) ## Self-Forcing lightx2v Lora (Video Generation Accelerator) for Wan 2.1 / Wan 2.2
Safeforcing Lora has been created by Kijai from the Safe-Forcing lightx2v distilled Wan model and can generate videos with only 2 steps and offers also a 2x speed improvement since it doesnt require classifier free guidance. It works on both t2v and i2v models Selg forcing Lora has been created by Kijai from the Self-Forcing lightx2v distilled Wan model and can generate videos with only 2 steps and offers also a 2x speed improvement since it doesnt require classifier free guidance. It works on both t2v and i2v models
You will find it under the name of *Wan21_T2V_14B_lightx2v_cfg_step_distill_lora_rank32.safetensors* You will find it under the name of *Wan21_T2V_14B_lightx2v_cfg_step_distill_lora_rank32.safetensors*
### Usage ### Usage
@ -165,7 +201,7 @@ You will find it under the name of *Wan21_T2V_14B_lightx2v_cfg_step_distill_lora
6. Generate! 6. Generate!
## CausVid Lora (Video Generation Accelerator) ## CausVid Lora (Video Generation Accelerator) for Wan 2.1 / Wan 2.2
CausVid is a distilled Wan model that generates videos in 4-12 steps with 2x speed improvement. CausVid is a distilled Wan model that generates videos in 4-12 steps with 2x speed improvement.
### Usage ### Usage
@ -188,11 +224,10 @@ CausVid is a distilled Wan model that generates videos in 4-12 steps with 2x spe
*Note: Lower steps = lower quality (especially motion)* *Note: Lower steps = lower quality (especially motion)*
## AccVid Lora (Video Generation Accelerator) ## AccVid Lora (Video Generation Accelerator) for Wan 2.1 / Wan 2.2
AccVid is a distilled Wan model that generates videos with a 2x speed improvement since classifier free guidance is no longer needed (that is cfg = 1). AccVid is a distilled Wan model that generates videos with a 2x speed improvement since classifier free guidance is no longer needed (that is cfg = 1).
### Usage ### Usage
1. Select a Wan t2v model (e.g., Wan 2.1 text2video 13B or Vace 13B) or Wan i2v model 1. Select a Wan t2v model (e.g., Wan 2.1 text2video 13B or Vace 13B) or Wan i2v model
2. Enable Advanced Mode 2. Enable Advanced Mode
@ -201,6 +236,21 @@ AccVid is a distilled Wan model that generates videos with a 2x speed improvemen
- Set Shift Scale = 5 - Set Shift Scale = 5
4. The number steps remain unchanged compared to what you would use with the original model but it will be two times faster since classifier free guidance is not needed 4. The number steps remain unchanged compared to what you would use with the original model but it will be two times faster since classifier free guidance is not needed
## Lightx2v 4 steps Lora (Video Generation Accelerator) for Wan 2.2
This lora is in fact composed of two loras, one for the High model and one for the Low Wan 2.2 model.
You need to select these two loras and set the following Loras multipliers:
```
1;0 0;1 (the High lora should be only enabled when only the High model is loaded, same for the Low lora)
```
Don't forget to set guidance to 1 !
## Qwen Image Lightning 4 steps / Lightning 8 steps
Very powerful lora that you can use to reduce the number of steps from 30 to only 4 !
Just install the lora in *lora_qwen* folder, select the lora and set Guidance to 1 and the number of steps to 4 or 8
https://huggingface.co/Kijai/WanVideo_comfy/blob/main/Wan21_T2V_14B_lightx2v_cfg_step_distill_lora_rank32.safetensors https://huggingface.co/Kijai/WanVideo_comfy/blob/main/Wan21_T2V_14B_lightx2v_cfg_step_distill_lora_rank32.safetensors
@ -215,6 +265,7 @@ https://huggingface.co/Kijai/WanVideo_comfy/blob/main/Wan21_T2V_14B_lightx2v_cfg
- Loras are loaded on-demand to save VRAM - Loras are loaded on-demand to save VRAM
- Multiple loras can be used simultaneously - Multiple loras can be used simultaneously
- Time-based multipliers don't use extra memory - Time-based multipliers don't use extra memory
- The order of Loras doesn't matter (as long as the loras multipliers are in the right order of course !)
## Finding Loras ## Finding Loras
@ -266,6 +317,7 @@ In the video, a man is presented. The man is in a city and looks at his watch.
## Troubleshooting ## Troubleshooting
### Lora Not Working ### Lora Not Working
0. If it is a lora accelerator, Guidance should be set to 1
1. Check if lora is compatible with your model size (1.3B vs 14B) 1. Check if lora is compatible with your model size (1.3B vs 14B)
2. Verify lora format is supported 2. Verify lora format is supported
3. Try different multiplier values 3. Try different multiplier values
@ -287,12 +339,13 @@ In the video, a man is presented. The man is in a city and looks at his watch.
```bash ```bash
# Lora-related command line options # Lora-related command line options
--lora-dir path # Path to t2v loras directory --lora-dir path # Path to t2v loras directory
--lora-dir-i2v path # Path to i2v loras directory --lora-dir-i2v path # Path to i2v loras directory
--lora-dir-hunyuan path # Path to Hunyuan t2v loras --lora-dir-hunyuan path # Path to Hunyuan t2v loras
--lora-dir-hunyuan-i2v path # Path to Hunyuan i2v loras --lora-dir-hunyuan-i2v path # Path to Hunyuan i2v loras
--lora-dir-ltxv path # Path to LTX Video loras --lora-dir-ltxv path # Path to LTX Video loras
--lora-dir-flux path # Path to Flux loras --lora-dir-flux path # Path to Flux loras
--lora-dir-qwen path # Path to Qwen loras
--lora-preset preset # Load preset on startup --lora-preset preset # Load preset on startup
--check-loras # Filter incompatible loras --check-loras # Filter incompatible loras
``` ```

View File

@ -15,9 +15,7 @@ class family_handler():
"image_outputs" : True, "image_outputs" : True,
"no_negative_prompt" : True, "no_negative_prompt" : True,
} }
if flux_schnell: if not flux_schnell:
model_def_output["no_guidance"] = True
else:
model_def_output["embedded_guidance"] = True model_def_output["embedded_guidance"] = True

View File

@ -42,7 +42,11 @@ class family_handler():
extra_model_def["frames_minimum"] = 5 extra_model_def["frames_minimum"] = 5
extra_model_def["frames_steps"] = 4 extra_model_def["frames_steps"] = 4
extra_model_def["sliding_window"] = False extra_model_def["sliding_window"] = False
extra_model_def["embedded_guidance"] = base_model_type in ["hunyuan", "hunyuan_i2v"] if base_model_type in ["hunyuan", "hunyuan_i2v"]:
extra_model_def["embedded_guidance"] = True
else:
extra_model_def["guidance_max_phases"] = 1
extra_model_def["cfg_star"] = base_model_type in [ "hunyuan_avatar", "hunyuan_custom_audio", "hunyuan_custom_edit", "hunyuan_custom"] extra_model_def["cfg_star"] = base_model_type in [ "hunyuan_avatar", "hunyuan_custom_audio", "hunyuan_custom_edit", "hunyuan_custom"]
extra_model_def["tea_cache"] = True extra_model_def["tea_cache"] = True
extra_model_def["mag_cache"] = True extra_model_def["mag_cache"] = True

View File

@ -12,9 +12,7 @@ class family_handler():
def query_model_def(base_model_type, model_def): def query_model_def(base_model_type, model_def):
LTXV_config = model_def.get("LTXV_config", "") LTXV_config = model_def.get("LTXV_config", "")
distilled= "distilled" in LTXV_config distilled= "distilled" in LTXV_config
extra_model_def = { extra_model_def = {}
"no_guidance": True,
}
if distilled: if distilled:
extra_model_def.update({ extra_model_def.update({
"lock_inference_steps": True, "lock_inference_steps": True,

View File

@ -27,9 +27,30 @@ from diffusers.utils.torch_utils import randn_tensor
from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2Tokenizer, AutoTokenizer from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2Tokenizer, AutoTokenizer
from .autoencoder_kl_qwenimage import AutoencoderKLQwenImage from .autoencoder_kl_qwenimage import AutoencoderKLQwenImage
from diffusers import FlowMatchEulerDiscreteScheduler from diffusers import FlowMatchEulerDiscreteScheduler
from PIL import Image
XLA_AVAILABLE = False XLA_AVAILABLE = False
PREFERRED_QWENIMAGE_RESOLUTIONS = [
(672, 1568),
(688, 1504),
(720, 1456),
(752, 1392),
(800, 1328),
(832, 1248),
(880, 1184),
(944, 1104),
(1024, 1024),
(1104, 944),
(1184, 880),
(1248, 832),
(1328, 800),
(1392, 752),
(1456, 720),
(1504, 688),
(1568, 672),
]
logger = logging.get_logger(__name__) # pylint: disable=invalid-name logger = logging.get_logger(__name__) # pylint: disable=invalid-name
@ -122,6 +143,18 @@ def retrieve_timesteps(
timesteps = scheduler.timesteps timesteps = scheduler.timesteps
return timesteps, num_inference_steps return timesteps, num_inference_steps
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img.retrieve_latents
def retrieve_latents(
encoder_output: torch.Tensor, generator: Optional[torch.Generator] = None, sample_mode: str = "sample"
):
if hasattr(encoder_output, "latent_dist") and sample_mode == "sample":
return encoder_output.latent_dist.sample(generator)
elif hasattr(encoder_output, "latent_dist") and sample_mode == "argmax":
return encoder_output.latent_dist.mode()
elif hasattr(encoder_output, "latents"):
return encoder_output.latents
else:
raise AttributeError("Could not access latents of provided encoder_output")
class QwenImagePipeline(): #DiffusionPipeline class QwenImagePipeline(): #DiffusionPipeline
r""" r"""
@ -151,20 +184,27 @@ class QwenImagePipeline(): #DiffusionPipeline
text_encoder, text_encoder,
tokenizer, tokenizer,
transformer, transformer,
processor,
): ):
self.vae=vae self.vae=vae
self.text_encoder=text_encoder self.text_encoder=text_encoder
self.tokenizer=tokenizer self.tokenizer=tokenizer
self.transformer=transformer self.transformer=transformer
self.processor = processor
self.latent_channels = self.vae.config.z_dim if getattr(self, "vae", None) else 16
self.vae_scale_factor = 2 ** len(self.vae.temperal_downsample) if getattr(self, "vae", None) else 8 self.vae_scale_factor = 2 ** len(self.vae.temperal_downsample) if getattr(self, "vae", None) else 8
# QwenImage latents are turned into 2x2 patches and packed. This means the latent width and height has to be divisible # QwenImage latents are turned into 2x2 patches and packed. This means the latent width and height has to be divisible
# by the patch size. So the vae scale factor is multiplied by the patch size to account for this # by the patch size. So the vae scale factor is multiplied by the patch size to account for this
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor * 2) self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor * 2)
self.tokenizer_max_length = 1024 self.tokenizer_max_length = 1024
self.prompt_template_encode = "<|im_start|>system\nDescribe the image by detailing the color, shape, size, texture, quantity, text, spatial relationships of the objects and background:<|im_end|>\n<|im_start|>user\n{}<|im_end|>\n<|im_start|>assistant\n" if processor is not None:
self.prompt_template_encode_start_idx = 34 self.prompt_template_encode = "<|im_start|>system\nDescribe the key features of the input image (color, shape, size, texture, objects, background), then explain how the user's text instruction should alter or modify the image. Generate a new image that meets the user's requirements while maintaining consistency with the original input where appropriate.<|im_end|>\n<|im_start|>user\n<|vision_start|><|image_pad|><|vision_end|>{}<|im_end|>\n<|im_start|>assistant\n"
self.prompt_template_encode_start_idx = 64
else:
self.prompt_template_encode = "<|im_start|>system\nDescribe the image by detailing the color, shape, size, texture, quantity, text, spatial relationships of the objects and background:<|im_end|>\n<|im_start|>user\n{}<|im_end|>\n<|im_start|>assistant\n"
self.prompt_template_encode_start_idx = 34
self.default_sample_size = 128 self.default_sample_size = 128
def _extract_masked_hidden(self, hidden_states: torch.Tensor, mask: torch.Tensor): def _extract_masked_hidden(self, hidden_states: torch.Tensor, mask: torch.Tensor):
@ -178,6 +218,7 @@ class QwenImagePipeline(): #DiffusionPipeline
def _get_qwen_prompt_embeds( def _get_qwen_prompt_embeds(
self, self,
prompt: Union[str, List[str]] = None, prompt: Union[str, List[str]] = None,
image: Optional[torch.Tensor] = None,
device: Optional[torch.device] = None, device: Optional[torch.device] = None,
dtype: Optional[torch.dtype] = None, dtype: Optional[torch.dtype] = None,
): ):
@ -189,16 +230,36 @@ class QwenImagePipeline(): #DiffusionPipeline
template = self.prompt_template_encode template = self.prompt_template_encode
drop_idx = self.prompt_template_encode_start_idx drop_idx = self.prompt_template_encode_start_idx
txt = [template.format(e) for e in prompt] txt = [template.format(e) for e in prompt]
txt_tokens = self.tokenizer(
txt, max_length=self.tokenizer_max_length + drop_idx, padding=True, truncation=True, return_tensors="pt" if self.processor is not None and image is not None:
).to(device) model_inputs = self.processor(
encoder_hidden_states = self.text_encoder( text=txt,
input_ids=txt_tokens.input_ids, images=image,
attention_mask=txt_tokens.attention_mask, padding=True,
output_hidden_states=True, return_tensors="pt",
) ).to(device)
hidden_states = encoder_hidden_states.hidden_states[-1]
split_hidden_states = self._extract_masked_hidden(hidden_states, txt_tokens.attention_mask) outputs = self.text_encoder(
input_ids=model_inputs.input_ids,
attention_mask=model_inputs.attention_mask,
pixel_values=model_inputs.pixel_values,
image_grid_thw=model_inputs.image_grid_thw,
output_hidden_states=True,
)
hidden_states = outputs.hidden_states[-1]
split_hidden_states = self._extract_masked_hidden(hidden_states, model_inputs.attention_mask)
else:
txt_tokens = self.tokenizer(
txt, max_length=self.tokenizer_max_length + drop_idx, padding=True, truncation=True, return_tensors="pt"
).to(device)
encoder_hidden_states = self.text_encoder(
input_ids=txt_tokens.input_ids,
attention_mask=txt_tokens.attention_mask,
output_hidden_states=True,
)
hidden_states = encoder_hidden_states.hidden_states[-1]
split_hidden_states = self._extract_masked_hidden(hidden_states, txt_tokens.attention_mask)
split_hidden_states = [e[drop_idx:] for e in split_hidden_states] split_hidden_states = [e[drop_idx:] for e in split_hidden_states]
attn_mask_list = [torch.ones(e.size(0), dtype=torch.long, device=e.device) for e in split_hidden_states] attn_mask_list = [torch.ones(e.size(0), dtype=torch.long, device=e.device) for e in split_hidden_states]
max_seq_len = max([e.size(0) for e in split_hidden_states]) max_seq_len = max([e.size(0) for e in split_hidden_states])
@ -216,6 +277,7 @@ class QwenImagePipeline(): #DiffusionPipeline
def encode_prompt( def encode_prompt(
self, self,
prompt: Union[str, List[str]], prompt: Union[str, List[str]],
image: Optional[torch.Tensor] = None,
device: Optional[torch.device] = None, device: Optional[torch.device] = None,
num_images_per_prompt: int = 1, num_images_per_prompt: int = 1,
prompt_embeds: Optional[torch.Tensor] = None, prompt_embeds: Optional[torch.Tensor] = None,
@ -227,6 +289,8 @@ class QwenImagePipeline(): #DiffusionPipeline
Args: Args:
prompt (`str` or `List[str]`, *optional*): prompt (`str` or `List[str]`, *optional*):
prompt to be encoded prompt to be encoded
image (`torch.Tensor`, *optional*):
image to be encoded
device: (`torch.device`): device: (`torch.device`):
torch device torch device
num_images_per_prompt (`int`): num_images_per_prompt (`int`):
@ -241,7 +305,7 @@ class QwenImagePipeline(): #DiffusionPipeline
batch_size = len(prompt) if prompt_embeds is None else prompt_embeds.shape[0] batch_size = len(prompt) if prompt_embeds is None else prompt_embeds.shape[0]
if prompt_embeds is None: if prompt_embeds is None:
prompt_embeds, prompt_embeds_mask = self._get_qwen_prompt_embeds(prompt, device) prompt_embeds, prompt_embeds_mask = self._get_qwen_prompt_embeds(prompt, image, device)
_, seq_len, _ = prompt_embeds.shape _, seq_len, _ = prompt_embeds.shape
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1) prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
@ -251,6 +315,7 @@ class QwenImagePipeline(): #DiffusionPipeline
return prompt_embeds, prompt_embeds_mask return prompt_embeds, prompt_embeds_mask
def check_inputs( def check_inputs(
self, self,
prompt, prompt,
@ -344,6 +409,29 @@ class QwenImagePipeline(): #DiffusionPipeline
return latents return latents
def _encode_vae_image(self, image: torch.Tensor, generator: torch.Generator):
if isinstance(generator, list):
image_latents = [
retrieve_latents(self.vae.encode(image[i : i + 1]), generator=generator[i], sample_mode="argmax")
for i in range(image.shape[0])
]
image_latents = torch.cat(image_latents, dim=0)
else:
image_latents = retrieve_latents(self.vae.encode(image), generator=generator, sample_mode="argmax")
latents_mean = (
torch.tensor(self.vae.config.latents_mean)
.view(1, self.latent_channels, 1, 1, 1)
.to(image_latents.device, image_latents.dtype)
)
latents_std = (
torch.tensor(self.vae.config.latents_std)
.view(1, self.latent_channels, 1, 1, 1)
.to(image_latents.device, image_latents.dtype)
)
image_latents = (image_latents - latents_mean) / latents_std
return image_latents
def enable_vae_slicing(self): def enable_vae_slicing(self):
r""" r"""
Enable sliced VAE decoding. When this option is enabled, the VAE will split the input tensor in slices to Enable sliced VAE decoding. When this option is enabled, the VAE will split the input tensor in slices to
@ -375,6 +463,7 @@ class QwenImagePipeline(): #DiffusionPipeline
def prepare_latents( def prepare_latents(
self, self,
image,
batch_size, batch_size,
num_channels_latents, num_channels_latents,
height, height,
@ -391,22 +480,41 @@ class QwenImagePipeline(): #DiffusionPipeline
shape = (batch_size, 1, num_channels_latents, height, width) shape = (batch_size, 1, num_channels_latents, height, width)
if latents is not None: image_latents = None
latent_image_ids = self._prepare_latent_image_ids(batch_size, height // 2, width // 2, device, dtype) if image is not None:
return latents.to(device=device, dtype=dtype), latent_image_ids image = image.to(device=device, dtype=dtype)
if image.shape[1] != self.latent_channels:
image_latents = self._encode_vae_image(image=image, generator=generator)
else:
image_latents = image
if batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] == 0:
# expand init_latents for batch_size
additional_image_per_prompt = batch_size // image_latents.shape[0]
image_latents = torch.cat([image_latents] * additional_image_per_prompt, dim=0)
elif batch_size > image_latents.shape[0] and batch_size % image_latents.shape[0] != 0:
raise ValueError(
f"Cannot duplicate `image` of batch size {image_latents.shape[0]} to {batch_size} text prompts."
)
else:
image_latents = torch.cat([image_latents], dim=0)
image_latent_height, image_latent_width = image_latents.shape[3:]
image_latents = self._pack_latents(
image_latents, batch_size, num_channels_latents, image_latent_height, image_latent_width
)
if isinstance(generator, list) and len(generator) != batch_size: if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError( raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch" f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators." f" size of {batch_size}. Make sure the batch size matches the length of the generators."
) )
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
latents = self._pack_latents(latents, batch_size, num_channels_latents, height, width)
else:
latents = latents.to(device=device, dtype=dtype)
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype) return latents, image_latents
latents = self._pack_latents(latents, batch_size, num_channels_latents, height, width)
latent_image_ids = self._prepare_latent_image_ids(batch_size, height // 2, width // 2, device, dtype)
return latents, latent_image_ids
@property @property
def guidance_scale(self): def guidance_scale(self):
@ -453,6 +561,7 @@ class QwenImagePipeline(): #DiffusionPipeline
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None, callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
callback_on_step_end_tensor_inputs: List[str] = ["latents"], callback_on_step_end_tensor_inputs: List[str] = ["latents"],
max_sequence_length: int = 512, max_sequence_length: int = 512,
image = None,
callback=None, callback=None,
pipeline=None, pipeline=None,
loras_slists=None, loras_slists=None,
@ -540,6 +649,10 @@ class QwenImagePipeline(): #DiffusionPipeline
height = height or self.default_sample_size * self.vae_scale_factor height = height or self.default_sample_size * self.vae_scale_factor
width = width or self.default_sample_size * self.vae_scale_factor width = width or self.default_sample_size * self.vae_scale_factor
multiple_of = self.vae_scale_factor * 2
width = width // multiple_of * multiple_of
height = height // multiple_of * multiple_of
# 1. Check inputs. Raise error if not correct # 1. Check inputs. Raise error if not correct
self.check_inputs( self.check_inputs(
prompt, prompt,
@ -567,13 +680,30 @@ class QwenImagePipeline(): #DiffusionPipeline
else: else:
batch_size = prompt_embeds.shape[0] batch_size = prompt_embeds.shape[0]
device = "cuda" device = "cuda"
# device = self._execution_device
prompt_image = None
if image is not None and not (isinstance(image, torch.Tensor) and image.size(1) == self.latent_channels):
image = image[0] if isinstance(image, list) else image
image_height, image_width = self.image_processor.get_default_height_width(image)
aspect_ratio = image_width / image_height
if True :
_, image_width, image_height = min(
(abs(aspect_ratio - w / h), w, h) for w, h in PREFERRED_QWENIMAGE_RESOLUTIONS
)
image_width = image_width // multiple_of * multiple_of
image_height = image_height // multiple_of * multiple_of
# image = self.image_processor.resize(image, image_height, image_width)
image = image.resize((image_width,image_height), resample=Image.Resampling.LANCZOS)
prompt_image = image
image = self.image_processor.preprocess(image, image_height, image_width)
image = image.unsqueeze(2)
has_neg_prompt = negative_prompt is not None or ( has_neg_prompt = negative_prompt is not None or (
negative_prompt_embeds is not None and negative_prompt_embeds_mask is not None negative_prompt_embeds is not None and negative_prompt_embeds_mask is not None
) )
do_true_cfg = true_cfg_scale > 1 and has_neg_prompt do_true_cfg = true_cfg_scale > 1 and has_neg_prompt
prompt_embeds, prompt_embeds_mask = self.encode_prompt( prompt_embeds, prompt_embeds_mask = self.encode_prompt(
image=prompt_image,
prompt=prompt, prompt=prompt,
prompt_embeds=prompt_embeds, prompt_embeds=prompt_embeds,
prompt_embeds_mask=prompt_embeds_mask, prompt_embeds_mask=prompt_embeds_mask,
@ -583,6 +713,7 @@ class QwenImagePipeline(): #DiffusionPipeline
) )
if do_true_cfg: if do_true_cfg:
negative_prompt_embeds, negative_prompt_embeds_mask = self.encode_prompt( negative_prompt_embeds, negative_prompt_embeds_mask = self.encode_prompt(
image=prompt_image,
prompt=negative_prompt, prompt=negative_prompt,
prompt_embeds=negative_prompt_embeds, prompt_embeds=negative_prompt_embeds,
prompt_embeds_mask=negative_prompt_embeds_mask, prompt_embeds_mask=negative_prompt_embeds_mask,
@ -597,7 +728,8 @@ class QwenImagePipeline(): #DiffusionPipeline
# 4. Prepare latent variables # 4. Prepare latent variables
num_channels_latents = self.transformer.in_channels // 4 num_channels_latents = self.transformer.in_channels // 4
latents, latent_image_ids = self.prepare_latents( latents, image_latents = self.prepare_latents(
image,
batch_size * num_images_per_prompt, batch_size * num_images_per_prompt,
num_channels_latents, num_channels_latents,
height, height,
@ -607,7 +739,15 @@ class QwenImagePipeline(): #DiffusionPipeline
generator, generator,
latents, latents,
) )
img_shapes = [(1, height // self.vae_scale_factor // 2, width // self.vae_scale_factor // 2)] * batch_size if image is not None:
img_shapes = [
[
(1, height // self.vae_scale_factor // 2, width // self.vae_scale_factor // 2),
(1, image_height // self.vae_scale_factor // 2, image_width // self.vae_scale_factor // 2),
]
] * batch_size
else:
img_shapes = [(1, height // self.vae_scale_factor // 2, width // self.vae_scale_factor // 2)] * batch_size
# 5. Prepare timesteps # 5. Prepare timesteps
sigmas = np.linspace(1.0, 1 / num_inference_steps, num_inference_steps) if sigmas is None else sigmas sigmas = np.linspace(1.0, 1 / num_inference_steps, num_inference_steps) if sigmas is None else sigmas
@ -639,6 +779,11 @@ class QwenImagePipeline(): #DiffusionPipeline
if self.attention_kwargs is None: if self.attention_kwargs is None:
self._attention_kwargs = {} self._attention_kwargs = {}
txt_seq_lens = prompt_embeds_mask.sum(dim=1).tolist() if prompt_embeds_mask is not None else None
negative_txt_seq_lens = (
negative_prompt_embeds_mask.sum(dim=1).tolist() if negative_prompt_embeds_mask is not None else None
)
# 6. Denoising loop # 6. Denoising loop
self.scheduler.set_begin_index(0) self.scheduler.set_begin_index(0)
updated_num_steps= len(timesteps) updated_num_steps= len(timesteps)
@ -655,46 +800,54 @@ class QwenImagePipeline(): #DiffusionPipeline
# broadcast to batch dimension in a way that's compatible with ONNX/Core ML # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
timestep = t.expand(latents.shape[0]).to(latents.dtype) timestep = t.expand(latents.shape[0]).to(latents.dtype)
latent_model_input = latents
if image_latents is not None:
latent_model_input = torch.cat([latents, image_latents], dim=1)
if do_true_cfg and joint_pass: if do_true_cfg and joint_pass:
noise_pred, neg_noise_pred = self.transformer( noise_pred, neg_noise_pred = self.transformer(
hidden_states=latents, hidden_states=latent_model_input,
timestep=timestep / 1000, timestep=timestep / 1000,
guidance=guidance, guidance=guidance,
encoder_hidden_states_mask_list=[prompt_embeds_mask,negative_prompt_embeds_mask], encoder_hidden_states_mask_list=[prompt_embeds_mask,negative_prompt_embeds_mask],
encoder_hidden_states_list=[prompt_embeds, negative_prompt_embeds], encoder_hidden_states_list=[prompt_embeds, negative_prompt_embeds],
img_shapes=img_shapes, img_shapes=img_shapes,
txt_seq_lens_list=[prompt_embeds_mask.sum(dim=1).tolist(),negative_prompt_embeds_mask.sum(dim=1).tolist()], txt_seq_lens_list=[txt_seq_lens, negative_txt_seq_lens],
attention_kwargs=self.attention_kwargs, attention_kwargs=self.attention_kwargs,
**kwargs **kwargs
) )
if noise_pred == None: return None if noise_pred == None: return None
noise_pred = noise_pred[:, : latents.size(1)]
neg_noise_pred = neg_noise_pred[:, : latents.size(1)]
else: else:
noise_pred = self.transformer( noise_pred = self.transformer(
hidden_states=latents, hidden_states=latent_model_input,
timestep=timestep / 1000, timestep=timestep / 1000,
guidance=guidance, guidance=guidance,
encoder_hidden_states_mask_list=[prompt_embeds_mask], encoder_hidden_states_mask_list=[prompt_embeds_mask],
encoder_hidden_states_list=[prompt_embeds], encoder_hidden_states_list=[prompt_embeds],
img_shapes=img_shapes, img_shapes=img_shapes,
txt_seq_lens_list=[prompt_embeds_mask.sum(dim=1).tolist()], txt_seq_lens_list=[txt_seq_lens],
attention_kwargs=self.attention_kwargs, attention_kwargs=self.attention_kwargs,
**kwargs **kwargs
)[0] )[0]
if noise_pred == None: return None if noise_pred == None: return None
noise_pred = noise_pred[:, : latents.size(1)]
if do_true_cfg: if do_true_cfg:
neg_noise_pred = self.transformer( neg_noise_pred = self.transformer(
hidden_states=latents, hidden_states=latent_model_input,
timestep=timestep / 1000, timestep=timestep / 1000,
guidance=guidance, guidance=guidance,
encoder_hidden_states_mask_list=[negative_prompt_embeds_mask], encoder_hidden_states_mask_list=[negative_prompt_embeds_mask],
encoder_hidden_states_list=[negative_prompt_embeds], encoder_hidden_states_list=[negative_prompt_embeds],
img_shapes=img_shapes, img_shapes=img_shapes,
txt_seq_lens_list=[negative_prompt_embeds_mask.sum(dim=1).tolist()], txt_seq_lens_list=[negative_txt_seq_lens],
attention_kwargs=self.attention_kwargs, attention_kwargs=self.attention_kwargs,
**kwargs **kwargs
)[0] )[0]
if neg_noise_pred == None: return None if neg_noise_pred == None: return None
neg_noise_pred = neg_noise_pred[:, : latents.size(1)]
if do_true_cfg: if do_true_cfg:
comb_pred = neg_noise_pred + true_cfg_scale * (noise_pred - neg_noise_pred) comb_pred = neg_noise_pred + true_cfg_scale * (noise_pred - neg_noise_pred)

View File

@ -13,7 +13,8 @@ class family_handler():
"image_outputs" : True, "image_outputs" : True,
"sample_solvers":[ "sample_solvers":[
("Default", "default"), ("Default", "default"),
("Lightning", "lightning")] ("Lightning", "lightning")],
"guidance_max_phases" : 1,
} }
@ -21,7 +22,7 @@ class family_handler():
@staticmethod @staticmethod
def query_supported_types(): def query_supported_types():
return ["qwen_image_20B"] return ["qwen_image_20B", "qwen_image_edit_20B"]
@staticmethod @staticmethod
def query_family_maps(): def query_family_maps():
@ -41,7 +42,7 @@ class family_handler():
return { return {
"repoId" : "DeepBeepMeep/Qwen_image", "repoId" : "DeepBeepMeep/Qwen_image",
"sourceFolderList" : ["", "Qwen2.5-VL-7B-Instruct"], "sourceFolderList" : ["", "Qwen2.5-VL-7B-Instruct"],
"fileList" : [ ["qwen_vae.safetensors", "qwen_vae_config.json"], ["merges.txt", "tokenizer_config.json", "config.json", "vocab.json"] + computeList(text_encoder_filename) ] "fileList" : [ ["qwen_vae.safetensors", "qwen_vae_config.json"], ["merges.txt", "tokenizer_config.json", "config.json", "vocab.json", "video_preprocessor_config.json", "preprocessor_config.json"] + computeList(text_encoder_filename) ]
} }
@staticmethod @staticmethod

View File

@ -12,10 +12,24 @@ from .transformer_qwenimage import QwenImageTransformer2DModel
from diffusers.utils import logging, replace_example_docstring from diffusers.utils import logging, replace_example_docstring
from diffusers.utils.torch_utils import randn_tensor from diffusers.utils.torch_utils import randn_tensor
from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2Tokenizer, AutoTokenizer from transformers import Qwen2_5_VLForConditionalGeneration, Qwen2Tokenizer, AutoTokenizer, Qwen2VLProcessor
from .autoencoder_kl_qwenimage import AutoencoderKLQwenImage from .autoencoder_kl_qwenimage import AutoencoderKLQwenImage
from diffusers import FlowMatchEulerDiscreteScheduler from diffusers import FlowMatchEulerDiscreteScheduler
from .pipeline_qwenimage import QwenImagePipeline from .pipeline_qwenimage import QwenImagePipeline
from PIL import Image
from shared.utils.utils import calculate_new_dimensions
def stitch_images(img1, img2):
# Resize img2 to match img1's height
width1, height1 = img1.size
width2, height2 = img2.size
new_width2 = int(width2 * height1 / height2)
img2_resized = img2.resize((new_width2, height1), Image.Resampling.LANCZOS)
stitched = Image.new('RGB', (width1 + new_width2, height1))
stitched.paste(img1, (0, 0))
stitched.paste(img2_resized, (width1, 0))
return stitched
class model_factory(): class model_factory():
def __init__( def __init__(
@ -35,9 +49,16 @@ class model_factory():
transformer_filename = model_filename[0] transformer_filename = model_filename[0]
tokenizer = AutoTokenizer.from_pretrained(os.path.join(checkpoint_dir,"Qwen2.5-VL-7B-Instruct")) processor = None
tokenizer = None
if base_model_type == "qwen_image_edit_20B":
processor = Qwen2VLProcessor.from_pretrained(os.path.join(checkpoint_dir,"Qwen2.5-VL-7B-Instruct"))
else:
tokenizer = AutoTokenizer.from_pretrained(os.path.join(checkpoint_dir,"Qwen2.5-VL-7B-Instruct"))
with open("configs/qwen_image_20B.json", 'r', encoding='utf-8') as f:
base_config_file = "configs/qwen_image_20B.json"
with open(base_config_file, 'r', encoding='utf-8') as f:
transformer_config = json.load(f) transformer_config = json.load(f)
transformer_config.pop("_diffusers_version") transformer_config.pop("_diffusers_version")
transformer_config.pop("_class_name") transformer_config.pop("_class_name")
@ -46,9 +67,22 @@ class model_factory():
from accelerate import init_empty_weights from accelerate import init_empty_weights
with init_empty_weights(): with init_empty_weights():
transformer = QwenImageTransformer2DModel(**transformer_config) transformer = QwenImageTransformer2DModel(**transformer_config)
offload.load_model_data(transformer, transformer_filename) source = model_def.get("source", None)
if source is not None:
offload.load_model_data(transformer, source)
else:
offload.load_model_data(transformer, transformer_filename)
# transformer = offload.fast_load_transformers_model("transformer_quanto.safetensors", writable_tensors= True , modelClass=QwenImageTransformer2DModel, defaultConfigPath="transformer_config.json") # transformer = offload.fast_load_transformers_model("transformer_quanto.safetensors", writable_tensors= True , modelClass=QwenImageTransformer2DModel, defaultConfigPath="transformer_config.json")
if not source is None:
from wgp import save_model
save_model(transformer, model_type, dtype, None)
if save_quantized:
from wgp import save_quantized_model
save_quantized_model(transformer, model_type, model_filename[0], dtype, base_config_file)
text_encoder = offload.fast_load_transformers_model(text_encoder_filename, writable_tensors= True , modelClass=Qwen2_5_VLForConditionalGeneration, defaultConfigPath= os.path.join(checkpoint_dir, "Qwen2.5-VL-7B-Instruct", "config.json")) text_encoder = offload.fast_load_transformers_model(text_encoder_filename, writable_tensors= True , modelClass=Qwen2_5_VLForConditionalGeneration, defaultConfigPath= os.path.join(checkpoint_dir, "Qwen2.5-VL-7B-Instruct", "config.json"))
# text_encoder = offload.fast_load_transformers_model(text_encoder_filename, do_quantize=True, writable_tensors= True , modelClass=Qwen2_5_VLForConditionalGeneration, defaultConfigPath="text_encoder_config.json", verboseLevel=2) # text_encoder = offload.fast_load_transformers_model(text_encoder_filename, do_quantize=True, writable_tensors= True , modelClass=Qwen2_5_VLForConditionalGeneration, defaultConfigPath="text_encoder_config.json", verboseLevel=2)
# text_encoder.to(torch.float16) # text_encoder.to(torch.float16)
@ -56,11 +90,12 @@ class model_factory():
vae = offload.fast_load_transformers_model( os.path.join(checkpoint_dir,"qwen_vae.safetensors"), writable_tensors= True , modelClass=AutoencoderKLQwenImage, defaultConfigPath=os.path.join(checkpoint_dir,"qwen_vae_config.json")) vae = offload.fast_load_transformers_model( os.path.join(checkpoint_dir,"qwen_vae.safetensors"), writable_tensors= True , modelClass=AutoencoderKLQwenImage, defaultConfigPath=os.path.join(checkpoint_dir,"qwen_vae_config.json"))
self.pipeline = QwenImagePipeline(vae, text_encoder, tokenizer, transformer) self.pipeline = QwenImagePipeline(vae, text_encoder, tokenizer, transformer, processor)
self.vae=vae self.vae=vae
self.text_encoder=text_encoder self.text_encoder=text_encoder
self.tokenizer=tokenizer self.tokenizer=tokenizer
self.transformer=transformer self.transformer=transformer
self.processor = processor
def generate( def generate(
self, self,
@ -141,19 +176,31 @@ class model_factory():
if n_prompt is None or len(n_prompt) == 0: if n_prompt is None or len(n_prompt) == 0:
n_prompt= "text, watermark, copyright, blurry, low resolution" n_prompt= "text, watermark, copyright, blurry, low resolution"
if input_ref_images is not None:
# image stiching method
stiched = input_ref_images[0]
if "K" in video_prompt_type :
w, h = input_ref_images[0].size
height, width = calculate_new_dimensions(height, width, h, w, fit_into_canvas)
for new_img in input_ref_images[1:]:
stiched = stitch_images(stiched, new_img)
input_ref_images = [stiched]
image = self.pipeline( image = self.pipeline(
prompt=input_prompt, prompt=input_prompt,
negative_prompt=n_prompt, negative_prompt=n_prompt,
width=width, image = input_ref_images,
height=height, width=width,
num_inference_steps=sampling_steps, height=height,
num_images_per_prompt = batch_size, num_inference_steps=sampling_steps,
true_cfg_scale=guide_scale, num_images_per_prompt = batch_size,
callback = callback, true_cfg_scale=guide_scale,
pipeline=self, callback = callback,
loras_slists=loras_slists, pipeline=self,
joint_pass = joint_pass, loras_slists=loras_slists,
generator=torch.Generator(device="cuda").manual_seed(seed) joint_pass = joint_pass,
generator=torch.Generator(device="cuda").manual_seed(seed)
) )
if image is None: return None if image is None: return None
return image.transpose(0, 1) return image.transpose(0, 1)

View File

@ -26,6 +26,7 @@ from diffusers.models.embeddings import TimestepEmbedding, Timesteps
from diffusers.models.modeling_outputs import Transformer2DModelOutput from diffusers.models.modeling_outputs import Transformer2DModelOutput
from diffusers.models.normalization import AdaLayerNormContinuous, RMSNorm from diffusers.models.normalization import AdaLayerNormContinuous, RMSNorm
from shared.attention import pay_attention from shared.attention import pay_attention
import functools
def get_timestep_embedding( def get_timestep_embedding(
timesteps: torch.Tensor, timesteps: torch.Tensor,
@ -150,8 +151,8 @@ class QwenEmbedRope(nn.Module):
super().__init__() super().__init__()
self.theta = theta self.theta = theta
self.axes_dim = axes_dim self.axes_dim = axes_dim
pos_index = torch.arange(1024) pos_index = torch.arange(4096)
neg_index = torch.arange(1024).flip(0) * -1 - 1 neg_index = torch.arange(4096).flip(0) * -1 - 1
self.pos_freqs = torch.cat( self.pos_freqs = torch.cat(
[ [
self.rope_params(pos_index, self.axes_dim[0], self.theta), self.rope_params(pos_index, self.axes_dim[0], self.theta),
@ -170,7 +171,7 @@ class QwenEmbedRope(nn.Module):
) )
self.rope_cache = {} self.rope_cache = {}
# 是否使用 scale rope # DO NOT USING REGISTER BUFFER HERE, IT WILL CAUSE COMPLEX NUMBERS LOSE ITS IMAGINARY PART
self.scale_rope = scale_rope self.scale_rope = scale_rope
def rope_params(self, index, dim, theta=10000): def rope_params(self, index, dim, theta=10000):
@ -194,38 +195,54 @@ class QwenEmbedRope(nn.Module):
if isinstance(video_fhw, list): if isinstance(video_fhw, list):
video_fhw = video_fhw[0] video_fhw = video_fhw[0]
frame, height, width = video_fhw if not isinstance(video_fhw, list):
rope_key = f"{frame}_{height}_{width}" video_fhw = [video_fhw]
if rope_key not in self.rope_cache: vid_freqs = []
seq_lens = frame * height * width max_vid_index = 0
freqs_pos = self.pos_freqs.split([x // 2 for x in self.axes_dim], dim=1) for idx, fhw in enumerate(video_fhw):
freqs_neg = self.neg_freqs.split([x // 2 for x in self.axes_dim], dim=1) frame, height, width = fhw
freqs_frame = freqs_pos[0][:frame].view(frame, 1, 1, -1).expand(frame, height, width, -1) rope_key = f"{idx}_{height}_{width}"
if self.scale_rope:
freqs_height = torch.cat([freqs_neg[1][-(height - height // 2) :], freqs_pos[1][: height // 2]], dim=0)
freqs_height = freqs_height.view(1, height, 1, -1).expand(frame, height, width, -1)
freqs_width = torch.cat([freqs_neg[2][-(width - width // 2) :], freqs_pos[2][: width // 2]], dim=0)
freqs_width = freqs_width.view(1, 1, width, -1).expand(frame, height, width, -1)
if not torch.compiler.is_compiling():
if rope_key not in self.rope_cache:
self.rope_cache[rope_key] = self._compute_video_freqs(frame, height, width, idx)
video_freq = self.rope_cache[rope_key]
else: else:
freqs_height = freqs_pos[1][:height].view(1, height, 1, -1).expand(frame, height, width, -1) video_freq = self._compute_video_freqs(frame, height, width, idx)
freqs_width = freqs_pos[2][:width].view(1, 1, width, -1).expand(frame, height, width, -1) video_freq = video_freq.to(device)
vid_freqs.append(video_freq)
freqs = torch.cat([freqs_frame, freqs_height, freqs_width], dim=-1).reshape(seq_lens, -1) if self.scale_rope:
self.rope_cache[rope_key] = freqs.clone().contiguous() max_vid_index = max(height // 2, width // 2, max_vid_index)
vid_freqs = self.rope_cache[rope_key] else:
max_vid_index = max(height, width, max_vid_index)
if self.scale_rope:
max_vid_index = max(height // 2, width // 2)
else:
max_vid_index = max(height, width)
max_len = max(txt_seq_lens) max_len = max(txt_seq_lens)
txt_freqs = self.pos_freqs[max_vid_index : max_vid_index + max_len, ...] txt_freqs = self.pos_freqs[max_vid_index : max_vid_index + max_len, ...]
vid_freqs = torch.cat(vid_freqs, dim=0)
return vid_freqs, txt_freqs return vid_freqs, txt_freqs
@functools.lru_cache(maxsize=None)
def _compute_video_freqs(self, frame, height, width, idx=0):
seq_lens = frame * height * width
freqs_pos = self.pos_freqs.split([x // 2 for x in self.axes_dim], dim=1)
freqs_neg = self.neg_freqs.split([x // 2 for x in self.axes_dim], dim=1)
freqs_frame = freqs_pos[0][idx : idx + frame].view(frame, 1, 1, -1).expand(frame, height, width, -1)
if self.scale_rope:
freqs_height = torch.cat([freqs_neg[1][-(height - height // 2) :], freqs_pos[1][: height // 2]], dim=0)
freqs_height = freqs_height.view(1, height, 1, -1).expand(frame, height, width, -1)
freqs_width = torch.cat([freqs_neg[2][-(width - width // 2) :], freqs_pos[2][: width // 2]], dim=0)
freqs_width = freqs_width.view(1, 1, width, -1).expand(frame, height, width, -1)
else:
freqs_height = freqs_pos[1][:height].view(1, height, 1, -1).expand(frame, height, width, -1)
freqs_width = freqs_pos[2][:width].view(1, 1, width, -1).expand(frame, height, width, -1)
freqs = torch.cat([freqs_frame, freqs_height, freqs_width], dim=-1).reshape(seq_lens, -1)
return freqs.clone().contiguous()
class QwenDoubleStreamAttnProcessor2_0: class QwenDoubleStreamAttnProcessor2_0:
""" """

View File

@ -393,7 +393,11 @@ class WanAny2V:
sampling_steps=50, sampling_steps=50,
guide_scale=5.0, guide_scale=5.0,
guide2_scale = 5.0, guide2_scale = 5.0,
guide3_scale = 5.0,
switch_threshold = 0, switch_threshold = 0,
switch2_threshold = 0,
guide_phases= 1 ,
model_switch_phase = 1,
n_prompt="", n_prompt="",
seed=-1, seed=-1,
callback = None, callback = None,
@ -427,6 +431,7 @@ class WanAny2V:
prefix_frames_count = 0, prefix_frames_count = 0,
image_mode = 0, image_mode = 0,
window_no = 0, window_no = 0,
set_header_text = None,
**bbargs **bbargs
): ):
@ -745,16 +750,25 @@ class WanAny2V:
# init denoising # init denoising
updated_num_steps= len(timesteps) updated_num_steps= len(timesteps)
if callback != None:
from shared.utils.loras_mutipliers import update_loras_slists denoising_extra = ""
model_switch_step = updated_num_steps from shared.utils.loras_mutipliers import update_loras_slists, get_model_switch_steps
for i, t in enumerate(timesteps):
if t <= switch_threshold: phase_switch_step, phase_switch_step2, phases_description = get_model_switch_steps(timesteps, updated_num_steps, guide_phases, 0 if self.model2 is None else model_switch_phase, switch_threshold, switch2_threshold )
model_switch_step = i if len(phases_description) > 0: set_header_text(phases_description)
break guidance_switch_done = guidance_switch2_done = False
update_loras_slists(self.model, loras_slists, updated_num_steps, model_switch_step= model_switch_step) if guide_phases > 1: denoising_extra = f"Phase 1/{guide_phases} High Noise" if self.model2 is not None else f"Phase 1/{guide_phases}"
if self.model2 is not None: update_loras_slists(self.model2, loras_slists, updated_num_steps, model_switch_step= model_switch_step) def update_guidance(step_no, t, guide_scale, new_guide_scale, guidance_switch_done, switch_threshold, trans, phase_no, denoising_extra):
callback(-1, None, True, override_num_inference_steps = updated_num_steps) if guide_phases >= phase_no and not guidance_switch_done and t <= switch_threshold:
if model_switch_phase == phase_no-1 and self.model2 is not None: trans = self.model2
guide_scale, guidance_switch_done = new_guide_scale, True
denoising_extra = f"Phase {phase_no}/{guide_phases} {'Low Noise' if trans == self.model2 else 'High Noise'}" if self.model2 is not None else f"Phase {phase_no}/{guide_phases}"
callback(step_no-1, denoising_extra = denoising_extra)
return guide_scale, guidance_switch_done, trans, denoising_extra
update_loras_slists(self.model, loras_slists, updated_num_steps, phase_switch_step= phase_switch_step, phase_switch_step2= phase_switch_step2)
if self.model2 is not None: update_loras_slists(self.model2, loras_slists, updated_num_steps, phase_switch_step= phase_switch_step, phase_switch_step2= phase_switch_step2)
callback(-1, None, True, override_num_inference_steps = updated_num_steps, denoising_extra = denoising_extra)
if sample_scheduler != None: if sample_scheduler != None:
scheduler_kwargs = {} if isinstance(sample_scheduler, FlowMatchScheduler) else {"generator": seed_g} scheduler_kwargs = {} if isinstance(sample_scheduler, FlowMatchScheduler) else {"generator": seed_g}
@ -766,16 +780,12 @@ class WanAny2V:
text_momentumbuffer = MomentumBuffer(apg_momentum) text_momentumbuffer = MomentumBuffer(apg_momentum)
audio_momentumbuffer = MomentumBuffer(apg_momentum) audio_momentumbuffer = MomentumBuffer(apg_momentum)
guidance_switch_done = False
# denoising # denoising
trans = self.model trans = self.model
for i, t in enumerate(tqdm(timesteps)): for i, t in enumerate(tqdm(timesteps)):
if not guidance_switch_done and t <= switch_threshold: guide_scale, guidance_switch_done, trans, denoising_extra = update_guidance(i, t, guide_scale, guide2_scale, guidance_switch_done, switch_threshold, trans, 2, denoising_extra)
guide_scale = guide2_scale guide_scale, guidance_switch2_done, trans, denoising_extra = update_guidance(i, t, guide_scale, guide3_scale, guidance_switch2_done, switch2_threshold, trans, 3, denoising_extra)
if self.model2 is not None: trans = self.model2
guidance_switch_done = True
offload.set_step_no_for_lora(trans, i) offload.set_step_no_for_lora(trans, i)
timestep = torch.stack([t]) timestep = torch.stack([t])
@ -920,7 +930,7 @@ class WanAny2V:
if trim_frames > 0: latents_preview= latents_preview[:, :,:-trim_frames] if trim_frames > 0: latents_preview= latents_preview[:, :,:-trim_frames]
if image_outputs: latents_preview= latents_preview[:, :,:1] if image_outputs: latents_preview= latents_preview[:, :,:1]
if len(latents_preview) > 1: latents_preview = latents_preview.transpose(0,2) if len(latents_preview) > 1: latents_preview = latents_preview.transpose(0,2)
callback(i, latents_preview[0], False) callback(i, latents_preview[0], False, denoising_extra =denoising_extra )
latents_preview = None latents_preview = None
if timestep_injection: if timestep_injection:

View File

@ -24,6 +24,8 @@ class family_handler():
extra_model_def["sliding_window"] = True extra_model_def["sliding_window"] = True
extra_model_def["skip_layer_guidance"] = True extra_model_def["skip_layer_guidance"] = True
extra_model_def["tea_cache"] = True extra_model_def["tea_cache"] = True
extra_model_def["guidance_max_phases"] = 1
return extra_model_def return extra_model_def
@staticmethod @staticmethod

View File

@ -767,6 +767,15 @@ class WanModel(ModelMixin, ConfigMixin):
if first == None: if first == None:
return sd return sd
new_sd = {}
# for k,v in sd.items():
# if k.endswith("modulation.diff"):
# pass
# else:
# new_sd[ k] = v
# sd = new_sd
# if first.startswith("blocks."): # if first.startswith("blocks."):
# new_sd = {} # new_sd = {}
# for k,v in sd.items(): # for k,v in sd.items():

View File

@ -7,6 +7,9 @@ def test_class_i2v(base_model_type):
def test_class_1_3B(base_model_type): def test_class_1_3B(base_model_type):
return base_model_type in [ "vace_1.3B", "t2v_1.3B", "recam_1.3B","phantom_1.3B","fun_inp_1.3B"] return base_model_type in [ "vace_1.3B", "t2v_1.3B", "recam_1.3B","phantom_1.3B","fun_inp_1.3B"]
def test_multitalk(base_model_type):
return base_model_type in ["multitalk", "vace_multitalk_14B", "i2v_2_2_multitalk"]
class family_handler(): class family_handler():
@staticmethod @staticmethod
@ -79,11 +82,11 @@ class family_handler():
extra_model_def["no_steps_skipping"] = True extra_model_def["no_steps_skipping"] = True
i2v = test_class_i2v(base_model_type) i2v = test_class_i2v(base_model_type)
extra_model_def["i2v_class"] = i2v extra_model_def["i2v_class"] = i2v
extra_model_def["multitalk_class"] = base_model_type in ["multitalk", "vace_multitalk_14B", "i2v_2_2_multitalk"] extra_model_def["multitalk_class"] = test_multitalk(base_model_type)
vace_class = base_model_type in ["vace_14B", "vace_1.3B", "vace_multitalk_14B"] vace_class = base_model_type in ["vace_14B", "vace_1.3B", "vace_multitalk_14B"]
extra_model_def["vace_class"] = vace_class extra_model_def["vace_class"] = vace_class
if base_model_type in ["multitalk", "vace_multitalk_14B", "i2v_2_2_multitalk"]: if test_multitalk(base_model_type):
fps = 25 fps = 25
elif base_model_type in ["fantasy"]: elif base_model_type in ["fantasy"]:
fps = 23 fps = 23
@ -92,7 +95,7 @@ class family_handler():
else: else:
fps = 16 fps = 16
extra_model_def["fps"] =fps extra_model_def["fps"] =fps
multiple_submodels = "URLs2" in model_def
if vace_class: if vace_class:
frames_minimum, frames_steps = 17, 4 frames_minimum, frames_steps = 17, 4
else: else:
@ -101,12 +104,13 @@ class family_handler():
"frames_minimum" : frames_minimum, "frames_minimum" : frames_minimum,
"frames_steps" : frames_steps, "frames_steps" : frames_steps,
"sliding_window" : base_model_type in ["multitalk", "t2v", "fantasy"] or test_class_i2v(base_model_type) or vace_class, #"ti2v_2_2", "sliding_window" : base_model_type in ["multitalk", "t2v", "fantasy"] or test_class_i2v(base_model_type) or vace_class, #"ti2v_2_2",
"guidance_max_phases" : 2, "multiple_submodels" : multiple_submodels,
"guidance_max_phases" : 3,
"skip_layer_guidance" : True, "skip_layer_guidance" : True,
"cfg_zero" : True, "cfg_zero" : True,
"cfg_star" : True, "cfg_star" : True,
"adaptive_projected_guidance" : True, "adaptive_projected_guidance" : True,
"tea_cache" : not (base_model_type in ["i2v_2_2", "ti2v_2_2" ] or "URLs2" in model_def), "tea_cache" : not (base_model_type in ["i2v_2_2", "ti2v_2_2" ] or multiple_submodels),
"mag_cache" : True, "mag_cache" : True,
"sample_solvers":[ "sample_solvers":[
("unipc", "unipc"), ("unipc", "unipc"),
@ -157,8 +161,7 @@ class family_handler():
@staticmethod @staticmethod
def get_rgb_factors(base_model_type ): def get_rgb_factors(base_model_type ):
from shared.RGB_factors import get_rgb_factors from shared.RGB_factors import get_rgb_factors
if base_model_type == "ti2v_2_2": return None, None latent_rgb_factors, latent_rgb_factors_bias = get_rgb_factors("wan", base_model_type)
latent_rgb_factors, latent_rgb_factors_bias = get_rgb_factors("wan")
return latent_rgb_factors, latent_rgb_factors_bias return latent_rgb_factors, latent_rgb_factors_bias
@staticmethod @staticmethod
@ -218,6 +221,10 @@ class family_handler():
if ui_defaults.get("sample_solver", "") == "": if ui_defaults.get("sample_solver", "") == "":
ui_defaults["sample_solver"] = "unipc" ui_defaults["sample_solver"] = "unipc"
if settings_version < 2.24:
if model_def.get("multiple_submodels", False) or ui_defaults.get("switch_threshold", 0) > 0:
ui_defaults["guidance_phases"] = 2
@staticmethod @staticmethod
def update_default_settings(base_model_type, model_def, ui_defaults): def update_default_settings(base_model_type, model_def, ui_defaults):
ui_defaults.update({ ui_defaults.update({
@ -233,7 +240,6 @@ class family_handler():
ui_defaults.update({ ui_defaults.update({
"guidance_scale": 5.0, "guidance_scale": 5.0,
"flow_shift": 7, # 11 for 720p "flow_shift": 7, # 11 for 720p
"audio_guidance_scale": 4,
"sliding_window_discard_last_frames" : 4, "sliding_window_discard_last_frames" : 4,
"sample_solver" : "euler", "sample_solver" : "euler",
"adaptive_switch" : 1, "adaptive_switch" : 1,
@ -258,4 +264,10 @@ class family_handler():
"image_prompt_type": "T", "image_prompt_type": "T",
}) })
if test_multitalk(base_model_type):
ui_defaults["audio_guidance_scale"] = 4
if model_def.get("multiple_submodels", False):
ui_defaults["guidance_phases"] = 2

View File

@ -1,26 +1,80 @@
# thanks Comfyui for the rgb factors # thanks Comfyui for the rgb factors (https://github.com/comfyanonymous/ComfyUI/blob/master/comfy/latent_formats.py)
def get_rgb_factors(model_family): def get_rgb_factors(model_family, model_type = None):
if model_family == "wan": if model_family == "wan":
latent_channels = 16 if model_type =="ti2v_2_2":
latent_dimensions = 3 latent_channels = 48
latent_rgb_factors = [ latent_dimensions = 3
[-0.1299, -0.1692, 0.2932], latent_rgb_factors = [
[ 0.0671, 0.0406, 0.0442], [ 0.0119, 0.0103, 0.0046],
[ 0.3568, 0.2548, 0.1747], [-0.1062, -0.0504, 0.0165],
[ 0.0372, 0.2344, 0.1420], [ 0.0140, 0.0409, 0.0491],
[ 0.0313, 0.0189, -0.0328], [-0.0813, -0.0677, 0.0607],
[ 0.0296, -0.0956, -0.0665], [ 0.0656, 0.0851, 0.0808],
[-0.3477, -0.4059, -0.2925], [ 0.0264, 0.0463, 0.0912],
[ 0.0166, 0.1902, 0.1975], [ 0.0295, 0.0326, 0.0590],
[-0.0412, 0.0267, -0.1364], [-0.0244, -0.0270, 0.0025],
[-0.1293, 0.0740, 0.1636], [ 0.0443, -0.0102, 0.0288],
[ 0.0680, 0.3019, 0.1128], [-0.0465, -0.0090, -0.0205],
[ 0.0032, 0.0581, 0.0639], [ 0.0359, 0.0236, 0.0082],
[-0.1251, 0.0927, 0.1699], [-0.0776, 0.0854, 0.1048],
[ 0.0060, -0.0633, 0.0005], [ 0.0564, 0.0264, 0.0561],
[ 0.3477, 0.2275, 0.2950], [ 0.0006, 0.0594, 0.0418],
[ 0.1984, 0.0913, 0.1861] [-0.0319, -0.0542, -0.0637],
] [-0.0268, 0.0024, 0.0260],
[ 0.0539, 0.0265, 0.0358],
[-0.0359, -0.0312, -0.0287],
[-0.0285, -0.1032, -0.1237],
[ 0.1041, 0.0537, 0.0622],
[-0.0086, -0.0374, -0.0051],
[ 0.0390, 0.0670, 0.2863],
[ 0.0069, 0.0144, 0.0082],
[ 0.0006, -0.0167, 0.0079],
[ 0.0313, -0.0574, -0.0232],
[-0.1454, -0.0902, -0.0481],
[ 0.0714, 0.0827, 0.0447],
[-0.0304, -0.0574, -0.0196],
[ 0.0401, 0.0384, 0.0204],
[-0.0758, -0.0297, -0.0014],
[ 0.0568, 0.1307, 0.1372],
[-0.0055, -0.0310, -0.0380],
[ 0.0239, -0.0305, 0.0325],
[-0.0663, -0.0673, -0.0140],
[-0.0416, -0.0047, -0.0023],
[ 0.0166, 0.0112, -0.0093],
[-0.0211, 0.0011, 0.0331],
[ 0.1833, 0.1466, 0.2250],
[-0.0368, 0.0370, 0.0295],
[-0.3441, -0.3543, -0.2008],
[-0.0479, -0.0489, -0.0420],
[-0.0660, -0.0153, 0.0800],
[-0.0101, 0.0068, 0.0156],
[-0.0690, -0.0452, -0.0927],
[-0.0145, 0.0041, 0.0015],
[ 0.0421, 0.0451, 0.0373],
[ 0.0504, -0.0483, -0.0356],
[-0.0837, 0.0168, 0.0055]
]
else:
latent_channels = 16
latent_dimensions = 3
latent_rgb_factors = [
[-0.1299, -0.1692, 0.2932],
[ 0.0671, 0.0406, 0.0442],
[ 0.3568, 0.2548, 0.1747],
[ 0.0372, 0.2344, 0.1420],
[ 0.0313, 0.0189, -0.0328],
[ 0.0296, -0.0956, -0.0665],
[-0.3477, -0.4059, -0.2925],
[ 0.0166, 0.1902, 0.1975],
[-0.0412, 0.0267, -0.1364],
[-0.1293, 0.0740, 0.1636],
[ 0.0680, 0.3019, 0.1128],
[ 0.0032, 0.0581, 0.0639],
[-0.1251, 0.0927, 0.1699],
[ 0.0060, -0.0633, 0.0005],
[ 0.3477, 0.2275, 0.2950],
[ 0.1984, 0.0913, 0.1861]
]
latent_rgb_factors_bias = [-0.1835, -0.0868, -0.3360] latent_rgb_factors_bias = [-0.1835, -0.0868, -0.3360]

View File

@ -8,7 +8,7 @@ def preparse_loras_multipliers(loras_multipliers):
loras_multipliers = " ".join(loras_mult_choices_list) loras_multipliers = " ".join(loras_mult_choices_list)
return loras_multipliers.split(" ") return loras_multipliers.split(" ")
def expand_slist(slists_dict, mult_no, num_inference_steps, model_switch_step ): def expand_slist(slists_dict, mult_no, num_inference_steps, model_switch_step, model_switch_step2 ):
def expand_one(slist, num_inference_steps): def expand_one(slist, num_inference_steps):
if not isinstance(slist, list): slist = [slist] if not isinstance(slist, list): slist = [slist]
new_slist= [] new_slist= []
@ -23,13 +23,20 @@ def expand_slist(slists_dict, mult_no, num_inference_steps, model_switch_step ):
phase1 = slists_dict["phase1"][mult_no] phase1 = slists_dict["phase1"][mult_no]
phase2 = slists_dict["phase2"][mult_no] phase2 = slists_dict["phase2"][mult_no]
if isinstance(phase1, float) and isinstance(phase2, float) and phase1 == phase2: phase3 = slists_dict["phase3"][mult_no]
return phase1 shared = slists_dict["shared"][mult_no]
return expand_one(phase1, model_switch_step) + expand_one(phase2, num_inference_steps - model_switch_step) if shared:
if isinstance(phase1, float): return phase1
return expand_one(phase1, num_inference_steps)
else:
if isinstance(phase1, float) and isinstance(phase2, float) and isinstance(phase3, float) and phase1 == phase2 and phase2 == phase3: return phase1
return expand_one(phase1, model_switch_step) + expand_one(phase2, model_switch_step2 - model_switch_step) + expand_one(phase3, num_inference_steps - model_switch_step2)
def parse_loras_multipliers(loras_multipliers, nb_loras, num_inference_steps, merge_slist = None, max_phases = 2, model_switch_step = None): def parse_loras_multipliers(loras_multipliers, nb_loras, num_inference_steps, merge_slist = None, nb_phases = 2, model_switch_step = None, model_switch_step2 = None):
if model_switch_step is None: if model_switch_step is None:
model_switch_step = num_inference_steps model_switch_step = num_inference_steps
if model_switch_step2 is None:
model_switch_step2 = num_inference_steps
def is_float(element: any) -> bool: def is_float(element: any) -> bool:
if element is None: if element is None:
return False return False
@ -40,8 +47,11 @@ def parse_loras_multipliers(loras_multipliers, nb_loras, num_inference_steps, me
return False return False
loras_list_mult_choices_nums = [] loras_list_mult_choices_nums = []
slists_dict = { "model_switch_step": model_switch_step} slists_dict = { "model_switch_step": model_switch_step}
slists_dict = { "model_switch_step2": model_switch_step2}
slists_dict["phase1"] = phase1 = [1.] * nb_loras slists_dict["phase1"] = phase1 = [1.] * nb_loras
slists_dict["phase2"] = phase2 = [1.] * nb_loras slists_dict["phase2"] = phase2 = [1.] * nb_loras
slists_dict["phase3"] = phase3 = [1.] * nb_loras
slists_dict["shared"] = shared = [False] * nb_loras
if isinstance(loras_multipliers, list) or len(loras_multipliers) > 0: if isinstance(loras_multipliers, list) or len(loras_multipliers) > 0:
list_mult_choices_list = preparse_loras_multipliers(loras_multipliers)[:nb_loras] list_mult_choices_list = preparse_loras_multipliers(loras_multipliers)[:nb_loras]
@ -51,41 +61,66 @@ def parse_loras_multipliers(loras_multipliers, nb_loras, num_inference_steps, me
mult = mult.strip() mult = mult.strip()
phase_mult = mult.split(";") phase_mult = mult.split(";")
shared_phases = len(phase_mult) <=1 shared_phases = len(phase_mult) <=1
if len(phase_mult) > max_phases: if not shared_phases and len(phase_mult) != nb_phases :
return "", "", f"Loras can not be defined for more than {max_phases} Denoising phase{'s' if max_phases>1 else ''} for this model" return "", "", f"if the ';' syntax is used for one Lora multiplier, the multipliers for its {nb_phases} denoising phases should be specified for this multiplier"
for phase_no, mult in enumerate(phase_mult): for phase_no, mult in enumerate(phase_mult):
if phase_no > 0: current_phase = phase2 if phase_no == 1:
current_phase = phase2
elif phase_no == 2:
current_phase = phase3
if "," in mult: if "," in mult:
multlist = mult.split(",") multlist = mult.split(",")
slist = [] slist = []
for smult in multlist: for smult in multlist:
if not is_float(smult): if not is_float(smult):
return "", "", f"Lora sub value no {i+1} ({smult}) in Multiplier definition '{multlist}' is invalid" return "", "", f"Lora sub value no {i+1} ({smult}) in Multiplier definition '{multlist}' is invalid in Phase {phase_no+1}"
slist.append(float(smult)) slist.append(float(smult))
else: else:
if not is_float(mult): if not is_float(mult):
return "", "", f"Lora Multiplier no {i+1} ({mult}) is invalid" return "", "", f"Lora Multiplier no {i+1} ({mult}) is invalid"
slist = float(mult) slist = float(mult)
if shared_phases: if shared_phases:
phase1[i] = phase2[i] = slist phase1[i] = phase2[i] = phase3[i] = slist
shared[i] = True
else: else:
current_phase[i] = slist current_phase[i] = slist
else: else:
phase1[i] = phase2[i] = float(mult) phase1[i] = phase2[i] = phase3[i] = float(mult)
shared[i] = True
if merge_slist is not None: if merge_slist is not None:
slists_dict["phase1"] = phase1 = merge_slist["phase1"] + phase1 slists_dict["phase1"] = phase1 = merge_slist["phase1"] + phase1
slists_dict["phase2"] = phase2 = merge_slist["phase2"] + phase2 slists_dict["phase2"] = phase2 = merge_slist["phase2"] + phase2
slists_dict["phase3"] = phase3 = merge_slist["phase3"] + phase3
slists_dict["shared"] = shared = merge_slist["shared"] + shared
loras_list_mult_choices_nums = [ expand_slist(slists_dict, i, num_inference_steps, model_switch_step ) for i in range(len(phase1)) ] loras_list_mult_choices_nums = [ expand_slist(slists_dict, i, num_inference_steps, model_switch_step, model_switch_step2 ) for i in range(len(phase1)) ]
loras_list_mult_choices_nums = [ slist[0] if isinstance(slist, list) else slist for slist in loras_list_mult_choices_nums ] loras_list_mult_choices_nums = [ slist[0] if isinstance(slist, list) else slist for slist in loras_list_mult_choices_nums ]
return loras_list_mult_choices_nums, slists_dict, "" return loras_list_mult_choices_nums, slists_dict, ""
def update_loras_slists(trans, slists_dict, num_inference_steps, model_switch_step = None ): def update_loras_slists(trans, slists_dict, num_inference_steps, phase_switch_step = None, phase_switch_step2 = None ):
from mmgp import offload from mmgp import offload
sz = len(slists_dict["phase1"]) sz = len(slists_dict["phase1"])
slists = [ expand_slist(slists_dict, i, num_inference_steps, model_switch_step ) for i in range(sz) ] slists = [ expand_slist(slists_dict, i, num_inference_steps, phase_switch_step, phase_switch_step2 ) for i in range(sz) ]
nos = [str(l) for l in range(sz)] nos = [str(l) for l in range(sz)]
offload.activate_loras(trans, nos, slists ) offload.activate_loras(trans, nos, slists )
def get_model_switch_steps(timesteps, total_num_steps, guide_phases, model_switch_phase, switch_threshold, switch2_threshold ):
model_switch_step = model_switch_step2 = None
for i, t in enumerate(timesteps):
if guide_phases >=2 and model_switch_step is None and t <= switch_threshold: model_switch_step = i
if guide_phases >=3 and model_switch_step2 is None and t <= switch2_threshold: model_switch_step2 = i
if model_switch_step is None: model_switch_step = total_num_steps
if model_switch_step2 is None: model_switch_step2 = total_num_steps
phases_description = ""
if guide_phases > 1:
phases_description = "Denoising Steps: "
phases_description += f" Phase 1 = None" if model_switch_step == 0 else f" Phase 1 = 1:{ min(model_switch_step,total_num_steps) }"
if model_switch_step < total_num_steps:
phases_description += f", Phase 2 = None" if model_switch_step == model_switch_step2 else f", Phase 2 = {model_switch_step +1}:{ min(model_switch_step2,total_num_steps) }"
if guide_phases > 2 and model_switch_step2 < total_num_steps:
phases_description += f", Phase 3 = {model_switch_step2 +1}:{ total_num_steps}"
return model_switch_step, model_switch_step2, phases_description

View File

@ -8,17 +8,22 @@ import sys
import threading import threading
import time import time
import numpy as np import numpy as np
import os
os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1" os.environ["PYGAME_HIDE_SUPPORT_PROMPT"] = "1"
def generate_notification_beep(volume=50, sample_rate=44100): _cached_waveforms = {}
_sample_rate = 44100
_mixer_initialized = False
_mixer_lock = threading.Lock()
def _generate_notification_beep(volume=50, sample_rate=_sample_rate):
"""Generate pleasant C major chord notification sound""" """Generate pleasant C major chord notification sound"""
if volume == 0: if volume == 0:
return np.array([]) return np.array([])
volume = max(0, min(100, volume)) volume = max(0, min(100, volume))
# Volume curve mapping: 25%->50%, 50%->75%, 75%->100%, 100%->105% # Volume curve mapping
if volume <= 25: if volume <= 25:
volume_mapped = (volume / 25.0) * 0.5 volume_mapped = (volume / 25.0) * 0.5
elif volume <= 50: elif volume <= 50:
@ -26,232 +31,191 @@ def generate_notification_beep(volume=50, sample_rate=44100):
elif volume <= 75: elif volume <= 75:
volume_mapped = 0.75 + ((volume - 50) / 25.0) * 0.25 volume_mapped = 0.75 + ((volume - 50) / 25.0) * 0.25
else: else:
volume_mapped = 1.0 + ((volume - 75) / 25.0) * 0.05 # Only 5% boost instead of 15% volume_mapped = 1.0 + ((volume - 75) / 25.0) * 0.05
volume = volume_mapped volume = volume_mapped
# C major chord frequencies # C major chord frequencies
freq_c = 261.63 # C4 freq_c, freq_e, freq_g = 261.63, 329.63, 392.00
freq_e = 329.63 # E4
freq_g = 392.00 # G4
duration = 0.8 duration = 0.8
t = np.linspace(0, duration, int(sample_rate * duration), False) t = np.linspace(0, duration, int(sample_rate * duration), False)
# Generate chord components # Generate chord components
wave_c = np.sin(freq_c * 2 * np.pi * t) * 0.4 wave = (
wave_e = np.sin(freq_e * 2 * np.pi * t) * 0.3 np.sin(freq_c * 2 * np.pi * t) * 0.4
wave_g = np.sin(freq_g * 2 * np.pi * t) * 0.2 + np.sin(freq_e * 2 * np.pi * t) * 0.3
+ np.sin(freq_g * 2 * np.pi * t) * 0.2
wave = wave_c + wave_e + wave_g )
# Prevent clipping # Normalize
max_amplitude = np.max(np.abs(wave)) max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0: if max_amplitude > 0:
wave = wave / max_amplitude * 0.8 wave = wave / max_amplitude * 0.8
# ADSR envelope # ADSR envelope
def apply_adsr_envelope(wave_data): def apply_adsr_envelope(wave_data):
length = len(wave_data) length = len(wave_data)
attack_time = int(0.2 * length) attack_time = int(0.2 * length)
decay_time = int(0.1 * length) decay_time = int(0.1 * length)
release_time = int(0.5 * length) release_time = int(0.5 * length)
envelope = np.ones(length) envelope = np.ones(length)
if attack_time > 0: if attack_time > 0:
envelope[:attack_time] = np.power(np.linspace(0, 1, attack_time), 3) envelope[:attack_time] = np.power(np.linspace(0, 1, attack_time), 3)
if decay_time > 0: if decay_time > 0:
start_idx = attack_time start_idx, end_idx = attack_time, attack_time + decay_time
end_idx = attack_time + decay_time
envelope[start_idx:end_idx] = np.linspace(1, 0.85, decay_time) envelope[start_idx:end_idx] = np.linspace(1, 0.85, decay_time)
if release_time > 0: if release_time > 0:
start_idx = length - release_time start_idx = length - release_time
envelope[start_idx:] = 0.85 * np.exp(-4 * np.linspace(0, 1, release_time)) envelope[start_idx:] = 0.85 * np.exp(-4 * np.linspace(0, 1, release_time))
return wave_data * envelope return wave_data * envelope
wave = apply_adsr_envelope(wave) wave = apply_adsr_envelope(wave)
# Simple low-pass filter # Simple low-pass filter
def simple_lowpass_filter(signal, cutoff_ratio=0.8): def simple_lowpass_filter(signal, cutoff_ratio=0.8):
window_size = max(3, int(len(signal) * 0.001)) window_size = max(3, int(len(signal) * 0.001))
if window_size % 2 == 0: if window_size % 2 == 0:
window_size += 1 window_size += 1
kernel = np.ones(window_size) / window_size kernel = np.ones(window_size) / window_size
padded = np.pad(signal, window_size//2, mode='edge') padded = np.pad(signal, window_size // 2, mode="edge")
filtered = np.convolve(padded, kernel, mode='same') filtered = np.convolve(padded, kernel, mode="same")
return filtered[window_size//2:-window_size//2] return filtered[window_size // 2 : -window_size // 2]
wave = simple_lowpass_filter(wave) wave = simple_lowpass_filter(wave)
# Add reverb effect # Add reverb
if len(wave) > sample_rate // 4: if len(wave) > sample_rate // 4:
delay_samples = int(0.12 * sample_rate) delay_samples = int(0.12 * sample_rate)
reverb = np.zeros_like(wave) reverb = np.zeros_like(wave)
reverb[delay_samples:] = wave[:-delay_samples] * 0.08 reverb[delay_samples:] = wave[:-delay_samples] * 0.08
wave = wave + reverb wave = wave + reverb
# Apply volume first, then normalize to prevent clipping
wave = wave * volume * 0.5
# Final normalization with safety margin
max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0.85: # If approaching clipping threshold
wave = wave / max_amplitude * 0.85 # More conservative normalization
return wave
_mixer_lock = threading.Lock()
def play_audio_with_pygame(audio_data, sample_rate=44100): # Apply volume & final normalize
""" wave = wave * volume * 0.5
Play audio with clean stereo output - sounds like single notification from both speakers max_amplitude = np.max(np.abs(wave))
""" if max_amplitude > 0.85:
wave = wave / max_amplitude * 0.85
return wave
def _get_cached_waveform(volume):
"""Return cached waveform for volume"""
if volume not in _cached_waveforms:
_cached_waveforms[volume] = _generate_notification_beep(volume)
return _cached_waveforms[volume]
def play_audio_with_pygame(audio_data, sample_rate=_sample_rate):
"""Play audio with pygame backend"""
global _mixer_initialized
try: try:
import pygame import pygame
with _mixer_lock: with _mixer_lock:
if len(audio_data) == 0: if not _mixer_initialized:
return False pygame.mixer.pre_init(frequency=sample_rate, size=-16, channels=2, buffer=512)
pygame.mixer.init()
# Clean mixer initialization - quit any existing mixer first _mixer_initialized = True
if pygame.mixer.get_init() is not None:
pygame.mixer.quit()
time.sleep(0.2) # Longer pause to ensure clean shutdown
# Initialize fresh mixer
pygame.mixer.pre_init(
frequency=sample_rate,
size=-16,
channels=2,
buffer=512 # Smaller buffer to reduce latency/doubling
)
pygame.mixer.init()
# Verify clean initialization
mixer_info = pygame.mixer.get_init() mixer_info = pygame.mixer.get_init()
if mixer_info is None or mixer_info[2] != 2: if mixer_info is None or mixer_info[2] != 2:
return False return False
# Prepare audio - ensure clean conversion
audio_int16 = (audio_data * 32767).astype(np.int16) audio_int16 = (audio_data * 32767).astype(np.int16)
if len(audio_int16.shape) > 1: if len(audio_int16.shape) > 1:
audio_int16 = audio_int16.flatten() audio_int16 = audio_int16.flatten()
# Create clean stereo with identical channels
stereo_data = np.zeros((len(audio_int16), 2), dtype=np.int16) stereo_data = np.zeros((len(audio_int16), 2), dtype=np.int16)
stereo_data[:, 0] = audio_int16 # Left channel stereo_data[:, 0] = audio_int16
stereo_data[:, 1] = audio_int16 # Right channel stereo_data[:, 1] = audio_int16
# Create sound and play once
sound = pygame.sndarray.make_sound(stereo_data) sound = pygame.sndarray.make_sound(stereo_data)
pygame.mixer.stop()
# Ensure only one playback
pygame.mixer.stop() # Stop any previous sounds
sound.play() sound.play()
# Wait for completion
duration_ms = int(len(audio_data) / sample_rate * 1000) + 50 duration_ms = int(len(audio_data) / sample_rate * 1000) + 50
pygame.time.wait(duration_ms) pygame.time.wait(duration_ms)
return True return True
except ImportError: except ImportError:
return False return False
except Exception as e: except Exception as e:
print(f"Pygame clean error: {e}") print(f"Pygame error: {e}")
return False return False
def play_audio_with_sounddevice(audio_data, sample_rate=44100): def play_audio_with_sounddevice(audio_data, sample_rate=_sample_rate):
"""Play audio using sounddevice backend""" """Play audio using sounddevice backend"""
try: try:
import sounddevice as sd import sounddevice as sd
sd.play(audio_data, sample_rate) sd.play(audio_data, sample_rate)
sd.wait() sd.wait()
return True return True
except ImportError: except ImportError:
return False return False
except Exception as e: except Exception as e:
print(f"Sounddevice error: {e}") print(f"Sounddevice error: {e}")
return False return False
def play_audio_with_winsound(audio_data, sample_rate=_sample_rate):
def play_audio_with_winsound(audio_data, sample_rate=44100):
"""Play audio using winsound backend (Windows only)""" """Play audio using winsound backend (Windows only)"""
if sys.platform != "win32": if sys.platform != "win32":
return False return False
try: try:
import winsound import winsound, wave, tempfile, uuid
import wave
import tempfile
import uuid
temp_dir = tempfile.gettempdir() temp_dir = tempfile.gettempdir()
temp_filename = os.path.join(temp_dir, f"notification_{uuid.uuid4().hex}.wav") temp_filename = os.path.join(temp_dir, f"notification_{uuid.uuid4().hex}.wav")
try: try:
with wave.open(temp_filename, 'w') as wav_file: with wave.open(temp_filename, "w") as wav_file:
wav_file.setnchannels(1) wav_file.setnchannels(1)
wav_file.setsampwidth(2) wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate) wav_file.setframerate(sample_rate)
audio_int16 = (audio_data * 32767).astype(np.int16) audio_int16 = (audio_data * 32767).astype(np.int16)
wav_file.writeframes(audio_int16.tobytes()) wav_file.writeframes(audio_int16.tobytes())
winsound.PlaySound(temp_filename, winsound.SND_FILENAME) winsound.PlaySound(temp_filename, winsound.SND_FILENAME)
finally: finally:
# Clean up temp file try:
for _ in range(3): if os.path.exists(temp_filename):
try: os.unlink(temp_filename)
if os.path.exists(temp_filename): except:
os.unlink(temp_filename) pass
break
except:
time.sleep(0.1)
return True return True
except ImportError: except ImportError:
return False return False
except Exception as e: except Exception as e:
print(f"Winsound error: {e}") print(f"Winsound error: {e}")
return False return False
def play_notification_sound(volume=50): def play_notification_sound(volume=50):
"""Play notification sound with specified volume""" """Play notification sound with specified volume"""
if volume == 0: if volume == 0:
return return
audio_data = generate_notification_beep(volume=volume) audio_data = _get_cached_waveform(volume)
if len(audio_data) == 0: if len(audio_data) == 0:
return return
# Try audio backends in order audio_backends = [play_audio_with_pygame, play_audio_with_sounddevice, play_audio_with_winsound]
audio_backends = [
play_audio_with_pygame,
play_audio_with_sounddevice,
play_audio_with_winsound,
]
for backend in audio_backends: for backend in audio_backends:
try: try:
if backend(audio_data): if backend(audio_data):
return return
except Exception as e: except Exception:
continue continue
# Fallback: terminal beep
print(f"All audio backends failed, using terminal beep")
print('\a')
print("All audio backends failed, using terminal beep")
print("\a")
def play_notification_async(volume=50): def play_notification_async(volume=50):
"""Play notification sound asynchronously (non-blocking)""" """Play notification sound asynchronously (non-blocking)"""
@ -260,24 +224,12 @@ def play_notification_async(volume=50):
play_notification_sound(volume) play_notification_sound(volume)
except Exception as e: except Exception as e:
print(f"Error playing notification sound: {e}") print(f"Error playing notification sound: {e}")
sound_thread = threading.Thread(target=play_sound, daemon=True)
sound_thread.start()
threading.Thread(target=play_sound, daemon=True).start()
def notify_video_completion(video_path=None, volume=50): def notify_video_completion(video_path=None, volume=50):
"""Notify about completed video generation""" """Notify about completed video generation"""
play_notification_async(volume) play_notification_async(volume)
for vol in (25, 50, 75, 100):
if __name__ == "__main__": _get_cached_waveform(vol)
print("Testing notification sounds with different volumes...")
print("Auto-detecting available audio backends...")
volumes = [25, 50, 75, 100]
for vol in volumes:
print(f"Testing volume {vol}%:")
play_notification_sound(vol)
time.sleep(2)
print("Test completed!")

View File

@ -1,6 +1,6 @@
import re import re
def process_template(input_text): def process_template(input_text, keep_comments = False):
""" """
Process a text template with macro instructions and variable substitution. Process a text template with macro instructions and variable substitution.
Supports multiple values for variables to generate multiple output versions. Supports multiple values for variables to generate multiple output versions.
@ -28,9 +28,12 @@ def process_template(input_text):
line_number += 1 line_number += 1
# Skip empty lines or comments # Skip empty lines or comments
if not line or line.startswith('#'): if not line:
continue continue
if line.startswith('#') and not keep_comments:
continue
# Handle macro instructions # Handle macro instructions
if line.startswith('!'): if line.startswith('!'):
# Process any accumulated template lines before starting a new macro # Process any accumulated template lines before starting a new macro
@ -106,13 +109,14 @@ def process_template(input_text):
# Handle template lines # Handle template lines
else: else:
# Check for unknown variables in template line if not line.startswith('#'):
var_references = re.findall(r'\{([^}]+)\}', line) # Check for unknown variables in template line
for var_ref in var_references: var_references = re.findall(r'\{([^}]+)\}', line)
if var_ref not in current_variables: for var_ref in var_references:
error_message = f"Unknown variable '{{{var_ref}}}' in template\nLine: '{orig_line}'" if var_ref not in current_variables:
return "", error_message error_message = f"Unknown variable '{{{var_ref}}}' in template\nLine: '{orig_line}'"
return "", error_message
# Add to current template lines # Add to current template lines
current_template_lines.append(line) current_template_lines.append(line)

738
wgp.py

File diff suppressed because it is too large Load Diff