import asyncio
from collections.abc import AsyncIterator

# Maximum number of bytes requested from a pipe per read.
_CHUNK_SIZE = 65536

# JPEG start-of-image / end-of-image markers.
_JPEG_SOI = b"\xff\xd8"
_JPEG_EOI = b"\xff\xd9"


async def _pipe_stream(
    source: asyncio.StreamReader, dest: asyncio.StreamWriter
) -> None:
    """Forward data from streamlink stdout to ffmpeg stdin.

    Reads chunks of up to ``_CHUNK_SIZE`` bytes from *source* and writes
    them to *dest* until *source* reports end of stream.
    ``BrokenPipeError`` and ``ConnectionResetError`` end the forwarding
    silently. *dest* is always closed on exit, including on cancellation.
    """
    try:
        while True:
            chunk = await source.read(_CHUNK_SIZE)
            if not chunk:
                break
            dest.write(chunk)
            await dest.drain()
    except (BrokenPipeError, ConnectionResetError):
        pass
    finally:
        dest.close()


def _split_jpeg_frames(data: bytes) -> tuple[list[bytes], bytes]:
    """Split *data* into complete JPEG frames plus an unconsumed remainder.

    A frame runs from an SOI marker (FF D8) through the next EOI marker
    (FF D9), inclusive. Bytes preceding an SOI marker are dropped.

    Returns:
        ``(frames, remainder)``. *remainder* is what must be prepended to
        the next chunk of input: an incomplete frame starting at its SOI,
        a lone trailing 0xFF byte (possibly the first half of an SOI
        marker split across two reads), or ``b""``.
    """
    frames: list[bytes] = []
    pos = 0
    while True:
        soi = data.find(_JPEG_SOI, pos)
        if soi == -1:
            # No frame start in the rest of the data. Keep a trailing 0xFF:
            # discarding it would lose the SOI marker if the next chunk
            # begins with 0xD8.
            remainder = b"\xff" if data[pos:].endswith(b"\xff") else b""
            return frames, remainder
        eoi = data.find(_JPEG_EOI, soi + len(_JPEG_SOI))
        if eoi == -1:
            # Frame started but is not finished yet; keep it from its SOI.
            return frames, data[soi:]
        end = eoi + len(_JPEG_EOI)
        frames.append(data[soi:end])
        pos = end


async def capture_frames(
    channel: str, quality: str, interval: int
) -> AsyncIterator[bytes]:
    """Capture frames from a Twitch stream using streamlink + ffmpeg.

    Spawns ``streamlink`` writing the stream to stdout and ``ffmpeg``
    reading it on stdin, with a background task forwarding data between
    the two. ffmpeg is run with ``fps=1/{interval}`` and emits MJPEG on
    stdout, which is split into individual JPEG images.

    Args:
        channel: Channel name, appended to ``https://twitch.tv/``.
        quality: Stream quality argument passed to streamlink.
        interval: Used as the denominator of ffmpeg's ``fps=1/{interval}``
            filter.

    Yields:
        Each complete JPEG frame (SOI through EOI inclusive) as bytes.

    On exit (exhaustion, error, or the generator being closed) the
    forwarding task is cancelled and awaited, and every process that was
    started is terminated and waited for.
    """
    streamlink_cmd = [
        "streamlink",
        "--stdout",
        f"https://twitch.tv/{channel}",
        quality,
    ]
    ffmpeg_cmd = [
        "ffmpeg",
        "-i", "pipe:0",
        "-vf", f"fps=1/{interval}",
        "-f", "image2pipe",
        "-vcodec", "mjpeg",
        "-q:v", "5",
        "pipe:1",
    ]

    # Track what was actually started so cleanup also covers the case where
    # the second spawn fails after the first succeeded.
    procs = []
    pipe_task = None
    try:
        streamlink_proc = await asyncio.create_subprocess_exec(
            *streamlink_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        procs.append(streamlink_proc)

        ffmpeg_proc = await asyncio.create_subprocess_exec(
            *ffmpeg_cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        procs.append(ffmpeg_proc)

        # Forward streamlink → ffmpeg in background
        pipe_task = asyncio.create_task(
            _pipe_stream(streamlink_proc.stdout, ffmpeg_proc.stdin)
        )

        pending = b""
        while True:
            chunk = await ffmpeg_proc.stdout.read(_CHUNK_SIZE)
            if not chunk:
                break
            frames, pending = _split_jpeg_frames(pending + chunk)
            for frame in frames:
                yield frame
    finally:
        if pipe_task is not None:
            pipe_task.cancel()
            # Await the cancelled task so it is not left pending and any
            # exception it raised is retrieved rather than logged later.
            await asyncio.gather(pipe_task, return_exceptions=True)
        # Terminate the consumer (ffmpeg) before the producer (streamlink).
        for proc in reversed(procs):
            try:
                proc.terminate()
            except ProcessLookupError:
                # Process already exited.
                pass
        if procs:
            await asyncio.gather(
                *(proc.wait() for proc in procs), return_exceptions=True
            )