CQRS

Command Query Responsibility Segregation

Разделение команд и запросов

Что такое CQRS?

CQRS — паттерн, разделяющий операции чтения (Queries) и записи (Commands) данных на отдельные модели и хранилища.

Важно: CQRS ≠ CQS

  • CQS (Command-Query Separation) — правило уровня метода: метод либо меняет состояние, либо возвращает значение.
  • CQRS — архитектурный паттерн: отдельные модели и часто отдельные хранилища для чтения и записи.
  • Вы можете применять CQS без CQRS; CQRS почти всегда предполагает CQS.

🔹 Традиционный подход

┌─────────────────────────────────┐
│         Application             │
│  ┌──────────┐  ┌──────────┐    │
│  │  Read    │  │  Write   │    │
│  └────┬─────┘  └────┬─────┘    │
│       │             │           │
│       └──────┬──────┘           │
│              ↓                  │
│      ┌───────────────┐          │
│      │  Single Model │          │
│      └───────┬───────┘          │
│              ↓                  │
│      ┌───────────────┐          │
│      │  Single DB    │          │
│      └───────────────┘          │
└─────────────────────────────────┘

Одна модель для чтения и записи

🔹 CQRS подход

┌─────────────────────────────────┐
│         Application             │
│  ┌──────────┐  ┌──────────┐    │
│  │  Query   │  │ Command  │    │
│  │  Model   │  │  Model   │    │
│  └────┬─────┘  └────┬─────┘    │
│       │             │           │
│       ↓             ↓           │
│  ┌─────────┐  ┌─────────┐      │
│  │ Read DB │  │ Write DB│      │
│  │(Optimized│  │(Normalized│   │
│  │ for read)│  │ for write)│   │
│  └─────────┘  └─────────┘      │
└─────────────────────────────────┘

Отдельные модели для чтения и записи

🔹 Принцип CQRS

  • Command — изменяет состояние системы (Create, Update, Delete)
  • Query — возвращает данные, не изменяя состояние (Read)
  • Разделение — команды и запросы используют разные модели
  • Оптимизация — модели оптимизированы под свои задачи

Почему CQRS?

🔹 Проблема: конфликт требований

# ❌ Одна модель должна решать разные задачи

# Запись: нужна нормализация
class User:
    id: int
    name: str
    email: str
    created_at: datetime
    # Много полей, нормализованная структура

# Чтение: нужна денормализация для производительности
# Запрос: получить всех пользователей с их заказами
SELECT u.*, o.* 
FROM users u 
LEFT JOIN orders o ON u.id = o.user_id
# Медленно! Нужны джойны

Одна модель не может быть оптимальной для обоих случаев

✅ Решение: разделение моделей

# Write Model (Command) — нормализованная
class User:
    id: int
    name: str
    email: str
    created_at: datetime

# Read Model (Query) — денормализованная, оптимизирована для чтения
class UserView:
    id: int
    name: str
    email: str
    order_count: int  # Предвычисленное значение
    total_spent: float  # Предвычисленное значение
    last_order_date: datetime  # Кэшированное значение

# Чтение быстрее — не нужны джойны!

💡 Преимущества CQRS

  • Производительность — оптимизация под чтение и запись отдельно
  • Масштабируемость — можно масштабировать чтение и запись независимо
  • Гибкость — разные технологии для чтения и записи
  • Безопасность — разделение прав доступа
  • Простота — упрощение моделей

Реализация CQRS

🔹 Commands (Команды)

# commands/create_user_command.py
from dataclasses import dataclass

@dataclass
class CreateUserCommand:
    name: str
    email: str
    
    def validate(self):
        if not self.name:
            raise ValueError("Name is required")
        if "@" not in self.email:
            raise ValueError("Invalid email")

# commands/update_user_command.py
@dataclass
class UpdateUserCommand:
    user_id: int
    name: str = None
    email: str = None

💡 Command — это намерение изменить состояние

🔹 Command Handlers

# handlers/create_user_handler.py
class CreateUserHandler:
    def __init__(self, user_repository, event_bus):
        self.user_repository = user_repository
        self.event_bus = event_bus
    
    def handle(self, command: CreateUserCommand):
        # 1. Валидация
        command.validate()
        
        # 2. Создание сущности
        user = User(
            id=generate_id(),
            name=command.name,
            email=command.email
        )
        
        # 3. Сохранение в Write DB
        self.user_repository.save(user)
        
        # 4. Публикация события (для обновления Read Model)
        self.event_bus.publish("user_created", {
            "user_id": user.id,
            "name": user.name,
            "email": user.email
        })
        
        return user.id

🔹 Queries (Запросы)

# queries/get_user_query.py
from dataclasses import dataclass

@dataclass
class GetUserQuery:
    user_id: int

@dataclass
class GetUsersQuery:
    page: int = 1
    page_size: int = 10
    search: str = None

💡 Query — это намерение прочитать данные

🔹 Query Handlers

# handlers/get_user_handler.py
class GetUserHandler:
    def __init__(self, user_view_repository):
        self.user_view_repository = user_view_repository
    
    def handle(self, query: GetUserQuery) -> UserView:
        # Чтение из оптимизированной Read DB
        return self.user_view_repository.find_by_id(query.user_id)

# handlers/get_users_handler.py
class GetUsersHandler:
    def __init__(self, user_view_repository):
        self.user_view_repository = user_view_repository
    
    def handle(self, query: GetUsersQuery) -> List[UserView]:
        # Чтение из Read DB (денормализованная, быстрая)
        return self.user_view_repository.find_all(
            page=query.page,
            page_size=query.page_size,
            search=query.search
        )

🔹 Read Model Projection

# projections/user_projection.py
class UserProjection:
    def __init__(self, user_view_repository):
        self.user_view_repository = user_view_repository
    
    def on_user_created(self, event):
        # Создаём Read Model из события
        user_view = UserView(
            id=event['user_id'],
            name=event['name'],
            email=event['email'],
            order_count=0,  # Начальное значение
            total_spent=0.0
        )
        self.user_view_repository.save(user_view)
    
    def on_order_created(self, event):
        # Обновляем Read Model
        user_view = self.user_view_repository.find_by_id(event['user_id'])
        user_view.order_count += 1
        user_view.total_spent += event['amount']
        user_view.last_order_date = datetime.now()
        self.user_view_repository.save(user_view)

💡 Projection обновляет Read Model на основе событий

Практический пример: E-Commerce

🔹 Write Model (Command Side)

# write_models/user.py
class User:
    def __init__(self, user_id: int, name: str, email: str):
        self.id = user_id
        self.name = name
        self.email = email
        self.created_at = datetime.now()

# write_models/order.py
class Order:
    def __init__(self, order_id: int, user_id: int, items: List[OrderItem]):
        self.id = order_id
        self.user_id = user_id
        self.items = items
        self.status = "pending"
        self.created_at = datetime.now()

# Write DB: PostgreSQL (нормализованная)
# users table: id, name, email, created_at
# orders table: id, user_id, status, created_at
# order_items table: id, order_id, product_id, quantity

🔹 Read Model (Query Side)

# read_models/user_view.py
class UserView:
    def __init__(self):
        self.id: int
        self.name: str
        self.email: str
        self.order_count: int  # Денормализовано
        self.total_spent: float  # Денормализовано
        self.last_order_date: datetime  # Денормализовано
        self.favorite_categories: List[str]  # Предвычислено

# Read DB: MongoDB или Elasticsearch (денормализованная)
# user_views collection:
# {
#   "id": 1,
#   "name": "Alice",
#   "email": "alice@example.com",
#   "order_count": 5,
#   "total_spent": 1250.50,
#   "last_order_date": "2024-01-15T10:30:00Z",
#   "favorite_categories": ["electronics", "books"]
# }

🔹 Command: Create Order

# commands/create_order_command.py
@dataclass
class CreateOrderCommand:
    user_id: int
    items: List[OrderItemDTO]

# handlers/create_order_handler.py
class CreateOrderHandler:
    def __init__(self, order_repository, event_bus):
        self.order_repository = order_repository
        self.event_bus = event_bus
    
    def handle(self, command: CreateOrderCommand):
        # 1. Создаём заказ в Write DB
        order = Order(
            id=generate_id(),
            user_id=command.user_id,
            items=command.items
        )
        self.order_repository.save(order)
        
        # 2. Публикуем событие
        self.event_bus.publish("order_created", {
            "order_id": order.id,
            "user_id": order.user_id,
            "total": order.calculate_total(),
            "items": [item.to_dict() for item in order.items]
        })
        
        return order.id

🔹 Query: Get User with Stats

# queries/get_user_with_stats_query.py
@dataclass
class GetUserWithStatsQuery:
    user_id: int

# handlers/get_user_with_stats_handler.py
class GetUserWithStatsHandler:
    def __init__(self, user_view_repository):
        self.user_view_repository = user_view_repository
    
    def handle(self, query: GetUserWithStatsQuery) -> UserView:
        # Чтение из оптимизированной Read DB
        # Не нужны джойны! Всё уже денормализовано
        return self.user_view_repository.find_by_id(query.user_id)

# Результат:
# {
#   "id": 1,
#   "name": "Alice",
#   "order_count": 5,
#   "total_spent": 1250.50,
#   "last_order_date": "2024-01-15T10:30:00Z"
# }
# Быстро! Один запрос к Read DB

🔹 Projection: Update Read Model

# projections/user_projection.py
class UserProjection:
    def __init__(self, user_view_repository):
        self.user_view_repository = user_view_repository
    
    def on_order_created(self, event):
        user_id = event['user_id']
        
        # Получаем или создаём UserView
        user_view = self.user_view_repository.find_by_id(user_id)
        if not user_view:
            # Если пользователь новый, создаём из User Service
            user_view = self._create_user_view(user_id)
        
        # Обновляем статистику
        user_view.order_count += 1
        user_view.total_spent += event['total']
        user_view.last_order_date = datetime.now()
        
        # Обновляем категории
        categories = [item['category'] for item in event['items']]
        user_view.favorite_categories = self._update_categories(
            user_view.favorite_categories,
            categories
        )
        
        # Сохраняем в Read DB
        self.user_view_repository.save(user_view)

Синхронизация Read и Write Models

🔹 Event-Driven Synchronization

Write Model (Command)
    ↓
Event: user_created
    ↓
Event Bus (Kafka/RabbitMQ)
    ↓
Projection Handler
    ↓
Read Model (Query)

💡 Read Model обновляется асинхронно через события

🔹 Event Sourcing + CQRS

# Write Side: Event Sourcing
# Сохраняем события, а не состояние
events = [
    UserCreatedEvent(user_id=1, name="Alice", email="alice@example.com"),
    OrderCreatedEvent(order_id=1, user_id=1, total=100.0),
    OrderCreatedEvent(order_id=2, user_id=1, total=200.0)
]

# Read Side: Projection
# Строим Read Model из событий
user_view = UserView(id=1, name="Alice", email="alice@example.com")
for event in events:
    if isinstance(event, OrderCreatedEvent) and event.user_id == 1:
        user_view.order_count += 1
        user_view.total_spent += event.total

# Результат: user_view.order_count = 2, total_spent = 300.0

🔹 Polling Pattern

# Альтернатива: периодическая синхронизация
class ReadModelSync:
    def sync(self):
        # Находим изменения в Write DB
        recent_orders = self.write_db.query(
            "SELECT * FROM orders WHERE updated_at > ?",
            (self.last_sync_time,)
        )
        
        # Обновляем Read Model
        for order in recent_orders:
            self.update_read_model(order)
        
        self.last_sync_time = datetime.now()

# Запускаем каждые 5 минут
scheduler.add_job(sync, 'interval', minutes=5)

💡 Проще, но менее реальное время

Когда использовать CQRS?

✅ Хорошо подходит для:

  • Высокая нагрузка на чтение — много запросов, мало записей
  • Сложные запросы — нужны денормализованные данные
  • Разные команды и запросы — разные модели упрощают код
  • Разные технологии — PostgreSQL для записи, MongoDB для чтения
  • Масштабируемость — нужно масштабировать чтение отдельно

❌ Не подходит для:

  • Простые CRUD — избыточно
  • Низкая нагрузка — overhead не оправдан
  • Простая модель — нет смысла разделять
  • Строгая консистентность — eventual consistency может быть проблемой

💡 Примеры использования

Система Причина
E-Commerce Много чтений (каталог), мало записей (заказы)
Аналитика Сложные запросы, агрегации
Социальные сети Лента новостей (чтение), посты (запись)
ML Platform Обучение моделей (запись), инференс (чтение)

🎯 Ключевые принципы CQRS

  1. Разделение — команды и запросы используют разные модели
  2. Оптимизация — модели оптимизированы под свои задачи
  3. Event-Driven — синхронизация через события
  4. Eventual Consistency — Read Model обновляется асинхронно
  5. Масштабируемость — чтение и запись масштабируются независимо
"CQRS — это не про сложность. Это про оптимизацию.
Используйте его, когда одна модель не может решить все задачи оптимально."

📚 Дополнительные ресурсы

  • Статьи: martinfowler.com/bliki/CQRS.html
  • Книга: "Implementing Domain-Driven Design" by Vaughn Vernon
  • Паттерн: Event Sourcing (часто используется с CQRS)