-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdramatic_loop_video_pipeline.py
More file actions
202 lines (190 loc) · 8.67 KB
/
dramatic_loop_video_pipeline.py
File metadata and controls
202 lines (190 loc) · 8.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
#!/usr/bin/env python3
"""
Dramatic Loop Video Pipeline
Creates a looping video with dramatic, non-linear, combined effects (echo, reverb, distortion).
"""
import os
import sys
import subprocess
import numpy as np
from pathlib import Path
from PIL import Image
class DramaticLoopVideoProcessor:
    """Turn still images into a looping, audio-effect "glitched" video.

    The pipeline (see ``run_pipeline``) runs these stages in order:

    1. ``convert_to_bitmap``: JPG/PNG files in ``input_dir`` become RGB BMPs.
    2. ``bitmap_to_raw_audio``: the bytes after the first ``HEADER_SIZE``
       bytes of each BMP are rewritten as 16-bit signed raw samples.
    3. ``apply_dramatic_combined_effects``: SoX renders one processed copy of
       those samples per frame, with effect strength following a sine curve
       that starts and ends at its minimum.
    4. ``fix_bitmap_headers``: every processed sample stream is converted back
       to bytes and glued behind the original BMP's leading bytes.
    5. ``resize_images``: frames are scaled to a common width.
    6. ``create_video``: ffmpeg encodes the frame sequences to MP4.

    Every stage returns ``True`` on success and ``False`` (after printing a
    message) on failure.
    """

    # Number of leading bytes of each BMP copied verbatim and never sent
    # through SoX. It must be the same when splitting and when re-joining.
    HEADER_SIZE = 255

    # Separator between an image's base name and its zero-padded frame index.
    FRAME_TAG = "_dramaloop_"

    def __init__(self, input_dir="input", output_dir="output", video_dir="video"):
        """Store the working directories and create the ones the pipeline writes to.

        Args:
            input_dir: Directory holding the source JPG/PNG images (not created).
            output_dir: Directory receiving the re-assembled BMP frames.
            video_dir: Directory receiving the encoded MP4 files.
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.video_dir = Path(video_dir)
        self.bitmap_dir = Path("input-bitmap")
        self.raw_dir = Path("output-raw")
        self.audio_dir = Path("temp-audio")
        self.resized_dir = Path("resized")
        # Create all necessary directories; parents=True lets callers pass
        # nested paths such as "build/output".
        for dir_path in (
            self.output_dir,
            self.video_dir,
            self.bitmap_dir,
            self.raw_dir,
            self.audio_dir,
            self.resized_dir,
        ):
            dir_path.mkdir(parents=True, exist_ok=True)

    @classmethod
    def _base_name(cls, stem):
        """Return the image base name encoded in a frame file stem.

        Frame stems are written as ``<base><FRAME_TAG><index>``, so the LAST
        tag is the real separator even when the base name contains the tag
        text itself. A stem without the tag is returned unchanged.
        """
        base, separator, _ = stem.rpartition(cls.FRAME_TAG)
        return base if separator else stem

    @staticmethod
    def _frame_parameters(frame_index, num_frames):
        """Compute the effect settings for one frame of the loop.

        Args:
            frame_index: Zero-based frame number.
            num_frames: Total number of frames in the loop (>= 1).

        Returns:
            dict of floats with keys ``curve``, ``delay``, ``decay``,
            ``room_size``, ``reverb`` and ``overdrive``.
        """
        # Position in the loop, 0.0 .. 1.0. A single-frame loop has no span
        # to divide by, so it sits at the start of the curve.
        t = frame_index / (num_frames - 1) if num_frames > 1 else 0.0
        # Non-linear curve: sine rising over the first half and mirrored over
        # the second, so the first and last frames both get the minimum.
        curve = float(np.sin(np.pi * min(t, 1 - t)))
        # Dramatic ranges
        return {
            "curve": curve,
            "delay": 0.05 + 1.5 * (curve ** 2),         # 0.05 to ~1.55
            "decay": 0.01 + 0.98 * (curve ** 2.5),      # 0.01 to ~0.99
            "room_size": 0.1 + 0.8 * (curve ** 1.5),    # 0.1 to ~0.9
            "reverb": 0.1 + 0.8 * (curve ** 2),         # 0.1 to ~0.9
            "overdrive": 5 + 45 * (curve ** 2.2),       # 5 to 50
        }

    @staticmethod
    def _run_tool(cmd, failure_prefix):
        """Run an external command quietly; print a message and return False on failure."""
        try:
            # text=True so stderr prints as readable text rather than a bytes
            # repr; errors="replace" keeps odd tool output from raising.
            subprocess.run(cmd, capture_output=True, check=True, text=True, errors="replace")
        except FileNotFoundError:
            print(f"{failure_prefix}: '{cmd[0]}' executable not found. Is it installed and on PATH?")
            return False
        except subprocess.CalledProcessError as e:
            print(f"{failure_prefix}: {e.stderr}")
            return False
        return True

    def convert_to_bitmap(self):
        """Convert every JPG/PNG in ``input_dir`` to an RGB BMP in ``bitmap_dir``.

        Returns:
            bool: False when the input directory is missing or holds no
            supported image, True otherwise.
        """
        supported_formats = {'.jpg', '.jpeg', '.png'}
        if not self.input_dir.is_dir():
            print(f"Input directory '{self.input_dir}' does not exist.")
            return False
        image_files = sorted(
            f for f in self.input_dir.iterdir()
            if f.is_file() and f.suffix.lower() in supported_formats
        )
        if not image_files:
            print(f"No JPG or PNG files found in '{self.input_dir}' directory.")
            return False
        for file_path in image_files:
            with Image.open(file_path) as img:
                rgb = img if img.mode == 'RGB' else img.convert('RGB')
                rgb.save(self.bitmap_dir / (file_path.stem + '.bmp'), 'BMP')
        return True

    def bitmap_to_raw_audio(self):
        """Write each BMP's post-header bytes as 16-bit signed raw audio.

        Each byte ``v`` becomes the sample ``(v - 128) * 256``; the result is
        stored as ``<stem>.raw`` in ``audio_dir``.

        Returns:
            bool: False when ``bitmap_dir`` contains no BMP, True otherwise.
        """
        bitmap_files = sorted(self.bitmap_dir.glob("*.bmp"))
        if not bitmap_files:
            print(f"No bitmap files found in '{self.bitmap_dir}' directory.")
            return False
        for bitmap_file in bitmap_files:
            payload = bitmap_file.read_bytes()[self.HEADER_SIZE:]
            samples_8bit = np.frombuffer(payload, dtype=np.uint8)
            samples_16bit = ((samples_8bit.astype(np.float32) - 128) * 256).astype(np.int16)
            (self.audio_dir / (bitmap_file.stem + '.raw')).write_bytes(samples_16bit.tobytes())
        return True

    def apply_dramatic_combined_effects(self, num_iterations=300):
        """Render ``num_iterations`` SoX-processed copies of every raw audio file.

        Frame ``i`` of ``<stem>.raw`` is written to
        ``raw_dir/<stem>_dramaloop_<i:04d>.raw`` with echo, reverb and
        overdrive settings taken from ``_frame_parameters``.

        Args:
            num_iterations: Number of frames to render per image (>= 1).

        Returns:
            bool: False on invalid ``num_iterations``, missing input, or the
            first SoX failure; True when every frame was rendered.
        """
        if num_iterations < 1:
            print(f"num_iterations must be at least 1, got {num_iterations}.")
            return False
        print(f"Applying dramatic, non-linear, combined effects for {num_iterations} frames...")
        audio_files = sorted(self.audio_dir.glob("*.raw"))
        if not audio_files:
            print(f"No audio files found in '{self.audio_dir}' directory.")
            return False
        for audio_file in audio_files:
            for i in range(num_iterations):
                p = self._frame_parameters(i, num_iterations)
                # Output filename
                output_file = self.raw_dir / f"{audio_file.stem}{self.FRAME_TAG}{i:04d}.raw"
                # Build SoX command: echo, reverb, overdrive.
                # NOTE(review): the SoX manual documents echo delay in
                # milliseconds and reverb arguments as percentages; the 0-1
                # style values here are kept as-is - confirm they are intended.
                cmd = [
                    "sox", "-r", "44100", "-e", "signed", "-b", "16", "-c", "1",
                    str(audio_file), str(output_file),
                    "echo", "0.8", "0.7", f"{p['delay']:.3f}", f"{p['decay']:.3f}",
                    "reverb", f"{p['room_size']:.2f}", "0.5", f"{p['reverb']:.2f}", "0.5",
                    "overdrive", f"{p['overdrive']:.1f}",
                ]
                if not self._run_tool(cmd, f"  ✗ Error in frame {i+1}"):
                    return False
                if i % 50 == 0:
                    print(
                        f"  ✓ Frame {i+1}/{num_iterations} (curve={p['curve']:.3f}, "
                        f"delay={p['delay']:.2f}, decay={p['decay']:.2f}, "
                        f"reverb={p['reverb']:.2f}, overdrive={p['overdrive']:.1f})"
                    )
        return True

    def fix_bitmap_headers(self):
        """Rebuild a BMP for every processed raw file in ``raw_dir``.

        The 16-bit samples are mapped back to bytes (``s / 256 + 128``) and
        appended to the first ``HEADER_SIZE`` bytes of the matching original
        BMP. The result is written to ``output_dir/<raw stem>.bmp``. Raw files
        whose original BMP is missing are skipped with a warning.

        Returns:
            bool: False when there are no raw files or no bitmaps, else True.
        """
        raw_files = sorted(self.raw_dir.glob("*.raw"))
        bitmap_files = list(self.bitmap_dir.glob("*.bmp"))
        if not raw_files or not bitmap_files:
            print(f"No raw or bitmap files found.")
            return False
        # base name -> (header bytes, payload length); every frame of an
        # image shares the same original, so read it only once.
        originals = {}
        for raw_file in raw_files:
            base_name = self._base_name(raw_file.stem)
            if base_name not in originals:
                original_file = self.bitmap_dir / (base_name + '.bmp')
                if not original_file.exists():
                    print(f"  ! Skipping {raw_file.name}: no original bitmap '{original_file}'.")
                    continue
                original_bytes = original_file.read_bytes()
                header = original_bytes[:self.HEADER_SIZE]
                originals[base_name] = (header, len(original_bytes) - len(header))
            header, payload_length = originals[base_name]

            raw_content = raw_file.read_bytes()
            # int16 needs an even byte count; drop a dangling odd byte.
            raw_content = raw_content[:len(raw_content) // 2 * 2]
            samples = np.frombuffer(raw_content, dtype=np.int16)
            payload = ((samples.astype(np.float32) / 256) + 128).astype(np.uint8).tobytes()
            # The effects may change the number of samples. Keep the file
            # exactly as long as the original so the untouched header still
            # describes it: trim a surplus tail, zero-fill a shortfall.
            payload = payload[:payload_length].ljust(payload_length, b"\x00")

            (self.output_dir / (raw_file.stem + '.bmp')).write_bytes(header + payload)
        return True

    def resize_images(self, target_width=1920):
        """Scale every BMP in ``output_dir`` to ``target_width``, keeping aspect ratio.

        Resized copies keep their file name and are written to ``resized_dir``.

        Args:
            target_width: Output width in pixels (>= 1).

        Returns:
            bool: False on an invalid width or when no image is found, else True.
        """
        if target_width < 1:
            print(f"target_width must be at least 1, got {target_width}.")
            return False
        self.resized_dir.mkdir(parents=True, exist_ok=True)
        image_files = sorted(self.output_dir.glob("*.bmp"))
        if not image_files:
            print(f"No images found in {self.output_dir}")
            return False
        # Newer Pillow exposes the filters on Image.Resampling, older releases
        # only on the Image module itself.
        lanczos = getattr(Image, "Resampling", Image).LANCZOS
        for image_file in image_files:
            with Image.open(image_file) as img:
                aspect_ratio = img.height / img.width
                # Never ask for a zero-pixel-high image on very wide input.
                new_height = max(1, int(target_width * aspect_ratio))
                resized_img = img.resize((target_width, new_height), lanczos)
                resized_img.save(self.resized_dir / image_file.name, 'BMP')
        return True

    def create_video(self, fps=60):
        """Encode the resized frame sequences to H.264 MP4 files with ffmpeg.

        One video named ``<base>_dramatic_loop_<fps>fps.mp4`` is written to
        ``video_dir`` for every image base name found in ``resized_dir``.

        Args:
            fps: Frame rate of the resulting video.

        Returns:
            bool: True only when every video was encoded successfully.
        """
        self.video_dir.mkdir(parents=True, exist_ok=True)
        image_files = sorted(self.resized_dir.glob("*.bmp"))
        if not image_files:
            print(f"No resized images found in {self.resized_dir}")
            return False
        base_names = sorted(
            {self._base_name(f.stem) for f in image_files if self.FRAME_TAG in f.stem}
        )
        if not base_names:
            print(f"No resized images found in {self.resized_dir}")
            return False
        all_ok = True
        for base_name in base_names:
            video_file = self.video_dir / f"{base_name}_dramatic_loop_{fps}fps.mp4"
            # '%' is special in ffmpeg image-sequence patterns; double any
            # literal one in the file name so only %04d is interpreted.
            escaped = base_name.replace("%", "%%")
            input_pattern = str(self.resized_dir / f"{escaped}{self.FRAME_TAG}%04d.bmp")
            cmd = [
                "ffmpeg", "-y", "-framerate", str(fps), "-i", input_pattern,
                # yuv420p needs even dimensions; pad odd sizes by one pixel
                # (a no-op for frames that are already even).
                "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
                "-c:v", "libx264", "-preset", "fast", "-crf", "23",
                "-pix_fmt", "yuv420p", str(video_file),
            ]
            if self._run_tool(cmd, "✗ Error creating video"):
                print(f"✓ Dramatic loop video created: {video_file}")
            else:
                all_ok = False
        return all_ok

    def run_pipeline(self, num_iterations=300, fps=60, target_width=1920):
        """Run all stages in order, stopping at the first one that fails.

        Every stage picks up whatever files are present in its input
        directory, including leftovers from earlier runs.

        Args:
            num_iterations: Frames rendered per image.
            fps: Frame rate of the resulting video.
            target_width: Width in pixels of the video frames.

        Returns:
            bool: True when every stage succeeded, False otherwise.
        """
        print("Starting dramatic loop video pipeline...")
        stages = (
            (self.convert_to_bitmap, (), "Failed at bitmap conversion."),
            (self.bitmap_to_raw_audio, (), "Failed at bitmap to raw audio."),
            (self.apply_dramatic_combined_effects, (num_iterations,), "Failed at applying effects."),
            (self.fix_bitmap_headers, (), "Failed at fixing headers."),
            (self.resize_images, (target_width,), "Failed at resizing images."),
            (self.create_video, (fps,), "Failed at video creation."),
        )
        for stage, args, failure_message in stages:
            if not stage(*args):
                print(failure_message)
                return False
        print("Pipeline completed successfully!")
        return True
def main():
    """Command-line entry point.

    Usage: ``dramatic_loop_video_pipeline.py [num_iterations] [fps] [target_width]``

    All three positional arguments are optional positive integers and default
    to 300, 60 and 1920. Arguments beyond the third are ignored. An invalid
    value terminates the program with an error message and a non-zero exit
    status instead of a traceback.
    """
    settings = {"num_iterations": 300, "fps": 60, "target_width": 1920}
    # Dicts preserve insertion order, so zip pairs each supplied argument with
    # its setting positionally and stops at whichever runs out first.
    for name, raw_value in zip(list(settings), sys.argv[1:]):
        try:
            value = int(raw_value)
        except ValueError:
            sys.exit(f"error: {name} must be an integer, got {raw_value!r}")
        if value < 1:
            sys.exit(f"error: {name} must be a positive integer, got {value}")
        settings[name] = value

    processor = DramaticLoopVideoProcessor()
    outcome = processor.run_pipeline(
        settings["num_iterations"], settings["fps"], settings["target_width"]
    )
    # Only an explicit False counts as failure, so a run_pipeline that returns
    # nothing still leaves the exit status at 0.
    if outcome is False:
        sys.exit(1)
# Run the pipeline only when this file is executed as a script; importing it
# as a module must not start any processing.
if __name__ == "__main__":
    main()