import asyncio import struct from collections.abc import AsyncIterator async def capture_frames( channel: str, quality: str, interval: int ) -> AsyncIterator[bytes]: """Capture frames from a Twitch stream using streamlink + ffmpeg. Yields JPEG frames as bytes at the specified interval. """ streamlink_cmd = [ "streamlink", "--stdout", f"https://twitch.tv/{channel}", quality, ] ffmpeg_cmd = [ "ffmpeg", "-i", "pipe:0", "-vf", f"fps=1/{interval}", "-f", "image2pipe", "-vcodec", "mjpeg", "-q:v", "5", "pipe:1", ] streamlink_proc = await asyncio.create_subprocess_exec( *streamlink_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) ffmpeg_proc = await asyncio.create_subprocess_exec( *ffmpeg_cmd, stdin=streamlink_proc.stdout, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) # Release streamlink's stdout so ffmpeg owns the pipe streamlink_proc.stdout = None try: buf = b"" while True: chunk = await ffmpeg_proc.stdout.read(65536) if not chunk: break buf += chunk # Extract complete JPEG frames (SOI: FF D8, EOI: FF D9) while True: soi = buf.find(b"\xff\xd8") if soi == -1: buf = b"" break eoi = buf.find(b"\xff\xd9", soi + 2) if eoi == -1: # Keep from SOI onward, discard junk before buf = buf[soi:] break frame = buf[soi : eoi + 2] buf = buf[eoi + 2 :] yield frame finally: for proc in (ffmpeg_proc, streamlink_proc): try: proc.terminate() except ProcessLookupError: pass await asyncio.gather( ffmpeg_proc.wait(), streamlink_proc.wait(), return_exceptions=True )