WorkBlog

Python 排程自動化教學:用 APScheduler 打造定時任務系統

劉芷晴
Python 排程自動化教學:用 APScheduler 打造定時任務系統

寫 Python 腳本很簡單,但要讓它「每天早上 8 點自動執行」就沒那麼直覺了。你可能想到用 crontab,但如果你的排程邏輯比較複雜(比如每隔 30 分鐘執行一次、但週末要暫停),crontab 就顯得力不從心了。

這就是 APScheduler(Advanced Python Scheduler)派上用場的時候。它是 Python 生態系中最成熟的排程套件,可以讓你在程式碼裡靈活地管理定時任務。之前我們介紹過Python Selenium 動態爬蟲教學,而爬蟲搭配排程,就能做出全自動的資料收集系統。

為什麼需要排程自動化

幾個常見的使用場景:

  • 定時爬蟲:每天抓取某網站的最新資料
  • 資料備份:每週自動備份資料庫
  • 報表生成:每月初自動產生上月的業績報表
  • 通知提醒:庫存低於門檻時自動發送 Slack 通知
  • 快取清理:每隔幾小時清理過期的快取資料

這些任務如果每次都要手動觸發,不僅麻煩還容易忘記。排程自動化的價值就在於:設定一次,自動執行,你去做更有價值的事

APScheduler 是什麼

APScheduler 是一個輕量但功能完整的 Python 排程框架。它的核心架構包含四個元件:

  • Trigger(觸發器):定義任務何時執行(時間規則)
  • Job Store(任務儲存):儲存排程任務的資訊(記憶體或資料庫)
  • Executor(執行器):實際執行任務的元件(ThreadPool 或 ProcessPool)
  • Scheduler(排程器):把上面三者串在一起的核心

相比其他排程方案(Celery、OS crontab),APScheduler 的優勢是簡單、純 Python、不依賴外部服務。不需要 Redis、不需要 RabbitMQ,import 進來就能用。

安裝與基本設定

pip install apscheduler

最簡單的使用範例:

from apscheduler.schedulers.blocking import BlockingScheduler

def my_task():
    print("任務執行了!")

scheduler = BlockingScheduler()
scheduler.add_job(my_task, 'interval', seconds=10)
scheduler.start()

這段程式碼會每 10 秒執行一次 my_taskBlockingScheduler 會阻塞主執行緒(適合排程是程式的唯一用途時使用)。

如果你的排程要跑在 Web 應用裡面,應該用 BackgroundScheduler

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', minutes=30)
scheduler.start()
# 主程式繼續做其他事...

三種排程觸發器詳解

1. Interval Trigger(間隔觸發)

每隔固定時間執行一次:

# 每 30 分鐘
scheduler.add_job(task, 'interval', minutes=30)

# 每 2 小時
scheduler.add_job(task, 'interval', hours=2)

# 每天(每 24 小時)
scheduler.add_job(task, 'interval', days=1)

2. Cron Trigger(Cron 表達式觸發)

最靈活的觸發器,語法類似 Linux crontab:

# 每天早上 8:30
scheduler.add_job(task, 'cron', hour=8, minute=30)

# 每週一到五的 9:00
scheduler.add_job(task, 'cron', day_of_week='mon-fri', hour=9)

# 每月 1 號的 00:00
scheduler.add_job(task, 'cron', day=1, hour=0)

# 每小時的第 0 分鐘
scheduler.add_job(task, 'cron', minute=0)

3. Date Trigger(日期觸發)

在指定的時間點執行一次:

from datetime import datetime

scheduler.add_job(task, 'date', run_date=datetime(2026, 4, 1, 10, 0, 0))

實戰範例:定時爬蟲排程

來做一個實際的例子——每天早上 9 點自動爬取某個網站的文章列表:

import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def crawl_articles():
    print(f"[{datetime.now()}] 開始爬取文章...")
    try:
        resp = requests.get("https://example.com/api/articles", timeout=30)
        articles = resp.json()
        print(f"成功取得 {len(articles)} 篇文章")
        # 存到資料庫或檔案...
    except Exception as e:
        print(f"爬取失敗: {e}")

scheduler = BlockingScheduler(timezone="Asia/Taipei")
scheduler.add_job(
    crawl_articles,
    'cron',
    hour=9,
    minute=0,
    id='daily_crawl',
    name='每日文章爬取',
    misfire_grace_time=3600  # 錯過執行時間的容忍秒數
)

print("排程啟動中...")
scheduler.start()

注意 timezone 的設定很重要!沒設定的話,APScheduler 會用 UTC 時間。

任務持久化:重啟不遺失

預設情況下,APScheduler 的任務存在記憶體裡。程式一重啟,所有排程就消失了。如果你需要持久化,可以用 SQLAlchemy Job Store:

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(task, 'interval', hours=1, id='my_task', replace_existing=True)
scheduler.start()

replace_existing=True 很重要——它確保程式重啟時,如果任務已存在就更新而不是報錯。

錯誤處理與重試機制

生產環境中,任務失敗是常態。APScheduler 提供了幾個處理機制:

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED

def job_listener(event):
    if event.exception:
        print(f"任務 {event.job_id} 執行失敗: {event.exception}")
        # 發送通知、記錄日誌等
    else:
        print(f"任務 {event.job_id} 執行成功")

scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

如果你需要自動重試,可以在任務函式裡自己實作:

import time

def resilient_task(max_retries=3):
    for attempt in range(max_retries):
        try:
            # 你的業務邏輯
            do_something()
            return  # 成功就結束
        except Exception as e:
            print(f"第 {attempt + 1} 次嘗試失敗: {e}")
            if attempt < max_retries - 1:
                time.sleep(5 * (attempt + 1))  # 遞增等待
    print("所有重試都失敗了")

搭配 FastAPI 建立排程 API

如果你想要透過 API 來管理排程(新增、暫停、刪除任務),可以搭配 FastAPI:

from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler

app = FastAPI()
scheduler = BackgroundScheduler()
scheduler.start()

@app.post("/jobs")
def add_job(task_name: str, interval_minutes: int):
    scheduler.add_job(
        my_task,
        'interval',
        minutes=interval_minutes,
        id=task_name,
        replace_existing=True
    )
    return {"status": "Job added", "id": task_name}

@app.delete("/jobs/{job_id}")
def remove_job(job_id: str):
    scheduler.remove_job(job_id)
    return {"status": "Job removed"}

@app.get("/jobs")
def list_jobs():
    jobs = scheduler.get_jobs()
    return [{"id": j.id, "next_run": str(j.next_run_time)} for j in jobs]

進階技巧與效能優化

  • 執行緒池大小:預設 ThreadPoolExecutor 的大小是 10。如果你有很多同時執行的任務,需要調大:
    from apscheduler.executors.pool import ThreadPoolExecutor
    executors = {'default': ThreadPoolExecutor(20)}
  • Coalescing:如果一個任務因為某些原因積壓了多次(比如程式當機後重啟),設定 coalesce=True 可以讓它只執行一次而不是執行所有積壓的次數
  • Max Instances:設定 max_instances=1 確保同一個任務不會同時有多個實例在執行
  • Jitter:設定 jitter=60 給執行時間加上隨機偏移,避免所有任務在同一秒觸發造成瞬間負載

結語

APScheduler 是 Python 排程自動化的瑞士刀——簡單上手、功能完整、不依賴外部服務。從簡單的定時爬蟲到複雜的企業級排程系統,它都能勝任。

如果你的需求更複雜(分散式排程、任務依賴關係),可以考慮升級到 Celery + Celery Beat。但對大部分場景來說,APScheduler 就夠了。先用簡單的工具解決問題,等真的遇到瓶頸再升級,這是最務實的做法。

劉芷晴

AI 研究員與資料科學家,專注於 NLP 與 LLM 應用。

機器學習NLPLLM資料分析

繼續閱讀

Python Selenium 動態爬蟲教學:從入門到自動化實戰

Python Selenium 動態爬蟲教學:從入門到自動化實戰

相關文章

你可能也喜歡

探索其他領域的精選好文