import asyncio from collections.abc import AsyncIterator from rich.console import Console console = Console() async def _pipe_stream(source: asyncio.StreamReader, dest: asyncio.StreamWriter): """Forward data from streamlink stdout to ffmpeg stdin.""" try: while True: chunk = await source.read(65536) if not chunk: break dest.write(chunk) await dest.drain() except (BrokenPipeError, ConnectionResetError): pass finally: dest.close() async def _log_stderr(proc_name: str, stderr: asyncio.StreamReader): """Read and display stderr from a subprocess.""" while True: line = await stderr.readline() if not line: break text = line.decode("utf-8", errors="replace").rstrip() if text: console.print(f"[dim red][{proc_name}] {text}[/dim red]") async def capture_frames( channel: str, quality: str, interval: int ) -> AsyncIterator[bytes]: """Capture frames from a Twitch stream using streamlink + ffmpeg. Yields JPEG frames as bytes at the specified interval. """ streamlink_cmd = [ "streamlink", "--stdout", f"https://twitch.tv/{channel}", quality, ] ffmpeg_cmd = [ "ffmpeg", "-loglevel", "warning", "-i", "pipe:0", "-vf", f"fps=1/{interval}", "-f", "image2pipe", "-vcodec", "mjpeg", "-q:v", "5", "pipe:1", ] console.print("[dim]Starting streamlink...[/dim]") streamlink_proc = await asyncio.create_subprocess_exec( *streamlink_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Wait a moment and check if streamlink started OK await asyncio.sleep(2) if streamlink_proc.returncode is not None: stderr_out = await streamlink_proc.stderr.read() raise RuntimeError( f"streamlink exited with code {streamlink_proc.returncode}: " f"{stderr_out.decode('utf-8', errors='replace')}" ) console.print("[dim]Starting ffmpeg...[/dim]") ffmpeg_proc = await asyncio.create_subprocess_exec( *ffmpeg_cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) # Log stderr from both processes stderr_tasks = [ asyncio.create_task(_log_stderr("streamlink", streamlink_proc.stderr)), asyncio.create_task(_log_stderr("ffmpeg", ffmpeg_proc.stderr)), ] # Forward streamlink → ffmpeg in background pipe_task = asyncio.create_task( _pipe_stream(streamlink_proc.stdout, ffmpeg_proc.stdin) ) console.print("[dim]Pipeline running, waiting for first frame...[/dim]") try: buf = b"" while True: chunk = await ffmpeg_proc.stdout.read(65536) if not chunk: break buf += chunk # Extract complete JPEG frames (SOI: FF D8, EOI: FF D9) while True: soi = buf.find(b"\xff\xd8") if soi == -1: buf = b"" break eoi = buf.find(b"\xff\xd9", soi + 2) if eoi == -1: buf = buf[soi:] break frame = buf[soi : eoi + 2] buf = buf[eoi + 2 :] yield frame finally: pipe_task.cancel() for t in stderr_tasks: t.cancel() for proc in (ffmpeg_proc, streamlink_proc): try: proc.terminate() except ProcessLookupError: pass await asyncio.gather( ffmpeg_proc.wait(), streamlink_proc.wait(), return_exceptions=True )