Хранение событий вместо состояния
Аудит, история, временные путешествия
Event Sourcing — паттерн, при котором состояние приложения хранится как последовательность событий, а не как текущее состояние.
# ❌ Храним только текущее состояние
class User:
id: int
name: str
email: str
balance: float
# В БД:
users table:
id | name | email | balance
1 | Alice | alice@example.com | 1000.0
# Проблемы:
# - Нет истории изменений
# - Невозможно откатить изменения
# - Нет аудита
# ✅ Храним события
events table:
id | aggregate_id | event_type | data | timestamp
1 | user_1 | UserCreated | {...} | 2024-01-01 10:00:00
2 | user_1 | BalanceDeposited | {"amount": 1000} | 2024-01-01 10:05:00
3 | user_1 | BalanceWithdrawn | {"amount": 200} | 2024-01-01 10:10:00
# Текущее состояние вычисляется из событий:
# user.balance = 0 + 1000 - 200 = 800
💡 Состояние — это результат применения всех событий
# events/user_events.py
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Event:
aggregate_id: str
event_type: str
data: dict
timestamp: datetime
version: int
@dataclass
class UserCreatedEvent(Event):
aggregate_id: str
name: str
email: str
timestamp: datetime
version: int = 1
@dataclass
class BalanceDepositedEvent(Event):
aggregate_id: str
amount: float
timestamp: datetime
version: int
@dataclass
class BalanceWithdrawnEvent(Event):
aggregate_id: str
amount: float
timestamp: datetime
version: int
# aggregates/user.py
class User:
def __init__(self, user_id: str):
self.id = user_id
self.name = None
self.email = None
self.balance = 0.0
self.version = 0
self.uncommitted_events = []
@staticmethod
def create(user_id: str, name: str, email: str):
user = User(user_id)
event = UserCreatedEvent(
aggregate_id=user_id,
name=name,
email=email,
timestamp=datetime.now(),
version=1
)
user.apply(event)
user.uncommitted_events.append(event)
return user
def deposit(self, amount: float):
event = BalanceDepositedEvent(
aggregate_id=self.id,
amount=amount,
timestamp=datetime.now(),
version=self.version + 1
)
self.apply(event)
self.uncommitted_events.append(event)
def withdraw(self, amount: float):
if self.balance < amount:
raise ValueError("Insufficient funds")
event = BalanceWithdrawnEvent(
aggregate_id=self.id,
amount=amount,
timestamp=datetime.now(),
version=self.version + 1
)
self.apply(event)
self.uncommitted_events.append(event)
def apply(self, event: Event):
if isinstance(event, UserCreatedEvent):
self.name = event.name
self.email = event.email
elif isinstance(event, BalanceDepositedEvent):
self.balance += event.amount
elif isinstance(event, BalanceWithdrawnEvent):
self.balance -= event.amount
self.version = event.version
# event_store.py
from abc import ABC, abstractmethod
class EventStore(ABC):
@abstractmethod
def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
pass
@abstractmethod
def get_events(self, aggregate_id: str) -> List[Event]:
pass
# Реализация для PostgreSQL
class PostgresEventStore(EventStore):
def __init__(self, db):
self.db = db
def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
# Проверяем версию (оптимистичная блокировка)
current_version = self.db.fetch_one(
"SELECT MAX(version) FROM events WHERE aggregate_id = ?",
(aggregate_id,)
)['max'] or 0
if current_version != expected_version:
raise ValueError("Concurrent modification detected")
# Сохраняем события
for event in events:
self.db.execute("""
INSERT INTO events (aggregate_id, event_type, data, timestamp, version)
VALUES (?, ?, ?, ?, ?)
""", (
event.aggregate_id,
event.event_type,
json.dumps(event.data),
event.timestamp,
event.version
))
def get_events(self, aggregate_id: str) -> List[Event]:
rows = self.db.fetch_all(
"SELECT * FROM events WHERE aggregate_id = ? ORDER BY version",
(aggregate_id,)
)
return [self._deserialize_event(row) for row in rows]
def _deserialize_event(self, row) -> Event:
event_class = EVENT_TYPES[row['event_type']]
data = json.loads(row['data'])
return event_class(**data, version=row['version'], timestamp=row['timestamp'])
# repositories/user_repository.py
class UserRepository:
def __init__(self, event_store: EventStore):
self.event_store = event_store
def save(self, user: User):
events = user.uncommitted_events
expected_version = user.version - len(events)
self.event_store.save_events(user.id, events, expected_version)
user.uncommitted_events = []
def find_by_id(self, user_id: str) -> User:
events = self.event_store.get_events(user_id)
if not events:
return None
user = User(user_id)
for event in events:
user.apply(event)
return user
💡 Восстанавливаем состояние из событий
# Создаём пользователя
user = User.create("user_1", "Alice", "alice@example.com")
# События:
# 1. UserCreatedEvent(user_id="user_1", name="Alice", email="alice@example.com")
# Сохраняем
repository.save(user)
# В Event Store:
# id | aggregate_id | event_type | data | version
# 1 | user_1 | UserCreated | {"name":"Alice","email":"..."} | 1
# Загружаем пользователя из событий
user = repository.find_by_id("user_1")
# Применяем события: UserCreatedEvent
# Результат: user.balance = 0.0
# Депозит
user.deposit(1000.0)
# Событие: BalanceDepositedEvent(amount=1000.0)
# Снятие
user.withdraw(200.0)
# Событие: BalanceWithdrawnEvent(amount=200.0)
# Сохраняем
repository.save(user)
# В Event Store:
# id | aggregate_id | event_type | data | version
# 1 | user_1 | UserCreated | {...} | 1
# 2 | user_1 | BalanceDeposited | {"amount":1000}| 2
# 3 | user_1 | BalanceWithdrawn | {"amount":200} | 3
# Текущее состояние: balance = 0 + 1000 - 200 = 800.0
# Восстанавливаем состояние на любой момент времени
def get_user_at_time(user_id: str, at_time: datetime) -> User:
events = event_store.get_events_until(user_id, at_time)
user = User(user_id)
for event in events:
user.apply(event)
return user
# Получаем состояние на 10:05 (до снятия)
user_at_10_05 = get_user_at_time("user_1", datetime(2024, 1, 1, 10, 5))
# balance = 1000.0 (только депозит)
# Получаем состояние на 10:15 (после всех операций)
user_at_10_15 = get_user_at_time("user_1", datetime(2024, 1, 1, 10, 15))
# balance = 800.0 (депозит - снятие)
# Получаем историю изменений
def get_audit_trail(user_id: str) -> List[Event]:
events = event_store.get_events(user_id)
return [
{
"timestamp": event.timestamp,
"event_type": event.event_type,
"data": event.data,
"user": event.metadata.get("user_id") # Кто сделал изменение
}
for event in events
]
# Результат:
# [
# {"timestamp": "2024-01-01 10:00:00", "event_type": "UserCreated", "data": {...}},
# {"timestamp": "2024-01-01 10:05:00", "event_type": "BalanceDeposited", "data": {"amount": 1000}},
# {"timestamp": "2024-01-01 10:10:00", "event_type": "BalanceWithdrawn", "data": {"amount": 200}}
# ]
# projections/user_projection.py
class UserProjection:
def __init__(self, read_db):
self.read_db = read_db
def handle_event(self, event: Event):
if isinstance(event, UserCreatedEvent):
self.read_db.execute("""
INSERT INTO user_views (id, name, email, balance)
VALUES (?, ?, ?, ?)
""", (event.aggregate_id, event.name, event.email, 0.0))
elif isinstance(event, BalanceDepositedEvent):
self.read_db.execute("""
UPDATE user_views
SET balance = balance + ?
WHERE id = ?
""", (event.amount, event.aggregate_id))
elif isinstance(event, BalanceWithdrawnEvent):
self.read_db.execute("""
UPDATE user_views
SET balance = balance - ?
WHERE id = ?
""", (event.amount, event.aggregate_id))
# Читаем события из Event Store и обновляем Read Model
projection = UserProjection(read_db)
events = event_store.get_all_events()
for event in events:
projection.handle_event(event)
Write Side (Event Sourcing)
↓
Event Store
↓
Projection
↓
Read Model (Optimized for queries)
💡 Event Sourcing для записи, Read Model для чтения
# Если у пользователя 10,000 событий,
# загрузка всех событий медленная
user = repository.find_by_id("user_1")
# Загружаем 10,000 событий
# Применяем все события
# Медленно!
# Сохраняем снимок состояния каждые N событий
class Snapshot:
def __init__(self, aggregate_id: str, state: dict, version: int):
self.aggregate_id = aggregate_id
self.state = state
self.version = version
# Создаём снимок каждые 100 событий
def create_snapshot(user: User):
snapshot = Snapshot(
aggregate_id=user.id,
state={
"name": user.name,
"email": user.email,
"balance": user.balance
},
version=user.version
)
snapshot_store.save(snapshot)
# Загрузка с снимка
def find_by_id(user_id: str) -> User:
snapshot = snapshot_store.find_latest(user_id)
if snapshot:
# Загружаем снимок
user = User.from_snapshot(snapshot)
# Загружаем только события после снимка
events = event_store.get_events_after(user_id, snapshot.version)
else:
# Загружаем все события
user = User(user_id)
events = event_store.get_events(user_id)
# Применяем события
for event in events:
user.apply(event)
return user
"Event Sourcing — это не про сложность. Это про историю.
Используйте его, когда история важнее текущего состояния."