팀원들과 오늘 업무에 대해서 나누었다.
stats count(*) as resource by detail.findings.0.Resources.0.Type AS resourceType
stats count(*) as SERVICE_COUNT by detail.findings.0.ProductName AS SERVICE
stats count(*) as complianceStautsCount by detail.findings.0.Compliance.Status as complianceStatus | sort complianceStautsCount desc
stats count(*) as SEVERITY_COUNT by detail.findings.0.FindingProviderFields.Severity.Label AS SEVERITY
Grafana 가시화 부분도 로그만 띄우는게 아니라 직접적인 보안 정보를 볼 수 있도록 수정했다.
SecurityHub의 리소스, 취약점, 준수 현황, 라벨을 나누어 시각화 하였다.
import gzip
import json
import base64
import urllib.request
import datetime
import pytz
import re
import os
from base64 import b64decode
import boto3
def send_email(message):
CHARSET = "UTF-8"
ses_client = boto3.client('ses', region_name='ap-northeast-2')
sender_email = os.environ['SENDER_EMAIL']
recipient_emails = os.environ['RECIPIENT_EMAILS'].split(',')
message = message.replace("\\n>", "</p><p>").replace(">*","<h1>").replace("*</p>","</h1>").replace(" `", "<span style=\\"color:red;font-weight:bold;\\">").replace("`", "</span>")
response = ses_client.send_email(
Source=sender_email,
Destination={
'ToAddresses': recipient_emails
},
Message={
'Subject': {
'Data': "[🚨SECURITY NOTIFICATION🚨]"
},
'Body': {
'Html': {
'Charset': CHARSET,
'Data': message
}
}
}
)
# slack에 메시지 전송
def post_slack(argStr):
message = argStr
send_data = {
"text": message,
}
send_text = json.dumps(send_data)
slack_url = os.environ['SLACK_URL']
request = urllib.request.Request(
slack_url,
data=send_text.encode('utf-8'),
)
with urllib.request.urlopen(request) as response:
message = response.read()
def lambda_handler(event, context):
if 'awslogs' in event:
cw_data = event['awslogs']['data']
# base64 디코딩
compressed_payload = base64.b64decode(cw_data)
uncompressed_payload = gzip.decompress(compressed_payload).decode('utf-8')
payload = json.loads(uncompressed_payload)
log_events = payload['logEvents']
for log_event in log_events:
#print(f'[LogEvent]: {log_event}')
messages = log_event['message']
if messages is not None:
#print(messages)
messages = json.loads(messages)
detail = messages['detail']
if detail is not None:
#print(detail)
findings = detail['findings']
if findings is not None:
#print(findings)
for finding in findings:
# 보고 구성 항목
## 주요 항목
types = finding['Types'][0]
title = finding['Title']
severity_label = finding['Severity']['Label']
severity_score = finding['Severity']['Normalized']
targets = ""
for resource in finding['Resources']:
# 리소스 마스킹처리
pat = re.compile("(\\d{12})")
target = pat.sub("************", resource['Id'])
targets += f"{target} "
## 세부 항목
# 시간 한국 시간으로 변경
dt = datetime.datetime.strptime(finding['CreatedAt'][:19], '%Y-%m-%dT%H:%M:%S')
KST = pytz.timezone("Asia/Seoul")
time = dt.astimezone(KST)
productName = f"{finding['CompanyName']} {finding['ProductName']}"
compliance_status = finding['Compliance']['Status']
record_state = finding['RecordState']
workflow_status = finding['Workflow']['Status']
description = finding['Description']
# slack 메시지 생성
message = f">*[🚨SECURITY NOTIFICATION🚨]*\\n>취약점명: `{title}`\\n>심각도 수준: `{severity_label}({severity_score}점)`\\n>발생 리소스: `{target}`\\n>취약점 유형: `{types}`\\n>\\n>현지 발생 시각: {time} \\n> 탐지 서비스: {productName}\\n>준수 상태: {compliance_status}\\n>활성화 상태: {record_state}\\n>생성 상태: {workflow_status}\\n>상세 내용: {description}"
#print(message)
post_slack(message)
send_email(message)
else:
print("'awslogs' key not found in event data")
Lambda가 Slack뿐 아니라 Email까지 보내도록 수정했다.
Budgets을 통해서 예상 비용이 초과하면 이메일로 알람을 보내주도록 구현했다.