Supabase Cron & Queues — スケジュール実行とメッセージキューを Postgres で完結させる
Supabase の Cron(pg_cron ベース)と Queues(pgmq ベース)の役割、設定、運用、Edge Functions との連携、リトライと dead letter まで実用観点で整理する。
この章の要点
- Supabase Cron は Postgres 拡張
pg_cronをそのままマネージドで利用できる仕組みであり、SQL・データベース関数・HTTP リクエスト(Edge Function 含む)をスケジュール実行できる。 - Supabase Queues は Postgres 拡張
pgmqをベースにしたプル型メッセージキューであり、外部の Redis や SQS を使わず Postgres 内だけでバックグラウンド処理を構成できる。 - どちらも Postgres の中で完結するため、外部 worker や別プロダクトのアカウントを増やさずに「定期実行」「非同期処理」「リトライ」を組める。
- 一方で Cron はスケジュールが UTC 固定であり、Queues には visibility timeout・メッセージサイズ・Compute プランによる制限といった運用上の癖があるため、設計時に把握しておく必要がある。
Supabase の Cron / Queues とは
Supabase Cron は、Postgres の上で動く pg_cron 拡張を Supabase のダッシュボードから扱えるようにしたもので、cron 式に従って SQL や HTTP リクエストを定期実行できる。ジョブとその実行履歴は cron.job および cron.job_run_details というテーブルに保存され、外部のスケジューラサービスに依存しない構成になっている。
Supabase Queues は、Postgres 上のメッセージキュー拡張 pgmq をマネージド化したもので、JSON ペイロードのメッセージを「送信」「受信」「アーカイブ」「削除」するためのデータベース関数群が提供される。プル型 FIFO であり、コンシューマーが能動的にメッセージを取り出して処理する設計となっている。
両者の本質は「Postgres を一級のジョブインフラとして使う」ことであり、Edge Functions と組み合わせれば「Cron でキックされた Edge Function が Queue を drain する」といったパターンを Supabase 一つで完結できる。
何が解説されているか
公式ドキュメントは以下の構造で整理されている。
Cron
- Overview —
pg_cronベースであること、SQL/関数/HTTP を実行できること、cron.job_run_detailsで監視できること、推奨は同時実行 8 ジョブ以内・1 ジョブ 10 分以内であること。 - Install — Dashboard の Integrations から有効化する手順、または
create extension pg_cron with schema pg_catalog;を含む SQL での有効化手順。拡張を無効化するとジョブが完全削除される警告も含まれる。 - Quickstart —
cron.schedule()でジョブを登録し、cron.alter_job()で編集、cron.unschedule()で削除する操作。Postgres 15.1.1.61 以降では秒単位スケジュール(例:'30 seconds')も指定できる。 - Edge Functions invocation / HTTP request via pg_net —
pg_net拡張のnet.http_post等を Cron から呼び、Edge Function や任意の HTTP エンドポイントを叩くパターン。
Queues
- Overview —
pgmqベース、Postgres ネイティブ、ダッシュボード対応、API 権限と RLS による細粒度認可。 - Quickstart —
pgmq拡張の有効化、キュー作成、Basic / Unlogged / Partitioned のキュータイプ、pgmq.q_<name>とpgmq.a_<name>の二つのテーブルが作られること。 - API —
pgmq_publicスキーマに公開されるsend/send_batch/read/pop/archive/deleteの各関数。 - Authorization — Data API 経由で使う際は「Expose Queues via PostgREST」を有効化し、
pgmq_public関数への権限付与と RLS ポリシーを設定すること。 - Reading messages / Archiving / Database Functions — 受信時の visibility timeout、
archive()による監査用退避、データベース関数からの直接利用。
使い方
pg_cron 有効化と cron job 登録
-- 拡張を有効化
create extension if not exists pg_cron with schema pg_catalog;
-- 毎時 0 分に古いログを削除する
select cron.schedule(
'cleanup-old-logs',
'0 * * * *',
$$ delete from app_logs where created_at < now() - interval '30 days' $$
);
ジョブ名はケースセンシティブで、同名で再登録すると上書きされる点に注意する。cron.alter_job() で active := false を渡せば一時停止でき、cron.unschedule('cleanup-old-logs') で削除できる。
Edge Function を定期呼び出しする例(pg_net)
create extension if not exists pg_net;
select cron.schedule(
'invoke-daily-digest',
'0 9 * * *', -- UTC 09:00(JST 18:00)
$$
select net.http_post(
url := 'https://<project-ref>.functions.supabase.co/daily-digest',
headers := jsonb_build_object(
'Content-Type', 'application/json',
'Authorization', 'Bearer ' || current_setting('app.settings.service_role_key', true)
),
body := jsonb_build_object('triggered_at', now())
);
$$
);
pg_net の net.http_post は非同期キューに HTTP リクエストを積む方式であり、戻り値は request_id である。レスポンス本体は net._http_response テーブルに後から書き込まれる。Cron からは「投げっぱなし」で十分な処理に向いている。
pgmq でキュー作成・送信・受信・archive
create extension if not exists pgmq;
-- キュー作成(基本タイプ)
select pgmq.create('email_jobs');
-- 送信
select pgmq.send('email_jobs', jsonb_build_object(
'to', 'user@example.com',
'template', 'welcome'
));
-- 受信(visibility timeout 30 秒、最大 5 件)
select * from pgmq.read('email_jobs', 30, 5);
-- 処理完了後にアーカイブ(監査用に残す)
select pgmq.archive('email_jobs', <msg_id>);
-- もしくは完全削除
select pgmq.delete('email_jobs', <msg_id>);
クライアント SDK から扱う場合は、Dashboard で「Expose Queues via PostgREST」を有効化したうえで、pgmq_public.send / read / pop / archive を RPC として呼び出す。
const { data, error } = await supabase.schema('pgmq_public').rpc('send', {
queue_name: 'email_jobs',
message: { to: 'user@example.com', template: 'welcome' },
});
Queue を Edge Function でドレインする例
// supabase/functions/drain-email-jobs/index.ts
import { createClient } from 'jsr:@supabase/supabase-js@2';
Deno.serve(async () => {
const supabase = createClient(
Deno.env.get('SUPABASE_URL')!,
Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!,
);
const { data: messages } = await supabase
.schema('pgmq_public')
.rpc('read', { queue_name: 'email_jobs', sleep_seconds: 30, n: 10 });
for (const m of messages ?? []) {
try {
await sendEmail(m.message);
await supabase.schema('pgmq_public').rpc('archive', {
queue_name: 'email_jobs',
message_id: m.msg_id,
});
} catch (_e) {
// visibility timeout が切れれば自動的に再配送される
}
}
return new Response('ok');
});
このエンドポイントを Cron から net.http_post で 1 分おきに叩けば、「Cron でトリガー → Edge Function が Queue を一括処理」という素朴な worker パターンが完成する。
リトライ・dead letter 的な運用パターン
pgmq は read() で取り出した時点でメッセージを削除せず、visibility timeout の間だけ他のコンシューマーから不可視にする。タイムアウト内に archive も delete もされなければ、メッセージは自動的にキューに戻り、次の read で再度配送される。これがそのままリトライ機構として機能する。
dead letter キュー機能は Supabase Queues 自体には組み込まれていないため、自前で「失敗回数」を扱う設計を採る。たとえばメッセージペイロードに attempts を持たせ、Edge Function 側で attempts >= 5 のとき dlq_email_jobs キューに送り直してから元メッセージを archive する、といった実装である。
if ((m.message.attempts ?? 0) >= 5) {
await supabase.schema('pgmq_public').rpc('send', {
queue_name: 'dlq_email_jobs',
message: { ...m.message, failed_at: new Date().toISOString() },
});
await supabase.schema('pgmq_public').rpc('archive', {
queue_name: 'email_jobs',
message_id: m.msg_id,
});
}
注意点・セキュリティ観点
- Cron のスケジュールは UTC 固定である。JST で「毎朝 9 時」に実行したい場合は cron 式を
0 0 * * *と書く必要があり、サマータイム圏のチームと運用するときは特に混乱しやすい。 - 誤った schedule(例:
* * * * *を秒間隔のつもりで設定)は容易にジョブの暴走を招く。cron.job_run_detailsを必ず定期確認し、推奨ライン「同時実行 8 ジョブ以内・1 ジョブ 10 分以内」を超えないようにする。 pg_cron拡張をdrop extensionすると登録済みジョブが完全削除される。本番では拡張の無効化を運用フローに組み込まないこと。pg_netの HTTP リクエストは非同期キュー方式である。Cron から呼ぶときに「実行されたか」をその場で判定できないため、Edge Function 側で受信ログを残すか、net._http_responseを後追いで監視する。- Service Role キーを Cron 経由で HTTP ヘッダに載せる場合は、Vault や
ALTER DATABASE ... SET app.settings.service_role_key = ...等で安全に保管し、SQL 文へ平文で埋め込まない。 - Queues の
read()は visibility timeout 中に処理を終えなければメッセージが復活する。タイムアウトは処理の最悪所要時間より十分長く取る必要があり、短すぎると同一メッセージが二重実行されやすくなる。 - メッセージはあくまで Postgres の行として保存されるため、巨大ペイロードはキューに載せず、Storage に置いてキーだけを送る設計にする。
- Compute プランが小さい場合、Cron / Queues のスループットは Postgres インスタンスの CPU / メモリにそのまま縛られる。高負荷ジョブは Compute サイズの引き上げか、外部 worker へのオフロードを検討する。
- Queues を Data API から使うときは、
pgmq_public関数へのgrant executeと RLS ポリシー設定を必ず行い、anon ロールから任意キューへ送信できる状態を放置しない。