chore: implement base consumer

This commit is contained in:
sriramveeraghanta
2025-12-15 21:43:05 +05:30
parent 6b160d980f
commit ccd48db2de
12 changed files with 359 additions and 171 deletions

View File

@@ -0,0 +1,12 @@
#!/bin/bash
set -e
python manage.py wait_for_db
# Wait for migrations
python manage.py wait_for_migrations
# Run the processes
python manage.py webhook_consumer \
--queue ${WEBHOOK_QUEUE_NAME:-plane.webhook} \
--prefetch ${WEBHOOK_PREFETCH_COUNT:-10}

View File

@@ -0,0 +1,185 @@
import sys
import time
import os
import pika
import signal
from django.conf import settings
from django.db import close_old_connections, connection
from django.db.utils import OperationalError
from plane.utils.logging import logger
from plane.utils.exception_logger import log_exception
class BaseConsumer:
"""Simple automation consumer for processing events from plane_event_stream."""
def __init__(
self,
queue_name: str = None,
prefetch_count: int = 10,
consumer_time_limit: float = 1.0,
consumer_backoff_delay: float = 0.1,
):
"""Initialize the base consumer."""
self.queue_name = queue_name
if not self.queue_name:
raise ValueError("queue_name must be provided")
self.exchange_name = "plane.event_stream"
self.queue_message_ttl = os.environ.get("EVENT_STREAM_QUEUE_MESSAGE_TTL", 3600000)
self.dlq_message_ttl = os.environ.get("EVENT_STREAM_DLQ_MESSAGE_TTL", 604800000)
self.consumer_time_limit = consumer_time_limit
self.consumer_backoff_delay = consumer_backoff_delay
self.dlq_name = f"{self.queue_name}_dlq"
self.prefetch_count = prefetch_count
# Consumer state
self._should_stop = False
self._consumer_tag = None
self._inflight_messages = 0
# Setup
self._setup_connection_params()
self._setup_signal_handlers()
logger.info(f"BaseConsumer initialized for queue '{self.queue_name}'")
def _setup_connection_params(self):
"""Set up RabbitMQ connection parameters."""
if hasattr(settings, "AMQP_URL") and settings.AMQP_URL:
self.connection_params = pika.URLParameters(settings.AMQP_URL)
else:
host = getattr(settings, "RABBITMQ_HOST", "localhost")
port = int(getattr(settings, "RABBITMQ_PORT", 5672))
username = getattr(settings, "RABBITMQ_USER", "guest")
password = getattr(settings, "RABBITMQ_PASSWORD", "guest")
virtual_host = getattr(settings, "RABBITMQ_VHOST", "/")
self.connection_params = pika.ConnectionParameters(
host=host,
port=port,
virtual_host=virtual_host,
credentials=pika.PlainCredentials(username, password),
heartbeat=600,
)
def _setup_signal_handlers(self):
"""Set up signal handlers for graceful shutdown."""
def signal_handler(signum, frame):
logger.info("Received shutdown signal, stopping consumer...")
self._should_stop = True
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def ensure_database_connection(self):
"""Ensure database connection is alive, reconnect if necessary."""
try:
if not connection.is_usable():
close_old_connections()
except Exception as e:
logger.warning(f"Database connection lost, reconnecting... {e}")
close_old_connections()
def _setup_queues(self, channel):
"""Set up the automation queue bound to plane_event_stream exchange."""
# Connect to existing exchange
channel.exchange_declare(
exchange=self.exchange_name,
exchange_type="fanout",
durable=True,
passive=False, # Declare/create if it doesn't exist
)
# DLQ with 7-day TTL to match existing configuration
channel.queue_declare(
queue=self.dlq_name,
durable=True,
arguments={
"x-message-ttl": self.dlq_message_ttl, # 7 days TTL
},
)
# Main queue with DLQ setup to match existing configuration
channel.queue_declare(
queue=self.queue_name,
durable=True,
arguments={
"x-dead-letter-exchange": self.dlq_exchange_name,
"x-dead-letter-routing-key": self.dlq_name,
"x-message-ttl": self.queue_message_ttl, # 1 hour TTL
},
)
# Bind to fanout exchange
channel.queue_bind(exchange=self.exchange_name, queue=self.queue_name)
logger.info(f"Queue '{self.queue_name}' bound to exchange '{self.exchange_name}'")
def process_message(self, ch, method, properties, body):
"""
Process a message from the queue.
Implement this method in the subclass.
"""
pass
def start_consuming(self):
"""Start consuming automation events."""
retry_count = 0
max_retries = 3
while not self._should_stop and retry_count < max_retries:
try:
with pika.BlockingConnection(self.connection_params) as connection:
with connection.channel() as channel:
channel.basic_qos(prefetch_count=self.prefetch_count)
self._setup_queues(channel)
retry_count = 0 # Reset on successful connection
# Setup message consumer
def message_callback(ch, method, properties, body):
success = self.process_message(ch, method, properties, body)
if success:
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self._consumer_tag = channel.basic_consume(
queue=self.queue_name, on_message_callback=message_callback
)
logger.info(f"Started consuming from '{self.queue_name}'")
# Consume messages
while not self._should_stop:
connection.process_data_events(time_limit=self.consumer_time_limit)
# Graceful shutdown
if self._consumer_tag:
channel.basic_cancel(self._consumer_tag)
# Wait for in-flight messages
while self._inflight_messages > 0:
time.sleep(self.consumer_backoff_delay)
except KeyboardInterrupt:
self._should_stop = True
except OperationalError:
logger.warning("Database connection lost permanently, stopping consumer")
sys.exit(1)
except Exception as e:
log_exception(e)
retry_count += 1
if retry_count < max_retries:
time.sleep(5 * retry_count) # Exponential backoff
logger.info("Consumer stopped")

View File

@@ -0,0 +1,5 @@
from django.apps import AppConfig
class AutomationConfig(AppConfig):
name = "plane.webhook"

View File

@@ -0,0 +1,70 @@
import pika
import signal
from django.conf import settings
from django.db import IntegrityError, close_old_connections, connection
from plane.utils.logging import logger
class WebhookConsumer:
"""Simple automation consumer for processing events from plane_event_stream."""
def __init__(self, queue_name: str = None, prefetch_count: int = 10):
"""Initialize the automation consumer."""
self.queue_name = queue_name or getattr(
settings,
"WEBHOOK_QUEUE_NAME",
"plane.event_stream.webhooks",
)
self.exchange_name = getattr(settings, "WEBHOOK_EXCHANGE_NAME", "plane.event_stream")
self.prefetch_count = prefetch_count
# Consumer state
self._should_stop = False
self._consumer_tag = None
self._inflight_messages = 0
# Setup
self._setup_connection_params()
self._setup_signal_handlers()
logger.info(f"WebhookConsumer initialized for queue '{self.queue_name}'")
def _setup_connection_params(self):
"""Set up RabbitMQ connection parameters."""
if hasattr(settings, "AMQP_URL") and settings.AMQP_URL:
self.connection_params = pika.URLParameters(settings.AMQP_URL)
else:
host = getattr(settings, "RABBITMQ_HOST", "localhost")
port = int(getattr(settings, "RABBITMQ_PORT", 5672))
username = getattr(settings, "RABBITMQ_USER", "guest")
password = getattr(settings, "RABBITMQ_PASSWORD", "guest")
virtual_host = getattr(settings, "RABBITMQ_VHOST", "/")
self.connection_params = pika.ConnectionParameters(
host=host,
port=port,
virtual_host=virtual_host,
credentials=pika.PlainCredentials(username, password),
heartbeat=600,
)
def _setup_signal_handlers(self):
"""Set up signal handlers for graceful shutdown."""
def signal_handler(signum, frame):
logger.info("Received shutdown signal, stopping consumer...")
self._should_stop = True
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
def ensure_database_connection(self):
"""Ensure database connection is alive, reconnect if necessary."""
try:
if not connection.is_usable():
close_old_connections()
except Exception as e:
logger.warning(f"Database connection lost, reconnecting... {e}")
close_old_connections()

View File

@@ -0,0 +1,68 @@
import os
from django.core.management.base import BaseCommand
from django.conf import settings
class WebhookConsumer(BaseCommand):
"""
Django management command to run the Webhook consumer.
This command starts the RabbitMQ consumer that processes Webhook events
from the plane_webhook_stream exchange and dispatches them to Celery tasks.
"""
help = "Run the Webhook consumer to process events from RabbitMQ"
def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"--queue",
type=str,
default=os.environ.get("WEBHOOK_QUEUE_NAME", "plane.webhook"),
help="RabbitMQ queue name to consume from (default: from Django settings)",
)
parser.add_argument(
"--prefetch",
type=int,
default=os.environ.get("WEBHOOK_PREFETCH_COUNT", 10),
help="Number of messages to prefetch (default: 10)",
)
def handle(self, *args, **options):
print("Consuming webhooks")
"""Handle the management command execution."""
# Get configuration info for display
if hasattr(settings, "AMQP_URL") and settings.AMQP_URL:
connection_info = f"AMQP URL: {settings.AMQP_URL}"
else:
host = getattr(settings, "RABBITMQ_HOST", "localhost")
port = getattr(settings, "RABBITMQ_PORT", "5672")
vhost = getattr(settings, "RABBITMQ_VHOST", "/")
connection_info = f"Host: {host}:{port}{vhost}"
# Get automation settings
queue_name = options["queue"] or getattr(
settings,
"WEBHOOK_QUEUE_NAME",
"plane.webhook",
)
exchange_name = getattr(settings, "WEBHOOK_EXCHANGE_NAME", "plane.webhook")
event_types = getattr(settings, "WEBHOOK_EVENT_TYPES", ["issue.created", "issue.updated", "issue.deleted"])
event_types_display = ", ".join(event_types)
# Display startup information
self.stdout.write(
self.style.SUCCESS(
f"Starting Webhook Consumer:\n"
f" Queue: {queue_name}\n"
f" Exchange: {exchange_name} (fanout)\n"
f" Event Types: {event_types_display}\n"
f" Prefetch: {options['prefetch']}\n"
f" {connection_info}"
)
)
consumer = WebhookConsumer(queue_name, options["prefetch"])
consumer.run()

View File

@@ -1,5 +0,0 @@
module plane/webhook
go 1.25.4
require github.com/rabbitmq/amqp091-go v1.10.0

View File

@@ -1,4 +0,0 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@@ -1,162 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
amqp "github.com/rabbitmq/amqp091-go"
)
// Config holds RabbitMQ connection configuration
type Config struct {
Host string
Port string
User string
Password string
VHost string
Exchange string
}
// getConfig loads configuration from environment variables
func getConfig() *Config {
return &Config{
Host: getEnv("RABBITMQ_HOST", "localhost"),
Port: getEnv("RABBITMQ_PORT", "5672"),
User: getEnv("RABBITMQ_USER", "guest"),
Password: getEnv("RABBITMQ_PASSWORD", "guest"),
VHost: getEnv("RABBITMQ_VHOST", "/"),
Exchange: getEnv("WEBHOOK_EXCHANGE", "event_stream"),
}
}
// getEnv retrieves an environment variable or returns a default value
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
// connectRabbitMQ establishes a connection to RabbitMQ
func connectRabbitMQ(config *Config) (*amqp.Connection, error) {
amqpURL := fmt.Sprintf("amqp://%s:%s@%s:%s%s",
config.User,
config.Password,
config.Host,
config.Port,
config.VHost,
)
conn, err := amqp.Dial(amqpURL)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
log.Printf("Successfully connected to RabbitMQ at %s:%s", config.Host, config.Port)
return conn, nil
}
// processMessage handles incoming messages from RabbitMQ
func processMessage(delivery amqp.Delivery) {
log.Printf("Received message: %s", string(delivery.Body))
// Try to parse as JSON for better logging
var msg map[string]interface{}
if err := json.Unmarshal(delivery.Body, &msg); err == nil {
log.Printf("Message parsed as JSON: %+v", msg)
}
// TODO: Add your webhook processing logic here
// For example:
// - Parse the webhook event
// - Send HTTP request to webhook URL
// - Handle retries and error logging
// Acknowledge the message
if err := delivery.Ack(false); err != nil {
log.Printf("Error acknowledging message: %v", err)
} else {
log.Printf("Message acknowledged successfully")
}
}
// consumeMessages starts consuming messages from the queue
func consumeMessages(ch *amqp.Channel, queue amqp.Queue) error {
msgs, err := ch.Consume(
queue.Name, // queue
"", // consumer tag (empty = auto-generated)
false, // auto-ack (set to false for manual acknowledgment)
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to register consumer: %w", err)
}
log.Printf("Waiting for messages. To exit press CTRL+C")
// Process messages in a goroutine
go func() {
for delivery := range msgs {
processMessage(delivery)
}
}()
return nil
}
// handleShutdown gracefully shuts down the consumer
func handleShutdown(conn *amqp.Connection, ch *amqp.Channel) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down gracefully...")
if ch != nil {
ch.Close()
}
if conn != nil {
conn.Close()
}
log.Println("Shutdown complete")
os.Exit(0)
}
func main() {
log.Println("Starting RabbitMQ webhook consumer...")
// Load configuration
config := getConfig()
log.Printf("Configuration loaded: Exchange=%s, Host=%s:%s", config.Exchange, config.Host, config.Port)
// Connect to RabbitMQ
conn, err := connectRabbitMQ(config)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// Create a channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open channel: %v", err)
}
defer ch.Close()
// Start consuming messages
if err := consumeMessages(ch, queue); err != nil {
log.Fatalf("Failed to start consuming messages: %v", err)
}
// Handle graceful shutdown
handleShutdown(conn, ch)
}

Binary file not shown.

View File

@@ -158,6 +158,25 @@ services:
- plane-db
- plane-mq
webhook-consumer:
build:
context: ./apps/api
dockerfile: Dockerfile.dev
args:
DOCKER_BUILDKIT: 1
restart: "no"
networks:
- dev_env
volumes:
- ./apps/api:/code
command: ./bin/docker-entrypoint-webhook-consumer.sh --settings=plane.settings.local
env_file:
- ./apps/api/.env
depends_on:
- plane-db
- plane-redis
- plane-mq
volumes:
redisdata:
uploads: