import hashlib
from pathlib import Path
from typing import Optional

import boto3
import urllib3
from botocore.config import Config as BotoConfig
from botocore.exceptions import ClientError

from .config import settings

# Suppress SSL warnings for S3 endpoint.
# NOTE(review): this silences InsecureRequestWarning for every urllib3 user in
# the process, not only for the S3 client created below.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Extension (lower-case, without the dot) -> Content-Type sent on upload.
_AUDIO_CONTENT_TYPES = {
    "mp3": "audio/mpeg",
    "wav": "audio/wav",
    "ogg": "audio/ogg",
    "m4a": "audio/mp4",
}
_VIDEO_CONTENT_TYPES = {
    "mp4": "video/mp4",
    "mov": "video/quicktime",
    "avi": "video/x-msvideo",
}
_IMAGE_CONTENT_TYPES = {
    "jpg": "image/jpeg",
    "jpeg": "image/jpeg",
    "png": "image/png",
    "webp": "image/webp",
}


def _content_type_for(filename: str, known: dict[str, str], default: str) -> str:
    """Return the content type for *filename* based on its last extension.

    The text after the final "." is lower-cased and looked up in *known*;
    *default* is returned when there is no match.
    """
    extension = filename.lower().rpartition(".")[2]
    return known.get(extension, default)


class S3Storage:
    """S3-compatible storage service for media files.

    Objects live under the ``audio/``, ``backgrounds/``, ``posters/`` and
    ``transitions/`` key prefixes of a single bucket. Downloads are cached on
    local disk under ``cache_path``; a cached copy is reused without checking
    whether the remote object has changed.
    """

    def __init__(self):
        """Create the S3 client from ``settings`` and make sure the bucket exists."""
        self.client = boto3.client(
            "s3",
            endpoint_url=settings.s3_endpoint,
            aws_access_key_id=settings.s3_access_key,
            aws_secret_access_key=settings.s3_secret_key,
            region_name=settings.s3_region,
            config=BotoConfig(signature_version="s3v4"),
            # NOTE(review): TLS certificate verification is switched off, so
            # the endpoint's identity is not checked. Kept as-is because the
            # deployment may depend on it.
            verify=False,  # Disable SSL verification for FirstVDS S3
        )
        self.bucket = settings.s3_bucket
        self.cache_path = settings.cache_path
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Create bucket if it doesn't exist.

        Failures are reported on stdout and never raised, so constructing the
        storage object does not fail when the bucket is unreachable.
        """
        try:
            self.client.head_bucket(Bucket=self.bucket)
            return
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code")

        if error_code not in ("404", "NoSuchBucket"):
            # e.g. 403: the bucket may exist but is not accessible; creating
            # it would not help, so just report instead of staying silent.
            print(f"Cannot access S3 bucket {self.bucket}: {error_code}")
            return

        try:
            self.client.create_bucket(Bucket=self.bucket)
            print(f"Created S3 bucket: {self.bucket}")
        except ClientError as create_error:
            print(f"Failed to create bucket {self.bucket}: {create_error}")

    def _get_cache_path(self, key: str) -> Path:
        """Get local cache path for a file.

        The file name is the first 16 hex digits of the MD5 of *key* plus the
        key's original suffix, so nested keys map to a flat, filesystem-safe
        name.
        """
        # MD5 is used only to derive a file name, not for security;
        # usedforsecurity=False keeps it usable on FIPS-restricted builds.
        key_hash = hashlib.md5(key.encode(), usedforsecurity=False).hexdigest()[:16]
        ext = Path(key).suffix
        return self.cache_path / f"{key_hash}{ext}"

    def list_files(self, prefix: str, extensions: Optional[list[str]] = None) -> list[str]:
        """List files in S3 bucket with given prefix.

        Args:
            prefix: Key prefix to list under (e.g. ``"audio/"``).
            extensions: Optional suffixes to keep; matching is
                case-insensitive. ``None`` or an empty list keeps everything.

        Returns:
            Sorted names relative to *prefix*, or an empty list if any S3
            request fails.
        """
        wanted = tuple(ext.lower() for ext in extensions) if extensions else None
        request = {"Bucket": self.bucket, "Prefix": prefix}
        files = []
        try:
            # list_objects_v2 returns at most one page per call; follow the
            # continuation token so large prefixes are not silently truncated.
            while True:
                response = self.client.list_objects_v2(**request)
                for obj in response.get("Contents", []):
                    # Strip only the leading prefix; str.replace would also
                    # remove later occurrences inside the key.
                    filename = obj["Key"].removeprefix(prefix).lstrip("/")
                    if not filename:  # Skip the prefix itself
                        continue
                    if wanted is None or filename.lower().endswith(wanted):
                        files.append(filename)
                token = response.get("NextContinuationToken")
                if not response.get("IsTruncated") or not token:
                    break
                request["ContinuationToken"] = token
        except ClientError as e:
            print(f"Error listing S3 files: {e}")
            return []
        return sorted(files)

    def download_file(self, key: str) -> Optional[Path]:
        """Download file from S3 to local cache.

        Returns:
            Path of the cached file, or ``None`` if the download failed.
        """
        cache_file = self._get_cache_path(key)

        # Return cached file if exists
        if cache_file.exists():
            return cache_file

        # The cache directory may not have been created yet; without this the
        # download fails with an uncaught FileNotFoundError.
        cache_file.parent.mkdir(parents=True, exist_ok=True)

        try:
            self.client.download_file(
                Bucket=self.bucket,
                Key=key,
                Filename=str(cache_file),
            )
        except ClientError as e:
            print(f"Error downloading {key} from S3: {e}")
            return None
        return cache_file

    def get_audio_file(self, filename: str) -> Optional[Path]:
        """Download audio file from S3."""
        return self.download_file(f"audio/{filename}")

    def get_background_file(self, filename: str) -> Optional[Path]:
        """Download background video from S3."""
        return self.download_file(f"backgrounds/{filename}")

    def get_poster_file(self, filename: str) -> Optional[Path]:
        """Download poster image from S3."""
        return self.download_file(f"posters/{filename}")

    def get_transition_file(self, filename: str) -> Optional[Path]:
        """Download transition sound from S3."""
        return self.download_file(f"transitions/{filename}")

    def list_audio_files(self) -> list[str]:
        """List available audio files."""
        return self.list_files("audio/", [".mp3", ".wav", ".ogg", ".m4a"])

    def list_background_videos(self) -> list[str]:
        """List available background videos."""
        return self.list_files("backgrounds/", [".mp4", ".mov", ".avi"])

    def list_posters(self) -> list[str]:
        """List available poster images."""
        return self.list_files("posters/", [".jpg", ".jpeg", ".png", ".webp"])

    def list_transition_sounds(self) -> list[str]:
        """List available transition sounds."""
        return self.list_files("transitions/", [".mp3", ".wav", ".ogg"])

    def file_exists(self, key: str) -> bool:
        """Check if file exists in S3.

        Any ``ClientError`` from the HEAD request (not only "not found") is
        reported as ``False``.
        """
        try:
            self.client.head_object(Bucket=self.bucket, Key=key)
        except ClientError:
            return False
        return True

    def clear_cache(self):
        """Clear local cache.

        Best effort: entries that cannot be removed are reported and skipped.
        Real sub-directories are left untouched.
        """
        for file in self.cache_path.glob("*"):
            if file.is_dir() and not file.is_symlink():
                continue
            try:
                file.unlink()
            except OSError as e:
                print(f"Error removing cached file {file}: {e}")

    def upload_file(self, key: str, file_data: bytes, content_type: Optional[str] = None) -> bool:
        """Upload file to S3.

        Args:
            key: Full object key.
            file_data: Object body.
            content_type: Optional ``Content-Type`` to store with the object;
                omitted from the request when falsy.

        Returns:
            ``True`` on success, ``False`` if S3 rejected the request.
        """
        extra_args = {"ContentType": content_type} if content_type else {}
        try:
            self.client.put_object(
                Bucket=self.bucket,
                Key=key,
                Body=file_data,
                **extra_args,
            )
        except ClientError as e:
            print(f"Error uploading {key} to S3: {e}")
            return False
        return True

    def upload_audio(self, filename: str, file_data: bytes) -> bool:
        """Upload audio file to S3.

        The content type follows the extension; unrecognised extensions keep
        the previous ``audio/wav`` fallback.
        """
        content_type = _content_type_for(filename, _AUDIO_CONTENT_TYPES, "audio/wav")
        return self.upload_file(f"audio/{filename}", file_data, content_type)

    def upload_background(self, filename: str, file_data: bytes) -> bool:
        """Upload background video to S3.

        The content type follows the extension; unrecognised extensions keep
        the previous ``video/mp4`` fallback.
        """
        content_type = _content_type_for(filename, _VIDEO_CONTENT_TYPES, "video/mp4")
        return self.upload_file(f"backgrounds/{filename}", file_data, content_type)

    def upload_poster(self, filename: str, file_data: bytes) -> bool:
        """Upload poster image to S3.

        Unrecognised extensions are stored as ``application/octet-stream``.
        """
        content_type = _content_type_for(
            filename, _IMAGE_CONTENT_TYPES, "application/octet-stream"
        )
        return self.upload_file(f"posters/{filename}", file_data, content_type)

    def upload_transition(self, filename: str, file_data: bytes) -> bool:
        """Upload transition sound to S3.

        The content type follows the extension; unrecognised extensions keep
        the previous ``audio/mpeg`` fallback.
        """
        content_type = _content_type_for(filename, _AUDIO_CONTENT_TYPES, "audio/mpeg")
        return self.upload_file(f"transitions/{filename}", file_data, content_type)

    def delete_file(self, key: str) -> bool:
        """Delete file from S3.

        Returns:
            ``True`` once the remote delete succeeded (even if the cached
            copy could not be removed), ``False`` if S3 rejected the request.
        """
        try:
            self.client.delete_object(Bucket=self.bucket, Key=key)
        except ClientError as e:
            print(f"Error deleting {key} from S3: {e}")
            return False

        # Also remove from cache. missing_ok avoids the exists()/unlink() race,
        # and a local failure must not mask the successful remote delete.
        try:
            self._get_cache_path(key).unlink(missing_ok=True)
        except OSError as e:
            print(f"Error removing cached copy of {key}: {e}")
        return True

    def delete_audio(self, filename: str) -> bool:
        """Delete audio file from S3."""
        return self.delete_file(f"audio/{filename}")

    def delete_background(self, filename: str) -> bool:
        """Delete background video from S3."""
        return self.delete_file(f"backgrounds/{filename}")

    def delete_poster(self, filename: str) -> bool:
        """Delete poster image from S3."""
        return self.delete_file(f"posters/{filename}")

    def delete_transition(self, filename: str) -> bool:
        """Delete transition sound from S3."""
        return self.delete_file(f"transitions/{filename}")


# Global storage instance
# NOTE: constructed at import time, so importing this module contacts S3.
storage = S3Storage()