Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,12 +499,19 @@ def __init__(self, run_args):
websocket_scheme = "ws" if submission_api_url_parsed.scheme == "http" else "wss"
self.websocket_url = f"{websocket_scheme}://{websocket_host}/submission_input/{self.user_pk}/{self.submission_id}/{self.secret}/"

# Nice requests adapter with generous retries/etc.
# M1.B: urllib3.Retry skips PATCH by default. The submission status
# callback uses PATCH, so without explicitly allowing it any 5xx /
# transient timeout on the final PATCH status=Finished was lost,
# leaving the submission stuck in Scoring forever.
self.requests_session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
max_retries=Retry(
total=3,
backoff_factor=1,
total=5,
backoff_factor=2,
status_forcelist=(500, 502, 503, 504),
allowed_methods=frozenset(
("GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH")
),
)
)
self.requests_session.mount("http://", adapter)
Expand Down Expand Up @@ -639,8 +646,15 @@ def _update_status(self, status, extra_information=None):
try:
self._update_submission(data)
except Exception as e:
# Always catch exception and never raise error
logger.exception(f"Failed to update submission status to {status}: {e}")
# M1.B: when transitioning to the terminal Finished state, raise
# so Celery (task_acks_late=True) requeues the job instead of
# silently leaving the submission stuck in Scoring. Intermediate
# states (Running, Scoring) stay best-effort to avoid killing a
# scoring run for a transient network glitch. Failed already has
# a raise in every caller, so the requeue path is covered.
if status == SubmissionStatus.FINISHED:
raise

def _get_container_image(self, image_name):
logger.info("Running pull for image: {}".format(image_name))
Expand Down
10 changes: 6 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ services:
#----------------------------------------------------------------------------------------------------
site_worker:
# This auto-reloads
command: ["watchmedo auto-restart -p '*.py' --recursive -- celery -A celery_config worker -B -Q site-worker -l info -n site-worker@%n --concurrency=2"]
command: ["celery -A celery_config worker -B -Q site-worker,celery -l info -n site-worker@%n --concurrency=2"]
working_dir: /app/src
container_name: site_worker
image: django_site-worker
Expand All @@ -219,9 +219,11 @@ services:
deploy:
resources:
limits:
# Limit memory substantially here so we see any problems that may
# appear on Heroku ahead of time
memory: 256M
# Bumped from 256M: with --concurrency=2 prefork the two ForkPoolWorker
# processes plus master + beat exceed 256M and get SIGKILLed by the
# cgroup, silently losing run_submission tasks (orphan Submitting,
# celery_task_id=None). See incident finding F3.
memory: 1G

compute_worker:
command: ["celery -A compute_worker worker -l info -Q compute-worker -n compute-worker@%n"]
Expand Down
72 changes: 71 additions & 1 deletion src/apps/competitions/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail
try:
with NamedTemporaryFile(mode="w+b") as temp_file:
logger.info(f"Download competition bundle: {competition_dataset.data_file.name}")
competition_bundle_url = make_url_sassy(competition_dataset.data_file.name)
competition_bundle_url = make_url_sassy(competition_dataset.data_file.url)
try:
with requests.get(competition_bundle_url, stream=True) as r:
r.raise_for_status()
Expand Down Expand Up @@ -806,6 +806,76 @@ def submission_status_cleanup():
sub.cancel(status=Submission.FAILED)


@app.task(queue='site-worker', soft_time_limit=60 * 5)
def reaper_stuck_scoring(threshold_minutes=None):
    """Watchdog task: re-dispatch submissions stuck in Scoring or Running.

    A leaf submission (``has_children=False``) whose status is Scoring or
    Running is treated as stuck once its anchor timestamp
    (``started_when``, falling back to ``created_when``) plus the phase's
    ``execution_time_limit`` lies at or before ``now() - threshold_minutes``.

    Every stuck submission gets a ``[reaper_stuck_scoring]`` line appended
    to ``status_details`` and is then handed to ``run_submission`` with
    ``is_scoring`` set according to its current status. A failure to
    re-dispatch one submission is logged and does not stop the loop.

    Can be run on a schedule through ``CELERY_BEAT_SCHEDULE`` or invoked
    manually from the Django shell:

        from competitions.tasks import reaper_stuck_scoring
        reaper_stuck_scoring.delay(threshold_minutes=30)

    Args:
        threshold_minutes: Grace period in minutes added on top of the
            phase execution time limit. Any falsy value (``None``, ``0``)
            falls back to 30.

    Returns:
        The number of submissions for which ``run_submission`` was called
        without raising.
    """
    threshold_minutes = int(threshold_minutes or 30)
    cutoff = now() - timedelta(minutes=threshold_minutes)

    pipeline_statuses = (Submission.SCORING, Submission.RUNNING)
    stuck_candidates = Submission.objects.filter(status__in=pipeline_statuses)
    stuck_candidates = stuck_candidates.filter(has_children=False)
    stuck_candidates = stuck_candidates.select_related('phase', 'parent')

    redispatched = 0
    for submission in stuck_candidates:
        anchor = submission.started_when or submission.created_when
        # No usable timestamp, or still inside the bare grace period.
        if anchor is None or anchor > cutoff:
            continue

        # The phase time limit is added as extra headroom and is interpreted
        # as milliseconds here.
        # NOTE(review): confirm the unit of Phase.execution_time_limit.
        exec_limit_ms = getattr(submission.phase, 'execution_time_limit', 0) or 0
        if anchor + timedelta(milliseconds=exec_limit_ms) > cutoff:
            continue

        # Leave a visible trace on the submission before dispatching.
        marker = (
            f'[reaper_stuck_scoring] re-dispatched at {now().isoformat()} '
            f'(stuck in {submission.status} since {anchor.isoformat()}, '
            f'threshold={threshold_minutes}m)'
        )
        previous_details = submission.status_details or ''
        submission.status_details = '\n'.join((previous_details, marker)).strip()
        submission.save(update_fields=['status_details'])

        is_scoring = submission.status == Submission.SCORING
        try:
            run_submission(submission.pk, is_scoring=is_scoring)
            redispatched += 1
            logger.warning(
                'reaper_stuck_scoring: re-dispatched submission pk=%s status=%s '
                'anchor=%s', submission.pk, submission.status, anchor.isoformat(),
            )
        except Exception:
            # Best effort: one failing submission must not block the others.
            logger.exception(
                'reaper_stuck_scoring: failed to re-dispatch pk=%s', submission.pk,
            )

    if redispatched:
        logger.warning('reaper_stuck_scoring: re-dispatched %s submission(s)', redispatched)
    return redispatched


# -------------------------------------------------
def _broadcast_worker_state(payload):
channel_layer = get_channel_layer()
Expand Down
19 changes: 16 additions & 3 deletions src/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,22 @@
'task': 'profiles.tasks.clean_non_activated_users',
'schedule': timedelta(days=1), # Run every 24 hours
},
"refresh_compute_worker_health": {
"task": "competitions.tasks.refresh_compute_worker_health",
"schedule": 60,
# F2: disabled — task 'competitions.tasks.refresh_compute_worker_health'
# does not exist anywhere in the codebase. Sending it every 60 s caused
# a KeyError in the prefork consumer that silently broke the channel and
# blocked run_submission dispatch.
# "refresh_compute_worker_health": {
# "task": "competitions.tasks.refresh_compute_worker_health",
# "schedule": 60,
# },
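# M1.A: Django-side watchdog. Re-dispatches submissions that have stayed in
# Scoring/Running for longer than the phase's execution_time_limit plus
# `threshold_minutes` (default 30), measured from started_when (falling
# back to created_when). Complements M1.B (compute_worker side): even if a
# worker dies mid-scoring, the submission is re-dispatched on the first
# beat run (one every `schedule`) after that deadline has passed.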
'reaper_stuck_scoring': {
'task': 'competitions.tasks.reaper_stuck_scoring',
'schedule': timedelta(minutes=5),
'kwargs': {'threshold_minutes': 30},
},
}
CELERY_TIMEZONE = 'UTC'
Expand Down