Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/murfey/server/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -2113,7 +2113,9 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
if bfactors_registered:
murfey.server._transport_object.transport.ack(header)
else:
murfey.server._transport_object.transport.nack(header)
murfey.server._transport_object.transport.nack(
header, requeue=False
)
return None
elif message["register"] == "done_bfactor":
_save_bfactor(message, _db)
Expand Down Expand Up @@ -2144,8 +2146,8 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
else:
# Send it directly to DLQ without trying to rerun it
murfey.server._transport_object.transport.nack(
header, requeue=result.get("requeue", False)
)
header, requeue=False
) # should be result.get("requeue", False)
if not result:
logger.error(
f"Workflow {sanitise(message['register'])} returned {result}"
Expand All @@ -2160,18 +2162,24 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
_db.close()
logger.warning("Murfey database required a rollback")
if murfey.server._transport_object:
murfey.server._transport_object.transport.nack(header, requeue=True)
murfey.server._transport_object.transport.nack(
header, requeue=False
) # should be True
except OperationalError:
logger.warning("Murfey database error encountered", exc_info=True)
time.sleep(1)
if murfey.server._transport_object:
murfey.server._transport_object.transport.nack(header, requeue=True)
murfey.server._transport_object.transport.nack(
header, requeue=False
) # should be True
except NoResultFound:
# Missing rows might be due to a race condition and should be requeued
logger.warning("No matching database row was found", exc_info=True)
time.sleep(1)
if murfey.server._transport_object:
murfey.server._transport_object.transport.nack(header, requeue=True)
murfey.server._transport_object.transport.nack(
header, requeue=False
) # should be True
except Exception:
logger.warning(
"Exception encountered in server RabbitMQ callback", exc_info=True
Expand Down