Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.idea/
build/
video_illustration.egg-info/
**/__pycache__
**/.vscode
160 changes: 160 additions & 0 deletions video_illustration/cross_image_attention/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
16 changes: 16 additions & 0 deletions video_illustration/cross_image_attention/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Cross Image ICL

Implementation copied from https://github.com/garibida/cross-image-attention.

In this code, different images are used as key,query,value instead of style and struct images.

for query set query_image_path
for key set prompt_image_path
for value set prompt_gt_image_path

for this code set use_masked_adain=False (was not tested with masked adain)

Setup and usage are the same as the above repository.

## Example
python run.py --prompt_gt_image_path ../../images/airplane/000000196185_mask_512.jpg --prompt_image_path ../../images/airplane/000000196185_512.jpg --query_image_path ../../images/airplane/000000196185_512.jpg --output_path ../../results/icl_recover_results_with_cross_img_basecode --use_masked_adain False --load_latents False --num_timesteps 70 --skip_steps 0 --contrast_strength 1.67 --swap_guidance_scale 3.5
Empty file.
189 changes: 189 additions & 0 deletions video_illustration/cross_image_attention/appearance_transfer_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
from typing import List, Optional, Callable

import torch
import torch.nn.functional as F

from config import RunConfig
from constants import OUT_INDEX, K_INDEX, Q_INDEX, V_INDEX
from models.stable_diffusion import CrossImageAttentionStableDiffusionPipeline
from utils import attention_utils
from utils.adain import masked_adain, adain
from utils.model_utils import get_stable_diffusion_model
from utils.segmentation import Segmentor


class AppearanceTransferModel:

def __init__(self, config: RunConfig, pipe: Optional[CrossImageAttentionStableDiffusionPipeline] = None):
self.config = config
self.pipe = get_stable_diffusion_model() if pipe is None else pipe
self.register_attention_control()
self.segmentor = Segmentor(prompt=config.prompt, object_nouns=[config.object_noun])
self.latents_query, self.latents_key, self.latents_value = None, None, None
self.zs_query, self.zs_key, self.zs_value = None, None, None

self.image_query_mask_32, self.image_query_mask_64 = None, None
self.image_key_mask_32, self.image_key_mask_64 = None, None
self.image_value_mask_32, self.image_value_mask_64 = None, None

self.enable_edit = False
self.step = 0

def set_latents(self, latents_query: torch.Tensor, latents_key: torch.Tensor, latents_value: torch.Tensor):
self.latents_query = latents_query
self.latents_key = latents_key
self.latents_value = latents_value

def set_noise(self, zs_query: torch.Tensor, zs_key: torch.Tensor, zs_value: torch.Tensor):
self.zs_query = zs_query
self.zs_key = zs_key
self.zs_value = zs_value

def set_masks(self, masks: List[torch.Tensor]):
self.image_app_mask_32, self.image_struct_mask_32, self.image_app_mask_64, self.image_struct_mask_64 = masks

def get_adain_callback(self):

def callback(st: int, timestep: int, latents: torch.FloatTensor) -> Callable:
self.step = st
# Compute the masks using prompt mixing self-segmentation and use the masks for AdaIN operation
if self.config.use_masked_adain and self.step == self.config.adain_range.start:
masks = self.segmentor.get_object_masks()
self.set_masks(masks)
# Apply AdaIN operation using the computed masks
if self.config.adain_range.start <= self.step < self.config.adain_range.end:
if self.config.use_masked_adain:
latents[OUT_INDEX] = masked_adain(latents[OUT_INDEX], latents[V_INDEX], self.image_struct_mask_64, self.image_app_mask_64)
else:
latents[OUT_INDEX] = adain(latents[OUT_INDEX], latents[V_INDEX])

return callback

def register_attention_control(self):

model_self = self

class AttentionProcessor:

def __init__(self, place_in_unet: str):
self.place_in_unet = place_in_unet
if not hasattr(F, "scaled_dot_product_attention"):
raise ImportError("AttnProcessor2_0 requires torch 2.0, to use it, please upgrade torch to 2.0.")

def __call__(self,
attn,
hidden_states: torch.Tensor,
encoder_hidden_states: Optional[torch.Tensor] = None,
attention_mask=None,
temb=None,
perform_swap: bool = False):

residual = hidden_states

if attn.spatial_norm is not None:
hidden_states = attn.spatial_norm(hidden_states, temb)

input_ndim = hidden_states.ndim

if input_ndim == 4:
batch_size, channel, height, width = hidden_states.shape
hidden_states = hidden_states.view(batch_size, channel, height * width).transpose(1, 2)

batch_size, sequence_length, _ = (
hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
)

if attention_mask is not None:
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
attention_mask = attention_mask.view(batch_size, attn.heads, -1, attention_mask.shape[-1])

if attn.group_norm is not None:
hidden_states = attn.group_norm(hidden_states.transpose(1, 2)).transpose(1, 2)

query = attn.to_q(hidden_states)

is_cross = encoder_hidden_states is not None
if not is_cross:
encoder_hidden_states = hidden_states
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)

key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)

inner_dim = key.shape[-1]
head_dim = inner_dim // attn.heads
should_mix = False

# Potentially apply our cross image attention operation
# To do so, we need to be in a self-attention layer in the decoder part of the denoising network
if perform_swap and not is_cross and "up" in self.place_in_unet and model_self.enable_edit:
# if attention_utils.should_mix_keys_and_values(model_self, hidden_states):
should_mix = True
key[OUT_INDEX] = key[K_INDEX]
value[OUT_INDEX] = value[V_INDEX]

# if model_self.step % 5 == 0 and model_self.step < 40:
# # Inject the structure's keys and values
# key[OUT_INDEX] = key[STRUCT_INDEX]
# value[OUT_INDEX] = value[STRUCT_INDEX]
# else:
# # Inject the appearance's keys and values
# key[OUT_INDEX] = key[STYLE_INDEX]
# value[OUT_INDEX] = value[STYLE_INDEX]

query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)

# Compute the cross attention and apply our contrasting operation
hidden_states, attn_weight = attention_utils.compute_scaled_dot_product_attention(
query, key, value,
edit_map=perform_swap and model_self.enable_edit and should_mix,
is_cross=is_cross,
contrast_strength=model_self.config.contrast_strength,
tau = model_self.config.tau,
)

# Update attention map for segmentation
if model_self.config.use_masked_adain and model_self.step == model_self.config.adain_range.start - 1:
model_self.segmentor.update_attention(attn_weight, is_cross)

hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
hidden_states = hidden_states.to(query[OUT_INDEX].dtype)

# linear proj
hidden_states = attn.to_out[0](hidden_states)
# dropout
hidden_states = attn.to_out[1](hidden_states)

if input_ndim == 4:
hidden_states = hidden_states.transpose(-1, -2).reshape(batch_size, channel, height, width)

if attn.residual_connection:
hidden_states = hidden_states + residual

hidden_states = hidden_states / attn.rescale_output_factor

return hidden_states

def register_recr(net_, count, place_in_unet):
if net_.__class__.__name__ == 'ResnetBlock2D':
pass
if net_.__class__.__name__ == 'Attention':
net_.set_processor(AttentionProcessor(place_in_unet + f"_{count + 1}"))
return count + 1
elif hasattr(net_, 'children'):
for net__ in net_.children():
count = register_recr(net__, count, place_in_unet)
return count

cross_att_count = 0
sub_nets = self.pipe.unet.named_children()
for net in sub_nets:
if "down" in net[0]:
cross_att_count += register_recr(net[1], 0, "down")
elif "up" in net[0]:
cross_att_count += register_recr(net[1], 0, "up")
elif "mid" in net[0]:
cross_att_count += register_recr(net[1], 0, "mid")
Loading