Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 65 additions & 4 deletions adobe-autotag-container/adobe_autotag_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,62 @@

s3 = boto3.client('s3')

# Pages per chunk -- must match the PDF splitter Lambda so reported page ranges
# are accurate. Each chunk N covers pages (N-1)*PAGES_PER_CHUNK+1 .. N*PAGES_PER_CHUNK.
PAGES_PER_CHUNK = int(os.environ.get('PAGES_PER_CHUNK', '200'))


def _chunk_index_from_key(chunk_key):
"""Extract the 1-based chunk index from a key like '.../<name>_chunk_8.pdf'."""
try:
match = re.search(r'_chunk_(\d+)\.pdf$', chunk_key or '')
return int(match.group(1)) if match else None
except Exception:
return None


def report_failure(bucket_name, file_base_name, chunk_key, reason_category, message):
"""Write a structured failure-detail file the Step Functions failure-handler
aggregates into the user-facing result/FAILED_<name>.json marker.

Station: 'adobe' (Adobe AutoTag/Extract). This carries the WHY (reason
category) and the WHERE (chunk index + page range) that only this station
knows. Best-effort and exception-proof: reporting a failure must never throw
a second failure that masks the original error.
"""
chunk_index = _chunk_index_from_key(chunk_key)
page_start = ((chunk_index - 1) * PAGES_PER_CHUNK + 1) if chunk_index else None
page_end = (chunk_index * PAGES_PER_CHUNK) if chunk_index else None

# Structured CloudWatch line for the dashboard "File status" widget.
pages_desc = f" | chunk={chunk_index} | pages={page_start}-{page_end}" if chunk_index else ""
logger.error(
f"File: {file_base_name}, Status: FAILED | station=adobe | "
f"reason={reason_category}{pages_desc} | {message}"
)

if not bucket_name or not file_base_name:
return
detail = {
"station": "adobe",
"reason_category": reason_category,
"message": str(message)[:2000],
"chunk_index": chunk_index,
"page_start": page_start,
"page_end": page_end,
}
try:
suffix = chunk_index if chunk_index is not None else "unknown"
s3.put_object(
Bucket=bucket_name,
Key=f"temp/{file_base_name}/_errors/adobe_chunk_{suffix}.json",
Body=json.dumps(detail).encode("utf-8"),
ContentType="application/json",
)
except Exception as e: # noqa: BLE001 - never mask the original failure
logger.error(f"Filename : {file_base_name} | Could not write failure detail: {e}")


def download_file_from_s3(bucket_name,file_base_name, file_key, local_path):
"""
Download a file from an S3 bucket.
Expand Down Expand Up @@ -640,11 +696,12 @@ def main():
"""
file_key = None
file_base_name = None

try:
bucket_name = os.getenv('S3_BUCKET_NAME')
s3_file_key = None
bucket_name = os.getenv('S3_BUCKET_NAME')

try:
s3_file_key = os.getenv('S3_FILE_KEY')

if not bucket_name or not s3_file_key:
logging.error("Error: S3_BUCKET_NAME and S3_FILE_KEY environment variables are required.")
sys.exit(1)
Expand Down Expand Up @@ -717,18 +774,22 @@ def main():
except (ServiceApiException, ServiceUsageException, SdkException) as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - Adobe API Error")
logger.error(f"Filename : {file_key} | Adobe API Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", f"Adobe API error: {e}")
sys.exit(1)
except ClientError as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - AWS Error")
logger.error(f"Filename : {file_key} | AWS Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "INFRA", f"AWS error: {e}")
sys.exit(1)
except FileNotFoundError as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - File Not Found")
logger.error(f"Filename : {file_key} | File Not Found Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", f"Expected Adobe output not found: {e}")
sys.exit(1)
except Exception as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task")
logger.error(f"Filename : {file_key} | Unexpected Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "UNKNOWN", f"Unexpected error: {e}")
sys.exit(1)

if __name__ == "__main__":
Expand Down
109 changes: 103 additions & 6 deletions alt-text-generator-container/alt_text_generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,87 @@ function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

// Pages per chunk -- must match the PDF splitter Lambda so reported page ranges
// are accurate. Chunk N covers pages (N-1)*PAGES_PER_CHUNK+1 .. N*PAGES_PER_CHUNK.
const PAGES_PER_CHUNK = parseInt(process.env.PAGES_PER_CHUNK || "200", 10);

/**
* Writes a structured failure-detail file that the Step Functions failure-handler
* aggregates into the user-facing result/FAILED_<name>.json marker.
*
* Station: 'alttext' (Bedrock alt-text generation). Carries the WHY (reason
* category) and WHERE (chunk index + page range). Best-effort and exception-proof:
* reporting a failure must never throw a second failure that masks the original.
*/
async function reportFailure(bucketName, fileBaseName, s3FileKey, reasonCategory, message) {
let chunkIndex = null;
const match = /_chunk_(\d+)\.pdf$/.exec(s3FileKey || "");
if (match) chunkIndex = parseInt(match[1], 10);
const pageStart = chunkIndex ? (chunkIndex - 1) * PAGES_PER_CHUNK + 1 : null;
const pageEnd = chunkIndex ? chunkIndex * PAGES_PER_CHUNK : null;

// Structured CloudWatch line for the dashboard "File status" widget.
const pagesDesc = chunkIndex ? ` | chunk=${chunkIndex} | pages=${pageStart}-${pageEnd}` : "";
logger.error(`File: ${fileBaseName}, Status: FAILED | station=alttext | reason=${reasonCategory}${pagesDesc} | ${message}`);

if (!bucketName || !fileBaseName) return;
const detail = {
station: "alttext",
reason_category: reasonCategory,
message: String(message).slice(0, 2000),
chunk_index: chunkIndex,
page_start: pageStart,
page_end: pageEnd,
};
try {
const suffix = chunkIndex !== null ? chunkIndex : "unknown";
await s3Client.send(new PutObjectCommand({
Bucket: bucketName,
Key: `temp/${fileBaseName}/_errors/alttext_chunk_${suffix}.json`,
Body: JSON.stringify(detail),
ContentType: "application/json",
}));
} catch (e) {
logger.error(`Filename: ${fileBaseName} | Could not write failure detail: ${e.message || e}`);
}
}

const MAX_ASPECT_RATIO = 20;

function getImageDimensions(buffer) {
try {
if (buffer[0] === 0x89 && buffer[1] === 0x50 && buffer[2] === 0x4E && buffer[3] === 0x47) {
const width = buffer.readUInt32BE(16);
const height = buffer.readUInt32BE(20);
return { width, height };
}
if (buffer[0] === 0xFF && buffer[1] === 0xD8) {
let offset = 2;
while (offset < buffer.length - 1) {
if (buffer[offset] !== 0xFF) break;
const marker = buffer[offset + 1];
if (marker === 0xC0 || marker === 0xC2) {
const height = buffer.readUInt16BE(offset + 5);
const width = buffer.readUInt16BE(offset + 7);
return { width, height };
}
const segmentLength = buffer.readUInt16BE(offset + 2);
offset += 2 + segmentLength;
}
}
return null;
} catch (e) {
return null;
}
}

function isAspectRatioValid(dimensions) {
if (!dimensions || !dimensions.width || !dimensions.height) return true;
const { width, height } = dimensions;
const ratio = Math.max(width / height, height / width);
return ratio <= MAX_ASPECT_RATIO;
}


/**
* Invokes the Bedrock AI model to generate alt text for a given image.
Expand Down Expand Up @@ -492,6 +573,8 @@ async function startProcess() {
logger.info(`Filename: ${filebasename} | imageObjects: ${imageObjects}`);
logger.info(`Filename: ${filebasename} | Total images to process: ${imageObjects.length}`);

let skippedCount = 0;

for (const imageObject of imageObjects) {
try {
const getObjectParams = {
Expand All @@ -502,7 +585,7 @@ async function startProcess() {
logger.info(`Filename: ${filebasename} | Image Object Bucketname: ${bucketName}`);
const command = new GetObjectCommand(getObjectParams);
const { Body } = await s3Client.send(command);

// Stream the body contents to a buffer
const chunks = [];
await pipeline(Body, async function* (source) {
Expand All @@ -511,6 +594,16 @@ async function startProcess() {
}
});
const fileBuffer = Buffer.concat(chunks);

const dimensions = getImageDimensions(fileBuffer);
if (dimensions && !isAspectRatioValid(dimensions)) {
skippedCount++;
const altText = "Decorative element";
combinedResults[imageObject.id] = altText;
logger.info(`Filename: ${filebasename} | Skipping image ${imageObject.id} - aspect ratio ${Math.max(dimensions.width/dimensions.height, dimensions.height/dimensions.width).toFixed(1)}:1 exceeds ${MAX_ASPECT_RATIO}:1 limit (${dimensions.width}x${dimensions.height})`);
continue;
}

const localFilePath = path.join(__dirname, `${imageObject.path.split('/').pop()}`);
logger.info(`Filename: ${filebasename} | Local File Path: ${localFilePath}`);
fs_1.writeFileSync(localFilePath, fileBuffer);
Expand All @@ -519,23 +612,25 @@ async function startProcess() {
logger.info(`Filename: ${filebasename} | Response:${response}`);
Object.assign(combinedResults, JSON.parse(response));
successCount++;
logger.info(`Filename: ${filebasename} | Alt text generation succeeded for image ${imageObject.id} (${successCount} succeeded, ${failureCount} failed)`);
logger.info(`Filename: ${filebasename} | Alt text generation succeeded for image ${imageObject.id} (${successCount} succeeded, ${failureCount} failed, ${skippedCount} skipped)`);
} catch (error) {
failureCount++;
logger.error(`Filename: ${filebasename} | Alt text generation failed for image ${imageObject.id}: ${error.message || error}`);
logger.info(`Filename: ${filebasename} | Progress: ${successCount} succeeded, ${failureCount} failed`);
logger.info(`Filename: ${filebasename} | Progress: ${successCount} succeeded, ${failureCount} failed, ${skippedCount} skipped`);
}
await sleep(2000);
}

// Check if we have any images and if all of them failed
if (imageObjects.length > 0 && successCount === 0) {
// Only fail if all images failed AND none were skipped (skipped images got default alt text)
if (imageObjects.length > 0 && successCount === 0 && skippedCount === 0) {
logger.error(`Filename: ${filebasename} | All ${failureCount} alt text generation requests failed - likely due to throttling or Bedrock API issues`);
logger.error(`File: ${filebasename}, Status: Failed in second ECS task - All Bedrock requests failed`);
await reportFailure(bucketName, filebasename, process.env.S3_FILE_KEY, "BEDROCK_API",
`All ${failureCount} Bedrock alt-text requests failed (throttling or Bedrock API issues).`);
process.exit(1);
}

logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed out of ${imageObjects.length} images`);
logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed, ${skippedCount} skipped (bad aspect ratio) out of ${imageObjects.length} images`);

let defaultText = "No text available";

Expand All @@ -558,6 +653,8 @@ async function startProcess() {
} catch (error) {
logger.info(`File: ${filebasename}, Status: Error in second ECS task`);
logger.error(`Filename: ${filebasename} | Error processing images: ${error}`);
await reportFailure(bucketName, filebasename, process.env.S3_FILE_KEY, "UNKNOWN",
`Error processing images: ${error.message || error}`);
process.exit(1);
}
}
Expand Down
49 changes: 46 additions & 3 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
cluster=pdf_remediation_cluster,
task_definition=adobe_autotag_task_def,
assign_public_ip=False,

result_path="$.ecsResult",

container_overrides=[tasks.ContainerOverride(
container_definition = adobe_autotag_container_def,
environment=[
Expand Down Expand Up @@ -213,11 +214,11 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
environment=[
tasks.TaskEnvironmentVariable(
name="S3_BUCKET_NAME",
value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[0].Value")
value=sfn.JsonPath.string_at("$.s3_bucket")
),
tasks.TaskEnvironmentVariable(
name="S3_FILE_KEY",
value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[1].Value")
value=sfn.JsonPath.string_at("$.s3_key")
),
tasks.TaskEnvironmentVariable(
name="AWS_REGION",
Expand Down Expand Up @@ -366,6 +367,48 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
parallel_accessibility_workflow.branch(remediation_chain)
parallel_accessibility_workflow.branch(pre_remediation_accessibility_checker_task)

# ---------------------------------------------------------------------
# Failure-handler: the single, exhaustive safety net for the workflow.
#
# The frontend detects a finished job by polling the S3 result/ folder.
# Previously, any failure left the workflow FAILED with nothing written
# to result/, so the UI polled forever ("silent failure"). This Lambda
# is wired to a Catch that fires on EVERY error -- in-code (Adobe/Bedrock/
# complexity) and infrastructure (container can't start, OOM, timeout) --
# and writes result/FAILED_<name>.json carrying the reason category and
# the failing chunk/page range, so the UI can show the user what happened.
# It is invoked only on failure, so its cost is effectively zero.
# ---------------------------------------------------------------------
failure_handler_lambda = lambda_.Function(
self, 'WorkflowFailureHandlerLambda',
runtime=lambda_.Runtime.PYTHON_3_12,
handler='main.lambda_handler',
code=lambda_.Code.from_asset('lambda/failure-handler'),
timeout=Duration.seconds(120),
memory_size=256,
environment={
'BUCKET_NAME': pdf_processing_bucket.bucket_name,
'PAGES_PER_CHUNK': '200',
}
)
pdf_processing_bucket.grant_read_write(failure_handler_lambda)
failure_handler_lambda.add_to_role_policy(cloudwatch_metrics_policy)

failure_handler_task = tasks.LambdaInvoke(self, "HandleWorkflowFailure",
lambda_function=failure_handler_lambda,
payload=sfn.TaskInput.from_json_path_at("$"),
output_path="$.Payload")

# Route every failure of the parallel workflow to the failure handler.
# result_path keeps the original input (chunks, s3_bucket) intact and
# nests the Step Functions Error/Cause under $.failureInfo so the handler
# can recover both the file identity and the failure reason.
parallel_accessibility_workflow.add_catch(
failure_handler_task,
errors=["States.ALL"],
result_path="$.failureInfo"
)

pdf_remediation_workflow_log_group = logs.LogGroup(self, "PdfRemediationWorkflowLogs",
log_group_name="/aws/states/pdf-accessibility-remediation-workflow",
retention=logs.RetentionDays.ONE_MONTH,
Expand Down
Loading