class BadPdfError(Exception):
    """Raised when Adobe rejects the PDF as damaged or too complex (BAD_PDF / 400)."""


# Pages per chunk -- must match the PDF splitter Lambda so reported page ranges
# are accurate. Each chunk N covers pages (N-1)*PAGES_PER_CHUNK+1 .. N*PAGES_PER_CHUNK.
PAGES_PER_CHUNK = int(os.environ.get('PAGES_PER_CHUNK', '200'))


def _chunk_index_from_key(chunk_key):
    """Extract the 1-based chunk index from a key like '.../_chunk_8.pdf'.

    Args:
        chunk_key: S3 key of the chunk being processed; may be None.

    Returns:
        The chunk index as an int, or None when the key is missing, is not a
        string, or does not end in ``_chunk_<digits>.pdf``.
    """
    # Explicit guard instead of a blanket ``except Exception``: anything that
    # is not text (None, bytes, numbers) simply has no chunk index.
    if not isinstance(chunk_key, str):
        return None
    match = re.search(r'_chunk_(\d+)\.pdf$', chunk_key)
    if match is None:
        return None
    try:
        return int(match.group(1))
    except ValueError:
        # int() can reject pathologically long digit runs; treat as unknown.
        return None


def report_failure(bucket_name, file_base_name, chunk_key, reason_category, message):
    """Log a structured failure line and write a failure-detail file to S3.

    The detail file is written to
    ``temp/<file_base_name>/_errors/adobe_chunk_<index|unknown>.json`` and
    carries the station ('adobe'), the reason category, the message (truncated
    to 2000 characters) and, when the chunk index can be parsed from
    ``chunk_key``, the chunk index and its page range.

    Best-effort: a failure while building or uploading the detail file is
    logged and swallowed so it never masks the original error.

    Args:
        bucket_name: Target S3 bucket; nothing is uploaded when falsy.
        file_base_name: Base name of the source file; nothing is uploaded when falsy.
        chunk_key: S3 key of the chunk, used only to derive the chunk index.
        reason_category: Short category string (e.g. "ADOBE_API", "INFRA").
        message: Human-readable detail; converted with ``str()``.
    """
    chunk_index = _chunk_index_from_key(chunk_key)
    if chunk_index:
        page_start = (chunk_index - 1) * PAGES_PER_CHUNK + 1
        page_end = chunk_index * PAGES_PER_CHUNK
        pages_desc = f" | chunk={chunk_index} | pages={page_start}-{page_end}"
    else:
        # No (or zero) index: the page range cannot be derived.
        page_start = None
        page_end = None
        pages_desc = ""

    # Structured CloudWatch line for the dashboard "File status" widget.
    logger.error(
        f"File: {file_base_name}, Status: FAILED | station=adobe | "
        f"reason={reason_category}{pages_desc} | {message}"
    )

    if not bucket_name or not file_base_name:
        return

    try:
        # Built inside the guarded section so that nothing on the reporting
        # path can raise out of this function.
        detail = {
            "station": "adobe",
            "reason_category": reason_category,
            "message": str(message)[:2000],
            "chunk_index": chunk_index,
            "page_start": page_start,
            "page_end": page_end,
        }
        suffix = chunk_index if chunk_index is not None else "unknown"
        s3.put_object(
            Bucket=bucket_name,
            Key=f"temp/{file_base_name}/_errors/adobe_chunk_{suffix}.json",
            Body=json.dumps(detail).encode("utf-8"),
            ContentType="application/json",
        )
    except Exception as e:  # noqa: BLE001 - never mask the original failure
        logger.error(f"Filename : {file_base_name} | Could not write failure detail: {e}")
def _write_empty_image_db(images_output_dir, bucket_name, s3_folder_autotag, file_key, file_base_name):
    """Write an empty SQLite image DB so the alt-text step sees zero images and completes cleanly.

    Creates ``temp_images_data.db`` inside ``images_output_dir`` containing only
    the ``image_data`` table, then uploads it to
    ``<s3_folder_autotag>/<file_key>_temp_images_data.db`` in ``bucket_name``.

    Args:
        images_output_dir: Local directory for the DB file; created if missing.
        bucket_name: Destination S3 bucket.
        s3_folder_autotag: Destination key prefix.
        file_key: File key used to build the uploaded object name.
        file_base_name: Base name used only in the log line.

    Raises:
        sqlite3.Error: If the database cannot be created.
        Exception: Whatever the S3 upload raises; not handled here.
    """
    os.makedirs(images_output_dir, exist_ok=True)
    db_path = os.path.join(images_output_dir, "temp_images_data.db")
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS image_data (
                objid TEXT, img_path TEXT, prev TEXT, current TEXT, next TEXT, context TEXT
            )
        """)
        conn.commit()
    finally:
        # Close even when the DDL/commit fails so the connection is not leaked
        # and the file is fully released before the upload below.
        conn.close()
    s3.upload_file(db_path, bucket_name, f'{s3_folder_autotag}/{file_key}_temp_images_data.db')
    logging.info(f'Filename : {file_base_name} | Uploaded empty image DB (BAD_PDF fallback)')
+ # Fall back: use the viewer-prefs PDF as-is, treat all images as + # decorative (empty DB), and let the rest of the pipeline complete + # so the user gets output rather than a silent failure. + bad_pdf_fallback = True + logging.warning(f'Filename : {file_key} | BAD_PDF fallback active — skipping Adobe autotag/extract, all images treated as decorative') + s3_folder_autotag = f"temp/{file_base_name}/output_autotag" + images_output_dir = "output/zipfile/images" + # Upload the viewer-prefs PDF as the autotag output so downstream steps have a file + save_to_s3(filename, bucket_name, "output_autotag", file_base_name, file_key) + # Write an empty image DB so the alt-text step finds zero images and skips all of them + _write_empty_image_db(images_output_dir, bucket_name, s3_folder_autotag, file_key, file_base_name) + logging.info(f'Filename : {file_key} | BAD_PDF fallback: uploaded viewer-prefs PDF and empty image DB') - # Run Adobe Extract API - logging.info(f'Filename : {file_key} | Running Adobe Extract API...') - extract_api(filename, client_id, client_secret) + if not bad_pdf_fallback: + # Run Adobe Extract API + logging.info(f'Filename : {file_key} | Running Adobe Extract API...') + extract_api(filename, client_id, client_secret) - extract_api_zip_path = f"output/ExtractTextInfoFromPDF/extract${filename}.zip" - extract_to = f"output/zipfile/{filename}" - - logging.info(f'Filename : {file_key} | Unzipping extracted content...') - unzip_file(filename, extract_api_zip_path, extract_to) + extract_api_zip_path = f"output/ExtractTextInfoFromPDF/extract${filename}.zip" + extract_to = f"output/zipfile/{filename}" - with open(f"output/zipfile/{filename}/structuredData.json") as file: - data = json.load(file) + logging.info(f'Filename : {file_key} | Unzipping extracted content...') + unzip_file(filename, extract_api_zip_path, extract_to) - pdf_document = pymupdf.open(filename) + with open(f"output/zipfile/{filename}/structuredData.json") as file: + data = json.load(file) - # 
Add TOC entries - logging.info(f'Filename : {file_key} | Adding TOC entries...') - add_toc_to_pdf(filename, pdf_document, data) + pdf_document = pymupdf.open(filename) - pdf_document.saveIncr() - pdf_document.close() - - logging.info(f'Filename : {file_key} | Uploading processed PDF to S3...') - save_to_s3(filename, bucket_name, "output_autotag", file_base_name, file_key) + # Add TOC entries + logging.info(f'Filename : {file_key} | Adding TOC entries...') + add_toc_to_pdf(filename, pdf_document, data) - logging.info(f"PDF saved with updated metadata and TOC. File location: COMPLIANT_{file_key}") + pdf_document.saveIncr() + pdf_document.close() - figure_path = f"{extract_to}/figures" - autotag_report_path = f"output/AutotagPDF/{filename}.xlsx" - images_output_dir = "output/zipfile/images" + logging.info(f'Filename : {file_key} | Uploading processed PDF to S3...') + save_to_s3(filename, bucket_name, "output_autotag", file_base_name, file_key) + + logging.info(f"PDF saved with updated metadata and TOC. 
File location: COMPLIANT_{file_key}") + + figure_path = f"{extract_to}/figures" + autotag_report_path = f"output/AutotagPDF/{filename}.xlsx" + images_output_dir = "output/zipfile/images" + + s3_folder_autotag = f"temp/{file_base_name}/output_autotag" + + logging.info(f'Filename : {file_key} | Extracting and uploading images...') + extract_images_from_excel(filename, figure_path, autotag_report_path, images_output_dir, bucket_name, s3_folder_autotag, file_key) - s3_folder_autotag = f"temp/{file_base_name}/output_autotag" - - logging.info(f'Filename : {file_key} | Extracting and uploading images...') - extract_images_from_excel(filename, figure_path, autotag_report_path, images_output_dir, bucket_name, s3_folder_autotag, file_key) - logging.info(f'Filename : {file_key} | Processing completed successfully') logger.info(f"File: {file_base_name}, Status: Succeeded in First ECS task") + except BadPdfError: + # Already handled above via fallback path — should not reach here + pass except (ServiceApiException, ServiceUsageException, SdkException) as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - Adobe API Error") logger.error(f"Filename : {file_key} | Adobe API Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", f"Adobe API error: {e}") sys.exit(1) except ClientError as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - AWS Error") logger.error(f"Filename : {file_key} | AWS Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "INFRA", f"AWS error: {e}") sys.exit(1) except FileNotFoundError as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - File Not Found") logger.error(f"Filename : {file_key} | File Not Found Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", f"Expected Adobe output not found: {e}") sys.exit(1) except Exception as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS 
task") logger.error(f"Filename : {file_key} | Unexpected Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "UNKNOWN", f"Unexpected error: {e}") sys.exit(1) if __name__ == "__main__": diff --git a/alt-text-generator-container/alt_text_generator.js b/alt-text-generator-container/alt_text_generator.js index 05e39a98..4b566c14 100644 --- a/alt-text-generator-container/alt_text_generator.js +++ b/alt-text-generator-container/alt_text_generator.js @@ -69,6 +69,87 @@ function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } +// Pages per chunk -- must match the PDF splitter Lambda so reported page ranges +// are accurate. Chunk N covers pages (N-1)*PAGES_PER_CHUNK+1 .. N*PAGES_PER_CHUNK. +const PAGES_PER_CHUNK = parseInt(process.env.PAGES_PER_CHUNK || "200", 10); + +/** + * Writes a structured failure-detail file that the Step Functions failure-handler + * aggregates into the user-facing result/FAILED_.json marker. + * + * Station: 'alttext' (Bedrock alt-text generation). Carries the WHY (reason + * category) and WHERE (chunk index + page range). Best-effort and exception-proof: + * reporting a failure must never throw a second failure that masks the original. + */ +async function reportFailure(bucketName, fileBaseName, s3FileKey, reasonCategory, message) { + let chunkIndex = null; + const match = /_chunk_(\d+)\.pdf$/.exec(s3FileKey || ""); + if (match) chunkIndex = parseInt(match[1], 10); + const pageStart = chunkIndex ? (chunkIndex - 1) * PAGES_PER_CHUNK + 1 : null; + const pageEnd = chunkIndex ? chunkIndex * PAGES_PER_CHUNK : null; + + // Structured CloudWatch line for the dashboard "File status" widget. + const pagesDesc = chunkIndex ? 
const MAX_ASPECT_RATIO = 20;

/**
 * Tells whether a JPEG marker byte is a Start-Of-Frame marker (SOF0..SOF15).
 * 0xC4 (DHT), 0xC8 (JPG) and 0xCC (DAC) sit in the same numeric range but are
 * not frame headers and carry no dimensions.
 */
function isJpegStartOfFrame(marker) {
    return marker >= 0xC0 && marker <= 0xCF &&
        marker !== 0xC4 && marker !== 0xC8 && marker !== 0xCC;
}

/**
 * Reads the pixel dimensions of a PNG or JPEG image from its raw bytes.
 *
 * PNG: width/height are read from the IHDR chunk at byte offsets 16 and 20.
 * JPEG: segments are walked from the SOI marker until a Start-Of-Frame
 * segment is found; height/width are read from its header.
 *
 * @param {Buffer} buffer - Raw image bytes.
 * @returns {{width: number, height: number}|null} Dimensions, or null when the
 *   format is not recognised, no frame header is found, or the data is
 *   truncated/corrupt. Never throws.
 */
function getImageDimensions(buffer) {
    try {
        // PNG signature: 0x89 'P' 'N' 'G'
        if (buffer[0] === 0x89 && buffer[1] === 0x50 && buffer[2] === 0x4E && buffer[3] === 0x47) {
            const width = buffer.readUInt32BE(16);
            const height = buffer.readUInt32BE(20);
            return { width, height };
        }
        // JPEG SOI marker: 0xFF 0xD8
        if (buffer[0] === 0xFF && buffer[1] === 0xD8) {
            let offset = 2;
            while (offset < buffer.length - 1) {
                if (buffer[offset] !== 0xFF) break;
                const marker = buffer[offset + 1];
                if (marker === 0xFF) {
                    // Fill byte: markers may be preceded by extra 0xFF padding.
                    offset += 1;
                    continue;
                }
                if (isJpegStartOfFrame(marker)) {
                    // Layout: FF Cn | length(2) | precision(1) | height(2) | width(2)
                    const height = buffer.readUInt16BE(offset + 5);
                    const width = buffer.readUInt16BE(offset + 7);
                    return { width, height };
                }
                if (marker === 0xD9 || marker === 0xDA) {
                    // EOI / start of scan data: no frame header will follow.
                    break;
                }
                if (marker === 0x01 || (marker >= 0xD0 && marker <= 0xD7)) {
                    // TEM / RSTn are standalone markers without a length field.
                    offset += 2;
                    continue;
                }
                const segmentLength = buffer.readUInt16BE(offset + 2);
                // The length field counts itself, so anything below 2 is corrupt.
                if (segmentLength < 2) break;
                offset += 2 + segmentLength;
            }
        }
        return null;
    } catch (e) {
        // Truncated buffers make the readUInt* calls throw; report "unknown".
        return null;
    }
}

/**
 * Checks that an image's aspect ratio does not exceed MAX_ASPECT_RATIO:1.
 * Missing or zero dimensions are treated as valid so the image is processed
 * normally instead of being skipped.
 *
 * @param {{width: number, height: number}|null} dimensions
 * @returns {boolean} true when the ratio is within the limit or unknown.
 */
function isAspectRatioValid(dimensions) {
    if (!dimensions || !dimensions.width || !dimensions.height) return true;
    const { width, height } = dimensions;
    const ratio = Math.max(width / height, height / width);
    return ratio <= MAX_ASPECT_RATIO;
}
@@ -492,6 +573,8 @@ async function startProcess() { logger.info(`Filename: ${filebasename} | imageObjects: ${imageObjects}`); logger.info(`Filename: ${filebasename} | Total images to process: ${imageObjects.length}`); + let skippedCount = 0; + for (const imageObject of imageObjects) { try { const getObjectParams = { @@ -502,7 +585,7 @@ async function startProcess() { logger.info(`Filename: ${filebasename} | Image Object Bucketname: ${bucketName}`); const command = new GetObjectCommand(getObjectParams); const { Body } = await s3Client.send(command); - + // Stream the body contents to a buffer const chunks = []; await pipeline(Body, async function* (source) { @@ -511,6 +594,16 @@ async function startProcess() { } }); const fileBuffer = Buffer.concat(chunks); + + const dimensions = getImageDimensions(fileBuffer); + if (dimensions && !isAspectRatioValid(dimensions)) { + skippedCount++; + const altText = "Decorative element"; + combinedResults[imageObject.id] = altText; + logger.info(`Filename: ${filebasename} | Skipping image ${imageObject.id} - aspect ratio ${Math.max(dimensions.width/dimensions.height, dimensions.height/dimensions.width).toFixed(1)}:1 exceeds ${MAX_ASPECT_RATIO}:1 limit (${dimensions.width}x${dimensions.height})`); + continue; + } + const localFilePath = path.join(__dirname, `${imageObject.path.split('/').pop()}`); logger.info(`Filename: ${filebasename} | Local File Path: ${localFilePath}`); fs_1.writeFileSync(localFilePath, fileBuffer); @@ -519,23 +612,25 @@ async function startProcess() { logger.info(`Filename: ${filebasename} | Response:${response}`); Object.assign(combinedResults, JSON.parse(response)); successCount++; - logger.info(`Filename: ${filebasename} | Alt text generation succeeded for image ${imageObject.id} (${successCount} succeeded, ${failureCount} failed)`); + logger.info(`Filename: ${filebasename} | Alt text generation succeeded for image ${imageObject.id} (${successCount} succeeded, ${failureCount} failed, ${skippedCount} 
skipped)`); } catch (error) { failureCount++; logger.error(`Filename: ${filebasename} | Alt text generation failed for image ${imageObject.id}: ${error.message || error}`); - logger.info(`Filename: ${filebasename} | Progress: ${successCount} succeeded, ${failureCount} failed`); + logger.info(`Filename: ${filebasename} | Progress: ${successCount} succeeded, ${failureCount} failed, ${skippedCount} skipped`); } await sleep(2000); } - // Check if we have any images and if all of them failed - if (imageObjects.length > 0 && successCount === 0) { + // Only fail if all images failed AND none were skipped (skipped images got default alt text) + if (imageObjects.length > 0 && successCount === 0 && skippedCount === 0) { logger.error(`Filename: ${filebasename} | All ${failureCount} alt text generation requests failed - likely due to throttling or Bedrock API issues`); logger.error(`File: ${filebasename}, Status: Failed in second ECS task - All Bedrock requests failed`); + await reportFailure(bucketName, filebasename, process.env.S3_FILE_KEY, "BEDROCK_API", + `All ${failureCount} Bedrock alt-text requests failed (throttling or Bedrock API issues).`); process.exit(1); } - logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed out of ${imageObjects.length} images`); + logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed, ${skippedCount} skipped (bad aspect ratio) out of ${imageObjects.length} images`); let defaultText = "No text available"; @@ -558,6 +653,8 @@ async function startProcess() { } catch (error) { logger.info(`File: ${filebasename}, Status: Error in second ECS task`); logger.error(`Filename: ${filebasename} | Error processing images: ${error}`); + await reportFailure(bucketName, filebasename, process.env.S3_FILE_KEY, "UNKNOWN", + `Error processing images: ${error.message || error}`); process.exit(1); } } diff --git a/app.py b/app.py 
index 1148d0bd..6369e00c 100644 --- a/app.py +++ b/app.py @@ -174,7 +174,8 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: cluster=pdf_remediation_cluster, task_definition=adobe_autotag_task_def, assign_public_ip=False, - + result_path="$.ecsResult", + container_overrides=[tasks.ContainerOverride( container_definition = adobe_autotag_container_def, environment=[ @@ -213,11 +214,11 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: environment=[ tasks.TaskEnvironmentVariable( name="S3_BUCKET_NAME", - value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[0].Value") + value=sfn.JsonPath.string_at("$.s3_bucket") ), tasks.TaskEnvironmentVariable( name="S3_FILE_KEY", - value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[1].Value") + value=sfn.JsonPath.string_at("$.s3_key") ), tasks.TaskEnvironmentVariable( name="AWS_REGION", @@ -366,6 +367,58 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: parallel_accessibility_workflow.branch(remediation_chain) parallel_accessibility_workflow.branch(pre_remediation_accessibility_checker_task) + # --------------------------------------------------------------------- + # Failure-handler: the single, exhaustive safety net for the workflow. + # + # The frontend detects a finished job by polling the S3 result/ folder. + # Previously, any failure left the workflow FAILED with nothing written + # to result/, so the UI polled forever ("silent failure"). This Lambda + # is wired to a Catch that fires on EVERY error -- in-code (Adobe/Bedrock/ + # complexity) and infrastructure (container can't start, OOM, timeout) -- + # and writes result/FAILED_.json carrying the reason category and + # the failing chunk/page range, so the UI can show the user what happened. + # It is invoked only on failure, so its cost is effectively zero. 
+ # --------------------------------------------------------------------- + failure_handler_lambda = lambda_.Function( + self, 'WorkflowFailureHandlerLambda', + runtime=lambda_.Runtime.PYTHON_3_12, + handler='main.lambda_handler', + code=lambda_.Code.from_asset('lambda/failure-handler'), + timeout=Duration.seconds(120), + memory_size=256, + environment={ + 'BUCKET_NAME': pdf_processing_bucket.bucket_name, + 'PAGES_PER_CHUNK': '200', + } + ) + pdf_processing_bucket.grant_read_write(failure_handler_lambda) + failure_handler_lambda.add_to_role_policy(cloudwatch_metrics_policy) + + # Pass the original state ($) plus the execution ARN from the Step Functions + # context object ($$). The ARN lets the failure marker reference the exact + # failed execution for support/tracing. chunks/s3_bucket/failureInfo are + # carried through from the state so the handler can identify the file and + # the failure reason. + failure_handler_task = tasks.LambdaInvoke(self, "HandleWorkflowFailure", + lambda_function=failure_handler_lambda, + payload=sfn.TaskInput.from_object({ + "chunks": sfn.JsonPath.string_at("$.chunks"), + "s3_bucket": sfn.JsonPath.string_at("$.s3_bucket"), + "failureInfo": sfn.JsonPath.string_at("$.failureInfo"), + "executionArn": sfn.JsonPath.string_at("$$.Execution.Id"), + }), + output_path="$.Payload") + + # Route every failure of the parallel workflow to the failure handler. + # result_path keeps the original input (chunks, s3_bucket) intact and + # nests the Step Functions Error/Cause under $.failureInfo so the handler + # can recover both the file identity and the failure reason. 
+ parallel_accessibility_workflow.add_catch( + failure_handler_task, + errors=["States.ALL"], + result_path="$.failureInfo" + ) + pdf_remediation_workflow_log_group = logs.LogGroup(self, "PdfRemediationWorkflowLogs", log_group_name="/aws/states/pdf-accessibility-remediation-workflow", retention=logs.RetentionDays.ONE_MONTH, diff --git a/docs/ERROR_HANDLING.md b/docs/ERROR_HANDLING.md new file mode 100644 index 00000000..76621337 --- /dev/null +++ b/docs/ERROR_HANDLING.md @@ -0,0 +1,119 @@ +# Error Handling & Failure Reporting + +This document describes how the PDF-to-PDF remediation pipeline reports +failures so that **no failure is ever silent**, and the contract the frontend +uses to detect and display those failures. + +## Background: why this exists + +The frontend detects a finished job by **polling the S3 `result/` folder**: + +- On success the pipeline writes `result/COMPLIANT_.pdf`. +- Previously, on **any** failure the workflow ended with nothing written to + `result/`. The UI kept polling indefinitely and the user saw the job "spin" + for hours with no explanation. + +The error-handling layer closes that gap by guaranteeing that every failed job +produces a **failure marker** in the same `result/` folder the frontend already +polls, carrying a human-readable reason and the location (chunk / page range). + +## The two-layer design + +``` +Each station (on failure) Step Functions Catch (catches EVERYTHING) + writes a detail file ───────► routes any error to the failure-handler + temp//_errors/ Lambda, which aggregates the detail + .json files and writes the user-facing marker + (station, reason, chunk, pages) result/FAILED_.json + │ + └──► structured CloudWatch line: + "File: , Status: FAILED | station=adobe | reason=ADOBE_API | chunk=8 | pages=1401-1600" +``` + +1. **Station detail files** (`temp//_errors/*.json`) — each station writes + rich detail only it knows (the exact reason and which chunk/pages). +2. 
**Step Functions `Catch` → failure-handler Lambda** — the exhaustive safety + net. It fires on *every* failure, including infrastructure failures where a + container dies before it can write a detail file (image-pull failure, OOM, + task timeout, IAM error). It aggregates any detail files and always writes the + marker. If no detail file exists, it derives the reason from the Step + Functions error `Cause`. + +> The **PDF splitter** is a special case: it runs *before* the Step Functions +> execution starts, so the Catch cannot cover it. The splitter therefore writes +> the `result/FAILED_<basename>.json` marker directly. + +## Frontend contract + +When polling for results for an uploaded file `<basename>.pdf` (base name `<basename>`), +the frontend should check for **both** of these keys: + +| Key | Meaning | +|---|---| +| `result/COMPLIANT_<basename>.pdf` | Success — the remediated PDF is ready. | +| `result/FAILED_<basename>.json` | Failure — stop polling and show the reason. | + +### `FAILED_<basename>.json` schema + +```json +{ + "status": "FAILED", + "filename": "<basename>", + "reason_category": "ADOBE_API", + "summary": "The Adobe PDF Services API (auto-tagging/extraction) failed.", + "failed_chunks": [ + { + "station": "adobe", + "reason_category": "ADOBE_API", + "message": "Adobe API error: ...", + "chunk_index": 8, + "page_start": 1401, + "page_end": 1600 + } + ], + "execution_arn": "arn:aws:states:...:execution:...", + "timestamp": "2026-06-08T17:42:11.123456+00:00" +} +``` + +- `reason_category` and `summary` are safe to show directly to the user. +- `failed_chunks[].page_start`/`page_end` tell the user **which pages** failed + (e.g. "Processing failed on pages 1401–1600"). These fields are absent for + failures that aren't chunk-specific (e.g. split or title failures). +- `message` is the raw technical detail — useful for support, not necessarily + for end users. + +### Reason categories + +| `reason_category` | Meaning | +|---|---| +| `SPLIT` | The document could not be split into pages. 
| +| `ADOBE_API` | Adobe PDF Services (auto-tag/extract) failed. | +| `BEDROCK_API` | Amazon Bedrock (alt text / title) failed. | +| `COMPLEXITY` | The document exceeded processing complexity limits. | +| `MERGE` | The processed pages could not be merged. | +| `TITLE` | The title/metadata step failed. | +| `INFRA` | A task failed to run (infrastructure-level failure). | +| `UNKNOWN` | Unexpected failure; see `message`. | + +## CloudWatch visibility + +Every station emits a structured failure line that the dashboard "File status" +widget parses: + +``` +File: , Status: FAILED | station= | reason= | chunk= | pages=- | +``` + +This lets operators see, per file, exactly which station failed and on which +pages — without opening individual log streams. + +## Notes for maintainers + +- `PAGES_PER_CHUNK` (default `200`) must stay in sync between the PDF splitter, + the stations, and the failure-handler so reported page ranges are accurate. It + is overridable via the `PAGES_PER_CHUNK` environment variable. +- The failure-handler is invoked **only on failure**, so its cost is effectively + zero. No always-on services or new datastores are introduced. +- Stations report failures on a **best-effort** basis: a failure while writing a + detail file is logged but never masks the original error. diff --git a/lambda/failure-handler/main.py b/lambda/failure-handler/main.py new file mode 100644 index 00000000..90c43f1a --- /dev/null +++ b/lambda/failure-handler/main.py @@ -0,0 +1,230 @@ +""" +Failure-handler Lambda for the PDF Accessibility remediation workflow. + +This function is the single, exhaustive safety net for the Step Functions +state machine. It is wired to a `Catch` block that captures EVERY failure in +the pipeline -- both in-code errors (Adobe/Bedrock API failures, complexity +issues) and infrastructure failures (a container that cannot start, an +out-of-memory kill, a task timeout, an IAM error). 
Whenever the workflow +fails, Step Functions routes here instead of ending silently. + +The frontend detects a finished job by polling the S3 `result/` folder. On +success the pipeline writes `result/COMPLIANT_`. This function provides +the missing failure signal in the SAME place the frontend already looks: + + result/FAILED_.json + +so the UI can stop polling and show the user WHY the job failed and WHERE +(which chunk / page range), instead of spinning indefinitely. + +Detail sources (in priority order): + 1. Per-station detail files at `temp//_errors/*.json`, written by + each station right before it exits on failure. These carry the rich + reason + chunk + page-range context that only the station knows. + 2. The Step Functions error `Cause`/`Error`, used as a fallback when a + station died before it could write a detail file (pure infra failure). + +The function NEVER raises: a safety net that can itself fail is not a safety +net. Any internal problem is logged and a best-effort marker is still written. +""" + +import json +import os +import boto3 +from datetime import datetime, timezone + +s3_client = boto3.client("s3") + +# Pages per chunk -- must match the value used by the PDF splitter Lambda +# (lambda/pdf-splitter-lambda/main.py). Kept here so the page-range shown to +# the user is accurate. Overridable via env var without a code change. +PAGES_PER_CHUNK = int(os.environ.get("PAGES_PER_CHUNK", "200")) + +# Human-readable summaries for each reason category. The category itself is +# emitted by the stations; this map is only for the user-facing message. 
REASON_SUMMARIES = {
    "SPLIT": "The document could not be split into pages for processing.",
    "ADOBE_API": "The Adobe PDF Services API (auto-tagging/extraction) failed.",
    "BEDROCK_API": "The AI model (Amazon Bedrock) failed while generating alt text.",
    "COMPLEXITY": "The document exceeded processing complexity limits.",
    "MERGE": "The processed pages could not be merged back into one document.",
    "TITLE": "The document title/metadata step failed.",
    "INFRA": "A processing task failed to run (infrastructure-level failure).",
    "UNKNOWN": "Processing failed for an unexpected reason.",
}


def _derive_basename(event):
    """Best-effort extraction of the original file's base name from the event.

    Each chunk entry is expected to be a dict whose ``s3_key`` (or
    ``chunk_key``) looks like ``temp/<basename>/<basename>_chunk_<n>.pdf``.
    The second path segment of the first usable key is returned; it is used to
    locate the ``_errors/`` folder and to name the failure marker.

    Malformed input (missing/non-list ``chunks``, non-dict entries, non-string
    or non-``temp/`` keys) is skipped rather than raised on, because this runs
    inside the failure handler and must not fail itself.

    Returns:
        The base name string, or None when it cannot be determined.
    """
    chunks = event.get("chunks") if isinstance(event, dict) else None
    if not isinstance(chunks, list):
        return None
    for chunk in chunks:
        if not isinstance(chunk, dict):
            continue
        key = chunk.get("s3_key") or chunk.get("chunk_key")
        if not isinstance(key, str):
            continue
        # key = temp/<basename>/<basename>_chunk_1.pdf
        parts = key.split("/")
        if len(parts) >= 2 and parts[0] == "temp" and parts[1]:
            return parts[1]
    return None


def _resolve_bucket(event):
    """Find the processing bucket name from the event, with an env fallback.

    Checks ``event["s3_bucket"]``, then ``event["bucket"]``, then the
    ``BUCKET_NAME`` environment variable. Returns None when none is set.
    """
    return (
        event.get("s3_bucket")
        or event.get("bucket")
        or os.environ.get("BUCKET_NAME")
    )


def _collect_station_errors(bucket, basename):
    """Read every ``temp/<basename>/_errors/*.json`` detail file.

    Returns a list of dicts. Unreadable, malformed, or non-object JSON files
    are logged and skipped -- a corrupt detail file must never prevent the
    marker from being written, and callers rely on every item being a dict.
    """
    if not bucket or not basename:
        return []

    prefix = f"temp/{basename}/_errors/"
    errors = []
    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if not key.endswith(".json"):
                    continue
                try:
                    body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
                    detail = json.loads(body)
                except Exception as e:  # noqa: BLE001 - tolerate any bad file
                    print(f"[failure-handler] Could not read detail file {key}: {e}")
                    continue
                if isinstance(detail, dict):
                    errors.append(detail)
                else:
                    # Valid JSON but not an object (e.g. a list or a string):
                    # downstream code calls .get() on each entry.
                    print(f"[failure-handler] Ignoring non-object detail file {key}")
    except Exception as e:  # noqa: BLE001
        print(f"[failure-handler] Could not list {prefix}: {e}")
    return errors


def _parse_sfn_cause(event):
    """Extract a reason from the Step Functions Catch error payload.

    Looks for ``Error``/``Cause`` under ``event["failureInfo"]`` first and
    falls back to the top level of the event. When ``Cause`` is a JSON object
    string, its ``errorMessage`` (or nested ``Cause``) is used as the message;
    any other ``Cause`` value is passed through unchanged.

    Returns:
        A dict with ``station`` ("workflow"), ``reason_category`` ("INFRA")
        and ``message``, or None when neither ``Error`` nor ``Cause`` is
        present.
    """
    if not isinstance(event, dict):
        return None
    failure_info = event.get("failureInfo") or event
    if not isinstance(failure_info, dict):
        failure_info = event

    err = failure_info.get("Error")
    cause_raw = failure_info.get("Cause")
    cause = cause_raw
    if isinstance(cause_raw, str) and cause_raw:
        try:
            # ECS/Lambda causes are often JSON strings
            parsed = json.loads(cause_raw)
        except ValueError:
            parsed = None
        # Only a JSON object has named fields to pick from; lists, numbers
        # and quoted strings keep the raw cause text.
        if isinstance(parsed, dict):
            cause = parsed.get("errorMessage") or parsed.get("Cause") or cause_raw

    if err or cause:
        return {
            "station": "workflow",
            "reason_category": "INFRA",
            "message": cause or err or "Unknown infrastructure failure.",
        }
    return None
+ """ + print(f"[failure-handler] Invoked with event: {json.dumps(event, default=str)}") + + bucket = _resolve_bucket(event) + basename = _derive_basename(event) + + station_errors = _collect_station_errors(bucket, basename) + + # Fall back to the Step Functions cause if no station detail was written. + if not station_errors: + sfn_error = _parse_sfn_cause(event) + if sfn_error: + station_errors = [sfn_error] + + # Build the failed-chunk / page-range summary from whatever detail exists. + failed_chunks = [] + categories = [] + for err in station_errors: + category = err.get("reason_category", "UNKNOWN") + categories.append(category) + chunk_index = err.get("chunk_index") + entry = { + "station": err.get("station", "unknown"), + "reason_category": category, + "message": err.get("message", ""), + } + if chunk_index is not None: + entry["chunk_index"] = chunk_index + # Prefer page range from the detail file; otherwise compute it. + entry["page_start"] = err.get( + "page_start", (chunk_index - 1) * PAGES_PER_CHUNK + 1 + ) + entry["page_end"] = err.get("page_end", chunk_index * PAGES_PER_CHUNK) + failed_chunks.append(entry) + + # Choose the primary category (first non-UNKNOWN, else UNKNOWN). + primary_category = next((c for c in categories if c != "UNKNOWN"), None) or ( + categories[0] if categories else "UNKNOWN" + ) + summary = REASON_SUMMARIES.get(primary_category, REASON_SUMMARIES["UNKNOWN"]) + + marker = { + "status": "FAILED", + "filename": basename, + "reason_category": primary_category, + "summary": summary, + "failed_chunks": failed_chunks, + "execution_arn": event.get("executionArn"), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + # Structured CloudWatch line for the dashboard "File status" widget. 
+ pages_desc = ", ".join( + f"chunk {c['chunk_index']} (pages {c['page_start']}-{c['page_end']})" + for c in failed_chunks + if "chunk_index" in c + ) + print( + f"File: {basename}, Status: FAILED | reason={primary_category}" + + (f" | {pages_desc}" if pages_desc else "") + ) + + # Write the marker where the frontend already polls. Best-effort: if even + # this fails, we log loudly but do not raise (raising would re-fail the + # state machine and produce a confusing double-failure). + if bucket and basename: + marker_key = f"result/FAILED_{basename}.json" + try: + s3_client.put_object( + Bucket=bucket, + Key=marker_key, + Body=json.dumps(marker, indent=2).encode("utf-8"), + ContentType="application/json", + ) + print(f"[failure-handler] Wrote failure marker to s3://{bucket}/{marker_key}") + except Exception as e: # noqa: BLE001 + print(f"[failure-handler] CRITICAL: could not write marker {marker_key}: {e}") + else: + print( + "[failure-handler] CRITICAL: missing bucket/basename; " + f"bucket={bucket} basename={basename}. Marker NOT written." 
+ ) + + return marker diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java b/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java index 7b73ad16..c35d60f8 100644 --- a/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java +++ b/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java @@ -6,9 +6,12 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import org.apache.pdfbox.multipdf.PDFMergerUtility; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -79,10 +82,51 @@ public String handleRequest(Map input, Context context) { return String.format("PDFs merged successfully.\nBucket: %s\nMerged File Key: %s\nMerged File Name: %s", bucketName, outputKey, baseFileName); } catch (Exception e) { - baseFileName = baseFileName.replace(".pdf", ""); - System.out.println("File: " + baseFileName + ", Status: Failed in Merging the PDF"); - System.out.println(String.format("Filename: %s, File not found: %s", baseFileName, e.getMessage())); - return "Failed to merge PDFs."; + String reportName = baseFileName.replace(".pdf", ""); + System.out.println("File: " + reportName + ", Status: FAILED | station=merge | reason=MERGE | " + e.getMessage()); + // Write a station detail file the Step Functions failure-handler aggregates + // into the user-facing result/FAILED_.json marker. + reportFailure(bucketName, reportName, "MERGE", e.getMessage()); + // Re-throw so the state machine's Catch fires. Returning a string here + // would be treated as SUCCESS by Step Functions and continue silently. 
+ throw new RuntimeException("Failed to merge PDFs for " + reportName, e); + } + } + + /** + * Writes a structured failure-detail file the Step Functions failure-handler + * aggregates into result/FAILED_.json. Station: 'merge'. + * Best-effort: a failure while reporting must not mask the original error. + * + * @param bucketName The S3 bucket. + * @param fileBaseName The base file name (no extension) matching temp//. + * @param reasonCategory The failure category (e.g. "MERGE"). + * @param message The error message. + */ + private void reportFailure(String bucketName, String fileBaseName, String reasonCategory, String message) { + if (bucketName == null || fileBaseName == null) { + return; + } + try { + // Escape for safe embedding in a JSON string literal: backslashes + // first, then quotes, then control chars that would break parsing. + String safeMessage = message == null ? "" : message + .replace("\\", "\\\\") + .replace("\"", "\\\"") + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); + String json = String.format( + "{\"station\":\"merge\",\"reason_category\":\"%s\",\"message\":\"%s\"}", + reasonCategory, safeMessage); + byte[] bytes = json.getBytes(StandardCharsets.UTF_8); + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(bytes.length); + metadata.setContentType("application/json"); + String key = String.format("temp/%s/_errors/merge.json", fileBaseName); + s3Client.putObject(new PutObjectRequest(bucketName, key, new ByteArrayInputStream(bytes), metadata)); + } catch (Exception ex) { + System.out.println(String.format("Filename: %s, Could not write failure detail: %s", fileBaseName, ex.getMessage())); } } diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar b/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar index 99f5a0d5..d9e93f0e 100644 Binary files a/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar and 
def report_failure(bucket_name, file_basename, reason_category, message):
    """Log a split-station failure and upload the FAILED marker to S3.

    Always prints one structured status line. When both ``bucket_name`` and
    ``file_basename`` are truthy, it then uploads
    ``result/FAILED_{file_basename}.json`` describing the failure (station
    ``split``, message truncated to 2000 characters, UTC timestamp).

    An exception raised while serializing or uploading the marker is printed
    and swallowed, so reporting a failure does not raise from the upload step.

    Args:
        bucket_name: Target S3 bucket; no marker is written when falsy.
        file_basename: Base name used in the marker key; no marker when falsy.
        reason_category: Failure category recorded in the marker.
        message: Error detail; converted with ``str()`` before truncation.
    """
    # Structured CloudWatch line for the dashboard "File status" widget.
    print(f"File: {file_basename}, Status: FAILED | station=split | reason={reason_category} | {message}")

    if not (bucket_name and file_basename):
        return

    failure_entry = {
        "station": "split",
        "reason_category": reason_category,
        "message": str(message)[:2000],
    }
    marker = {
        "status": "FAILED",
        "filename": file_basename,
        "reason_category": reason_category,
        "summary": "The document could not be split into pages for processing.",
        "failed_chunks": [failure_entry],
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    marker_key = f"result/FAILED_{file_basename}.json"

    try:
        payload = json.dumps(marker, indent=2).encode("utf-8")
        s3_client.put_object(
            Bucket=bucket_name,
            Key=marker_key,
            Body=payload,
            ContentType="application/json",
        )
    except Exception as e:
        print(f"Filename - {file_basename} | CRITICAL: could not write failure marker: {e}")
def report_failure(bucket, file_basename, reason_category, message):
    """Log a title-station failure and upload its detail file to S3.

    Always prints one structured status line. When both ``bucket`` and
    ``file_basename`` are truthy, it then uploads a small JSON document
    (station ``title``, the category, and the message truncated to 2000
    characters) to ``temp/{file_basename}/_errors/title.json``.

    The key is fixed per file, so a second call for the same
    ``file_basename`` replaces the detail written by the first.

    An exception raised while creating the client, serializing, or uploading
    is printed and swallowed.

    Args:
        bucket: Target S3 bucket; nothing is written when falsy.
        file_basename: Folder name under ``temp/``; nothing is written when falsy.
        reason_category: Failure category recorded in the detail file.
        message: Error detail; converted with ``str()`` before truncation.
    """
    print(f"File: {file_basename}, Status: FAILED | station=title | reason={reason_category} | {message}")

    if not (bucket and file_basename):
        return

    detail = {
        "station": "title",
        "reason_category": reason_category,
        "message": str(message)[:2000],
    }
    detail_key = f"temp/{file_basename}/_errors/title.json"

    try:
        s3 = boto3.client('s3')
        payload = json.dumps(detail).encode("utf-8")
        s3.put_object(
            Bucket=bucket,
            Key=detail_key,
            Body=payload,
            ContentType="application/json",
        )
    except Exception as e:
        print(f"Filename: {file_basename} | Could not write failure detail: {e}")
+ file_basename = file_name.rsplit('.', 1)[0] local_path = f'/tmp/{file_name}' download_file_from_s3(file_info['bucket'], file_info['merged_file_key'], local_path, file_info['merged_file_name']) @@ -212,13 +243,8 @@ def lambda_handler(event, context): pdf_document = fitz.open(local_path) except Exception as e: print(f"(lambda_handler | Failed to open PDF file {file_name}: {e})") - return { - "statusCode": 500, - "body": { - "error": f"Failed to open PDF file {file_name}.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to open merged PDF: {e}") + raise try: extracted_text = extract_text_from_pdf(pdf_document) @@ -226,13 +252,8 @@ def lambda_handler(event, context): except Exception as e: print(f"(lambda_handler | Failed to extract text from PDF: {e})") pdf_document.close() - return { - "statusCode": 500, - "body": { - "error": "Failed to extract text from PDF.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to extract text: {e}") + raise try: title = generate_title(extracted_text, file_name) @@ -240,13 +261,8 @@ def lambda_handler(event, context): except Exception as e: print(f"(lambda_handler | Failed to generate title: {e})") pdf_document.close() - return { - "statusCode": 500, - "body": { - "error": "Failed to generate title.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "BEDROCK_API", f"Failed to generate title via Bedrock: {e}") + raise try: set_custom_metadata(pdf_document, file_name, title) @@ -255,26 +271,16 @@ def lambda_handler(event, context): except Exception as e: print(f"(lambda_handler | Failed to set metadata or save PDF: {e})") pdf_document.close() - return { - "statusCode": 500, - "body": { - "error": "Failed to set metadata or save PDF.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), 
file_basename, "TITLE", f"Failed to set metadata or save PDF: {e}") + raise try: save_path = save_to_s3(local_path, file_info['bucket'], file_name) print(f"(lambda_handler | Saved file to S3 at: {save_path})") except Exception as e: print(f"(lambda_handler | Failed to save file to S3: {e})") - return { - "statusCode": 500, - "body": { - "error": "Failed to save file to S3.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to save final PDF to S3: {e}") + raise return { "statusCode": 200, @@ -286,10 +292,8 @@ def lambda_handler(event, context): } except Exception as e: print(f"(lambda_handler | General error in lambda_handler: {e})") - return { - "statusCode": 500, - "body": { - "error": "An unexpected error occurred.", - "details": f"Filename: {file_info.get('merged_file_name','Unknown')} - {str(e)}" - } - } + # Report and re-raise: returning a 500 dict would be treated as SUCCESS by + # the Step Functions LambdaInvoke and let the workflow continue silently. + # Raising ensures the state machine's Catch fires and the user is notified. + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Unexpected error: {e}") + raise