327 lines
12 KiB
Python
327 lines
12 KiB
Python
import asyncio
|
|
import tempfile
|
|
import re
|
|
from pathlib import Path
|
|
from typing import List
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from ..db_models import AnimeTheme, DownloadTask, DownloadStatus, Anime
|
|
from ..schemas import QueueStatusResponse, QueueTaskResponse
|
|
from ..config import downloader_settings
|
|
from ...storage import storage
|
|
from ...db_models import Opening
|
|
|
|
|
|
class DownloadService:
    """Service for downloading and converting anime themes.

    Every method works through the ``AsyncSession`` passed to the
    constructor. A download task moves through the ``DownloadStatus`` values
    QUEUED -> DOWNLOADING -> CONVERTING -> UPLOADING -> DONE, or ends up
    FAILED with the error text stored on the task.
    """

    def __init__(self, db: AsyncSession):
        """Store the database session used by all operations of the service."""
        self.db = db

    async def add_to_queue(self, theme_ids: List[int]) -> int:
        """Add themes to download queue (idempotent). Returns number of tasks added.

        A theme id is skipped when a ``DownloadTask`` already exists for it,
        when no ``AnimeTheme`` with that id exists, when the theme already has
        an ``audio_s3_key``, or when it has no ``animethemes_video_url``.
        Duplicate ids in ``theme_ids`` are queued at most once.
        """
        # Drop repeated ids while keeping the caller's order, so one call can
        # never create two tasks for the same theme.
        unique_ids = list(dict.fromkeys(theme_ids))
        if not unique_ids:
            return 0

        # One query for the themes that already have a task ...
        queued_result = await self.db.execute(
            select(DownloadTask.theme_id).where(DownloadTask.theme_id.in_(unique_ids))
        )
        already_queued = set(queued_result.scalars().all())

        # ... and one for the themes themselves, instead of two queries per id.
        themes_result = await self.db.execute(
            select(AnimeTheme).where(AnimeTheme.id.in_(unique_ids))
        )
        themes_by_id = {theme.id: theme for theme in themes_result.scalars().all()}

        added = 0
        for theme_id in unique_ids:
            # Already in queue
            if theme_id in already_queued:
                continue

            # Unknown theme
            theme = themes_by_id.get(theme_id)
            if theme is None:
                continue

            # Already downloaded
            if theme.audio_s3_key:
                continue

            # No video URL available
            if not theme.animethemes_video_url:
                continue

            self.db.add(
                DownloadTask(
                    theme_id=theme_id,
                    status=DownloadStatus.QUEUED,
                    estimated_size_bytes=downloader_settings.default_estimated_size_bytes,
                )
            )
            added += 1

        await self.db.commit()
        return added

    async def add_all_anime_themes(self, anime_id: int) -> int:
        """Add all themes from anime to queue. Returns number of tasks added.

        Only themes of ``anime_id`` that have no ``audio_s3_key`` yet and do
        have an ``animethemes_video_url`` are selected; the selected ids are
        handed to :meth:`add_to_queue`.
        """
        result = await self.db.execute(
            select(AnimeTheme)
            .where(AnimeTheme.anime_id == anime_id)
            .where(AnimeTheme.audio_s3_key.is_(None))
            .where(AnimeTheme.animethemes_video_url.isnot(None))
        )
        themes = result.scalars().all()
        return await self.add_to_queue([t.id for t in themes])

    async def get_queue_status(self, worker_running: bool = False) -> QueueStatusResponse:
        """Get current queue status.

        Loads every task (newest first) together with its theme and anime and
        returns the per-task details plus counters per status group.
        ``estimated_queue_size_bytes`` sums ``estimated_size_bytes`` of tasks
        that are queued or in progress; ``worker_running`` is passed through
        to the response unchanged.
        """
        result = await self.db.execute(
            select(DownloadTask)
            .options(selectinload(DownloadTask.theme).selectinload(AnimeTheme.anime))
            .order_by(DownloadTask.created_at.desc())
        )
        tasks = result.scalars().all()

        in_progress_statuses = (
            DownloadStatus.DOWNLOADING,
            DownloadStatus.CONVERTING,
            DownloadStatus.UPLOADING,
        )

        task_responses = []
        total_queued = 0
        total_downloading = 0
        total_done = 0
        total_failed = 0
        estimated_queue_size = 0

        for task in tasks:
            theme = task.theme
            anime = theme.anime

            anime_title = anime.title_russian or anime.title_english or "Unknown"

            task_responses.append(
                QueueTaskResponse(
                    id=task.id,
                    theme_id=theme.id,
                    anime_title=anime_title,
                    theme_name=theme.full_name,
                    song_title=theme.song_title,
                    status=task.status,
                    progress_percent=task.progress_percent,
                    error_message=task.error_message,
                    estimated_size_bytes=task.estimated_size_bytes,
                    created_at=task.created_at,
                    started_at=task.started_at,
                    completed_at=task.completed_at,
                )
            )

            if task.status == DownloadStatus.QUEUED:
                total_queued += 1
                # A missing estimate counts as 0 instead of raising TypeError.
                estimated_queue_size += task.estimated_size_bytes or 0
            elif task.status in in_progress_statuses:
                total_downloading += 1
                estimated_queue_size += task.estimated_size_bytes or 0
            elif task.status == DownloadStatus.DONE:
                total_done += 1
            elif task.status == DownloadStatus.FAILED:
                total_failed += 1

        return QueueStatusResponse(
            tasks=task_responses,
            total_queued=total_queued,
            total_downloading=total_downloading,
            total_done=total_done,
            total_failed=total_failed,
            estimated_queue_size_bytes=estimated_queue_size,
            worker_running=worker_running,
        )

    async def process_queue(self) -> None:
        """Process download queue (called as background task).

        Repeatedly picks the oldest QUEUED task and processes it; returns
        once no QUEUED task is left.
        """
        while True:
            # Get next queued task (oldest first)
            result = await self.db.execute(
                select(DownloadTask)
                .where(DownloadTask.status == DownloadStatus.QUEUED)
                .order_by(DownloadTask.created_at)
                .limit(1)
            )
            task = result.scalar_one_or_none()

            if not task:
                break

            await self._process_task(task)

    async def _process_task(self, task: DownloadTask) -> None:
        """Process a single download task.

        Stages: download the theme's WebM, convert it to MP3 with FFmpeg,
        upload the MP3 through ``storage``, then record the result on the
        theme, create the matching ``Opening`` row and mark the task DONE.
        Progress and status are committed after each stage.

        Any ``Exception`` raised on the way is not propagated: the task is
        marked FAILED and the (truncated) error text is stored on it.
        """
        try:
            # Update status to downloading
            task.status = DownloadStatus.DOWNLOADING
            task.started_at = datetime.now(timezone.utc)
            task.progress_percent = 10
            await self.db.commit()

            # Get theme info with anime
            result = await self.db.execute(
                select(AnimeTheme)
                .options(selectinload(AnimeTheme.anime))
                .where(AnimeTheme.id == task.theme_id)
            )
            theme = result.scalar_one()
            anime = theme.anime

            if not theme.animethemes_video_url:
                raise ValueError("No video URL available")

            # Download and convert in a temp directory that is removed when
            # the block exits, whether it succeeds or fails.
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir)
                webm_file = tmp_path / "video.webm"
                mp3_file = tmp_path / "audio.mp3"

                await self._download_video(theme.animethemes_video_url, webm_file)

                task.progress_percent = 40
                task.status = DownloadStatus.CONVERTING
                await self.db.commit()

                await self._convert_to_mp3(webm_file, mp3_file)

                task.progress_percent = 70
                task.status = DownloadStatus.UPLOADING
                await self.db.commit()

                # Generate safe S3 key. Every user-derived part goes through
                # _sanitize_filename so none of them can add a "/" to the key.
                anime_name = self._sanitize_filename(
                    anime.title_english or anime.title_russian or f"anime_{anime.shikimori_id}"
                )
                theme_name = theme.full_name
                song_part = (
                    f"_{self._sanitize_filename(theme.song_title)}" if theme.song_title else ""
                )
                audio_filename = (
                    f"{anime_name}_{self._sanitize_filename(theme_name)}{song_part}.mp3"
                )
                s3_key = f"audio/{audio_filename}"

                # Read file and upload to S3
                file_data = mp3_file.read_bytes()
                file_size = len(file_data)

                # NOTE(review): upload_file is called synchronously; if it does
                # network I/O it blocks the event loop for the duration of the
                # upload. Confirm before moving it to a worker thread.
                success = storage.upload_file(s3_key, file_data, "audio/mpeg")
                if not success:
                    raise RuntimeError("Failed to upload to S3")

            # Update theme with file info
            theme.audio_s3_key = s3_key
            theme.file_size_bytes = file_size

            # Create Opening entity in main table
            opening = Opening(
                anime_name=anime.title_russian or anime.title_english or f"Anime {anime.shikimori_id}",
                op_number=theme_name,
                song_name=theme.song_title,
                # Opening stores the file name without the "audio/" prefix.
                audio_file=audio_filename,
            )
            self.db.add(opening)
            # Flush so that opening.id is assigned before it is referenced.
            await self.db.flush()

            theme.opening_id = opening.id

            # Mark task as done; the estimate is replaced by the real size.
            task.status = DownloadStatus.DONE
            task.progress_percent = 100
            task.completed_at = datetime.now(timezone.utc)
            task.estimated_size_bytes = file_size

            await self.db.commit()

        except Exception as e:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; rolling back also drops half-applied theme/opening
            # changes so only the FAILED marker below gets persisted.
            await self.db.rollback()

            task.status = DownloadStatus.FAILED
            task.error_message = str(e)[:1000]
            task.progress_percent = 0
            await self.db.commit()

    async def _download_video(self, url: str, destination: Path) -> None:
        """Stream the file at ``url`` into ``destination``.

        Redirects are followed and the timeout comes from
        ``downloader_settings.download_timeout_seconds``. Raises whatever
        ``response.raise_for_status()`` raises for an error status.
        """
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "GET",
                url,
                timeout=downloader_settings.download_timeout_seconds,
                follow_redirects=True,
            ) as response:
                response.raise_for_status()
                # Written chunk by chunk so the video is never held in memory whole.
                with open(destination, "wb") as f:
                    async for chunk in response.aiter_bytes(chunk_size=8192):
                        f.write(chunk)

    async def _convert_to_mp3(self, source: Path, destination: Path) -> None:
        """Extract the audio of ``source`` into an MP3 at ``destination`` with FFmpeg.

        Raises:
            RuntimeError: if FFmpeg exits with a non-zero code or does not
                produce the output file.
        """
        process = await asyncio.create_subprocess_exec(
            # -y (overwrite output) is given up front, before the input and
            # output file arguments.
            "ffmpeg", "-y", "-i", str(source),
            "-vn", "-acodec", "libmp3lame", "-q:a", "2",
            str(destination),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await process.communicate()

        if process.returncode != 0:
            # Keep the end of the output: FFmpeg prints its banner first and
            # the failure reason last. Undecodable bytes must not mask the error.
            detail = stderr.decode(errors="replace")[-500:]
            raise RuntimeError(f"FFmpeg error: {detail}")

        if not destination.exists():
            raise RuntimeError("FFmpeg did not create output file")

    def _sanitize_filename(self, name: str) -> str:
        """Sanitize string for use in filename.

        Removes the characters ``< > : " / \\ | ? *``, turns spaces into
        underscores and limits the result to 100 characters. Returns
        ``"unknown"`` when ``name`` is empty or nothing is left after cleaning.
        """
        if not name:
            return "unknown"
        # Remove or replace problematic characters
        sanitized = re.sub(r'[<>:"/\\|?*]', '', name)
        sanitized = sanitized.replace(' ', '_')
        # Limit length; a name made only of removed characters must not
        # collapse to an empty path component.
        return sanitized[:100] or "unknown"

    async def cancel_task(self, task_id: int) -> bool:
        """Cancel a queued task. Returns True if cancelled.

        Only a task whose status is QUEUED is deleted; an unknown id or a
        task in any other status returns False and changes nothing.
        """
        result = await self.db.execute(
            select(DownloadTask).where(DownloadTask.id == task_id)
        )
        task = result.scalar_one_or_none()

        if not task or task.status != DownloadStatus.QUEUED:
            return False

        await self.db.delete(task)
        await self.db.commit()
        return True

    async def retry_task(self, task_id: int) -> bool:
        """Retry a failed task. Returns True if requeued.

        The status check is part of the UPDATE itself, so a task that is not
        FAILED is left untouched and False is returned.
        """
        result = await self.db.execute(
            update(DownloadTask)
            .where(DownloadTask.id == task_id)
            .where(DownloadTask.status == DownloadStatus.FAILED)
            .values(
                status=DownloadStatus.QUEUED,
                error_message=None,
                progress_percent=0,
                started_at=None,
                completed_at=None,
            )
            .returning(DownloadTask.id)
        )
        updated = result.scalar_one_or_none()
        await self.db.commit()
        return updated is not None

    async def clear_completed_tasks(self, include_failed: bool = False) -> int:
        """Clear completed (and optionally failed) tasks. Returns number of deleted tasks.

        DONE tasks are always deleted; FAILED tasks only when
        ``include_failed`` is true.
        """
        from sqlalchemy import delete

        statuses = [DownloadStatus.DONE]
        if include_failed:
            statuses.append(DownloadStatus.FAILED)

        result = await self.db.execute(
            delete(DownloadTask)
            .where(DownloadTask.status.in_(statuses))
            .returning(DownloadTask.id)
        )
        deleted_ids = result.scalars().all()
        await self.db.commit()
        return len(deleted_ids)
|