ssg_today_menu_slack/main.py
bumpsoo 1b8b580554 기본 구조 완성. 파일 정리하면 끝. 일단 배포 완료
todo
- tf 파일의 slack_url 환경변수 등으로 정리
- 배포하는 스크립트? 필요할 것 같기도 함
2024-07-03 05:06:54 +09:00

70 lines
2.1 KiB
Python

from datetime import datetime
from typing import Dict, Any, Final, Optional
import os
import time
from dataclasses import dataclass
import menu
import schedule
import slack
# Load environment variables (recommended for sensitive data)
# SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
#EXTERNAL_API_URL = os.environ['EXTERNAL_API_URL']
PARAM_ERROR: Final[Dict[str, str]] = {
'statusCode': '200', 'error': 'wrong parameter'
}
MAX_RETRY: Final[int] = 5
RETRY_INTERVAL: Final[int] = 15
@dataclass
class Param:
slack_url: str
date: str
count: int
def __parse_param(evt: Dict[str, Any]) -> Optional[Param]:
if 'slack_url' not in evt:
return None
try:
date = evt['date']
except:
date = datetime.now().strftime('%Y%m%d')
cnt = evt.get('count') or 0
return Param(str(evt.get('slack_url')), date, cnt + 1)
def retry(p: Param, lambda_arn: str, schedule_role_arn: str) -> Dict[str, str]:
schedule.one_time_schedule(lambda_arn, schedule_role_arn, RETRY_INTERVAL, p.slack_url, p.date, p.count)
return {'statusCode': '200', 'body': f'retry {p.count}'}
def wrap_return(ret: bool) -> Dict[str, str]:
return {
'statusCode': '200',
'body': 'success' if ret else 'fail'
}
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, str]:
os.environ['TZ'] = 'Asia/Seoul'
time.tzset()
if context.invoked_function_arn is None:
raise Exception('None???')
lambda_arn = str(context.invoked_function_arn)
schedule_role_arn = os.environ['SCHEDULE_ROLE_ARN']
bucket_name = os.environ['S3_BUCKET_NAME']
param = __parse_param(event)
if param is None:
return PARAM_ERROR
menus = menu.menu(param.date)
menus_without_img = [ x for x in menus if x.menu_image is None ]
print('menus', menus)
print('menus_without_img', menus_without_img)
print('param', param)
if param.count <= MAX_RETRY and (len(menus) == 0 or len(menus_without_img) > 0):
return retry(param, lambda_arn, schedule_role_arn)
for each in menus:
each.resize_image(bucket_name, param.date)
return wrap_return(slack.send(param.slack_url, param.date, menus))