어제 성공하지 못한 securityhub의 로그를 slack으로 보내는 아키텍처 구현을 진행했다.
본래 아키텍처에서는 EventBridge - SNS 를 통해 Lambda가 트리거 되어 함수가 작동했지만
CloudWatch의 구독필터를 이용해 바로 Lambda가 트리거 될 수 있도록, 데이터가 필터링 되어 넘어올 수 있도록 하였다.
해서 아키텍처의 수정이 있었다.
결론부터 말하면 오늘도 성공하지 못했다.
마지막 프로젝트에 알고있는 언어를 사용해서 하면 되겠지만 대부분의 정보가 Python으로 되어있어 선택을 했는데 하나도 모르는 언어를 바로 사용하려고 하니 이해하는 부분도 적고 구현도 더딘 것 같다.
import zlib
import gzip
import json
import base64
from urllib.request import urlopen
from base64 import b64decode
def decode(data):
compressed_payload = b64decode(data)
json_payload = zlib.decompress(compressed_payload, 16+zlib.MAX_WBITS)
return json.loads(json_payload)
def post_slack(argStr):
message = argStr
send_data = {
"text": message,
}
send_text = json.dumps(send_data)
request = urllib.request.Request(
"<슬랙 훅 주소>",
data=send_text.encode('utf-8'),
)
with urllib.request.urlopen(request) as response:
slack_message = response.read()
def lambda_handler(event, context):
if 'awslogs' in event:
cw_data = event['awslogs']['data']
compressed_payload = base64.b64decode(cw_data)
uncompressed_payload = gzip.decompress(compressed_payload).decode('utf-8')
payload = json.loads(uncompressed_payload)
log_events = payload['logEvents']
for log_event in log_events:
print(f'LogEvent: {log_event}')
detail = log_event['message']
if detail is not None:
return
print(log_event['message'])
findings = log_eventdetail['findings']
if findings is not None:
return
for finding in findings:
ProductName = finding['ProductName']
Severity = finding['FindingProviderFields', {}]['Severity', {}]['Label']
description = finding['Description']
time = event['time']
message = "Time: {}\\nProductName: {}\\nLabel: {}\\nDescription: {}\\n".format(time, ProductName, Severity, description)
print(message)
post_slack('qoooook')
else:
print("'awslogs' key not found in event data")
코드의 수정이 있었다.
SecurityHub의 로그가 CloudWatch에 찍히는 것을 확인.
그 로그가 Lambda 함수로 전달 되는 것을 확인했지만 인코딩이 되어 있었음.
인코딩 된 로그를 디코딩 해서 로그에 프린드 후 확인.