Event Sourcing

Хранение событий вместо состояния

Аудит, история, временные путешествия

Что такое Event Sourcing?

Event Sourcing — паттерн, при котором состояние приложения хранится как последовательность событий, а не как текущее состояние.

🔹 Традиционный подход

# ❌ Храним только текущее состояние
class User:
    id: int
    name: str
    email: str
    balance: float

# В БД:
users table:
  id | name  | email           | balance
  1  | Alice | alice@example.com | 1000.0

# Проблемы:
# - Нет истории изменений
# - Невозможно откатить изменения
# - Нет аудита

✅ Event Sourcing подход

# ✅ Храним события
events table:
  id | aggregate_id | event_type | data | timestamp
  1  | user_1       | UserCreated | {...} | 2024-01-01 10:00:00
  2  | user_1       | BalanceDeposited | {"amount": 1000} | 2024-01-01 10:05:00
  3  | user_1       | BalanceWithdrawn | {"amount": 200} | 2024-01-01 10:10:00

# Текущее состояние вычисляется из событий:
# user.balance = 0 + 1000 - 200 = 800

💡 Состояние — это результат применения всех событий

🔹 Преимущества Event Sourcing

  • Полная история — все изменения сохранены
  • Аудит — кто, что, когда изменил
  • Временные путешествия — можно восстановить состояние на любой момент
  • Откат изменений — можно отменить событие
  • Дебаг — можно воспроизвести всю последовательность событий

Реализация Event Sourcing

🔹 Events (События)

# events/user_events.py
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Event:
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

@dataclass
class UserCreatedEvent(Event):
    aggregate_id: str
    name: str
    email: str
    timestamp: datetime
    version: int = 1

@dataclass
class BalanceDepositedEvent(Event):
    aggregate_id: str
    amount: float
    timestamp: datetime
    version: int

@dataclass
class BalanceWithdrawnEvent(Event):
    aggregate_id: str
    amount: float
    timestamp: datetime
    version: int

🔹 Aggregate (Агрегат)

# aggregates/user.py
class User:
    def __init__(self, user_id: str):
        self.id = user_id
        self.name = None
        self.email = None
        self.balance = 0.0
        self.version = 0
        self.uncommitted_events = []
    
    @staticmethod
    def create(user_id: str, name: str, email: str):
        user = User(user_id)
        event = UserCreatedEvent(
            aggregate_id=user_id,
            name=name,
            email=email,
            timestamp=datetime.now(),
            version=1
        )
        user.apply(event)
        user.uncommitted_events.append(event)
        return user
    
    def deposit(self, amount: float):
        event = BalanceDepositedEvent(
            aggregate_id=self.id,
            amount=amount,
            timestamp=datetime.now(),
            version=self.version + 1
        )
        self.apply(event)
        self.uncommitted_events.append(event)
    
    def withdraw(self, amount: float):
        if self.balance < amount:
            raise ValueError("Insufficient funds")
        
        event = BalanceWithdrawnEvent(
            aggregate_id=self.id,
            amount=amount,
            timestamp=datetime.now(),
            version=self.version + 1
        )
        self.apply(event)
        self.uncommitted_events.append(event)
    
    def apply(self, event: Event):
        if isinstance(event, UserCreatedEvent):
            self.name = event.name
            self.email = event.email
        elif isinstance(event, BalanceDepositedEvent):
            self.balance += event.amount
        elif isinstance(event, BalanceWithdrawnEvent):
            self.balance -= event.amount
        self.version = event.version

🔹 Event Store (Хранилище событий)

# event_store.py
from abc import ABC, abstractmethod

class EventStore(ABC):
    @abstractmethod
    def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
        pass
    
    @abstractmethod
    def get_events(self, aggregate_id: str) -> List[Event]:
        pass

# Реализация для PostgreSQL
class PostgresEventStore(EventStore):
    def __init__(self, db):
        self.db = db
    
    def save_events(self, aggregate_id: str, events: List[Event], expected_version: int):
        # Проверяем версию (оптимистичная блокировка)
        current_version = self.db.fetch_one(
            "SELECT MAX(version) FROM events WHERE aggregate_id = ?",
            (aggregate_id,)
        )['max'] or 0
        
        if current_version != expected_version:
            raise ValueError("Concurrent modification detected")
        
        # Сохраняем события
        for event in events:
            self.db.execute("""
                INSERT INTO events (aggregate_id, event_type, data, timestamp, version)
                VALUES (?, ?, ?, ?, ?)
            """, (
                event.aggregate_id,
                event.event_type,
                json.dumps(event.data),
                event.timestamp,
                event.version
            ))
    
    def get_events(self, aggregate_id: str) -> List[Event]:
        rows = self.db.fetch_all(
            "SELECT * FROM events WHERE aggregate_id = ? ORDER BY version",
            (aggregate_id,)
        )
        return [self._deserialize_event(row) for row in rows]
    
    def _deserialize_event(self, row) -> Event:
        event_class = EVENT_TYPES[row['event_type']]
        data = json.loads(row['data'])
        return event_class(**data, version=row['version'], timestamp=row['timestamp'])

🔹 Repository

# repositories/user_repository.py
class UserRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    def save(self, user: User):
        events = user.uncommitted_events
        expected_version = user.version - len(events)
        self.event_store.save_events(user.id, events, expected_version)
        user.uncommitted_events = []
    
    def find_by_id(self, user_id: str) -> User:
        events = self.event_store.get_events(user_id)
        if not events:
            return None
        
        user = User(user_id)
        for event in events:
            user.apply(event)
        return user

💡 Восстанавливаем состояние из событий

Практический пример: Банковский счёт

🔹 Создание счёта

# Создаём пользователя
user = User.create("user_1", "Alice", "alice@example.com")

# События:
# 1. UserCreatedEvent(user_id="user_1", name="Alice", email="alice@example.com")

# Сохраняем
repository.save(user)

# В Event Store:
# id | aggregate_id | event_type    | data                          | version
# 1  | user_1       | UserCreated   | {"name":"Alice","email":"..."} | 1

🔹 Операции со счётом

# Загружаем пользователя из событий
user = repository.find_by_id("user_1")
# Применяем события: UserCreatedEvent
# Результат: user.balance = 0.0

# Депозит
user.deposit(1000.0)
# Событие: BalanceDepositedEvent(amount=1000.0)

# Снятие
user.withdraw(200.0)
# Событие: BalanceWithdrawnEvent(amount=200.0)

# Сохраняем
repository.save(user)

# В Event Store:
# id | aggregate_id | event_type         | data           | version
# 1  | user_1       | UserCreated        | {...}          | 1
# 2  | user_1       | BalanceDeposited   | {"amount":1000}| 2
# 3  | user_1       | BalanceWithdrawn   | {"amount":200} | 3

# Текущее состояние: balance = 0 + 1000 - 200 = 800.0

🔹 Восстановление состояния

# Восстанавливаем состояние на любой момент времени
def get_user_at_time(user_id: str, at_time: datetime) -> User:
    events = event_store.get_events_until(user_id, at_time)
    user = User(user_id)
    for event in events:
        user.apply(event)
    return user

# Получаем состояние на 10:05 (до снятия)
user_at_10_05 = get_user_at_time("user_1", datetime(2024, 1, 1, 10, 5))
# balance = 1000.0 (только депозит)

# Получаем состояние на 10:15 (после всех операций)
user_at_10_15 = get_user_at_time("user_1", datetime(2024, 1, 1, 10, 15))
# balance = 800.0 (депозит - снятие)

🔹 Аудит

# Получаем историю изменений
def get_audit_trail(user_id: str) -> List[Event]:
    events = event_store.get_events(user_id)
    return [
        {
            "timestamp": event.timestamp,
            "event_type": event.event_type,
            "data": event.data,
            "user": event.metadata.get("user_id")  # Кто сделал изменение
        }
        for event in events
    ]

# Результат:
# [
#   {"timestamp": "2024-01-01 10:00:00", "event_type": "UserCreated", "data": {...}},
#   {"timestamp": "2024-01-01 10:05:00", "event_type": "BalanceDeposited", "data": {"amount": 1000}},
#   {"timestamp": "2024-01-01 10:10:00", "event_type": "BalanceWithdrawn", "data": {"amount": 200}}
# ]

Projections и Read Models

🔹 Projection (Проекция)

# projections/user_projection.py
class UserProjection:
    def __init__(self, read_db):
        self.read_db = read_db
    
    def handle_event(self, event: Event):
        if isinstance(event, UserCreatedEvent):
            self.read_db.execute("""
                INSERT INTO user_views (id, name, email, balance)
                VALUES (?, ?, ?, ?)
            """, (event.aggregate_id, event.name, event.email, 0.0))
        
        elif isinstance(event, BalanceDepositedEvent):
            self.read_db.execute("""
                UPDATE user_views
                SET balance = balance + ?
                WHERE id = ?
            """, (event.amount, event.aggregate_id))
        
        elif isinstance(event, BalanceWithdrawnEvent):
            self.read_db.execute("""
                UPDATE user_views
                SET balance = balance - ?
                WHERE id = ?
            """, (event.amount, event.aggregate_id))

# Читаем события из Event Store и обновляем Read Model
projection = UserProjection(read_db)
events = event_store.get_all_events()
for event in events:
    projection.handle_event(event)

🔹 Event Sourcing + CQRS

Write Side (Event Sourcing)
    ↓
Event Store
    ↓
Projection
    ↓
Read Model (Optimized for queries)

💡 Event Sourcing для записи, Read Model для чтения

Snapshots (Снимки)

🔹 Проблема: много событий

# Если у пользователя 10,000 событий,
# загрузка всех событий медленная

user = repository.find_by_id("user_1")
# Загружаем 10,000 событий
# Применяем все события
# Медленно!

✅ Решение: Snapshots

# Сохраняем снимок состояния каждые N событий
class Snapshot:
    def __init__(self, aggregate_id: str, state: dict, version: int):
        self.aggregate_id = aggregate_id
        self.state = state
        self.version = version

# Создаём снимок каждые 100 событий
def create_snapshot(user: User):
    snapshot = Snapshot(
        aggregate_id=user.id,
        state={
            "name": user.name,
            "email": user.email,
            "balance": user.balance
        },
        version=user.version
    )
    snapshot_store.save(snapshot)

# Загрузка с снимка
def find_by_id(user_id: str) -> User:
    snapshot = snapshot_store.find_latest(user_id)
    if snapshot:
        # Загружаем снимок
        user = User.from_snapshot(snapshot)
        # Загружаем только события после снимка
        events = event_store.get_events_after(user_id, snapshot.version)
    else:
        # Загружаем все события
        user = User(user_id)
        events = event_store.get_events(user_id)
    
    # Применяем события
    for event in events:
        user.apply(event)
    
    return user

Когда использовать Event Sourcing?

✅ Хорошо подходит для:

  • Аудит — нужно знать, кто, что, когда изменил
  • История — нужно восстановить состояние на любой момент
  • Комpliance — финансовые системы, медицинские записи
  • Дебаг — нужно воспроизвести последовательность событий
  • Аналитика — анализ изменений во времени

❌ Не подходит для:

  • Простые CRUD — избыточно
  • Высокая производительность — overhead на восстановление состояния
  • Простые модели — нет смысла хранить события
  • Нет требований к аудиту — overhead не оправдан

🎯 Ключевые принципы Event Sourcing

  1. Храним события — не состояние, а последовательность событий
  2. Восстанавливаем состояние — применяем все события
  3. Неизменяемость — события не изменяются, только добавляются
  4. Версионирование — каждое событие имеет версию
  5. Projections — строим Read Models из событий
"Event Sourcing — это не про сложность. Это про историю.
Используйте его, когда история важнее текущего состояния."

📚 Дополнительные ресурсы

  • Статьи: martinfowler.com/eaaDev/EventSourcing.html
  • Книга: "Implementing Domain-Driven Design" by Vaughn Vernon
  • Паттерн: CQRS (часто используется с Event Sourcing)