|
| 1 | +import requests |
| 2 | +import json |
| 3 | +import time |
| 4 | +import os |
| 5 | +import base64 |
| 6 | +from PIL import Image |
| 7 | +from io import BytesIO |
| 8 | +from lumaai import LumaAI |
| 9 | +from src.upload.bilitool.bilitool.model.model import Model |
| 10 | +from src.config import LUMA_API_KEY |
| 11 | + |
| 12 | + |
| 13 | +def cover_up(img: str): |
| 14 | + """Upload the cover image |
| 15 | + Parameters |
| 16 | + ---------- |
| 17 | + - img: img path or stream |
| 18 | + Returns |
| 19 | + ------- |
| 20 | + - url: str |
| 21 | + the url of the cover image in bili server |
| 22 | + """ |
| 23 | + from PIL import Image |
| 24 | + from io import BytesIO |
| 25 | + |
| 26 | + request = requests.Session() |
| 27 | + request.headers = { |
| 28 | + "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/63.0.3239.108", |
| 29 | + "referer": "https://www.bilibili.com/", |
| 30 | + "connection": "keep-alive", |
| 31 | + } |
| 32 | + model = Model().get_config() |
| 33 | + request.cookies.set("SESSDATA", model["cookies"]["SESSDATA"]) |
| 34 | + with Image.open(img) as im: |
| 35 | + # you should keep the image ratio 16:10 |
| 36 | + xsize, ysize = im.size |
| 37 | + if xsize / ysize > 1.6: |
| 38 | + delta = xsize - ysize * 1.6 |
| 39 | + region = im.crop((delta / 2, 0, xsize - delta / 2, ysize)) |
| 40 | + else: |
| 41 | + delta = ysize - xsize * 10 / 16 |
| 42 | + region = im.crop((0, delta / 2, xsize, ysize - delta / 2)) |
| 43 | + buffered = BytesIO() |
| 44 | + region.save(buffered, format=im.format) |
| 45 | + r = request.post( |
| 46 | + url="https://member.bilibili.com/x/vu/web/cover/up", |
| 47 | + data={ |
| 48 | + "cover": b"data:image/jpeg;base64," |
| 49 | + + (base64.b64encode(buffered.getvalue())), |
| 50 | + "csrf": model["cookies"]["bili_jct"], |
| 51 | + }, |
| 52 | + timeout=30, |
| 53 | + ) |
| 54 | + buffered.close() |
| 55 | + res = r.json() |
| 56 | + if res.get("data") is None: |
| 57 | + raise Exception(res) |
| 58 | + print(res["data"]["url"], flush=True) |
| 59 | + return res["data"]["url"] |
| 60 | + |
| 61 | + |
| 62 | +def luma_generate_cover(your_file_path): |
| 63 | + """Generate cover for video using Luma Photon |
| 64 | + Args: |
| 65 | + your_file_path: str, path to the video file |
| 66 | + Returns: |
| 67 | + str: generated cover |
| 68 | + """ |
| 69 | + try: |
| 70 | + cover_url = cover_up(your_file_path) |
| 71 | + client = LumaAI( |
| 72 | + auth_token=LUMA_API_KEY, |
| 73 | + ) |
| 74 | + generation = client.generations.image.create( |
| 75 | + prompt="This is a video screenshot, please generate a cover in the style of a manga", |
| 76 | + image_ref=[{"url": cover_url, "weight": 0.85}], |
| 77 | + ) |
| 78 | + completed = False |
| 79 | + while not completed: |
| 80 | + generation = client.generations.get(id=generation.id) |
| 81 | + if generation.state == "completed": |
| 82 | + completed = True |
| 83 | + elif generation.state == "failed": |
| 84 | + raise RuntimeError(f"Generation failed: {generation.failure_reason}") |
| 85 | + print("Dreaming") |
| 86 | + time.sleep(2) |
| 87 | + |
| 88 | + image_url = generation.assets.image |
| 89 | + |
| 90 | + response = requests.get(image_url, stream=True) |
| 91 | + cover_name = time.strftime("%Y%m%d%H%M%S") + ".jpg" |
| 92 | + temp_cover_path = os.path.join(os.path.dirname(your_file_path), cover_name) |
| 93 | + with open(temp_cover_path, "wb") as file: |
| 94 | + file.write(response.content) |
| 95 | + os.remove(your_file_path) |
| 96 | + return temp_cover_path |
| 97 | + except Exception as e: |
| 98 | + print(e, flush=True) |
| 99 | + return None |
| 100 | + |
| 101 | + |
| 102 | +if __name__ == "__main__": |
| 103 | + print(luma_generate_cover("")) |
0 commit comments