Абстракция для работы с хранилищем данных
Repository — паттерн, который инкапсулирует логику доступа к данным и предоставляет более объектно-ориентированный взгляд на персистентность.
# ❌ Бизнес-логика зависит от деталей БД
class UserService:
def get_user(self, user_id: int):
# Прямой доступ к БД
conn = psycopg2.connect("dbname=test")
cur = conn.cursor()
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
row = cur.fetchone()
return User(row[0], row[1], row[2])
# Что если нужно сменить БД на MongoDB?
# Что если нужно добавить кэширование?
# Что если нужно протестировать?
# ✅ Бизнес-логика зависит от абстракции
class UserService:
def __init__(self, user_repository: UserRepository):
self.user_repository = user_repository
def get_user(self, user_id: int):
# Используем абстракцию
return self.user_repository.find_by_id(user_id)
# Repository скрывает детали БД
class UserRepository(ABC):
@abstractmethod
def find_by_id(self, user_id: int) -> User:
pass
from abc import ABC, abstractmethod
from typing import List, Optional
class Repository(ABC):
@abstractmethod
def save(self, entity) -> None:
"""Сохранить сущность"""
pass
@abstractmethod
def find_by_id(self, id) -> Optional:
"""Найти по ID"""
pass
@abstractmethod
def find_all(self) -> List:
"""Найти все"""
pass
@abstractmethod
def delete(self, entity) -> None:
"""Удалить сущность"""
pass
class UserRepository(Repository):
@abstractmethod
def find_by_email(self, email: str) -> Optional[User]:
"""Найти по email"""
pass
class PostgresUserRepository(UserRepository):
def __init__(self, db_session):
self.db = db_session
def save(self, user: User) -> None:
if user.id is None:
# Создание
self.db.execute(
"INSERT INTO users (name, email) VALUES (%s, %s) RETURNING id",
(user.name, user.email)
)
user.id = self.db.fetchone()[0]
else:
# Обновление
self.db.execute(
"UPDATE users SET name = %s, email = %s WHERE id = %s",
(user.name, user.email, user.id)
)
self.db.commit()
def find_by_id(self, user_id: int) -> Optional[User]:
row = self.db.execute(
"SELECT id, name, email FROM users WHERE id = %s",
(user_id,)
).fetchone()
if row:
return User(id=row[0], name=row[1], email=row[2])
return None
def find_by_email(self, email: str) -> Optional[User]:
row = self.db.execute(
"SELECT id, name, email FROM users WHERE email = %s",
(email,)
).fetchone()
if row:
return User(id=row[0], name=row[1], email=row[2])
return None
def find_all(self) -> List[User]:
rows = self.db.execute("SELECT id, name, email FROM users").fetchall()
return [User(id=row[0], name=row[1], email=row[2]) for row in rows]
def delete(self, user: User) -> None:
self.db.execute("DELETE FROM users WHERE id = %s", (user.id,))
self.db.commit()
from pymongo import MongoClient
class MongoUserRepository(UserRepository):
def __init__(self, db):
self.collection = db['users']
def save(self, user: User) -> None:
if user.id is None:
# Создание
result = self.collection.insert_one({
"name": user.name,
"email": user.email
})
user.id = result.inserted_id
else:
# Обновление
self.collection.update_one(
{"_id": user.id},
{"$set": {"name": user.name, "email": user.email}}
)
def find_by_id(self, user_id: int) -> Optional[User]:
doc = self.collection.find_one({"_id": user_id})
if doc:
return User(id=doc["_id"], name=doc["name"], email=doc["email"])
return None
def find_by_email(self, email: str) -> Optional[User]:
doc = self.collection.find_one({"email": email})
if doc:
return User(id=doc["_id"], name=doc["name"], email=doc["email"])
return None
def find_all(self) -> List[User]:
docs = self.collection.find()
return [User(id=doc["_id"], name=doc["name"], email=doc["email"])
for doc in docs]
def delete(self, user: User) -> None:
self.collection.delete_one({"_id": user.id})
# Сервис не знает, какая БД используется
class UserService:
def __init__(self, user_repository: UserRepository):
self.user_repository = user_repository
def create_user(self, name: str, email: str) -> User:
# Проверяем, существует ли пользователь
existing = self.user_repository.find_by_email(email)
if existing:
raise ValueError("User already exists")
# Создаём пользователя
user = User(name=name, email=email)
self.user_repository.save(user)
return user
def get_user(self, user_id: int) -> User:
user = self.user_repository.find_by_id(user_id)
if not user:
raise ValueError("User not found")
return user
# Можно использовать с любой реализацией
postgres_repo = PostgresUserRepository(db_session)
mongo_repo = MongoUserRepository(mongo_db)
service = UserService(postgres_repo) # или mongo_repo
Управление транзакциями
# ❌ Каждый repository управляет своей транзакцией
user_repo.save(user) # Транзакция 1
order_repo.save(order) # Транзакция 2
# Что если order_repo.save() упадёт?
# user уже сохранён, но order нет
# Несогласованное состояние!
class UnitOfWork(ABC):
@abstractmethod
def __enter__(self):
pass
@abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
pass
@abstractmethod
def commit(self):
pass
@abstractmethod
def rollback(self):
pass
class PostgresUnitOfWork(UnitOfWork):
def __init__(self, db_session):
self.db = db_session
self.user_repo = PostgresUserRepository(db_session)
self.order_repo = PostgresOrderRepository(db_session)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.commit()
else:
self.rollback()
def commit(self):
self.db.commit()
def rollback(self):
self.db.rollback()
# Использование
with PostgresUnitOfWork(db_session) as uow:
uow.user_repo.save(user)
uow.order_repo.save(order)
# Если что-то упадёт — автоматический rollback
from abc import ABC, abstractmethod
class Specification(ABC):
@abstractmethod
def is_satisfied_by(self, entity) -> bool:
pass
def __and__(self, other):
return AndSpecification(self, other)
def __or__(self, other):
return OrSpecification(self, other)
class UserEmailSpecification(Specification):
def __init__(self, email: str):
self.email = email
def is_satisfied_by(self, user: User) -> bool:
return user.email == self.email
class UserNameSpecification(Specification):
def __init__(self, name_pattern: str):
self.name_pattern = name_pattern
def is_satisfied_by(self, user: User) -> bool:
return self.name_pattern in user.name
# Использование
spec = UserEmailSpecification("alice@example.com") & UserNameSpecification("Alice")
users = user_repository.find_by_specification(spec)
class CachedUserRepository(UserRepository):
def __init__(self, user_repository: UserRepository, cache):
self.user_repository = user_repository
self.cache = cache
def find_by_id(self, user_id: int) -> Optional[User]:
# Проверяем кэш
cache_key = f"user:{user_id}"
cached_user = self.cache.get(cache_key)
if cached_user:
return cached_user
# Если нет в кэше — загружаем из БД
user = self.user_repository.find_by_id(user_id)
if user:
self.cache.set(cache_key, user, ttl=3600)
return user
def save(self, user: User) -> None:
# Сохраняем в БД
self.user_repository.save(user)
# Обновляем кэш
cache_key = f"user:{user.id}"
self.cache.set(cache_key, user, ttl=3600)
def delete(self, user: User) -> None:
# Удаляем из БД
self.user_repository.delete(user)
# Удаляем из кэша
cache_key = f"user:{user.id}"
self.cache.delete(cache_key)
from sqlalchemy.orm import Session
from typing import TypeVar, Generic, List, Optional
T = TypeVar('T')
class SQLAlchemyRepository(Generic[T]):
def __init__(self, session: Session, model_class):
self.session = session
self.model_class = model_class
def save(self, entity: T) -> T:
self.session.add(entity)
self.session.commit()
self.session.refresh(entity)
return entity
def find_by_id(self, id: int) -> Optional[T]:
return self.session.query(self.model_class).filter_by(id=id).first()
def find_all(self) -> List[T]:
return self.session.query(self.model_class).all()
def delete(self, entity: T) -> None:
self.session.delete(entity)
self.session.commit()
# Использование
user_repository = SQLAlchemyRepository(session, User)
user = user_repository.find_by_id(1)
class UserRepository(SQLAlchemyRepository[User]):
def __init__(self, session: Session):
super().__init__(session, User)
def find_by_email(self, email: str) -> Optional[User]:
return self.session.query(User).filter_by(email=email).first()
def find_active_users(self) -> List[User]:
return self.session.query(User).filter_by(active=True).all()
def find_by_name_pattern(self, pattern: str) -> List[User]:
return self.session.query(User).filter(
User.name.like(f"%{pattern}%")
).all()
# Использование
user_repo = UserRepository(session)
user = user_repo.find_by_email("alice@example.com")
active_users = user_repo.find_active_users()
class InMemoryUserRepository(UserRepository):
def __init__(self):
self.users = {}
self.next_id = 1
def save(self, user: User) -> None:
if user.id is None:
user.id = self.next_id
self.next_id += 1
self.users[user.id] = user
def find_by_id(self, user_id: int) -> Optional[User]:
return self.users.get(user_id)
def find_by_email(self, email: str) -> Optional[User]:
for user in self.users.values():
if user.email == email:
return user
return None
def find_all(self) -> List[User]:
return list(self.users.values())
def delete(self, user: User) -> None:
if user.id in self.users:
del self.users[user.id]
# Тестирование
def test_user_service():
# Используем in-memory repository
repo = InMemoryUserRepository()
service = UserService(repo)
# Тест
user = service.create_user("Alice", "alice@example.com")
assert user.id is not None
assert service.get_user(user.id).name == "Alice"
from unittest.mock import Mock
def test_user_service_with_mock():
# Создаём mock repository
mock_repo = Mock(spec=UserRepository)
mock_repo.find_by_id.return_value = User(id=1, name="Alice", email="alice@example.com")
service = UserService(mock_repo)
user = service.get_user(1)
assert user.name == "Alice"
mock_repo.find_by_id.assert_called_once_with(1)
| Сценарий | Рекомендация |
|---|---|
| Сложная бизнес-логика | ✅ Подходит |
| Нужна возможность смены БД | ✅ Подходит |
| Тестирование | ✅ Подходит |
| Простые CRUD операции | ⚠️ Может быть избыточно |
| Использование ORM (SQLAlchemy, Django ORM) | ⚠️ ORM уже является абстракцией |
"Repository — это не про код. Это про разделение ответственности.
Домен не должен знать, как данные хранятся. Repository скрывает эти детали."