Set up comprehensive monitoring system with CloudWatch Alarms, Dashboard and SNS notifications to track Identity Governance metrics.
Lambda-AccessCertification-Errors
IdentityGovernanceDashboard
CustomMetricsPublisher
import json
import boto3
from datetime import datetime, timedelta
from boto3.dynamodb.conditions import Key
def lambda_handler(event, context):
print("Custom Metrics Publisher Started")
# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
cloudwatch = boto3.client('cloudwatch')
try:
# Get risk assessment data
risk_metrics = get_risk_metrics(dynamodb)
# Get certification metrics
cert_metrics = get_certification_metrics(dynamodb)
# Publish custom metrics
publish_metrics(cloudwatch, risk_metrics, cert_metrics)
return {
'statusCode': 200,
'body': json.dumps('Custom metrics published successfully')
}
except Exception as e:
print(f'Error publishing metrics: {str(e)}')
return {
'statusCode': 500,
'body': json.dumps(f'Error: {str(e)}')
}
def get_risk_metrics(dynamodb):
"""Get risk assessment metrics from DynamoDB"""
table = dynamodb.Table('RiskAssessments')
# Get latest user risk assessments
response = table.scan(
FilterExpression='AssessmentType = :type',
ExpressionAttributeValues={':type': 'User Risk Assessment'}
)
risk_levels = {'LOW': 0, 'MEDIUM': 0, 'HIGH': 0, 'CRITICAL': 0}
for item in response['Items']:
risk_level = item.get('RiskLevel', 'LOW')
if risk_level in risk_levels:
risk_levels[risk_level] += 1
return risk_levels
def get_certification_metrics(dynamodb):
"""Get certification metrics from DynamoDB"""
table = dynamodb.Table('AccessCertifications')
response = table.scan()
total_certifications = len(response['Items'])
recent_certifications = 0
# Count recent certifications (last 30 days)
thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat()
for item in response['Items']:
cert_date = item.get('CertificationDate', '')
if cert_date > thirty_days_ago:
recent_certifications += 1
return {
'total': total_certifications,
'recent': recent_certifications
}
def publish_metrics(cloudwatch, risk_metrics, cert_metrics):
"""Publish custom metrics to CloudWatch"""
# Publish risk metrics
for risk_level, count in risk_metrics.items():
cloudwatch.put_metric_data(
Namespace='IdentityGovernance',
MetricData=[
{
'MetricName': f'{risk_level}RiskUserCount',
'Value': count,
'Unit': 'Count',
'Timestamp': datetime.now()
}
]
)
# Publish certification metrics
cloudwatch.put_metric_data(
Namespace='IdentityGovernance',
MetricData=[
{
'MetricName': 'TotalCertifications',
'Value': cert_metrics['total'],
'Unit': 'Count',
'Timestamp': datetime.now()
},
{
'MetricName': 'RecentCertifications',
'Value': cert_metrics['recent'],
'Unit': 'Count',
'Timestamp': datetime.now()
}
]
)
print(f"Published metrics: Risk={risk_metrics}, Cert={cert_metrics}")
After completion:
Proceed to 8. Operational Procedures to set up daily operational procedures.