mirror of
https://github.com/guoyww/AnimateDiff.git
synced 2026-04-03 17:56:15 +02:00
support sdxl
This commit is contained in:
BIN
animatediff/.DS_Store
vendored
Normal file
BIN
animatediff/.DS_Store
vendored
Normal file
Binary file not shown.
@@ -1,98 +0,0 @@
|
||||
import os, io, csv, math, random
|
||||
import numpy as np
|
||||
from einops import rearrange
|
||||
from decord import VideoReader
|
||||
|
||||
import torch
|
||||
import torchvision.transforms as transforms
|
||||
from torch.utils.data.dataset import Dataset
|
||||
from animatediff.utils.util import zero_rank_print
|
||||
|
||||
|
||||
|
||||
class WebVid10M(Dataset):
|
||||
def __init__(
|
||||
self,
|
||||
csv_path, video_folder,
|
||||
sample_size=256, sample_stride=4, sample_n_frames=16,
|
||||
is_image=False,
|
||||
):
|
||||
zero_rank_print(f"loading annotations from {csv_path} ...")
|
||||
with open(csv_path, 'r') as csvfile:
|
||||
self.dataset = list(csv.DictReader(csvfile))
|
||||
self.length = len(self.dataset)
|
||||
zero_rank_print(f"data scale: {self.length}")
|
||||
|
||||
self.video_folder = video_folder
|
||||
self.sample_stride = sample_stride
|
||||
self.sample_n_frames = sample_n_frames
|
||||
self.is_image = is_image
|
||||
|
||||
sample_size = tuple(sample_size) if not isinstance(sample_size, int) else (sample_size, sample_size)
|
||||
self.pixel_transforms = transforms.Compose([
|
||||
transforms.RandomHorizontalFlip(),
|
||||
transforms.Resize(sample_size[0]),
|
||||
transforms.CenterCrop(sample_size),
|
||||
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], inplace=True),
|
||||
])
|
||||
|
||||
def get_batch(self, idx):
|
||||
video_dict = self.dataset[idx]
|
||||
videoid, name, page_dir = video_dict['videoid'], video_dict['name'], video_dict['page_dir']
|
||||
|
||||
video_dir = os.path.join(self.video_folder, f"{videoid}.mp4")
|
||||
video_reader = VideoReader(video_dir)
|
||||
video_length = len(video_reader)
|
||||
|
||||
if not self.is_image:
|
||||
clip_length = min(video_length, (self.sample_n_frames - 1) * self.sample_stride + 1)
|
||||
start_idx = random.randint(0, video_length - clip_length)
|
||||
batch_index = np.linspace(start_idx, start_idx + clip_length - 1, self.sample_n_frames, dtype=int)
|
||||
else:
|
||||
batch_index = [random.randint(0, video_length - 1)]
|
||||
|
||||
pixel_values = torch.from_numpy(video_reader.get_batch(batch_index).asnumpy()).permute(0, 3, 1, 2).contiguous()
|
||||
pixel_values = pixel_values / 255.
|
||||
del video_reader
|
||||
|
||||
if self.is_image:
|
||||
pixel_values = pixel_values[0]
|
||||
|
||||
return pixel_values, name
|
||||
|
||||
def __len__(self):
|
||||
return self.length
|
||||
|
||||
def __getitem__(self, idx):
|
||||
while True:
|
||||
try:
|
||||
pixel_values, name = self.get_batch(idx)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
idx = random.randint(0, self.length-1)
|
||||
|
||||
pixel_values = self.pixel_transforms(pixel_values)
|
||||
sample = dict(pixel_values=pixel_values, text=name)
|
||||
return sample
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from animatediff.utils.util import save_videos_grid
|
||||
|
||||
dataset = WebVid10M(
|
||||
csv_path="/mnt/petrelfs/guoyuwei/projects/datasets/webvid/results_2M_val.csv",
|
||||
video_folder="/mnt/petrelfs/guoyuwei/projects/datasets/webvid/2M_val",
|
||||
sample_size=256,
|
||||
sample_stride=4, sample_n_frames=16,
|
||||
is_image=True,
|
||||
)
|
||||
import pdb
|
||||
pdb.set_trace()
|
||||
|
||||
dataloader = torch.utils.data.DataLoader(dataset, batch_size=4, num_workers=16,)
|
||||
for idx, batch in enumerate(dataloader):
|
||||
print(batch["pixel_values"].shape, len(batch["text"]))
|
||||
# for i in range(batch["pixel_values"].shape[0]):
|
||||
# save_videos_grid(batch["pixel_values"][i:i+1].permute(0,2,1,3,4), os.path.join(".", f"{idx}-{i}.mp4"), rescale=True)
|
||||
@@ -1,300 +0,0 @@
|
||||
# Adapted from https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention.py
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import torch
|
||||
import torch.nn.functional as F
|
||||
from torch import nn
|
||||
|
||||
from diffusers.configuration_utils import ConfigMixin, register_to_config
|
||||
from diffusers.modeling_utils import ModelMixin
|
||||
from diffusers.utils import BaseOutput
|
||||
from diffusers.utils.import_utils import is_xformers_available
|
||||
from diffusers.models.attention import CrossAttention, FeedForward, AdaLayerNorm
|
||||
|
||||
from einops import rearrange, repeat
|
||||
import pdb
|
||||
|
||||
@dataclass
|
||||
class Transformer3DModelOutput(BaseOutput):
|
||||
sample: torch.FloatTensor
|
||||
|
||||
|
||||
if is_xformers_available():
|
||||
import xformers
|
||||
import xformers.ops
|
||||
else:
|
||||
xformers = None
|
||||
|
||||
|
||||
class Transformer3DModel(ModelMixin, ConfigMixin):
|
||||
@register_to_config
|
||||
def __init__(
|
||||
self,
|
||||
num_attention_heads: int = 16,
|
||||
attention_head_dim: int = 88,
|
||||
in_channels: Optional[int] = None,
|
||||
num_layers: int = 1,
|
||||
dropout: float = 0.0,
|
||||
norm_num_groups: int = 32,
|
||||
cross_attention_dim: Optional[int] = None,
|
||||
attention_bias: bool = False,
|
||||
activation_fn: str = "geglu",
|
||||
num_embeds_ada_norm: Optional[int] = None,
|
||||
use_linear_projection: bool = False,
|
||||
only_cross_attention: bool = False,
|
||||
upcast_attention: bool = False,
|
||||
|
||||
unet_use_cross_frame_attention=None,
|
||||
unet_use_temporal_attention=None,
|
||||
):
|
||||
super().__init__()
|
||||
self.use_linear_projection = use_linear_projection
|
||||
self.num_attention_heads = num_attention_heads
|
||||
self.attention_head_dim = attention_head_dim
|
||||
inner_dim = num_attention_heads * attention_head_dim
|
||||
|
||||
# Define input layers
|
||||
self.in_channels = in_channels
|
||||
|
||||
self.norm = torch.nn.GroupNorm(num_groups=norm_num_groups, num_channels=in_channels, eps=1e-6, affine=True)
|
||||
if use_linear_projection:
|
||||
self.proj_in = nn.Linear(in_channels, inner_dim)
|
||||
else:
|
||||
self.proj_in = nn.Conv2d(in_channels, inner_dim, kernel_size=1, stride=1, padding=0)
|
||||
|
||||
# Define transformers blocks
|
||||
self.transformer_blocks = nn.ModuleList(
|
||||
[
|
||||
BasicTransformerBlock(
|
||||
inner_dim,
|
||||
num_attention_heads,
|
||||
attention_head_dim,
|
||||
dropout=dropout,
|
||||
cross_attention_dim=cross_attention_dim,
|
||||
activation_fn=activation_fn,
|
||||
num_embeds_ada_norm=num_embeds_ada_norm,
|
||||
attention_bias=attention_bias,
|
||||
only_cross_attention=only_cross_attention,
|
||||
upcast_attention=upcast_attention,
|
||||
|
||||
unet_use_cross_frame_attention=unet_use_cross_frame_attention,
|
||||
unet_use_temporal_attention=unet_use_temporal_attention,
|
||||
)
|
||||
for d in range(num_layers)
|
||||
]
|
||||
)
|
||||
|
||||
# 4. Define output layers
|
||||
if use_linear_projection:
|
||||
self.proj_out = nn.Linear(in_channels, inner_dim)
|
||||
else:
|
||||
self.proj_out = nn.Conv2d(inner_dim, in_channels, kernel_size=1, stride=1, padding=0)
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, timestep=None, return_dict: bool = True):
|
||||
# Input
|
||||
assert hidden_states.dim() == 5, f"Expected hidden_states to have ndim=5, but got ndim={hidden_states.dim()}."
|
||||
video_length = hidden_states.shape[2]
|
||||
hidden_states = rearrange(hidden_states, "b c f h w -> (b f) c h w")
|
||||
encoder_hidden_states = repeat(encoder_hidden_states, 'b n c -> (b f) n c', f=video_length)
|
||||
|
||||
batch, channel, height, weight = hidden_states.shape
|
||||
residual = hidden_states
|
||||
|
||||
hidden_states = self.norm(hidden_states)
|
||||
if not self.use_linear_projection:
|
||||
hidden_states = self.proj_in(hidden_states)
|
||||
inner_dim = hidden_states.shape[1]
|
||||
hidden_states = hidden_states.permute(0, 2, 3, 1).reshape(batch, height * weight, inner_dim)
|
||||
else:
|
||||
inner_dim = hidden_states.shape[1]
|
||||
hidden_states = hidden_states.permute(0, 2, 3, 1).reshape(batch, height * weight, inner_dim)
|
||||
hidden_states = self.proj_in(hidden_states)
|
||||
|
||||
# Blocks
|
||||
for block in self.transformer_blocks:
|
||||
hidden_states = block(
|
||||
hidden_states,
|
||||
encoder_hidden_states=encoder_hidden_states,
|
||||
timestep=timestep,
|
||||
video_length=video_length
|
||||
)
|
||||
|
||||
# Output
|
||||
if not self.use_linear_projection:
|
||||
hidden_states = (
|
||||
hidden_states.reshape(batch, height, weight, inner_dim).permute(0, 3, 1, 2).contiguous()
|
||||
)
|
||||
hidden_states = self.proj_out(hidden_states)
|
||||
else:
|
||||
hidden_states = self.proj_out(hidden_states)
|
||||
hidden_states = (
|
||||
hidden_states.reshape(batch, height, weight, inner_dim).permute(0, 3, 1, 2).contiguous()
|
||||
)
|
||||
|
||||
output = hidden_states + residual
|
||||
|
||||
output = rearrange(output, "(b f) c h w -> b c f h w", f=video_length)
|
||||
if not return_dict:
|
||||
return (output,)
|
||||
|
||||
return Transformer3DModelOutput(sample=output)
|
||||
|
||||
|
||||
class BasicTransformerBlock(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
dim: int,
|
||||
num_attention_heads: int,
|
||||
attention_head_dim: int,
|
||||
dropout=0.0,
|
||||
cross_attention_dim: Optional[int] = None,
|
||||
activation_fn: str = "geglu",
|
||||
num_embeds_ada_norm: Optional[int] = None,
|
||||
attention_bias: bool = False,
|
||||
only_cross_attention: bool = False,
|
||||
upcast_attention: bool = False,
|
||||
|
||||
unet_use_cross_frame_attention = None,
|
||||
unet_use_temporal_attention = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.only_cross_attention = only_cross_attention
|
||||
self.use_ada_layer_norm = num_embeds_ada_norm is not None
|
||||
self.unet_use_cross_frame_attention = unet_use_cross_frame_attention
|
||||
self.unet_use_temporal_attention = unet_use_temporal_attention
|
||||
|
||||
# SC-Attn
|
||||
assert unet_use_cross_frame_attention is not None
|
||||
if unet_use_cross_frame_attention:
|
||||
self.attn1 = SparseCausalAttention2D(
|
||||
query_dim=dim,
|
||||
heads=num_attention_heads,
|
||||
dim_head=attention_head_dim,
|
||||
dropout=dropout,
|
||||
bias=attention_bias,
|
||||
cross_attention_dim=cross_attention_dim if only_cross_attention else None,
|
||||
upcast_attention=upcast_attention,
|
||||
)
|
||||
else:
|
||||
self.attn1 = CrossAttention(
|
||||
query_dim=dim,
|
||||
heads=num_attention_heads,
|
||||
dim_head=attention_head_dim,
|
||||
dropout=dropout,
|
||||
bias=attention_bias,
|
||||
upcast_attention=upcast_attention,
|
||||
)
|
||||
self.norm1 = AdaLayerNorm(dim, num_embeds_ada_norm) if self.use_ada_layer_norm else nn.LayerNorm(dim)
|
||||
|
||||
# Cross-Attn
|
||||
if cross_attention_dim is not None:
|
||||
self.attn2 = CrossAttention(
|
||||
query_dim=dim,
|
||||
cross_attention_dim=cross_attention_dim,
|
||||
heads=num_attention_heads,
|
||||
dim_head=attention_head_dim,
|
||||
dropout=dropout,
|
||||
bias=attention_bias,
|
||||
upcast_attention=upcast_attention,
|
||||
)
|
||||
else:
|
||||
self.attn2 = None
|
||||
|
||||
if cross_attention_dim is not None:
|
||||
self.norm2 = AdaLayerNorm(dim, num_embeds_ada_norm) if self.use_ada_layer_norm else nn.LayerNorm(dim)
|
||||
else:
|
||||
self.norm2 = None
|
||||
|
||||
# Feed-forward
|
||||
self.ff = FeedForward(dim, dropout=dropout, activation_fn=activation_fn)
|
||||
self.norm3 = nn.LayerNorm(dim)
|
||||
|
||||
# Temp-Attn
|
||||
assert unet_use_temporal_attention is not None
|
||||
if unet_use_temporal_attention:
|
||||
self.attn_temp = CrossAttention(
|
||||
query_dim=dim,
|
||||
heads=num_attention_heads,
|
||||
dim_head=attention_head_dim,
|
||||
dropout=dropout,
|
||||
bias=attention_bias,
|
||||
upcast_attention=upcast_attention,
|
||||
)
|
||||
nn.init.zeros_(self.attn_temp.to_out[0].weight.data)
|
||||
self.norm_temp = AdaLayerNorm(dim, num_embeds_ada_norm) if self.use_ada_layer_norm else nn.LayerNorm(dim)
|
||||
|
||||
def set_use_memory_efficient_attention_xformers(self, use_memory_efficient_attention_xformers: bool):
|
||||
if not is_xformers_available():
|
||||
print("Here is how to install it")
|
||||
raise ModuleNotFoundError(
|
||||
"Refer to https://github.com/facebookresearch/xformers for more information on how to install"
|
||||
" xformers",
|
||||
name="xformers",
|
||||
)
|
||||
elif not torch.cuda.is_available():
|
||||
raise ValueError(
|
||||
"torch.cuda.is_available() should be True but is False. xformers' memory efficient attention is only"
|
||||
" available for GPU "
|
||||
)
|
||||
else:
|
||||
try:
|
||||
# Make sure we can run the memory efficient attention
|
||||
_ = xformers.ops.memory_efficient_attention(
|
||||
torch.randn((1, 2, 40), device="cuda"),
|
||||
torch.randn((1, 2, 40), device="cuda"),
|
||||
torch.randn((1, 2, 40), device="cuda"),
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
self.attn1._use_memory_efficient_attention_xformers = use_memory_efficient_attention_xformers
|
||||
if self.attn2 is not None:
|
||||
self.attn2._use_memory_efficient_attention_xformers = use_memory_efficient_attention_xformers
|
||||
# self.attn_temp._use_memory_efficient_attention_xformers = use_memory_efficient_attention_xformers
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, timestep=None, attention_mask=None, video_length=None):
|
||||
# SparseCausal-Attention
|
||||
norm_hidden_states = (
|
||||
self.norm1(hidden_states, timestep) if self.use_ada_layer_norm else self.norm1(hidden_states)
|
||||
)
|
||||
|
||||
# if self.only_cross_attention:
|
||||
# hidden_states = (
|
||||
# self.attn1(norm_hidden_states, encoder_hidden_states, attention_mask=attention_mask) + hidden_states
|
||||
# )
|
||||
# else:
|
||||
# hidden_states = self.attn1(norm_hidden_states, attention_mask=attention_mask, video_length=video_length) + hidden_states
|
||||
|
||||
# pdb.set_trace()
|
||||
if self.unet_use_cross_frame_attention:
|
||||
hidden_states = self.attn1(norm_hidden_states, attention_mask=attention_mask, video_length=video_length) + hidden_states
|
||||
else:
|
||||
hidden_states = self.attn1(norm_hidden_states, attention_mask=attention_mask) + hidden_states
|
||||
|
||||
if self.attn2 is not None:
|
||||
# Cross-Attention
|
||||
norm_hidden_states = (
|
||||
self.norm2(hidden_states, timestep) if self.use_ada_layer_norm else self.norm2(hidden_states)
|
||||
)
|
||||
hidden_states = (
|
||||
self.attn2(
|
||||
norm_hidden_states, encoder_hidden_states=encoder_hidden_states, attention_mask=attention_mask
|
||||
)
|
||||
+ hidden_states
|
||||
)
|
||||
|
||||
# Feed-forward
|
||||
hidden_states = self.ff(self.norm3(hidden_states)) + hidden_states
|
||||
|
||||
# Temporal-Attention
|
||||
if self.unet_use_temporal_attention:
|
||||
d = hidden_states.shape[1]
|
||||
hidden_states = rearrange(hidden_states, "(b f) d c -> (b d) f c", f=video_length)
|
||||
norm_hidden_states = (
|
||||
self.norm_temp(hidden_states, timestep) if self.use_ada_layer_norm else self.norm_temp(hidden_states)
|
||||
)
|
||||
hidden_states = self.attn_temp(norm_hidden_states) + hidden_states
|
||||
hidden_states = rearrange(hidden_states, "(b d) f c -> (b f) d c", d=d)
|
||||
|
||||
return hidden_states
|
||||
@@ -1,5 +1,5 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional, Tuple, Union
|
||||
from typing import Callable, List, Optional, Tuple, Union
|
||||
|
||||
import torch
|
||||
import numpy as np
|
||||
@@ -8,324 +8,418 @@ from torch import nn
|
||||
import torchvision
|
||||
|
||||
from diffusers.configuration_utils import ConfigMixin, register_to_config
|
||||
from diffusers.modeling_utils import ModelMixin
|
||||
from diffusers.models.modeling_utils import ModelMixin
|
||||
from diffusers.utils import BaseOutput
|
||||
from diffusers.utils.import_utils import is_xformers_available
|
||||
from diffusers.models.attention import CrossAttention, FeedForward
|
||||
from diffusers.models.attention_processor import Attention
|
||||
from diffusers.models.attention import FeedForward
|
||||
|
||||
from animatediff.utils.util import zero_rank_print
|
||||
|
||||
from einops import rearrange, repeat
|
||||
import math
|
||||
import math, pdb
|
||||
import random
|
||||
|
||||
|
||||
def zero_module(module):
|
||||
# Zero out the parameters of a module and return it.
|
||||
for p in module.parameters():
|
||||
p.detach().zero_()
|
||||
return module
|
||||
# Zero out the parameters of a module and return it.
|
||||
for p in module.parameters():
|
||||
p.detach().zero_()
|
||||
return module
|
||||
|
||||
|
||||
@dataclass
|
||||
class TemporalTransformer3DModelOutput(BaseOutput):
|
||||
sample: torch.FloatTensor
|
||||
|
||||
|
||||
if is_xformers_available():
|
||||
import xformers
|
||||
import xformers.ops
|
||||
else:
|
||||
xformers = None
|
||||
sample: torch.FloatTensor
|
||||
|
||||
|
||||
def get_motion_module(
|
||||
in_channels,
|
||||
motion_module_type: str,
|
||||
motion_module_kwargs: dict
|
||||
in_channels,
|
||||
motion_module_type: str,
|
||||
motion_module_kwargs: dict
|
||||
):
|
||||
if motion_module_type == "Vanilla":
|
||||
return VanillaTemporalModule(in_channels=in_channels, **motion_module_kwargs,)
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
if motion_module_type == "Vanilla":
|
||||
return VanillaTemporalModule(in_channels=in_channels, **motion_module_kwargs)
|
||||
elif motion_module_type == "Conv":
|
||||
return ConvTemporalModule(in_channels=in_channels, **motion_module_kwargs)
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
class VanillaTemporalModule(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
in_channels,
|
||||
num_attention_heads = 8,
|
||||
num_transformer_block = 2,
|
||||
attention_block_types =( "Temporal_Self", "Temporal_Self" ),
|
||||
cross_frame_attention_mode = None,
|
||||
temporal_position_encoding = False,
|
||||
temporal_position_encoding_max_len = 24,
|
||||
temporal_attention_dim_div = 1,
|
||||
zero_initialize = True,
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
self.temporal_transformer = TemporalTransformer3DModel(
|
||||
in_channels=in_channels,
|
||||
num_attention_heads=num_attention_heads,
|
||||
attention_head_dim=in_channels // num_attention_heads // temporal_attention_dim_div,
|
||||
num_layers=num_transformer_block,
|
||||
attention_block_types=attention_block_types,
|
||||
cross_frame_attention_mode=cross_frame_attention_mode,
|
||||
temporal_position_encoding=temporal_position_encoding,
|
||||
temporal_position_encoding_max_len=temporal_position_encoding_max_len,
|
||||
)
|
||||
|
||||
if zero_initialize:
|
||||
self.temporal_transformer.proj_out = zero_module(self.temporal_transformer.proj_out)
|
||||
def __init__(
|
||||
self,
|
||||
in_channels,
|
||||
num_attention_heads = 8,
|
||||
num_transformer_block = 2,
|
||||
attention_block_types =( "Temporal_Self", ),
|
||||
spatial_position_encoding = False,
|
||||
temporal_position_encoding = True,
|
||||
temporal_position_encoding_max_len = 32,
|
||||
temporal_attention_dim_div = 1,
|
||||
zero_initialize = True,
|
||||
|
||||
causal_temporal_attention = False,
|
||||
causal_temporal_attention_mask_type = "",
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
self.temporal_transformer = TemporalTransformer3DModel(
|
||||
in_channels=in_channels,
|
||||
num_attention_heads=num_attention_heads,
|
||||
attention_head_dim=in_channels // num_attention_heads // temporal_attention_dim_div,
|
||||
num_layers=num_transformer_block,
|
||||
attention_block_types=attention_block_types,
|
||||
temporal_position_encoding=temporal_position_encoding,
|
||||
temporal_position_encoding_max_len=temporal_position_encoding_max_len,
|
||||
spatial_position_encoding = spatial_position_encoding,
|
||||
causal_temporal_attention=causal_temporal_attention,
|
||||
causal_temporal_attention_mask_type=causal_temporal_attention_mask_type,
|
||||
)
|
||||
|
||||
if zero_initialize:
|
||||
self.temporal_transformer.proj_out = zero_module(self.temporal_transformer.proj_out)
|
||||
|
||||
def forward(self, input_tensor, temb, encoder_hidden_states, attention_mask=None, anchor_frame_idx=None):
|
||||
hidden_states = input_tensor
|
||||
hidden_states = self.temporal_transformer(hidden_states, encoder_hidden_states, attention_mask)
|
||||
def forward(self, input_tensor, temb=None, encoder_hidden_states=None, attention_mask=None):
|
||||
hidden_states = input_tensor
|
||||
hidden_states = self.temporal_transformer(hidden_states, encoder_hidden_states, attention_mask)
|
||||
|
||||
output = hidden_states
|
||||
return output
|
||||
output = hidden_states
|
||||
return output
|
||||
|
||||
|
||||
class TemporalTransformer3DModel(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
in_channels,
|
||||
num_attention_heads,
|
||||
attention_head_dim,
|
||||
class TemporalTransformer3DModel(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
in_channels,
|
||||
num_attention_heads,
|
||||
attention_head_dim,
|
||||
num_layers,
|
||||
attention_block_types = ( "Temporal_Self", "Temporal_Self", ),
|
||||
dropout = 0.0,
|
||||
norm_num_groups = 32,
|
||||
cross_attention_dim = 768,
|
||||
activation_fn = "geglu",
|
||||
attention_bias = False,
|
||||
upcast_attention = False,
|
||||
temporal_position_encoding = False,
|
||||
temporal_position_encoding_max_len = 32,
|
||||
spatial_position_encoding = False,
|
||||
|
||||
causal_temporal_attention = None,
|
||||
causal_temporal_attention_mask_type = "",
|
||||
):
|
||||
super().__init__()
|
||||
assert causal_temporal_attention is not None
|
||||
self.causal_temporal_attention = causal_temporal_attention
|
||||
|
||||
num_layers,
|
||||
attention_block_types = ( "Temporal_Self", "Temporal_Self", ),
|
||||
dropout = 0.0,
|
||||
norm_num_groups = 32,
|
||||
cross_attention_dim = 768,
|
||||
activation_fn = "geglu",
|
||||
attention_bias = False,
|
||||
upcast_attention = False,
|
||||
|
||||
cross_frame_attention_mode = None,
|
||||
temporal_position_encoding = False,
|
||||
temporal_position_encoding_max_len = 24,
|
||||
):
|
||||
super().__init__()
|
||||
assert (not causal_temporal_attention) or (causal_temporal_attention_mask_type != "")
|
||||
self.causal_temporal_attention_mask_type = causal_temporal_attention_mask_type
|
||||
self.causal_temporal_attention_mask = None
|
||||
self.spatial_position_encoding = spatial_position_encoding
|
||||
inner_dim = num_attention_heads * attention_head_dim
|
||||
|
||||
inner_dim = num_attention_heads * attention_head_dim
|
||||
self.norm = torch.nn.GroupNorm(num_groups=norm_num_groups, num_channels=in_channels, eps=1e-6, affine=True)
|
||||
self.proj_in = nn.Linear(in_channels, inner_dim)
|
||||
if spatial_position_encoding:
|
||||
self.pos_encoder_2d = PositionalEncoding2D(inner_dim)
|
||||
|
||||
|
||||
self.norm = torch.nn.GroupNorm(num_groups=norm_num_groups, num_channels=in_channels, eps=1e-6, affine=True)
|
||||
self.proj_in = nn.Linear(in_channels, inner_dim)
|
||||
self.transformer_blocks = nn.ModuleList(
|
||||
[
|
||||
TemporalTransformerBlock(
|
||||
dim=inner_dim,
|
||||
num_attention_heads=num_attention_heads,
|
||||
attention_head_dim=attention_head_dim,
|
||||
attention_block_types=attention_block_types,
|
||||
dropout=dropout,
|
||||
norm_num_groups=norm_num_groups,
|
||||
cross_attention_dim=cross_attention_dim,
|
||||
activation_fn=activation_fn,
|
||||
attention_bias=attention_bias,
|
||||
upcast_attention=upcast_attention,
|
||||
temporal_position_encoding=temporal_position_encoding,
|
||||
temporal_position_encoding_max_len=temporal_position_encoding_max_len,
|
||||
)
|
||||
for d in range(num_layers)
|
||||
]
|
||||
)
|
||||
self.proj_out = nn.Linear(inner_dim, in_channels)
|
||||
|
||||
def get_causal_temporal_attention_mask(self, hidden_states):
|
||||
batch_size, sequence_length, dim = hidden_states.shape
|
||||
|
||||
if self.causal_temporal_attention_mask is None or self.causal_temporal_attention_mask.shape != (batch_size, sequence_length, sequence_length):
|
||||
zero_rank_print(f"build attn mask of type {self.causal_temporal_attention_mask_type}")
|
||||
if self.causal_temporal_attention_mask_type == "causal":
|
||||
# 1. vanilla causal mask
|
||||
mask = torch.tril(torch.ones(sequence_length, sequence_length))
|
||||
|
||||
self.transformer_blocks = nn.ModuleList(
|
||||
[
|
||||
TemporalTransformerBlock(
|
||||
dim=inner_dim,
|
||||
num_attention_heads=num_attention_heads,
|
||||
attention_head_dim=attention_head_dim,
|
||||
attention_block_types=attention_block_types,
|
||||
dropout=dropout,
|
||||
norm_num_groups=norm_num_groups,
|
||||
cross_attention_dim=cross_attention_dim,
|
||||
activation_fn=activation_fn,
|
||||
attention_bias=attention_bias,
|
||||
upcast_attention=upcast_attention,
|
||||
cross_frame_attention_mode=cross_frame_attention_mode,
|
||||
temporal_position_encoding=temporal_position_encoding,
|
||||
temporal_position_encoding_max_len=temporal_position_encoding_max_len,
|
||||
)
|
||||
for d in range(num_layers)
|
||||
]
|
||||
)
|
||||
self.proj_out = nn.Linear(inner_dim, in_channels)
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, attention_mask=None):
|
||||
assert hidden_states.dim() == 5, f"Expected hidden_states to have ndim=5, but got ndim={hidden_states.dim()}."
|
||||
video_length = hidden_states.shape[2]
|
||||
hidden_states = rearrange(hidden_states, "b c f h w -> (b f) c h w")
|
||||
elif self.causal_temporal_attention_mask_type == "2-seq":
|
||||
# 2. 2-seq
|
||||
mask = torch.zeros(sequence_length, sequence_length)
|
||||
mask[:sequence_length // 2, :sequence_length // 2] = 1
|
||||
mask[-sequence_length // 2:, -sequence_length // 2:] = 1
|
||||
|
||||
elif self.causal_temporal_attention_mask_type == "0-prev":
|
||||
# attn to the previous frame
|
||||
indices = torch.arange(sequence_length)
|
||||
indices_prev = indices - 1
|
||||
indices_prev[0] = 0
|
||||
mask = torch.zeros(sequence_length, sequence_length)
|
||||
mask[:, 0] = 1.
|
||||
mask[indices, indices_prev] = 1.
|
||||
|
||||
batch, channel, height, weight = hidden_states.shape
|
||||
residual = hidden_states
|
||||
elif self.causal_temporal_attention_mask_type == "0":
|
||||
# only attn to first frame
|
||||
mask = torch.zeros(sequence_length, sequence_length)
|
||||
mask[:,0] = 1
|
||||
|
||||
hidden_states = self.norm(hidden_states)
|
||||
inner_dim = hidden_states.shape[1]
|
||||
hidden_states = hidden_states.permute(0, 2, 3, 1).reshape(batch, height * weight, inner_dim)
|
||||
hidden_states = self.proj_in(hidden_states)
|
||||
elif self.causal_temporal_attention_mask_type == "wo-self":
|
||||
indices = torch.arange(sequence_length)
|
||||
mask = torch.ones(sequence_length, sequence_length)
|
||||
mask[indices, indices] = 0
|
||||
|
||||
# Transformer Blocks
|
||||
for block in self.transformer_blocks:
|
||||
hidden_states = block(hidden_states, encoder_hidden_states=encoder_hidden_states, video_length=video_length)
|
||||
|
||||
# output
|
||||
hidden_states = self.proj_out(hidden_states)
|
||||
hidden_states = hidden_states.reshape(batch, height, weight, inner_dim).permute(0, 3, 1, 2).contiguous()
|
||||
elif self.causal_temporal_attention_mask_type == "circle":
|
||||
indices = torch.arange(sequence_length)
|
||||
indices_prev = indices - 1
|
||||
indices_prev[0] = 0
|
||||
|
||||
output = hidden_states + residual
|
||||
output = rearrange(output, "(b f) c h w -> b c f h w", f=video_length)
|
||||
|
||||
return output
|
||||
mask = torch.eye(sequence_length)
|
||||
mask[indices, indices_prev] = 1
|
||||
mask[0,-1] = 1
|
||||
|
||||
else: raise ValueError
|
||||
|
||||
# for sanity check
|
||||
if dim == 320: zero_rank_print(mask)
|
||||
|
||||
# generate attention mask fron binary values
|
||||
mask = mask.masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
|
||||
mask = mask.unsqueeze(0)
|
||||
mask = mask.repeat(batch_size, 1, 1)
|
||||
|
||||
self.causal_temporal_attention_mask = mask.to(hidden_states.device)
|
||||
|
||||
return self.causal_temporal_attention_mask
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, attention_mask=None):
|
||||
residual = hidden_states
|
||||
assert hidden_states.dim() == 5, f"Expected hidden_states to have ndim=5, but got ndim={hidden_states.dim()}."
|
||||
height, width = hidden_states.shape[-2:]
|
||||
|
||||
hidden_states = self.norm(hidden_states)
|
||||
|
||||
hidden_states = rearrange(hidden_states, "b c f h w -> (b h w) f c")
|
||||
hidden_states = self.proj_in(hidden_states)
|
||||
if self.spatial_position_encoding:
|
||||
|
||||
video_length = hidden_states.shape[1]
|
||||
hidden_states = rearrange(hidden_states, "(b h w) f c -> (b f) h w c", h=height, w=width)
|
||||
pos_encoding = self.pos_encoder_2d(hidden_states)
|
||||
pos_encoding = rearrange(pos_encoding, "(b f) h w c -> (b h w) f c", f = video_length)
|
||||
hidden_states = rearrange(hidden_states, "(b f) h w c -> (b h w) f c", f=video_length)
|
||||
|
||||
attention_mask = self.get_causal_temporal_attention_mask(hidden_states) if self.causal_temporal_attention else attention_mask
|
||||
|
||||
# Transformer Blocks
|
||||
for block in self.transformer_blocks:
|
||||
if not self.spatial_position_encoding :
|
||||
pos_encoding = None
|
||||
|
||||
hidden_states = block(hidden_states, pos_encoding=pos_encoding, encoder_hidden_states=encoder_hidden_states, attention_mask=attention_mask)
|
||||
|
||||
hidden_states = self.proj_out(hidden_states)
|
||||
|
||||
hidden_states = rearrange(hidden_states, "(b h w) f c -> b c f h w", h=height, w=width)
|
||||
|
||||
output = hidden_states + residual
|
||||
# output = hidden_states
|
||||
|
||||
return output
|
||||
|
||||
class TemporalTransformerBlock(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
dim,
|
||||
num_attention_heads,
|
||||
attention_head_dim,
|
||||
attention_block_types = ( "Temporal_Self", "Temporal_Self", ),
|
||||
dropout = 0.0,
|
||||
norm_num_groups = 32,
|
||||
cross_attention_dim = 768,
|
||||
activation_fn = "geglu",
|
||||
attention_bias = False,
|
||||
upcast_attention = False,
|
||||
cross_frame_attention_mode = None,
|
||||
temporal_position_encoding = False,
|
||||
temporal_position_encoding_max_len = 24,
|
||||
):
|
||||
super().__init__()
|
||||
def __init__(
|
||||
self,
|
||||
dim,
|
||||
num_attention_heads,
|
||||
attention_head_dim,
|
||||
attention_block_types = ( "Temporal_Self", "Temporal_Self", ),
|
||||
dropout = 0.0,
|
||||
norm_num_groups = 32,
|
||||
cross_attention_dim = 768,
|
||||
activation_fn = "geglu",
|
||||
attention_bias = False,
|
||||
upcast_attention = False,
|
||||
temporal_position_encoding = False,
|
||||
temporal_position_encoding_max_len = 32,
|
||||
):
|
||||
super().__init__()
|
||||
|
||||
attention_blocks = []
|
||||
norms = []
|
||||
|
||||
for block_name in attention_block_types:
|
||||
attention_blocks.append(
|
||||
VersatileAttention(
|
||||
attention_mode=block_name.split("_")[0],
|
||||
cross_attention_dim=cross_attention_dim if block_name.endswith("_Cross") else None,
|
||||
|
||||
query_dim=dim,
|
||||
heads=num_attention_heads,
|
||||
dim_head=attention_head_dim,
|
||||
dropout=dropout,
|
||||
bias=attention_bias,
|
||||
upcast_attention=upcast_attention,
|
||||
|
||||
cross_frame_attention_mode=cross_frame_attention_mode,
|
||||
temporal_position_encoding=temporal_position_encoding,
|
||||
temporal_position_encoding_max_len=temporal_position_encoding_max_len,
|
||||
)
|
||||
)
|
||||
norms.append(nn.LayerNorm(dim))
|
||||
|
||||
self.attention_blocks = nn.ModuleList(attention_blocks)
|
||||
self.norms = nn.ModuleList(norms)
|
||||
|
||||
self.ff = FeedForward(dim, dropout=dropout, activation_fn=activation_fn)
|
||||
self.ff_norm = nn.LayerNorm(dim)
|
||||
attention_blocks = []
|
||||
norms = []
|
||||
|
||||
for block_name in attention_block_types:
|
||||
attention_blocks.append(
|
||||
TemporalSelfAttention(
|
||||
attention_mode=block_name.split("_")[0],
|
||||
cross_attention_dim=cross_attention_dim if block_name.endswith("_Cross") else None,
|
||||
|
||||
query_dim=dim,
|
||||
heads=num_attention_heads,
|
||||
dim_head=attention_head_dim,
|
||||
dropout=dropout,
|
||||
bias=attention_bias,
|
||||
upcast_attention=upcast_attention,
|
||||
|
||||
temporal_position_encoding=temporal_position_encoding,
|
||||
temporal_position_encoding_max_len=temporal_position_encoding_max_len,
|
||||
)
|
||||
)
|
||||
norms.append(nn.LayerNorm(dim))
|
||||
|
||||
self.attention_blocks = nn.ModuleList(attention_blocks)
|
||||
self.norms = nn.ModuleList(norms)
|
||||
|
||||
self.ff = FeedForward(dim, dropout=dropout, activation_fn=activation_fn)
|
||||
self.ff_norm = nn.LayerNorm(dim)
|
||||
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, attention_mask=None, video_length=None):
|
||||
for attention_block, norm in zip(self.attention_blocks, self.norms):
|
||||
norm_hidden_states = norm(hidden_states)
|
||||
hidden_states = attention_block(
|
||||
norm_hidden_states,
|
||||
encoder_hidden_states=encoder_hidden_states if attention_block.is_cross_attention else None,
|
||||
video_length=video_length,
|
||||
) + hidden_states
|
||||
|
||||
hidden_states = self.ff(self.ff_norm(hidden_states)) + hidden_states
|
||||
|
||||
output = hidden_states
|
||||
return output
|
||||
def forward(self, hidden_states, pos_encoding=None, encoder_hidden_states=None, attention_mask=None):
|
||||
for attention_block, norm in zip(self.attention_blocks, self.norms):
|
||||
if pos_encoding is not None:
|
||||
hidden_states += pos_encoding
|
||||
norm_hidden_states = norm(hidden_states)
|
||||
hidden_states = attention_block(
|
||||
norm_hidden_states,
|
||||
encoder_hidden_states=encoder_hidden_states,
|
||||
attention_mask=attention_mask,
|
||||
) + hidden_states
|
||||
|
||||
hidden_states = self.ff(self.ff_norm(hidden_states)) + hidden_states
|
||||
|
||||
output = hidden_states
|
||||
return output
|
||||
|
||||
|
||||
def get_emb(sin_inp):
|
||||
"""
|
||||
Gets a base embedding for one dimension with sin and cos intertwined
|
||||
"""
|
||||
emb = torch.stack((sin_inp.sin(), sin_inp.cos()), dim=-1)
|
||||
return torch.flatten(emb, -2, -1)
|
||||
|
||||
class PositionalEncoding2D(nn.Module):
|
||||
def __init__(self, channels):
|
||||
"""
|
||||
:param channels: The last dimension of the tensor you want to apply pos emb to.
|
||||
"""
|
||||
super(PositionalEncoding2D, self).__init__()
|
||||
self.org_channels = channels
|
||||
channels = int(np.ceil(channels / 4) * 2)
|
||||
self.channels = channels
|
||||
inv_freq = 1.0 / (10000 ** (torch.arange(0, channels, 2).float() / channels))
|
||||
self.register_buffer("inv_freq", inv_freq)
|
||||
self.register_buffer("cached_penc", None)
|
||||
|
||||
def forward(self, tensor):
|
||||
"""
|
||||
:param tensor: A 4d tensor of size (batch_size, x, y, ch)
|
||||
:return: Positional Encoding Matrix of size (batch_size, x, y, ch)
|
||||
"""
|
||||
if len(tensor.shape) != 4:
|
||||
raise RuntimeError("The input tensor has to be 4d!")
|
||||
|
||||
if self.cached_penc is not None and self.cached_penc.shape == tensor.shape:
|
||||
return self.cached_penc
|
||||
|
||||
self.cached_penc = None
|
||||
batch_size, x, y, orig_ch = tensor.shape
|
||||
pos_x = torch.arange(x, device=tensor.device).type(self.inv_freq.type())
|
||||
pos_y = torch.arange(y, device=tensor.device).type(self.inv_freq.type())
|
||||
sin_inp_x = torch.einsum("i,j->ij", pos_x, self.inv_freq)
|
||||
sin_inp_y = torch.einsum("i,j->ij", pos_y, self.inv_freq)
|
||||
emb_x = get_emb(sin_inp_x).unsqueeze(1)
|
||||
emb_y = get_emb(sin_inp_y)
|
||||
emb = torch.zeros((x, y, self.channels * 2), device=tensor.device).type(
|
||||
tensor.type()
|
||||
)
|
||||
emb[:, :, : self.channels] = emb_x
|
||||
emb[:, :, self.channels : 2 * self.channels] = emb_y
|
||||
|
||||
self.cached_penc = emb[None, :, :, :orig_ch].repeat(tensor.shape[0], 1, 1, 1)
|
||||
return self.cached_penc
|
||||
|
||||
class PositionalEncoding(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
d_model,
|
||||
dropout = 0.,
|
||||
max_len = 24
|
||||
):
|
||||
super().__init__()
|
||||
self.dropout = nn.Dropout(p=dropout)
|
||||
position = torch.arange(max_len).unsqueeze(1)
|
||||
div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
|
||||
pe = torch.zeros(1, max_len, d_model)
|
||||
pe[0, :, 0::2] = torch.sin(position * div_term)
|
||||
pe[0, :, 1::2] = torch.cos(position * div_term)
|
||||
self.register_buffer('pe', pe)
|
||||
def __init__(
|
||||
self,
|
||||
d_model,
|
||||
dropout = 0.,
|
||||
max_len = 32,
|
||||
):
|
||||
super().__init__()
|
||||
self.dropout = nn.Dropout(p=dropout)
|
||||
position = torch.arange(max_len).unsqueeze(1)
|
||||
div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
|
||||
pe = torch.zeros(1, max_len, d_model)
|
||||
pe[0, :, 0::2] = torch.sin(position * div_term)
|
||||
pe[0, :, 1::2] = torch.cos(position * div_term)
|
||||
self.register_buffer('pe', pe)
|
||||
|
||||
def forward(self, x):
|
||||
x = x + self.pe[:, :x.size(1)]
|
||||
return self.dropout(x)
|
||||
def forward(self, x):
|
||||
# if x.size(1) < 16:
|
||||
# start_idx = random.randint(0, 12)
|
||||
# else:
|
||||
# start_idx = 0
|
||||
|
||||
x = x + self.pe[:, :x.size(1)]
|
||||
return self.dropout(x)
|
||||
|
||||
|
||||
class VersatileAttention(CrossAttention):
|
||||
def __init__(
|
||||
self,
|
||||
attention_mode = None,
|
||||
cross_frame_attention_mode = None,
|
||||
temporal_position_encoding = False,
|
||||
temporal_position_encoding_max_len = 24,
|
||||
*args, **kwargs
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
assert attention_mode == "Temporal"
|
||||
class TemporalSelfAttention(Attention):
|
||||
def __init__(
|
||||
self,
|
||||
attention_mode = None,
|
||||
temporal_position_encoding = False,
|
||||
temporal_position_encoding_max_len = 32,
|
||||
*args, **kwargs
|
||||
):
|
||||
super().__init__(*args, **kwargs)
|
||||
assert attention_mode == "Temporal"
|
||||
|
||||
self.attention_mode = attention_mode
|
||||
self.is_cross_attention = kwargs["cross_attention_dim"] is not None
|
||||
|
||||
self.pos_encoder = PositionalEncoding(
|
||||
kwargs["query_dim"],
|
||||
dropout=0.,
|
||||
max_len=temporal_position_encoding_max_len
|
||||
) if (temporal_position_encoding and attention_mode == "Temporal") else None
|
||||
self.pos_encoder = PositionalEncoding(
|
||||
kwargs["query_dim"],
|
||||
max_len=temporal_position_encoding_max_len
|
||||
) if temporal_position_encoding else None
|
||||
|
||||
def extra_repr(self):
|
||||
return f"(Module Info) Attention_Mode: {self.attention_mode}, Is_Cross_Attention: {self.is_cross_attention}"
|
||||
def set_use_memory_efficient_attention_xformers(
|
||||
self, use_memory_efficient_attention_xformers: bool, attention_op: Optional[Callable] = None
|
||||
):
|
||||
# disable motion module efficient xformers to avoid bad results, don't know why
|
||||
# TODO: fix this bug
|
||||
pass
|
||||
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, attention_mask=None, video_length=None):
|
||||
batch_size, sequence_length, _ = hidden_states.shape
|
||||
def forward(self, hidden_states, encoder_hidden_states=None, attention_mask=None, **cross_attention_kwargs):
|
||||
# The `Attention` class can call different attention processors / attention functions
|
||||
# here we simply pass along all tensors to the selected processor class
|
||||
# For standard processors that are defined here, `**cross_attention_kwargs` is empty
|
||||
|
||||
if self.attention_mode == "Temporal":
|
||||
d = hidden_states.shape[1]
|
||||
hidden_states = rearrange(hidden_states, "(b f) d c -> (b d) f c", f=video_length)
|
||||
|
||||
if self.pos_encoder is not None:
|
||||
hidden_states = self.pos_encoder(hidden_states)
|
||||
|
||||
encoder_hidden_states = repeat(encoder_hidden_states, "b n c -> (b d) n c", d=d) if encoder_hidden_states is not None else encoder_hidden_states
|
||||
else:
|
||||
raise NotImplementedError
|
||||
# add position encoding
|
||||
hidden_states = self.pos_encoder(hidden_states)
|
||||
|
||||
encoder_hidden_states = encoder_hidden_states
|
||||
if hasattr(self.processor, "__call__"):
|
||||
return self.processor.__call__(
|
||||
self,
|
||||
hidden_states,
|
||||
encoder_hidden_states=None,
|
||||
attention_mask=attention_mask,
|
||||
**cross_attention_kwargs,
|
||||
)
|
||||
|
||||
if self.group_norm is not None:
|
||||
hidden_states = self.group_norm(hidden_states.transpose(1, 2)).transpose(1, 2)
|
||||
|
||||
query = self.to_q(hidden_states)
|
||||
dim = query.shape[-1]
|
||||
query = self.reshape_heads_to_batch_dim(query)
|
||||
|
||||
if self.added_kv_proj_dim is not None:
|
||||
raise NotImplementedError
|
||||
|
||||
encoder_hidden_states = encoder_hidden_states if encoder_hidden_states is not None else hidden_states
|
||||
key = self.to_k(encoder_hidden_states)
|
||||
value = self.to_v(encoder_hidden_states)
|
||||
|
||||
key = self.reshape_heads_to_batch_dim(key)
|
||||
value = self.reshape_heads_to_batch_dim(value)
|
||||
|
||||
if attention_mask is not None:
|
||||
if attention_mask.shape[-1] != query.shape[1]:
|
||||
target_length = query.shape[1]
|
||||
attention_mask = F.pad(attention_mask, (0, target_length), value=0.0)
|
||||
attention_mask = attention_mask.repeat_interleave(self.heads, dim=0)
|
||||
|
||||
# attention, what we cannot get enough of
|
||||
if self._use_memory_efficient_attention_xformers:
|
||||
hidden_states = self._memory_efficient_attention_xformers(query, key, value, attention_mask)
|
||||
# Some versions of xformers return output in fp32, cast it back to the dtype of the input
|
||||
hidden_states = hidden_states.to(query.dtype)
|
||||
else:
|
||||
if self._slice_size is None or query.shape[0] // self._slice_size == 1:
|
||||
hidden_states = self._attention(query, key, value, attention_mask)
|
||||
else:
|
||||
hidden_states = self._sliced_attention(query, key, value, sequence_length, dim, attention_mask)
|
||||
|
||||
# linear proj
|
||||
hidden_states = self.to_out[0](hidden_states)
|
||||
|
||||
# dropout
|
||||
hidden_states = self.to_out[1](hidden_states)
|
||||
|
||||
if self.attention_mode == "Temporal":
|
||||
hidden_states = rearrange(hidden_states, "(b d) f c -> (b f) d c", d=d)
|
||||
|
||||
return hidden_states
|
||||
else:
|
||||
return self.processor(
|
||||
self,
|
||||
hidden_states,
|
||||
encoder_hidden_states=None,
|
||||
attention_mask=attention_mask,
|
||||
**cross_attention_kwargs,
|
||||
)
|
||||
|
||||
@@ -1,217 +0,0 @@
|
||||
# Adapted from https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/resnet.py
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.nn.functional as F
|
||||
|
||||
from einops import rearrange
|
||||
|
||||
|
||||
class InflatedConv3d(nn.Conv2d):
|
||||
def forward(self, x):
|
||||
video_length = x.shape[2]
|
||||
|
||||
x = rearrange(x, "b c f h w -> (b f) c h w")
|
||||
x = super().forward(x)
|
||||
x = rearrange(x, "(b f) c h w -> b c f h w", f=video_length)
|
||||
|
||||
return x
|
||||
|
||||
|
||||
class InflatedGroupNorm(nn.GroupNorm):
|
||||
def forward(self, x):
|
||||
video_length = x.shape[2]
|
||||
|
||||
x = rearrange(x, "b c f h w -> (b f) c h w")
|
||||
x = super().forward(x)
|
||||
x = rearrange(x, "(b f) c h w -> b c f h w", f=video_length)
|
||||
|
||||
return x
|
||||
|
||||
|
||||
class Upsample3D(nn.Module):
|
||||
def __init__(self, channels, use_conv=False, use_conv_transpose=False, out_channels=None, name="conv"):
|
||||
super().__init__()
|
||||
self.channels = channels
|
||||
self.out_channels = out_channels or channels
|
||||
self.use_conv = use_conv
|
||||
self.use_conv_transpose = use_conv_transpose
|
||||
self.name = name
|
||||
|
||||
conv = None
|
||||
if use_conv_transpose:
|
||||
raise NotImplementedError
|
||||
elif use_conv:
|
||||
self.conv = InflatedConv3d(self.channels, self.out_channels, 3, padding=1)
|
||||
|
||||
def forward(self, hidden_states, output_size=None):
|
||||
assert hidden_states.shape[1] == self.channels
|
||||
|
||||
if self.use_conv_transpose:
|
||||
raise NotImplementedError
|
||||
|
||||
# Cast to float32 to as 'upsample_nearest2d_out_frame' op does not support bfloat16
|
||||
dtype = hidden_states.dtype
|
||||
if dtype == torch.bfloat16:
|
||||
hidden_states = hidden_states.to(torch.float32)
|
||||
|
||||
# upsample_nearest_nhwc fails with large batch sizes. see https://github.com/huggingface/diffusers/issues/984
|
||||
if hidden_states.shape[0] >= 64:
|
||||
hidden_states = hidden_states.contiguous()
|
||||
|
||||
# if `output_size` is passed we force the interpolation output
|
||||
# size and do not make use of `scale_factor=2`
|
||||
if output_size is None:
|
||||
hidden_states = F.interpolate(hidden_states, scale_factor=[1.0, 2.0, 2.0], mode="nearest")
|
||||
else:
|
||||
hidden_states = F.interpolate(hidden_states, size=output_size, mode="nearest")
|
||||
|
||||
# If the input is bfloat16, we cast back to bfloat16
|
||||
if dtype == torch.bfloat16:
|
||||
hidden_states = hidden_states.to(dtype)
|
||||
|
||||
# if self.use_conv:
|
||||
# if self.name == "conv":
|
||||
# hidden_states = self.conv(hidden_states)
|
||||
# else:
|
||||
# hidden_states = self.Conv2d_0(hidden_states)
|
||||
hidden_states = self.conv(hidden_states)
|
||||
|
||||
return hidden_states
|
||||
|
||||
|
||||
class Downsample3D(nn.Module):
|
||||
def __init__(self, channels, use_conv=False, out_channels=None, padding=1, name="conv"):
|
||||
super().__init__()
|
||||
self.channels = channels
|
||||
self.out_channels = out_channels or channels
|
||||
self.use_conv = use_conv
|
||||
self.padding = padding
|
||||
stride = 2
|
||||
self.name = name
|
||||
|
||||
if use_conv:
|
||||
self.conv = InflatedConv3d(self.channels, self.out_channels, 3, stride=stride, padding=padding)
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
def forward(self, hidden_states):
|
||||
assert hidden_states.shape[1] == self.channels
|
||||
if self.use_conv and self.padding == 0:
|
||||
raise NotImplementedError
|
||||
|
||||
assert hidden_states.shape[1] == self.channels
|
||||
hidden_states = self.conv(hidden_states)
|
||||
|
||||
return hidden_states
|
||||
|
||||
|
||||
class ResnetBlock3D(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
in_channels,
|
||||
out_channels=None,
|
||||
conv_shortcut=False,
|
||||
dropout=0.0,
|
||||
temb_channels=512,
|
||||
groups=32,
|
||||
groups_out=None,
|
||||
pre_norm=True,
|
||||
eps=1e-6,
|
||||
non_linearity="swish",
|
||||
time_embedding_norm="default",
|
||||
output_scale_factor=1.0,
|
||||
use_in_shortcut=None,
|
||||
use_inflated_groupnorm=None,
|
||||
):
|
||||
super().__init__()
|
||||
self.pre_norm = pre_norm
|
||||
self.pre_norm = True
|
||||
self.in_channels = in_channels
|
||||
out_channels = in_channels if out_channels is None else out_channels
|
||||
self.out_channels = out_channels
|
||||
self.use_conv_shortcut = conv_shortcut
|
||||
self.time_embedding_norm = time_embedding_norm
|
||||
self.output_scale_factor = output_scale_factor
|
||||
|
||||
if groups_out is None:
|
||||
groups_out = groups
|
||||
|
||||
assert use_inflated_groupnorm != None
|
||||
if use_inflated_groupnorm:
|
||||
self.norm1 = InflatedGroupNorm(num_groups=groups, num_channels=in_channels, eps=eps, affine=True)
|
||||
else:
|
||||
self.norm1 = torch.nn.GroupNorm(num_groups=groups, num_channels=in_channels, eps=eps, affine=True)
|
||||
|
||||
self.conv1 = InflatedConv3d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
|
||||
|
||||
if temb_channels is not None:
|
||||
if self.time_embedding_norm == "default":
|
||||
time_emb_proj_out_channels = out_channels
|
||||
elif self.time_embedding_norm == "scale_shift":
|
||||
time_emb_proj_out_channels = out_channels * 2
|
||||
else:
|
||||
raise ValueError(f"unknown time_embedding_norm : {self.time_embedding_norm} ")
|
||||
|
||||
self.time_emb_proj = torch.nn.Linear(temb_channels, time_emb_proj_out_channels)
|
||||
else:
|
||||
self.time_emb_proj = None
|
||||
|
||||
if use_inflated_groupnorm:
|
||||
self.norm2 = InflatedGroupNorm(num_groups=groups_out, num_channels=out_channels, eps=eps, affine=True)
|
||||
else:
|
||||
self.norm2 = torch.nn.GroupNorm(num_groups=groups_out, num_channels=out_channels, eps=eps, affine=True)
|
||||
|
||||
self.dropout = torch.nn.Dropout(dropout)
|
||||
self.conv2 = InflatedConv3d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
|
||||
|
||||
if non_linearity == "swish":
|
||||
self.nonlinearity = lambda x: F.silu(x)
|
||||
elif non_linearity == "mish":
|
||||
self.nonlinearity = Mish()
|
||||
elif non_linearity == "silu":
|
||||
self.nonlinearity = nn.SiLU()
|
||||
|
||||
self.use_in_shortcut = self.in_channels != self.out_channels if use_in_shortcut is None else use_in_shortcut
|
||||
|
||||
self.conv_shortcut = None
|
||||
if self.use_in_shortcut:
|
||||
self.conv_shortcut = InflatedConv3d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
|
||||
|
||||
def forward(self, input_tensor, temb):
|
||||
hidden_states = input_tensor
|
||||
|
||||
hidden_states = self.norm1(hidden_states)
|
||||
hidden_states = self.nonlinearity(hidden_states)
|
||||
|
||||
hidden_states = self.conv1(hidden_states)
|
||||
|
||||
if temb is not None:
|
||||
temb = self.time_emb_proj(self.nonlinearity(temb))[:, :, None, None, None]
|
||||
|
||||
if temb is not None and self.time_embedding_norm == "default":
|
||||
hidden_states = hidden_states + temb
|
||||
|
||||
hidden_states = self.norm2(hidden_states)
|
||||
|
||||
if temb is not None and self.time_embedding_norm == "scale_shift":
|
||||
scale, shift = torch.chunk(temb, 2, dim=1)
|
||||
hidden_states = hidden_states * (1 + scale) + shift
|
||||
|
||||
hidden_states = self.nonlinearity(hidden_states)
|
||||
|
||||
hidden_states = self.dropout(hidden_states)
|
||||
hidden_states = self.conv2(hidden_states)
|
||||
|
||||
if self.conv_shortcut is not None:
|
||||
input_tensor = self.conv_shortcut(input_tensor)
|
||||
|
||||
output_tensor = (input_tensor + hidden_states) / self.output_scale_factor
|
||||
|
||||
return output_tensor
|
||||
|
||||
|
||||
class Mish(torch.nn.Module):
|
||||
def forward(self, hidden_states):
|
||||
return hidden_states * torch.tanh(torch.nn.functional.softplus(hidden_states))
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -1,959 +0,0 @@
|
||||
# coding=utf-8
|
||||
# Copyright 2023 The HuggingFace Inc. team.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
""" Conversion script for the Stable Diffusion checkpoints."""
|
||||
|
||||
import re
|
||||
from io import BytesIO
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
import torch
|
||||
from transformers import (
|
||||
AutoFeatureExtractor,
|
||||
BertTokenizerFast,
|
||||
CLIPImageProcessor,
|
||||
CLIPTextModel,
|
||||
CLIPTextModelWithProjection,
|
||||
CLIPTokenizer,
|
||||
CLIPVisionConfig,
|
||||
CLIPVisionModelWithProjection,
|
||||
)
|
||||
|
||||
from diffusers.models import (
|
||||
AutoencoderKL,
|
||||
PriorTransformer,
|
||||
UNet2DConditionModel,
|
||||
)
|
||||
from diffusers.schedulers import (
|
||||
DDIMScheduler,
|
||||
DDPMScheduler,
|
||||
DPMSolverMultistepScheduler,
|
||||
EulerAncestralDiscreteScheduler,
|
||||
EulerDiscreteScheduler,
|
||||
HeunDiscreteScheduler,
|
||||
LMSDiscreteScheduler,
|
||||
PNDMScheduler,
|
||||
UnCLIPScheduler,
|
||||
)
|
||||
from diffusers.utils.import_utils import BACKENDS_MAPPING
|
||||
|
||||
|
||||
def shave_segments(path, n_shave_prefix_segments=1):
|
||||
"""
|
||||
Removes segments. Positive values shave the first segments, negative shave the last segments.
|
||||
"""
|
||||
if n_shave_prefix_segments >= 0:
|
||||
return ".".join(path.split(".")[n_shave_prefix_segments:])
|
||||
else:
|
||||
return ".".join(path.split(".")[:n_shave_prefix_segments])
|
||||
|
||||
|
||||
def renew_resnet_paths(old_list, n_shave_prefix_segments=0):
|
||||
"""
|
||||
Updates paths inside resnets to the new naming scheme (local renaming)
|
||||
"""
|
||||
mapping = []
|
||||
for old_item in old_list:
|
||||
new_item = old_item.replace("in_layers.0", "norm1")
|
||||
new_item = new_item.replace("in_layers.2", "conv1")
|
||||
|
||||
new_item = new_item.replace("out_layers.0", "norm2")
|
||||
new_item = new_item.replace("out_layers.3", "conv2")
|
||||
|
||||
new_item = new_item.replace("emb_layers.1", "time_emb_proj")
|
||||
new_item = new_item.replace("skip_connection", "conv_shortcut")
|
||||
|
||||
new_item = shave_segments(new_item, n_shave_prefix_segments=n_shave_prefix_segments)
|
||||
|
||||
mapping.append({"old": old_item, "new": new_item})
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def renew_vae_resnet_paths(old_list, n_shave_prefix_segments=0):
|
||||
"""
|
||||
Updates paths inside resnets to the new naming scheme (local renaming)
|
||||
"""
|
||||
mapping = []
|
||||
for old_item in old_list:
|
||||
new_item = old_item
|
||||
|
||||
new_item = new_item.replace("nin_shortcut", "conv_shortcut")
|
||||
new_item = shave_segments(new_item, n_shave_prefix_segments=n_shave_prefix_segments)
|
||||
|
||||
mapping.append({"old": old_item, "new": new_item})
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def renew_attention_paths(old_list, n_shave_prefix_segments=0):
|
||||
"""
|
||||
Updates paths inside attentions to the new naming scheme (local renaming)
|
||||
"""
|
||||
mapping = []
|
||||
for old_item in old_list:
|
||||
new_item = old_item
|
||||
|
||||
# new_item = new_item.replace('norm.weight', 'group_norm.weight')
|
||||
# new_item = new_item.replace('norm.bias', 'group_norm.bias')
|
||||
|
||||
# new_item = new_item.replace('proj_out.weight', 'proj_attn.weight')
|
||||
# new_item = new_item.replace('proj_out.bias', 'proj_attn.bias')
|
||||
|
||||
# new_item = shave_segments(new_item, n_shave_prefix_segments=n_shave_prefix_segments)
|
||||
|
||||
mapping.append({"old": old_item, "new": new_item})
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def renew_vae_attention_paths(old_list, n_shave_prefix_segments=0):
|
||||
"""
|
||||
Updates paths inside attentions to the new naming scheme (local renaming)
|
||||
"""
|
||||
mapping = []
|
||||
for old_item in old_list:
|
||||
new_item = old_item
|
||||
|
||||
new_item = new_item.replace("norm.weight", "group_norm.weight")
|
||||
new_item = new_item.replace("norm.bias", "group_norm.bias")
|
||||
|
||||
new_item = new_item.replace("q.weight", "query.weight")
|
||||
new_item = new_item.replace("q.bias", "query.bias")
|
||||
|
||||
new_item = new_item.replace("k.weight", "key.weight")
|
||||
new_item = new_item.replace("k.bias", "key.bias")
|
||||
|
||||
new_item = new_item.replace("v.weight", "value.weight")
|
||||
new_item = new_item.replace("v.bias", "value.bias")
|
||||
|
||||
new_item = new_item.replace("proj_out.weight", "proj_attn.weight")
|
||||
new_item = new_item.replace("proj_out.bias", "proj_attn.bias")
|
||||
|
||||
new_item = shave_segments(new_item, n_shave_prefix_segments=n_shave_prefix_segments)
|
||||
|
||||
mapping.append({"old": old_item, "new": new_item})
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def assign_to_checkpoint(
|
||||
paths, checkpoint, old_checkpoint, attention_paths_to_split=None, additional_replacements=None, config=None
|
||||
):
|
||||
"""
|
||||
This does the final conversion step: take locally converted weights and apply a global renaming to them. It splits
|
||||
attention layers, and takes into account additional replacements that may arise.
|
||||
|
||||
Assigns the weights to the new checkpoint.
|
||||
"""
|
||||
assert isinstance(paths, list), "Paths should be a list of dicts containing 'old' and 'new' keys."
|
||||
|
||||
# Splits the attention layers into three variables.
|
||||
if attention_paths_to_split is not None:
|
||||
for path, path_map in attention_paths_to_split.items():
|
||||
old_tensor = old_checkpoint[path]
|
||||
channels = old_tensor.shape[0] // 3
|
||||
|
||||
target_shape = (-1, channels) if len(old_tensor.shape) == 3 else (-1)
|
||||
|
||||
num_heads = old_tensor.shape[0] // config["num_head_channels"] // 3
|
||||
|
||||
old_tensor = old_tensor.reshape((num_heads, 3 * channels // num_heads) + old_tensor.shape[1:])
|
||||
query, key, value = old_tensor.split(channels // num_heads, dim=1)
|
||||
|
||||
checkpoint[path_map["query"]] = query.reshape(target_shape)
|
||||
checkpoint[path_map["key"]] = key.reshape(target_shape)
|
||||
checkpoint[path_map["value"]] = value.reshape(target_shape)
|
||||
|
||||
for path in paths:
|
||||
new_path = path["new"]
|
||||
|
||||
# These have already been assigned
|
||||
if attention_paths_to_split is not None and new_path in attention_paths_to_split:
|
||||
continue
|
||||
|
||||
# Global renaming happens here
|
||||
new_path = new_path.replace("middle_block.0", "mid_block.resnets.0")
|
||||
new_path = new_path.replace("middle_block.1", "mid_block.attentions.0")
|
||||
new_path = new_path.replace("middle_block.2", "mid_block.resnets.1")
|
||||
|
||||
if additional_replacements is not None:
|
||||
for replacement in additional_replacements:
|
||||
new_path = new_path.replace(replacement["old"], replacement["new"])
|
||||
|
||||
# proj_attn.weight has to be converted from conv 1D to linear
|
||||
if "proj_attn.weight" in new_path:
|
||||
checkpoint[new_path] = old_checkpoint[path["old"]][:, :, 0]
|
||||
else:
|
||||
checkpoint[new_path] = old_checkpoint[path["old"]]
|
||||
|
||||
|
||||
def conv_attn_to_linear(checkpoint):
|
||||
keys = list(checkpoint.keys())
|
||||
attn_keys = ["query.weight", "key.weight", "value.weight"]
|
||||
for key in keys:
|
||||
if ".".join(key.split(".")[-2:]) in attn_keys:
|
||||
if checkpoint[key].ndim > 2:
|
||||
checkpoint[key] = checkpoint[key][:, :, 0, 0]
|
||||
elif "proj_attn.weight" in key:
|
||||
if checkpoint[key].ndim > 2:
|
||||
checkpoint[key] = checkpoint[key][:, :, 0]
|
||||
|
||||
|
||||
def create_unet_diffusers_config(original_config, image_size: int, controlnet=False):
|
||||
"""
|
||||
Creates a config for the diffusers based on the config of the LDM model.
|
||||
"""
|
||||
if controlnet:
|
||||
unet_params = original_config.model.params.control_stage_config.params
|
||||
else:
|
||||
unet_params = original_config.model.params.unet_config.params
|
||||
|
||||
vae_params = original_config.model.params.first_stage_config.params.ddconfig
|
||||
|
||||
block_out_channels = [unet_params.model_channels * mult for mult in unet_params.channel_mult]
|
||||
|
||||
down_block_types = []
|
||||
resolution = 1
|
||||
for i in range(len(block_out_channels)):
|
||||
block_type = "CrossAttnDownBlock2D" if resolution in unet_params.attention_resolutions else "DownBlock2D"
|
||||
down_block_types.append(block_type)
|
||||
if i != len(block_out_channels) - 1:
|
||||
resolution *= 2
|
||||
|
||||
up_block_types = []
|
||||
for i in range(len(block_out_channels)):
|
||||
block_type = "CrossAttnUpBlock2D" if resolution in unet_params.attention_resolutions else "UpBlock2D"
|
||||
up_block_types.append(block_type)
|
||||
resolution //= 2
|
||||
|
||||
vae_scale_factor = 2 ** (len(vae_params.ch_mult) - 1)
|
||||
|
||||
head_dim = unet_params.num_heads if "num_heads" in unet_params else None
|
||||
use_linear_projection = (
|
||||
unet_params.use_linear_in_transformer if "use_linear_in_transformer" in unet_params else False
|
||||
)
|
||||
if use_linear_projection:
|
||||
# stable diffusion 2-base-512 and 2-768
|
||||
if head_dim is None:
|
||||
head_dim = [5, 10, 20, 20]
|
||||
|
||||
class_embed_type = None
|
||||
projection_class_embeddings_input_dim = None
|
||||
|
||||
if "num_classes" in unet_params:
|
||||
if unet_params.num_classes == "sequential":
|
||||
class_embed_type = "projection"
|
||||
assert "adm_in_channels" in unet_params
|
||||
projection_class_embeddings_input_dim = unet_params.adm_in_channels
|
||||
else:
|
||||
raise NotImplementedError(f"Unknown conditional unet num_classes config: {unet_params.num_classes}")
|
||||
|
||||
config = {
|
||||
"sample_size": image_size // vae_scale_factor,
|
||||
"in_channels": unet_params.in_channels,
|
||||
"down_block_types": tuple(down_block_types),
|
||||
"block_out_channels": tuple(block_out_channels),
|
||||
"layers_per_block": unet_params.num_res_blocks,
|
||||
"cross_attention_dim": unet_params.context_dim,
|
||||
"attention_head_dim": head_dim,
|
||||
"use_linear_projection": use_linear_projection,
|
||||
"class_embed_type": class_embed_type,
|
||||
"projection_class_embeddings_input_dim": projection_class_embeddings_input_dim,
|
||||
}
|
||||
|
||||
if not controlnet:
|
||||
config["out_channels"] = unet_params.out_channels
|
||||
config["up_block_types"] = tuple(up_block_types)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def create_vae_diffusers_config(original_config, image_size: int):
|
||||
"""
|
||||
Creates a config for the diffusers based on the config of the LDM model.
|
||||
"""
|
||||
vae_params = original_config.model.params.first_stage_config.params.ddconfig
|
||||
_ = original_config.model.params.first_stage_config.params.embed_dim
|
||||
|
||||
block_out_channels = [vae_params.ch * mult for mult in vae_params.ch_mult]
|
||||
down_block_types = ["DownEncoderBlock2D"] * len(block_out_channels)
|
||||
up_block_types = ["UpDecoderBlock2D"] * len(block_out_channels)
|
||||
|
||||
config = {
|
||||
"sample_size": image_size,
|
||||
"in_channels": vae_params.in_channels,
|
||||
"out_channels": vae_params.out_ch,
|
||||
"down_block_types": tuple(down_block_types),
|
||||
"up_block_types": tuple(up_block_types),
|
||||
"block_out_channels": tuple(block_out_channels),
|
||||
"latent_channels": vae_params.z_channels,
|
||||
"layers_per_block": vae_params.num_res_blocks,
|
||||
}
|
||||
return config
|
||||
|
||||
|
||||
def create_diffusers_schedular(original_config):
|
||||
schedular = DDIMScheduler(
|
||||
num_train_timesteps=original_config.model.params.timesteps,
|
||||
beta_start=original_config.model.params.linear_start,
|
||||
beta_end=original_config.model.params.linear_end,
|
||||
beta_schedule="scaled_linear",
|
||||
)
|
||||
return schedular
|
||||
|
||||
|
||||
def create_ldm_bert_config(original_config):
|
||||
bert_params = original_config.model.parms.cond_stage_config.params
|
||||
config = LDMBertConfig(
|
||||
d_model=bert_params.n_embed,
|
||||
encoder_layers=bert_params.n_layer,
|
||||
encoder_ffn_dim=bert_params.n_embed * 4,
|
||||
)
|
||||
return config
|
||||
|
||||
|
||||
def convert_ldm_unet_checkpoint(checkpoint, config, path=None, extract_ema=False, controlnet=False):
|
||||
"""
|
||||
Takes a state dict and a config, and returns a converted checkpoint.
|
||||
"""
|
||||
|
||||
# extract state_dict for UNet
|
||||
unet_state_dict = {}
|
||||
keys = list(checkpoint.keys())
|
||||
|
||||
if controlnet:
|
||||
unet_key = "control_model."
|
||||
else:
|
||||
unet_key = "model.diffusion_model."
|
||||
|
||||
# at least a 100 parameters have to start with `model_ema` in order for the checkpoint to be EMA
|
||||
if sum(k.startswith("model_ema") for k in keys) > 100 and extract_ema:
|
||||
print(f"Checkpoint {path} has both EMA and non-EMA weights.")
|
||||
print(
|
||||
"In this conversion only the EMA weights are extracted. If you want to instead extract the non-EMA"
|
||||
" weights (useful to continue fine-tuning), please make sure to remove the `--extract_ema` flag."
|
||||
)
|
||||
for key in keys:
|
||||
if key.startswith("model.diffusion_model"):
|
||||
flat_ema_key = "model_ema." + "".join(key.split(".")[1:])
|
||||
unet_state_dict[key.replace(unet_key, "")] = checkpoint.pop(flat_ema_key)
|
||||
else:
|
||||
if sum(k.startswith("model_ema") for k in keys) > 100:
|
||||
print(
|
||||
"In this conversion only the non-EMA weights are extracted. If you want to instead extract the EMA"
|
||||
" weights (usually better for inference), please make sure to add the `--extract_ema` flag."
|
||||
)
|
||||
|
||||
for key in keys:
|
||||
if key.startswith(unet_key):
|
||||
unet_state_dict[key.replace(unet_key, "")] = checkpoint.pop(key)
|
||||
|
||||
new_checkpoint = {}
|
||||
|
||||
new_checkpoint["time_embedding.linear_1.weight"] = unet_state_dict["time_embed.0.weight"]
|
||||
new_checkpoint["time_embedding.linear_1.bias"] = unet_state_dict["time_embed.0.bias"]
|
||||
new_checkpoint["time_embedding.linear_2.weight"] = unet_state_dict["time_embed.2.weight"]
|
||||
new_checkpoint["time_embedding.linear_2.bias"] = unet_state_dict["time_embed.2.bias"]
|
||||
|
||||
if config["class_embed_type"] is None:
|
||||
# No parameters to port
|
||||
...
|
||||
elif config["class_embed_type"] == "timestep" or config["class_embed_type"] == "projection":
|
||||
new_checkpoint["class_embedding.linear_1.weight"] = unet_state_dict["label_emb.0.0.weight"]
|
||||
new_checkpoint["class_embedding.linear_1.bias"] = unet_state_dict["label_emb.0.0.bias"]
|
||||
new_checkpoint["class_embedding.linear_2.weight"] = unet_state_dict["label_emb.0.2.weight"]
|
||||
new_checkpoint["class_embedding.linear_2.bias"] = unet_state_dict["label_emb.0.2.bias"]
|
||||
else:
|
||||
raise NotImplementedError(f"Not implemented `class_embed_type`: {config['class_embed_type']}")
|
||||
|
||||
new_checkpoint["conv_in.weight"] = unet_state_dict["input_blocks.0.0.weight"]
|
||||
new_checkpoint["conv_in.bias"] = unet_state_dict["input_blocks.0.0.bias"]
|
||||
|
||||
if not controlnet:
|
||||
new_checkpoint["conv_norm_out.weight"] = unet_state_dict["out.0.weight"]
|
||||
new_checkpoint["conv_norm_out.bias"] = unet_state_dict["out.0.bias"]
|
||||
new_checkpoint["conv_out.weight"] = unet_state_dict["out.2.weight"]
|
||||
new_checkpoint["conv_out.bias"] = unet_state_dict["out.2.bias"]
|
||||
|
||||
# Retrieves the keys for the input blocks only
|
||||
num_input_blocks = len({".".join(layer.split(".")[:2]) for layer in unet_state_dict if "input_blocks" in layer})
|
||||
input_blocks = {
|
||||
layer_id: [key for key in unet_state_dict if f"input_blocks.{layer_id}" in key]
|
||||
for layer_id in range(num_input_blocks)
|
||||
}
|
||||
|
||||
# Retrieves the keys for the middle blocks only
|
||||
num_middle_blocks = len({".".join(layer.split(".")[:2]) for layer in unet_state_dict if "middle_block" in layer})
|
||||
middle_blocks = {
|
||||
layer_id: [key for key in unet_state_dict if f"middle_block.{layer_id}" in key]
|
||||
for layer_id in range(num_middle_blocks)
|
||||
}
|
||||
|
||||
# Retrieves the keys for the output blocks only
|
||||
num_output_blocks = len({".".join(layer.split(".")[:2]) for layer in unet_state_dict if "output_blocks" in layer})
|
||||
output_blocks = {
|
||||
layer_id: [key for key in unet_state_dict if f"output_blocks.{layer_id}" in key]
|
||||
for layer_id in range(num_output_blocks)
|
||||
}
|
||||
|
||||
for i in range(1, num_input_blocks):
|
||||
block_id = (i - 1) // (config["layers_per_block"] + 1)
|
||||
layer_in_block_id = (i - 1) % (config["layers_per_block"] + 1)
|
||||
|
||||
resnets = [
|
||||
key for key in input_blocks[i] if f"input_blocks.{i}.0" in key and f"input_blocks.{i}.0.op" not in key
|
||||
]
|
||||
attentions = [key for key in input_blocks[i] if f"input_blocks.{i}.1" in key]
|
||||
|
||||
if f"input_blocks.{i}.0.op.weight" in unet_state_dict:
|
||||
new_checkpoint[f"down_blocks.{block_id}.downsamplers.0.conv.weight"] = unet_state_dict.pop(
|
||||
f"input_blocks.{i}.0.op.weight"
|
||||
)
|
||||
new_checkpoint[f"down_blocks.{block_id}.downsamplers.0.conv.bias"] = unet_state_dict.pop(
|
||||
f"input_blocks.{i}.0.op.bias"
|
||||
)
|
||||
|
||||
paths = renew_resnet_paths(resnets)
|
||||
meta_path = {"old": f"input_blocks.{i}.0", "new": f"down_blocks.{block_id}.resnets.{layer_in_block_id}"}
|
||||
assign_to_checkpoint(
|
||||
paths, new_checkpoint, unet_state_dict, additional_replacements=[meta_path], config=config
|
||||
)
|
||||
|
||||
if len(attentions):
|
||||
paths = renew_attention_paths(attentions)
|
||||
meta_path = {"old": f"input_blocks.{i}.1", "new": f"down_blocks.{block_id}.attentions.{layer_in_block_id}"}
|
||||
assign_to_checkpoint(
|
||||
paths, new_checkpoint, unet_state_dict, additional_replacements=[meta_path], config=config
|
||||
)
|
||||
|
||||
resnet_0 = middle_blocks[0]
|
||||
attentions = middle_blocks[1]
|
||||
resnet_1 = middle_blocks[2]
|
||||
|
||||
resnet_0_paths = renew_resnet_paths(resnet_0)
|
||||
assign_to_checkpoint(resnet_0_paths, new_checkpoint, unet_state_dict, config=config)
|
||||
|
||||
resnet_1_paths = renew_resnet_paths(resnet_1)
|
||||
assign_to_checkpoint(resnet_1_paths, new_checkpoint, unet_state_dict, config=config)
|
||||
|
||||
attentions_paths = renew_attention_paths(attentions)
|
||||
meta_path = {"old": "middle_block.1", "new": "mid_block.attentions.0"}
|
||||
assign_to_checkpoint(
|
||||
attentions_paths, new_checkpoint, unet_state_dict, additional_replacements=[meta_path], config=config
|
||||
)
|
||||
|
||||
for i in range(num_output_blocks):
|
||||
block_id = i // (config["layers_per_block"] + 1)
|
||||
layer_in_block_id = i % (config["layers_per_block"] + 1)
|
||||
output_block_layers = [shave_segments(name, 2) for name in output_blocks[i]]
|
||||
output_block_list = {}
|
||||
|
||||
for layer in output_block_layers:
|
||||
layer_id, layer_name = layer.split(".")[0], shave_segments(layer, 1)
|
||||
if layer_id in output_block_list:
|
||||
output_block_list[layer_id].append(layer_name)
|
||||
else:
|
||||
output_block_list[layer_id] = [layer_name]
|
||||
|
||||
if len(output_block_list) > 1:
|
||||
resnets = [key for key in output_blocks[i] if f"output_blocks.{i}.0" in key]
|
||||
attentions = [key for key in output_blocks[i] if f"output_blocks.{i}.1" in key]
|
||||
|
||||
resnet_0_paths = renew_resnet_paths(resnets)
|
||||
paths = renew_resnet_paths(resnets)
|
||||
|
||||
meta_path = {"old": f"output_blocks.{i}.0", "new": f"up_blocks.{block_id}.resnets.{layer_in_block_id}"}
|
||||
assign_to_checkpoint(
|
||||
paths, new_checkpoint, unet_state_dict, additional_replacements=[meta_path], config=config
|
||||
)
|
||||
|
||||
output_block_list = {k: sorted(v) for k, v in output_block_list.items()}
|
||||
if ["conv.bias", "conv.weight"] in output_block_list.values():
|
||||
index = list(output_block_list.values()).index(["conv.bias", "conv.weight"])
|
||||
new_checkpoint[f"up_blocks.{block_id}.upsamplers.0.conv.weight"] = unet_state_dict[
|
||||
f"output_blocks.{i}.{index}.conv.weight"
|
||||
]
|
||||
new_checkpoint[f"up_blocks.{block_id}.upsamplers.0.conv.bias"] = unet_state_dict[
|
||||
f"output_blocks.{i}.{index}.conv.bias"
|
||||
]
|
||||
|
||||
# Clear attentions as they have been attributed above.
|
||||
if len(attentions) == 2:
|
||||
attentions = []
|
||||
|
||||
if len(attentions):
|
||||
paths = renew_attention_paths(attentions)
|
||||
meta_path = {
|
||||
"old": f"output_blocks.{i}.1",
|
||||
"new": f"up_blocks.{block_id}.attentions.{layer_in_block_id}",
|
||||
}
|
||||
assign_to_checkpoint(
|
||||
paths, new_checkpoint, unet_state_dict, additional_replacements=[meta_path], config=config
|
||||
)
|
||||
else:
|
||||
resnet_0_paths = renew_resnet_paths(output_block_layers, n_shave_prefix_segments=1)
|
||||
for path in resnet_0_paths:
|
||||
old_path = ".".join(["output_blocks", str(i), path["old"]])
|
||||
new_path = ".".join(["up_blocks", str(block_id), "resnets", str(layer_in_block_id), path["new"]])
|
||||
|
||||
new_checkpoint[new_path] = unet_state_dict[old_path]
|
||||
|
||||
if controlnet:
|
||||
# conditioning embedding
|
||||
|
||||
orig_index = 0
|
||||
|
||||
new_checkpoint["controlnet_cond_embedding.conv_in.weight"] = unet_state_dict.pop(
|
||||
f"input_hint_block.{orig_index}.weight"
|
||||
)
|
||||
new_checkpoint["controlnet_cond_embedding.conv_in.bias"] = unet_state_dict.pop(
|
||||
f"input_hint_block.{orig_index}.bias"
|
||||
)
|
||||
|
||||
orig_index += 2
|
||||
|
||||
diffusers_index = 0
|
||||
|
||||
while diffusers_index < 6:
|
||||
new_checkpoint[f"controlnet_cond_embedding.blocks.{diffusers_index}.weight"] = unet_state_dict.pop(
|
||||
f"input_hint_block.{orig_index}.weight"
|
||||
)
|
||||
new_checkpoint[f"controlnet_cond_embedding.blocks.{diffusers_index}.bias"] = unet_state_dict.pop(
|
||||
f"input_hint_block.{orig_index}.bias"
|
||||
)
|
||||
diffusers_index += 1
|
||||
orig_index += 2
|
||||
|
||||
new_checkpoint["controlnet_cond_embedding.conv_out.weight"] = unet_state_dict.pop(
|
||||
f"input_hint_block.{orig_index}.weight"
|
||||
)
|
||||
new_checkpoint["controlnet_cond_embedding.conv_out.bias"] = unet_state_dict.pop(
|
||||
f"input_hint_block.{orig_index}.bias"
|
||||
)
|
||||
|
||||
# down blocks
|
||||
for i in range(num_input_blocks):
|
||||
new_checkpoint[f"controlnet_down_blocks.{i}.weight"] = unet_state_dict.pop(f"zero_convs.{i}.0.weight")
|
||||
new_checkpoint[f"controlnet_down_blocks.{i}.bias"] = unet_state_dict.pop(f"zero_convs.{i}.0.bias")
|
||||
|
||||
# mid block
|
||||
new_checkpoint["controlnet_mid_block.weight"] = unet_state_dict.pop("middle_block_out.0.weight")
|
||||
new_checkpoint["controlnet_mid_block.bias"] = unet_state_dict.pop("middle_block_out.0.bias")
|
||||
|
||||
return new_checkpoint
|
||||
|
||||
|
||||
def convert_ldm_vae_checkpoint(checkpoint, config):
|
||||
# extract state dict for VAE
|
||||
vae_state_dict = {}
|
||||
vae_key = "first_stage_model."
|
||||
keys = list(checkpoint.keys())
|
||||
for key in keys:
|
||||
if key.startswith(vae_key):
|
||||
vae_state_dict[key.replace(vae_key, "")] = checkpoint.get(key)
|
||||
|
||||
new_checkpoint = {}
|
||||
|
||||
new_checkpoint["encoder.conv_in.weight"] = vae_state_dict["encoder.conv_in.weight"]
|
||||
new_checkpoint["encoder.conv_in.bias"] = vae_state_dict["encoder.conv_in.bias"]
|
||||
new_checkpoint["encoder.conv_out.weight"] = vae_state_dict["encoder.conv_out.weight"]
|
||||
new_checkpoint["encoder.conv_out.bias"] = vae_state_dict["encoder.conv_out.bias"]
|
||||
new_checkpoint["encoder.conv_norm_out.weight"] = vae_state_dict["encoder.norm_out.weight"]
|
||||
new_checkpoint["encoder.conv_norm_out.bias"] = vae_state_dict["encoder.norm_out.bias"]
|
||||
|
||||
new_checkpoint["decoder.conv_in.weight"] = vae_state_dict["decoder.conv_in.weight"]
|
||||
new_checkpoint["decoder.conv_in.bias"] = vae_state_dict["decoder.conv_in.bias"]
|
||||
new_checkpoint["decoder.conv_out.weight"] = vae_state_dict["decoder.conv_out.weight"]
|
||||
new_checkpoint["decoder.conv_out.bias"] = vae_state_dict["decoder.conv_out.bias"]
|
||||
new_checkpoint["decoder.conv_norm_out.weight"] = vae_state_dict["decoder.norm_out.weight"]
|
||||
new_checkpoint["decoder.conv_norm_out.bias"] = vae_state_dict["decoder.norm_out.bias"]
|
||||
|
||||
new_checkpoint["quant_conv.weight"] = vae_state_dict["quant_conv.weight"]
|
||||
new_checkpoint["quant_conv.bias"] = vae_state_dict["quant_conv.bias"]
|
||||
new_checkpoint["post_quant_conv.weight"] = vae_state_dict["post_quant_conv.weight"]
|
||||
new_checkpoint["post_quant_conv.bias"] = vae_state_dict["post_quant_conv.bias"]
|
||||
|
||||
# Retrieves the keys for the encoder down blocks only
|
||||
num_down_blocks = len({".".join(layer.split(".")[:3]) for layer in vae_state_dict if "encoder.down" in layer})
|
||||
down_blocks = {
|
||||
layer_id: [key for key in vae_state_dict if f"down.{layer_id}" in key] for layer_id in range(num_down_blocks)
|
||||
}
|
||||
|
||||
# Retrieves the keys for the decoder up blocks only
|
||||
num_up_blocks = len({".".join(layer.split(".")[:3]) for layer in vae_state_dict if "decoder.up" in layer})
|
||||
up_blocks = {
|
||||
layer_id: [key for key in vae_state_dict if f"up.{layer_id}" in key] for layer_id in range(num_up_blocks)
|
||||
}
|
||||
|
||||
for i in range(num_down_blocks):
|
||||
resnets = [key for key in down_blocks[i] if f"down.{i}" in key and f"down.{i}.downsample" not in key]
|
||||
|
||||
if f"encoder.down.{i}.downsample.conv.weight" in vae_state_dict:
|
||||
new_checkpoint[f"encoder.down_blocks.{i}.downsamplers.0.conv.weight"] = vae_state_dict.pop(
|
||||
f"encoder.down.{i}.downsample.conv.weight"
|
||||
)
|
||||
new_checkpoint[f"encoder.down_blocks.{i}.downsamplers.0.conv.bias"] = vae_state_dict.pop(
|
||||
f"encoder.down.{i}.downsample.conv.bias"
|
||||
)
|
||||
|
||||
paths = renew_vae_resnet_paths(resnets)
|
||||
meta_path = {"old": f"down.{i}.block", "new": f"down_blocks.{i}.resnets"}
|
||||
assign_to_checkpoint(paths, new_checkpoint, vae_state_dict, additional_replacements=[meta_path], config=config)
|
||||
|
||||
mid_resnets = [key for key in vae_state_dict if "encoder.mid.block" in key]
|
||||
num_mid_res_blocks = 2
|
||||
for i in range(1, num_mid_res_blocks + 1):
|
||||
resnets = [key for key in mid_resnets if f"encoder.mid.block_{i}" in key]
|
||||
|
||||
paths = renew_vae_resnet_paths(resnets)
|
||||
meta_path = {"old": f"mid.block_{i}", "new": f"mid_block.resnets.{i - 1}"}
|
||||
assign_to_checkpoint(paths, new_checkpoint, vae_state_dict, additional_replacements=[meta_path], config=config)
|
||||
|
||||
mid_attentions = [key for key in vae_state_dict if "encoder.mid.attn" in key]
|
||||
paths = renew_vae_attention_paths(mid_attentions)
|
||||
meta_path = {"old": "mid.attn_1", "new": "mid_block.attentions.0"}
|
||||
assign_to_checkpoint(paths, new_checkpoint, vae_state_dict, additional_replacements=[meta_path], config=config)
|
||||
conv_attn_to_linear(new_checkpoint)
|
||||
|
||||
for i in range(num_up_blocks):
|
||||
block_id = num_up_blocks - 1 - i
|
||||
resnets = [
|
||||
key for key in up_blocks[block_id] if f"up.{block_id}" in key and f"up.{block_id}.upsample" not in key
|
||||
]
|
||||
|
||||
if f"decoder.up.{block_id}.upsample.conv.weight" in vae_state_dict:
|
||||
new_checkpoint[f"decoder.up_blocks.{i}.upsamplers.0.conv.weight"] = vae_state_dict[
|
||||
f"decoder.up.{block_id}.upsample.conv.weight"
|
||||
]
|
||||
new_checkpoint[f"decoder.up_blocks.{i}.upsamplers.0.conv.bias"] = vae_state_dict[
|
||||
f"decoder.up.{block_id}.upsample.conv.bias"
|
||||
]
|
||||
|
||||
paths = renew_vae_resnet_paths(resnets)
|
||||
meta_path = {"old": f"up.{block_id}.block", "new": f"up_blocks.{i}.resnets"}
|
||||
assign_to_checkpoint(paths, new_checkpoint, vae_state_dict, additional_replacements=[meta_path], config=config)
|
||||
|
||||
mid_resnets = [key for key in vae_state_dict if "decoder.mid.block" in key]
|
||||
num_mid_res_blocks = 2
|
||||
for i in range(1, num_mid_res_blocks + 1):
|
||||
resnets = [key for key in mid_resnets if f"decoder.mid.block_{i}" in key]
|
||||
|
||||
paths = renew_vae_resnet_paths(resnets)
|
||||
meta_path = {"old": f"mid.block_{i}", "new": f"mid_block.resnets.{i - 1}"}
|
||||
assign_to_checkpoint(paths, new_checkpoint, vae_state_dict, additional_replacements=[meta_path], config=config)
|
||||
|
||||
mid_attentions = [key for key in vae_state_dict if "decoder.mid.attn" in key]
|
||||
paths = renew_vae_attention_paths(mid_attentions)
|
||||
meta_path = {"old": "mid.attn_1", "new": "mid_block.attentions.0"}
|
||||
assign_to_checkpoint(paths, new_checkpoint, vae_state_dict, additional_replacements=[meta_path], config=config)
|
||||
conv_attn_to_linear(new_checkpoint)
|
||||
return new_checkpoint
|
||||
|
||||
|
||||
def convert_ldm_bert_checkpoint(checkpoint, config):
|
||||
def _copy_attn_layer(hf_attn_layer, pt_attn_layer):
|
||||
hf_attn_layer.q_proj.weight.data = pt_attn_layer.to_q.weight
|
||||
hf_attn_layer.k_proj.weight.data = pt_attn_layer.to_k.weight
|
||||
hf_attn_layer.v_proj.weight.data = pt_attn_layer.to_v.weight
|
||||
|
||||
hf_attn_layer.out_proj.weight = pt_attn_layer.to_out.weight
|
||||
hf_attn_layer.out_proj.bias = pt_attn_layer.to_out.bias
|
||||
|
||||
def _copy_linear(hf_linear, pt_linear):
|
||||
hf_linear.weight = pt_linear.weight
|
||||
hf_linear.bias = pt_linear.bias
|
||||
|
||||
def _copy_layer(hf_layer, pt_layer):
|
||||
# copy layer norms
|
||||
_copy_linear(hf_layer.self_attn_layer_norm, pt_layer[0][0])
|
||||
_copy_linear(hf_layer.final_layer_norm, pt_layer[1][0])
|
||||
|
||||
# copy attn
|
||||
_copy_attn_layer(hf_layer.self_attn, pt_layer[0][1])
|
||||
|
||||
# copy MLP
|
||||
pt_mlp = pt_layer[1][1]
|
||||
_copy_linear(hf_layer.fc1, pt_mlp.net[0][0])
|
||||
_copy_linear(hf_layer.fc2, pt_mlp.net[2])
|
||||
|
||||
def _copy_layers(hf_layers, pt_layers):
|
||||
for i, hf_layer in enumerate(hf_layers):
|
||||
if i != 0:
|
||||
i += i
|
||||
pt_layer = pt_layers[i : i + 2]
|
||||
_copy_layer(hf_layer, pt_layer)
|
||||
|
||||
hf_model = LDMBertModel(config).eval()
|
||||
|
||||
# copy embeds
|
||||
hf_model.model.embed_tokens.weight = checkpoint.transformer.token_emb.weight
|
||||
hf_model.model.embed_positions.weight.data = checkpoint.transformer.pos_emb.emb.weight
|
||||
|
||||
# copy layer norm
|
||||
_copy_linear(hf_model.model.layer_norm, checkpoint.transformer.norm)
|
||||
|
||||
# copy hidden layers
|
||||
_copy_layers(hf_model.model.layers, checkpoint.transformer.attn_layers.layers)
|
||||
|
||||
_copy_linear(hf_model.to_logits, checkpoint.transformer.to_logits)
|
||||
|
||||
return hf_model
|
||||
|
||||
|
||||
def convert_ldm_clip_checkpoint(checkpoint):
|
||||
text_model = CLIPTextModel.from_pretrained("openai/clip-vit-large-patch14")
|
||||
keys = list(checkpoint.keys())
|
||||
|
||||
text_model_dict = {}
|
||||
|
||||
for key in keys:
|
||||
if key.startswith("cond_stage_model.transformer"):
|
||||
text_model_dict[key[len("cond_stage_model.transformer.") :]] = checkpoint[key]
|
||||
|
||||
text_model.load_state_dict(text_model_dict)
|
||||
|
||||
return text_model
|
||||
|
||||
|
||||
textenc_conversion_lst = [
|
||||
("cond_stage_model.model.positional_embedding", "text_model.embeddings.position_embedding.weight"),
|
||||
("cond_stage_model.model.token_embedding.weight", "text_model.embeddings.token_embedding.weight"),
|
||||
("cond_stage_model.model.ln_final.weight", "text_model.final_layer_norm.weight"),
|
||||
("cond_stage_model.model.ln_final.bias", "text_model.final_layer_norm.bias"),
|
||||
]
|
||||
textenc_conversion_map = {x[0]: x[1] for x in textenc_conversion_lst}
|
||||
|
||||
textenc_transformer_conversion_lst = [
|
||||
# (stable-diffusion, HF Diffusers)
|
||||
("resblocks.", "text_model.encoder.layers."),
|
||||
("ln_1", "layer_norm1"),
|
||||
("ln_2", "layer_norm2"),
|
||||
(".c_fc.", ".fc1."),
|
||||
(".c_proj.", ".fc2."),
|
||||
(".attn", ".self_attn"),
|
||||
("ln_final.", "transformer.text_model.final_layer_norm."),
|
||||
("token_embedding.weight", "transformer.text_model.embeddings.token_embedding.weight"),
|
||||
("positional_embedding", "transformer.text_model.embeddings.position_embedding.weight"),
|
||||
]
|
||||
protected = {re.escape(x[0]): x[1] for x in textenc_transformer_conversion_lst}
|
||||
textenc_pattern = re.compile("|".join(protected.keys()))
|
||||
|
||||
|
||||
def convert_paint_by_example_checkpoint(checkpoint):
|
||||
config = CLIPVisionConfig.from_pretrained("openai/clip-vit-large-patch14")
|
||||
model = PaintByExampleImageEncoder(config)
|
||||
|
||||
keys = list(checkpoint.keys())
|
||||
|
||||
text_model_dict = {}
|
||||
|
||||
for key in keys:
|
||||
if key.startswith("cond_stage_model.transformer"):
|
||||
text_model_dict[key[len("cond_stage_model.transformer.") :]] = checkpoint[key]
|
||||
|
||||
# load clip vision
|
||||
model.model.load_state_dict(text_model_dict)
|
||||
|
||||
# load mapper
|
||||
keys_mapper = {
|
||||
k[len("cond_stage_model.mapper.res") :]: v
|
||||
for k, v in checkpoint.items()
|
||||
if k.startswith("cond_stage_model.mapper")
|
||||
}
|
||||
|
||||
MAPPING = {
|
||||
"attn.c_qkv": ["attn1.to_q", "attn1.to_k", "attn1.to_v"],
|
||||
"attn.c_proj": ["attn1.to_out.0"],
|
||||
"ln_1": ["norm1"],
|
||||
"ln_2": ["norm3"],
|
||||
"mlp.c_fc": ["ff.net.0.proj"],
|
||||
"mlp.c_proj": ["ff.net.2"],
|
||||
}
|
||||
|
||||
mapped_weights = {}
|
||||
for key, value in keys_mapper.items():
|
||||
prefix = key[: len("blocks.i")]
|
||||
suffix = key.split(prefix)[-1].split(".")[-1]
|
||||
name = key.split(prefix)[-1].split(suffix)[0][1:-1]
|
||||
mapped_names = MAPPING[name]
|
||||
|
||||
num_splits = len(mapped_names)
|
||||
for i, mapped_name in enumerate(mapped_names):
|
||||
new_name = ".".join([prefix, mapped_name, suffix])
|
||||
shape = value.shape[0] // num_splits
|
||||
mapped_weights[new_name] = value[i * shape : (i + 1) * shape]
|
||||
|
||||
model.mapper.load_state_dict(mapped_weights)
|
||||
|
||||
# load final layer norm
|
||||
model.final_layer_norm.load_state_dict(
|
||||
{
|
||||
"bias": checkpoint["cond_stage_model.final_ln.bias"],
|
||||
"weight": checkpoint["cond_stage_model.final_ln.weight"],
|
||||
}
|
||||
)
|
||||
|
||||
# load final proj
|
||||
model.proj_out.load_state_dict(
|
||||
{
|
||||
"bias": checkpoint["proj_out.bias"],
|
||||
"weight": checkpoint["proj_out.weight"],
|
||||
}
|
||||
)
|
||||
|
||||
# load uncond vector
|
||||
model.uncond_vector.data = torch.nn.Parameter(checkpoint["learnable_vector"])
|
||||
return model
|
||||
|
||||
|
||||
def convert_open_clip_checkpoint(checkpoint):
|
||||
text_model = CLIPTextModel.from_pretrained("stabilityai/stable-diffusion-2", subfolder="text_encoder")
|
||||
|
||||
keys = list(checkpoint.keys())
|
||||
|
||||
text_model_dict = {}
|
||||
|
||||
if "cond_stage_model.model.text_projection" in checkpoint:
|
||||
d_model = int(checkpoint["cond_stage_model.model.text_projection"].shape[0])
|
||||
else:
|
||||
d_model = 1024
|
||||
|
||||
text_model_dict["text_model.embeddings.position_ids"] = text_model.text_model.embeddings.get_buffer("position_ids")
|
||||
|
||||
for key in keys:
|
||||
if "resblocks.23" in key: # Diffusers drops the final layer and only uses the penultimate layer
|
||||
continue
|
||||
if key in textenc_conversion_map:
|
||||
text_model_dict[textenc_conversion_map[key]] = checkpoint[key]
|
||||
if key.startswith("cond_stage_model.model.transformer."):
|
||||
new_key = key[len("cond_stage_model.model.transformer.") :]
|
||||
if new_key.endswith(".in_proj_weight"):
|
||||
new_key = new_key[: -len(".in_proj_weight")]
|
||||
new_key = textenc_pattern.sub(lambda m: protected[re.escape(m.group(0))], new_key)
|
||||
text_model_dict[new_key + ".q_proj.weight"] = checkpoint[key][:d_model, :]
|
||||
text_model_dict[new_key + ".k_proj.weight"] = checkpoint[key][d_model : d_model * 2, :]
|
||||
text_model_dict[new_key + ".v_proj.weight"] = checkpoint[key][d_model * 2 :, :]
|
||||
elif new_key.endswith(".in_proj_bias"):
|
||||
new_key = new_key[: -len(".in_proj_bias")]
|
||||
new_key = textenc_pattern.sub(lambda m: protected[re.escape(m.group(0))], new_key)
|
||||
text_model_dict[new_key + ".q_proj.bias"] = checkpoint[key][:d_model]
|
||||
text_model_dict[new_key + ".k_proj.bias"] = checkpoint[key][d_model : d_model * 2]
|
||||
text_model_dict[new_key + ".v_proj.bias"] = checkpoint[key][d_model * 2 :]
|
||||
else:
|
||||
new_key = textenc_pattern.sub(lambda m: protected[re.escape(m.group(0))], new_key)
|
||||
|
||||
text_model_dict[new_key] = checkpoint[key]
|
||||
|
||||
text_model.load_state_dict(text_model_dict)
|
||||
|
||||
return text_model
|
||||
|
||||
|
||||
def stable_unclip_image_encoder(original_config):
|
||||
"""
|
||||
Returns the image processor and clip image encoder for the img2img unclip pipeline.
|
||||
|
||||
We currently know of two types of stable unclip models which separately use the clip and the openclip image
|
||||
encoders.
|
||||
"""
|
||||
|
||||
image_embedder_config = original_config.model.params.embedder_config
|
||||
|
||||
sd_clip_image_embedder_class = image_embedder_config.target
|
||||
sd_clip_image_embedder_class = sd_clip_image_embedder_class.split(".")[-1]
|
||||
|
||||
if sd_clip_image_embedder_class == "ClipImageEmbedder":
|
||||
clip_model_name = image_embedder_config.params.model
|
||||
|
||||
if clip_model_name == "ViT-L/14":
|
||||
feature_extractor = CLIPImageProcessor()
|
||||
image_encoder = CLIPVisionModelWithProjection.from_pretrained("openai/clip-vit-large-patch14")
|
||||
else:
|
||||
raise NotImplementedError(f"Unknown CLIP checkpoint name in stable diffusion checkpoint {clip_model_name}")
|
||||
|
||||
elif sd_clip_image_embedder_class == "FrozenOpenCLIPImageEmbedder":
|
||||
feature_extractor = CLIPImageProcessor()
|
||||
image_encoder = CLIPVisionModelWithProjection.from_pretrained("laion/CLIP-ViT-H-14-laion2B-s32B-b79K")
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
f"Unknown CLIP image embedder class in stable diffusion checkpoint {sd_clip_image_embedder_class}"
|
||||
)
|
||||
|
||||
return feature_extractor, image_encoder
|
||||
|
||||
|
||||
def stable_unclip_image_noising_components(
|
||||
original_config, clip_stats_path: Optional[str] = None, device: Optional[str] = None
|
||||
):
|
||||
"""
|
||||
Returns the noising components for the img2img and txt2img unclip pipelines.
|
||||
|
||||
Converts the stability noise augmentor into
|
||||
1. a `StableUnCLIPImageNormalizer` for holding the CLIP stats
|
||||
2. a `DDPMScheduler` for holding the noise schedule
|
||||
|
||||
If the noise augmentor config specifies a clip stats path, the `clip_stats_path` must be provided.
|
||||
"""
|
||||
noise_aug_config = original_config.model.params.noise_aug_config
|
||||
noise_aug_class = noise_aug_config.target
|
||||
noise_aug_class = noise_aug_class.split(".")[-1]
|
||||
|
||||
if noise_aug_class == "CLIPEmbeddingNoiseAugmentation":
|
||||
noise_aug_config = noise_aug_config.params
|
||||
embedding_dim = noise_aug_config.timestep_dim
|
||||
max_noise_level = noise_aug_config.noise_schedule_config.timesteps
|
||||
beta_schedule = noise_aug_config.noise_schedule_config.beta_schedule
|
||||
|
||||
image_normalizer = StableUnCLIPImageNormalizer(embedding_dim=embedding_dim)
|
||||
image_noising_scheduler = DDPMScheduler(num_train_timesteps=max_noise_level, beta_schedule=beta_schedule)
|
||||
|
||||
if "clip_stats_path" in noise_aug_config:
|
||||
if clip_stats_path is None:
|
||||
raise ValueError("This stable unclip config requires a `clip_stats_path`")
|
||||
|
||||
clip_mean, clip_std = torch.load(clip_stats_path, map_location=device)
|
||||
clip_mean = clip_mean[None, :]
|
||||
clip_std = clip_std[None, :]
|
||||
|
||||
clip_stats_state_dict = {
|
||||
"mean": clip_mean,
|
||||
"std": clip_std,
|
||||
}
|
||||
|
||||
image_normalizer.load_state_dict(clip_stats_state_dict)
|
||||
else:
|
||||
raise NotImplementedError(f"Unknown noise augmentor class: {noise_aug_class}")
|
||||
|
||||
return image_normalizer, image_noising_scheduler
|
||||
|
||||
|
||||
def convert_controlnet_checkpoint(
|
||||
checkpoint, original_config, checkpoint_path, image_size, upcast_attention, extract_ema
|
||||
):
|
||||
ctrlnet_config = create_unet_diffusers_config(original_config, image_size=image_size, controlnet=True)
|
||||
ctrlnet_config["upcast_attention"] = upcast_attention
|
||||
|
||||
ctrlnet_config.pop("sample_size")
|
||||
|
||||
controlnet_model = ControlNetModel(**ctrlnet_config)
|
||||
|
||||
converted_ctrl_checkpoint = convert_ldm_unet_checkpoint(
|
||||
checkpoint, ctrlnet_config, path=checkpoint_path, extract_ema=extract_ema, controlnet=True
|
||||
)
|
||||
|
||||
controlnet_model.load_state_dict(converted_ctrl_checkpoint)
|
||||
|
||||
return controlnet_model
|
||||
@@ -5,7 +5,7 @@
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -23,132 +23,112 @@ from safetensors.torch import load_file
|
||||
from diffusers import StableDiffusionPipeline
|
||||
import pdb
|
||||
|
||||
|
||||
|
||||
def convert_motion_lora_ckpt_to_diffusers(pipeline, state_dict, alpha=1.0):
|
||||
# directly update weight in diffusers model
|
||||
for key in state_dict:
|
||||
# only process lora down key
|
||||
if "up." in key: continue
|
||||
|
||||
up_key = key.replace(".down.", ".up.")
|
||||
model_key = key.replace("processor.", "").replace("_lora", "").replace("down.", "").replace("up.", "")
|
||||
model_key = model_key.replace("to_out.", "to_out.0.")
|
||||
layer_infos = model_key.split(".")[:-1]
|
||||
|
||||
curr_layer = pipeline.unet
|
||||
while len(layer_infos) > 0:
|
||||
temp_name = layer_infos.pop(0)
|
||||
curr_layer = curr_layer.__getattr__(temp_name)
|
||||
|
||||
weight_down = state_dict[key]
|
||||
weight_up = state_dict[up_key]
|
||||
curr_layer.weight.data += alpha * torch.mm(weight_up, weight_down).to(curr_layer.weight.data.device)
|
||||
|
||||
return pipeline
|
||||
|
||||
|
||||
|
||||
def convert_lora(pipeline, state_dict, LORA_PREFIX_UNET="lora_unet", LORA_PREFIX_TEXT_ENCODER="lora_te", alpha=0.6):
|
||||
# load base model
|
||||
# pipeline = StableDiffusionPipeline.from_pretrained(base_model_path, torch_dtype=torch.float32)
|
||||
# load base model
|
||||
# pipeline = StableDiffusionPipeline.from_pretrained(base_model_path, torch_dtype=torch.float32)
|
||||
|
||||
# load LoRA weight from .safetensors
|
||||
# state_dict = load_file(checkpoint_path)
|
||||
# load LoRA weight from .safetensors
|
||||
# state_dict = load_file(checkpoint_path)
|
||||
|
||||
visited = []
|
||||
visited = []
|
||||
# directly update weight in diffusers model
|
||||
for lora_name in state_dict:
|
||||
# it is suggested to print out the key, it usually will be something like below
|
||||
# "lora_te_text_model_encoder_layers_0_self_attn_k_proj.lora_down.weight"
|
||||
|
||||
# directly update weight in diffusers model
|
||||
for key in state_dict:
|
||||
# it is suggested to print out the key, it usually will be something like below
|
||||
# "lora_te_text_model_encoder_layers_0_self_attn_k_proj.lora_down.weight"
|
||||
# as we have set the alpha beforehand, so just skip
|
||||
if ".alpha" in lora_name or lora_name in visited:
|
||||
continue
|
||||
|
||||
# as we have set the alpha beforehand, so just skip
|
||||
if ".alpha" in key or key in visited:
|
||||
continue
|
||||
if "te" in lora_name:
|
||||
if "lora_te1" in key:
|
||||
LORA_PREFIX_TEXT_ENCODER = "lora_te1"
|
||||
elif "lora_te2" in key:
|
||||
LORA_PREFIX_TEXT_ENCODER = "lora_te2"
|
||||
else:
|
||||
LORA_PREFIX_TEXT_ENCODER = "lora_te"
|
||||
layer_infos = key.split(".")[0].split(LORA_PREFIX_TEXT_ENCODER + "_")[-1].split("_")
|
||||
|
||||
|
||||
if "text" in key:
|
||||
layer_infos = key.split(".")[0].split(LORA_PREFIX_TEXT_ENCODER + "_")[-1].split("_")
|
||||
curr_layer = pipeline.text_encoder
|
||||
else:
|
||||
layer_infos = key.split(".")[0].split(LORA_PREFIX_UNET + "_")[-1].split("_")
|
||||
curr_layer = pipeline.unet
|
||||
else:
|
||||
layer_infos = key.split(".")[0].split(LORA_PREFIX_UNET + "_")[-1].split("_")
|
||||
curr_layer = pipeline.unet
|
||||
|
||||
# find the target layer
|
||||
temp_name = layer_infos.pop(0)
|
||||
while len(layer_infos) > -1:
|
||||
try:
|
||||
curr_layer = curr_layer.__getattr__(temp_name)
|
||||
if len(layer_infos) > 0:
|
||||
temp_name = layer_infos.pop(0)
|
||||
elif len(layer_infos) == 0:
|
||||
break
|
||||
except Exception:
|
||||
if len(temp_name) > 0:
|
||||
temp_name += "_" + layer_infos.pop(0)
|
||||
else:
|
||||
temp_name = layer_infos.pop(0)
|
||||
# find the target layer
|
||||
temp_name = layer_infos.pop(0)
|
||||
while len(layer_infos) > -1:
|
||||
try:
|
||||
curr_layer = curr_layer.__getattr__(temp_name)
|
||||
if len(layer_infos) > 0:
|
||||
temp_name = layer_infos.pop(0)
|
||||
elif len(layer_infos) == 0:
|
||||
break
|
||||
except Exception:
|
||||
if len(temp_name) > 0:
|
||||
temp_name += "_" + layer_infos.pop(0)
|
||||
else:
|
||||
temp_name = layer_infos.pop(0)
|
||||
|
||||
pair_keys = []
|
||||
if "lora_down" in key:
|
||||
pair_keys.append(key.replace("lora_down", "lora_up"))
|
||||
pair_keys.append(key)
|
||||
else:
|
||||
pair_keys.append(key)
|
||||
pair_keys.append(key.replace("lora_up", "lora_down"))
|
||||
pair_keys = []
|
||||
if "lora.down" in key:
|
||||
pair_keys.append(key.replace("lora.down", "lora.up"))
|
||||
pair_keys.append(key)
|
||||
else:
|
||||
pair_keys.append(key)
|
||||
pair_keys.append(key.replace("lora.up", "lora.down"))
|
||||
|
||||
# update weight
|
||||
if len(state_dict[pair_keys[0]].shape) == 4:
|
||||
weight_up = state_dict[pair_keys[0]].squeeze(3).squeeze(2).to(torch.float32)
|
||||
weight_down = state_dict[pair_keys[1]].squeeze(3).squeeze(2).to(torch.float32)
|
||||
curr_layer.weight.data += alpha * torch.mm(weight_up, weight_down).unsqueeze(2).unsqueeze(3).to(curr_layer.weight.data.device)
|
||||
else:
|
||||
weight_up = state_dict[pair_keys[0]].to(torch.float32)
|
||||
weight_down = state_dict[pair_keys[1]].to(torch.float32)
|
||||
curr_layer.weight.data += alpha * torch.mm(weight_up, weight_down).to(curr_layer.weight.data.device)
|
||||
# update weight
|
||||
if len(state_dict[pair_keys[0]].shape) == 4:
|
||||
weight_up = state_dict[pair_keys[0]].squeeze(3).squeeze(2).to(torch.float32)
|
||||
weight_down = state_dict[pair_keys[1]].squeeze(3).squeeze(2).to(torch.float32)
|
||||
curr_layer.weight.data += alpha * torch.mm(weight_up, weight_down).unsqueeze(2).unsqueeze(3).to(curr_layer.weight.data.device)
|
||||
else:
|
||||
weight_up = state_dict[pair_keys[0]].to(torch.float32)
|
||||
weight_down = state_dict[pair_keys[1]].to(torch.float32)
|
||||
curr_layer.weight.data += alpha * torch.mm(weight_up, weight_down).to(curr_layer.weight.data.device)
|
||||
|
||||
# update visited list
|
||||
for item in pair_keys:
|
||||
visited.append(item)
|
||||
# update visited list
|
||||
for item in pair_keys:
|
||||
visited.append(item)
|
||||
|
||||
return pipeline
|
||||
return pipeline
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument(
|
||||
"--base_model_path", default=None, type=str, required=True, help="Path to the base model in diffusers format."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--checkpoint_path", default=None, type=str, required=True, help="Path to the checkpoint to convert."
|
||||
)
|
||||
parser.add_argument("--dump_path", default=None, type=str, required=True, help="Path to the output model.")
|
||||
parser.add_argument(
|
||||
"--lora_prefix_unet", default="lora_unet", type=str, help="The prefix of UNet weight in safetensors"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--lora_prefix_text_encoder",
|
||||
default="lora_te",
|
||||
type=str,
|
||||
help="The prefix of text encoder weight in safetensors",
|
||||
)
|
||||
parser.add_argument("--alpha", default=0.75, type=float, help="The merging ratio in W = W0 + alpha * deltaW")
|
||||
parser.add_argument(
|
||||
"--to_safetensors", action="store_true", help="Whether to store pipeline in safetensors format or not."
|
||||
)
|
||||
parser.add_argument("--device", type=str, help="Device to use (e.g. cpu, cuda:0, cuda:1, etc.)")
|
||||
parser.add_argument(
|
||||
"--base_model_path", default=None, type=str, required=True, help="Path to the base model in diffusers format."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--checkpoint_path", default=None, type=str, required=True, help="Path to the checkpoint to convert."
|
||||
)
|
||||
parser.add_argument("--dump_path", default=None, type=str, required=True, help="Path to the output model.")
|
||||
parser.add_argument(
|
||||
"--lora_prefix_unet", default="lora_unet", type=str, help="The prefix of UNet weight in safetensors"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--lora_prefix_text_encoder",
|
||||
default="lora_te",
|
||||
type=str,
|
||||
help="The prefix of text encoder weight in safetensors",
|
||||
)
|
||||
parser.add_argument("--alpha", default=0.75, type=float, help="The merging ratio in W = W0 + alpha * deltaW")
|
||||
parser.add_argument(
|
||||
"--to_safetensors", action="store_true", help="Whether to store pipeline in safetensors format or not."
|
||||
)
|
||||
parser.add_argument("--device", type=str, help="Device to use (e.g. cpu, cuda:0, cuda:1, etc.)")
|
||||
|
||||
args = parser.parse_args()
|
||||
args = parser.parse_args()
|
||||
|
||||
base_model_path = args.base_model_path
|
||||
checkpoint_path = args.checkpoint_path
|
||||
dump_path = args.dump_path
|
||||
lora_prefix_unet = args.lora_prefix_unet
|
||||
lora_prefix_text_encoder = args.lora_prefix_text_encoder
|
||||
alpha = args.alpha
|
||||
base_model_path = args.base_model_path
|
||||
checkpoint_path = args.checkpoint_path
|
||||
dump_path = args.dump_path
|
||||
lora_prefix_unet = args.lora_prefix_unet
|
||||
lora_prefix_text_encoder = args.lora_prefix_text_encoder
|
||||
alpha = args.alpha
|
||||
|
||||
pipe = convert(base_model_path, checkpoint_path, lora_prefix_unet, lora_prefix_text_encoder, alpha)
|
||||
pipe = convert(base_model_path, checkpoint_path, lora_prefix_unet, lora_prefix_text_encoder, alpha)
|
||||
|
||||
pipe = pipe.to(args.device)
|
||||
pipe.save_pretrained(args.dump_path, safe_serialization=args.to_safetensors)
|
||||
pipe = pipe.to(args.device)
|
||||
pipe.save_pretrained(args.dump_path, safe_serialization=args.to_safetensors)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
1095
animatediff/utils/xl_lora_util.py
Normal file
1095
animatediff/utils/xl_lora_util.py
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user