Модуль 4.1 · Урок 3
Практика: собираем команду агентов
Содержание
- Цель урока
- 1. Подготовка окружения
- 1.1 Системные требования
- 1.2 Установка зависимостей
- 1.3 Выбор модели: локальная или облачная
- 1.4 Структура проекта
- 2. Шаг 1: Базовый класс Agent
- 3. Шаг 2: Класс Orchestrator
- 4. Шаг 3: Конкретные агенты
- 5. Шаг 4: Полный рабочий пример
- 6. Шаг 5: Улучшения и оптимизации
- 6.1 Добавляем общую память между агентами
- 6.2 Добавляем retry логику
- 6.3 Параллельное выполнение задач
- 6.4 Маршрутизация моделей (разные модели для разных агентов)
- 7. Альтернативные фреймворки
- 7.1 CrewAI (более высокоуровневый подход)
- 7.2 LangGraph (более структурированный граф-подход)
- 8. Задания для самостоятельной работы
- Задание 1: Добавьте 5-го агента (Tester)
- Задание 2: Добавьте возможность поиска в интернете
- Задание 3: Развёртывание на двух машинах
- 9. Часто задаваемые вопросы
- Итоги
Цель урока
В этом уроке вы создадите работающую систему из 4 взаимодействующих AI агентов, которые совместно анализируют кодовую базу и генерируют отчёт. Это реальный пример микросервисной архитектуры с искусственным интеллектом.
Проект: “AI Development Team”
- Analyst — анализирует исходный код, находит проблемы
- Coder — генерирует исправления и улучшения
- Reviewer — проверяет качество предложенных решений
- Manager — координирует работу всей команды и собирает финальный отчёт
graph TD
MGR[Manager -- координатор] --> AN[Analyst -- анализ кода]
MGR --> CD[Coder -- исправления]
MGR --> RV[Reviewer -- проверка]
AN -- проблемы --> CD
CD -- исправленный код --> RV
RV -- отчёт ревью --> MGR
style MGR fill:#4f46e5,color:#fff,stroke:#4338ca
style AN fill:#2563eb,color:#fff,stroke:#1d4ed8
style CD fill:#059669,color:#fff,stroke:#047857
style RV fill:#d97706,color:#fff,stroke:#b45309
1. Подготовка окружения
1.1 Системные требования
# Проверьте версию Python
python3 --version # должна быть 3.11 или выше
# Создайте виртуальное окружение
python3 -m venv venv
source venv/bin/activate # Linux/Mac
# или
venv\Scripts\activate # Windows
1.2 Установка зависимостей
pip install openai pydantic python-dotenv requests
Требуемые пакеты:
openai— клиент для OpenAI-совместимого APIpydantic— валидация данныхpython-dotenv— загрузка переменных окружения
1.3 Выбор модели: локальная или облачная
Вариант A: Локальная модель (Ollama)
# Установите Ollama: https://ollama.com
# Скачайте модель (например, Mistral)
ollama pull mistral-small
# Запустите Ollama в отдельном терминале
ollama serve
Ollama будет доступна на http://localhost:11434/v1 (OpenAI-совместимый API).
Вариант B: OpenAI API (облачная)
# Установите API ключ в переменную окружения
export OPENAI_API_KEY="sk-..."
1.4 Структура проекта
ai-dev-team/
├── .env # Переменные окружения
├── main.py # Точка входа
├── agents.py # Классы агентов
├── orchestrator.py # Оркестратор задач
├── sample_code/ # Примеры кода для анализа
│ └── calculator.py
└── reports/ # Выходные отчёты
└── report.md
2. Шаг 1: Базовый класс Agent
Создайте файл agents.py:
import os
import json
from typing import Optional, Any
from abc import ABC, abstractmethod
from pydantic import BaseModel
import requests
from openai import OpenAI
class AgentConfig(BaseModel):
"""Конфигурация агента"""
name: str
role: str
system_prompt: str
model: str = "mistral"
api_endpoint: Optional[str] = None
temperature: float = 0.7
class Agent(ABC):
"""Базовый класс для AI агента"""
def __init__(self, config: AgentConfig):
self.config = config
self.name = config.name
self.role = config.role
self.system_prompt = config.system_prompt
self.model = config.model
self.conversation_history = []
self.results = []
# Инициализируем клиент OpenAI
api_key = os.getenv("OPENAI_API_KEY", "not-needed")
base_url = config.api_endpoint or "http://localhost:11434/v1"
self.client = OpenAI(api_key=api_key, base_url=base_url)
def send_message(self, user_message: str, context: Optional[str] = None) -> str:
"""
Отправить сообщение агенту и получить ответ.
Args:
user_message: Основное сообщение
context: Дополнительный контекст
Returns:
Ответ от LLM
"""
# Добавляем контекст если есть
if context:
full_message = f"{context}\n\n{user_message}"
else:
full_message = user_message
# Добавляем в историю
self.conversation_history.append({
"role": "user",
"content": full_message
})
try:
# Вызываем API
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": self.system_prompt},
*self.conversation_history
],
temperature=self.config.temperature,
max_tokens=2000
)
# Извлекаем текст ответа
assistant_message = response.choices[0].message.content
# Сохраняем в историю
self.conversation_history.append({
"role": "assistant",
"content": assistant_message
})
return assistant_message
except Exception as e:
error_msg = f"Ошибка при вызове API: {str(e)}"
print(f"[{self.name}] {error_msg}")
return error_msg
@abstractmethod
def execute(self, task: str, *args, **kwargs) -> dict:
"""
Выполнить задачу специфичную для этого агента.
Каждый конкретный агент реализует свою логику.
"""
pass
def get_summary(self) -> str:
"""Получить сводку работы агента"""
return f"[{self.name}] {self.role}: {len(self.results)} задач выполнено"
class AnalystAgent(Agent):
"""Агент-аналитик: читает код и находит проблемы"""
def execute(self, task: str, code_content: str = "", file_path: str = "") -> dict:
prompt = f"""Проанализируй следующий код и найди:
1. Потенциальные баги и проблемы безопасности
2. Нарушения best practices
3. Возможности оптимизации
4. Проблемы с читаемостью
Код из файла: {file_path}
```python
{code_content}
```
Предоставь детальный анализ в формате:
- **Проблема**: описание
- **Серьёзность**: High/Medium/Low
- **Предложение**: как исправить
"""
result = self.send_message(prompt)
self.results.append({
"task": "analyze",
"file": file_path,
"analysis": result
})
return {
"agent": self.name,
"task": task,
"analysis": result
}
class CoderAgent(Agent):
"""Агент-разработчик: генерирует исправления"""
def execute(self, task: str, original_code: str = "", analysis: str = "",
file_path: str = "") -> dict:
prompt = f"""На основе анализа кода, создай исправленную версию.
Оригинальный код ({file_path}):
```python
{original_code}
```
Анализ проблем:
{analysis}
Требования:
1. Исправь все выявленные проблемы
2. Добавь docstring для функций
3. Добавь обработку ошибок
4. Сохрани исходный функционал
Выведи только исправленный код в блоке ```python```
"""
result = self.send_message(prompt)
self.results.append({
"task": "code_fix",
"file": file_path,
"fixed_code": result
})
return {
"agent": self.name,
"task": task,
"fixed_code": result
}
class ReviewerAgent(Agent):
"""Агент-ревьюер: проверяет качество кода"""
def execute(self, task: str, original_code: str = "", fixed_code: str = "",
file_path: str = "") -> dict:
prompt = f"""Проведи код-ревью для исправленного кода.
Оригинальный код:
```python
{original_code}
```
Исправленный код:
```python
{fixed_code}
```
Проверь:
1. Корректность исправлений
2. Совместимость с исходной API
3. Качество кода (PEP 8, типы данных)
4. Полноту (все ли проблемы решены?)
5. Производительность
Выведи отчёт в формате:
- **Статус**: Готово / Требует правок / Критические проблемы
- **Плюсы**: список улучшений
- **Минусы**: оставшиеся проблемы (если есть)
- **Рекомендации**: дополнительные улучшения
"""
result = self.send_message(prompt)
self.results.append({
"task": "review",
"file": file_path,
"review": result
})
return {
"agent": self.name,
"task": task,
"review": result
}
class ManagerAgent(Agent):
"""Агент-менеджер: координирует всё и создаёт финальный отчёт"""
def execute(self, task: str, all_results: dict = None) -> dict:
if not all_results:
all_results = {}
summary = json.dumps(all_results, ensure_ascii=False, indent=2)
prompt = f"""На основе результатов работы всех агентов, создай финальный отчёт о качестве кода.
Результаты анализа:
{summary}
Создай структурированный отчёт:
## Итоговый отчёт по анализу кода
### 1. Статистика
- Количество файлов
- Количество найденных проблем
- Распределение по серьёзности
### 2. Критические проблемы
[Список с приоритетом]
### 3. Рекомендации по улучшению
[Детальные рекомендации]
### 4. Заключение
[Общая оценка качества кода]
"""
result = self.send_message(prompt)
self.results.append({
"task": "final_report",
"report": result
})
return {
"agent": self.name,
"task": task,
"report": result
}
3. Шаг 2: Класс Orchestrator
Создайте файл orchestrator.py:
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
from agents import Agent, AgentConfig, AnalystAgent, CoderAgent, ReviewerAgent, ManagerAgent
import asyncio
from pathlib import Path
import time
class TaskStatus(Enum):
"""Статус задачи"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
"""Задача для выполнения"""
id: str
agent_name: str
task_type: str
data: Dict[str, Any] = field(default_factory=dict)
status: TaskStatus = TaskStatus.PENDING
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
def to_dict(self) -> Dict:
return {
"id": self.id,
"agent": self.agent_name,
"type": self.task_type,
"status": self.status.value,
"result": self.result
}
class Orchestrator:
"""
Оркестратор для управления командой агентов.
Распределяет задачи, управляет очередью и собирает результаты.
"""
def __init__(self, model: str = "mistral", api_endpoint: str = "http://localhost:11434/v1"):
self.agents: Dict[str, Agent] = {}
self.task_queue: List[Task] = []
self.completed_tasks: Dict[str, Task] = {}
self.task_results: Dict[str, Any] = {}
self.model = model
self.api_endpoint = api_endpoint
# Инициализируем агентов
self._initialize_agents()
def _initialize_agents(self):
"""Инициализировать стандартную команду агентов"""
# Аналитик
analyst_config = AgentConfig(
name="Analyst",
role="Анализирует исходный код и находит проблемы",
system_prompt="""Ты опытный аналитик кода. Твоя задача - найти баги, уязвимости,
нарушения best practices и возможности оптимизации. Будь критичен, но справедлив.
Предоставляй конкретные примеры и рекомендации.""",
model=self.model,
api_endpoint=self.api_endpoint
)
self.agents["Analyst"] = AnalystAgent(analyst_config)
# Разработчик
coder_config = AgentConfig(
name="Coder",
role="Генерирует исправления и улучшения кода",
system_prompt="""Ты опытный Python разработчик. Твоя задача - писать чистый,
эффективный, хорошо документированный код. Следуй PEP 8, добавляй типы данных,
обработку ошибок. Сохраняй совместимость с исходной API.""",
model=self.model,
api_endpoint=self.api_endpoint
)
self.agents["Coder"] = CoderAgent(coder_config)
# Ревьюер
reviewer_config = AgentConfig(
name="Reviewer",
role="Проверяет качество и корректность кода",
system_prompt="""Ты опытный code reviewer. Твоя задача - убедиться, что код
высокого качества, безопасен и эффективен. Смотри на логику, производительность,
читаемость. Будь конструктивен в критике.""",
model=self.model,
api_endpoint=self.api_endpoint
)
self.agents["Reviewer"] = ReviewerAgent(reviewer_config)
# Менеджер
manager_config = AgentConfig(
name="Manager",
role="Координирует работу команды и создаёт отчёты",
system_prompt="""Ты опытный проект менеджер. Твоя задача - синтезировать
информацию от других агентов, приоритизировать проблемы и создать понятный
отчёт для разработчиков. Будь системным и конструктивным.""",
model=self.model,
api_endpoint=self.api_endpoint
)
self.agents["Manager"] = ManagerAgent(manager_config)
def register_agent(self, agent: Agent):
"""Зарегистрировать новый агент"""
self.agents[agent.name] = agent
def add_task(self, agent_name: str, task_type: str, task_id: str = None, **data) -> str:
"""
Добавить задачу в очередь.
Returns:
ID задачи
"""
if task_id is None:
task_id = f"{agent_name}_{task_type}_{int(time.time() * 1000)}"
task = Task(
id=task_id,
agent_name=agent_name,
task_type=task_type,
data=data
)
self.task_queue.append(task)
return task_id
def dispatch(self) -> Dict[str, Any]:
"""
Выполнить все задачи из очереди.
Returns:
Результаты выполнения всех задач
"""
print(f"\nЗапуск оркестратора. Задач в очереди: {len(self.task_queue)}\n")
for task in self.task_queue:
print(f" Задача: {task.id} -> {task.agent_name}")
try:
# Изменяем статус
task.status = TaskStatus.RUNNING
# Получаем агента
agent = self.agents.get(task.agent_name)
if not agent:
raise ValueError(f"Агент {task.agent_name} не найден")
# Выполняем задачу
result = agent.execute(task.task_type, **task.data)
# Сохраняем результат
task.result = result
task.status = TaskStatus.COMPLETED
self.task_results[task.id] = result
print(f" {task.id}: выполнено\n")
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
print(f" {task.id}: ошибка - {str(e)}\n")
self.completed_tasks = {task.id: task for task in self.task_queue}
return self.task_results
def collect_results(self, agent_name: str = None) -> Dict[str, Any]:
"""
Собрать результаты выполнения.
Args:
agent_name: Если указано, собрать только результаты конкретного агента
Returns:
Словарь с результатами
"""
if agent_name:
return {
aid: result for aid, result in self.task_results.items()
if result.get("agent") == agent_name
}
return self.task_results
def get_summary(self) -> str:
"""Получить сводку работы оркестратора"""
completed = sum(1 for t in self.completed_tasks.values()
if t.status == TaskStatus.COMPLETED)
failed = sum(1 for t in self.completed_tasks.values()
if t.status == TaskStatus.FAILED)
summary = f"\nСВОДКА ОРКЕСТРАТОРА\n"
summary += f"{'='*50}\n"
summary += f"Всего задач: {len(self.completed_tasks)}\n"
summary += f"Выполнено: {completed}\n"
summary += f"Ошибок: {failed}\n\n"
for agent_name, agent in self.agents.items():
summary += f" {agent.get_summary()}\n"
return summary
4. Шаг 3: Конкретные агенты
Агенты уже реализованы в файле agents.py выше. Рассмотрим их роли:
| Агент | Входные данные | Выходные данные |
|---|---|---|
| Analyst | Исходный код | Список найденных проблем |
| Coder | Оригинальный код + анализ | Исправленный код |
| Reviewer | Оригинальный + исправленный код | Оценка качества исправлений |
| Manager | Результаты всех агентов | Финальный отчёт |
5. Шаг 4: Полный рабочий пример
Создайте файл sample_code/calculator.py с примером кода для анализа:
# Плохой код с множеством проблем
def calculate(a, b, operation):
if operation == "add":
return a + b
elif operation == "subtract":
return a - b
elif operation == "divide":
return a / b # Нет проверки на ноль!
elif operation == "multiply":
return a * b
def process_user_input(user_input):
parts = user_input.split(",")
num1 = int(parts[0]) # Может быть ValueError
num2 = int(parts[1])
op = parts[2]
result = calculate(num1, num2, op)
print(result)
Теперь создайте файл main.py:
import os
from pathlib import Path
from dotenv import load_dotenv
from orchestrator import Orchestrator
# Загружаем переменные окружения
load_dotenv()
def read_file(path: str) -> str:
"""Читать файл"""
try:
with open(path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
return f"Файл {path} не найден"
def run_code_review_pipeline(project_path: str = "sample_code"):
"""
Запустить полный пайплайн анализа кода.
Args:
project_path: Путь к проекту для анализа
"""
print("\n" + "="*60)
print("AI DEVELOPMENT TEAM - Анализ кода")
print("="*60 + "\n")
# Инициализируем оркестратор
orchestrator = Orchestrator(
model="mistral",
api_endpoint="http://localhost:11434/v1"
# Или для OpenAI:
# model="gpt-4",
# api_endpoint="https://api.openai.com/v1"
)
# Находим все Python файлы
project_dir = Path(project_path)
python_files = list(project_dir.glob("*.py"))
if not python_files:
print(f"Python файлы не найдены в {project_path}")
return
# Фаза 1: Анализ
print("ФАЗА 1: АНАЛИЗ КОДА")
print("-" * 60)
analyses = {}
for py_file in python_files:
code_content = read_file(str(py_file))
file_name = py_file.name
# Добавляем задачу для Analyst
task_id = orchestrator.add_task(
agent_name="Analyst",
task_type="analyze",
file_path=file_name,
code_content=code_content
)
analyses[file_name] = task_id
# Выполняем анализ
results = orchestrator.dispatch()
# Фаза 2: Генерация исправлений
print("\nФАЗА 2: ГЕНЕРАЦИЯ ИСПРАВЛЕНИЙ")
print("-" * 60)
fixed_codes = {}
for py_file in python_files:
code_content = read_file(str(py_file))
file_name = py_file.name
# Получаем анализ для этого файла
analysis_task_id = analyses[file_name]
analysis_result = results.get(analysis_task_id, {}).get("analysis", "")
# Добавляем задачу для Coder
task_id = orchestrator.add_task(
agent_name="Coder",
task_type="fix_code",
file_path=file_name,
original_code=code_content,
analysis=analysis_result
)
fixed_codes[file_name] = task_id
# Выполняем исправления
results.update(orchestrator.dispatch())
# Фаза 3: Ревью
print("\nФАЗА 3: КОД-РЕВЬЮ")
print("-" * 60)
reviews = {}
for py_file in python_files:
code_content = read_file(str(py_file))
file_name = py_file.name
# Получаем исправленный код
fix_task_id = fixed_codes[file_name]
fixed_code = results.get(fix_task_id, {}).get("fixed_code", "")
# Добавляем задачу для Reviewer
task_id = orchestrator.add_task(
agent_name="Reviewer",
task_type="review",
file_path=file_name,
original_code=code_content,
fixed_code=fixed_code
)
reviews[file_name] = task_id
# Выполняем ревью
results.update(orchestrator.dispatch())
# Фаза 4: Финальный отчёт
print("\nФАЗА 4: ФИНАЛЬНЫЙ ОТЧЁТ")
print("-" * 60)
# Собираем все результаты для менеджера
all_results = {
"analyses": orchestrator.collect_results("Analyst"),
"fixes": orchestrator.collect_results("Coder"),
"reviews": orchestrator.collect_results("Reviewer")
}
# Добавляем задачу для Manager
orchestrator.add_task(
agent_name="Manager",
task_type="create_report",
all_results=all_results
)
# Выполняем финальное создание отчёта
results.update(orchestrator.dispatch())
# Сохраняем отчёт в файл
manager_results = orchestrator.collect_results("Manager")
if manager_results:
report_content = list(manager_results.values())[0].get("report", "")
# Создаём директорию для отчётов
Path("reports").mkdir(exist_ok=True)
# Сохраняем отчёт
with open("reports/report.md", 'w', encoding='utf-8') as f:
f.write(report_content)
print("\n" + report_content)
print("\nОтчёт сохранён в reports/report.md")
# Выводим сводку
print(orchestrator.get_summary())
if __name__ == "__main__":
# Проверяем наличие Ollama
import subprocess
try:
subprocess.run(
["curl", "-s", "http://localhost:11434/api/tags"],
capture_output=True,
timeout=2
)
print("Ollama доступна")
except:
print("Ollama недоступна на localhost:11434")
print("Убедитесь, что Ollama запущена: ollama serve")
print("Или установите переменную OPENAI_API_KEY для использования облачного API\n")
# Запускаем пайплайн
run_code_review_pipeline()
Запуск проекта:
# Убедитесь, что Ollama запущена в отдельном терминале
ollama serve
# В основном терминале
python main.py
6. Шаг 5: Улучшения и оптимизации
6.1 Добавляем общую память между агентами
Модифицируйте orchestrator.py, добавив класс SharedMemory:
class SharedMemory:
"""Общая память для обмена информацией между агентами"""
def __init__(self):
self.data = {}
def set(self, key: str, value: Any):
"""Сохранить значение"""
self.data[key] = value
print(f"Память: {key} = {str(value)[:50]}...")
def get(self, key: str, default=None) -> Any:
"""Получить значение"""
return self.data.get(key, default)
def get_all(self) -> Dict[str, Any]:
"""Получить всё содержимое памяти"""
return self.data.copy()
# В Orchestrator добавьте:
class Orchestrator:
def __init__(self, ...):
# ... существующий код ...
self.shared_memory = SharedMemory()
# Передаём память всем агентам
for agent in self.agents.values():
agent.shared_memory = self.shared_memory
6.2 Добавляем retry логику
class Orchestrator:
def dispatch(self, max_retries: int = 2) -> Dict[str, Any]:
"""Выполнить все задачи с повторами при ошибке"""
for task in self.task_queue:
retries = 0
while retries <= max_retries:
try:
task.status = TaskStatus.RUNNING
agent = self.agents.get(task.agent_name)
if not agent:
raise ValueError(f"Агент {task.agent_name} не найден")
result = agent.execute(task.task_type, **task.data)
task.result = result
task.status = TaskStatus.COMPLETED
self.task_results[task.id] = result
print(f"{task.id}: успешно")
break # Выходим из цикла повторов
except Exception as e:
retries += 1
if retries <= max_retries:
print(f"{task.id}: ошибка (попытка {retries}), повторяю...")
import time
time.sleep(2 ** retries) # Экспоненциальная задержка
else:
task.status = TaskStatus.FAILED
task.error = str(e)
print(f"{task.id}: не удалось после {max_retries} попыток")
self.completed_tasks = {task.id: task for task in self.task_queue}
return self.task_results
6.3 Параллельное выполнение задач
import concurrent.futures
class Orchestrator:
def dispatch_parallel(self, max_workers: int = 4) -> Dict[str, Any]:
"""Выполнить задачи параллельно"""
def execute_task(task):
try:
agent = self.agents.get(task.agent_name)
result = agent.execute(task.task_type, **task.data)
task.result = result
task.status = TaskStatus.COMPLETED
return (task.id, result)
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
return (task.id, {"error": str(e)})
print(f"Параллельное выполнение ({max_workers} потоков)")
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(execute_task, task) for task in self.task_queue]
for future in concurrent.futures.as_completed(futures):
task_id, result = future.result()
self.task_results[task_id] = result
self.completed_tasks = {task.id: task for task in self.task_queue}
return self.task_results
6.4 Маршрутизация моделей (разные модели для разных агентов)
class Orchestrator:
def __init__(self, model_config: Dict[str, str] = None):
"""
Args:
model_config: {"Analyst": "mistral-small", "Coder": "qwen2.5", ...}
"""
self.model_config = model_config or {}
self._initialize_agents()
def _initialize_agents(self):
# Используем разные модели для разных агентов
analyst_config = AgentConfig(
name="Analyst",
role="...",
system_prompt="...",
model=self.model_config.get("Analyst", "mistral")
)
# ... и т.д. для других агентов
Использование:
# Разные модели для разных задач
model_config = {
"Analyst": "mistral", # Быстрая, хороша для анализа
"Coder": "qwen2.5", # Лучше для генерации кода
"Reviewer": "mistral",
"Manager": "qwen2.5"
}
orchestrator = Orchestrator(model_config=model_config)
7. Альтернативные фреймворки
7.1 CrewAI (более высокоуровневый подход)
pip install crewai openai
from crewai import Agent, Task, Crew
# Определяем агентов более просто
analyst = Agent(
role="Code Analyst",
goal="Find bugs and issues in code",
backstory="You are an expert code reviewer with 10 years of experience"
)
coder = Agent(
role="Python Developer",
goal="Fix code issues and improve quality",
backstory="You are a senior Python developer"
)
# Определяем задачи
analyze_task = Task(
description="Analyze the code and find all issues",
agent=analyst,
expected_output="Detailed analysis of code issues"
)
fix_task = Task(
description="Fix the issues found in the analysis",
agent=coder,
expected_output="Fixed and improved code"
)
# Создаём команду
crew = Crew(
agents=[analyst, coder],
tasks=[analyze_task, fix_task]
)
result = crew.kickoff(inputs={"code": code_content})
Преимущества CrewAI:
- Менее кода, больше функциональности
- Встроенная поддержка tools и интеграций
- Автоматическое управление контекстом
Недостатки:
- Менее гибкий, чем кастомное решение
- Привязка к OpenAI API
7.2 LangGraph (более структурированный граф-подход)
pip install langgraph langchain-openai
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict
class CodeReviewState(TypedDict):
code: str
analysis: str
fixed_code: str
review: str
report: str
# Определяем узлы графа
def analyze_code(state: CodeReviewState) -> CodeReviewState:
llm = ChatOpenAI(model="gpt-4")
state["analysis"] = llm.invoke(f"Analyze: {state['code']}")
return state
def fix_code(state: CodeReviewState) -> CodeReviewState:
llm = ChatOpenAI(model="gpt-4")
state["fixed_code"] = llm.invoke(f"Fix: {state['code']}\n\n{state['analysis']}")
return state
def review_code(state: CodeReviewState) -> CodeReviewState:
llm = ChatOpenAI(model="gpt-4")
state["review"] = llm.invoke(f"Review: {state['fixed_code']}")
return state
# Создаём граф
workflow = StateGraph(CodeReviewState)
workflow.add_node("analyze", analyze_code)
workflow.add_node("fix", fix_code)
workflow.add_node("review", review_code)
# Определяем переходы
workflow.add_edge(START, "analyze")
workflow.add_edge("analyze", "fix")
workflow.add_edge("fix", "review")
workflow.add_edge("review", END)
# Компилируем и запускаем
graph = workflow.compile()
result = graph.invoke({"code": code_content})
Таблица сравнения:
| Аспект | Наше решение | CrewAI | LangGraph |
|---|---|---|---|
| Контроль | Максимальный | Средний | Высокий |
| Простота | Средняя | Высокая | Выше средней |
| Производительность | Высокая | Средняя | Высокая |
| Масштабируемость | Высокая | Средняя | Максимальная |
8. Задания для самостоятельной работы
Задание 1: Добавьте 5-го агента (Tester)
Цель: Создать TesterAgent, который генерирует unit тесты для исправленного кода.
Требования:
- Читает исправленный код
- Генерирует pytest тесты
- Проверяет покрытие функциональности
- Возвращает созданные тесты в виде строки
Подсказка:
class TesterAgent(Agent):
def execute(self, task: str, fixed_code: str = "", file_path: str = "") -> dict:
prompt = f"""Создай полный набор pytest тестов для следующего кода:
```python
{fixed_code}
```
Требования:
1. Покрывай все функции
2. Включай edge cases
3. Используй fixtures если нужно
4. Добавляй docstrings
Выведи только тестовый код в блоке ```python```"""
# ... реализуйте метод ...
Задание 2: Добавьте возможность поиска в интернете
Цель: Дать агентам доступ к поиску в интернете для решения проблем.
Требования:
pip install duckduckgo-search
class SearchTool:
@staticmethod
def search(query: str, max_results: int = 3) -> List[str]:
from duckduckgo_search import DDGS
results = DDGS().text(query, max_results=max_results)
return [r['body'] for r in results]
# В Agent добавьте:
def search_documentation(self, query: str) -> str:
"""Поиск в интернете"""
results = SearchTool.search(query)
return "\n".join(results)
Задание 3: Развёртывание на двух машинах
Цель: Запустить агентов на разных машинах (одна с GPU для LLM, одна для оркестратора).
Архитектура:
+-----------------+
| GPU Host |
| (Ollama/vLLM) |
| :11434 |
+--------+--------+
| API
+--------v--------+
| Agent VM |
| (Orchestrator) |
| localhost:8000 |
+-----------------+
Реализация:
# На GPU Host: запустите модель
# ollama serve --host 0.0.0.0:11434
# На Agent VM: используйте удалённый API
orchestrator = Orchestrator(
api_endpoint="http://<GPU_HOST_IP>:11434/v1"
)
# Или создайте REST API для оркестратора
from flask import Flask, request, jsonify
app = Flask(__name__)
orchestrator = Orchestrator()
@app.route("/dispatch", methods=["POST"])
def dispatch():
data = request.json
# Добавляем задачи из данных
results = orchestrator.dispatch()
return jsonify(results)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
9. Часто задаваемые вопросы
Q: Какую модель выбрать для локального запуска?
A: Начните с mistral-small (быстрая, 7B параметров) или qwen2.5 (хороша для кода). Для более сложных задач используйте llama4 или gemma2.
Q: Как увеличить скорость работы? A:
- Используйте более маленькие модели (7B вместо 13B)
- Включите параллельное выполнение (
dispatch_parallel) - Используйте GPU (установите
CUDA_VISIBLE_DEVICES)
Q: Как работать с защищённым кодом?
A: Используйте api_endpoint="https://api.openai.com/v1" и установите OPENAI_API_KEY.
Q: Как добавить новый тип агента?
A: Наследуйте от Agent, реализуйте метод execute() и зарегистрируйте через orchestrator.register_agent().
Итоги
Вы создали полнофункциональную систему многоагентных AI, которая:
- Разделяет работу между специализированными агентами
- Управляет очередью задач и координирует выполнение
- Поддерживает параллельное выполнение и retry логику
- Может работать с локальными и облачными моделями
- Полностью настраивается и расширяется
Эта архитектура применима к любым задачам, требующим сотрудничества нескольких AI компонентов: аналитика данных, обработка документов, автоматизация разработки и многое другое.
Дальнейшее обучение:
- Документация Ollama: https://ollama.com
- OpenAI API: https://platform.openai.com/docs
- CrewAI: https://docs.crewai.com
- LangGraph: https://langchain-ai.github.io/langgraph