mirror of
https://github.com/StepanovPlaton/torrent_backend.git
synced 2026-04-03 12:20:38 +04:00
110 lines
4.0 KiB
Python
110 lines
4.0 KiB
Python
import hashlib
|
|
from io import BytesIO
|
|
import mimetypes
|
|
from pathlib import Path
|
|
from typing import Literal
|
|
import aiofiles
|
|
from fastapi import UploadFile
|
|
from PIL import Image
|
|
|
|
from env import Env
|
|
|
|
# Pixel-count threshold for stored full-size images: save_image divides an
# image's width * height by this value to decide how much to shrink it.
# Loaded through Env.get_strict with `int` as the second argument
# (presumably the expected value type -- confirm against env.Env).
IMAGE_TARGET_SIZE = Env.get_strict("IMAGE_TARGET_SIZE", int)
|
|
# Pixel-count threshold for preview images: save_image divides an image's
# width * height by this value when computing the preview scale factor.
# Loaded through Env.get_strict with `int` as the second argument
# (presumably the expected value type -- confirm against env.Env).
PREVIEW_TARGET_SIZE = Env.get_strict("PREVIEW_TARGET_SIZE", int)
|
|
|
|
|
|
def create_hash_name(filename: str):
    """Return the SHA-1 hex digest of *filename*.

    The digest is derived from the name alone, not from the file contents,
    so two uploads sharing a filename map to the same hash name.
    """
    # TODO: Hash from file data
    name_bytes = filename.encode()
    digest = hashlib.sha1(name_bytes)
    return digest.hexdigest()
|
|
|
|
|
|
async def save_torrent_file(torrent: UploadFile):
    """Write an uploaded torrent to ``content/torrent`` and return its name.

    The stored name is ``create_hash_name(torrent.filename) + ".torrent"``.

    Args:
        torrent: The uploaded file; its ``filename`` must be set.

    Returns:
        The hashed file name (with the ``.torrent`` suffix) that was written.

    Raises:
        ValueError: If the upload has no filename ("Filename not found") or
            its payload was read as ``str`` ("Invalid torrent file").
    """
    if torrent.filename is None:
        raise ValueError("Filename not found")

    # Read and validate the payload BEFORE opening the destination.  Opening
    # with 'wb' creates/truncates the file immediately, so validating
    # afterwards would leave an empty file behind on failure -- or wipe an
    # earlier upload that hashes to the same name.
    torrent_data = await torrent.read()
    if isinstance(torrent_data, str):
        raise ValueError("Invalid torrent file")

    hash_filename = create_hash_name(torrent.filename) + ".torrent"
    destination = Path() / "content" / "torrent" / hash_filename
    async with aiofiles.open(destination, 'wb') as file:
        await file.write(torrent_data)
    return hash_filename
|
|
|
|
|
|
def _fit_pixel_budget(image, pixel_budget):
    """Return a resized copy of *image* holding roughly *pixel_budget* pixels.

    Images already within the budget are copied at their current size
    (never upscaled).
    """
    width, height = image.size
    area_ratio = (width * height) / pixel_budget
    if area_ratio <= 1:
        return image.resize((width, height))
    # area_ratio compares pixel COUNTS, so each side must shrink by its
    # square root; dividing both sides by the ratio itself would shrink the
    # area by ratio ** 2 and overshoot the target.
    side_scale = area_ratio ** 0.5
    # Clamp to 1 so extreme aspect ratios never produce a zero-length side.
    return image.resize(
        (max(1, int(width / side_scale)), max(1, int(height / side_scale)))
    )


def _encode_image(image, image_format):
    """Serialize *image* in *image_format* and return the encoded bytes."""
    buf = BytesIO()
    image.save(buf, format=image_format)
    return buf.getvalue()


async def save_image(cover: UploadFile, type: Literal["cover", "screenshot"]):
    """Store a full-size and a preview version of an uploaded image.

    Both versions are written under ``content/images/<type>/`` (in the
    ``full_size`` and ``preview`` subdirectories) with the same file name:
    ``create_hash_name(cover.filename)`` plus the extension guessed from the
    upload's content type.  The full-size version is scaled down towards
    ``IMAGE_TARGET_SIZE`` pixels and the preview towards
    ``PREVIEW_TARGET_SIZE`` pixels; images are never upscaled.

    Args:
        cover: The uploaded image; ``filename`` and ``content_type`` must
            be set.
        type: Image category, used as the subdirectory name.  (The name
            shadows the builtin but is kept because it is part of the
            function's interface.)

    Returns:
        The hashed file name (with extension) shared by both stored files.

    Raises:
        ValueError: If the filename or content type is missing, or the
            payload was read as ``str``.
        NameError: If no file extension can be guessed from the content
            type.
    """
    if cover.filename is None:
        raise ValueError("Filename not found")
    if cover.content_type is None:
        raise ValueError("File content type unknown")

    file_extension = mimetypes.guess_extension(cover.content_type)
    if file_extension is None:
        raise NameError("File extension not found")
    hash_filename = create_hash_name(cover.filename) + file_extension

    cover_data = await cover.read()
    if isinstance(cover_data, str):
        raise ValueError("Invalid image file")

    # Decode and encode everything in memory BEFORE touching the disk:
    # opening the destinations with 'wb' truncates them, so a payload that
    # fails to decode must not get that far.
    image_format = cover.content_type.upper().replace("IMAGE/", "")
    with Image.open(BytesIO(cover_data)) as source:
        full_size_bytes = _encode_image(
            _fit_pixel_budget(source, IMAGE_TARGET_SIZE), image_format)
        # The preview is scaled independently from the original image; its
        # factor must not be derived from the full-size factor.
        preview_bytes = _encode_image(
            _fit_pixel_budget(source, PREVIEW_TARGET_SIZE), image_format)

    images_dir = Path() / "content" / "images" / type
    async with aiofiles.open(images_dir / "full_size" / hash_filename,
                             'wb') as full_size_file, \
            aiofiles.open(images_dir / "preview" / hash_filename,
                          'wb') as preview_file:
        await full_size_file.write(full_size_bytes)
        await preview_file.write(preview_bytes)
    return hash_filename
|
|
|
|
|
|
async def save_audio_fragment(fragment: UploadFile):
    """Write an uploaded audio fragment to ``content/audio``; return its name.

    The stored name is ``create_hash_name(fragment.filename)`` plus the
    extension guessed from the upload's content type.

    Args:
        fragment: The uploaded file; ``filename`` and ``content_type`` must
            be set.

    Returns:
        The hashed file name (with extension) that was written.

    Raises:
        ValueError: If the filename or content type is missing, or the
            payload was read as ``str``.
        NameError: If no file extension can be guessed from the content
            type.
    """
    if fragment.filename is None:
        raise ValueError("Filename not found")
    if fragment.content_type is None:
        raise ValueError("File content type unknown")

    file_extension = mimetypes.guess_extension(fragment.content_type)
    if file_extension is None:
        raise NameError("File extension not found")
    hash_filename = create_hash_name(fragment.filename) + file_extension

    # Read and validate the payload BEFORE opening the destination.  Opening
    # with 'wb' creates/truncates the file immediately, so validating
    # afterwards would leave an empty file behind on failure -- or wipe an
    # earlier upload that hashes to the same name.
    fragment_data = await fragment.read()
    if isinstance(fragment_data, str):
        raise ValueError("Invalid audio file")

    destination = Path() / "content" / "audio" / hash_filename
    async with aiofiles.open(destination, 'wb') as file:
        await file.write(fragment_data)
    return hash_filename
|