- stderr from both processes now displayed in console - streamlink checked for early exit before starting ffmpeg - RuntimeError shown to user if stream unavailable Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
130 lines
3.7 KiB
Python
130 lines
3.7 KiB
Python
import asyncio
|
|
from collections.abc import AsyncIterator
|
|
|
|
from rich.console import Console
|
|
|
|
console = Console()
|
|
|
|
|
|
async def _pipe_stream(source: asyncio.StreamReader, dest: asyncio.StreamWriter):
|
|
"""Forward data from streamlink stdout to ffmpeg stdin."""
|
|
try:
|
|
while True:
|
|
chunk = await source.read(65536)
|
|
if not chunk:
|
|
break
|
|
dest.write(chunk)
|
|
await dest.drain()
|
|
except (BrokenPipeError, ConnectionResetError):
|
|
pass
|
|
finally:
|
|
dest.close()
|
|
|
|
|
|
async def _log_stderr(proc_name: str, stderr: asyncio.StreamReader):
|
|
"""Read and display stderr from a subprocess."""
|
|
while True:
|
|
line = await stderr.readline()
|
|
if not line:
|
|
break
|
|
text = line.decode("utf-8", errors="replace").rstrip()
|
|
if text:
|
|
console.print(f"[dim red][{proc_name}] {text}[/dim red]")
|
|
|
|
|
|
async def capture_frames(
|
|
channel: str, quality: str, interval: int
|
|
) -> AsyncIterator[bytes]:
|
|
"""Capture frames from a Twitch stream using streamlink + ffmpeg.
|
|
|
|
Yields JPEG frames as bytes at the specified interval.
|
|
"""
|
|
streamlink_cmd = [
|
|
"streamlink",
|
|
"--stdout",
|
|
f"https://twitch.tv/{channel}",
|
|
quality,
|
|
]
|
|
|
|
ffmpeg_cmd = [
|
|
"ffmpeg",
|
|
"-loglevel", "warning",
|
|
"-i", "pipe:0",
|
|
"-vf", f"fps=1/{interval}",
|
|
"-f", "image2pipe",
|
|
"-vcodec", "mjpeg",
|
|
"-q:v", "5",
|
|
"pipe:1",
|
|
]
|
|
|
|
console.print("[dim]Starting streamlink...[/dim]")
|
|
streamlink_proc = await asyncio.create_subprocess_exec(
|
|
*streamlink_cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
# Wait a moment and check if streamlink started OK
|
|
await asyncio.sleep(2)
|
|
if streamlink_proc.returncode is not None:
|
|
stderr_out = await streamlink_proc.stderr.read()
|
|
raise RuntimeError(
|
|
f"streamlink exited with code {streamlink_proc.returncode}: "
|
|
f"{stderr_out.decode('utf-8', errors='replace')}"
|
|
)
|
|
|
|
console.print("[dim]Starting ffmpeg...[/dim]")
|
|
ffmpeg_proc = await asyncio.create_subprocess_exec(
|
|
*ffmpeg_cmd,
|
|
stdin=asyncio.subprocess.PIPE,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
|
|
# Log stderr from both processes
|
|
stderr_tasks = [
|
|
asyncio.create_task(_log_stderr("streamlink", streamlink_proc.stderr)),
|
|
asyncio.create_task(_log_stderr("ffmpeg", ffmpeg_proc.stderr)),
|
|
]
|
|
|
|
# Forward streamlink → ffmpeg in background
|
|
pipe_task = asyncio.create_task(
|
|
_pipe_stream(streamlink_proc.stdout, ffmpeg_proc.stdin)
|
|
)
|
|
|
|
console.print("[dim]Pipeline running, waiting for first frame...[/dim]")
|
|
|
|
try:
|
|
buf = b""
|
|
while True:
|
|
chunk = await ffmpeg_proc.stdout.read(65536)
|
|
if not chunk:
|
|
break
|
|
buf += chunk
|
|
|
|
# Extract complete JPEG frames (SOI: FF D8, EOI: FF D9)
|
|
while True:
|
|
soi = buf.find(b"\xff\xd8")
|
|
if soi == -1:
|
|
buf = b""
|
|
break
|
|
eoi = buf.find(b"\xff\xd9", soi + 2)
|
|
if eoi == -1:
|
|
buf = buf[soi:]
|
|
break
|
|
frame = buf[soi : eoi + 2]
|
|
buf = buf[eoi + 2 :]
|
|
yield frame
|
|
finally:
|
|
pipe_task.cancel()
|
|
for t in stderr_tasks:
|
|
t.cancel()
|
|
for proc in (ffmpeg_proc, streamlink_proc):
|
|
try:
|
|
proc.terminate()
|
|
except ProcessLookupError:
|
|
pass
|
|
await asyncio.gather(
|
|
ffmpeg_proc.wait(), streamlink_proc.wait(), return_exceptions=True
|
|
)
|