Running a Mass Assign Grouping Event for a Large Number of Users

We are running into resource limits when trying to update user group assignments in bulk. We traditionally use Lambda triggers to process grouping, but we need to backdate a large number of users (37k) across a few thousand groups (9.7k), so Lambda would be slow and require multiple invocations. What is an ideal solution for this?

To handle this use case quickly, we used an in-house tool that runs a Python script on EMR/EC2 resources and drives the QuickSight APIs. The script processes up to 500 requests at a time: 10 thread-pool workers, each handling a batch of 50 assignments.
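As a rough sizing sketch of those numbers (illustrative only: it assumes roughly one CreateGroupMembership call per user; the actual total depends on how many groups each user joins):

```python
import math

# Illustrative sizing, assuming ~one CreateGroupMembership call per user.
assignments = 37_000
batch_size = 50          # assignments per worker batch
workers = 10             # thread-pool size

batches = math.ceil(assignments / batch_size)
in_flight = workers * batch_size  # assignments spread across workers at once

print(batches, in_flight)  # 740 500
```

So the workload splits into about 740 batches, with up to 500 assignments distributed across the 10 workers at any moment.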

Summary of Parallel Batch Processing for QuickSight Group Membership Assignments

This approach involves processing QuickSight group membership assignments in parallel batches, significantly improving efficiency and scalability. The script is designed to handle large datasets by splitting the assignments into manageable batches and processing them concurrently.
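The core pattern, splitting rows into batches and submitting each batch to a thread pool, can be sketched in isolation with a dummy work function (the names here are illustrative, not taken from the production script):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_generator(iterable, batch_size):
    # Yield successive fixed-size slices of the input list.
    for i in range(0, len(iterable), batch_size):
        yield iterable[i:i + batch_size]

def process_batch(batch):
    # Stand-in for the per-batch QuickSight API calls.
    return [f"assigned {item}" for item in batch]

rows = [f"user-{n}" for n in range(12)]
results = []
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_batch, b)
               for b in batch_generator(rows, 5)]
    for future in as_completed(futures):
        results.extend(future.result())

print(len(results))  # 12
```

Batches complete in no guaranteed order, which is fine here because each assignment is independent.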

The Code (with Comments Added):

import sys
import boto3
import csv
import os
from datetime import datetime, timedelta
from io import StringIO
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("QuickSightGroupAssignment").getOrCreate()

date = datetime.now().strftime('%Y-%m-%d')
account_id = "<account_id>"  # Masked Account ID
namespace = "default"
region_name = 'us-east-1'

DEFAULT_ROLE_ARN = "arn:aws:iam::<account_id>:role/<role_name>"  # Masked Role ARN
os.environ["AWS_DEFAULT_REGION"] = region_name

s3 = boto3.client('s3')
sts = boto3.client('sts')

# Function to assume a role and return temporary credentials
# NOTE: This is necessary if running the script in an environment
#       not directly linked to the AWS account (e.g., a different account
#       with more resources, a local machine, or a third-party tool).
#       Ensure the necessary IAM policies are in place for role assumption.
def assume_role():
    stsresponse = sts.assume_role(
        RoleArn=DEFAULT_ROLE_ARN,
        RoleSessionName='newsession',
        DurationSeconds=43200  # 12 hours; the role's max session duration must be raised to allow this (default is 1 hour)
    )
    return stsresponse['Credentials']

# Create a QuickSight client using the assumed role credentials
def create_qs_client(credentials):
    return boto3.client(
        'quicksight',
        region_name=region_name,
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken']
    )

# Initial credentials and client setup
credentials = assume_role()
quicksight = create_qs_client(credentials)

# NOTE: boto3 surfaces expired STS credentials as a botocore ClientError
#       (error code 'ExpiredToken' / 'ExpiredTokenException') rather than a
#       modeled QuickSight exception, so check the error code explicitly.
from botocore.exceptions import ClientError

def refresh_credentials_if_expired(func):
    def wrapper(*args, **kwargs):
        global credentials
        global quicksight
        try:
            return func(*args, **kwargs)
        except ClientError as e:
            if e.response['Error']['Code'] not in ('ExpiredToken', 'ExpiredTokenException'):
                raise
            print("Token expired, refreshing credentials...")
            credentials = assume_role()
            quicksight = create_qs_client(credentials)
            return func(*args, **kwargs)
    return wrapper

@refresh_credentials_if_expired
def create_group_membership(groupname, username):
    try:
        response = quicksight.create_group_membership(
            AwsAccountId=account_id,
            Namespace=namespace,
            GroupName=groupname,
            MemberName=username
        )
        return f"Group assignment for {username} to {groupname} successfully created."
    except quicksight.exceptions.ThrottlingException:
        raise
    except Exception as e:
        return f"Failed to assign {username} to {groupname}: {e}"

def process_csv_data():
    csv_file = s3.get_object(Bucket='my-bucket-name', Key=f'{date}')  # Masked S3 Bucket
    reader = csv.reader(csv_file['Body'].read().decode('utf-8-sig').splitlines())
    next(reader)  # Skip header
    rows = list(reader)
    return rows

def batch_generator(iterable, batch_size):
    for i in range(0, len(iterable), batch_size):
        yield iterable[i:i + batch_size]

@refresh_credentials_if_expired
def process_data(batch_size=50):
    rows = process_csv_data()

    def process_batch(batch):
        results = []
        for row in batch:
            groupname = row[1]
            username = row[2]
            retries = 0
            max_retries = 10

            while retries < max_retries:
                try:
                    result = create_group_membership(groupname, username)
                    results.append(result)
                    break
                except quicksight.exceptions.ThrottlingException:
                    retries += 1
                    wait_time = min(2 ** retries + random.uniform(0, 1), 60)  # Exponential backoff with jitter
                    print(f"ThrottlingException: retrying in {wait_time:.2f} seconds...")
                    time.sleep(wait_time)
            else:
                results.append(f"Max retries exceeded for {username} in group {groupname}")
        return results

    with ThreadPoolExecutor(max_workers=10) as executor:
        batch_futures = [executor.submit(process_batch, batch) for batch in batch_generator(rows, batch_size)]
        for future in as_completed(batch_futures):
            batch_results = future.result()
            for result in batch_results:
                print(result)

# Call the process_data function to start processing with parallel batch execution
if __name__ == "__main__":
    process_data(batch_size=50)

Notes:

  • Running in a Different Environment (e.g., another AWS account, local, or third-party tool):
    • Role Assumption: The assume_role() function is crucial if the script is run in an environment that doesn’t have direct access to the AWS account where QuickSight resides. For example, if you’re running the script in a different AWS account with more resources or on a local machine, you’ll need to assume a role in the target account.
    • IAM Policy Setup: Ensure the IAM user or role executing the script has the necessary permissions (sts:AssumeRole, quicksight:CreateGroupMembership, s3:GetObject). These permissions are required for role assumption and performing the operations in QuickSight and S3.
  • If Role Assumption is Not Needed:
    • Remove assume_role() Function: If the script runs within the same AWS account or environment that already has the necessary permissions, you can remove the assume_role() function and the related role assumption logic.
    • Directly Initialize QuickSight Client: Instead of using the assumed credentials, directly initialize the QuickSight client using the default AWS credentials:
quicksight = boto3.client('quicksight', region_name=region_name)
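As a starting point for the IAM setup mentioned above, here is a minimal policy sketch for the executing principal. The resource ARNs are placeholders matching the masked values in the script; you would scope them down to your actual role, namespace, and bucket:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    { "Effect": "Allow", "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::<account_id>:role/<role_name>" },
    { "Effect": "Allow", "Action": "quicksight:CreateGroupMembership",
      "Resource": "arn:aws:quicksight:us-east-1:<account_id>:group/default/*" },
    { "Effect": "Allow", "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::my-bucket-name/*" }
  ]
}
```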