Files
tg_bot_language/services/vocabulary_service.py
mamonov.ep adc8a6bf8e feat: мини-игры, premium подписка, улучшенные контексты
Мини-игры (/games):
- Speed Round: 10 раундов, 10 секунд на ответ, очки за скорость
- Match Pairs: 5 слов + 5 переводов, соединить пары

Premium-функции:
- Поля is_premium и premium_until для пользователей
- AI режим проверки ответов (учитывает синонимы)
- Batch проверка всех ответов одним запросом

Улучшения:
- Примеры использования для всех добавляемых слов
- Разбиение переводов по запятой на отдельные записи
- Полные предложения в контекстах (без ___)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-10 19:42:10 +03:00

529 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
import re
from typing import List, Optional, Dict
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from database.models import Vocabulary, WordSource, LanguageLevel, WordTranslation
class VocabularyService:
"""Сервис для работы со словарным запасом"""
@staticmethod
async def add_word(
session: AsyncSession,
user_id: int,
word_original: str,
word_translation: str,
source_lang: Optional[str] = None,
translation_lang: Optional[str] = None,
transcription: Optional[str] = None,
difficulty_level: Optional[str] = None,
source: WordSource = WordSource.MANUAL,
notes: Optional[str] = None
) -> Vocabulary:
"""
Добавить слово в словарь пользователя
Args:
session: Сессия базы данных
user_id: ID пользователя
word_original: Оригинальное слово
word_translation: Перевод
transcription: Транскрипция
difficulty_level: Уровень сложности
source: Источник добавления
notes: Заметки пользователя
Returns:
Созданный объект слова
"""
# Преобразование difficulty_level в enum
difficulty_enum = None
if difficulty_level:
try:
difficulty_enum = LanguageLevel[difficulty_level]
except KeyError:
difficulty_enum = None
new_word = Vocabulary(
user_id=user_id,
word_original=word_original,
word_translation=word_translation,
source_lang=source_lang,
translation_lang=translation_lang,
transcription=transcription,
difficulty_level=difficulty_enum,
source=source,
notes=notes
)
session.add(new_word)
await session.commit()
await session.refresh(new_word)
return new_word
@staticmethod
@staticmethod
def _is_japanese(text: str) -> bool:
if not text:
return False
return re.search(r"[\u3040-\u30FF\u3400-\u4DBF\u4E00-\u9FFF]", text) is not None
@staticmethod
def _filter_by_learning_lang(words: List[Vocabulary], learning_lang: Optional[str]) -> List[Vocabulary]:
if not learning_lang:
return words
# Если в БД указан source_lang фильтруем по нему.
with_lang = [w for w in words if getattr(w, 'source_lang', None)]
if with_lang:
return [w for w in words if (w.source_lang or '').lower() == learning_lang.lower()]
# Фолбэк-эвристика для японского, если язык не сохранён
if learning_lang.lower() == 'ja':
return [w for w in words if VocabularyService._is_japanese(w.word_original)]
return [w for w in words if not VocabularyService._is_japanese(w.word_original)]
@staticmethod
async def get_user_words(
session: AsyncSession,
user_id: int,
limit: int = 50,
offset: int = 0,
learning_lang: Optional[str] = None
) -> List[Vocabulary]:
"""
Получить слова пользователя с пагинацией
Args:
session: Сессия базы данных
user_id: ID пользователя
limit: Максимальное количество слов
offset: Смещение для пагинации
Returns:
Список слов пользователя
"""
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.order_by(Vocabulary.created_at.desc())
)
words = list(result.scalars().all())
words = VocabularyService._filter_by_learning_lang(words, learning_lang)
return words[offset:offset + limit]
@staticmethod
async def get_words_count(session: AsyncSession, user_id: int, learning_lang: Optional[str] = None) -> int:
"""
Получить количество слов в словаре пользователя
Args:
session: Сессия базы данных
user_id: ID пользователя
Returns:
Количество слов
"""
result = await session.execute(
select(Vocabulary).where(Vocabulary.user_id == user_id)
)
words = list(result.scalars().all())
words = VocabularyService._filter_by_learning_lang(words, learning_lang)
return len(words)
@staticmethod
async def find_word(
session: AsyncSession,
user_id: int,
word: str,
source_lang: Optional[str] = None
) -> Optional[Vocabulary]:
"""
Найти слово в словаре пользователя
Args:
session: Сессия базы данных
user_id: ID пользователя
word: Слово для поиска
source_lang: Язык изучения для фильтрации (если указан)
Returns:
Объект слова или None
"""
query = (
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.where(Vocabulary.word_original.ilike(f"%{word}%"))
)
if source_lang:
query = query.where(Vocabulary.source_lang == source_lang.lower())
result = await session.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_word_by_original(
session: AsyncSession,
user_id: int,
word: str,
source_lang: Optional[str] = None
) -> Optional[Vocabulary]:
"""
Получить слово по точному совпадению
Args:
session: Сессия базы данных
user_id: ID пользователя
word: Слово для поиска (точное совпадение)
source_lang: Язык изучения для фильтрации (если указан)
Returns:
Объект слова или None
"""
query = (
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.where(Vocabulary.word_original == word.lower())
)
if source_lang:
query = query.where(Vocabulary.source_lang == source_lang.lower())
result = await session.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_all_user_word_strings(
session: AsyncSession,
user_id: int,
learning_lang: Optional[str] = None
) -> List[str]:
"""
Получить список всех слов пользователя (только строки)
Args:
session: Сессия базы данных
user_id: ID пользователя
learning_lang: Язык изучения для фильтрации
Returns:
Список строк — оригинальных слов
"""
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
)
words = list(result.scalars().all())
words = VocabularyService._filter_by_learning_lang(words, learning_lang)
return [w.word_original.lower() for w in words]
# === Методы для работы с переводами ===
@staticmethod
async def add_translation(
session: AsyncSession,
vocabulary_id: int,
translation: str,
context: Optional[str] = None,
context_translation: Optional[str] = None,
is_primary: bool = False
) -> WordTranslation:
"""
Добавить перевод к слову
Args:
session: Сессия базы данных
vocabulary_id: ID слова в словаре
translation: Перевод
context: Пример предложения на языке изучения
context_translation: Перевод примера
is_primary: Является ли основным переводом
Returns:
Созданный объект перевода
"""
# Если это основной перевод, снимаем флаг с других
if is_primary:
result = await session.execute(
select(WordTranslation)
.where(WordTranslation.vocabulary_id == vocabulary_id)
.where(WordTranslation.is_primary == True)
)
for existing in result.scalars().all():
existing.is_primary = False
new_translation = WordTranslation(
vocabulary_id=vocabulary_id,
translation=translation,
context=context,
context_translation=context_translation,
is_primary=is_primary
)
session.add(new_translation)
await session.commit()
await session.refresh(new_translation)
return new_translation
@staticmethod
async def add_translation_split(
session: AsyncSession,
vocabulary_id: int,
translation: str,
context: Optional[str] = None,
context_translation: Optional[str] = None,
is_primary: bool = True
) -> List[WordTranslation]:
"""
Добавить перевод(ы) к слову, разбивая строку по разделителям.
Если translation содержит несколько переводов через запятую или точку с запятой,
каждый перевод добавляется отдельной записью.
Args:
session: Сессия базы данных
vocabulary_id: ID слова в словаре
translation: Перевод (может содержать несколько через запятую)
context: Пример использования на языке изучения
context_translation: Перевод примера
is_primary: Является ли первый перевод основным
Returns:
Список созданных переводов
"""
import re
# Разбиваем по запятой или точке с запятой
parts = re.split(r'[,;]\s*', translation)
# Очищаем и фильтруем пустые
translations = [p.strip() for p in parts if p.strip()]
if not translations:
return []
created = []
for i, tr in enumerate(translations):
new_translation = WordTranslation(
vocabulary_id=vocabulary_id,
translation=tr,
context=context if i == 0 else None, # Контекст только для первого перевода
context_translation=context_translation if i == 0 else None,
is_primary=(is_primary and i == 0) # Только первый - основной
)
session.add(new_translation)
created.append(new_translation)
await session.commit()
for t in created:
await session.refresh(t)
return created
@staticmethod
async def add_translations_bulk(
session: AsyncSession,
vocabulary_id: int,
translations: List[Dict]
) -> List[WordTranslation]:
"""
Добавить несколько переводов к слову
Args:
session: Сессия базы данных
vocabulary_id: ID слова
translations: Список словарей с переводами
[{"translation": "...", "context": "...", "context_translation": "...", "is_primary": bool}]
Returns:
Список созданных переводов
"""
created = []
for i, tr_data in enumerate(translations):
new_translation = WordTranslation(
vocabulary_id=vocabulary_id,
translation=tr_data.get('translation', ''),
context=tr_data.get('context'),
context_translation=tr_data.get('context_translation'),
is_primary=tr_data.get('is_primary', i == 0) # Первый по умолчанию основной
)
session.add(new_translation)
created.append(new_translation)
await session.commit()
for tr in created:
await session.refresh(tr)
return created
@staticmethod
async def get_word_translations(
session: AsyncSession,
vocabulary_id: int
) -> List[WordTranslation]:
"""
Получить все переводы слова
Args:
session: Сессия базы данных
vocabulary_id: ID слова
Returns:
Список переводов
"""
result = await session.execute(
select(WordTranslation)
.where(WordTranslation.vocabulary_id == vocabulary_id)
.order_by(WordTranslation.is_primary.desc(), WordTranslation.created_at)
)
return list(result.scalars().all())
@staticmethod
async def get_primary_translation(
session: AsyncSession,
vocabulary_id: int
) -> Optional[WordTranslation]:
"""
Получить основной перевод слова
Args:
session: Сессия базы данных
vocabulary_id: ID слова
Returns:
Основной перевод или None
"""
result = await session.execute(
select(WordTranslation)
.where(WordTranslation.vocabulary_id == vocabulary_id)
.where(WordTranslation.is_primary == True)
)
return result.scalar_one_or_none()
@staticmethod
async def delete_translation(
session: AsyncSession,
translation_id: int
) -> bool:
"""
Удалить перевод
Args:
session: Сессия базы данных
translation_id: ID перевода
Returns:
True если удалено, False если не найдено
"""
result = await session.execute(
select(WordTranslation).where(WordTranslation.id == translation_id)
)
translation = result.scalar_one_or_none()
if translation:
await session.delete(translation)
await session.commit()
return True
return False
@staticmethod
async def get_user_word_count(session: AsyncSession, user_id: int) -> int:
"""
Получить количество слов пользователя
Args:
session: Сессия базы данных
user_id: ID пользователя
Returns:
Количество слов
"""
result = await session.execute(
select(func.count(Vocabulary.id)).where(Vocabulary.user_id == user_id)
)
return result.scalar() or 0
@staticmethod
async def get_random_words_for_game(
session: AsyncSession,
user_id: int,
count: int = 10,
learning_lang: Optional[str] = None
) -> List[Vocabulary]:
"""
Получить случайные слова для мини-игры
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество слов
learning_lang: Язык изучения для фильтрации
Returns:
Список случайных слов
"""
result = await session.execute(
select(Vocabulary).where(Vocabulary.user_id == user_id)
)
words = list(result.scalars().all())
if learning_lang:
words = VocabularyService._filter_by_learning_lang(words, learning_lang)
# Перемешиваем и берём нужное количество
random.shuffle(words)
return words[:count]
@staticmethod
async def get_random_words_with_translations(
session: AsyncSession,
user_id: int,
count: int = 10,
learning_lang: Optional[str] = None
) -> List[dict]:
"""
Получить случайные слова для мини-игры вместе со всеми переводами
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество слов
learning_lang: Язык изучения для фильтрации
Returns:
Список словарей с информацией о словах и их переводах
"""
# Получаем слова
words = await VocabularyService.get_random_words_for_game(
session, user_id, count, learning_lang
)
result = []
for word in words:
# Получаем все переводы для слова
translations = await VocabularyService.get_word_translations(session, word.id)
# Собираем все варианты перевода
all_translations = []
# Основной перевод из vocabulary
if word.word_translation:
all_translations.append(word.word_translation.lower().strip())
# Переводы из word_translations
for tr in translations:
tr_text = tr.translation.lower().strip()
if tr_text not in all_translations:
all_translations.append(tr_text)
result.append({
'id': word.id,
'word': word.word_original,
'translation': word.word_translation, # Основной перевод для отображения
'all_translations': all_translations, # Все варианты для проверки
'transcription': word.transcription
})
return result