From 32cac9a49f7d38ea0de5fc91a19dbd789249b791 Mon Sep 17 00:00:00 2001 From: Alexander Amiri Date: Wed, 27 May 2026 23:23:03 +0200 Subject: [PATCH] Parallelize team-provisioner Google Admin SDK calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Provision Groups CI in registry has been failing with a read-timeout on `aws lambda invoke`. The Lambda itself was running ~90s end-to-end (well under its 300s timeout) but the AWS CLI's default 60s read-timeout was tripping and triggering retries. Root cause: per-hero and per-group Google Admin Directory API calls were issued sequentially. With 55 heroes and 35 groups, the ~230 round-trips at ~400ms each sum to ~90s. Fix in this PR: - Extract per-hero (`_sync_hero_account`, `_sync_hero_alias`), per-group (`_sync_one_group`) and per-access-group helpers from `handle_sync_groups_and_heros`. - Dispatch each step's per-item work through a ThreadPoolExecutor with `GROUP_SYNC_WORKERS` workers (default 5, env-tunable). - Pre-fetch the OAuth token once before parallel work so workers don't race to refresh it. - Worker count sized to stay under Google's per-user quota (1,500 queries / 100s ≈ 15 QPS). 5 workers × ~3 QPS each sits at the budget ceiling, well below the 40 QPS project quota. Belt-and-suspenders in the same PR: - Add `--cli-read-timeout 0` to `scripts/provision-groups.py` and `scripts/provision-teams.sh` so a future regression past 60s won't surface as a false CI failure. Expected effect: total Lambda duration drops from ~90s to ~20-25s, comfortably under the 60s CLI default (which we've also removed) and the 300s Lambda timeout. --- scripts/provision-groups.py | 3 + scripts/provision-teams.sh | 1 + .../lambda-src/team_provisioner/handler.py | 593 ++++++++++-------- 3 files changed, 324 insertions(+), 273 deletions(-) diff --git a/scripts/provision-groups.py b/scripts/provision-groups.py index 6893a82..c0e04ef 100644 --- a/scripts/provision-groups.py +++ b/scripts/provision-groups.py @@ -80,6 +80,9 @@ def invoke_lambda(payload): "--function-name", "javabin-team-provisioner", "--payload", f"fileb://{payload_file}", "--cli-binary-format", "raw-in-base64-out", + # Lambda timeout is 300s; default CLI read timeout (60s) is too low. + # Disable client-side read timeout so the CLI waits for the Lambda response. + "--cli-read-timeout", "0", "/tmp/lambda-response.json", ], capture_output=True, diff --git a/scripts/provision-teams.sh b/scripts/provision-teams.sh index 9e82d01..067835a 100644 --- a/scripts/provision-teams.sh +++ b/scripts/provision-teams.sh @@ -45,6 +45,7 @@ aws lambda invoke \ --function-name javabin-team-provisioner \ --payload "$PAYLOAD" \ --cli-binary-format raw-in-base64-out \ + --cli-read-timeout 0 \ /tmp/lambda-response.json cat /tmp/lambda-response.json diff --git a/terraform/lambda-src/team_provisioner/handler.py b/terraform/lambda-src/team_provisioner/handler.py index 5bee64e..772c34e 100644 --- a/terraform/lambda-src/team_provisioner/handler.py +++ b/terraform/lambda-src/team_provisioner/handler.py @@ -21,6 +21,7 @@ import urllib.error import urllib.parse import urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone import boto3 @@ -65,6 +66,11 @@ ) BUDGET_ENFORCEMENT_TOPIC_ARN = os.environ.get("BUDGET_ENFORCEMENT_TOPIC_ARN", "") +# Parallelism for per-group / per-hero Google Admin SDK calls in handle_sync_groups_and_heros. +# Google Directory API per-user quota: 1,500 queries / 100 s (~15 QPS sustained). +# We impersonate a single admin, so 5 workers × ~3 calls/s each ≈ 15 QPS — at the budget ceiling. +GROUP_SYNC_WORKERS = int(os.environ.get("GROUP_SYNC_WORKERS", "5")) + # Team name constraints: lowercase letters only (a-z), max 12 characters. # This keeps resource names within AWS limits (ALB TG: 32 chars for {team}-{service}). import re @@ -1383,90 +1389,303 @@ def _assign_permission_set(group_id, permission_set_name): return _create_account_assignment(ps_arn, group_id, permission_set_name) +def _sync_hero_account(hero, access_token): + """Check/create a single hero's Google Workspace account. + + Returns dict with keys: email, status ("existed"|"created"|"failed"|"skipped"), + and optional "error" for failed. + """ + email = hero.get("javabin_google_email", "") + if not email: + return {"status": "skipped"} + + user_key = urllib.parse.quote(email, safe="") + existing_user = _google_api("GET", f"/users/{user_key}", access_token) + if existing_user and not existing_user.get("already_exists"): + logger.info("Google Workspace account already exists: %s", email) + return {"email": email, "status": "existed"} + + import secrets + import string + temp_password = "".join( + secrets.choice(string.ascii_letters + string.digits + "!@#$%") for _ in range(24) + ) + personal_email = hero.get("personal_email", "") + user_body = { + "primaryEmail": email, + "name": { + "givenName": hero.get("firstname", ""), + "familyName": hero.get("lastname", ""), + }, + "password": temp_password, + "changePasswordAtNextLogin": True, + "recoveryEmail": personal_email, + } + + try: + create_result = _google_api("POST", "/users", access_token, user_body) + if create_result and not create_result.get("already_exists"): + logger.info("Created Google Workspace account: %s (recovery: %s)", email, personal_email) + if personal_email: + try: + _setup_email_forwarding(email, personal_email) + except Exception as fe: + logger.warning("Could not set up forwarding %s → %s: %s", email, personal_email, fe) + try: + pw_url = _generate_password_set_url(email) + _send_welcome_email( + access_token, email, personal_email, + hero.get("firstname", ""), password_set_url=pw_url, + ) + except Exception as we: + logger.warning("Could not send welcome email to %s: %s", personal_email, we) + return {"email": email, "status": "created"} + # 409 / already_exists — still ensure forwarding + if personal_email: + try: + _setup_email_forwarding(email, personal_email) + except Exception as fe: + logger.warning("Could not set up forwarding %s → %s: %s", email, personal_email, fe) + return {"email": email, "status": "existed"} + except Exception as e: + logger.error("Failed to create account %s: %s", email, e) + return {"email": email, "status": "failed", "error": str(e)[:200]} + + +def _sync_hero_alias(hero, access_token): + """Check/create one hero's email alias. Returns dict or None if no alias.""" + alias = hero.get("alias", "") + email = hero.get("javabin_google_email", "") + if not alias or not email: + return None + + user_key = urllib.parse.quote(email, safe="") + try: + existing_aliases = _google_api("GET", f"/users/{user_key}/aliases", access_token) + existing_alias_emails = set() + if existing_aliases and "aliases" in existing_aliases: + existing_alias_emails = {a["alias"].lower() for a in existing_aliases["aliases"]} + + if alias.lower() not in existing_alias_emails: + alias_result = _google_api( + "POST", f"/users/{user_key}/aliases", access_token, {"alias": alias} + ) + if alias_result: + logger.info("Created alias %s for %s", alias, email) + return {"status": "created", "user": email, "alias": alias} + return {"status": "exists", "user": email, "alias": alias} + except Exception as e: + logger.error("Failed to create alias %s for %s: %s", alias, email, e) + return {"status": "failed", "user": email, "alias": alias, "error": str(e)[:200]} + + +def _sync_one_group(group, access_token): + """Sync a single group: Google Workspace + optional Cognito + Identity Center. + + Returns (name, result_dict) where result_dict has keys "google", + optionally "cognito", "identity_center". Returns (None, None) if invalid. + """ + name = group.get("name") + google_email = group.get("google") + members = group.get("members", []) + + if not name or not google_email: + return None, None + + logger.info("Syncing group: %s (%s) with %d members", name, google_email, len(members)) + gr = {} + + try: + group_key = urllib.parse.quote(google_email, safe="") + group_body = { + "email": google_email, + "name": name, + "description": f"javaBin group: {name}", + } + + existing = _google_api("GET", f"/groups/{group_key}", access_token) + if existing and not existing.get("already_exists"): + # Group exists — do NOT update name/description/email. + # Updating existing groups can rename unrelated groups that + # share a display name or alias (e.g. jz-ledelsen → javazone). + logger.info("Google group %s already exists — skipping metadata update", google_email) + else: + _google_api("POST", "/groups", access_token, group_body) + + current_emails = set() + page_token = None + while True: + path = f"/groups/{group_key}/members?maxResults=200" + if page_token: + path += f"&pageToken={urllib.parse.quote(page_token, safe='')}" + resp = _google_api("GET", path, access_token) + if resp and "members" in resp: + for m in resp["members"]: + current_emails.add(m["email"].lower()) + if resp and resp.get("nextPageToken"): + page_token = resp["nextPageToken"] + else: + break + + desired_emails = {m.lower() for m in members if m} + + added = 0 + for email in desired_emails - current_emails: + _google_api( + "POST", f"/groups/{group_key}/members", access_token, + {"email": email, "role": "MEMBER"}, + ) + added += 1 + + # Additive only — never remove members automatically. + # Members added manually via Google Admin UI are preserved. + extra = current_emails - desired_emails + if extra: + logger.info( + "Group %s has %d members not in YAML (preserved): %s", + google_email, len(extra), ", ".join(sorted(extra)[:5]), + ) + + gr["google"] = { + "synced": True, + "group": google_email, + "member_count": len(desired_emails | current_emails), + "added": added, + "extra_preserved": len(extra), + } + except Exception as e: + logger.error("Google group sync failed for %s: %s", name, e) + gr["google"] = {"error": str(e)[:200]} + + if group.get("cognito"): + try: + gr["cognito"] = sync_group_to_cognito(group) + except Exception as e: + logger.error("Cognito sync failed for %s: %s", name, e) + gr["cognito"] = {"error": str(e)[:200]} + + if group.get("identity_center"): + try: + gr["identity_center"] = sync_group_to_identity_center(group) + except Exception as e: + logger.error("Identity Center sync failed for %s: %s", name, e) + gr["identity_center"] = {"error": str(e)[:200]} + + return name, gr + + +def _sync_one_access_group(ag, group_email_map, access_token): + """Sync a single access group (nested group membership). Returns (name, result_dict).""" + ag_name = ag.get("name", "") + ag_google = ag.get("google", "") + role_groups = ag.get("groups", []) + if not ag_name or not ag_google: + return None, None + + ag_key = urllib.parse.quote(ag_google, safe="") + try: + existing_ag = _google_api("GET", f"/groups/{ag_key}", access_token) + if not existing_ag or existing_ag.get("already_exists"): + _google_api("POST", "/groups", access_token, { + "email": ag_google, + "name": ag_name, + "description": ag.get("description", f"Access group: {ag_name}"), + }) + logger.info("Created access group %s", ag_google) + + current_members = set() + page_token = None + while True: + path = f"/groups/{ag_key}/members?maxResults=200" + if page_token: + path += f"&pageToken={urllib.parse.quote(page_token, safe='')}" + resp = _google_api("GET", path, access_token) + if resp and "members" in resp: + for m in resp["members"]: + current_members.add(m["email"].lower()) + if resp and resp.get("nextPageToken"): + page_token = resp["nextPageToken"] + else: + break + + desired_members = set() + for rg in role_groups: + rg_email = group_email_map.get(rg) + if rg_email: + desired_members.add(rg_email.lower()) + else: + logger.warning("Access group %s references unknown group: %s", ag_name, rg) + + added = 0 + for email in desired_members - current_members: + _google_api( + "POST", f"/groups/{ag_key}/members", access_token, + {"email": email, "role": "MEMBER"}, + ) + added += 1 + + removed = 0 + managed_group_emails = {v.lower() for v in group_email_map.values()} + for email in current_members - desired_members: + # Only remove group emails (containing @java.no) that we manage. + # Preserve any directly-added individual members. + if email.endswith("@java.no") and email in managed_group_emails: + member_key = urllib.parse.quote(email, safe="") + _google_api("DELETE", f"/groups/{ag_key}/members/{member_key}", access_token) + removed += 1 + + logger.info( + "Synced access group %s: %d role groups (+%d/-%d)", + ag_google, len(desired_members), added, removed, + ) + return ag_name, { + "synced": True, + "added": added, + "removed": removed, + "total_groups": len(desired_members), + } + except Exception as e: + logger.error("Access group sync failed for %s: %s", ag_name, e) + return ag_name, {"error": str(e)[:200]} + + def handle_sync_groups_and_heros(event): """Handle the sync_groups_and_heros action — full hero + group reconciliation. Processes both groups and heroes in a single pass: - 1. Auto-create Google Workspace accounts for new heroes - 2. Manage email aliases for heroes - 3. Sync Google Workspace groups with resolved member lists - 4. Sync Cognito groups where flagged - 5. Sync Identity Center groups where flagged + 1. Auto-create Google Workspace accounts for new heroes (parallel) + 2. Manage email aliases for heroes (parallel) + 3. Sync Google Workspace groups with resolved member lists (parallel) + 4. Sync Cognito groups where flagged (per-group, inside Step 3) + 5. Sync Identity Center groups where flagged (per-group, inside Step 3) + 6. Sync access groups (nested role-group membership, parallel) + + Per-item work in each step is dispatched via ThreadPoolExecutor with + GROUP_SYNC_WORKERS workers, sized to stay within Google Directory API's + per-user quota (1,500 queries / 100 s). """ groups = event.get("groups", []) heros = event.get("heros", []) results = {} - # Step 1: Auto-create Google Workspace accounts for heroes + # Pre-fetch the OAuth token once so parallel workers don't all race to refresh it. access_token = _get_google_access_token() + + # Step 1: Auto-create Google Workspace accounts for heroes (parallel) accounts_created = [] accounts_failed = [] accounts_existed = [] - for hero in heros: - email = hero.get("javabin_google_email", "") - if not email: - continue - - user_key = urllib.parse.quote(email, safe="") - - # Check if account exists - existing_user = _google_api("GET", f"/users/{user_key}", access_token) - if existing_user and not existing_user.get("already_exists"): - accounts_existed.append(email) - logger.info("Google Workspace account already exists: %s", email) - else: - # Create the account - import secrets - import string - temp_password = "".join(secrets.choice(string.ascii_letters + string.digits + "!@#$%") for _ in range(24)) - - personal_email = hero.get("personal_email", "") - user_body = { - "primaryEmail": email, - "name": { - "givenName": hero.get("firstname", ""), - "familyName": hero.get("lastname", ""), - }, - "password": temp_password, - "changePasswordAtNextLogin": True, - "recoveryEmail": personal_email, - } - - try: - create_result = _google_api("POST", "/users", access_token, user_body) - if create_result and not create_result.get("already_exists"): - accounts_created.append(email) - logger.info("Created Google Workspace account: %s (recovery: %s)", email, personal_email) - - if personal_email: - # Set up auto-forwarding to personal email - try: - _setup_email_forwarding(email, personal_email) - except Exception as fe: - logger.warning("Could not set up forwarding %s → %s: %s", email, personal_email, fe) - - # Send welcome email to personal address via Gmail API - try: - pw_url = _generate_password_set_url(email) - _send_welcome_email( - access_token, email, personal_email, - hero.get("firstname", ""), password_set_url=pw_url, - ) - except Exception as we: - logger.warning("Could not send welcome email to %s: %s", personal_email, we) - else: - accounts_existed.append(email) - # Ensure forwarding is set up for existing accounts too - if personal_email: - try: - _setup_email_forwarding(email, personal_email) - except Exception as fe: - logger.warning("Could not set up forwarding %s → %s: %s", email, personal_email, fe) - except Exception as e: - logger.error("Failed to create account %s: %s", email, e) - accounts_failed.append({"email": email, "error": str(e)[:200]}) + with ThreadPoolExecutor(max_workers=GROUP_SYNC_WORKERS) as executor: + for r in executor.map(lambda h: _sync_hero_account(h, access_token), heros): + status = r.get("status") + if status == "created": + accounts_created.append(r["email"]) + elif status == "existed": + accounts_existed.append(r["email"]) + elif status == "failed": + accounts_failed.append({"email": r["email"], "error": r.get("error", "")}) results["accounts"] = { "created": accounts_created, @@ -1474,223 +1693,51 @@ def handle_sync_groups_and_heros(event): "failed": accounts_failed, } - # Step 2: Manage email aliases + # Step 2: Manage email aliases (parallel) aliases_created = [] aliases_failed = [] - for hero in heros: - alias = hero.get("alias", "") - email = hero.get("javabin_google_email", "") - if not alias or not email: - continue - - user_key = urllib.parse.quote(email, safe="") - - # Check if alias already exists - try: - existing_aliases = _google_api("GET", f"/users/{user_key}/aliases", access_token) - existing_alias_emails = set() - if existing_aliases and "aliases" in existing_aliases: - existing_alias_emails = {a["alias"].lower() for a in existing_aliases["aliases"]} - - if alias.lower() not in existing_alias_emails: - alias_result = _google_api( - "POST", f"/users/{user_key}/aliases", access_token, {"alias": alias} - ) - if alias_result: - aliases_created.append({"user": email, "alias": alias}) - logger.info("Created alias %s for %s", alias, email) - except Exception as e: - logger.error("Failed to create alias %s for %s: %s", alias, email, e) - aliases_failed.append({"user": email, "alias": alias, "error": str(e)[:200]}) + with ThreadPoolExecutor(max_workers=GROUP_SYNC_WORKERS) as executor: + for r in executor.map(lambda h: _sync_hero_alias(h, access_token), heros): + if not r: + continue + if r["status"] == "created": + aliases_created.append({"user": r["user"], "alias": r["alias"]}) + elif r["status"] == "failed": + aliases_failed.append({ + "user": r["user"], "alias": r["alias"], "error": r.get("error", ""), + }) results["aliases"] = { "created": aliases_created, "failed": aliases_failed, } - # Step 3: Sync Google Workspace groups with resolved member lists - for group in groups: - name = group.get("name") - google_email = group.get("google") - members = group.get("members", []) - - if not name or not google_email: - continue - - logger.info("Syncing group: %s (%s) with %d members", name, google_email, len(members)) - gr = {} - - # Sync Google Workspace group - try: - group_key = urllib.parse.quote(google_email, safe="") - group_body = { - "email": google_email, - "name": name, - "description": f"javaBin group: {name}", - } - - existing = _google_api("GET", f"/groups/{group_key}", access_token) - if existing and not existing.get("already_exists"): - # Group exists — do NOT update name/description/email. - # Updating existing groups can rename unrelated groups that - # share a display name or alias (e.g. jz-ledelsen → javazone). - logger.info("Google group %s already exists — skipping metadata update", google_email) - else: - _google_api("POST", "/groups", access_token, group_body) - - # Paginated current member list - current_emails = set() - page_token = None - while True: - path = f"/groups/{group_key}/members?maxResults=200" - if page_token: - path += f"&pageToken={urllib.parse.quote(page_token, safe='')}" - resp = _google_api("GET", path, access_token) - if resp and "members" in resp: - for m in resp["members"]: - current_emails.add(m["email"].lower()) - if resp and resp.get("nextPageToken"): - page_token = resp["nextPageToken"] - else: - break - - # Desired members - desired_emails = {m.lower() for m in members if m} - - # Add missing members - added = 0 - for email in desired_emails - current_emails: - _google_api( - "POST", f"/groups/{group_key}/members", access_token, - {"email": email, "role": "MEMBER"}, - ) - added += 1 - - # Additive only — never remove members automatically. - # Members added manually via Google Admin UI are preserved. - # To remove members, use a separate explicit process. - extra = current_emails - desired_emails - if extra: - logger.info( - "Group %s has %d members not in YAML (preserved): %s", - google_email, len(extra), ", ".join(sorted(extra)[:5]), - ) - - gr["google"] = { - "synced": True, - "group": google_email, - "member_count": len(desired_emails | current_emails), - "added": added, - "extra_preserved": len(extra), - } - except Exception as e: - logger.error("Google group sync failed for %s: %s", name, e) - gr["google"] = {"error": str(e)[:200]} - - # Step 4: Sync Cognito if flagged - if group.get("cognito"): - try: - gr["cognito"] = sync_group_to_cognito(group) - except Exception as e: - logger.error("Cognito sync failed for %s: %s", name, e) - gr["cognito"] = {"error": str(e)[:200]} - - # Step 5: Sync Identity Center if flagged - if group.get("identity_center"): - try: - gr["identity_center"] = sync_group_to_identity_center(group) - except Exception as e: - logger.error("Identity Center sync failed for %s: %s", name, e) - gr["identity_center"] = {"error": str(e)[:200]} - - results[name] = gr + # Step 3 (+ 4, 5): Sync Google groups, Cognito, Identity Center per group (parallel) + with ThreadPoolExecutor(max_workers=GROUP_SYNC_WORKERS) as executor: + futures = [executor.submit(_sync_one_group, group, access_token) for group in groups] + for future in as_completed(futures): + name, gr = future.result() + if name: + results[name] = gr - # Step 6: Sync access groups (nested group membership for Workspace service access) + # Step 6: Sync access groups (depends on Step 3 — Google groups must exist first) access_groups = event.get("access_groups", []) access_results = {} if access_groups: - # Build a map of group name → google email from the groups we just synced - group_email_map = {g.get("name"): g.get("google") for g in groups if g.get("name") and g.get("google")} - - for ag in access_groups: - ag_name = ag.get("name", "") - ag_google = ag.get("google", "") - role_groups = ag.get("groups", []) - if not ag_name or not ag_google: - continue - - ag_key = urllib.parse.quote(ag_google, safe="") - try: - # Ensure the access group exists - existing_ag = _google_api("GET", f"/groups/{ag_key}", access_token) - if not existing_ag or existing_ag.get("already_exists"): - _google_api("POST", "/groups", access_token, { - "email": ag_google, - "name": ag_name, - "description": ag.get("description", f"Access group: {ag_name}"), - }) - logger.info("Created access group %s", ag_google) - - # Get current members of the access group - current_members = set() - page_token = None - while True: - path = f"/groups/{ag_key}/members?maxResults=200" - if page_token: - path += f"&pageToken={urllib.parse.quote(page_token, safe='')}" - resp = _google_api("GET", path, access_token) - if resp and "members" in resp: - for m in resp["members"]: - current_members.add(m["email"].lower()) - if resp and resp.get("nextPageToken"): - page_token = resp["nextPageToken"] - else: - break - - # Desired: role group emails as members (nested groups) - desired_members = set() - for rg in role_groups: - rg_email = group_email_map.get(rg) - if rg_email: - desired_members.add(rg_email.lower()) - else: - logger.warning("Access group %s references unknown group: %s", ag_name, rg) - - # Add missing group members - added = 0 - for email in desired_members - current_members: - _google_api( - "POST", f"/groups/{ag_key}/members", access_token, - {"email": email, "role": "MEMBER"}, - ) - added += 1 - - # Remove groups no longer in the access list - removed = 0 - for email in current_members - desired_members: - # Only remove group emails (containing @java.no) that are role groups - # Preserve any directly-added individual members - if email.endswith("@java.no") and email in { - v.lower() for v in group_email_map.values() - }: - member_key = urllib.parse.quote(email, safe="") - _google_api("DELETE", f"/groups/{ag_key}/members/{member_key}", access_token) - removed += 1 - - access_results[ag_name] = { - "synced": True, - "added": added, - "removed": removed, - "total_groups": len(desired_members), - } - logger.info( - "Synced access group %s: %d role groups (+%d/-%d)", - ag_google, len(desired_members), added, removed, - ) - except Exception as e: - logger.error("Access group sync failed for %s: %s", ag_name, e) - access_results[ag_name] = {"error": str(e)[:200]} + group_email_map = { + g.get("name"): g.get("google") + for g in groups if g.get("name") and g.get("google") + } + with ThreadPoolExecutor(max_workers=GROUP_SYNC_WORKERS) as executor: + futures = [ + executor.submit(_sync_one_access_group, ag, group_email_map, access_token) + for ag in access_groups + ] + for future in as_completed(futures): + name, gr = future.result() + if name: + access_results[name] = gr if access_results: results["_access_groups"] = access_results