""" Celery configuration for Nexus v5. Uses Redis as both broker and result backend (separate DB from Channels). """ import os from celery import Celery from celery.backends.redis import SentinelBackend class FixedSentinelBackend(SentinelBackend): """ Fixes Celery bug where SentinelBackend._params_from_url() doesn't copy 'username' from URL params, breaking Redis/Valkey ACL authentication. Celery only copies 'db' and 'password' but forgets 'username'. """ def _params_from_url(self, url, defaults): connparams = super()._params_from_url(url, defaults) # Fix: parent only copies 'db' and 'password', missing 'username' if connparams.get('hosts') and 'username' in connparams['hosts'][0]: connparams['username'] = connparams['hosts'][0]['username'] return connparams # Set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') # Create Celery app app = Celery('nexus') # Load configuration from Django settings, using a "CELERY_" prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django apps. app.autodiscover_tasks() # Import tasks after Django setup to ensure they're registered from django.conf import settings if settings.configured: try: from core.tasks import notifications, event_cleanup except ImportError: pass @app.task(bind=True, ignore_result=True) def debug_task(self): """Debug task for testing Celery setup""" print(f'Request: {self.request!r}')