from datetime import datetime from typing import Dict, Any, Final, Optional import os import time from dataclasses import dataclass import menu import schedule import slack # Load environment variables (recommended for sensitive data) # SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL'] #EXTERNAL_API_URL = os.environ['EXTERNAL_API_URL'] PARAM_ERROR: Final[Dict[str, str]] = { 'statusCode': '200', 'error': 'wrong parameter' } MAX_RETRY: Final[int] = 5 RETRY_INTERVAL: Final[int] = 15 @dataclass class Param: slack_url: str date: str count: int def __parse_param(evt: Dict[str, Any]) -> Optional[Param]: if 'slack_url' not in evt: return None try: date = evt['date'] except: date = datetime.now().strftime('%Y%m%d') cnt = evt.get('count') or 0 return Param(str(evt.get('slack_url')), date, cnt + 1) def retry(p: Param, lambda_arn: str, schedule_role_arn: str) -> Dict[str, str]: schedule.one_time_schedule(lambda_arn, schedule_role_arn, RETRY_INTERVAL, p.slack_url, p.date, p.count) return {'statusCode': '200', 'body': f'retry {p.count}'} def wrap_return(ret: bool) -> Dict[str, str]: return { 'statusCode': '200', 'body': 'success' if ret else 'fail' } def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, str]: os.environ['TZ'] = 'Asia/Seoul' time.tzset() if context.invoked_function_arn is None: raise Exception('None???') lambda_arn = str(context.invoked_function_arn) schedule_role_arn = os.environ['SCHEDULE_ROLE_ARN'] bucket_name = os.environ['S3_BUCKET_NAME'] param = __parse_param(event) if param is None: return PARAM_ERROR menus = menu.menu(param.date) menus_without_img = [ x for x in menus if x.menu_image is None ] print('menus', menus) print('menus_without_img', menus_without_img) print('param', param) if param.count <= MAX_RETRY and (len(menus) == 0 or len(menus_without_img) > 0): return retry(param, lambda_arn, schedule_role_arn) for each in menus: each.resize_image(bucket_name, param.date) return wrap_return(slack.send(param.slack_url, param.date, menus))