Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 142 additions & 35 deletions adobe-autotag-container/adobe_autotag_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,68 @@
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Configure root logging at INFO level with a "timestamp - level - message" line format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


class BadPdfError(Exception):
    """Adobe refused the PDF as damaged or too complex (BAD_PDF / 400).

    Raised from the autotag step so the caller can switch to its fallback
    path instead of treating the rejection as a hard failure.
    """

# Module-wide S3 client used by the S3 helpers in this file
# (e.g. failure-detail reporting and the empty image-DB upload).
s3 = boto3.client('s3')

# Pages per chunk -- must match the PDF splitter Lambda so reported page ranges
# are accurate. Each chunk N covers pages (N-1)*PAGES_PER_CHUNK+1 .. N*PAGES_PER_CHUNK.
# Read from the PAGES_PER_CHUNK environment variable; defaults to 200 when unset.
PAGES_PER_CHUNK = int(os.environ.get('PAGES_PER_CHUNK', '200'))


def _chunk_index_from_key(chunk_key):
"""Extract the 1-based chunk index from a key like '.../<name>_chunk_8.pdf'."""
try:
match = re.search(r'_chunk_(\d+)\.pdf$', chunk_key or '')
return int(match.group(1)) if match else None
except Exception:
return None


def report_failure(bucket_name, file_base_name, chunk_key, reason_category, message):
    """Record why and where the 'adobe' station (Adobe AutoTag/Extract) failed.

    Two things happen, in this order:

    1. A structured error line is logged (consumed by the dashboard
       "File status" widget), including chunk index and page range when the
       chunk index can be parsed from ``chunk_key``.
    2. If both ``bucket_name`` and ``file_base_name`` are set, a JSON detail
       object is written to
       ``temp/<file_base_name>/_errors/adobe_chunk_<index|unknown>.json``.
       That file is meant to be aggregated by the Step Functions
       failure-handler into the user-facing result/FAILED_<name>.json marker.

    Args:
        bucket_name: Target S3 bucket; when falsy only the log line is emitted.
        file_base_name: Base name of the file being processed; when falsy only
            the log line is emitted.
        chunk_key: S3 key of the chunk, used to derive chunk index / page range.
        reason_category: Short category string describing the failure cause.
        message: Human-readable detail; stored truncated to 2000 characters.

    The S3 write is best-effort: any exception from it is logged and swallowed
    so that reporting a failure never masks the original error.
    """
    chunk_index = _chunk_index_from_key(chunk_key)
    if chunk_index:
        # Chunk N covers pages (N-1)*PAGES_PER_CHUNK+1 .. N*PAGES_PER_CHUNK.
        page_start = (chunk_index - 1) * PAGES_PER_CHUNK + 1
        page_end = chunk_index * PAGES_PER_CHUNK
        pages_desc = f" | chunk={chunk_index} | pages={page_start}-{page_end}"
    else:
        page_start = None
        page_end = None
        pages_desc = ""

    # Structured CloudWatch line for the dashboard "File status" widget.
    logger.error(
        f"File: {file_base_name}, Status: FAILED | station=adobe | "
        f"reason={reason_category}{pages_desc} | {message}"
    )

    # Without a bucket and a base name there is nowhere to write the detail file.
    if not (bucket_name and file_base_name):
        return

    detail = {
        "station": "adobe",
        "reason_category": reason_category,
        "message": str(message)[:2000],
        "chunk_index": chunk_index,
        "page_start": page_start,
        "page_end": page_end,
    }
    suffix = "unknown" if chunk_index is None else chunk_index
    try:
        detail_key = f"temp/{file_base_name}/_errors/adobe_chunk_{suffix}.json"
        payload = json.dumps(detail).encode("utf-8")
        s3.put_object(
            Bucket=bucket_name,
            Key=detail_key,
            Body=payload,
            ContentType="application/json",
        )
    except Exception as e:  # noqa: BLE001 - never mask the original failure
        logger.error(f"Filename : {file_base_name} | Could not write failure detail: {e}")


def download_file_from_s3(bucket_name,file_base_name, file_key, local_path):
"""
Download a file from an S3 bucket.
Expand Down Expand Up @@ -245,9 +305,15 @@ def autotag_pdf_with_options(filename, client_id, client_secret):

logging.info(f'Filename : {filename} | Adobe Autotag completed successfully')

except (ServiceApiException, ServiceUsageException, SdkException) as e:
except ServiceApiException as e:
if "BAD_PDF" in str(e) or "400" in str(e):
logging.warning(f'Filename : {filename} | Adobe AutoTag rejected PDF as damaged/too complex (BAD_PDF) — will use fallback path')
raise BadPdfError(str(e))
logging.error(f'Filename : {filename} | Adobe Autotag API failed: {e}')
raise # Re-raise to stop the container
raise
except (ServiceUsageException, SdkException) as e:
logging.error(f'Filename : {filename} | Adobe Autotag API failed: {e}')
raise
def extract_api(filename, client_id, client_secret):
"""
Extracts text, tables, and figures from a PDF using Adobe PDF Services.
Expand Down Expand Up @@ -634,17 +700,34 @@ def extract_images_from_excel(filename, figure_path, autotag_report_path, images
f'{s3_folder_autotag}/{file_key}_temp_images_data.db')
logging.info(f'Filename : {filename} | Uploaded SQLite DB to S3 With No Images')

def _write_empty_image_db(images_output_dir, bucket_name, s3_folder_autotag, file_key, file_base_name):
    """Write an empty SQLite image DB so the alt-text step sees zero images and completes cleanly.

    Creates ``<images_output_dir>/temp_images_data.db`` containing only the
    ``image_data`` table definition, then uploads it to
    ``<s3_folder_autotag>/<file_key>_temp_images_data.db`` in ``bucket_name``.

    Args:
        images_output_dir: Local directory for the DB file (created if missing).
        bucket_name: Destination S3 bucket.
        s3_folder_autotag: S3 key prefix for autotag outputs.
        file_key: File key used as the prefix of the uploaded DB object name.
        file_base_name: Base file name, used only in the log message.

    Raises:
        sqlite3.Error: If the database cannot be created or written.
        Exception: Whatever the S3 upload raises is propagated to the caller.
    """
    os.makedirs(images_output_dir, exist_ok=True)
    db_path = os.path.join(images_output_dir, "temp_images_data.db")
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS image_data (
                objid TEXT, img_path TEXT, prev TEXT, current TEXT, next TEXT, context TEXT
            )
        """)
        conn.commit()
    finally:
        # Always release the connection (and its file handle), even if the
        # CREATE TABLE or commit fails, so the DB file is never left open.
        conn.close()
    s3.upload_file(db_path, bucket_name, f'{s3_folder_autotag}/{file_key}_temp_images_data.db')
    logging.info(f'Filename : {file_base_name} | Uploaded empty image DB (BAD_PDF fallback)')


def main():
"""
Main function that coordinates the downloading, processing, and uploading of PDF files and associated content.
"""
file_key = None
file_base_name = None

try:
bucket_name = os.getenv('S3_BUCKET_NAME')
s3_file_key = None
bucket_name = os.getenv('S3_BUCKET_NAME')

try:
s3_file_key = os.getenv('S3_FILE_KEY')

if not bucket_name or not s3_file_key:
logging.error("Error: S3_BUCKET_NAME and S3_FILE_KEY environment variables are required.")
sys.exit(1)
Expand Down Expand Up @@ -673,62 +756,86 @@ def main():

# Run Adobe Autotag API
logging.info(f'Filename : {file_key} | Running Adobe Autotag API...')
autotag_pdf_with_options(filename, client_id, client_secret)
bad_pdf_fallback = False
try:
autotag_pdf_with_options(filename, client_id, client_secret)
except BadPdfError as e:
# Adobe cannot process this PDF (damaged / too complex).
# Fall back: use the viewer-prefs PDF as-is, treat all images as
# decorative (empty DB), and let the rest of the pipeline complete
# so the user gets output rather than a silent failure.
bad_pdf_fallback = True
logging.warning(f'Filename : {file_key} | BAD_PDF fallback active — skipping Adobe autotag/extract, all images treated as decorative')
s3_folder_autotag = f"temp/{file_base_name}/output_autotag"
images_output_dir = "output/zipfile/images"
# Upload the viewer-prefs PDF as the autotag output so downstream steps have a file
save_to_s3(filename, bucket_name, "output_autotag", file_base_name, file_key)
# Write an empty image DB so the alt-text step finds zero images and skips all of them
_write_empty_image_db(images_output_dir, bucket_name, s3_folder_autotag, file_key, file_base_name)
logging.info(f'Filename : {file_key} | BAD_PDF fallback: uploaded viewer-prefs PDF and empty image DB')

# Run Adobe Extract API
logging.info(f'Filename : {file_key} | Running Adobe Extract API...')
extract_api(filename, client_id, client_secret)
if not bad_pdf_fallback:
# Run Adobe Extract API
logging.info(f'Filename : {file_key} | Running Adobe Extract API...')
extract_api(filename, client_id, client_secret)

extract_api_zip_path = f"output/ExtractTextInfoFromPDF/extract${filename}.zip"
extract_to = f"output/zipfile/{filename}"

logging.info(f'Filename : {file_key} | Unzipping extracted content...')
unzip_file(filename, extract_api_zip_path, extract_to)
extract_api_zip_path = f"output/ExtractTextInfoFromPDF/extract${filename}.zip"
extract_to = f"output/zipfile/{filename}"

with open(f"output/zipfile/{filename}/structuredData.json") as file:
data = json.load(file)
logging.info(f'Filename : {file_key} | Unzipping extracted content...')
unzip_file(filename, extract_api_zip_path, extract_to)

pdf_document = pymupdf.open(filename)
with open(f"output/zipfile/{filename}/structuredData.json") as file:
data = json.load(file)

# Add TOC entries
logging.info(f'Filename : {file_key} | Adding TOC entries...')
add_toc_to_pdf(filename, pdf_document, data)
pdf_document = pymupdf.open(filename)

pdf_document.saveIncr()
pdf_document.close()

logging.info(f'Filename : {file_key} | Uploading processed PDF to S3...')
save_to_s3(filename, bucket_name, "output_autotag", file_base_name, file_key)
# Add TOC entries
logging.info(f'Filename : {file_key} | Adding TOC entries...')
add_toc_to_pdf(filename, pdf_document, data)

logging.info(f"PDF saved with updated metadata and TOC. File location: COMPLIANT_{file_key}")
pdf_document.saveIncr()
pdf_document.close()

figure_path = f"{extract_to}/figures"
autotag_report_path = f"output/AutotagPDF/{filename}.xlsx"
images_output_dir = "output/zipfile/images"
logging.info(f'Filename : {file_key} | Uploading processed PDF to S3...')
save_to_s3(filename, bucket_name, "output_autotag", file_base_name, file_key)

logging.info(f"PDF saved with updated metadata and TOC. File location: COMPLIANT_{file_key}")

figure_path = f"{extract_to}/figures"
autotag_report_path = f"output/AutotagPDF/{filename}.xlsx"
images_output_dir = "output/zipfile/images"

s3_folder_autotag = f"temp/{file_base_name}/output_autotag"

logging.info(f'Filename : {file_key} | Extracting and uploading images...')
extract_images_from_excel(filename, figure_path, autotag_report_path, images_output_dir, bucket_name, s3_folder_autotag, file_key)

s3_folder_autotag = f"temp/{file_base_name}/output_autotag"

logging.info(f'Filename : {file_key} | Extracting and uploading images...')
extract_images_from_excel(filename, figure_path, autotag_report_path, images_output_dir, bucket_name, s3_folder_autotag, file_key)

logging.info(f'Filename : {file_key} | Processing completed successfully')
logger.info(f"File: {file_base_name}, Status: Succeeded in First ECS task")

except BadPdfError:
# Already handled above via fallback path — should not reach here
pass
except (ServiceApiException, ServiceUsageException, SdkException) as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - Adobe API Error")
logger.error(f"Filename : {file_key} | Adobe API Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", f"Adobe API error: {e}")
sys.exit(1)
except ClientError as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - AWS Error")
logger.error(f"Filename : {file_key} | AWS Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "INFRA", f"AWS error: {e}")
sys.exit(1)
except FileNotFoundError as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - File Not Found")
logger.error(f"Filename : {file_key} | File Not Found Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", f"Expected Adobe output not found: {e}")
sys.exit(1)
except Exception as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task")
logger.error(f"Filename : {file_key} | Unexpected Error: {e}")
report_failure(bucket_name, file_base_name, s3_file_key, "UNKNOWN", f"Unexpected error: {e}")
sys.exit(1)

if __name__ == "__main__":
Expand Down
Loading