Skip to content

Commit 4e7be3c

Browse files
LimLLLlimliutimerring
authored
fix & feat: use sqlite to track upload queue and add re-upload when last upload task failed (#234)
* use sqlite to track upload queue and add reupload when last uplaod task failed * chore: add uuid * feat: control whether to include room title in video title * revert: revert title adjustment --------- Co-authored-by: limliu <[email protected]> Co-authored-by: John Howe <[email protected]>
1 parent d20cc94 commit 4e7be3c

File tree

8 files changed

+195
-68
lines changed

8 files changed

+195
-68
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,4 +382,5 @@ src/subtitle/models/base.pt
382382
src/subtitle/models/small.pt
383383
src/burn/mergevideo.txt
384384
src/upload/upload.yaml
385-
src/upload/uploadVideoQueue.txt
385+
src/upload/uploadVideoQueue.txt
386+
src/db/data.db

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@ triton==3.1.0
1313
zhconv==1.4.3
1414
bilitool
1515
zhipuai
16+
pysqlite3
17+
uuid

src/burn/render_then_merge.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
import argparse
44
import os
55
import subprocess
6-
from src.config import GPU_EXIST, SRC_DIR
6+
from src.config import GPU_EXIST, SRC_DIR, VIDEOS_DIR
77
from src.danmaku.generate_danmakus import get_resolution, process_danmakus
88
from src.subtitle.generate_subtitles import generate_subtitles
99
from src.burn.render_command import render_command
1010
from src.upload.extract_video_info import get_video_info
1111
from src.log.logger import scan_log
12+
from db.conn import insert_upload_queue
13+
from src.upload.generate_yaml import generate_yaml_template
14+
from uuid import uuid4
1215

1316
def normalize_video_path(filepath):
1417
"""Normalize the video path to upload
@@ -90,5 +93,10 @@ def render_then_merge(video_path_list):
9093
merge_command(output_video_path, title, artist, date, merge_list)
9194
subprocess.run(['rm', '-r', tmp])
9295

93-
with open(f"{SRC_DIR}/upload/uploadVideoQueue.txt", "a") as file:
94-
file.write(f"{output_video_path}\n")
96+
yaml_template = generate_yaml_template(output_video_path)
97+
template_path = os.path.join(VIDEOS_DIR, f'upload_conf/{uuid4()}.yaml')
98+
with open(template_path, 'w', encoding='utf-8') as f:
99+
f.write(yaml_template)
100+
101+
if not insert_upload_queue(output_video_path, template_path):
102+
scan_log('插入待上传条目失败')

src/burn/render_video.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import argparse
44
import os
55
import subprocess
6-
from src.config import GPU_EXIST, SRC_DIR, MODEL_TYPE, AUTO_SLICE, SLICE_DURATION, MIN_VIDEO_SIZE, SLICE_NUM, SLICE_OVERLAP, SLICE_STEP
6+
from src.config import GPU_EXIST, SRC_DIR, MODEL_TYPE, AUTO_SLICE, SLICE_DURATION, MIN_VIDEO_SIZE, VIDEOS_DIR , SLICE_NUM, SLICE_OVERLAP, SLICE_STEP
77
from src.danmaku.generate_danmakus import get_resolution, process_danmakus
88
from src.subtitle.generate_subtitles import generate_subtitles
99
from src.burn.render_command import render_command
@@ -12,6 +12,9 @@
1212
from src.autoslice.zhipu_sdk import zhipu_glm_4v_plus_generate_title
1313
from src.upload.extract_video_info import get_video_info
1414
from src.log.logger import scan_log
15+
from db.conn import insert_upload_queue
16+
from src.upload.generate_yaml import generate_yaml_template, generate_slice_yaml_template
17+
from uuid import uuid4
1518

1619
def normalize_video_path(filepath):
1720
"""Normalize the video path to upload
@@ -69,9 +72,13 @@ def render_video(video_path):
6972
slice_video_flv_path = slice_path[:-4] + '.flv'
7073
inject_metadata(slice_path, glm_title, slice_video_flv_path)
7174
os.remove(slice_path)
72-
with open(f"{SRC_DIR}/upload/uploadVideoQueue.txt", "a") as file:
73-
scan_log.info(f"Complete {slice_video_flv_path} and wait for uploading!")
74-
file.write(f"{slice_video_flv_path}\n")
75+
slice_yaml_template = generate_slice_yaml_template(slice_video_flv_path)
76+
slice_template_path = os.path.join(VIDEOS_DIR, f'upload_conf/{uuid4()}.yaml')
77+
with open(slice_template_path, 'w', encoding='utf-8') as f:
78+
f.write(slice_yaml_template)
79+
80+
if not insert_upload_queue(slice_video_flv_path, slice_template_path):
81+
scan_log('插入待上传条目失败')
7582
except Exception as e:
7683
scan_log.error(f"Error in {slice_path}: {e}")
7784

@@ -83,6 +90,11 @@ def render_video(video_path):
8390
# # For test
8491
# test_path = original_video_path[:-4]
8592
# os.rename(original_video_path, test_path)
86-
87-
with open(f"{SRC_DIR}/upload/uploadVideoQueue.txt", "a") as file:
88-
file.write(f"{format_video_path}\n")
93+
94+
yaml_template = generate_yaml_template(format_video_path)
95+
template_path = os.path.join(VIDEOS_DIR, f'upload_conf/{uuid4()}.yaml')
96+
with open(template_path, 'w', encoding='utf-8') as f:
97+
f.write(yaml_template)
98+
99+
if not insert_upload_queue(format_video_path, template_path):
100+
scan_log('插入待上传条目失败')

src/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pathlib import Path
55
from datetime import datetime
66
import configparser
7+
from db.conn import create_table
78

89
# ============================ Your configuration ============================
910
GPU_EXIST=True
@@ -33,6 +34,16 @@
3334
LOG_DIR = os.path.join(BILIVE_DIR, 'logs')
3435
VIDEOS_DIR = os.path.join(BILIVE_DIR, 'Videos')
3536

37+
38+
if not os.path.exists(SRC_DIR + '/db/data.db'):
39+
print("初始化数据库")
40+
create_table()
41+
42+
if not os.path.exists(VIDEOS_DIR):
43+
os.makedirs(VIDEOS_DIR)
44+
if not os.path.exists(VIDEOS_DIR + '/upload_conf'):
45+
os.makedirs(VIDEOS_DIR + '/upload_conf')
46+
3647
def get_model_path():
3748
SRC_DIR = str(Path(os.path.abspath(__file__)).parent)
3849
model_dir = os.path.join(SRC_DIR, 'subtitle', 'models')

src/db/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from db.conn import *

src/db/conn.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import sqlite3
2+
import os
3+
4+
# DATA_BASE_FILE ='./data.db'
5+
DATA_BASE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.db')
6+
7+
def connect():
8+
db = sqlite3.connect(DATA_BASE_FILE)
9+
return db
10+
11+
def create_table():
12+
try:
13+
db = connect()
14+
cursor = db.cursor()
15+
sql = [
16+
"create table upload_queue (id integer primary key autoincrement, video_path text, config_path text, locked integer default 0);",
17+
"create unique index idx_video_path on upload_queue(video_path);",
18+
]
19+
for s in sql:
20+
cursor.execute(s)
21+
db.commit()
22+
db.close()
23+
return True
24+
except:
25+
print("Create table failed.")
26+
return False
27+
28+
def get_single_upload_queue():
29+
db = connect()
30+
cursor = db.cursor()
31+
cursor.execute("select video_path, config_path from upload_queue where locked = 0 limit 1;")
32+
row = cursor.fetchone()
33+
result = {'video_path': row[0], 'config_path': row[1]} if row else None
34+
db.close()
35+
return result
36+
37+
def insert_upload_queue(video_path: str, config_path: str):
38+
try:
39+
db = connect()
40+
cursor = db.cursor()
41+
cursor.execute("insert into upload_queue (video_path, config_path) values (?, ?);", (video_path, config_path))
42+
db.commit()
43+
db.close()
44+
return True
45+
except sqlite3.IntegrityError:
46+
print("Insert Upload Queue failed, the video path already exists.")
47+
return False
48+
49+
def delete_upload_queue(video_path: str):
50+
try:
51+
db = connect()
52+
cursor = db.cursor()
53+
cursor.execute("delete from upload_queue where video_path = ?;", (video_path,))
54+
db.commit()
55+
db.close()
56+
return True
57+
except:
58+
print("Delete Upload Queue failed.")
59+
return False
60+
61+
def update_upload_queue_lock(video_path: str, locked: int):
62+
try:
63+
db = connect()
64+
cursor = db.cursor()
65+
cursor.execute("update upload_queue set locked = ? where video_path = ?;", (locked, video_path))
66+
db.commit()
67+
db.close()
68+
return True
69+
except:
70+
print("Update Upload Queue failed.")
71+
return False
72+
73+
74+
75+
if __name__ == "__main__":
76+
# Create Table
77+
create_table()
78+
# Insert Test Data
79+
insert_upload_queue('test.mp4', 'config.yaml')
80+
# Insert again to check the unique index
81+
print(insert_upload_queue('test.mp4', 'config.yaml'))
82+
# Get the single upload queue, shold be {'video_path': 'test.mp4', 'config_path': 'config.yaml'}
83+
print(get_single_upload_queue())
84+
# Delete the upload queue
85+
delete_upload_queue('test.mp4')
86+
# Get the single upload queue after delete, should be None
87+
print(get_single_upload_queue())

src/upload/upload.py

Lines changed: 62 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@
1010
from src.log.logger import upload_log
1111
import time
1212
import fcntl
13+
from concurrent.futures import ThreadPoolExecutor, as_completed
14+
from db.conn import get_single_upload_queue, delete_upload_queue, update_upload_queue_lock
15+
import threading
16+
17+
read_lock = threading.Lock()
1318

1419
def upload_video(upload_path, yaml_file_path):
1520
try:
@@ -32,12 +37,16 @@ def upload_video(upload_path, yaml_file_path):
3237
if result.returncode == 0:
3338
upload_log.info("Upload successfully, then delete the video")
3439
os.remove(upload_path)
40+
os.remove(yaml_file_path)
41+
delete_upload_queue(upload_path)
3542
else:
3643
upload_log.error("Fail to upload, the files will be reserved.")
44+
update_upload_queue_lock(upload_path, 0)
3745
return False
3846

3947
except subprocess.CalledProcessError as e:
4048
upload_log.error(f"The upload_video called failed, the files will be reserved. error: {e}")
49+
update_upload_queue_lock(upload_path, 0)
4150
return False
4251

4352
def find_bv_number(target_str, my_list):
@@ -48,57 +57,6 @@ def find_bv_number(target_str, my_list):
4857
return parts[1].strip()
4958
return None
5059

51-
def read_append_and_delete_lines(file_path):
52-
try:
53-
while True:
54-
if os.path.getsize(file_path) == 0:
55-
upload_log.info("Empty queue, wait 2 minutes and check again.")
56-
time.sleep(120)
57-
return
58-
59-
with open(file_path, "r+") as file:
60-
fcntl.flock(file, fcntl.LOCK_EX)
61-
lines = file.readlines()
62-
upload_video_path = lines.pop(0).strip()
63-
file.seek(0)
64-
file.writelines(lines)
65-
# Truncate the file to the current position
66-
file.truncate()
67-
# Release the lock
68-
fcntl.flock(file, fcntl.LOCK_UN)
69-
70-
upload_log.info(f"deal with {upload_video_path}")
71-
# check if the live is already uploaded
72-
if upload_video_path.endswith('.flv'):
73-
# upload slice video
74-
yaml_template = generate_slice_yaml_template(upload_video_path)
75-
yaml_file_path = SRC_DIR + "/upload/upload.yaml"
76-
with open(yaml_file_path, 'w', encoding='utf-8') as file:
77-
file.write(yaml_template)
78-
upload_video(upload_video_path, yaml_file_path)
79-
return
80-
else:
81-
query = generate_title(upload_video_path)
82-
result = subprocess.check_output("bilitool" + " list", shell=True)
83-
# print(result.decode("utf-8"), flush=True)
84-
upload_list = result.decode("utf-8").splitlines()
85-
bv_result = find_bv_number(query, upload_list)
86-
if bv_result:
87-
upload_log.info(f"The series of videos has already been uploaded, the BV number is: {bv_result}")
88-
append_upload(upload_video_path, bv_result)
89-
else:
90-
upload_log.info("First upload this live")
91-
# generate the yaml template
92-
yaml_template = generate_yaml_template(upload_video_path)
93-
yaml_file_path = SRC_DIR + "/upload/upload.yaml"
94-
with open(yaml_file_path, 'w', encoding='utf-8') as file:
95-
file.write(yaml_template)
96-
upload_video(upload_video_path, yaml_file_path)
97-
return
98-
99-
except subprocess.CalledProcessError as e:
100-
upload_log.error(f"The read_append_and_delete_lines called failed, the files will be reserved. error: {e}")
101-
return False
10260

10361
def append_upload(upload_path, bv_result):
10462
try:
@@ -120,18 +78,65 @@ def append_upload(upload_path, bv_result):
12078
if result.returncode == 0:
12179
upload_log.info("Upload successfully, then delete the video")
12280
os.remove(upload_path)
81+
delete_upload_queue(upload_path)
12382
else:
12483
upload_log.error("Fail to append, the files will be reserved.")
84+
update_upload_queue_lock(upload_path, 0)
12585
return False
12686

12787
except subprocess.CalledProcessError as e:
12888
upload_log.error(f"The append_upload called failed, the files will be reserved. error: {e}")
89+
update_upload_queue_lock(upload_path, 0)
12990
return False
91+
92+
93+
def read_append_and_delete_lines():
94+
while True:
95+
upload_queue = None
96+
# read the queue and update lock status to prevent other threads from reading the same data
97+
with read_lock:
98+
upload_queue = get_single_upload_queue()
99+
# if there is a task in the queue, try to lock the task
100+
if upload_queue:
101+
video_path, config_path = upload_queue.values()
102+
# lock the task by updating the locked status to 1
103+
update_result = update_upload_queue_lock(video_path, 1)
104+
# if failed to lock, log the error and let the next thread to handle the task
105+
if not update_result:
106+
upload_log.error(f"Failed to lock task for {video_path}, possibly already locked by another thread or database error.")
107+
upload_queue = None
108+
continue
109+
else:
110+
upload_log.info("Empty queue, wait 2 minutes and check again.")
111+
time.sleep(120)
112+
continue
113+
114+
if upload_queue:
115+
video_path, config_path = upload_queue.values()
116+
upload_log.info(f"deal with {video_path}")
117+
# check if the live is already uploaded
118+
if video_path.endswith('.flv'):
119+
# upload slice video
120+
upload_video(video_path, config_path)
121+
return
122+
else:
123+
query = generate_title(video_path)
124+
result = subprocess.check_output("bilitool" + " list", shell=True)
125+
# print(result.decode("utf-8"), flush=True)
126+
upload_list = result.decode("utf-8").splitlines()
127+
bv_result = find_bv_number(query, upload_list)
128+
if bv_result:
129+
upload_log.info(f"The series of videos has already been uploaded, the BV number is: {bv_result}")
130+
append_upload(video_path, bv_result)
131+
else:
132+
upload_log.info("First upload this live")
133+
upload_video(video_path, config_path)
134+
return
135+
time.sleep(20)
136+
130137

131138
if __name__ == "__main__":
132-
# read the queue and upload the video
133-
queue_path = SRC_DIR + "/upload/uploadVideoQueue.txt"
134-
while True:
135-
read_append_and_delete_lines(queue_path)
136-
upload_log.info("wait for 20 seconds")
137-
time.sleep(20)
139+
max_workers = os.getenv("MAX_WORKERS", 5)
140+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
141+
future_to_upload = {executor.submit(read_append_and_delete_lines) for _ in range(max_workers)}
142+

0 commit comments

Comments
 (0)