Python 排程自動化教學:用 APScheduler 打造定時任務系統
寫 Python 腳本很簡單,但要讓它「每天早上 8 點自動執行」就沒那麼直覺了。你可能想到用 crontab,但如果你的排程邏輯比較複雜(比如每隔 30 分鐘執行一次、但週末要暫停),crontab 就顯得力不從心了。
這就是 APScheduler(Advanced Python Scheduler)派上用場的時候。它是 Python 生態系中最成熟的排程套件,可以讓你在程式碼裡靈活地管理定時任務。之前我們介紹過Python Selenium 動態爬蟲教學,而爬蟲搭配排程,就能做出全自動的資料收集系統。
為什麼需要排程自動化
幾個常見的使用場景:
- 定時爬蟲:每天抓取某網站的最新資料
- 資料備份:每週自動備份資料庫
- 報表生成:每月初自動產生上月的業績報表
- 通知提醒:庫存低於門檻時自動發送 Slack 通知
- 快取清理:每隔幾小時清理過期的快取資料
這些任務如果每次都要手動觸發,不僅麻煩還容易忘記。排程自動化的價值就在於:設定一次,自動執行,你去做更有價值的事。
APScheduler 是什麼
APScheduler 是一個輕量但功能完整的 Python 排程框架。它的核心架構包含四個元件:
- Trigger(觸發器):定義任務何時執行(時間規則)
- Job Store(任務儲存):儲存排程任務的資訊(記憶體或資料庫)
- Executor(執行器):實際執行任務的元件(ThreadPool 或 ProcessPool)
- Scheduler(排程器):把上面三者串在一起的核心
相比其他排程方案(Celery、OS crontab),APScheduler 的優勢是簡單、純 Python、不依賴外部服務。不需要 Redis、不需要 RabbitMQ,import 進來就能用。
安裝與基本設定
pip install apscheduler
最簡單的使用範例:
from apscheduler.schedulers.blocking import BlockingScheduler
def my_task():
print("任務執行了!")
scheduler = BlockingScheduler()
scheduler.add_job(my_task, 'interval', seconds=10)
scheduler.start()
這段程式碼會每 10 秒執行一次 my_task。BlockingScheduler 會阻塞主執行緒(適合排程是程式的唯一用途時使用)。
如果你的排程要跑在 Web 應用裡面,應該用 BackgroundScheduler:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', minutes=30)
scheduler.start()
# 主程式繼續做其他事...
三種排程觸發器詳解
1. Interval Trigger(間隔觸發)
每隔固定時間執行一次:
# 每 30 分鐘
scheduler.add_job(task, 'interval', minutes=30)
# 每 2 小時
scheduler.add_job(task, 'interval', hours=2)
# 每天(每 24 小時)
scheduler.add_job(task, 'interval', days=1)
2. Cron Trigger(Cron 表達式觸發)
最靈活的觸發器,語法類似 Linux crontab:
# 每天早上 8:30
scheduler.add_job(task, 'cron', hour=8, minute=30)
# 每週一到五的 9:00
scheduler.add_job(task, 'cron', day_of_week='mon-fri', hour=9)
# 每月 1 號的 00:00
scheduler.add_job(task, 'cron', day=1, hour=0)
# 每小時的第 0 分鐘
scheduler.add_job(task, 'cron', minute=0)
3. Date Trigger(日期觸發)
在指定的時間點執行一次:
from datetime import datetime
scheduler.add_job(task, 'date', run_date=datetime(2026, 4, 1, 10, 0, 0))
實戰範例:定時爬蟲排程
來做一個實際的例子——每天早上 9 點自動爬取某個網站的文章列表:
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def crawl_articles():
print(f"[{datetime.now()}] 開始爬取文章...")
try:
resp = requests.get("https://example.com/api/articles", timeout=30)
articles = resp.json()
print(f"成功取得 {len(articles)} 篇文章")
# 存到資料庫或檔案...
except Exception as e:
print(f"爬取失敗: {e}")
scheduler = BlockingScheduler(timezone="Asia/Taipei")
scheduler.add_job(
crawl_articles,
'cron',
hour=9,
minute=0,
id='daily_crawl',
name='每日文章爬取',
misfire_grace_time=3600 # 錯過執行時間的容忍秒數
)
print("排程啟動中...")
scheduler.start()
注意 timezone 的設定很重要!沒設定的話,APScheduler 會用 UTC 時間。
任務持久化:重啟不遺失
預設情況下,APScheduler 的任務存在記憶體裡。程式一重啟,所有排程就消失了。如果你需要持久化,可以用 SQLAlchemy Job Store:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(task, 'interval', hours=1, id='my_task', replace_existing=True)
scheduler.start()
replace_existing=True 很重要——它確保程式重啟時,如果任務已存在就更新而不是報錯。
錯誤處理與重試機制
生產環境中,任務失敗是常態。APScheduler 提供了幾個處理機制:
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
def job_listener(event):
if event.exception:
print(f"任務 {event.job_id} 執行失敗: {event.exception}")
# 發送通知、記錄日誌等
else:
print(f"任務 {event.job_id} 執行成功")
scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
如果你需要自動重試,可以在任務函式裡自己實作:
import time
def resilient_task(max_retries=3):
for attempt in range(max_retries):
try:
# 你的業務邏輯
do_something()
return # 成功就結束
except Exception as e:
print(f"第 {attempt + 1} 次嘗試失敗: {e}")
if attempt < max_retries - 1:
time.sleep(5 * (attempt + 1)) # 遞增等待
print("所有重試都失敗了")
搭配 FastAPI 建立排程 API
如果你想要透過 API 來管理排程(新增、暫停、刪除任務),可以搭配 FastAPI:
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
app = FastAPI()
scheduler = BackgroundScheduler()
scheduler.start()
@app.post("/jobs")
def add_job(task_name: str, interval_minutes: int):
scheduler.add_job(
my_task,
'interval',
minutes=interval_minutes,
id=task_name,
replace_existing=True
)
return {"status": "Job added", "id": task_name}
@app.delete("/jobs/{job_id}")
def remove_job(job_id: str):
scheduler.remove_job(job_id)
return {"status": "Job removed"}
@app.get("/jobs")
def list_jobs():
jobs = scheduler.get_jobs()
return [{"id": j.id, "next_run": str(j.next_run_time)} for j in jobs]
進階技巧與效能優化
- 執行緒池大小:預設 ThreadPoolExecutor 的大小是 10。如果你有很多同時執行的任務,需要調大:
from apscheduler.executors.pool import ThreadPoolExecutor executors = {'default': ThreadPoolExecutor(20)} - Coalescing:如果一個任務因為某些原因積壓了多次(比如程式當機後重啟),設定
coalesce=True可以讓它只執行一次而不是執行所有積壓的次數 - Max Instances:設定
max_instances=1確保同一個任務不會同時有多個實例在執行 - Jitter:設定
jitter=60給執行時間加上隨機偏移,避免所有任務在同一秒觸發造成瞬間負載
結語
APScheduler 是 Python 排程自動化的瑞士刀——簡單上手、功能完整、不依賴外部服務。從簡單的定時爬蟲到複雜的企業級排程系統,它都能勝任。
如果你的需求更複雜(分散式排程、任務依賴關係),可以考慮升級到 Celery + Celery Beat。但對大部分場景來說,APScheduler 就夠了。先用簡單的工具解決問題,等真的遇到瓶頸再升級,這是最務實的做法。
繼續閱讀
Python Selenium 動態爬蟲教學:從入門到自動化實戰
相關文章
你可能也喜歡
探索其他領域的精選好文
DaVinci Resolve 免費影片剪輯入門教學:從安裝到完成第一支影片
DaVinci Resolve 是好萊塢等級的剪輯軟體,但免費版就能滿足 90% 的需求。這篇帶你從安裝開始,一步步完成第一支影片。
LangChain vs LlamaIndex 完整比較:2026 年 RAG 框架到底該怎麼選?
在 RAG 應用開發中,LangChain 和 LlamaIndex 是最常被拿來比較的兩大框架。這篇文章從架構設計、效能數據到實戰經驗,幫你釐清到底該選哪一個。
Google SGE 對 SEO 的影響:2026 年你必須知道的因應策略
Google AI Overview 已經出現在將近一半的搜尋結果中。SEO 不會死,但規則正在改變。這篇整理最新數據和五個你現在就該開始做的因應策略。
Redis 快取策略教學:Cache-Aside、Write-Through 到實戰踩坑全紀錄
快取不是 set/get 那麼簡單。這篇從 Cache-Aside、Write-Through 到 Write-Behind,帶你理解每種策略的取捨,加上我踩過的坑,幫你少走彎路。