Perform final validation for the entire Identity Governance system, verify compliance with standards, and prepare for production deployment.
Check DynamoDB tables contain data:
Verify S3 buckets contain audit reports:
E2EValidationTest
import json
import boto3
from datetime import datetime, timedelta
def lambda_handler(event, context):
print("E2E Validation Test Started")
# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')
cloudwatch = boto3.client('cloudwatch')
test_results = {
'test_date': datetime.now().isoformat(),
'tests': []
}
try:
# Test 1: Data Integrity
test_results['tests'].append(test_data_integrity(dynamodb))
# Test 2: Lambda Functions
test_results['tests'].append(test_lambda_functions(lambda_client))
# Test 3: Audit Reports
test_results['tests'].append(test_audit_reports(s3))
# Test 4: Monitoring
test_results['tests'].append(test_monitoring_system(cloudwatch))
# Test 5: Compliance
test_results['tests'].append(test_compliance_validation(dynamodb))
# Calculate overall test result
passed_tests = sum(1 for test in test_results['tests'] if test['status'] == 'PASS')
total_tests = len(test_results['tests'])
test_results['summary'] = {
'total_tests': total_tests,
'passed_tests': passed_tests,
'failed_tests': total_tests - passed_tests,
'success_rate': (passed_tests / total_tests) * 100,
'overall_status': 'PASS' if passed_tests == total_tests else 'FAIL'
}
return {
'statusCode': 200,
'body': json.dumps(test_results, default=str)
}
except Exception as e:
print(f'Error in E2E validation: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps(f'Error: {str(e)}')
}
def test_data_integrity(dynamodb):
"""Test data integrity across DynamoDB tables"""
test_result = {
'test_name': 'Data Integrity Test',
'status': 'PASS',
'details': [],
'errors': []
}
try:
# Test AccessCertifications table
cert_table = dynamodb.Table('AccessCertifications')
cert_response = cert_table.scan(Limit=10)
if cert_response['Items']:
test_result['details'].append(f"AccessCertifications table: {len(cert_response['Items'])} records found")
else:
test_result['status'] = 'FAIL'
test_result['errors'].append("AccessCertifications table is empty")
# Test RiskAssessments table
risk_table = dynamodb.Table('RiskAssessments')
risk_response = risk_table.scan(Limit=10)
if risk_response['Items']:
test_result['details'].append(f"RiskAssessments table: {len(risk_response['Items'])} records found")
else:
test_result['status'] = 'FAIL'
test_result['errors'].append("RiskAssessments table is empty")
except Exception as e:
test_result['status'] = 'FAIL'
test_result['errors'].append(f"Data integrity test failed: {str(e)}")
return test_result
def test_lambda_functions(lambda_client):
"""Test all Lambda functions are working"""
test_result = {
'test_name': 'Lambda Functions Test',
'status': 'PASS',
'details': [],
'errors': []
}
functions_to_test = [
'AccessCertificationTrigger',
'PrivilegeAnalyticsEngine',
'RiskAssessmentEngine',
'CustomMetricsPublisher',
'DailyOperationsEngine',
'AuditReportGenerator'
]
for function_name in functions_to_test:
try:
response = lambda_client.get_function(FunctionName=function_name)
if response['Configuration']['State'] == 'Active':
test_result['details'].append(f"{function_name}: Active")
else:
test_result['status'] = 'FAIL'
test_result['errors'].append(f"{function_name}: Not active")
except lambda_client.exceptions.ResourceNotFoundException:
test_result['status'] = 'FAIL'
test_result['errors'].append(f"{function_name}: Function not found")
except Exception as e:
test_result['status'] = 'FAIL'
test_result['errors'].append(f"{function_name}: Error - {str(e)}")
return test_result
def test_audit_reports(s3):
"""Test audit reports are being generated"""
test_result = {
'test_name': 'Audit Reports Test',
'status': 'PASS',
'details': [],
'errors': []
}
try:
# Check for audit reports in S3
response = s3.list_objects_v2(
Bucket='identity-governance-reports',
Prefix='audit-reports/',
MaxKeys=10
)
if 'Contents' in response and response['Contents']:
test_result['details'].append(f"Found {len(response['Contents'])} audit reports")
# Check if reports are recent (within last 7 days)
recent_reports = 0
seven_days_ago = datetime.now() - timedelta(days=7)
for obj in response['Contents']:
if obj['LastModified'].replace(tzinfo=None) > seven_days_ago:
recent_reports += 1
test_result['details'].append(f"Recent reports (last 7 days): {recent_reports}")
else:
test_result['status'] = 'FAIL'
test_result['errors'].append("No audit reports found")
except Exception as e:
test_result['status'] = 'FAIL'
test_result['errors'].append(f"Audit reports test failed: {str(e)}")
return test_result
def test_monitoring_system(cloudwatch):
"""Test monitoring system is working"""
test_result = {
'test_name': 'Monitoring System Test',
'status': 'PASS',
'details': [],
'errors': []
}
try:
# Check for custom metrics
response = cloudwatch.list_metrics(
Namespace='IdentityGovernance'
)
if response['Metrics']:
test_result['details'].append(f"Found {len(response['Metrics'])} custom metrics")
else:
test_result['status'] = 'FAIL'
test_result['errors'].append("No custom metrics found")
# Check for alarms
alarms_response = cloudwatch.describe_alarms()
if alarms_response['MetricAlarms']:
test_result['details'].append(f"Found {len(alarms_response['MetricAlarms'])} CloudWatch alarms")
else:
test_result['details'].append("No CloudWatch alarms configured")
except Exception as e:
test_result['status'] = 'FAIL'
test_result['errors'].append(f"Monitoring system test failed: {str(e)}")
return test_result
def test_compliance_validation(dynamodb):
"""Test compliance validation"""
test_result = {
'test_name': 'Compliance Validation Test',
'status': 'PASS',
'details': [],
'errors': []
}
try:
# Check for recent compliance assessments
table = dynamodb.Table('RiskAssessments')
response = table.scan(
FilterExpression='AssessmentType = :type',
ExpressionAttributeValues={':type': 'Compliance Assessment'},
Limit=10
)
if response['Items']:
test_result['details'].append(f"Found {len(response['Items'])} compliance assessments")
else:
test_result['details'].append("No compliance assessments found - this is normal for new deployments")
# Check for user risk assessments
risk_response = table.scan(
FilterExpression='AssessmentType = :type',
ExpressionAttributeValues={':type': 'User Risk Assessment'},
Limit=5
)
if risk_response['Items']:
test_result['details'].append(f"Found {len(risk_response['Items'])} user risk assessments")
else:
test_result['details'].append("No user risk assessments found - ensure risk assessment engine is running")
except Exception as e:
test_result['status'] = 'FAIL'
test_result['errors'].append(f"Compliance validation test failed: {str(e)}")
return test_result
After completing the entire workshop:
Proceed to 11. Clean Resources to clean up AWS resources after completing the workshop.