Initial commit: Twitch Stream Vision Analyzer

Async pipeline: streamlink + ffmpeg frame capture → Gemini Vision API analysis → rich console output + log file.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 10:35:37 +01:00
commit d0e0e53087
8 changed files with 305 additions and 0 deletions

3
.env.example Normal file
View File

@@ -0,0 +1,3 @@
GEMINI_API_KEY=your-api-key-here
# Optional: Cloudflare AI Gateway URL
# GEMINI_BASE_URL=https://gateway.ai.cloudflare.com/v1/your-account/your-gateway/google-ai-studio

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
__pycache__/
*.pyc
.env
stream_log.txt

77
analyzer.py Normal file
View File

@@ -0,0 +1,77 @@
import base64
from google import genai
from google.genai import types
# System prompt (Russian): instructs the model to briefly describe the current
# Twitch frame (game, streamer actions, UI, chat, overlays) in 2-3 sentences,
# and to answer "Без изменений" ("No changes") plus only the new details when
# the scene matches the previous description.
SYSTEM_PROMPT_RU = (
    "Ты анализируешь кадры с Twitch-стрима. "
    "Кратко опиши что происходит на экране: игра, действия стримера, "
    "интерфейс, чат, оверлеи. Будь лаконичен (2-3 предложения). "
    "Если ничего не изменилось по сравнению с предыдущим описанием, "
    "скажи 'Без изменений' и уточни только новые детали."
)
# English counterpart of SYSTEM_PROMPT_RU; selected via the `lang` option.
SYSTEM_PROMPT_EN = (
    "You are analyzing frames from a Twitch stream. "
    "Briefly describe what's happening on screen: game, streamer actions, "
    "UI, chat, overlays. Be concise (2-3 sentences). "
    "If nothing changed compared to the previous description, "
    "say 'No changes' and only note new details."
)
class VisionAnalyzer:
    """Describes Twitch stream frames via the Gemini Vision API.

    Keeps the previous frame's description and replays it as conversational
    context so the model can answer "no changes" instead of re-describing a
    static scene on every frame.
    """

    # Conversational strings per language. Previously these were hard-coded
    # Russian even when lang="en", contradicting the language-selectable
    # system prompt; they now follow `lang`.
    _CONTEXT_STRINGS = {
        "ru": {
            "previous": "Предыдущее описание: {}",
            "ack": "Понял, учту контекст.",
            "ask": "Опиши что сейчас происходит на стриме.",
            "empty": "(нет описания)",
        },
        "en": {
            "previous": "Previous description: {}",
            "ack": "Understood, I'll keep that context in mind.",
            "ask": "Describe what is happening on the stream right now.",
            "empty": "(no description)",
        },
    }

    def __init__(self, api_key: str, base_url: str | None = None, lang: str = "ru"):
        """Create a Gemini client.

        Args:
            api_key: Gemini API key.
            base_url: Optional proxy endpoint (e.g. Cloudflare AI Gateway).
            lang: "ru" for Russian prompts, anything else falls back to English.
        """
        client_kwargs: dict = {"api_key": api_key}
        if base_url:
            # Route all requests through the configured gateway.
            client_kwargs["http_options"] = types.HttpOptions(base_url=base_url)
        self.client = genai.Client(**client_kwargs)
        self.model = "gemini-2.0-flash"
        self.system_prompt = SYSTEM_PROMPT_RU if lang == "ru" else SYSTEM_PROMPT_EN
        self._strings = self._CONTEXT_STRINGS["ru" if lang == "ru" else "en"]
        # Description returned for the last analyzed frame; None until the
        # first call to analyze_frame().
        self.previous_description: str | None = None

    async def analyze_frame(self, frame_data: bytes) -> str:
        """Send one JPEG frame to Gemini and return its text description.

        Args:
            frame_data: Raw JPEG bytes (passed as-is via Part.from_bytes;
                the former base64 pre-encoding was dead code and is removed).

        Returns:
            The model's description, or a language-appropriate placeholder
            when the response carries no text. Also stored as context for
            the next call.
        """
        s = self._strings
        contents: list[types.Content] = []
        if self.previous_description:
            # Prime the model with the last description so it can report
            # "no changes" rather than repeating itself.
            contents.append(
                types.Content(
                    role="user",
                    parts=[
                        types.Part.from_text(
                            text=s["previous"].format(self.previous_description)
                        )
                    ],
                )
            )
            contents.append(
                types.Content(
                    role="model",
                    parts=[types.Part.from_text(text=s["ack"])],
                )
            )
        contents.append(
            types.Content(
                role="user",
                parts=[
                    types.Part.from_bytes(data=frame_data, mime_type="image/jpeg"),
                    types.Part.from_text(text=s["ask"]),
                ],
            )
        )
        response = await self.client.aio.models.generate_content(
            model=self.model,
            contents=contents,
            config=types.GenerateContentConfig(
                system_instruction=self.system_prompt,
                max_output_tokens=300,
                # Low temperature keeps descriptions stable frame-to-frame.
                temperature=0.3,
            ),
        )
        description = response.text or s["empty"]
        self.previous_description = description
        return description

76
capture.py Normal file
View File

@@ -0,0 +1,76 @@
import asyncio
import struct
from collections.abc import AsyncIterator
def _split_jpeg_frames(buf: bytes) -> tuple[list[bytes], bytes]:
    """Extract complete JPEG frames (SOI FF D8 ... EOI FF D9) from *buf*.

    Returns:
        (frames, remainder) — every complete frame found, plus the trailing
        bytes that may still grow into a frame and must be prepended to the
        next chunk.
    """
    frames: list[bytes] = []
    while True:
        soi = buf.find(b"\xff\xd8")
        if soi == -1:
            # BUG FIX: previously the whole buffer was discarded here. A
            # trailing 0xFF may be the first byte of an SOI marker split
            # across a read boundary, so keep it for the next chunk.
            return frames, (buf[-1:] if buf.endswith(b"\xff") else b"")
        eoi = buf.find(b"\xff\xd9", soi + 2)
        if eoi == -1:
            # Incomplete frame: keep from SOI onward, drop leading junk.
            return frames, buf[soi:]
        frames.append(buf[soi : eoi + 2])
        buf = buf[eoi + 2 :]


async def capture_frames(
    channel: str, quality: str, interval: int
) -> AsyncIterator[bytes]:
    """Capture frames from a Twitch stream using streamlink + ffmpeg.

    streamlink writes the raw stream to stdout; ffmpeg samples it at
    1/interval fps and emits MJPEG images on its stdout.

    Args:
        channel: Twitch channel name (appended to https://twitch.tv/).
        quality: streamlink quality selector, e.g. "480p".
        interval: Seconds between captured frames.

    Yields:
        Complete JPEG frames as bytes.

    Both subprocesses are terminated when the generator is closed or the
    stream ends.
    """
    streamlink_cmd = [
        "streamlink",
        "--stdout",
        f"https://twitch.tv/{channel}",
        quality,
    ]
    ffmpeg_cmd = [
        "ffmpeg",
        "-i", "pipe:0",
        "-vf", f"fps=1/{interval}",
        "-f", "image2pipe",
        "-vcodec", "mjpeg",
        "-q:v", "5",
        "pipe:1",
    ]
    streamlink_proc = await asyncio.create_subprocess_exec(
        *streamlink_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    ffmpeg_proc = await asyncio.create_subprocess_exec(
        *ffmpeg_cmd,
        stdin=streamlink_proc.stdout,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    # Release streamlink's stdout so ffmpeg owns the pipe.
    streamlink_proc.stdout = None
    try:
        buf = b""
        while chunk := await ffmpeg_proc.stdout.read(65536):
            frames, buf = _split_jpeg_frames(buf + chunk)
            for frame in frames:
                yield frame
    finally:
        # Best-effort shutdown: terminate both processes, ignore ones that
        # already exited, then reap them to avoid zombies.
        for proc in (ffmpeg_proc, streamlink_proc):
            try:
                proc.terminate()
            except ProcessLookupError:
                pass
        await asyncio.gather(
            ffmpeg_proc.wait(), streamlink_proc.wait(), return_exceptions=True
        )

42
config.py Normal file
View File

@@ -0,0 +1,42 @@
import argparse
import os
from dotenv import load_dotenv
def load_config(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse CLI arguments and environment variables into one namespace.

    Args:
        argv: Argument list to parse (for tests); defaults to sys.argv[1:].

    Returns:
        Namespace with channel/interval/quality/lang/log_file plus
        gemini_api_key and gemini_base_url pulled from the environment
        (after loading .env).

    Exits with an argparse error if GEMINI_API_KEY is missing or
    --interval is not a positive integer.
    """
    load_dotenv()
    parser = argparse.ArgumentParser(description="Twitch Stream Vision Analyzer")

    def _positive_int(value: str) -> int:
        # ffmpeg's fps=1/<interval> filter requires interval >= 1;
        # previously 0 or negative values were accepted and broke capture.
        n = int(value)
        if n <= 0:
            raise argparse.ArgumentTypeError("must be a positive integer")
        return n

    parser.add_argument("--channel", required=True, help="Twitch channel name")
    parser.add_argument(
        "--interval",
        type=_positive_int,
        default=15,
        help="Frame capture interval in seconds (default: 15)",
    )
    parser.add_argument(
        "--quality",
        default="480p",
        help="Stream quality (default: 480p)",
    )
    parser.add_argument(
        "--lang",
        default="ru",
        help="Language for descriptions (default: ru)",
    )
    parser.add_argument(
        "--log-file",
        default="stream_log.txt",
        help="Log file path (default: stream_log.txt)",
    )
    args = parser.parse_args(argv)
    args.gemini_api_key = os.environ.get("GEMINI_API_KEY")
    if not args.gemini_api_key:
        parser.error("GEMINI_API_KEY must be set in .env or environment")
    args.gemini_base_url = os.environ.get("GEMINI_BASE_URL")
    return args

71
main.py Normal file
View File

@@ -0,0 +1,71 @@
import asyncio
import signal
from rich.console import Console
from config import load_config
from capture import capture_frames
from analyzer import VisionAnalyzer
from output import print_description, log_description
console = Console()
async def run(config) -> None:
    """Main pipeline: capture frames, analyze each one, print and log results.

    Analysis failures are reported and skipped so a single bad API call
    does not stop the stream.
    """
    analyzer = VisionAnalyzer(
        api_key=config.gemini_api_key,
        base_url=config.gemini_base_url,
        lang=config.lang,
    )
    banner = (
        f"[bold green]Starting stream analysis[/bold green] "
        f"channel=[cyan]{config.channel}[/cyan] "
        f"interval=[cyan]{config.interval}s[/cyan] "
        f"quality=[cyan]{config.quality}[/cyan]"
    )
    console.print(banner)
    console.print("[dim]Press Ctrl+C to stop[/dim]\n")

    frame_no = 0
    frames = capture_frames(config.channel, config.quality, config.interval)
    async for jpeg in frames:
        frame_no += 1
        console.print(f"[dim]Captured frame #{frame_no}, analyzing...[/dim]")
        try:
            text = await analyzer.analyze_frame(jpeg)
        except Exception as exc:
            console.print(f"[bold red]Analysis error:[/bold red] {exc}")
            continue
        print_description(text, frame_no)
        await log_description(config.log_file, text, frame_no)
def main() -> None:
    """Entry point: parse config, run the pipeline, shut down cleanly.

    Uses asyncio.run() instead of a hand-rolled event loop: on interruption
    it cancels pending tasks AND finalizes async generators
    (loop.shutdown_asyncgens), which the previous implementation never did —
    so capture_frames' finally block, which terminates the streamlink and
    ffmpeg subprocesses, could be skipped on Ctrl+C, leaking both processes.
    """
    config = load_config()

    def _on_sigterm(signum, frame):
        # Funnel SIGTERM into the same shutdown path as Ctrl+C.
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, _on_sigterm)
    try:
        asyncio.run(run(config))
    except KeyboardInterrupt:
        console.print("\n[bold yellow]Shutting down...[/bold yellow]")
        console.print("[bold green]Stopped.[/bold green]")


if __name__ == "__main__":
    main()

27
output.py Normal file
View File

@@ -0,0 +1,27 @@
import aiofiles
from datetime import datetime
from rich.console import Console
from rich.panel import Panel
console = Console()
def print_description(description: str, frame_number: int) -> None:
    """Render one frame's description as a rich panel on the console."""
    now = datetime.now().strftime("%H:%M:%S")
    header = f"[bold cyan]Frame #{frame_number}[/bold cyan] [{now}]"
    panel = Panel(description, title=header, border_style="blue")
    console.print(panel)
async def log_description(
    log_file: str, description: str, frame_number: int
) -> None:
    """Append one timestamped description line to *log_file* (UTF-8)."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] Frame #{frame_number}: {description}\n"
    async with aiofiles.open(log_file, "a", encoding="utf-8") as fh:
        await fh.write(entry)

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
streamlink
google-genai
python-dotenv
rich
aiofiles