LCOV - code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 201 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 6 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * worker_spi.c
       4              :  *              Sample background worker code that demonstrates various coding
       5              :  *              patterns: establishing a database connection; starting and committing
       6              :  *              transactions; using GUC variables, and heeding SIGHUP to reread
       7              :  *              the configuration file; reporting to pg_stat_activity; using the
       8              :  *              process latch to sleep and exit in case of postmaster death.
       9              :  *
      10              :  * This code connects to a database, creates a schema and table, and summarizes
      11              :  * the numbers contained therein.  To see it working, first insert a row
      12              :  * with "total" type and some initial value; then insert some other rows with
      13              :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14              :  * aggregated into the total.
      15              :  *
      16              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
      17              :  *
      18              :  * IDENTIFICATION
      19              :  *              src/test/modules/worker_spi/worker_spi.c
      20              :  *
      21              :  * -------------------------------------------------------------------------
      22              :  */
      23              : #include "postgres.h"
      24              : 
      25              : /* These are always necessary for a bgworker */
      26              : #include "miscadmin.h"
      27              : #include "postmaster/bgworker.h"
      28              : #include "postmaster/interrupt.h"
      29              : #include "storage/latch.h"
      30              : 
      31              : /* these headers are used by this particular worker's code */
      32              : #include "access/xact.h"
      33              : #include "catalog/pg_database.h"
      34              : #include "executor/spi.h"
      35              : #include "fmgr.h"
      36              : #include "lib/stringinfo.h"
      37              : #include "pgstat.h"
      38              : #include "tcop/utility.h"
      39              : #include "utils/acl.h"
      40              : #include "utils/builtins.h"
      41              : #include "utils/snapmgr.h"
      42              : 
      43            0 : PG_MODULE_MAGIC;
      44              : 
      45            0 : PG_FUNCTION_INFO_V1(worker_spi_launch);
      46              : 
      47              : PGDLLEXPORT pg_noreturn void worker_spi_main(Datum main_arg);
      48              : 
      49              : /* GUC variables */
      50              : static int      worker_spi_naptime = 10;
      51              : static int      worker_spi_total_workers = 2;
      52              : static char *worker_spi_database = NULL;
      53              : static char *worker_spi_role = NULL;
      54              : 
      55              : /* value cached, fetched from shared memory */
      56              : static uint32 worker_spi_wait_event_main = 0;
      57              : 
      58              : typedef struct worktable
      59              : {
      60              :         const char *schema;
      61              :         const char *name;
      62              : } worktable;
      63              : 
      64              : /*
      65              :  * Initialize workspace for a worker process: create the schema, together
      66              :  * with its table and unique index, if the schema doesn't already exist.
      67              :  */
      68              : static void
      69            0 : initialize_worker_spi(worktable *table)
      70              : {
      71            0 :         int                     ret;
      72            0 :         int                     ntup;
      73            0 :         bool            isnull;
      74            0 :         StringInfoData buf;
      75              : 
      76            0 :         SetCurrentStatementStartTimestamp();
      77            0 :         StartTransactionCommand();
      78            0 :         SPI_connect();
      79            0 :         PushActiveSnapshot(GetTransactionSnapshot());
      80            0 :         pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
      81              : 
      82              :         /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
      83            0 :         initStringInfo(&buf);
      84            0 :         appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
      85            0 :                                          table->schema);
      86              : 
      87            0 :         debug_query_string = buf.data;
      88            0 :         ret = SPI_execute(buf.data, true, 0);
      89            0 :         if (ret != SPI_OK_SELECT)
      90            0 :                 elog(FATAL, "SPI_execute failed: error code %d", ret);
      91              : 
      92            0 :         if (SPI_processed != 1)
      93            0 :                 elog(FATAL, "not a singleton result");
      94              : 
      95            0 :         ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
      96            0 :                                                                            SPI_tuptable->tupdesc,
      97              :                                                                            1, &isnull));
      98            0 :         if (isnull)
      99            0 :                 elog(FATAL, "null result");
     100              : 
     101            0 :         if (ntup == 0)
     102              :         {
     103            0 :                 debug_query_string = NULL;
     104            0 :                 resetStringInfo(&buf);
     105            0 :                 appendStringInfo(&buf,
     106              :                                                  "CREATE SCHEMA \"%s\" "
     107              :                                                  "CREATE TABLE \"%s\" ("
     108              :                                                  "         type text CHECK (type IN ('total', 'delta')), "
     109              :                                                  "         value   integer)"
     110              :                                                  "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     111              :                                                  "WHERE type = 'total'",
     112            0 :                                                  table->schema, table->name, table->name, table->name);
     113              : 
     114              :                 /* set statement start time */
     115            0 :                 SetCurrentStatementStartTimestamp();
     116              : 
     117            0 :                 debug_query_string = buf.data;
     118            0 :                 ret = SPI_execute(buf.data, false, 0);
     119              : 
     120            0 :                 if (ret != SPI_OK_UTILITY)
     121            0 :                         elog(FATAL, "failed to create my schema");
     122              : 
     123            0 :                 debug_query_string = NULL;      /* rest is not statement-specific */
     124            0 :         }
     125              : 
     126            0 :         SPI_finish();
     127            0 :         PopActiveSnapshot();
     128            0 :         CommitTransactionCommand();
     129            0 :         debug_query_string = NULL;
     130            0 :         pgstat_report_activity(STATE_IDLE, NULL);
     131            0 : }
     132              : 
     133              : void
     134            0 : worker_spi_main(Datum main_arg)
     135              : {
     136            0 :         int                     index = DatumGetInt32(main_arg);
     137            0 :         worktable  *table;
     138            0 :         StringInfoData buf;
     139            0 :         char            name[20];
     140            0 :         Oid                     dboid;
     141            0 :         Oid                     roleoid;
     142            0 :         char       *p;
     143            0 :         bits32          flags = 0;
     144              : 
     145            0 :         table = palloc_object(worktable);
     146            0 :         sprintf(name, "schema%d", index);
     147            0 :         table->schema = pstrdup(name);
     148            0 :         table->name = pstrdup("counted");
     149              : 
     150              :         /* fetch database OID, role OID and flags; these are set for a dynamic worker */
     151            0 :         p = MyBgworkerEntry->bgw_extra;
     152            0 :         memcpy(&dboid, p, sizeof(Oid));
     153            0 :         p += sizeof(Oid);
     154            0 :         memcpy(&roleoid, p, sizeof(Oid));
     155            0 :         p += sizeof(Oid);
     156            0 :         memcpy(&flags, p, sizeof(bits32));
     157              : 
     158              :         /* Establish signal handlers before unblocking signals. */
     159            0 :         pqsignal(SIGHUP, SignalHandlerForConfigReload);
     160            0 :         pqsignal(SIGTERM, die);
     161              : 
     162              :         /* We're now ready to receive signals */
     163            0 :         BackgroundWorkerUnblockSignals();
     164              : 
     165              :         /* Connect to our database */
     166            0 :         if (OidIsValid(dboid))
     167            0 :                 BackgroundWorkerInitializeConnectionByOid(dboid, roleoid, flags);
     168              :         else
     169            0 :                 BackgroundWorkerInitializeConnection(worker_spi_database,
     170            0 :                                                                                          worker_spi_role, flags);
     171              : 
     172            0 :         elog(LOG, "%s initialized with %s.%s",
     173              :                  MyBgworkerEntry->bgw_name, table->schema, table->name);
     174            0 :         initialize_worker_spi(table);
     175              : 
     176              :         /*
     177              :          * Quote identifiers passed to us.  Note that this must be done after
     178              :          * initialize_worker_spi, because that routine assumes the names are not
     179              :          * quoted.
     180              :          *
     181              :          * Note some memory might be leaked here.
     182              :          */
     183            0 :         table->schema = quote_identifier(table->schema);
     184            0 :         table->name = quote_identifier(table->name);
     185              : 
     186            0 :         initStringInfo(&buf);
     187            0 :         appendStringInfo(&buf,
     188              :                                          "WITH deleted AS (DELETE "
     189              :                                          "FROM %s.%s "
     190              :                                          "WHERE type = 'delta' RETURNING value), "
     191              :                                          "total AS (SELECT coalesce(sum(value), 0) as sum "
     192              :                                          "FROM deleted) "
     193              :                                          "UPDATE %s.%s "
     194              :                                          "SET value = %s.value + total.sum "
     195              :                                          "FROM total WHERE type = 'total' "
     196              :                                          "RETURNING %s.value",
     197            0 :                                          table->schema, table->name,
     198            0 :                                          table->schema, table->name,
     199            0 :                                          table->name,
     200            0 :                                          table->name);
     201              : 
     202              :         /*
     203              :          * Main loop: do this until SIGTERM is received and processed by
     204              :          * ProcessInterrupts.
     205              :          */
     206            0 :         for (;;)
     207              :         {
     208            0 :                 int                     ret;
     209              : 
     210              :                 /* First time, allocate or get the custom wait event */
     211            0 :                 if (worker_spi_wait_event_main == 0)
     212            0 :                         worker_spi_wait_event_main = WaitEventExtensionNew("WorkerSpiMain");
     213              : 
     214              :                 /*
     215              :                  * Background workers mustn't call usleep() or any direct equivalent:
     216              :                  * instead, they may wait on their process latch, which sleeps as
     217              :                  * necessary, but is awakened if postmaster dies.  That way the
     218              :                  * background process goes away immediately in an emergency.
     219              :                  */
     220            0 :                 (void) WaitLatch(MyLatch,
     221              :                                                  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     222            0 :                                                  worker_spi_naptime * 1000L,
     223            0 :                                                  worker_spi_wait_event_main);
     224            0 :                 ResetLatch(MyLatch);
     225              : 
     226            0 :                 CHECK_FOR_INTERRUPTS();
     227              : 
     228              :                 /*
     229              :                  * In case of a SIGHUP, just reload the configuration.
     230              :                  */
     231            0 :                 if (ConfigReloadPending)
     232              :                 {
     233            0 :                         ConfigReloadPending = false;
     234            0 :                         ProcessConfigFile(PGC_SIGHUP);
     235            0 :                 }
     236              : 
     237              :                 /*
     238              :                  * Start a transaction on which we can run queries.  Note that each
     239              :                  * StartTransactionCommand() call should be preceded by a
     240              :                  * SetCurrentStatementStartTimestamp() call, which sets both the time
     241              :                  * for the statement we're about to run, and also the transaction
     242              :                  * start time.  Also, each other query sent to SPI should probably be
     243              :                  * preceded by SetCurrentStatementStartTimestamp(), so that statement
     244              :                  * start time is always up to date.
     245              :                  *
     246              :                  * The SPI_connect() call lets us run queries through the SPI manager,
     247              :                  * and the PushActiveSnapshot() call creates an "active" snapshot
     248              :                  * which is necessary for queries to have MVCC data to work on.
     249              :                  *
     250              :                  * The pgstat_report_activity() call makes our activity visible
     251              :                  * through the pgstat views.
     252              :                  */
     253            0 :                 SetCurrentStatementStartTimestamp();
     254            0 :                 StartTransactionCommand();
     255            0 :                 SPI_connect();
     256            0 :                 PushActiveSnapshot(GetTransactionSnapshot());
     257            0 :                 debug_query_string = buf.data;
     258            0 :                 pgstat_report_activity(STATE_RUNNING, buf.data);
     259              : 
     260              :                 /* We can now execute queries via SPI */
     261            0 :                 ret = SPI_execute(buf.data, false, 0);
     262              : 
     263            0 :                 if (ret != SPI_OK_UPDATE_RETURNING)
     264            0 :                         elog(FATAL, "cannot select from table %s.%s: error code %d",
     265              :                                  table->schema, table->name, ret);
     266              : 
     267            0 :                 if (SPI_processed > 0)
     268              :                 {
     269            0 :                         bool            isnull;
     270            0 :                         int32           val;
     271              : 
     272            0 :                         val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     273            0 :                                                                                           SPI_tuptable->tupdesc,
     274              :                                                                                           1, &isnull));
     275            0 :                         if (!isnull)
     276            0 :                                 elog(LOG, "%s: count in %s.%s is now %d",
     277              :                                          MyBgworkerEntry->bgw_name,
     278              :                                          table->schema, table->name, val);
     279            0 :                 }
     280              : 
     281              :                 /*
     282              :                  * And finish our transaction.
     283              :                  */
     284            0 :                 SPI_finish();
     285            0 :                 PopActiveSnapshot();
     286            0 :                 CommitTransactionCommand();
     287            0 :                 debug_query_string = NULL;
     288            0 :                 pgstat_report_stat(true);
     289            0 :                 pgstat_report_activity(STATE_IDLE, NULL);
     290            0 :         }
     291              : 
     292              :         /* Not reachable */
     293              : }
     294              : 
     295              : /*
     296              :  * Entrypoint of this module.
     297              :  *
     298              :  * We register more than one worker process here, to demonstrate how that can
     299              :  * be done.
     300              :  */
     301              : void
     302            0 : _PG_init(void)
     303              : {
     304            0 :         BackgroundWorker worker;
     305              : 
     306              :         /* get the configuration */
     307              : 
     308              :         /*
     309              :          * These GUCs are defined even if this library is not loaded with
     310              :          * shared_preload_libraries, for worker_spi_launch().
     311              :          */
     312            0 :         DefineCustomIntVariable("worker_spi.naptime",
     313              :                                                         "Duration between each check (in seconds).",
     314              :                                                         NULL,
     315              :                                                         &worker_spi_naptime,
     316              :                                                         10,
     317              :                                                         1,
     318              :                                                         INT_MAX,
     319              :                                                         PGC_SIGHUP,
     320              :                                                         0,
     321              :                                                         NULL,
     322              :                                                         NULL,
     323              :                                                         NULL);
     324              : 
     325            0 :         DefineCustomStringVariable("worker_spi.database",
     326              :                                                            "Database to connect to.",
     327              :                                                            NULL,
     328              :                                                            &worker_spi_database,
     329              :                                                            "postgres",
     330              :                                                            PGC_SIGHUP,
     331              :                                                            0,
     332              :                                                            NULL, NULL, NULL);
     333              : 
     334            0 :         DefineCustomStringVariable("worker_spi.role",
     335              :                                                            "Role to connect with.",
     336              :                                                            NULL,
     337              :                                                            &worker_spi_role,
     338              :                                                            NULL,
     339              :                                                            PGC_SIGHUP,
     340              :                                                            0,
     341              :                                                            NULL, NULL, NULL);
     342              : 
     343            0 :         if (!process_shared_preload_libraries_in_progress)
     344            0 :                 return;
     345              : 
     346            0 :         DefineCustomIntVariable("worker_spi.total_workers",
     347              :                                                         "Number of workers.",
     348              :                                                         NULL,
     349              :                                                         &worker_spi_total_workers,
     350              :                                                         2,
     351              :                                                         1,
     352              :                                                         100,
     353              :                                                         PGC_POSTMASTER,
     354              :                                                         0,
     355              :                                                         NULL,
     356              :                                                         NULL,
     357              :                                                         NULL);
     358              : 
     359            0 :         MarkGUCPrefixReserved("worker_spi");
     360              : 
     361              :         /* set up common data for all our workers */
     362            0 :         memset(&worker, 0, sizeof(worker));
     363            0 :         worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     364              :                 BGWORKER_BACKEND_DATABASE_CONNECTION;
     365            0 :         worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     366            0 :         worker.bgw_restart_time = BGW_NEVER_RESTART;
     367            0 :         sprintf(worker.bgw_library_name, "worker_spi");
     368            0 :         sprintf(worker.bgw_function_name, "worker_spi_main");
     369            0 :         worker.bgw_notify_pid = 0;
     370              : 
     371              :         /*
     372              :          * Now fill in worker-specific data, and do the actual registrations.
     373              :          *
     374              :          * bgw_extra can optionally include a database OID, a role OID and a set
     375              :          * of flags.  This is left empty here to fall back to the related GUCs at
     376              :          * startup (0 for the bgworker flags).
     377              :          */
     378            0 :         for (int i = 1; i <= worker_spi_total_workers; i++)
     379              :         {
     380            0 :                 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     381            0 :                 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     382            0 :                 worker.bgw_main_arg = Int32GetDatum(i);
     383              : 
     384            0 :                 RegisterBackgroundWorker(&worker);
     385            0 :         }
     386            0 : }
     387              : 
     388              : /*
     389              :  * Dynamically launch an SPI worker.
     390              :  */
     391              : Datum
     392            0 : worker_spi_launch(PG_FUNCTION_ARGS)
     393              : {
     394            0 :         int32           i = PG_GETARG_INT32(0);
     395            0 :         Oid                     dboid = PG_GETARG_OID(1);
     396            0 :         Oid                     roleoid = PG_GETARG_OID(2);
     397            0 :         BackgroundWorker worker;
     398            0 :         BackgroundWorkerHandle *handle;
     399            0 :         BgwHandleStatus status;
     400            0 :         pid_t           pid;
     401            0 :         char       *p;
     402            0 :         bits32          flags = 0;
     403            0 :         ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(3);
     404            0 :         Size            ndim;
     405            0 :         int                     nelems;
     406            0 :         Datum      *datum_flags;
     407            0 :         bool            interruptible = PG_GETARG_BOOL(4);
     408              : 
     409            0 :         memset(&worker, 0, sizeof(worker));
     410            0 :         worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     411              :                 BGWORKER_BACKEND_DATABASE_CONNECTION;
     412              : 
     413            0 :         if (interruptible)
     414            0 :                 worker.bgw_flags |= BGWORKER_INTERRUPTIBLE;
     415              : 
     416            0 :         worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     417            0 :         worker.bgw_restart_time = BGW_NEVER_RESTART;
     418            0 :         sprintf(worker.bgw_library_name, "worker_spi");
     419            0 :         sprintf(worker.bgw_function_name, "worker_spi_main");
     420            0 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi dynamic worker %d", i);
     421            0 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi dynamic");
     422            0 :         worker.bgw_main_arg = Int32GetDatum(i);
     423              :         /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     424            0 :         worker.bgw_notify_pid = MyProcPid;
     425              : 
     426              :         /* extract flags, if any */
     427            0 :         ndim = ARR_NDIM(arr);
     428            0 :         if (ndim > 1)
     429            0 :                 ereport(ERROR,
     430              :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     431              :                                  errmsg("flags array must be one-dimensional")));
     432              : 
     433            0 :         if (array_contains_nulls(arr))
     434            0 :                 ereport(ERROR,
     435              :                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     436              :                                  errmsg("flags array must not contain nulls")));
     437              : 
     438            0 :         Assert(ARR_ELEMTYPE(arr) == TEXTOID);
     439            0 :         deconstruct_array_builtin(arr, TEXTOID, &datum_flags, NULL, &nelems);
     440              : 
     441            0 :         for (i = 0; i < nelems; i++)
     442              :         {
     443            0 :                 char       *optname = TextDatumGetCString(datum_flags[i]);
     444              : 
     445            0 :                 if (strcmp(optname, "ALLOWCONN") == 0)
     446            0 :                         flags |= BGWORKER_BYPASS_ALLOWCONN;
     447            0 :                 else if (strcmp(optname, "ROLELOGINCHECK") == 0)
     448            0 :                         flags |= BGWORKER_BYPASS_ROLELOGINCHECK;
     449              :                 else
     450            0 :                         ereport(ERROR,
     451              :                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     452              :                                          errmsg("incorrect flag value found in array")));
     453            0 :         }
     454              : 
     455              :         /*
     456              :          * Store in bgw_extra the database and role to use for the worker being started.
     457              :          * If none have been provided, this will fall back to the GUCs at startup.
     458              :          */
     459            0 :         if (!OidIsValid(dboid))
     460            0 :                 dboid = get_database_oid(worker_spi_database, false);
     461              : 
     462              :         /*
     463              :          * worker_spi_role is NULL by default, so in that case worker_spi_main()
     464              :          * is given an invalid OID.
     465              :          */
     466            0 :         if (!OidIsValid(roleoid) && worker_spi_role)
     467            0 :                 roleoid = get_role_oid(worker_spi_role, false);
     468              : 
     469            0 :         p = worker.bgw_extra;
     470            0 :         memcpy(p, &dboid, sizeof(Oid));
     471            0 :         p += sizeof(Oid);
     472            0 :         memcpy(p, &roleoid, sizeof(Oid));
     473            0 :         p += sizeof(Oid);
     474            0 :         memcpy(p, &flags, sizeof(bits32));
     475              : 
     476            0 :         if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     477            0 :                 PG_RETURN_NULL();
     478              : 
     479            0 :         status = WaitForBackgroundWorkerStartup(handle, &pid);
     480              : 
     481            0 :         if (status == BGWH_STOPPED)
     482            0 :                 ereport(ERROR,
     483              :                                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     484              :                                  errmsg("could not start background process"),
     485              :                                  errhint("More details may be available in the server log.")));
     486            0 :         if (status == BGWH_POSTMASTER_DIED)
     487            0 :                 ereport(ERROR,
     488              :                                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     489              :                                  errmsg("cannot start background processes without postmaster"),
     490              :                                  errhint("Kill all remaining database processes and restart the database.")));
     491            0 :         Assert(status == BGWH_STARTED);
     492              : 
     493            0 :         PG_RETURN_INT32(pid);
     494            0 : }
        

Generated by: LCOV version 2.3.2-1