"""AWS Lambda entry point that posts the menu for a date to a Slack webhook.

The handler fetches menus through the project-local ``menu`` module. While
the result is empty or has entries without an image, it re-schedules itself
through the project-local ``schedule`` module, up to ``MAX_RETRY`` attempts.
"""
import json
import os
import time
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime
from typing import Any, Dict, Final, List, Optional

import requests

import menu
import schedule

# The Slack webhook URL is supplied per invocation through the event
# (key ``slack_url``); nothing webhook-related is read from the environment.

# Response returned when the event lacks the mandatory ``slack_url`` key.
PARAM_ERROR: Final[Dict[str, str]] = {
    'statusCode': '200',
    'error': 'wrong parameter'
}

# An invocation whose attempt number reaches this value no longer retries.
MAX_RETRY: Final[int] = 5
# Passed unchanged to schedule.one_time_schedule; the unit is whatever that
# helper expects (NOTE(review): confirm against the schedule module).
RETRY_INTERVAL: Final[int] = 15
# Upper bound, in seconds, on waiting for the Slack webhook to respond.
SLACK_TIMEOUT_SECONDS: Final[int] = 10


@dataclass
class Param:
    """Validated invocation parameters extracted from the Lambda event."""

    # Slack webhook URL the message is posted to.
    slack_url: str
    # Date handed to menu.menu(); defaults to today's date as YYYYMMDD.
    date: str
    # Attempt number of the current invocation (1 on the first run).
    count: int


def __parse_param(evt: Dict[str, Any]) -> Optional[Param]:
    """Build a ``Param`` from the raw event.

    Args:
        evt: The Lambda event. ``slack_url`` is required; ``date`` and
            ``count`` are optional.

    Returns:
        The parsed parameters, or ``None`` when ``slack_url`` is missing.
    """
    if 'slack_url' not in evt:
        return None

    try:
        date = evt['date']
    except KeyError:
        # Only a missing key falls back to today; other errors propagate.
        date = datetime.now().strftime('%Y%m%d')

    # A missing, None or zero ``count`` all mean "no previous attempts".
    cnt = evt.get('count') or 0
    # Store the attempt number of *this* invocation, hence the +1.
    return Param(str(evt.get('slack_url')), date, cnt + 1)


def retry(p: Param, lambda_arn: str, schedule_role_arn: str) -> Dict[str, str]:
    """Schedule another invocation with the current parameters.

    Args:
        p: Parameters of the current attempt, forwarded to the scheduler.
        lambda_arn: ARN forwarded to ``schedule.one_time_schedule``.
        schedule_role_arn: Role ARN forwarded to the same helper.

    Returns:
        A response dict whose body names the attempt that triggered the retry.
    """
    schedule.one_time_schedule(lambda_arn, schedule_role_arn, RETRY_INTERVAL, p)
    return {'statusCode': '200', 'body': f'retry {p.count}'}


def _to_jsonable(obj: Any) -> Any:
    """Fallback JSON encoder for objects ``json`` cannot serialize natively."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    try:
        return vars(obj)
    except TypeError:
        # Objects without a __dict__ (e.g. slotted classes) end up as text.
        return str(obj)


def _post_to_slack(slack_url: str, text: str) -> None:
    """POST *text* as a mrkdwn section to the webhook; raise on HTTP errors."""
    slack_payload: Dict[str, Any] = {
        "text": text,
        "blocks": [
            {"type": "section", "text": {"type": "mrkdwn", "text": text}}
        ]
    }
    slack_response: requests.Response = requests.post(
        slack_url,
        data=json.dumps(slack_payload),
        headers={'Content-Type': 'application/json'},
        timeout=SLACK_TIMEOUT_SECONDS,
    )
    slack_response.raise_for_status()


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, str]:
    """Fetch the menus for the requested date and post them to Slack.

    Args:
        event: Invocation payload; see ``__parse_param`` for the keys read.
        context: Lambda context object (unused).

    Returns:
        ``PARAM_ERROR`` when the event is invalid, a ``retry`` response when
        another attempt was scheduled, otherwise a success response.

    Raises:
        KeyError: If ``LAMBDA_ARN`` or ``SCHEDULE_ROLE_ARN`` is not set.
        requests.HTTPError: If the Slack webhook answers with an error status.
    """
    # Set the process time zone first so the default date computed in
    # __parse_param is evaluated in Asia/Seoul.
    os.environ['TZ'] = 'Asia/Seoul'
    time.tzset()

    lambda_arn = os.environ['LAMBDA_ARN']
    schedule_role_arn = os.environ['SCHEDULE_ROLE_ARN']

    param = __parse_param(event)
    if param is None:
        return PARAM_ERROR

    menus = menu.menu(param.date)
    menus_without_img = [x for x in menus if x.menu_image is None]

    # Retry while there are no menus yet or some entries still lack an image.
    incomplete = len(menus) == 0 or len(menus_without_img) > 0
    if param.count < MAX_RETRY and incomplete:
        return retry(param, lambda_arn, schedule_role_arn)

    if len(menus) == 0:
        # Retries are exhausted and there is still no menu: notify Slack and
        # stop, instead of re-scheduling forever.
        _post_to_slack(
            param.slack_url,
            f"*No menu is available for {param.date}*",
        )
        return {
            'statusCode': '200',
            'body': json.dumps('No menu found; Slack was notified.')
        }

    # NOTE(review): menu entries are project types whose layout is not visible
    # here; they are serialized through _to_jsonable — adjust if a specific
    # message layout is wanted. ensure_ascii=False keeps non-ASCII text
    # readable instead of emitting literal \uXXXX escapes into the message.
    menus_json = json.dumps(
        list(menus), indent=4, ensure_ascii=False, default=_to_jsonable
    )
    message_content: str = "*Important Data from External API*\n"
    message_content += f"`\n{menus_json}\n`"

    _post_to_slack(param.slack_url, message_content)

    return {
        'statusCode': '200',
        'body': json.dumps('Data successfully sent to Slack.')
    }