Analyze and monitor privilege usage to detect security risks, excessive permissions, and abnormal patterns through CloudTrail logs.
aws-cloudtrail-logs-xxx
PrivilegeAnalyticsEngine
import json
import boto3
import gzip
from datetime import datetime, timedelta
from urllib.parse import unquote_plus
def lambda_handler(event, context):
    """Analyze the CloudTrail log objects announced by an S3 event notification.

    Every entry in ``event['Records']`` names one S3 object. Each object is
    downloaded, gunzipped when its key ends in ``.gz``, parsed as JSON, and
    its ``Records`` list is handed to ``analyze_privilege_events``. The
    resulting privilege events are persisted via ``store_analysis_results``.

    Args:
        event: S3 event notification payload. Must contain a ``Records`` list
            whose entries carry ``s3.bucket.name`` and ``s3.object.key``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a message giving the total number
        of privilege events processed across all objects, or ``statusCode``
        500 and the error text as soon as one object fails (objects listed
        after the failing one are not processed).

    Raises:
        KeyError: If an event record lacks the expected S3 bucket/key fields;
            this lookup happens outside the ``try`` and is not converted
            into a 500 response.
    """
    print("Privilege Analytics Engine Started")

    # Initialize AWS clients once and reuse them for every object.
    s3 = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb')

    processed_count = 0

    # A notification may carry more than one record; handle all of them
    # instead of only the first.
    for s3_record in event['Records']:
        bucket = s3_record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 notifications.
        key = unquote_plus(s3_record['s3']['object']['key'])

        try:
            # Download and, if needed, decompress the CloudTrail log.
            response = s3.get_object(Bucket=bucket, Key=key)
            content = response['Body'].read()
            if key.endswith('.gz'):
                content = gzip.decompress(content)

            # Parse the CloudTrail log and analyze privilege usage.
            log_data = json.loads(content.decode('utf-8'))
            privilege_events = analyze_privilege_events(log_data['Records'])

            # Store analysis results.
            store_analysis_results(privilege_events, dynamodb)
            processed_count += len(privilege_events)
        except Exception as e:
            # Top-level Lambda boundary: log the failure and report it to the
            # caller rather than letting the invocation crash.
            print(f'Error processing {key}: {str(e)}')
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }

    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {processed_count} privilege events')
    }
def analyze_privilege_events(records):
    """Extract IAM user/role/policy management events from CloudTrail records.

    Args:
        records: Iterable of CloudTrail record dicts.

    Returns:
        A list with one summary dict per record whose ``eventName`` is one of
        the watched IAM actions. Each summary carries the event time, event
        name, user identity, source IP, user agent, region, and the score
        returned by ``calculate_risk_score`` for that record.
    """
    watched_actions = (
        'CreateUser', 'DeleteUser', 'AttachUserPolicy', 'DetachUserPolicy',
        'CreateRole', 'DeleteRole', 'AttachRolePolicy', 'DetachRolePolicy',
        'PutUserPolicy', 'DeleteUserPolicy', 'PutRolePolicy', 'DeleteRolePolicy',
    )

    def summarize(entry, action):
        # Condense one matching CloudTrail record into the stored summary.
        summary = {}
        summary['eventTime'] = entry.get('eventTime')
        summary['eventName'] = action
        summary['userIdentity'] = entry.get('userIdentity', {})
        summary['sourceIPAddress'] = entry.get('sourceIPAddress')
        summary['userAgent'] = entry.get('userAgent')
        summary['awsRegion'] = entry.get('awsRegion')
        summary['riskScore'] = calculate_risk_score(entry)
        return summary

    flagged = []
    for entry in records:
        action = entry.get('eventName', '')
        if action not in watched_actions:
            continue
        flagged.append(summarize(entry, action))
    return flagged
def calculate_risk_score(record):
    """Calculate a risk score for a privilege event.

    Scoring starts at 5 and adds:
      * 3 when the event is a destructive action (``DeleteUser``,
        ``DeleteRole`` or ``DetachUserPolicy``);
      * 2 when the source address is not inside an RFC 1918 private network
        (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16). Values that are not IP
        literals, such as a service host name or a missing value, count as
        external;
      * 1 when the user agent does not contain ``console``.

    Args:
        record: CloudTrail record dict. ``eventName``, ``sourceIPAddress``
            and ``userAgent`` are read; missing or ``None`` values are
            tolerated.

    Returns:
        Integer score capped at 10 (so the effective range is 5-10).
    """
    # Local import keeps this function self-contained without touching the
    # file-level import block.
    import ipaddress

    private_networks = (
        ipaddress.ip_network('10.0.0.0/8'),
        ipaddress.ip_network('172.16.0.0/12'),
        ipaddress.ip_network('192.168.0.0/16'),
    )

    score = 5

    # High-risk actions
    if record.get('eventName') in ('DeleteUser', 'DeleteRole', 'DetachUserPolicy'):
        score += 3

    # External IP access. Prefix matching on '172.' would wrongly treat the
    # whole 172.0.0.0/8 range as internal, so compare against real networks.
    source_ip = str(record.get('sourceIPAddress') or '')
    try:
        address = ipaddress.ip_address(source_ip)
    except ValueError:
        # Not an IP literal (empty, host name, ...): treat as external.
        is_internal = False
    else:
        is_internal = any(address in network for network in private_networks)
    if not is_internal:
        score += 2

    # Console vs API access
    user_agent = str(record.get('userAgent') or '')
    if 'console' not in user_agent.lower():
        score += 1

    return min(score, 10)
def store_analysis_results(privilege_events, dynamodb):
    """Store privilege analysis results in the ``RiskAssessments`` table.

    One item is written per event via ``put_item``. ``AssessmentId`` is
    ``privilege-<local ISO timestamp>-<random hex>``: the random suffix keeps
    ids distinct even when several events are written within the same
    microsecond, where a timestamp-only id would be duplicated.

    Args:
        privilege_events: Iterable of event summary dicts providing the keys
            ``eventTime``, ``eventName``, ``userIdentity``,
            ``sourceIPAddress`` and ``riskScore``.
        dynamodb: Object exposing ``Table(name)`` whose result supports
            ``put_item(Item=...)`` (a boto3 DynamoDB resource in the handler).

    Returns:
        None.
    """
    # Local import keeps this function self-contained without touching the
    # file-level import block.
    import uuid

    table = dynamodb.Table('RiskAssessments')

    for event in privilege_events:
        assessment_id = f"privilege-{datetime.now().isoformat()}-{uuid.uuid4().hex}"
        table.put_item(
            Item={
                'AssessmentId': assessment_id,
                'EventTime': event['eventTime'],
                'EventName': event['eventName'],
                # Stored as a JSON string rather than a nested map.
                'UserIdentity': json.dumps(event['userIdentity']),
                'SourceIP': event['sourceIPAddress'],
                'RiskScore': event['riskScore'],
                'AssessmentType': 'Privilege Analysis'
            }
        )
AWSLogs/
(optional).json.gz
PrivilegeAnalyticsDashboard
Note: This metric shows write activity to DynamoDB, indicating when new risk assessments are stored.
test-privilege-user
test-privilege-role
aws-cloudtrail-logs-xxx
Privilege Analytics Engine Started
Processed X privilege events
test-user-2
If Lambda doesn’t run:
If no data in DynamoDB:
If Dashboard doesn’t show data:
After completion:
Proceed to 6. Risk Assessment to set up comprehensive risk assessment.