LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_gzip.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 119 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 11 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * astreamer_gzip.c
       4              :  *
       5              :  * Archive streamers that deal with data compressed using gzip.
       6              :  * astreamer_gzip_writer applies gzip compression to the input data
       7              :  * and writes the result to a file. astreamer_gzip_decompressor assumes
       8              :  * that the input stream is compressed using gzip and decompresses it.
       9              :  *
      10              :  * Note that the code in this file is asymmetric with what we do for
      11              :  * other compression types: for lz4 and zstd, there is a compressor and
      12              :  * a decompressor, rather than a writer and a decompressor. The approach
      13              :  * taken here is less flexible, because a writer can only write to a file,
      14              :  * while a compressor can write to a subsequent astreamer which is free
      15              :  * to do whatever it likes. The reason it's like this is because this
      16              :  * code was adapted from old, less-modular pg_basebackup code that used
      17              :  * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
      18              :  * necessary to change anything at the time.
      19              :  *
      20              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *                src/fe_utils/astreamer_gzip.c
      24              :  *-------------------------------------------------------------------------
      25              :  */
      26              : 
      27              : #include "postgres_fe.h"
      28              : 
      29              : #include <unistd.h>
      30              : 
      31              : #ifdef HAVE_LIBZ
      32              : #include <zlib.h>
      33              : #endif
      34              : 
      35              : #include "common/logging.h"
      36              : #include "fe_utils/astreamer.h"
      37              : 
#ifdef HAVE_LIBZ
/*
 * State for a streamer that gzip-compresses its input and writes it to a
 * file.  "base" is the first member so that an astreamer * can be cast to
 * astreamer_gzip_writer * (the callbacks below rely on that cast).
 */
typedef struct astreamer_gzip_writer
{
        astreamer       base;
        char       *pathname;       /* palloc'd copy, used in error messages */
        gzFile          gzfile;     /* open zlib handle; NULL once finalized */
} astreamer_gzip_writer;

/*
 * State for a streamer that inflates gzip input and forwards the result to
 * base.bbs_next.  Decompressed bytes accumulate in base.bbs_buffer.
 */
typedef struct astreamer_gzip_decompressor
{
        astreamer       base;
        z_stream        zstream;    /* zlib inflate state */
        size_t          bytes_written;  /* bytes of bbs_buffer filled but not
                                         * yet forwarded to bbs_next */
} astreamer_gzip_decompressor;

/* Callbacks for the gzip writer. */
static void astreamer_gzip_writer_content(astreamer *streamer,
                                                                                  astreamer_member *member,
                                                                                  const char *data, int len,
                                                                                  astreamer_archive_context context);
static void astreamer_gzip_writer_finalize(astreamer *streamer);
static void astreamer_gzip_writer_free(astreamer *streamer);
static const char *get_gz_error(gzFile gzf);

static const astreamer_ops astreamer_gzip_writer_ops = {
        .content = astreamer_gzip_writer_content,
        .finalize = astreamer_gzip_writer_finalize,
        .free = astreamer_gzip_writer_free
};

/* Callbacks for the gzip decompressor, plus zlib allocator adapters. */
static void astreamer_gzip_decompressor_content(astreamer *streamer,
                                                                                                astreamer_member *member,
                                                                                                const char *data, int len,
                                                                                                astreamer_archive_context context);
static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
static void astreamer_gzip_decompressor_free(astreamer *streamer);
static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
static void gzip_pfree(void *opaque, void *address);

static const astreamer_ops astreamer_gzip_decompressor_ops = {
        .content = astreamer_gzip_decompressor_content,
        .finalize = astreamer_gzip_decompressor_finalize,
        .free = astreamer_gzip_decompressor_free
};
#endif
      82              : 
      83              : /*
      84              :  * Create a astreamer that just compresses data using gzip, and then writes
      85              :  * it to a file.
      86              :  *
      87              :  * The caller must specify a pathname and may specify a file. The pathname is
      88              :  * used for error-reporting purposes either way. If file is NULL, the pathname
      89              :  * also identifies the file to which the data should be written: it is opened
      90              :  * for writing and closed when done. If file is not NULL, the data is written
      91              :  * there.
      92              :  *
      93              :  * Note that zlib does not use the FILE interface, but operates directly on
      94              :  * a duplicate of the underlying fd. Hence, callers must take care if they
      95              :  * plan to write any other data to the same FILE, either before or after using
      96              :  * this.
      97              :  */
      98              : astreamer *
      99            0 : astreamer_gzip_writer_new(char *pathname, FILE *file,
     100              :                                                   pg_compress_specification *compress)
     101              : {
     102              : #ifdef HAVE_LIBZ
     103            0 :         astreamer_gzip_writer *streamer;
     104              : 
     105            0 :         streamer = palloc0_object(astreamer_gzip_writer);
     106            0 :         *((const astreamer_ops **) &streamer->base.bbs_ops) =
     107              :                 &astreamer_gzip_writer_ops;
     108              : 
     109            0 :         streamer->pathname = pstrdup(pathname);
     110              : 
     111            0 :         if (file == NULL)
     112              :         {
     113            0 :                 streamer->gzfile = gzopen(pathname, "wb");
     114            0 :                 if (streamer->gzfile == NULL)
     115            0 :                         pg_fatal("could not create compressed file \"%s\": %m",
     116              :                                          pathname);
     117            0 :         }
     118              :         else
     119              :         {
     120              :                 /*
     121              :                  * We must dup the file handle so that gzclose doesn't break the
     122              :                  * caller's FILE.  See comment for astreamer_gzip_writer_finalize.
     123              :                  */
     124            0 :                 int                     fd = dup(fileno(file));
     125              : 
     126            0 :                 if (fd < 0)
     127            0 :                         pg_fatal("could not duplicate stdout: %m");
     128              : 
     129            0 :                 streamer->gzfile = gzdopen(fd, "wb");
     130            0 :                 if (streamer->gzfile == NULL)
     131            0 :                         pg_fatal("could not open output file: %m");
     132            0 :         }
     133              : 
     134            0 :         if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
     135            0 :                 pg_fatal("could not set compression level %d: %s",
     136              :                                  compress->level, get_gz_error(streamer->gzfile));
     137              : 
     138            0 :         return &streamer->base;
     139              : #else
     140              :         pg_fatal("this build does not support compression with %s", "gzip");
     141              :         return NULL;                            /* keep compiler quiet */
     142              : #endif
     143            0 : }
     144              : 
     145              : #ifdef HAVE_LIBZ
     146              : /*
     147              :  * Write archive content to gzip file.
     148              :  */
     149              : static void
     150            0 : astreamer_gzip_writer_content(astreamer *streamer,
     151              :                                                           astreamer_member *member, const char *data,
     152              :                                                           int len, astreamer_archive_context context)
     153              : {
     154            0 :         astreamer_gzip_writer *mystreamer;
     155              : 
     156            0 :         mystreamer = (astreamer_gzip_writer *) streamer;
     157              : 
     158            0 :         if (len == 0)
     159            0 :                 return;
     160              : 
     161            0 :         errno = 0;
     162            0 :         if (gzwrite(mystreamer->gzfile, data, len) != len)
     163              :         {
     164              :                 /* if write didn't set errno, assume problem is no disk space */
     165            0 :                 if (errno == 0)
     166            0 :                         errno = ENOSPC;
     167            0 :                 pg_fatal("could not write to compressed file \"%s\": %s",
     168              :                                  mystreamer->pathname, get_gz_error(mystreamer->gzfile));
     169            0 :         }
     170            0 : }
     171              : 
     172              : /*
     173              :  * End-of-archive processing when writing to a gzip file consists of just
     174              :  * calling gzclose.
     175              :  *
     176              :  * It makes no difference whether we opened the file or the caller did it,
     177              :  * because libz provides no way of avoiding a close on the underlying file
     178              :  * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
     179              :  * work around this issue, so that the behavior from the caller's viewpoint
     180              :  * is the same as for astreamer_plain_writer.
     181              :  */
     182              : static void
     183            0 : astreamer_gzip_writer_finalize(astreamer *streamer)
     184              : {
     185            0 :         astreamer_gzip_writer *mystreamer;
     186              : 
     187            0 :         mystreamer = (astreamer_gzip_writer *) streamer;
     188              : 
     189            0 :         errno = 0;                                      /* in case gzclose() doesn't set it */
     190            0 :         if (gzclose(mystreamer->gzfile) != 0)
     191            0 :                 pg_fatal("could not close compressed file \"%s\": %m",
     192              :                                  mystreamer->pathname);
     193              : 
     194            0 :         mystreamer->gzfile = NULL;
     195            0 : }
     196              : 
     197              : /*
     198              :  * Free memory associated with this astreamer.
     199              :  */
     200              : static void
     201            0 : astreamer_gzip_writer_free(astreamer *streamer)
     202              : {
     203            0 :         astreamer_gzip_writer *mystreamer;
     204              : 
     205            0 :         mystreamer = (astreamer_gzip_writer *) streamer;
     206              : 
     207            0 :         Assert(mystreamer->base.bbs_next == NULL);
     208            0 :         Assert(mystreamer->gzfile == NULL);
     209              : 
     210            0 :         pfree(mystreamer->pathname);
     211            0 :         pfree(mystreamer);
     212            0 : }
     213              : 
     214              : /*
     215              :  * Helper function for libz error reporting.
     216              :  */
     217              : static const char *
     218            0 : get_gz_error(gzFile gzf)
     219              : {
     220            0 :         int                     errnum;
     221            0 :         const char *errmsg;
     222              : 
     223            0 :         errmsg = gzerror(gzf, &errnum);
     224            0 :         if (errnum == Z_ERRNO)
     225            0 :                 return strerror(errno);
     226              :         else
     227            0 :                 return errmsg;
     228            0 : }
     229              : #endif
     230              : 
     231              : /*
     232              :  * Create a new base backup streamer that performs decompression of gzip
     233              :  * compressed blocks.
     234              :  */
     235              : astreamer *
     236            0 : astreamer_gzip_decompressor_new(astreamer *next)
     237              : {
     238              : #ifdef HAVE_LIBZ
     239            0 :         astreamer_gzip_decompressor *streamer;
     240            0 :         z_stream   *zs;
     241              : 
     242            0 :         Assert(next != NULL);
     243              : 
     244            0 :         streamer = palloc0_object(astreamer_gzip_decompressor);
     245            0 :         *((const astreamer_ops **) &streamer->base.bbs_ops) =
     246              :                 &astreamer_gzip_decompressor_ops;
     247              : 
     248            0 :         streamer->base.bbs_next = next;
     249            0 :         initStringInfo(&streamer->base.bbs_buffer);
     250              : 
     251              :         /* Initialize internal stream state for decompression */
     252            0 :         zs = &streamer->zstream;
     253            0 :         zs->zalloc = gzip_palloc;
     254            0 :         zs->zfree = gzip_pfree;
     255            0 :         zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
     256            0 :         zs->avail_out = streamer->base.bbs_buffer.maxlen;
     257              : 
     258              :         /*
     259              :          * Data compression was initialized using deflateInit2 to request a gzip
     260              :          * header. Similarly, we are using inflateInit2 to initialize data
     261              :          * decompression.
     262              :          *
     263              :          * Per the documentation for inflateInit2, the second argument is
     264              :          * "windowBits" and its value must be greater than or equal to the value
     265              :          * provided while compressing the data, so we are using the maximum
     266              :          * possible value for safety.
     267              :          */
     268            0 :         if (inflateInit2(zs, 15 + 16) != Z_OK)
     269            0 :                 pg_fatal("could not initialize compression library");
     270              : 
     271            0 :         return &streamer->base;
     272              : #else
     273              :         pg_fatal("this build does not support compression with %s", "gzip");
     274              :         return NULL;                            /* keep compiler quiet */
     275              : #endif
     276            0 : }
     277              : 
     278              : #ifdef HAVE_LIBZ
     279              : /*
     280              :  * Decompress the input data to output buffer until we run out of input
     281              :  * data. Each time the output buffer is full, pass on the decompressed data
     282              :  * to the next streamer.
     283              :  */
     284              : static void
     285            0 : astreamer_gzip_decompressor_content(astreamer *streamer,
     286              :                                                                         astreamer_member *member,
     287              :                                                                         const char *data, int len,
     288              :                                                                         astreamer_archive_context context)
     289              : {
     290            0 :         astreamer_gzip_decompressor *mystreamer;
     291            0 :         z_stream   *zs;
     292              : 
     293            0 :         mystreamer = (astreamer_gzip_decompressor *) streamer;
     294              : 
     295            0 :         zs = &mystreamer->zstream;
     296            0 :         zs->next_in = (const uint8 *) data;
     297            0 :         zs->avail_in = len;
     298              : 
     299              :         /* Process the current chunk */
     300            0 :         while (zs->avail_in > 0)
     301              :         {
     302            0 :                 int                     res;
     303              : 
     304            0 :                 Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
     305              : 
     306            0 :                 zs->next_out = (uint8 *)
     307            0 :                         mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     308            0 :                 zs->avail_out =
     309            0 :                         mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     310              : 
     311              :                 /*
     312              :                  * This call decompresses data starting at zs->next_in and updates
     313              :                  * zs->next_in * and zs->avail_in. It generates output data starting
     314              :                  * at zs->next_out and updates zs->next_out and zs->avail_out
     315              :                  * accordingly.
     316              :                  */
     317            0 :                 res = inflate(zs, Z_NO_FLUSH);
     318              : 
     319            0 :                 if (res == Z_STREAM_ERROR)
     320            0 :                         pg_log_error("could not decompress data: %s", zs->msg);
     321              : 
     322            0 :                 mystreamer->bytes_written =
     323            0 :                         mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
     324              : 
     325              :                 /* If output buffer is full then pass data to next streamer */
     326            0 :                 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     327              :                 {
     328            0 :                         astreamer_content(mystreamer->base.bbs_next, member,
     329            0 :                                                           mystreamer->base.bbs_buffer.data,
     330            0 :                                                           mystreamer->base.bbs_buffer.maxlen, context);
     331            0 :                         mystreamer->bytes_written = 0;
     332            0 :                 }
     333            0 :         }
     334            0 : }
     335              : 
     336              : /*
     337              :  * End-of-stream processing.
     338              :  */
     339              : static void
     340            0 : astreamer_gzip_decompressor_finalize(astreamer *streamer)
     341              : {
     342            0 :         astreamer_gzip_decompressor *mystreamer;
     343              : 
     344            0 :         mystreamer = (astreamer_gzip_decompressor *) streamer;
     345              : 
     346              :         /*
     347              :          * End of the stream, if there is some pending data in output buffers then
     348              :          * we must forward it to next streamer.
     349              :          */
     350            0 :         astreamer_content(mystreamer->base.bbs_next, NULL,
     351            0 :                                           mystreamer->base.bbs_buffer.data,
     352            0 :                                           mystreamer->base.bbs_buffer.maxlen,
     353              :                                           ASTREAMER_UNKNOWN);
     354              : 
     355            0 :         astreamer_finalize(mystreamer->base.bbs_next);
     356            0 : }
     357              : 
     358              : /*
     359              :  * Free memory.
     360              :  */
     361              : static void
     362            0 : astreamer_gzip_decompressor_free(astreamer *streamer)
     363              : {
     364            0 :         astreamer_free(streamer->bbs_next);
     365            0 :         pfree(streamer->bbs_buffer.data);
     366            0 :         pfree(streamer);
     367            0 : }
     368              : 
     369              : /*
     370              :  * Wrapper function to adjust the signature of palloc to match what libz
     371              :  * expects.
     372              :  */
     373              : static void *
     374            0 : gzip_palloc(void *opaque, unsigned items, unsigned size)
     375              : {
     376            0 :         return palloc(items * size);
     377              : }
     378              : 
     379              : /*
     380              :  * Wrapper function to adjust the signature of pfree to match what libz
     381              :  * expects.
     382              :  */
     383              : static void
     384            0 : gzip_pfree(void *opaque, void *address)
     385              : {
     386            0 :         pfree(address);
     387            0 : }
     388              : #endif
        

Generated by: LCOV version 2.3.2-1