Python e Kafka no Azure Event Hubs

Python e Kafka no Azure Event Hubs

Neste artigo você vai produzir e consumir mensagens usando Python kafka no Azure Event Hubs com autenticação via SAS tokens. O Terraform provisiona auth rules e consumer groups, e o código Python usa a biblioteca confluent-kafka para se conectar ao namespace Event Hubs via protocolo Kafka na porta 9093. Ao final, você terá um produtor e um consumidor Python kafka no Azure Event Hubs funcionando de ponta a ponta.

Série: Kafka no Azure com Event Hubs
  • 📖 Art. 01: Kafka no Azure com Event Hubs
  • 📖 Art. 02: Azure Event Hubs com Terraform
  • ⚙️ Art. 03 (este): Python e Kafka no Azure Event Hubs
  • 🔒 Art. 04: Kafka Connect no Azure Container Instances
  • 🔒 Art. 05: Schema Registry no Azure Event Hubs
  • 🔒 Art. 06: Monitoramento Kafka com Grafana no Azure

Sumário

Pré-requisitos

Para este artigo de Python kafka no Azure Event Hubs, você precisa ter concluído o Art. 02 — o namespace evhns-kafka-blog-castilho-eus com os três event hubs deve estar provisionado. Você também precisa de Python 3.8+ e instalar a biblioteca confluent-kafka:

pip install confluent-kafka

A biblioteca confluent-kafka é a escolha recomendada para Python kafka no Azure Event Hubs porque usa o librdkafka nativo, tem suporte completo a SASL/SSL e é mantida pela Confluent com compatibilidade garantida com o protocolo Kafka. A alternativa kafka-python também funciona, mas tem menos suporte a funcionalidades avançadas de autenticação.

Auth Rules e Consumer Groups com Terraform

Antes de conectar o Python kafka no Azure Event Hubs, o Terraform provisiona as auth rules SAS e o consumer group. Auth rules são credenciais no nível do event hub que concedem permissões específicas: send para produtores, listen para consumidores. O princípio do menor privilégio é aplicado: o produtor não tem permissão de listen, e o consumidor não tem permissão de send.

Passo 1 — main.tf: módulo de autenticação

module "auth_pedidos" {
  source              = "../../terraform-eventhub-auth-modules"
  namespace_name      = "evhns-kafka-blog-castilho-eus"
  eventhub_name       = "evh-pedidos-blog-castilho-eus"
  resource_group_name = "rg-kafka-blog-castilho-eus"

  auth_rules = [
    { name = "rule-producer-pedidos", listen = false, send = true, manage = false },
    { name = "rule-consumer-pedidos", listen = true, send = false, manage = false },
  ]

  consumer_group_names = ["cg-python-app"]
}

O módulo terraform-eventhub-auth-modules cria as auth rules e o consumer group no event hub de pedidos. O consumer group cg-python-app é o identificador que o consumidor Python vai usar para manter seu offset de leitura — se múltiplas instâncias usarem o mesmo consumer group, o Event Hubs distribui as partições entre elas automaticamente.

Passo 2 — Executar o Terraform

cp backend.hcl.example backend.hcl
terraform init -backend-config=backend.hcl
terraform plan
terraform apply

Resultado do apply

Após o apply, os seguintes recursos são criados para suportar o Python kafka no Azure Event Hubs:

RecursoNomePermissão
Auth Rulerule-producer-pedidossend=true, listen=false
Auth Rulerule-consumer-pedidoslisten=true, send=false
Consumer Groupcg-python-app

Passo 3 — Obter connection string via CLI

Para autenticar o Python kafka no Azure Event Hubs, você precisa da connection string SAS da auth rule. Obtenha-a via Azure CLI (nunca commite no repositório):

# Connection string do produtor
az eventhubs eventhub authorization-rule keys list \
  --resource-group rg-kafka-blog-castilho-eus \
  --namespace-name evhns-kafka-blog-castilho-eus \
  --eventhub-name evh-pedidos-blog-castilho-eus \
  --name rule-producer-pedidos \
  --query primaryConnectionString -o tsv

# Connection string do consumidor
az eventhubs eventhub authorization-rule keys list \
  --resource-group rg-kafka-blog-castilho-eus \
  --namespace-name evhns-kafka-blog-castilho-eus \
  --eventhub-name evh-pedidos-blog-castilho-eus \
  --name rule-consumer-pedidos \
  --query primaryConnectionString -o tsv

A connection string tem o formato: Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<rule>;SharedAccessKey=<key>;EntityPath=<eventhub>. A senha para o protocolo Kafka é a connection string completa, e o username é sempre $ConnectionString.

Passo 4 — Produtor Python

O produtor Python kafka no Azure Event Hubs usa SASL_SSL com a connection string como senha. O tópico é o nome do event hub.

import json
import os
from confluent_kafka import Producer

NAMESPACE = "evhns-kafka-blog-castilho-eus.servicebus.windows.net"
EVENTHUB  = "evh-pedidos-blog-castilho-eus"
CONN_STR  = os.environ["PRODUCER_CONN_STR"]  # nunca hardcode

conf = {
    "bootstrap.servers": f"{NAMESPACE}:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "$ConnectionString",
    "sasl.password": CONN_STR,
}

producer = Producer(conf)

def delivery_report(err, msg):
    if err:
        print(f"Erro na entrega: {err}")
    else:
        print(f"Mensagem entregue: partition={msg.partition()}, offset={msg.offset()}")

for i in range(10):
    pedido = {"pedido_id": i, "produto": "notebook", "quantidade": 1}
    producer.produce(
        topic=EVENTHUB,
        value=json.dumps(pedido).encode("utf-8"),
        key=str(i).encode("utf-8"),
        callback=delivery_report,
    )
    producer.poll(0)

producer.flush()
print("Produção concluída.")

Passo 5 — Consumidor Python

O consumidor Python kafka no Azure Event Hubs usa o consumer group cg-python-app provisionado pelo Terraform. O auto.offset.reset=earliest garante que o consumidor leia desde o início quando não há offset salvo para o grupo.

import os
from confluent_kafka import Consumer, KafkaError

NAMESPACE = "evhns-kafka-blog-castilho-eus.servicebus.windows.net"
EVENTHUB  = "evh-pedidos-blog-castilho-eus"
CONN_STR  = os.environ["CONSUMER_CONN_STR"]

conf = {
    "bootstrap.servers": f"{NAMESPACE}:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "$ConnectionString",
    "sasl.password": CONN_STR,
    "group.id": "cg-python-app",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(conf)
consumer.subscribe([EVENTHUB])

try:
    while True:
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            print("Aguardando mensagens...")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Fim da partição {msg.partition()}")
            else:
                print(f"Erro: {msg.error()}")
        else:
            print(f"Recebido: partition={msg.partition()}, offset={msg.offset()}, value={msg.value().decode('utf-8')}")
finally:
    consumer.close()

Executar o teste de ponta a ponta

Para testar o Python kafka no Azure Event Hubs de ponta a ponta, defina as variáveis de ambiente e execute os dois scripts em terminais separados:

# Terminal 1 — consumidor (inicie primeiro)
export CONSUMER_CONN_STR="$(az eventhubs eventhub authorization-rule keys list \
  --resource-group rg-kafka-blog-castilho-eus \
  --namespace-name evhns-kafka-blog-castilho-eus \
  --eventhub-name evh-pedidos-blog-castilho-eus \
  --name rule-consumer-pedidos --query primaryConnectionString -o tsv)"
python consumer.py

# Terminal 2 — produtor
export PRODUCER_CONN_STR="$(az eventhubs eventhub authorization-rule keys list \
  --resource-group rg-kafka-blog-castilho-eus \
  --namespace-name evhns-kafka-blog-castilho-eus \
  --eventhub-name evh-pedidos-blog-castilho-eus \
  --name rule-producer-pedidos --query primaryConnectionString -o tsv)"
python producer.py

O consumidor vai exibir as 10 mensagens de pedido produzidas, com a partição e o offset de cada uma. Esse é o fluxo básico do Python kafka no Azure Event Hubs funcionando.

Troubleshooting

Erro: SASL authentication failure — verifique se a connection string está completa e se a auth rule tem a permissão correta (send para produtor, listen para consumidor). A connection string expira se a chave for regenerada no portal.

Erro: Topic not found — no Event Hubs, o topic name deve ser exatamente o nome do event hub, incluindo o sufixo de região. Verifique se o EVENTHUB no código está correto.

Consumidor não recebe mensagens — se o produtor rodou antes do consumidor e o auto.offset.reset não está como earliest, o consumidor pode estar lendo de um offset futuro. Redefina o consumer group ou configure auto.offset.reset=earliest.

Próximos passos

Com o Python kafka no Azure Event Hubs produzindo e consumindo mensagens, o próximo artigo adiciona o Kafka Connect no Azure Container Instances — um conector que integra o Event Hubs com outros sistemas sem escrever código de produtor/consumidor.

Série: Kafka no Azure com Event Hubs
  • 📖 Art. 01: Kafka no Azure com Event Hubs
  • 📖 Art. 02: Azure Event Hubs com Terraform
  • ⚙️ Art. 03 (este): Python e Kafka no Azure Event Hubs
  • 🔒 Art. 04: Kafka Connect no Azure Container Instances
  • 🔒 Art. 05: Schema Registry no Azure Event Hubs
  • 🔒 Art. 06: Monitoramento Kafka com Grafana no Azure

Interessado em saber mais sobre artigos relacionados ao Microsoft Azure CLIQUE AQUI

🚀 Vamos nos conectar?

Não perca nenhuma oportunidade! Cadastre-se nas minhas redes e no canal do YouTube para receber conteúdos de TI, Cloud, Azure, Kubernetes e DevOps em primeira mão.

Dica: No Facebook, todos os artigos do blog são publicados automaticamente. Vale a pena curtir!


💬 Dúvidas ou Problemas?

Com o intuito de ajudar a comunidade, caso você tenha dúvidas ou encontre problemas na execução dos comandos deste artigo, deixe um comentário abaixo. Responderei o mais breve possível!

Muito obrigado pela visita e até o próximo post!

Jefferson Castilho Especialista em Cloud & DevOps.

Este guia técnico é exclusivo do Blog do Castilho. Explore nossa para mais conteúdos sobre IA e Cloud.

 

Deixe uma resposta

Rolar para cima

Descubra mais sobre Blog do Castilho - Tecnologia | FinOps | DevOps | Cloud

Assine agora mesmo para continuar lendo e ter acesso ao arquivo completo.

Continue reading