LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_zstd.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 140 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 8 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * astreamer_zstd.c
       4              :  *
       5              :  * Archive streamers that deal with data compressed using zstd.
       6              :  * astreamer_zstd_compressor applies lz4 compression to the input stream,
       7              :  * and astreamer_zstd_decompressor does the reverse.
       8              :  *
       9              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *                src/fe_utils/astreamer_zstd.c
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres_fe.h"
      17              : 
      18              : #include <unistd.h>
      19              : 
      20              : #ifdef USE_ZSTD
      21              : #include <zstd.h>
      22              : #endif
      23              : 
      24              : #include "common/logging.h"
      25              : #include "fe_utils/astreamer.h"
      26              : 
      27              : #ifdef USE_ZSTD
      28              : 
      29              : typedef struct astreamer_zstd_frame
      30              : {
      31              :         astreamer       base;
      32              : 
      33              :         ZSTD_CCtx  *cctx;
      34              :         ZSTD_DCtx  *dctx;
      35              :         ZSTD_outBuffer zstd_outBuf;
      36              : } astreamer_zstd_frame;
      37              : 
      38              : static void astreamer_zstd_compressor_content(astreamer *streamer,
      39              :                                                                                           astreamer_member *member,
      40              :                                                                                           const char *data, int len,
      41              :                                                                                           astreamer_archive_context context);
      42              : static void astreamer_zstd_compressor_finalize(astreamer *streamer);
      43              : static void astreamer_zstd_compressor_free(astreamer *streamer);
      44              : 
      45              : static const astreamer_ops astreamer_zstd_compressor_ops = {
      46              :         .content = astreamer_zstd_compressor_content,
      47              :         .finalize = astreamer_zstd_compressor_finalize,
      48              :         .free = astreamer_zstd_compressor_free
      49              : };
      50              : 
      51              : static void astreamer_zstd_decompressor_content(astreamer *streamer,
      52              :                                                                                                 astreamer_member *member,
      53              :                                                                                                 const char *data, int len,
      54              :                                                                                                 astreamer_archive_context context);
      55              : static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
      56              : static void astreamer_zstd_decompressor_free(astreamer *streamer);
      57              : 
      58              : static const astreamer_ops astreamer_zstd_decompressor_ops = {
      59              :         .content = astreamer_zstd_decompressor_content,
      60              :         .finalize = astreamer_zstd_decompressor_finalize,
      61              :         .free = astreamer_zstd_decompressor_free
      62              : };
      63              : #endif
      64              : 
      65              : /*
      66              :  * Create a new base backup streamer that performs zstd compression of tar
      67              :  * blocks.
      68              :  */
      69              : astreamer *
      70            0 : astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
      71              : {
      72              : #ifdef USE_ZSTD
      73            0 :         astreamer_zstd_frame *streamer;
      74            0 :         size_t          ret;
      75              : 
      76            0 :         Assert(next != NULL);
      77              : 
      78            0 :         streamer = palloc0_object(astreamer_zstd_frame);
      79              : 
      80            0 :         *((const astreamer_ops **) &streamer->base.bbs_ops) =
      81              :                 &astreamer_zstd_compressor_ops;
      82              : 
      83            0 :         streamer->base.bbs_next = next;
      84            0 :         initStringInfo(&streamer->base.bbs_buffer);
      85            0 :         enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
      86              : 
      87            0 :         streamer->cctx = ZSTD_createCCtx();
      88            0 :         if (!streamer->cctx)
      89            0 :                 pg_fatal("could not create zstd compression context");
      90              : 
      91              :         /* Set compression level */
      92            0 :         ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
      93            0 :                                                                  compress->level);
      94            0 :         if (ZSTD_isError(ret))
      95            0 :                 pg_fatal("could not set zstd compression level to %d: %s",
      96              :                                  compress->level, ZSTD_getErrorName(ret));
      97              : 
      98              :         /* Set # of workers, if specified */
      99            0 :         if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     100              :         {
     101              :                 /*
     102              :                  * On older versions of libzstd, this option does not exist, and
     103              :                  * trying to set it will fail. Similarly for newer versions if they
     104              :                  * are compiled without threading support.
     105              :                  */
     106            0 :                 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
     107            0 :                                                                          compress->workers);
     108            0 :                 if (ZSTD_isError(ret))
     109            0 :                         pg_fatal("could not set compression worker count to %d: %s",
     110              :                                          compress->workers, ZSTD_getErrorName(ret));
     111            0 :         }
     112              : 
     113            0 :         if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     114              :         {
     115            0 :                 ret = ZSTD_CCtx_setParameter(streamer->cctx,
     116              :                                                                          ZSTD_c_enableLongDistanceMatching,
     117            0 :                                                                          compress->long_distance);
     118            0 :                 if (ZSTD_isError(ret))
     119              :                 {
     120            0 :                         pg_log_error("could not enable long-distance mode: %s",
     121              :                                                  ZSTD_getErrorName(ret));
     122            0 :                         exit(1);
     123              :                 }
     124            0 :         }
     125              : 
     126              :         /* Initialize the ZSTD output buffer. */
     127            0 :         streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     128            0 :         streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     129            0 :         streamer->zstd_outBuf.pos = 0;
     130              : 
     131            0 :         return &streamer->base;
     132              : #else
     133              :         pg_fatal("this build does not support compression with %s", "ZSTD");
     134              :         return NULL;                            /* keep compiler quiet */
     135              : #endif
     136            0 : }
     137              : 
     138              : #ifdef USE_ZSTD
     139              : /*
     140              :  * Compress the input data to output buffer.
     141              :  *
     142              :  * Find out the compression bound based on input data length for each
     143              :  * invocation to make sure that output buffer has enough capacity to
     144              :  * accommodate the compressed data. In case if the output buffer
     145              :  * capacity falls short of compression bound then forward the content
     146              :  * of output buffer to next streamer and empty the buffer.
     147              :  */
     148              : static void
     149            0 : astreamer_zstd_compressor_content(astreamer *streamer,
     150              :                                                                   astreamer_member *member,
     151              :                                                                   const char *data, int len,
     152              :                                                                   astreamer_archive_context context)
     153              : {
     154            0 :         astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     155            0 :         ZSTD_inBuffer inBuf = {data, len, 0};
     156              : 
     157            0 :         while (inBuf.pos < inBuf.size)
     158              :         {
     159            0 :                 size_t          yet_to_flush;
     160            0 :                 size_t          max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     161              : 
     162              :                 /*
     163              :                  * If the output buffer is not left with enough space, send the
     164              :                  * compressed bytes to the next streamer, and empty the buffer.
     165              :                  */
     166            0 :                 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     167            0 :                         max_needed)
     168              :                 {
     169            0 :                         astreamer_content(mystreamer->base.bbs_next, member,
     170            0 :                                                           mystreamer->zstd_outBuf.dst,
     171            0 :                                                           mystreamer->zstd_outBuf.pos,
     172            0 :                                                           context);
     173              : 
     174              :                         /* Reset the ZSTD output buffer. */
     175            0 :                         mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     176            0 :                         mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     177            0 :                         mystreamer->zstd_outBuf.pos = 0;
     178            0 :                 }
     179              : 
     180            0 :                 yet_to_flush =
     181            0 :                         ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
     182              :                                                                  &inBuf, ZSTD_e_continue);
     183              : 
     184            0 :                 if (ZSTD_isError(yet_to_flush))
     185            0 :                         pg_log_error("could not compress data: %s",
     186              :                                                  ZSTD_getErrorName(yet_to_flush));
     187            0 :         }
     188            0 : }
     189              : 
     190              : /*
     191              :  * End-of-stream processing.
     192              :  */
     193              : static void
     194            0 : astreamer_zstd_compressor_finalize(astreamer *streamer)
     195              : {
     196            0 :         astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     197            0 :         size_t          yet_to_flush;
     198              : 
     199            0 :         do
     200              :         {
     201            0 :                 ZSTD_inBuffer in = {NULL, 0, 0};
     202            0 :                 size_t          max_needed = ZSTD_compressBound(0);
     203              : 
     204              :                 /*
     205              :                  * If the output buffer is not left with enough space, send the
     206              :                  * compressed bytes to the next streamer, and empty the buffer.
     207              :                  */
     208            0 :                 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
     209            0 :                         max_needed)
     210              :                 {
     211            0 :                         astreamer_content(mystreamer->base.bbs_next, NULL,
     212            0 :                                                           mystreamer->zstd_outBuf.dst,
     213            0 :                                                           mystreamer->zstd_outBuf.pos,
     214              :                                                           ASTREAMER_UNKNOWN);
     215              : 
     216              :                         /* Reset the ZSTD output buffer. */
     217            0 :                         mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     218            0 :                         mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     219            0 :                         mystreamer->zstd_outBuf.pos = 0;
     220            0 :                 }
     221              : 
     222            0 :                 yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
     223            0 :                                                                                         &mystreamer->zstd_outBuf,
     224              :                                                                                         &in, ZSTD_e_end);
     225              : 
     226            0 :                 if (ZSTD_isError(yet_to_flush))
     227            0 :                         pg_log_error("could not compress data: %s",
     228              :                                                  ZSTD_getErrorName(yet_to_flush));
     229              : 
     230            0 :         } while (yet_to_flush > 0);
     231              : 
     232              :         /* Make sure to pass any remaining bytes to the next streamer. */
     233            0 :         if (mystreamer->zstd_outBuf.pos > 0)
     234            0 :                 astreamer_content(mystreamer->base.bbs_next, NULL,
     235            0 :                                                   mystreamer->zstd_outBuf.dst,
     236            0 :                                                   mystreamer->zstd_outBuf.pos,
     237              :                                                   ASTREAMER_UNKNOWN);
     238              : 
     239            0 :         astreamer_finalize(mystreamer->base.bbs_next);
     240            0 : }
     241              : 
     242              : /*
     243              :  * Free memory.
     244              :  */
     245              : static void
     246            0 : astreamer_zstd_compressor_free(astreamer *streamer)
     247              : {
     248            0 :         astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     249              : 
     250            0 :         astreamer_free(streamer->bbs_next);
     251            0 :         ZSTD_freeCCtx(mystreamer->cctx);
     252            0 :         pfree(streamer->bbs_buffer.data);
     253            0 :         pfree(streamer);
     254            0 : }
     255              : #endif
     256              : 
     257              : /*
     258              :  * Create a new base backup streamer that performs decompression of zstd
     259              :  * compressed blocks.
     260              :  */
     261              : astreamer *
     262            0 : astreamer_zstd_decompressor_new(astreamer *next)
     263              : {
     264              : #ifdef USE_ZSTD
     265            0 :         astreamer_zstd_frame *streamer;
     266              : 
     267            0 :         Assert(next != NULL);
     268              : 
     269            0 :         streamer = palloc0_object(astreamer_zstd_frame);
     270            0 :         *((const astreamer_ops **) &streamer->base.bbs_ops) =
     271              :                 &astreamer_zstd_decompressor_ops;
     272              : 
     273            0 :         streamer->base.bbs_next = next;
     274            0 :         initStringInfo(&streamer->base.bbs_buffer);
     275            0 :         enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
     276              : 
     277            0 :         streamer->dctx = ZSTD_createDCtx();
     278            0 :         if (!streamer->dctx)
     279            0 :                 pg_fatal("could not create zstd decompression context");
     280              : 
     281              :         /* Initialize the ZSTD output buffer. */
     282            0 :         streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
     283            0 :         streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
     284            0 :         streamer->zstd_outBuf.pos = 0;
     285              : 
     286            0 :         return &streamer->base;
     287              : #else
     288              :         pg_fatal("this build does not support compression with %s", "ZSTD");
     289              :         return NULL;                            /* keep compiler quiet */
     290              : #endif
     291            0 : }
     292              : 
     293              : #ifdef USE_ZSTD
     294              : /*
     295              :  * Decompress the input data to output buffer until we run out of input
     296              :  * data. Each time the output buffer is full, pass on the decompressed data
     297              :  * to the next streamer.
     298              :  */
     299              : static void
     300            0 : astreamer_zstd_decompressor_content(astreamer *streamer,
     301              :                                                                         astreamer_member *member,
     302              :                                                                         const char *data, int len,
     303              :                                                                         astreamer_archive_context context)
     304              : {
     305            0 :         astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     306            0 :         ZSTD_inBuffer inBuf = {data, len, 0};
     307              : 
     308            0 :         while (inBuf.pos < inBuf.size)
     309              :         {
     310            0 :                 size_t          ret;
     311              : 
     312              :                 /*
     313              :                  * If output buffer is full then forward the content to next streamer
     314              :                  * and update the output buffer.
     315              :                  */
     316            0 :                 if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
     317              :                 {
     318            0 :                         astreamer_content(mystreamer->base.bbs_next, member,
     319            0 :                                                           mystreamer->zstd_outBuf.dst,
     320            0 :                                                           mystreamer->zstd_outBuf.pos,
     321            0 :                                                           context);
     322              : 
     323              :                         /* Reset the ZSTD output buffer. */
     324            0 :                         mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
     325            0 :                         mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
     326            0 :                         mystreamer->zstd_outBuf.pos = 0;
     327            0 :                 }
     328              : 
     329            0 :                 ret = ZSTD_decompressStream(mystreamer->dctx,
     330            0 :                                                                         &mystreamer->zstd_outBuf, &inBuf);
     331              : 
     332            0 :                 if (ZSTD_isError(ret))
     333            0 :                         pg_log_error("could not decompress data: %s",
     334              :                                                  ZSTD_getErrorName(ret));
     335            0 :         }
     336            0 : }
     337              : 
     338              : /*
     339              :  * End-of-stream processing.
     340              :  */
     341              : static void
     342            0 : astreamer_zstd_decompressor_finalize(astreamer *streamer)
     343              : {
     344            0 :         astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     345              : 
     346              :         /*
     347              :          * End of the stream, if there is some pending data in output buffers then
     348              :          * we must forward it to next streamer.
     349              :          */
     350            0 :         if (mystreamer->zstd_outBuf.pos > 0)
     351            0 :                 astreamer_content(mystreamer->base.bbs_next, NULL,
     352            0 :                                                   mystreamer->base.bbs_buffer.data,
     353            0 :                                                   mystreamer->base.bbs_buffer.maxlen,
     354              :                                                   ASTREAMER_UNKNOWN);
     355              : 
     356            0 :         astreamer_finalize(mystreamer->base.bbs_next);
     357            0 : }
     358              : 
     359              : /*
     360              :  * Free memory.
     361              :  */
     362              : static void
     363            0 : astreamer_zstd_decompressor_free(astreamer *streamer)
     364              : {
     365            0 :         astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
     366              : 
     367            0 :         astreamer_free(streamer->bbs_next);
     368            0 :         ZSTD_freeDCtx(mystreamer->dctx);
     369            0 :         pfree(streamer->bbs_buffer.data);
     370            0 :         pfree(streamer);
     371            0 : }
     372              : #endif
        

Generated by: LCOV version 2.3.2-1