feat: trigger billing (#28335)
Signed-off-by: lyzno1 <yuanyouhuilyz@gmail.com> Co-authored-by: lyzno1 <yuanyouhuilyz@gmail.com> Co-authored-by: lyzno1 <92089059+lyzno1@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
@@ -9,7 +9,6 @@ from extensions.ext_database import db
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from libs.schedule_utils import calculate_next_run_at
|
||||
from models.trigger import AppTrigger, AppTriggerStatus, AppTriggerType, WorkflowSchedulePlan
|
||||
from services.workflow.queue_dispatcher import QueueDispatcherManager
|
||||
from tasks.workflow_schedule_tasks import run_schedule_trigger
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -29,7 +28,6 @@ def poll_workflow_schedules() -> None:
|
||||
|
||||
with session_factory() as session:
|
||||
total_dispatched = 0
|
||||
total_rate_limited = 0
|
||||
|
||||
# Process in batches until we've handled all due schedules or hit the limit
|
||||
while True:
|
||||
@@ -38,11 +36,10 @@ def poll_workflow_schedules() -> None:
|
||||
if not due_schedules:
|
||||
break
|
||||
|
||||
dispatched_count, rate_limited_count = _process_schedules(session, due_schedules)
|
||||
dispatched_count = _process_schedules(session, due_schedules)
|
||||
total_dispatched += dispatched_count
|
||||
total_rate_limited += rate_limited_count
|
||||
|
||||
logger.debug("Batch processed: %d dispatched, %d rate limited", dispatched_count, rate_limited_count)
|
||||
logger.debug("Batch processed: %d dispatched", dispatched_count)
|
||||
|
||||
# Circuit breaker: check if we've hit the per-tick limit (if enabled)
|
||||
if (
|
||||
@@ -55,8 +52,8 @@ def poll_workflow_schedules() -> None:
|
||||
)
|
||||
break
|
||||
|
||||
if total_dispatched > 0 or total_rate_limited > 0:
|
||||
logger.info("Total processed: %d dispatched, %d rate limited", total_dispatched, total_rate_limited)
|
||||
if total_dispatched > 0:
|
||||
logger.info("Total processed: %d dispatched", total_dispatched)
|
||||
|
||||
|
||||
def _fetch_due_schedules(session: Session) -> list[WorkflowSchedulePlan]:
|
||||
@@ -93,15 +90,12 @@ def _fetch_due_schedules(session: Session) -> list[WorkflowSchedulePlan]:
|
||||
return list(due_schedules)
|
||||
|
||||
|
||||
def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) -> tuple[int, int]:
|
||||
def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan]) -> int:
|
||||
"""Process schedules: check quota, update next run time and dispatch to Celery in parallel."""
|
||||
if not schedules:
|
||||
return 0, 0
|
||||
return 0
|
||||
|
||||
dispatcher_manager = QueueDispatcherManager()
|
||||
tasks_to_dispatch: list[str] = []
|
||||
rate_limited_count = 0
|
||||
|
||||
for schedule in schedules:
|
||||
next_run_at = calculate_next_run_at(
|
||||
schedule.cron_expression,
|
||||
@@ -109,12 +103,7 @@ def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan])
|
||||
)
|
||||
schedule.next_run_at = next_run_at
|
||||
|
||||
dispatcher = dispatcher_manager.get_dispatcher(schedule.tenant_id)
|
||||
if not dispatcher.check_daily_quota(schedule.tenant_id):
|
||||
logger.info("Tenant %s rate limited, skipping schedule_plan %s", schedule.tenant_id, schedule.id)
|
||||
rate_limited_count += 1
|
||||
else:
|
||||
tasks_to_dispatch.append(schedule.id)
|
||||
tasks_to_dispatch.append(schedule.id)
|
||||
|
||||
if tasks_to_dispatch:
|
||||
job = group(run_schedule_trigger.s(schedule_id) for schedule_id in tasks_to_dispatch)
|
||||
@@ -124,4 +113,4 @@ def _process_schedules(session: Session, schedules: list[WorkflowSchedulePlan])
|
||||
|
||||
session.commit()
|
||||
|
||||
return len(tasks_to_dispatch), rate_limited_count
|
||||
return len(tasks_to_dispatch)
|
||||
|
||||
Reference in New Issue
Block a user