
Commit 2b1cd60

File upload and transcoding pipeline (#79)

* Chore: remove container name assignments
* Feat: add Kafka container
* Feat: compose file version field no longer needed
* Feat: set container name and platform
* Feat: downgrade version; verified producing and consuming messages with ZooKeeper
* Fix: no HEAD check needed for uploads
* Feat: add ffmpeg worker
* Feat: ffmpeg script still needs revision
* Feat: write transcoding script
* Feat: write Dockerfile that runs sh/ffmpeg/transcode.py to transcode input.mp4 in the same folder into output.mp4
* Feat: send and receive messages between Docker containers over a Kafka topic
* Feat: receive S3 events
* Feat: receive S3 events and download the object
* Feat: install ffmpeg
* Feat: detect whether a file is a video; add DASH encoding function
* Feat: upload after transcoding completes
* Feat: per-quality transcoding working
* Feat: pipeline setup complete
1 parent 4c030a5 commit 2b1cd60

File tree (8 files changed: +231 −15 lines)

* Dockerfile
* back/src/main/java/com/back/domain/file/controller/VideoController.java
* back/src/main/java/com/back/domain/file/service/VideoService.java
* back/src/main/java/com/back/global/app/S3Config.java
* back/src/main/java/com/back/global/security/SecurityConfig.java
* consumer.py
* docker-compose.yml
* requirements.txt

Dockerfile

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN apt-get update && apt-get install -y ffmpeg
RUN pip install --no-cache-dir -r requirements.txt

COPY consumer.py .

ENV KAFKA_TOPIC=s3-events
ENV KAFKA_BOOTSTRAP=kafka:9092
ENV MINIO_ENDPOINT=http://minio:9000
ENV MINIO_ACCESS_KEY=minioadmin
ENV MINIO_SECRET_KEY=minioadmin
ENV DOWNLOAD_DIR=/downloads

VOLUME ["/downloads"]

CMD ["python", "consumer.py"]

back/src/main/java/com/back/domain/file/controller/VideoController.java

Lines changed: 4 additions & 3 deletions
@@ -18,16 +18,17 @@ public class VideoController {
 
     // Presigned URL for uploads
     @GetMapping("/videos/upload-url")
-    public URL getUploadUrl(@RequestParam String fileName) {
-        return videoService.generateUploadUrl("test-bucket", fileName);
+    public URL getUploadUrl(@RequestParam String bucket, @RequestParam String fileName) {
+        return videoService.generateUploadUrl(bucket, fileName);
     }
 
     // URLs for DASH streaming
     @GetMapping("/videos/dash-urls")
     public Map<String, URL> getDashUrls(
+            @RequestParam String bucket,
             @RequestParam String mpdFile,
             @RequestParam List<String> segmentFiles
     ) {
-        return videoService.generateDashUrls("test-bucket", mpdFile, segmentFiles);
+        return videoService.generateDashUrls(bucket, mpdFile, segmentFiles);
     }
 }

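With the bucket now taken as a request parameter, a client first asks the backend for a presigned upload URL and then PUTs the file straight to MinIO. A minimal sketch of that flow in Python, assuming the backend runs at http://localhost:8080 and that the response body is the presigned URL serialized as a JSON string (both assumptions, not shown in this commit):

# Sketch only: base URL, bucket, and file name are placeholders for your setup.
import requests

BASE_URL = "http://localhost:8080"  # assumed local address of the Spring backend

# 1) Request a presigned upload URL (this endpoint is permitAll, see SecurityConfig.java below)
resp = requests.get(
    f"{BASE_URL}/videos/upload-url",
    params={"bucket": "test-bucket", "fileName": "sample.mp4"},
)
resp.raise_for_status()
upload_url = resp.text.strip('"')  # assumed: java.net.URL rendered as a quoted JSON string

# 2) Upload the file directly to MinIO with the presigned URL
with open("sample.mp4", "rb") as f:
    put_resp = requests.put(upload_url, data=f)
put_resp.raise_for_status()
print("Upload status:", put_resp.status_code)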
back/src/main/java/com/back/domain/file/service/VideoService.java

Lines changed: 0 additions & 4 deletions
@@ -41,10 +41,6 @@ public Video getNewsByUuid(String uuid) {
 
     // Consider HeadObjectRequest
     public URL generateUploadUrl(String bucket, String objectKey) {
-        if(!isExist(bucket, objectKey)){
-            throw new NoSuchElementException("The requested file does not exist: " + objectKey);
-        }
-
         PutObjectRequest request = PutObjectRequest.builder()
                 .bucket(bucket)
                 .key(objectKey)
back/src/main/java/com/back/global/app/S3Config.java

Lines changed: 3 additions & 0 deletions
@@ -7,6 +7,7 @@
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.presigner.S3Presigner;
+import software.amazon.awssdk.services.s3.S3Configuration;
 
 import java.net.URI;
 
@@ -28,6 +29,7 @@ public S3Presigner s3Presigner() {
                 )
                 .endpointOverride(URI.create(ENDPOINT))
                 .region(REGION)
+                .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
                 .build();
     }
 
@@ -41,6 +43,7 @@ public S3Client s3Client() {
                 )
                 .endpointOverride(URI.create(ENDPOINT))
                 .region(REGION)
+                .forcePathStyle(true)
                 .build();
     }
 }

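Both the presigner and the S3Client now force path-style access, so presigned URLs come out as http://minio:9000/<bucket>/<key> rather than a virtual-hosted http://<bucket>.minio:9000/<key> that would not resolve inside the Docker network. The Python consumer talks to the same endpoint through boto3; if its addressing ever needs to be pinned the same way, a small sketch (not part of this commit) mirroring forcePathStyle(true):

# Optional, explicit path-style config for the Python side; endpoint and
# credentials match the MinIO settings used elsewhere in this commit.
import boto3
from botocore.client import Config

s3 = boto3.client(
    "s3",
    endpoint_url="http://minio:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    config=Config(s3={"addressing_style": "path"}),  # http://minio:9000/<bucket>/<key>
)
print(s3.list_buckets()["Buckets"])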
back/src/main/java/com/back/global/security/SecurityConfig.java

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
                         .requestMatchers("/auth/**").permitAll()
                         .requestMatchers("/actuator/**").permitAll()
                         .requestMatchers("/swagger-ui/**", "/v3/api-docs/**", "/swagger-resources/**").permitAll()
+                        .requestMatchers("/videos/upload-url").permitAll()
                         .anyRequest().authenticated()
                 )
                 .headers(headers -> headers

consumer.py

Lines changed: 149 additions & 0 deletions
@@ -0,0 +1,149 @@
import os
import json
import time
import subprocess
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable
import boto3

# =========================
# Environment variables
# =========================

# Kafka
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "s3-events")
KAFKA_BOOTSTRAP = os.getenv("KAFKA_BOOTSTRAP", "kafka:29092")  # for the internal Docker network
GROUP_ID = os.getenv("GROUP_ID", "minio-consumer")

# S3 / MinIO
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "minioadmin")
DOWNLOAD_DIR = os.getenv("DOWNLOAD_DIR", "/downloads")
REUPLOAD_BUCKET = os.getenv("REUPLOAD_BUCKET", "transcoded-videos")

# Create the download directory
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

# =========================
# S3 client
# =========================
s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY
)

# =========================
# DASH transcoding helpers
# =========================
def is_video(file_path):
    # Use ffprobe to check whether the file has a video stream
    try:
        cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=codec_type",
            "-of", "default=noprint_wrappers=1:nokey=1",
            file_path
        ]
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode().strip()
        return output == "video"
    except Exception:
        return False

def encode_dash_multi_quality(input_file, output_dir):
    os.makedirs(output_dir, exist_ok=True)

    # Per-quality settings (folder name, resolution, bitrate)
    qualities = [
        ("1080p", "1920x1080", "5000k"),
        ("720p", "1280x720", "3000k"),
        ("480p", "854x480", "1500k")
    ]

    for name, resolution, bitrate in qualities:
        quality_dir = os.path.join(output_dir, name)
        os.makedirs(quality_dir, exist_ok=True)

        cmd = [
            "ffmpeg",
            "-i", input_file,
            "-c:v", "libx264",
            "-b:v", bitrate,
            "-s", resolution,
            "-c:a", "aac",
            "-f", "dash",
            os.path.join(quality_dir, "manifest.mpd")
        ]

        print(f"{name} transcoding started: {input_file} -> {quality_dir}")
        subprocess.run(cmd, check=True)
        print(f"{name} transcoding finished: {quality_dir}")

# =========================
# DASH folder upload helper
# =========================
def upload_folder_to_minio(local_folder, bucket_name, s3_prefix=""):
    try:
        s3.head_bucket(Bucket=bucket_name)
    except:
        s3.create_bucket(Bucket=bucket_name)
        print(f"Created bucket: {bucket_name}")

    for root, dirs, files in os.walk(local_folder):
        for file in files:
            local_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_path, local_folder)
            s3_key = os.path.join(s3_prefix, relative_path).replace("\\", "/")
            print(f"Uploading: {local_path} -> {bucket_name}/{s3_key}")
            s3.upload_file(local_path, bucket_name, s3_key)
    print(f"Folder upload finished: {bucket_name}/{s3_prefix}")

# =========================
# Kafka consumer with connection retry
# =========================
while True:
    try:
        consumer = KafkaConsumer(
            KAFKA_TOPIC,
            bootstrap_servers=KAFKA_BOOTSTRAP,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id=GROUP_ID
        )
        print("Subscribed to Kafka events...")
        break
    except NoBrokersAvailable:
        print("Kafka broker unavailable, retrying in 5 seconds...")
        time.sleep(5)

# =========================
# Event processing loop
# =========================
for msg in consumer:
    try:
        data = json.loads(msg.value.decode('utf-8'))
        bucket = data['Records'][0]['s3']['bucket']['name']
        key = data['Records'][0]['s3']['object']['key']
        print(f"Upload detected: {bucket}/{key}")

        # Download the original object
        download_path = os.path.join(DOWNLOAD_DIR, key.replace("/", "_"))
        s3.download_file(bucket, key, download_path)
        print(f"Download finished: {download_path}")

        # If it is a video, DASH-encode it in 3 qualities
        if is_video(download_path):
            dash_output_dir = os.path.join(DOWNLOAD_DIR, "dash_" + os.path.splitext(key)[0])
            encode_dash_multi_quality(download_path, dash_output_dir)
            print(f"DASH encoding finished: {dash_output_dir}")

            # Re-upload the DASH output
            upload_folder_to_minio(dash_output_dir, REUPLOAD_BUCKET, s3_prefix=os.path.splitext(key)[0])
        else:
            print(f"Not a video, skipping encode: {download_path}")

    except Exception as e:
        print("Error:", e)

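The consumer only reads Records[0].s3.bucket.name and Records[0].s3.object.key from each message, so it can be smoke-tested by publishing a minimal hand-made event instead of waiting for a MinIO notification. A sketch, assuming the producer runs on the host (EXTERNAL listener localhost:9092) and that the referenced object already exists in MinIO:

# Sketch only: bucket and key below are placeholders; the object must already
# exist in MinIO or consumer.py will fail at download_file().
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # EXTERNAL listener from docker-compose.yml
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "Records": [
        {"s3": {"bucket": {"name": "test-bucket"},
                "object": {"key": "sample.mp4"}}}
    ]
}
producer.send("s3-events", event)
producer.flush()
print("Test event sent")
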
docker-compose.yml

Lines changed: 52 additions & 8 deletions
@@ -1,18 +1,62 @@
-version: "3.8"
-
 services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.0.1
+    container_name: zookeeper
+    ports:
+      - "2181:2181"
+    environment:
+      - ZOOKEEPER_CLIENT_PORT=2181
+      - ZOOKEEPER_TICK_TIME=2000
+
+  kafka:
+    image: confluentinc/cp-kafka:7.0.1
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
   minio:
-    image: quay.io/minio/minio:latest
+    image: minio/minio
     container_name: minio
     ports:
-      - "9000:9000" # S3 API
-      - "9001:9001" # Web Console
+      - "9000:9000"
+      - "9001:9001"
     environment:
-      MINIO_ROOT_USER: minioadmin
-      MINIO_ROOT_PASSWORD: minioadmin
+      - MINIO_ROOT_USER=minioadmin
+      - MINIO_ROOT_PASSWORD=minioadmin
+      - MINIO_NOTIFY_KAFKA_ENABLE_PRIMARY=on
+      - MINIO_NOTIFY_KAFKA_BROKERS_PRIMARY=kafka:29092
+      - MINIO_NOTIFY_KAFKA_TOPIC_PRIMARY=s3-events
+      - MINIO_NOTIFY_KAFKA_VERSION_PRIMARY=2.7.0
+    command: server /data --console-address ":9001"
     volumes:
       - minio-data:/data
-    command: server /data --console-address ":9001"
+    depends_on:
+      - kafka
+  minio-consumer:
+    build: .
+    container_name: minio-consumer
+    depends_on:
+      - kafka
+      - minio
+    environment:
+      - KAFKA_TOPIC=s3-events
+      - KAFKA_BOOTSTRAP=kafka:29092  # address inside the container network
+      - MINIO_ENDPOINT=http://minio:9000
+      - MINIO_ACCESS_KEY=minioadmin
+      - MINIO_SECRET_KEY=minioadmin
+      - DOWNLOAD_DIR=/downloads
+    volumes:
+      - ./downloads:/downloads
 
 volumes:
   minio-data:

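Once the stack is up, the full pipeline can be exercised by dropping a video into MinIO: the MINIO_NOTIFY_KAFKA_* settings define a Kafka notification target, and minio-consumer downloads, DASH-encodes, and re-uploads the result to the transcoded-videos bucket. A minimal sketch from the host, assuming a test-bucket bucket exists and its bucket notification has been pointed at that Kafka target (for example with the mc client), neither of which this compose file sets up:

# Sketch only: bucket name and file are placeholders; credentials and ports
# come from docker-compose.yml above.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",  # host-mapped MinIO S3 port
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

s3.upload_file("sample.mp4", "test-bucket", "sample.mp4")
print("Uploaded; the minio-consumer container should pick up the event and start transcoding")
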
requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
kafka-python
boto3
