ssg_today_menu_slack/schedule.py

46 lines
1.4 KiB
Python

import json
from datetime import datetime, timedelta, timezone

import boto3
def one_time_schedule(
    lambda_arn: str,
    schedule_role_arn: str,
    minutes: int,
    slack_url: str,
    date: str,
    count: int,
) -> str:
    """Create a self-deleting, one-time EventBridge Scheduler schedule.

    The schedule fires once, ``minutes`` minutes from the current time, and
    invokes the given Lambda with a JSON payload built from ``slack_url``,
    ``date`` and ``count``.

    Args:
        lambda_arn: ARN of the Lambda function used as the schedule target.
        schedule_role_arn: ARN of the IAM role passed as the target's
            ``RoleArn``.
        minutes: Delay, in minutes from now, until the schedule fires.
        slack_url: Forwarded to the target under the ``slack_url`` key.
        date: Forwarded to the target under the ``date`` key.
        count: Forwarded to the target under the ``count`` key.

    Returns:
        The ``ScheduleArn`` returned by ``create_schedule``.

    Note:
        The schedule name embeds the fire time at one-second resolution, so
        two calls that resolve to the same second produce the same name.
    """
    client = boto3.client('scheduler')

    # The at(...) expression below is interpreted in Asia/Seoul, so the fire
    # time must be computed in that zone rather than in the process's local
    # time (which is UTC on a default Lambda runtime). Korea Standard Time is
    # a fixed UTC+9 offset with no DST, so a fixed-offset tzinfo is enough and
    # avoids depending on a tz database being installed.
    kst = timezone(timedelta(hours=9), 'KST')
    schedule_time = datetime.now(kst) + timedelta(minutes=minutes)

    schedule_expression = f"at({schedule_time.strftime('%Y-%m-%dT%H:%M:%S')})"
    schedule_name = f"menu-trigger-{schedule_time.strftime('%Y%m%d%H%M%S')}"

    target = {
        'Arn': lambda_arn,
        'RoleArn': schedule_role_arn,
        'Input': json.dumps({
            'slack_url': slack_url,
            'date': date,
            'count': count,
        }),
        # Do not retry a failed invocation.
        'RetryPolicy': {'MaximumRetryAttempts': 0},
    }

    # ActionAfterCompletion is accepted by create_schedule itself, so the
    # schedule is created as self-deleting in a single call; no follow-up
    # update_schedule is needed.
    response = client.create_schedule(
        Name=schedule_name,
        ScheduleExpression=schedule_expression,
        ScheduleExpressionTimezone='Asia/Seoul',
        State='ENABLED',
        FlexibleTimeWindow={'Mode': 'OFF'},
        ActionAfterCompletion='DELETE',
        Target=target,
    )
    return response['ScheduleArn']