node -> python 변경 진행중

This commit is contained in:
bumpsoo 2024-07-01 02:13:51 +09:00
commit 660f85f145
5 changed files with 285 additions and 0 deletions

79
main.py Normal file
View file

@ -0,0 +1,79 @@
from datetime import datetime
import json
from typing import Dict, Any, Final, List, Optional
import requests
import os
import time
from dataclasses import dataclass
import menu
import schedule
# Load environment variables (recommended for sensitive data)
# SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
#EXTERNAL_API_URL = os.environ['EXTERNAL_API_URL']
PARAM_ERROR: Final[Dict[str, str]] = {
'statusCode': '200', 'error': 'wrong parameter'
}
MAX_RETRY: Final[int] = 5
RETRY_INTERVAL: Final[int] = 15
@dataclass
class Param:
slack_url: str
date: str
count: int
def __parse_param(evt: Dict[str, Any]) -> Optional[Param]:
if 'slack_url' not in evt:
return None
try:
date = evt['date']
except:
date = datetime.now().strftime('%Y%m%d')
cnt = evt.get('count') or 0
return Param(str(evt.get('slack_url')), date, cnt + 1)
def retry(p: Param, lambda_arn: str, schedule_role_arn: str) -> Dict[str, str]:
schedule.one_time_schedule(lambda_arn, schedule_role_arn, RETRY_INTERVAL, p)
return {'statusCode': '200', 'body': f'retry {p.count}'}
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, str]:
os.environ['TZ'] = 'Asia/Seoul'
time.tzset()
lambda_arn = os.environ['LAMBDA_ARN']
schedule_role_arn = os.environ['SCHEDULE_ROLE_ARN']
param = __parse_param(event)
if param is None:
return PARAM_ERROR
menus = menu.menu(param.date)
menus_without_img = [ x for x in menus if x.menu_image is None ]
if len(menus) == 0:
return retry(param, lambda_arn, schedule_role_arn)
menus_without_img: List[menu.Menu] = [ x for x in menus if x.menu_image is None ]
if len(menus_without_img) > 0 and param.count < MAX_RETRY:
return retry(param, lambda_arn, schedule_role_arn)
# Extract relevant data for your Slack message (adjust as needed)
message_content: str = f"*Important Data from External API*\n"
message_content += f"`\n{json.dumps(api_data, indent=4)}\n`"
# Construct payload for the Slack webhook
slack_payload: Dict[str, Any] = {
"text": message_content,
"blocks": [
{"type": "section", "text": {"type": "mrkdwn", "text": message_content}}
]
}
# Send the message to Slack
slack_response: requests.Response = requests.post(
'',
#SLACK_WEBHOOK_URL,
data=json.dumps(slack_payload),
headers={'Content-Type': 'application/json'}
)
slack_response.raise_for_status()
return {
'statusCode': '200',
'body': json.dumps('Data successfully sent to Slack.')
}