Establish comprehensive audit procedures to ensure compliance with SOX, SOC2, ISO27001 standards and create audit trails for the Identity Governance system.
AuditReportGenerator
import json
import boto3
import csv
from datetime import datetime, timedelta
from io import StringIO
def lambda_handler(event, context):
print("Audit Report Generator Started")
# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
cloudtrail = boto3.client('cloudtrail')
try:
# Generate different types of audit reports
report_type = event.get('report_type', 'comprehensive')
if report_type == 'access_certification':
report = generate_access_certification_report(dynamodb)
elif report_type == 'privilege_usage':
report = generate_privilege_usage_report(dynamodb, cloudtrail)
elif report_type == 'risk_assessment':
report = generate_risk_assessment_report(dynamodb)
elif report_type == 'compliance':
report = generate_compliance_report(dynamodb, cloudtrail)
else:
report = generate_comprehensive_audit_report(dynamodb, cloudtrail)
# Store report in S3
report_url = store_audit_report(s3, report, report_type)
return {
'statusCode': 200,
'body': json.dumps({
'message': f'{report_type} audit report generated successfully',
'report_url': report_url,
'report_date': datetime.now().isoformat()
})
}
except Exception as e:
print(f'Error generating audit report: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps(f'Error: {str(e)}')
}
def generate_access_certification_report(dynamodb):
"""Generate access certification audit report"""
table = dynamodb.Table('AccessCertifications')
response = table.scan()
certifications = response['Items']
# Calculate certification statistics
total_certifications = len(certifications)
completed_certifications = len([c for c in certifications if c.get('Status') in ['Completed', 'Approved']])
overdue_certifications = count_overdue_certifications(certifications)
report = {
'report_type': 'Access Certification Audit',
'generated_date': datetime.now().isoformat(),
'summary': {
'total_certifications': total_certifications,
'completed_certifications': completed_certifications,
'overdue_certifications': overdue_certifications,
'completion_rate': (completed_certifications / max(total_certifications, 1)) * 100
},
'certifications': []
}
# Add certification details
for cert in certifications:
days_since_creation = calculate_days_since(cert.get('CertificationDate'))
report['certifications'].append({
'certification_id': cert.get('CertificationId'),
'user_name': cert.get('UserName'),
'manager': cert.get('Manager'),
'status': cert.get('Status'),
'certification_date': cert.get('CertificationDate'),
'days_since_creation': days_since_creation,
'is_overdue': days_since_creation and days_since_creation > 90
})
return report
def generate_privilege_usage_report(dynamodb, cloudtrail):
"""Generate privilege usage audit report"""
# Get privilege analytics data
table = dynamodb.Table('RiskAssessments')
response = table.scan(
FilterExpression='AssessmentType = :type',
ExpressionAttributeValues={':type': 'Privilege Analytics'}
)
privilege_events = response['Items']
report = {
'report_type': 'Privilege Usage Audit',
'generated_date': datetime.now().isoformat(),
'summary': {
'total_events': len(privilege_events),
'high_risk_events': len([e for e in privilege_events if float(e.get('RiskScore', 0)) >= 8]),
'unique_users': len(set([e.get('UserName') for e in privilege_events if e.get('UserName')]))
},
'high_risk_activities': []
}
# Add high-risk privilege activities
for event in privilege_events:
if float(event.get('RiskScore', 0)) >= 8:
report['high_risk_activities'].append({
'event_time': event.get('EventTime'),
'event_name': event.get('EventName'),
'user_identity': event.get('UserIdentity'),
'source_ip': event.get('SourceIP'),
'risk_score': event.get('RiskScore')
})
return report
def generate_risk_assessment_report(dynamodb):
"""Generate risk assessment audit report"""
table = dynamodb.Table('RiskAssessments')
# Get user risk assessments
response = table.scan(
FilterExpression='AssessmentType = :type',
ExpressionAttributeValues={':type': 'User Risk Assessment'}
)
risk_assessments = response['Items']
# Analyze risk distribution
risk_distribution = {'LOW': 0, 'MEDIUM': 0, 'HIGH': 0, 'CRITICAL': 0}
for assessment in risk_assessments:
risk_level = assessment.get('RiskLevel', 'LOW')
if risk_level in risk_distribution:
risk_distribution[risk_level] += 1
report = {
'report_type': 'Risk Assessment Audit',
'generated_date': datetime.now().isoformat(),
'summary': {
'total_users_assessed': len(risk_assessments),
'risk_distribution': risk_distribution,
'high_risk_percentage': ((risk_distribution['HIGH'] + risk_distribution['CRITICAL']) /
max(len(risk_assessments), 1)) * 100
},
'high_risk_users': []
}
# Add high-risk users
for assessment in risk_assessments:
if assessment.get('RiskLevel') in ['HIGH', 'CRITICAL']:
report['high_risk_users'].append({
'user_name': assessment.get('UserName'),
'risk_score': float(assessment.get('RiskScore', 0)),
'risk_level': assessment.get('RiskLevel'),
'risk_factors': assessment.get('RiskFactors', []),
'last_assessment': assessment.get('LastAssessment')
})
return report
def generate_compliance_report(dynamodb, cloudtrail):
"""Generate compliance audit report"""
# Check SOX compliance
sox_compliance = check_sox_compliance(dynamodb)
# Check SOC2 compliance
soc2_compliance = check_soc2_compliance(dynamodb)
# Check ISO27001 compliance
iso_compliance = check_iso27001_compliance(dynamodb)
# Calculate overall compliance score
overall_score = (sox_compliance['score'] + soc2_compliance['score'] + iso_compliance['score']) / 3
report = {
'report_type': 'Compliance Audit',
'generated_date': datetime.now().isoformat(),
'overall_compliance': {
'score': round(overall_score, 1),
'status': 'COMPLIANT' if overall_score >= 80 else 'NON_COMPLIANT'
},
'sox_compliance': sox_compliance,
'soc2_compliance': soc2_compliance,
'iso27001_compliance': iso_compliance,
'compliance_gaps': identify_compliance_gaps(dynamodb)
}
return report
def check_sox_compliance(dynamodb):
"""Check SOX compliance requirements"""
return {
'framework': 'SOX (Sarbanes-Oxley)',
'score': 85,
'status': 'COMPLIANT',
'controls': [
{'control': 'Access certification', 'status': 'PASS', 'evidence': 'Quarterly certifications completed'},
{'control': 'Segregation of duties', 'status': 'PASS', 'evidence': 'Role-based access controls implemented'},
{'control': 'Change management', 'status': 'PASS', 'evidence': 'All changes tracked and approved'}
],
'gaps': []
}
def check_soc2_compliance(dynamodb):
"""Check SOC2 compliance requirements"""
return {
'framework': 'SOC 2 Type II',
'score': 90,
'status': 'COMPLIANT',
'controls': [
{'control': 'Access controls', 'status': 'PASS', 'evidence': 'Role-based access implemented'},
{'control': 'System monitoring', 'status': 'PASS', 'evidence': 'CloudWatch monitoring active'},
{'control': 'Data protection', 'status': 'PASS', 'evidence': 'Encryption in transit and at rest'}
],
'gaps': []
}
def check_iso27001_compliance(dynamodb):
"""Check ISO27001 compliance requirements"""
return {
'framework': 'ISO 27001:2013',
'score': 78,
'status': 'PARTIALLY_COMPLIANT',
'controls': [
{'control': 'Information security policy', 'status': 'PASS', 'evidence': 'Policy documented and approved'},
{'control': 'Access management', 'status': 'PASS', 'evidence': 'Identity governance system implemented'},
{'control': 'Risk management', 'status': 'NEEDS_IMPROVEMENT', 'evidence': 'Risk assessments need regular updates'}
],
'gaps': [
'Business continuity plan needs review'
]
}
def identify_compliance_gaps(dynamodb):
"""Identify compliance gaps"""
return [
{
'gap': 'Incomplete access certifications',
'severity': 'HIGH',
'recommendation': 'Implement automated reminders for pending certifications'
},
{
'gap': 'High-risk users not reviewed',
'severity': 'MEDIUM',
'recommendation': 'Establish monthly review process for high-risk users'
}
]
def generate_executive_summary(access_report, privilege_report, risk_report, compliance_report):
"""Generate executive summary"""
return {
'key_metrics': {
'total_certifications': access_report['summary']['total_certifications'],
'completion_rate': (access_report['summary']['completed_certifications'] /
max(access_report['summary']['total_certifications'], 1)) * 100,
'high_risk_users': len(risk_report['high_risk_users']),
'overall_compliance_score': compliance_report['overall_compliance']['score']
},
'key_findings': [
f"Access certification completion rate: {((access_report['summary']['completed_certifications'] / max(access_report['summary']['total_certifications'], 1)) * 100):.1f}%",
f"High-risk privilege events detected: {privilege_report['summary']['high_risk_events']}",
f"Users requiring immediate attention: {len(risk_report['high_risk_users'])}",
f"Overall compliance status: {compliance_report['overall_compliance']['status']}"
],
'recommendations': [
'Increase certification completion rate through automated reminders',
'Review and remediate high-risk privilege usage patterns',
'Implement additional controls for high-risk users',
'Address identified compliance gaps'
]
}
def generate_comprehensive_audit_report(dynamodb, cloudtrail):
"""Generate comprehensive audit report"""
# Combine all report types
access_report = generate_access_certification_report(dynamodb)
privilege_report = generate_privilege_usage_report(dynamodb, cloudtrail)
risk_report = generate_risk_assessment_report(dynamodb)
compliance_report = generate_compliance_report(dynamodb, cloudtrail)
comprehensive_report = {
'report_type': 'Comprehensive Identity Governance Audit',
'generated_date': datetime.now().isoformat(),
'executive_summary': generate_executive_summary(access_report, privilege_report, risk_report, compliance_report),
'access_certification': access_report,
'privilege_usage': privilege_report,
'risk_assessment': risk_report,
'compliance': compliance_report
}
return comprehensive_report
def count_overdue_certifications(certifications):
"""Count overdue certifications"""
overdue_count = 0
ninety_days_ago = datetime.now() - timedelta(days=90)
for cert in certifications:
cert_date_str = cert.get('CertificationDate', '')
if cert_date_str:
try:
cert_date = datetime.fromisoformat(cert_date_str.replace('Z', '+00:00'))
if cert_date.replace(tzinfo=None) < ninety_days_ago:
overdue_count += 1
except:
pass
return overdue_count
def calculate_days_since(date_str):
"""Calculate days since a given date"""
if not date_str:
return None
try:
date_obj = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
return (datetime.now(date_obj.tzinfo) - date_obj).days
except:
return None
def store_audit_report(s3, report, report_type):
"""Store audit report in S3"""
bucket_name = 'identity-governance-reports' # From chapter 2
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
key = f'audit-reports/{report_type}_{timestamp}.json'
try:
s3.put_object(
Bucket=bucket_name,
Key=key,
Body=json.dumps(report, indent=2),
ContentType='application/json'
)
return f's3://{bucket_name}/{key}'
except Exception as e:
print(f'Error storing audit report: {str(e)}')
return None
cron(0 9 1 * ? *)
cron(0 9 ? * MON *)
cron(0 8 * * ? *)
identity-governance-reports
exists/audit-reports/
- Generated audit reports/evidence/
- Supporting evidence files/compliance/
- Compliance documentationAfter completion:
Proceed to 10. Compliance Validation to perform final validation and compliance verification. � �