Web開発 2026年5月8日

Supabase Cron & Queues — スケジュール実行とメッセージキューを Postgres で完結させる

Supabase の Cron(pg_cron ベース)と Queues(pgmq ベース)の役割、設定、運用、Edge Functions との連携、リトライと dead letter まで実用観点で整理する。

この章の要点

  • Supabase Cron は Postgres 拡張 pg_cron をそのままマネージドで利用できる仕組みであり、SQL・データベース関数・HTTP リクエスト(Edge Function 含む)をスケジュール実行できる。
  • Supabase Queues は Postgres 拡張 pgmq をベースにしたプル型メッセージキューであり、外部の Redis や SQS を使わず Postgres 内だけでバックグラウンド処理を構成できる。
  • どちらも Postgres の中で完結するため、外部 worker や別プロダクトのアカウントを増やさずに「定期実行」「非同期処理」「リトライ」を組める。
  • 一方で Cron はスケジュールが UTC 固定であり、Queues には visibility timeout・メッセージサイズ・Compute プランによる制限といった運用上の癖があるため、設計時に把握しておく必要がある。

Supabase の Cron / Queues とは

Supabase Cron は、Postgres の上で動く pg_cron 拡張を Supabase のダッシュボードから扱えるようにしたもので、cron 式に従って SQL や HTTP リクエストを定期実行できる。ジョブとその実行履歴は cron.job および cron.job_run_details というテーブルに保存され、外部のスケジューラサービスに依存しない構成になっている。

Supabase Queues は、Postgres 上のメッセージキュー拡張 pgmq をマネージド化したもので、JSON ペイロードのメッセージを「送信」「受信」「アーカイブ」「削除」するためのデータベース関数群が提供される。プル型 FIFO であり、コンシューマーが能動的にメッセージを取り出して処理する設計となっている。

両者の本質は「Postgres を一級のジョブインフラとして使う」ことであり、Edge Functions と組み合わせれば「Cron でキックされた Edge Function が Queue を drain する」といったパターンを Supabase 一つで完結できる。

何が解説されているか

公式ドキュメントは以下の構造で整理されている。

Cron

  • Overview — pg_cron ベースであること、SQL/関数/HTTP を実行できること、cron.job_run_details で監視できること、推奨は同時実行 8 ジョブ以内・1 ジョブ 10 分以内であること。
  • Install — Dashboard の Integrations から有効化する手順、または create extension pg_cron with schema pg_catalog; を含む SQL での有効化手順。拡張を無効化するとジョブが完全削除される警告も含まれる。
  • Quickstart — cron.schedule() でジョブを登録し、cron.alter_job() で編集、cron.unschedule() で削除する操作。Postgres 15.1.1.61 以降では秒単位スケジュール(例: '30 seconds')も指定できる。
  • Edge Functions invocation / HTTP request via pg_net — pg_net 拡張の net.http_post 等を Cron から呼び、Edge Function や任意の HTTP エンドポイントを叩くパターン。

Queues

  • Overview — pgmq ベース、Postgres ネイティブ、ダッシュボード対応、API 権限と RLS による細粒度認可。
  • Quickstart — pgmq 拡張の有効化、キュー作成、Basic / Unlogged / Partitioned のキュータイプ、pgmq.q_<name>pgmq.a_<name> の二つのテーブルが作られること。
  • API — pgmq_public スキーマに公開される send / send_batch / read / pop / archive / delete の各関数。
  • Authorization — Data API 経由で使う際は「Expose Queues via PostgREST」を有効化し、pgmq_public 関数への権限付与と RLS ポリシーを設定すること。
  • Reading messages / Archiving / Database Functions — 受信時の visibility timeout、archive() による監査用退避、データベース関数からの直接利用。

使い方

pg_cron 有効化と cron job 登録

-- 拡張を有効化
create extension if not exists pg_cron with schema pg_catalog;

-- 毎時 0 分に古いログを削除する
select cron.schedule(
  'cleanup-old-logs',
  '0 * * * *',
  $$ delete from app_logs where created_at < now() - interval '30 days' $$
);

ジョブ名はケースセンシティブで、同名で再登録すると上書きされる点に注意する。cron.alter_job()active := false を渡せば一時停止でき、cron.unschedule('cleanup-old-logs') で削除できる。

Edge Function を定期呼び出しする例(pg_net)

create extension if not exists pg_net;

select cron.schedule(
  'invoke-daily-digest',
  '0 9 * * *', -- UTC 09:00(JST 18:00)
  $$
  select net.http_post(
    url := 'https://<project-ref>.functions.supabase.co/daily-digest',
    headers := jsonb_build_object(
      'Content-Type', 'application/json',
      'Authorization', 'Bearer ' || current_setting('app.settings.service_role_key', true)
    ),
    body := jsonb_build_object('triggered_at', now())
  );
  $$
);

pg_netnet.http_post は非同期キューに HTTP リクエストを積む方式であり、戻り値は request_id である。レスポンス本体は net._http_response テーブルに後から書き込まれる。Cron からは「投げっぱなし」で十分な処理に向いている。

pgmq でキュー作成・送信・受信・archive

create extension if not exists pgmq;

-- キュー作成(基本タイプ)
select pgmq.create('email_jobs');

-- 送信
select pgmq.send('email_jobs', jsonb_build_object(
  'to', 'user@example.com',
  'template', 'welcome'
));

-- 受信(visibility timeout 30 秒、最大 5 件)
select * from pgmq.read('email_jobs', 30, 5);

-- 処理完了後にアーカイブ(監査用に残す)
select pgmq.archive('email_jobs', <msg_id>);

-- もしくは完全削除
select pgmq.delete('email_jobs', <msg_id>);

クライアント SDK から扱う場合は、Dashboard で「Expose Queues via PostgREST」を有効化したうえで、pgmq_public.send / read / pop / archive を RPC として呼び出す。

const { data, error } = await supabase.schema('pgmq_public').rpc('send', {
  queue_name: 'email_jobs',
  message: { to: 'user@example.com', template: 'welcome' },
});

Queue を Edge Function でドレインする例

// supabase/functions/drain-email-jobs/index.ts
import { createClient } from 'jsr:@supabase/supabase-js@2';

Deno.serve(async () => {
  const supabase = createClient(
    Deno.env.get('SUPABASE_URL')!,
    Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!,
  );

  const { data: messages } = await supabase
    .schema('pgmq_public')
    .rpc('read', { queue_name: 'email_jobs', sleep_seconds: 30, n: 10 });

  for (const m of messages ?? []) {
    try {
      await sendEmail(m.message);
      await supabase.schema('pgmq_public').rpc('archive', {
        queue_name: 'email_jobs',
        message_id: m.msg_id,
      });
    } catch (_e) {
      // visibility timeout が切れれば自動的に再配送される
    }
  }

  return new Response('ok');
});

このエンドポイントを Cron から net.http_post で 1 分おきに叩けば、「Cron でトリガー → Edge Function が Queue を一括処理」という素朴な worker パターンが完成する。

リトライ・dead letter 的な運用パターン

pgmqread() で取り出した時点でメッセージを削除せず、visibility timeout の間だけ他のコンシューマーから不可視にする。タイムアウト内に archivedelete もされなければ、メッセージは自動的にキューに戻り、次の read で再度配送される。これがそのままリトライ機構として機能する。

dead letter キュー機能は Supabase Queues 自体には組み込まれていないため、自前で「失敗回数」を扱う設計を採る。たとえばメッセージペイロードに attempts を持たせ、Edge Function 側で attempts >= 5 のとき dlq_email_jobs キューに送り直してから元メッセージを archive する、といった実装である。

if ((m.message.attempts ?? 0) >= 5) {
  await supabase.schema('pgmq_public').rpc('send', {
    queue_name: 'dlq_email_jobs',
    message: { ...m.message, failed_at: new Date().toISOString() },
  });
  await supabase.schema('pgmq_public').rpc('archive', {
    queue_name: 'email_jobs',
    message_id: m.msg_id,
  });
}

注意点・セキュリティ観点

  • Cron のスケジュールは UTC 固定である。JST で「毎朝 9 時」に実行したい場合は cron 式を 0 0 * * * と書く必要があり、サマータイム圏のチームと運用するときは特に混乱しやすい。
  • 誤った schedule(例: * * * * * を秒間隔のつもりで設定)は容易にジョブの暴走を招く。cron.job_run_details を必ず定期確認し、推奨ライン「同時実行 8 ジョブ以内・1 ジョブ 10 分以内」を超えないようにする。
  • pg_cron 拡張を drop extension すると登録済みジョブが完全削除される。本番では拡張の無効化を運用フローに組み込まないこと。
  • pg_net の HTTP リクエストは非同期キュー方式である。Cron から呼ぶときに「実行されたか」をその場で判定できないため、Edge Function 側で受信ログを残すか、net._http_response を後追いで監視する。
  • Service Role キーを Cron 経由で HTTP ヘッダに載せる場合は、Vault や ALTER DATABASE ... SET app.settings.service_role_key = ... 等で安全に保管し、SQL 文へ平文で埋め込まない。
  • Queues の read() は visibility timeout 中に処理を終えなければメッセージが復活する。タイムアウトは処理の最悪所要時間より十分長く取る必要があり、短すぎると同一メッセージが二重実行されやすくなる。
  • メッセージはあくまで Postgres の行として保存されるため、巨大ペイロードはキューに載せず、Storage に置いてキーだけを送る設計にする。
  • Compute プランが小さい場合、Cron / Queues のスループットは Postgres インスタンスの CPU / メモリにそのまま縛られる。高負荷ジョブは Compute サイズの引き上げか、外部 worker へのオフロードを検討する。
  • Queues を Data API から使うときは、pgmq_public 関数への grant execute と RLS ポリシー設定を必ず行い、anon ロールから任意キューへ送信できる状態を放置しない。

一次ソース(原文)