Co je RabbitMQ?
RabbitMQ je message broker implementující protokol AMQP (Advanced Message Queuing Protocol). Umožňuje komunikaci mezi aplikacemi nebo jejich komponentami. RabbitMQ umožňuje oddělení producentů a konzumentů, což podporuje asynchronní komunikaci.
Hlavní pojmy
- Message Broker: Implementuje systém front zpráv.
- Exchanges (výměníky): Komponenty odpovědné za směrování zpráv do správných front.
- Binding: Spojení mezi výměníkem a frontou, identifikován pomocí binding key.
- Routing key: Identifikace jednotlivých zpráv.
Příklad: Posílání zprávy (producent)
import pika
# Navázání spojení s RabbitMQ serverem
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Deklarace fronty
channel.queue_declare(queue='hello')
# Odeslání zprávy
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello, RabbitMQ!')
print(" [x] Zpráva 'Hello, RabbitMQ!' byla odeslána.")
# Uzavření spojení
connection.close()
Příklad: Příjem zprávy (konzument)
import pika
def callback(ch, method, properties, body):
print(f" [x] Přijatá zpráva: {body}")
# Navázání spojení s RabbitMQ serverem
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Deklarace fronty
channel.queue_declare(queue='hello')
# Přihlášení ke zprávám z fronty
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Čekám na zprávy. Pro ukončení stiskněte CTRL+C')
channel.start_consuming()
Výhody RabbitMQ
Mezi hlavní výhody RabbitMQ patří:
- Oddělení (decoupling): Producenti nemusí čekat na zpracování zpráv, což umožňuje asynchronní provádění úloh.
- Škálování: RabbitMQ usnadňuje přidávání nových producentů nebo konzumentů, což podporuje horizontální škálování.
- Výkon: Broker může běžet na samostatném zařízení, což zlepšuje výkon.
Typy výměníků (Exchanges)
Výměníky definují, jakým způsobem jsou zprávy směrovány do front:
- Direct: Směřuje zprávy do konkrétní fronty na základě kompletní shody binding key a routing key.
- Fanout: Rozesílá všechny zprávy do všech front napojených na výměník.
- Topic: Směruje zprávy do front na základě částečné shody mezi binding key a routing key (pattern matching).
- Header: Směruje zprávy podle hodnot v hlavičkách zpráv.
Příklad: Implementace Fanout Exchange
import pika
# Navázání spojení s RabbitMQ serverem
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Deklarace výměníku typu fanout
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Deklarace fronty
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# Přiřazení fronty k výměníku
channel.queue_bind(exchange='logs', queue=queue_name)
# Callback pro zpracování přijatých zpráv
def callback(ch, method, properties, body):
print(f" [x] Přijatý log: {body}")
# Přihlášení ke zprávám
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Čekám na logy. Pro ukončení stiskněte CTRL+C')
channel.start_consuming()
Funkce a vlastnosti
- Spolehlivost: Možnost zajištění, že se zprávy neztratí ani při výpadcích.
- Flexibilita: Podpora více protokolů, například AMQP, MQTT nebo STOMP.
- Škálovatelnost: Jednoduché přidávání producentů, konzumentů nebo zvýšení výkonu systému.
- Pluggability: Možnost snadného rozšíření funkcionality pomocí pluginů.
Příklad: Spolehlivost zpráv
import pika
# Navázání spojení s RabbitMQ serverem
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Deklarace fronty s povolením trvalosti (durable)
channel.queue_declare(queue='task_queue', durable=True)
# Publikování zprávy s potvrzením
channel.basic_publish(exchange='',
routing_key='task_queue',
body='Úkol 1',
properties=pika.BasicProperties(
delivery_mode=2, # Zpráva je trvalá
))
print(" [x] Zpráva 'Úkol 1' byla odeslána.")
# Uzavření spojení
connection.close()
Využití RabbitMQ
RabbitMQ nachází využití v různých scénářích:
- Asynchronní zpracování úloh: Oddělení umožňuje producentům odesílat úlohy bez čekání na jejich dokončení, což dovoluje zpracovávání úloh, které jsou časově náročné nebo nevyžadují okamžité zpracování.
- Load balancing: Rozložení zátěže mezi více konzumentů.
- Zpracování dat v reálném čase: Užitečné pro monitoring, logování a další.
Příklad: Zpracování logů v reálném čase
import pika
# Navázání spojení s RabbitMQ serverem
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Deklarace výměníku typu fanout pro logy
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Deklarace fronty pro příjem logů
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# Přiřazení fronty k výměníku
channel.queue_bind(exchange='logs', queue=queue_name)
# Callback pro zpracování logů
def callback(ch, method, properties, body):
print(f" [x] Přijatý log: {body}")
# Přihlášení ke zprávám
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Čekám na logy. Pro ukončení stiskněte CTRL+C')
channel.start_consuming()