Message Queues & Streams

Асинхронная коммуникация в распределённых системах

Что такое Message Queue?

Message Queue — механизм асинхронной коммуникации между компонентами системы, где сообщения временно хранятся в очереди до обработки.

🔹 Синхронная коммуникация (проблемы)

Service A → Service B (ждёт ответа)
    ↓
  Блокировка
    ↓
  Медленно
    ↓
  Каскадные сбои
# ❌ Синхронный вызов
response = requests.post("http://service-b/process", json=data)
# Service A ждёт, пока Service B обработает
# Если Service B упал — Service A тоже блокируется

✅ Асинхронная коммуникация (решение)

Service A → Message Queue → Service B
    ↓
  Не блокируется
    ↓
  Быстро
    ↓
  Устойчиво к сбоям
# ✅ Асинхронный вызов
queue.send("process_task", data)
# Service A не ждёт
# Service B обработает когда сможет

Типы Message Queues

Что выбрать и когда?

  • RabbitMQ — задачи/джобы, гарантии доставки, маршрутизация (direct/topic/fanout). Отлично для фоновой обработки.
  • Kafka — высокопроизводительные потоки событий, хранение лога, consumer groups. Отлично для аналитики и event streaming.
  • Redis Pub/Sub/Streams — простые сценарии, низкая задержка, минимум инфраструктуры.

Критерии: схема доставки (at-least/at-most), throughput, порядок, ретеншн, операционные затраты.

🔹 Point-to-Point (Очередь)

Producer → Queue → Consumer
              ↓
         Одно сообщение
         обрабатывается
         одним consumer
# Producer
queue.send("order_created", {"order_id": 123})

# Consumer (один из многих workers)
message = queue.receive()
process_order(message.data)

💡 Используется для: задачи, задания, фоновые процессы

🔹 Publish-Subscribe (Топики)

Publisher → Topic → [Subscriber1, Subscriber2, Subscriber3]
              ↓
         Одно сообщение
         получают все
         подписчики
# Publisher
pubsub.publish("user_registered", {"user_id": 123})

# Subscriber 1
pubsub.subscribe("user_registered", send_welcome_email)

# Subscriber 2
pubsub.subscribe("user_registered", update_analytics)

# Subscriber 3
pubsub.subscribe("user_registered", create_profile)

💡 Используется для: события, уведомления, бродкасты

🔹 Streams (Потоки)

Producer → Stream → Consumer
              ↓
         Непрерывный поток
         сообщений
         (как лента)
# Producer
stream.write("user_123", {"action": "click", "timestamp": "..."})
stream.write("user_123", {"action": "view", "timestamp": "..."})

# Consumer (читает поток)
for event in stream.read("user_123"):
    process_event(event)

💡 Используется для: логи, аналитика, real-time обработка

RabbitMQ

Классический message broker

🔹 Архитектура RabbitMQ

Producer → Exchange → Queue → Consumer
              ↓
         Routing rules
         (direct, topic, fanout)

🔹 Producer (Python)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Создаём очередь
channel.queue_declare(queue='task_queue', durable=True)

# Отправляем сообщение
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='{"order_id": 123, "action": "process"}',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Сохранять сообщение на диск
    )
)

connection.close()

🔹 Consumer (Python)

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

def process_order(ch, method, properties, body):
    data = json.loads(body)
    order_id = data['order_id']
    
    # Обработка заказа
    print(f"Processing order {order_id}")
    
    # Подтверждаем обработку
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Подписываемся на очередь
channel.basic_consume(
    queue='task_queue',
    on_message_callback=process_order
)

channel.start_consuming()

🔹 Exchange Types

  • Direct — маршрутизация по ключу
  • Topic — маршрутизация по паттерну
  • Fanout — бродкаст всем очередям
  • Headers — маршрутизация по заголовкам
# Fanout Exchange (Pub/Sub)
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body=message)

# Topic Exchange
channel.exchange_declare(exchange='events', exchange_type='topic')
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=message
)

Apache Kafka

Распределённая streaming platform

🔹 Архитектура Kafka

Producer → Topic (Partitions) → Consumer Groups
              ↓
         Распределённое
         хранение
         (replication)
  • Topic — поток сообщений
  • Partition — часть топика (для масштабирования)
  • Consumer Group — группа потребителей
  • Broker — сервер Kafka

🔹 Producer (Python)

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Отправляем сообщение
producer.send('user_events', {
    'user_id': 123,
    'event': 'user_registered',
    'timestamp': '2024-01-01T10:00:00Z'
})

producer.flush()
producer.close()

🔹 Consumer (Python)

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='user_processor_group'
)

# Читаем сообщения
for message in consumer:
    event = message.value
    print(f"Processing event: {event['event']} for user {event['user_id']}")
    
    # Обработка события
    process_event(event)

🔹 Partitions и Consumer Groups

Topic: user_events
  Partition 0: [msg1, msg4, msg7]
  Partition 1: [msg2, msg5, msg8]
  Partition 2: [msg3, msg6, msg9]

Consumer Group: processors
  Consumer 1 → Partition 0
  Consumer 2 → Partition 1
  Consumer 3 → Partition 2

Каждый consumer обрабатывает свою partition

💡 Параллельная обработка сообщений

🔹 Kafka vs RabbitMQ

Критерий RabbitMQ Kafka
Тип Message Queue Streaming Platform
Хранение Временное Постоянное (логи)
Пропускная способность Средняя Очень высокая
Латентность Низкая Средняя
Использование Задачи, задания Логи, события, аналитика

Redis Pub/Sub

Простой pub/sub для небольших систем

🔹 Publisher

import redis

r = redis.Redis(host='localhost', port=6379)

# Публикуем сообщение
r.publish('user_events', json.dumps({
    'user_id': 123,
    'event': 'user_registered'
}))

🔹 Subscriber

import redis
import json

r = redis.Redis(host='localhost', port=6379)
pubsub = r.pubsub()

# Подписываемся на канал
pubsub.subscribe('user_events')

# Читаем сообщения
for message in pubsub.listen():
    if message['type'] == 'message':
        data = json.loads(message['data'])
        process_event(data)

🔹 Redis Streams

# Producer
r.xadd('user_events', {
    'user_id': '123',
    'event': 'user_registered',
    'timestamp': '2024-01-01T10:00:00Z'
})

# Consumer
while True:
    messages = r.xread({'user_events': '$'}, block=1000)
    for stream, msgs in messages:
        for msg_id, data in msgs:
            process_event(data)
            # Подтверждаем обработку
            r.xack('user_events', 'consumer_group', msg_id)

💡 Redis Streams — более продвинутая версия pub/sub с гарантией доставки

Паттерны использования

🔹 Task Queue (Очередь заданий)

# Producer: API endpoint
@app.post("/process")
def process_data(data: Data):
    # Отправляем задание в очередь
    queue.send("process_task", {
        "data_id": data.id,
        "user_id": data.user_id
    })
    return {"status": "queued"}

# Consumer: Worker
def worker():
    while True:
        message = queue.receive()
        if message:
            process_task(message.data)
            queue.ack(message.id)

💡 Используется для: фоновые задачи, обработка файлов, отправка email

🔹 Event-Driven Architecture

# Order Service публикует событие
def create_order(order_data):
    order = Order.create(order_data)
    
    # Публикуем событие
    event_bus.publish("order_created", {
        "order_id": order.id,
        "user_id": order.user_id,
        "total": order.total
    })
    
    return order

# Payment Service подписывается
event_bus.subscribe("order_created", process_payment)

# Notification Service подписывается
event_bus.subscribe("order_created", send_notification)

# Analytics Service подписывается
event_bus.subscribe("order_created", track_analytics)

🔹 Request-Reply Pattern

# Client отправляет запрос
request_id = generate_id()
response_queue = f"response_{request_id}"

# Отправляем запрос
queue.send("process_request", {
    "request_id": request_id,
    "data": request_data,
    "reply_to": response_queue
})

# Ждём ответ
response = queue.receive(response_queue, timeout=30)
return response.data

# Server обрабатывает запрос
def process_request(message):
    request_data = message.data
    result = process(request_data)
    
    # Отправляем ответ
    queue.send(message.reply_to, {
        "request_id": request_data["request_id"],
        "result": result
    })

Гарантии доставки

🔹 At most once (максимум один раз)

Сообщение может быть доставлено 0 или 1 раз.

# Простая отправка, без подтверждения
queue.send("task", data)
# Может потеряться, если consumer упал

💡 Быстро, но возможна потеря сообщений

🔹 At least once (минимум один раз)

Сообщение доставляется минимум один раз, может быть дубликат.

# С подтверждением
message = queue.receive()
process(message.data)
queue.ack(message.id)  # Подтверждаем обработку

# Если ack не отправлен — сообщение вернётся в очередь
# Может быть обработано повторно

💡 Надёжно, но нужна идемпотентность

🔹 Exactly once (ровно один раз)

Сообщение доставляется ровно один раз.

# Идемпотентная обработка
def process_order(order_id):
    # Проверяем, не обработан ли уже
    if is_processed(order_id):
        return
    
    # Обрабатываем
    process(order_id)
    
    # Помечаем как обработанное
    mark_as_processed(order_id)

💡 Сложнее, но гарантирует отсутствие дубликатов

Обработка ошибок

🔹 Dead Letter Queue (DLQ)

# Обработка с повторными попытками
def process_message(message):
    try:
        process(message.data)
        queue.ack(message.id)
    except Exception as e:
        # Увеличиваем счётчик попыток
        message.retry_count += 1
        
        if message.retry_count > 3:
            # Отправляем в DLQ
            dlq.send(message)
            queue.ack(message.id)
        else:
            # Возвращаем в очередь
            queue.nack(message.id, requeue=True)

💡 Сообщения, которые не удалось обработать, отправляются в DLQ

🔹 Circuit Breaker

class CircuitBreaker:
    def __init__(self, failure_threshold=5):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.state = "closed"  # closed, open, half_open
    
    def call(self, func):
        if self.state == "open":
            raise Exception("Circuit breaker is open")
        
        try:
            result = func()
            self.failure_count = 0
            self.state = "closed"
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise e

Когда использовать Message Queues?

✅ Хорошо подходит для:

  • Асинхронная обработка — долгие задачи
  • Развязка сервисов — слабая связность
  • Масштабируемость — горизонтальное масштабирование
  • Отказоустойчивость — сообщения не теряются
  • Event-Driven — события между сервисами
  • Очередь задач — фоновые процессы

❌ Не подходит для:

  • Синхронные операции — нужен немедленный ответ
  • Простые системы — overhead не оправдан
  • Низкая латентность — задержка на обработку
  • Строгая консистентность — eventual consistency

🎯 Ключевые принципы Message Queues

  1. Асинхронность — не блокируем отправителя
  2. Надёжность — сообщения не теряются
  3. Масштабируемость — горизонтальное масштабирование
  4. Идемпотентность — обработка может повторяться
  5. Обработка ошибок — retry, DLQ, circuit breaker
"Message Queues — это не про сложность. Это про надёжность и масштабируемость.
Используйте их, когда нужно развязать компоненты и обеспечить асинхронную обработку."

📚 Инструменты

  • RabbitMQ — классический message broker
  • Apache Kafka — streaming platform
  • Redis — простой pub/sub и streams
  • Amazon SQS — управляемый message queue
  • Celery — распределённая task queue для Python