Files
tg_bot_language/services/task_service.py
mamonov.ep d937b37a3b feat: multiple translations with context, improved task examples
- Add WordTranslation model for storing multiple translations per word
- AI generates translations with example sentences and their translations
- Show example usage after answering tasks (learning + interface language)
- Save translations to word_translations table when adding words from tasks
- Improve word exclusion in new_words mode (stronger prompt + client filtering)
- Add migration for word_translations table

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-06 21:29:41 +03:00

375 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from datetime import datetime
from typing import List, Dict, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database.models import Task, Vocabulary, WordTranslation
from services.ai_service import ai_service
class TaskService:
"""Сервис для работы с заданиями"""
@staticmethod
async def generate_translation_tasks(
session: AsyncSession,
user_id: int,
count: int = 5
) -> List[Dict]:
"""
Генерация заданий на перевод слов
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество заданий
Returns:
Список заданий
"""
# Получаем слова пользователя
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.order_by(Vocabulary.last_reviewed.asc().nullsfirst())
.limit(count * 2) # Берем больше, чтобы было из чего выбрать
)
words = list(result.scalars().all())
if not words:
return []
# Выбираем случайные слова
selected_words = random.sample(words, min(count, len(words)))
tasks = []
for word in selected_words:
# Случайно выбираем направление перевода
direction = random.choice(['en_to_ru', 'ru_to_en'])
if direction == 'en_to_ru':
task = {
'type': 'translate_to_ru',
'word_id': word.id,
'question': f"Переведи слово: <b>{word.word_original}</b>",
'word': word.word_original,
'correct_answer': word.word_translation,
'transcription': word.transcription
}
else:
task = {
'type': 'translate_to_en',
'word_id': word.id,
'question': f"Переведи слово: <b>{word.word_translation}</b>",
'word': word.word_translation,
'correct_answer': word.word_original,
'transcription': word.transcription
}
tasks.append(task)
return tasks
@staticmethod
async def generate_mixed_tasks(
session: AsyncSession,
user_id: int,
count: int = 5,
learning_lang: str = 'en',
translation_lang: str = 'ru'
) -> List[Dict]:
"""
Генерация заданий разных типов (переводы + заполнение пропусков)
Args:
session: Сессия базы данных
user_id: ID пользователя
count: Количество заданий
Returns:
Список заданий разных типов
"""
# Получаем слова пользователя
result = await session.execute(
select(Vocabulary)
.where(Vocabulary.user_id == user_id)
.order_by(Vocabulary.last_reviewed.asc().nullsfirst())
.limit(count * 2)
)
words = list(result.scalars().all())
if not words:
return []
# Выбираем случайные слова
selected_words = random.sample(words, min(count, len(words)))
tasks = []
for word in selected_words:
# Получаем переводы из таблицы WordTranslation
translations_result = await session.execute(
select(WordTranslation)
.where(WordTranslation.vocabulary_id == word.id)
.order_by(WordTranslation.is_primary.desc())
)
translations = list(translations_result.scalars().all())
# Случайно выбираем тип задания
# Если есть переводы с контекстом, добавляем тип 'context_translate'
task_types = ['translate', 'fill_in']
if translations and any(tr.context for tr in translations):
task_types.append('context_translate')
task_type = random.choice(task_types)
if task_type == 'context_translate' and translations:
# Задание на перевод по контексту
# Выбираем случайный перевод с контекстом
translations_with_context = [tr for tr in translations if tr.context]
if translations_with_context:
selected_tr = random.choice(translations_with_context)
# Локализация фразы
if translation_lang == 'en':
prompt = "Translate the highlighted word in context:"
elif translation_lang == 'ja':
prompt = "文脈に合った翻訳を入力してください:"
else:
prompt = "Переведи выделенное слово в контексте:"
task = {
'type': 'context_translate',
'word_id': word.id,
'translation_id': selected_tr.id,
'question': (
f"{prompt}\n\n"
f"<i>«{selected_tr.context}»</i>\n\n"
f"<b>{word.word_original}</b> = ?"
),
'word': word.word_original,
'correct_answer': selected_tr.translation,
'transcription': word.transcription,
'context': selected_tr.context,
'context_translation': selected_tr.context_translation
}
tasks.append(task)
continue
if task_type == 'translate':
# Задание на перевод между языком обучения и языком перевода
direction = random.choice(['learn_to_tr', 'tr_to_learn'])
# Локализация фразы "Переведи слово"
if translation_lang == 'en':
prompt = "Translate the word:"
elif translation_lang == 'ja':
prompt = "単語を訳してください:"
else:
prompt = "Переведи слово:"
# Определяем правильный ответ - берём из таблицы переводов если есть
correct_translation = word.word_translation
if translations:
# Берём основной перевод или первый
primary = next((tr for tr in translations if tr.is_primary), translations[0] if translations else None)
if primary:
correct_translation = primary.translation
if direction == 'learn_to_tr':
task = {
'type': f'translate_to_{translation_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{word.word_original}</b>",
'word': word.word_original,
'correct_answer': correct_translation,
'transcription': word.transcription,
# Все допустимые ответы для проверки
'all_translations': [tr.translation for tr in translations] if translations else [correct_translation]
}
else:
task = {
'type': f'translate_to_{learning_lang}',
'word_id': word.id,
'question': f"{prompt} <b>{correct_translation}</b>",
'word': correct_translation,
'correct_answer': word.word_original,
'transcription': word.transcription
}
else:
# Задание на заполнение пропуска
# Генерируем предложение с пропуском через AI
sentence_data = await ai_service.generate_fill_in_sentence(
word.word_original,
learning_lang=learning_lang,
translation_lang=translation_lang
)
# Локализация заголовка
if translation_lang == 'en':
fill_title = "Fill in the blank in the sentence:"
elif translation_lang == 'ja':
fill_title = "文の空欄を埋めてください:"
else:
fill_title = "Заполни пропуск в предложении:"
task = {
'type': 'fill_in',
'word_id': word.id,
'question': (
f"{fill_title}\n\n"
f"<b>{sentence_data['sentence']}</b>\n\n"
f"<i>{sentence_data.get('translation', '')}</i>"
),
'word': word.word_original,
'correct_answer': sentence_data['answer'],
'sentence': sentence_data['sentence']
}
tasks.append(task)
return tasks
@staticmethod
async def save_task_result(
session: AsyncSession,
user_id: int,
task_type: str,
content: Dict,
user_answer: str,
correct_answer: str,
is_correct: bool,
ai_feedback: Optional[str] = None
) -> Task:
"""
Сохранение результата выполнения задания
Args:
session: Сессия базы данных
user_id: ID пользователя
task_type: Тип задания
content: Содержимое задания
user_answer: Ответ пользователя
correct_answer: Правильный ответ
is_correct: Правильность ответа
ai_feedback: Обратная связь от AI
Returns:
Сохраненное задание
"""
task = Task(
user_id=user_id,
task_type=task_type,
content=content,
user_answer=user_answer,
correct_answer=correct_answer,
is_correct=is_correct,
ai_feedback=ai_feedback,
completed_at=datetime.utcnow()
)
session.add(task)
await session.commit()
await session.refresh(task)
return task
@staticmethod
async def update_word_statistics(
session: AsyncSession,
word_id: int,
is_correct: bool
):
"""
Обновление статистики слова
Args:
session: Сессия базы данных
word_id: ID слова
is_correct: Правильность ответа
"""
result = await session.execute(
select(Vocabulary).where(Vocabulary.id == word_id)
)
word = result.scalar_one_or_none()
if word:
word.times_reviewed += 1
if is_correct:
word.correct_answers += 1
word.last_reviewed = datetime.utcnow()
await session.commit()
@staticmethod
async def get_user_stats(session: AsyncSession, user_id: int) -> Dict:
"""
Получение статистики пользователя
Args:
session: Сессия базы данных
user_id: ID пользователя
Returns:
Статистика пользователя
"""
# Количество слов
words_result = await session.execute(
select(Vocabulary).where(Vocabulary.user_id == user_id)
)
words = list(words_result.scalars().all())
total_words = len(words)
# Количество выполненных заданий
tasks_result = await session.execute(
select(Task).where(Task.user_id == user_id)
)
tasks = list(tasks_result.scalars().all())
total_tasks = len(tasks)
# Правильные ответы
correct_tasks = len([t for t in tasks if t.is_correct])
accuracy = int((correct_tasks / total_tasks * 100)) if total_tasks > 0 else 0
# Слова с повторениями
reviewed_words = len([w for w in words if w.times_reviewed > 0])
return {
'total_words': total_words,
'reviewed_words': reviewed_words,
'total_tasks': total_tasks,
'correct_tasks': correct_tasks,
'accuracy': accuracy
}
@staticmethod
async def get_correctly_answered_words(
session: AsyncSession,
user_id: int
) -> List[str]:
"""
Получить список слов, на которые пользователь правильно ответил в заданиях
Args:
session: Сессия базы данных
user_id: ID пользователя
Returns:
Список слов (строки) с правильными ответами
"""
result = await session.execute(
select(Task)
.where(Task.user_id == user_id)
.where(Task.is_correct == True)
)
tasks = list(result.scalars().all())
words = []
for task in tasks:
if task.content and isinstance(task.content, dict):
word = task.content.get('word')
if word:
words.append(word.lower())
return list(set(words))