Command Query Responsibility Segregation
Разделение команд и запросов
CQRS — паттерн, разделяющий операции чтения (Queries) и записи (Commands) данных на отдельные модели и хранилища.
┌─────────────────────────────────┐ │ Application │ │ ┌──────────┐ ┌──────────┐ │ │ │ Read │ │ Write │ │ │ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ └──────┬──────┘ │ │ ↓ │ │ ┌───────────────┐ │ │ │ Single Model │ │ │ └───────┬───────┘ │ │ ↓ │ │ ┌───────────────┐ │ │ │ Single DB │ │ │ └───────────────┘ │ └─────────────────────────────────┘
Одна модель для чтения и записи
┌─────────────────────────────────┐ │ Application │ │ ┌──────────┐ ┌──────────┐ │ │ │ Query │ │ Command │ │ │ │ Model │ │ Model │ │ │ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ ↓ ↓ │ │ ┌─────────┐ ┌─────────┐ │ │ │ Read DB │ │ Write DB│ │ │ │(Optimized│ │(Normalized│ │ │ │ for read)│ │ for write)│ │ │ └─────────┘ └─────────┘ │ └─────────────────────────────────┘
Отдельные модели для чтения и записи
# ❌ Одна модель должна решать разные задачи
# Запись: нужна нормализация
class User:
id: int
name: str
email: str
created_at: datetime
# Много полей, нормализованная структура
# Чтение: нужна денормализация для производительности
# Запрос: получить всех пользователей с их заказами
SELECT u.*, o.*
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
# Медленно! Нужны джойны
Одна модель не может быть оптимальной для обоих случаев
# Write Model (Command) — нормализованная
class User:
id: int
name: str
email: str
created_at: datetime
# Read Model (Query) — денормализованная, оптимизирована для чтения
class UserView:
id: int
name: str
email: str
order_count: int # Предвычисленное значение
total_spent: float # Предвычисленное значение
last_order_date: datetime # Кэшированное значение
# Чтение быстрее — не нужны джойны!
# commands/create_user_command.py
from dataclasses import dataclass
@dataclass
class CreateUserCommand:
name: str
email: str
def validate(self):
if not self.name:
raise ValueError("Name is required")
if "@" not in self.email:
raise ValueError("Invalid email")
# commands/update_user_command.py
@dataclass
class UpdateUserCommand:
user_id: int
name: str = None
email: str = None
💡 Command — это намерение изменить состояние
# handlers/create_user_handler.py
class CreateUserHandler:
def __init__(self, user_repository, event_bus):
self.user_repository = user_repository
self.event_bus = event_bus
def handle(self, command: CreateUserCommand):
# 1. Валидация
command.validate()
# 2. Создание сущности
user = User(
id=generate_id(),
name=command.name,
email=command.email
)
# 3. Сохранение в Write DB
self.user_repository.save(user)
# 4. Публикация события (для обновления Read Model)
self.event_bus.publish("user_created", {
"user_id": user.id,
"name": user.name,
"email": user.email
})
return user.id
# queries/get_user_query.py
from dataclasses import dataclass
@dataclass
class GetUserQuery:
user_id: int
@dataclass
class GetUsersQuery:
page: int = 1
page_size: int = 10
search: str = None
💡 Query — это намерение прочитать данные
# handlers/get_user_handler.py
class GetUserHandler:
def __init__(self, user_view_repository):
self.user_view_repository = user_view_repository
def handle(self, query: GetUserQuery) -> UserView:
# Чтение из оптимизированной Read DB
return self.user_view_repository.find_by_id(query.user_id)
# handlers/get_users_handler.py
class GetUsersHandler:
def __init__(self, user_view_repository):
self.user_view_repository = user_view_repository
def handle(self, query: GetUsersQuery) -> List[UserView]:
# Чтение из Read DB (денормализованная, быстрая)
return self.user_view_repository.find_all(
page=query.page,
page_size=query.page_size,
search=query.search
)
# projections/user_projection.py
class UserProjection:
def __init__(self, user_view_repository):
self.user_view_repository = user_view_repository
def on_user_created(self, event):
# Создаём Read Model из события
user_view = UserView(
id=event['user_id'],
name=event['name'],
email=event['email'],
order_count=0, # Начальное значение
total_spent=0.0
)
self.user_view_repository.save(user_view)
def on_order_created(self, event):
# Обновляем Read Model
user_view = self.user_view_repository.find_by_id(event['user_id'])
user_view.order_count += 1
user_view.total_spent += event['amount']
user_view.last_order_date = datetime.now()
self.user_view_repository.save(user_view)
💡 Projection обновляет Read Model на основе событий
# write_models/user.py
class User:
def __init__(self, user_id: int, name: str, email: str):
self.id = user_id
self.name = name
self.email = email
self.created_at = datetime.now()
# write_models/order.py
class Order:
def __init__(self, order_id: int, user_id: int, items: List[OrderItem]):
self.id = order_id
self.user_id = user_id
self.items = items
self.status = "pending"
self.created_at = datetime.now()
# Write DB: PostgreSQL (нормализованная)
# users table: id, name, email, created_at
# orders table: id, user_id, status, created_at
# order_items table: id, order_id, product_id, quantity
# read_models/user_view.py
class UserView:
def __init__(self):
self.id: int
self.name: str
self.email: str
self.order_count: int # Денормализовано
self.total_spent: float # Денормализовано
self.last_order_date: datetime # Денормализовано
self.favorite_categories: List[str] # Предвычислено
# Read DB: MongoDB или Elasticsearch (денормализованная)
# user_views collection:
# {
# "id": 1,
# "name": "Alice",
# "email": "alice@example.com",
# "order_count": 5,
# "total_spent": 1250.50,
# "last_order_date": "2024-01-15T10:30:00Z",
# "favorite_categories": ["electronics", "books"]
# }
# commands/create_order_command.py
@dataclass
class CreateOrderCommand:
user_id: int
items: List[OrderItemDTO]
# handlers/create_order_handler.py
class CreateOrderHandler:
def __init__(self, order_repository, event_bus):
self.order_repository = order_repository
self.event_bus = event_bus
def handle(self, command: CreateOrderCommand):
# 1. Создаём заказ в Write DB
order = Order(
id=generate_id(),
user_id=command.user_id,
items=command.items
)
self.order_repository.save(order)
# 2. Публикуем событие
self.event_bus.publish("order_created", {
"order_id": order.id,
"user_id": order.user_id,
"total": order.calculate_total(),
"items": [item.to_dict() for item in order.items]
})
return order.id
# queries/get_user_with_stats_query.py
@dataclass
class GetUserWithStatsQuery:
user_id: int
# handlers/get_user_with_stats_handler.py
class GetUserWithStatsHandler:
def __init__(self, user_view_repository):
self.user_view_repository = user_view_repository
def handle(self, query: GetUserWithStatsQuery) -> UserView:
# Чтение из оптимизированной Read DB
# Не нужны джойны! Всё уже денормализовано
return self.user_view_repository.find_by_id(query.user_id)
# Результат:
# {
# "id": 1,
# "name": "Alice",
# "order_count": 5,
# "total_spent": 1250.50,
# "last_order_date": "2024-01-15T10:30:00Z"
# }
# Быстро! Один запрос к Read DB
# projections/user_projection.py
class UserProjection:
def __init__(self, user_view_repository):
self.user_view_repository = user_view_repository
def on_order_created(self, event):
user_id = event['user_id']
# Получаем или создаём UserView
user_view = self.user_view_repository.find_by_id(user_id)
if not user_view:
# Если пользователь новый, создаём из User Service
user_view = self._create_user_view(user_id)
# Обновляем статистику
user_view.order_count += 1
user_view.total_spent += event['total']
user_view.last_order_date = datetime.now()
# Обновляем категории
categories = [item['category'] for item in event['items']]
user_view.favorite_categories = self._update_categories(
user_view.favorite_categories,
categories
)
# Сохраняем в Read DB
self.user_view_repository.save(user_view)
Write Model (Command)
↓
Event: user_created
↓
Event Bus (Kafka/RabbitMQ)
↓
Projection Handler
↓
Read Model (Query)
💡 Read Model обновляется асинхронно через события
# Write Side: Event Sourcing
# Сохраняем события, а не состояние
events = [
UserCreatedEvent(user_id=1, name="Alice", email="alice@example.com"),
OrderCreatedEvent(order_id=1, user_id=1, total=100.0),
OrderCreatedEvent(order_id=2, user_id=1, total=200.0)
]
# Read Side: Projection
# Строим Read Model из событий
user_view = UserView(id=1, name="Alice", email="alice@example.com")
for event in events:
if isinstance(event, OrderCreatedEvent) and event.user_id == 1:
user_view.order_count += 1
user_view.total_spent += event.total
# Результат: user_view.order_count = 2, total_spent = 300.0
# Альтернатива: периодическая синхронизация
class ReadModelSync:
def sync(self):
# Находим изменения в Write DB
recent_orders = self.write_db.query(
"SELECT * FROM orders WHERE updated_at > ?",
(self.last_sync_time,)
)
# Обновляем Read Model
for order in recent_orders:
self.update_read_model(order)
self.last_sync_time = datetime.now()
# Запускаем каждые 5 минут
scheduler.add_job(sync, 'interval', minutes=5)
💡 Проще, но менее реальное время
| Система | Причина |
|---|---|
| E-Commerce | Много чтений (каталог), мало записей (заказы) |
| Аналитика | Сложные запросы, агрегации |
| Социальные сети | Лента новостей (чтение), посты (запись) |
| ML Platform | Обучение моделей (запись), инференс (чтение) |
"CQRS — это не про сложность. Это про оптимизацию.
Используйте его, когда одна модель не может решить все задачи оптимально."