-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
298 lines (244 loc) · 9.39 KB
/
main.py
File metadata and controls
298 lines (244 loc) · 9.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
from fastapi import FastAPI, Request, HTTPException, Response, Form, UploadFile, File
from fastapi.responses import RedirectResponse
import shutil
from fastapi.responses import StreamingResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from pathlib import Path
import subprocess
# Application object and on-disk layout. Every path is anchored at the
# directory containing this file so the app behaves the same no matter which
# working directory it is launched from.
app = FastAPI()

BASE_DIR = Path(__file__).parent
SHOWS_DIR = BASE_DIR / "shows"      # media root: one folder per entry
MOVIES_DIR = SHOWS_DIR / "Movies"   # flat folder of standalone movie files

# Resolve the template folder relative to this file rather than the current
# working directory, consistent with the media paths above.
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))

# Serve the media tree (videos and thumbnails) as static files under /shows.
app.mount("/shows", StaticFiles(directory=SHOWS_DIR), name="shows")
def get_show_season_structure():
    """Map each show folder name to the sorted names of its season folders.

    Every directory directly under ``SHOWS_DIR`` is treated as a show, except
    the one whose name is "movies" (compared case-insensitively). Only
    sub-directories of a show count as seasons; loose files are ignored.
    """
    return {
        show.name: sorted(child.name for child in show.iterdir() if child.is_dir())
        for show in SHOWS_DIR.iterdir()
        if show.is_dir() and show.name.lower() != "movies"
    }
@app.get("/upload")
def upload_page(request: Request):
    """Render the upload form with the existing show/season structure."""
    context = {
        "request": request,
        "show_structure": get_show_season_structure(),
    }
    return templates.TemplateResponse("upload.html", context)
@app.get("/")
def home(request: Request):
    """Render the landing page listing every folder under ``SHOWS_DIR``.

    A folder is flagged as a movie when it directly contains at least one
    ``.mp4`` file. Entries are ordered by name, ignoring case.
    """

    def describe(folder):
        # Build the template record for one media folder.
        return {
            "name": folder.name,
            "is_movie": any(child.suffix == ".mp4" for child in folder.iterdir()),
            "thumbnail": f"/shows/{folder.name}/thumbnail.jpeg",
        }

    entries = [describe(folder) for folder in SHOWS_DIR.iterdir() if folder.is_dir()]
    entries.sort(key=lambda item: item["name"].lower())

    return templates.TemplateResponse(
        "index.html",
        {"request": request, "entries": entries},
    )
@app.get("/Movies")
def list_all_movies(request: Request):
    """Render the list of ``.mp4`` files stored directly in ``MOVIES_DIR``.

    Raises:
        HTTPException: 404 when the movies folder is missing or is not a
            directory.
    """
    if not (MOVIES_DIR.exists() and MOVIES_DIR.is_dir()):
        raise HTTPException(status_code=404, detail="Movies folder missing")

    titles = sorted(movie.name for movie in MOVIES_DIR.glob("*.mp4"))
    context = {
        "request": request,
        "show": "Movies",
        "season": None,
        "episodes": titles,
        "is_movie": True,
    }
    return templates.TemplateResponse("episodes.html", context)
@app.get("/{entry_name}/{second}")
def handle_show_or_movie_file(entry_name: str, second: str, request: Request):
    """Serve the second URL level: either a video player or an episode list.

    ``second`` is interpreted relative to ``SHOWS_DIR / entry_name``:

    * an existing ``.mp4`` file renders the player page;
    * a directory (a season) renders the list of its ``.mp4`` files, sorted.

    Raises:
        HTTPException: 404 when the entry folder does not exist or ``second``
            is neither of the above.
    """
    show_root = SHOWS_DIR / entry_name
    if not show_root.exists():
        raise HTTPException(status_code=404, detail="Entry not found")

    target = show_root / second

    # A video file directly inside the entry folder: show the player.
    if target.is_file() and target.suffix == ".mp4":
        player_context = {
            "request": request,
            "title": f"{entry_name} - {second}",
            "stream_type": "shows",
            "video_url": f"{entry_name}/{second}",
        }
        return templates.TemplateResponse("watch.html", player_context)

    # Anything that is not a season directory at this point is unknown.
    if not target.is_dir():
        raise HTTPException(status_code=404, detail="Not found")

    listing = sorted(item.name for item in target.iterdir() if item.suffix == ".mp4")
    return templates.TemplateResponse(
        "episodes.html",
        {
            "request": request,
            "show": entry_name,
            "season": second,
            "episodes": listing,
        },
    )
@app.get("/{show_name}/{season}/{episode}")
def episode_player(show_name: str, season: str, episode: str, request: Request):
    """Render the player page for one episode of a show.

    Raises:
        HTTPException: 404 when ``SHOWS_DIR/show_name/season/episode`` does
            not exist.
    """
    if not (SHOWS_DIR / show_name / season / episode).exists():
        raise HTTPException(status_code=404, detail="Episode not found")

    context = {
        "request": request,
        "title": f"{show_name} - {season} - {episode}",
        "stream_type": "shows",
        "video_url": f"{show_name}/{season}/{episode}",
    }
    return templates.TemplateResponse("watch.html", context)
def _parse_byte_range(range_header, file_size):
    """Turn a single-range ``Range`` header into inclusive ``(start, end)``.

    Supports ``bytes=A-B``, ``bytes=A-`` and the suffix form ``bytes=-N``
    (the last N bytes). ``end`` is clamped to the last byte of the file.
    Returns ``None`` for headers that are malformed, use another unit, or
    list several ranges, so the caller can ignore them and send the full file.
    The returned pair may still be unsatisfiable (``start >= file_size`` or
    ``start > end``); the caller decides how to answer that.
    """
    unit, equals, spec = range_header.partition("=")
    if not equals or unit.strip().lower() != "bytes" or "," in spec:
        return None

    first, dash, last = spec.strip().partition("-")
    first, last = first.strip(), last.strip()
    if not dash or not (first or last):
        return None
    for bound in (first, last):
        if bound and not (bound.isascii() and bound.isdigit()):
            return None

    if not first:
        # Suffix range: the final `last` bytes. A length of 0 yields
        # start == file_size, which the caller rejects as unsatisfiable.
        return max(file_size - int(last), 0), file_size - 1

    end = min(int(last), file_size - 1) if last else file_size - 1
    return int(first), end


@app.get("/stream/{type}/{path:path}")
def stream_video(type: str, path: str, request: Request):
    """Stream a video file, honouring single-range HTTP ``Range`` requests.

    Args:
        type: Library to serve from: "shows" (relative to ``SHOWS_DIR``) or
            "movies" (relative to ``MOVIES_DIR``). The name mirrors the URL
            path parameter, which is why it shadows the builtin.
        path: File path relative to the selected library root.
        request: Incoming request; only its ``Range`` header is read.

    Returns:
        A 200 response with the whole file, a 206 response with the requested
        byte range, or an empty 416 response when the range cannot be
        satisfied.

    Raises:
        HTTPException: 404 for an unknown ``type``, for a path that would
            leave the library root, or when the path is not an existing file.
    """
    if type not in ("shows", "movies"):
        raise HTTPException(status_code=404)

    # Joining an absolute path onto a Path discards the left-hand side, and
    # ".." segments climb out of the root, so refuse both before touching disk.
    relative = Path(path)
    if relative.anchor or ".." in relative.parts:
        raise HTTPException(status_code=404, detail="File not found")

    root = SHOWS_DIR if type == "shows" else MOVIES_DIR
    file_path = root / relative
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    file_size = file_path.stat().st_size
    range_header = request.headers.get("range")
    start, end = 0, file_size - 1

    # An unparsable Range header is ignored and the whole file is sent.
    requested = _parse_byte_range(range_header, file_size) if range_header else None
    if requested is not None:
        start, end = requested
        if start >= file_size or start > end:
            return Response(
                status_code=416,
                headers={"Content-Range": f"bytes */{file_size}"},
            )

    def iterfile():
        # Yield the selected byte window in chunks of at most 1 MiB.
        with open(file_path, "rb") as f:
            f.seek(start)
            remaining = end - start + 1
            chunk_size = 1024 * 1024
            while remaining > 0:
                chunk = f.read(min(chunk_size, remaining))
                if not chunk:
                    break
                yield chunk
                remaining -= len(chunk)

    headers = {
        "Content-Type": "video/mp4",
        "Accept-Ranges": "bytes",
        "Content-Length": str(end - start + 1),
    }
    if requested is not None:
        # Content-Range only belongs on partial (206) responses.
        headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"

    return StreamingResponse(
        iterfile(),
        headers=headers,
        status_code=206 if requested is not None else 200,
    )
@app.get("/{entry_name}")
def entry_page(entry_name: str, request: Request):
    """Render the page for one top-level entry under ``SHOWS_DIR``.

    * A folder with exactly one ``.mp4`` opens the player directly.
    * A folder with several ``.mp4`` files renders them as a sorted list.
    * Otherwise the folder is treated as a show and its sub-directories are
      listed as seasons, sorted by name.

    Raises:
        HTTPException: 404 when the entry is missing or is not a directory.
    """
    entry_path = SHOWS_DIR / entry_name
    if not entry_path.is_dir():
        raise HTTPException(status_code=404, detail="Entry not found")

    # iterdir() yields entries in arbitrary order; sort so the listing is
    # stable and matches the ordering used by the other listing pages.
    mp4_names = sorted(f.name for f in entry_path.iterdir() if f.suffix == ".mp4")

    # Movie folder (contains mp4s directly)
    if len(mp4_names) == 1:
        return templates.TemplateResponse("watch.html", {
            "request": request,
            "title": entry_name,
            "stream_type": "shows",
            "video_url": f"{entry_name}/{mp4_names[0]}",
        })
    if mp4_names:
        return templates.TemplateResponse("episodes.html", {
            "request": request,
            "show": entry_name,
            "season": None,
            "episodes": mp4_names,
        })

    # TV show folder (contains season directories)
    seasons = sorted(d.name for d in entry_path.iterdir() if d.is_dir())
    return templates.TemplateResponse("seasons.html", {
        "request": request,
        "show": entry_name,
        "seasons": seasons,
    })
def _safe_upload_name(value, label):
    """Return *value* if it is a single, plain path component.

    Raises:
        HTTPException: 400 when the value is empty, contains a path separator
            or NUL byte, or is ``.`` / ``..``. This keeps uploads inside the
            media folders.
    """
    if (
        not value
        or "\x00" in value
        or value in (".", "..")
        or Path(value).name != value
    ):
        raise HTTPException(status_code=400, detail=f"Invalid {label}")
    return value


@app.post("/upload")
async def upload_handler(
    request: Request,
    type: str = Form(...),
    movie_name: str = Form(None),
    show_name: str = Form(None),
    new_show: str = Form(None),
    season: str = Form(None),
    new_season: str = Form(None),
    episode: str = Form(None),
    video: UploadFile = File(...),
    thumb_time: int = Form(55),
):
    """Store an uploaded video and try to create a thumbnail for it.

    Args:
        request: Incoming request (unused; kept for the route signature).
        type: "movie" or "show"; selects the destination layout. The name
            mirrors the form field, which is why it shadows the builtin.
        movie_name: Movie title; the file is saved as ``<movie_name>.mp4``
            in ``MOVIES_DIR``.
        show_name: Existing show folder, or "_new" to use ``new_show``.
        new_show: Name of the show folder to create when ``show_name`` is
            "_new".
        season: Existing season folder, or "_new" to use ``new_season``.
        new_season: Name of the season folder to create when ``season`` is
            "_new".
        episode: File name the episode is saved under.
        video: The uploaded file.
        thumb_time: Offset passed to ffmpeg's ``-ss`` for the thumbnail frame.

    Returns:
        A 303 redirect to ``/``.

    Raises:
        HTTPException: 400 for an unknown ``type``, missing fields, or names
            that are not plain path components.
    """
    if type == "movie":
        if not movie_name:
            raise HTTPException(status_code=400, detail="Missing movie name")
        safe_movie = _safe_upload_name(movie_name, "movie name")
        movie_dir = MOVIES_DIR
        movie_dir.mkdir(parents=True, exist_ok=True)
        dest_path = movie_dir / f"{safe_movie}.mp4"
        thumb_path = movie_dir / f"{safe_movie}-thumbnail.jpeg"
    elif type == "show":
        final_show = new_show if show_name == "_new" else show_name
        final_season = new_season if season == "_new" else season
        if not final_show or not final_season or not episode:
            raise HTTPException(status_code=400, detail="Missing show, season, or episode")
        # Validate every component before creating anything on disk.
        final_show = _safe_upload_name(final_show, "show name")
        final_season = _safe_upload_name(final_season, "season name")
        episode = _safe_upload_name(episode, "episode name")
        dest_dir = SHOWS_DIR / final_show / final_season
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest_path = dest_dir / episode
        thumb_path = dest_dir / f"{Path(episode).stem}-thumbnail.jpeg"
    else:
        raise HTTPException(status_code=400, detail="Invalid type")

    # Save uploaded video
    with dest_path.open("wb") as f:
        shutil.copyfileobj(video.file, f)

    # Generate thumbnail using ffmpeg. This is best-effort: a failing or
    # missing ffmpeg binary must not turn a successful upload into an error.
    try:
        subprocess.run([
            "ffmpeg",
            "-ss", str(thumb_time),
            "-i", str(dest_path),
            "-vframes", "1",
            "-q:v", "2",
            str(thumb_path),
        ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError):
        print(f"Thumbnail generation failed for {dest_path}")

    return RedirectResponse(url="/", status_code=303)
def generate_missing_thumbnails():
    """Walk the media tree and create a thumbnail for every video lacking one.

    The "movies" folder (name compared case-insensitively) is scanned
    directly; for every other folder under ``SHOWS_DIR`` each season
    sub-directory is scanned. Only ``*.mp4`` files are considered.
    """
    print("🔍 Scanning for missing thumbnails...")

    for folder in (item for item in SHOWS_DIR.iterdir() if item.is_dir()):
        if folder.name.lower() == "movies":
            # Movies live directly inside their folder.
            video_dirs = [folder]
        else:
            # Shows keep their videos one level down, inside season folders.
            video_dirs = (sub for sub in folder.iterdir() if sub.is_dir())

        for video_dir in video_dirs:
            for video in video_dir.glob("*.mp4"):
                generate_thumb_if_missing(video)

    print("✅ Thumbnail check complete.")
def generate_thumb_if_missing(mp4_path: Path):
    """Create ``<stem>-thumbnail.jpg`` next to *mp4_path* unless one exists.

    A thumbnail counts as present when either ``<stem>-thumbnail.jpg`` or
    ``<stem>-thumbnail.jpeg`` exists. The upload handler writes the ``.jpeg``
    spelling, so checking only ``.jpg`` would regenerate a duplicate for
    every uploaded video.

    The frame is taken 55 seconds into the video with ffmpeg. Generation is
    best-effort: a failing or missing ffmpeg binary is reported on stdout and
    otherwise ignored.

    Args:
        mp4_path: Path of the video file.
    """
    stem = mp4_path.stem
    already_present = any(
        mp4_path.with_name(f"{stem}-thumbnail{ext}").exists()
        for ext in (".jpg", ".jpeg")
    )
    if already_present:
        return

    thumb_path = mp4_path.with_name(f"{stem}-thumbnail.jpg")
    print(f"🎞️ Generating thumbnail for: {mp4_path.relative_to(SHOWS_DIR.parent)}")
    cmd = [
        "ffmpeg",
        "-loglevel", "error",
        "-ss", "00:00:55",
        "-i", str(mp4_path),
        "-frames:v", "1",
        "-q:v", "2",
        str(thumb_path),
    ]
    try:
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers ffmpeg not being installed or not being executable.
        print(f"⚠️ Failed to generate thumbnail for {mp4_path}")
if __name__ == "__main__":
    # Script entry point: backfill thumbnails first, then serve on all
    # interfaces on port 8000.
    from uvicorn import run as serve

    generate_missing_thumbnails()
    serve(app=app, host="0.0.0.0", port=8000)