Асинхронная коммуникация в распределённых системах
Message Queue — механизм асинхронной коммуникации между компонентами системы, где сообщения временно хранятся в очереди до обработки.
Service A → Service B (ждёт ответа)
↓
Блокировка
↓
Медленно
↓
Каскадные сбои
# ❌ Синхронный вызов
response = requests.post("http://service-b/process", json=data)
# Service A ждёт, пока Service B обработает
# Если Service B упал — Service A тоже блокируется
Service A → Message Queue → Service B
↓
Не блокируется
↓
Быстро
↓
Устойчиво к сбоям
# ✅ Асинхронный вызов
queue.send("process_task", data)
# Service A не ждёт
# Service B обработает когда сможет
Критерии: схема доставки (at-least/at-most), throughput, порядок, ретеншн, операционные затраты.
Producer → Queue → Consumer
↓
Одно сообщение
обрабатывается
одним consumer
# Producer
queue.send("order_created", {"order_id": 123})
# Consumer (один из многих workers)
message = queue.receive()
process_order(message.data)
💡 Используется для: задачи, задания, фоновые процессы
Publisher → Topic → [Subscriber1, Subscriber2, Subscriber3]
↓
Одно сообщение
получают все
подписчики
# Publisher
pubsub.publish("user_registered", {"user_id": 123})
# Subscriber 1
pubsub.subscribe("user_registered", send_welcome_email)
# Subscriber 2
pubsub.subscribe("user_registered", update_analytics)
# Subscriber 3
pubsub.subscribe("user_registered", create_profile)
💡 Используется для: события, уведомления, бродкасты
Producer → Stream → Consumer
↓
Непрерывный поток
сообщений
(как лента)
# Producer
stream.write("user_123", {"action": "click", "timestamp": "..."})
stream.write("user_123", {"action": "view", "timestamp": "..."})
# Consumer (читает поток)
for event in stream.read("user_123"):
process_event(event)
💡 Используется для: логи, аналитика, real-time обработка
Классический message broker
Producer → Exchange → Queue → Consumer
↓
Routing rules
(direct, topic, fanout)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создаём очередь
channel.queue_declare(queue='task_queue', durable=True)
# Отправляем сообщение
channel.basic_publish(
exchange='',
routing_key='task_queue',
body='{"order_id": 123, "action": "process"}',
properties=pika.BasicProperties(
delivery_mode=2, # Сохранять сообщение на диск
)
)
connection.close()
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
def process_order(ch, method, properties, body):
data = json.loads(body)
order_id = data['order_id']
# Обработка заказа
print(f"Processing order {order_id}")
# Подтверждаем обработку
ch.basic_ack(delivery_tag=method.delivery_tag)
# Подписываемся на очередь
channel.basic_consume(
queue='task_queue',
on_message_callback=process_order
)
channel.start_consuming()
# Fanout Exchange (Pub/Sub)
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body=message)
# Topic Exchange
channel.exchange_declare(exchange='events', exchange_type='topic')
channel.basic_publish(
exchange='events',
routing_key='order.created',
body=message
)
Распределённая streaming platform
Producer → Topic (Partitions) → Consumer Groups
↓
Распределённое
хранение
(replication)
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Отправляем сообщение
producer.send('user_events', {
'user_id': 123,
'event': 'user_registered',
'timestamp': '2024-01-01T10:00:00Z'
})
producer.flush()
producer.close()
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'user_events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='user_processor_group'
)
# Читаем сообщения
for message in consumer:
event = message.value
print(f"Processing event: {event['event']} for user {event['user_id']}")
# Обработка события
process_event(event)
Topic: user_events Partition 0: [msg1, msg4, msg7] Partition 1: [msg2, msg5, msg8] Partition 2: [msg3, msg6, msg9] Consumer Group: processors Consumer 1 → Partition 0 Consumer 2 → Partition 1 Consumer 3 → Partition 2 Каждый consumer обрабатывает свою partition
💡 Параллельная обработка сообщений
| Критерий | RabbitMQ | Kafka |
|---|---|---|
| Тип | Message Queue | Streaming Platform |
| Хранение | Временное | Постоянное (логи) |
| Пропускная способность | Средняя | Очень высокая |
| Латентность | Низкая | Средняя |
| Использование | Задачи, задания | Логи, события, аналитика |
Простой pub/sub для небольших систем
import redis
r = redis.Redis(host='localhost', port=6379)
# Публикуем сообщение
r.publish('user_events', json.dumps({
'user_id': 123,
'event': 'user_registered'
}))
import redis
import json
r = redis.Redis(host='localhost', port=6379)
pubsub = r.pubsub()
# Подписываемся на канал
pubsub.subscribe('user_events')
# Читаем сообщения
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
process_event(data)
# Producer
r.xadd('user_events', {
'user_id': '123',
'event': 'user_registered',
'timestamp': '2024-01-01T10:00:00Z'
})
# Consumer
while True:
messages = r.xread({'user_events': '$'}, block=1000)
for stream, msgs in messages:
for msg_id, data in msgs:
process_event(data)
# Подтверждаем обработку
r.xack('user_events', 'consumer_group', msg_id)
💡 Redis Streams — более продвинутая версия pub/sub с гарантией доставки
# Producer: API endpoint
@app.post("/process")
def process_data(data: Data):
# Отправляем задание в очередь
queue.send("process_task", {
"data_id": data.id,
"user_id": data.user_id
})
return {"status": "queued"}
# Consumer: Worker
def worker():
while True:
message = queue.receive()
if message:
process_task(message.data)
queue.ack(message.id)
💡 Используется для: фоновые задачи, обработка файлов, отправка email
# Order Service публикует событие
def create_order(order_data):
order = Order.create(order_data)
# Публикуем событие
event_bus.publish("order_created", {
"order_id": order.id,
"user_id": order.user_id,
"total": order.total
})
return order
# Payment Service подписывается
event_bus.subscribe("order_created", process_payment)
# Notification Service подписывается
event_bus.subscribe("order_created", send_notification)
# Analytics Service подписывается
event_bus.subscribe("order_created", track_analytics)
# Client отправляет запрос
request_id = generate_id()
response_queue = f"response_{request_id}"
# Отправляем запрос
queue.send("process_request", {
"request_id": request_id,
"data": request_data,
"reply_to": response_queue
})
# Ждём ответ
response = queue.receive(response_queue, timeout=30)
return response.data
# Server обрабатывает запрос
def process_request(message):
request_data = message.data
result = process(request_data)
# Отправляем ответ
queue.send(message.reply_to, {
"request_id": request_data["request_id"],
"result": result
})
Сообщение может быть доставлено 0 или 1 раз.
# Простая отправка, без подтверждения
queue.send("task", data)
# Может потеряться, если consumer упал
💡 Быстро, но возможна потеря сообщений
Сообщение доставляется минимум один раз, может быть дубликат.
# С подтверждением
message = queue.receive()
process(message.data)
queue.ack(message.id) # Подтверждаем обработку
# Если ack не отправлен — сообщение вернётся в очередь
# Может быть обработано повторно
💡 Надёжно, но нужна идемпотентность
Сообщение доставляется ровно один раз.
# Идемпотентная обработка
def process_order(order_id):
# Проверяем, не обработан ли уже
if is_processed(order_id):
return
# Обрабатываем
process(order_id)
# Помечаем как обработанное
mark_as_processed(order_id)
💡 Сложнее, но гарантирует отсутствие дубликатов
# Обработка с повторными попытками
def process_message(message):
try:
process(message.data)
queue.ack(message.id)
except Exception as e:
# Увеличиваем счётчик попыток
message.retry_count += 1
if message.retry_count > 3:
# Отправляем в DLQ
dlq.send(message)
queue.ack(message.id)
else:
# Возвращаем в очередь
queue.nack(message.id, requeue=True)
💡 Сообщения, которые не удалось обработать, отправляются в DLQ
class CircuitBreaker:
def __init__(self, failure_threshold=5):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.state = "closed" # closed, open, half_open
def call(self, func):
if self.state == "open":
raise Exception("Circuit breaker is open")
try:
result = func()
self.failure_count = 0
self.state = "closed"
return result
except Exception as e:
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise e
"Message Queues — это не про сложность. Это про надёжность и масштабируемость.
Используйте их, когда нужно развязать компоненты и обеспечить асинхронную обработку."