From b57a0eac3c451b932ed73c2b6b5f3e9b1893aa6f Mon Sep 17 00:00:00 2001
From: AyoubHmd
Date: Mon, 15 Jun 2026 11:42:57 +0200
Subject: [PATCH] fix(submissions): prevent silent submission losses

---
 compute_worker/compute_worker.py | 26 +++++++++--
 docker-compose.yml               | 12 +++--
 src/apps/competitions/tasks.py   | 80 +++++++++++++++++++++++++++++++-
 src/settings/base.py             | 19 ++++++--
 4 files changed, 124 insertions(+), 13 deletions(-)

diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py
index c02abcb0f..05f562a75 100644
--- a/compute_worker/compute_worker.py
+++ b/compute_worker/compute_worker.py
@@ -499,12 +499,19 @@ def __init__(self, run_args):
         websocket_scheme = "ws" if submission_api_url_parsed.scheme == "http" else "wss"
         self.websocket_url = f"{websocket_scheme}://{websocket_host}/submission_input/{self.user_pk}/{self.submission_id}/{self.secret}/"
 
-        # Nice requests adapter with generous retries/etc.
+        # M1.B: urllib3.Retry skips PATCH by default. The submission status
+        # callback uses PATCH, so without explicitly allowing it any 5xx /
+        # transient timeout on the final PATCH status=Finished was lost,
+        # leaving the submission stuck in Scoring forever.
         self.requests_session = requests.Session()
         adapter = requests.adapters.HTTPAdapter(
             max_retries=Retry(
-                total=3,
-                backoff_factor=1,
+                total=5,
+                backoff_factor=2,
+                status_forcelist=(500, 502, 503, 504),
+                allowed_methods=frozenset(
+                    ("GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH")
+                ),
             )
         )
         self.requests_session.mount("http://", adapter)
@@ -639,8 +646,19 @@ def _update_status(self, status, extra_information=None):
         try:
             self._update_submission(data)
         except Exception as e:
-            # Always catch exception and never raise error
             logger.exception(f"Failed to update submission status to {status}: {e}")
+            # M1.B: when transitioning to the terminal Finished state, re-raise
+            # so the task fails loudly instead of silently leaving the
+            # submission stuck in Scoring. Intermediate states (Running,
+            # Scoring) stay best-effort to avoid killing a scoring run for a
+            # transient network glitch. Failed already has a raise in every
+            # caller.
+            # NOTE(review): Celery acknowledges a task that raised even with
+            # task_acks_late=True unless task_acks_on_failure_or_timeout is
+            # set to False, so the raise alone may not requeue the job;
+            # confirm the worker configuration.
+            if status == SubmissionStatus.FINISHED:
+                raise
 
     def _get_container_image(self, image_name):
         logger.info("Running pull for image: {}".format(image_name))
diff --git a/docker-compose.yml b/docker-compose.yml
index f5b32b753..69cf1e3f1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -200,7 +200,7 @@ services:
   #----------------------------------------------------------------------------------------------------
   site_worker:
-    # This auto-reloads
-    command: ["watchmedo auto-restart -p '*.py' --recursive -- celery -A celery_config worker -B -Q site-worker -l info -n site-worker@%n --concurrency=2"]
+    # Runs celery directly (no watchmedo auto-reload); also consumes the `celery` queue
+    command: ["celery -A celery_config worker -B -Q site-worker,celery -l info -n site-worker@%n --concurrency=2"]
     working_dir: /app/src
     container_name: site_worker
     image: django_site-worker
@@ -219,9 +219,11 @@ services:
     deploy:
       resources:
         limits:
-          # Limit memory substantially here so we see any problems that may
-          # appear on Heroku ahead of time
-          memory: 256M
+          # Bumped from 256M: with --concurrency=2 prefork the two ForkPoolWorker
+          # processes plus master + beat exceed 256M and get SIGKILLed by the
+          # cgroup, silently losing run_submission tasks (orphan Submitting,
+          # celery_task_id=None). See incident finding F3.
+          memory: 1G
 
   compute_worker:
     command: ["celery -A compute_worker worker -l info -Q compute-worker -n compute-worker@%n"]
diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py
index 07acb270a..fb7978524 100644
--- a/src/apps/competitions/tasks.py
+++ b/src/apps/competitions/tasks.py
@@ -366,7 +366,9 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail
     try:
         with NamedTemporaryFile(mode="w+b") as temp_file:
             logger.info(f"Download competition bundle: {competition_dataset.data_file.name}")
-            competition_bundle_url = make_url_sassy(competition_dataset.data_file.name)
+            # NOTE(review): this call previously received data_file.name;
+            # confirm make_url_sassy accepts a full URL before merging.
+            competition_bundle_url = make_url_sassy(competition_dataset.data_file.url)
             try:
                 with requests.get(competition_bundle_url, stream=True) as r:
                     r.raise_for_status()
@@ -806,6 +808,82 @@ def submission_status_cleanup():
                 sub.cancel(status=Submission.FAILED)
 
 
+@app.task(queue='site-worker', soft_time_limit=60 * 5)
+def reaper_stuck_scoring(threshold_minutes=None):
+    """M1 watchdog: re-dispatch submissions stuck in Scoring/Running.
+
+    A submission is considered stuck when:
+      * status is Scoring or Running and ``has_children`` is False
+      * started_when (or created_when as fallback) plus the phase's
+        ``execution_time_limit`` is more than ``threshold_minutes`` old
+
+    For each stuck submission we:
+      1) annotate ``status_details`` with a reaper marker so the trace is
+         visible wherever that field is shown,
+      2) call ``run_submission(pk)`` to dispatch a fresh attempt. The
+         worker is responsible for idempotent handling. A dispatch that
+         raises is logged and does not stop the sweep.
+
+    Wired into ``CELERY_BEAT_SCHEDULE``; it can also be invoked manually
+    from the Django shell:
+
+        from competitions.tasks import reaper_stuck_scoring
+        reaper_stuck_scoring.delay(threshold_minutes=30)
+
+    :param threshold_minutes: grace period in minutes. ``None`` selects the
+        default of 30; an explicit ``0`` is honoured.
+    :return: number of submissions successfully re-dispatched.
+    """
+    threshold_minutes = 30 if threshold_minutes is None else int(threshold_minutes)
+    cutoff = now() - timedelta(minutes=threshold_minutes)
+
+    candidates = (
+        Submission.objects
+        .filter(status__in=(Submission.SCORING, Submission.RUNNING))
+        .filter(has_children=False)
+        .select_related('phase')
+    )
+
+    reaped = 0
+    for sub in candidates:
+        anchor = sub.started_when or sub.created_when
+        if anchor is None:
+            continue
+        # execution_time_limit is added as headroom and read as milliseconds
+        # (NOTE(review): confirm the field's unit). Clamped at zero so a bad
+        # value can never shorten the grace period.
+        exec_limit_ms = max(0, getattr(sub.phase, 'execution_time_limit', 0) or 0)
+        if anchor + timedelta(milliseconds=exec_limit_ms) > cutoff:
+            continue
+
+        # NOTE(review): nothing in this task moves ``anchor``; unless
+        # run_submission changes status/started_when, the same submission is
+        # re-dispatched on every sweep. Confirm, or add a cooldown.
+        marker = (
+            f'[reaper_stuck_scoring] re-dispatched at {now().isoformat()} '
+            f'(stuck in {sub.status} since {anchor.isoformat()}, '
+            f'threshold={threshold_minutes}m)'
+        )
+        sub.status_details = ((sub.status_details or '') + '\n' + marker).strip()
+        sub.save(update_fields=['status_details'])
+
+        try:
+            run_submission(sub.pk, is_scoring=(sub.status == Submission.SCORING))
+            reaped += 1
+            logger.warning(
+                'reaper_stuck_scoring: re-dispatched submission pk=%s status=%s '
+                'anchor=%s', sub.pk, sub.status, anchor.isoformat(),
+            )
+        except Exception:
+            logger.exception(
+                'reaper_stuck_scoring: failed to re-dispatch pk=%s', sub.pk,
+            )
+
+    if reaped:
+        logger.warning('reaper_stuck_scoring: re-dispatched %s submission(s)', reaped)
+    return reaped
+
+
 # -------------------------------------------------
 def _broadcast_worker_state(payload):
     channel_layer = get_channel_layer()
diff --git a/src/settings/base.py b/src/settings/base.py
index 47c76ad6b..35035bfa4 100644
--- a/src/settings/base.py
+++ b/src/settings/base.py
@@ -276,9 +276,22 @@
         'task': 'profiles.tasks.clean_non_activated_users',
         'schedule': timedelta(days=1),  # Run every 24 hours
     },
-    "refresh_compute_worker_health": {
-        "task": "competitions.tasks.refresh_compute_worker_health",
-        "schedule": 60,
+    # F2: disabled — task 'competitions.tasks.refresh_compute_worker_health'
+    # does not exist anywhere in the codebase. Sending it every 60 s caused
+    # a KeyError in the prefork consumer that silently broke the channel and
+    # blocked run_submission dispatch.
+    # "refresh_compute_worker_health": {
+    #     "task": "competitions.tasks.refresh_compute_worker_health",
+    #     "schedule": 60,
+    # },
+    # M1.A: Django-side watchdog. Re-dispatches submissions still in
+    # Scoring/Running more than `threshold_minutes` (plus the phase's
+    # execution_time_limit) after they started. Complements M1.B
+    # (compute_worker side) for the case where a worker dies mid-scoring.
+    'reaper_stuck_scoring': {
+        'task': 'competitions.tasks.reaper_stuck_scoring',
+        'schedule': timedelta(minutes=5),
+        'kwargs': {'threshold_minutes': 30},
     },
 }
 CELERY_TIMEZONE = 'UTC'