LCOV - code coverage report
Current view: top level - src/fe_utils - astreamer_lz4.c (source / functions) Coverage Total Hit
Test: Code coverage Lines: 0.0 % 156 0
Test Date: 2026-01-26 10:56:24 Functions: 0.0 % 8 0
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * astreamer_lz4.c
       4              :  *
       5              :  * Archive streamers that deal with data compressed using lz4.
       6              :  * astreamer_lz4_compressor applies lz4 compression to the input stream,
       7              :  * and astreamer_lz4_decompressor does the reverse.
       8              :  *
       9              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  * IDENTIFICATION
      12              :  *                src/fe_utils/astreamer_lz4.c
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : #include "postgres_fe.h"
      17              : 
      18              : #include <unistd.h>
      19              : 
      20              : #ifdef USE_LZ4
      21              : #include <lz4frame.h>
      22              : #endif
      23              : 
      24              : #include "common/logging.h"
      25              : #include "fe_utils/astreamer.h"
      26              : 
      27              : #ifdef USE_LZ4
      28              : typedef struct astreamer_lz4_frame
      29              : {
      30              :         astreamer       base;
      31              : 
      32              :         LZ4F_compressionContext_t cctx;
      33              :         LZ4F_decompressionContext_t dctx;
      34              :         LZ4F_preferences_t prefs;
      35              : 
      36              :         size_t          bytes_written;
      37              :         bool            header_written;
      38              : } astreamer_lz4_frame;
      39              : 
      40              : static void astreamer_lz4_compressor_content(astreamer *streamer,
      41              :                                                                                          astreamer_member *member,
      42              :                                                                                          const char *data, int len,
      43              :                                                                                          astreamer_archive_context context);
      44              : static void astreamer_lz4_compressor_finalize(astreamer *streamer);
      45              : static void astreamer_lz4_compressor_free(astreamer *streamer);
      46              : 
      47              : static const astreamer_ops astreamer_lz4_compressor_ops = {
      48              :         .content = astreamer_lz4_compressor_content,
      49              :         .finalize = astreamer_lz4_compressor_finalize,
      50              :         .free = astreamer_lz4_compressor_free
      51              : };
      52              : 
      53              : static void astreamer_lz4_decompressor_content(astreamer *streamer,
      54              :                                                                                            astreamer_member *member,
      55              :                                                                                            const char *data, int len,
      56              :                                                                                            astreamer_archive_context context);
      57              : static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
      58              : static void astreamer_lz4_decompressor_free(astreamer *streamer);
      59              : 
      60              : static const astreamer_ops astreamer_lz4_decompressor_ops = {
      61              :         .content = astreamer_lz4_decompressor_content,
      62              :         .finalize = astreamer_lz4_decompressor_finalize,
      63              :         .free = astreamer_lz4_decompressor_free
      64              : };
      65              : #endif
      66              : 
      67              : /*
      68              :  * Create a new base backup streamer that performs lz4 compression of tar
      69              :  * blocks.
      70              :  */
      71              : astreamer *
      72            0 : astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
      73              : {
      74              : #ifdef USE_LZ4
      75            0 :         astreamer_lz4_frame *streamer;
      76            0 :         LZ4F_errorCode_t ctxError;
      77            0 :         LZ4F_preferences_t *prefs;
      78              : 
      79            0 :         Assert(next != NULL);
      80              : 
      81            0 :         streamer = palloc0_object(astreamer_lz4_frame);
      82            0 :         *((const astreamer_ops **) &streamer->base.bbs_ops) =
      83              :                 &astreamer_lz4_compressor_ops;
      84              : 
      85            0 :         streamer->base.bbs_next = next;
      86            0 :         initStringInfo(&streamer->base.bbs_buffer);
      87            0 :         streamer->header_written = false;
      88              : 
      89              :         /* Initialize stream compression preferences */
      90            0 :         prefs = &streamer->prefs;
      91            0 :         memset(prefs, 0, sizeof(LZ4F_preferences_t));
      92            0 :         prefs->frameInfo.blockSizeID = LZ4F_max256KB;
      93            0 :         prefs->compressionLevel = compress->level;
      94              : 
      95            0 :         ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
      96            0 :         if (LZ4F_isError(ctxError))
      97            0 :                 pg_log_error("could not create lz4 compression context: %s",
      98              :                                          LZ4F_getErrorName(ctxError));
      99              : 
     100            0 :         return &streamer->base;
     101              : #else
     102              :         pg_fatal("this build does not support compression with %s", "LZ4");
     103              :         return NULL;                            /* keep compiler quiet */
     104              : #endif
     105            0 : }
     106              : 
     107              : #ifdef USE_LZ4
     108              : /*
     109              :  * Compress the input data to output buffer.
     110              :  *
     111              :  * Find out the compression bound based on input data length for each
     112              :  * invocation to make sure that output buffer has enough capacity to
     113              :  * accommodate the compressed data. In case if the output buffer
     114              :  * capacity falls short of compression bound then forward the content
     115              :  * of output buffer to next streamer and empty the buffer.
     116              :  */
     117              : static void
     118            0 : astreamer_lz4_compressor_content(astreamer *streamer,
     119              :                                                                  astreamer_member *member,
     120              :                                                                  const char *data, int len,
     121              :                                                                  astreamer_archive_context context)
     122              : {
     123            0 :         astreamer_lz4_frame *mystreamer;
     124            0 :         uint8      *next_in,
     125              :                            *next_out;
     126            0 :         size_t          out_bound,
     127              :                                 compressed_size,
     128              :                                 avail_out;
     129              : 
     130            0 :         mystreamer = (astreamer_lz4_frame *) streamer;
     131            0 :         next_in = (uint8 *) data;
     132              : 
     133              :         /* Write header before processing the first input chunk. */
     134            0 :         if (!mystreamer->header_written)
     135              :         {
     136            0 :                 compressed_size = LZ4F_compressBegin(mystreamer->cctx,
     137            0 :                                                                                          (uint8 *) mystreamer->base.bbs_buffer.data,
     138            0 :                                                                                          mystreamer->base.bbs_buffer.maxlen,
     139            0 :                                                                                          &mystreamer->prefs);
     140              : 
     141            0 :                 if (LZ4F_isError(compressed_size))
     142            0 :                         pg_log_error("could not write lz4 header: %s",
     143              :                                                  LZ4F_getErrorName(compressed_size));
     144              : 
     145            0 :                 mystreamer->bytes_written += compressed_size;
     146            0 :                 mystreamer->header_written = true;
     147            0 :         }
     148              : 
     149              :         /*
     150              :          * Update the offset and capacity of output buffer based on number of
     151              :          * bytes written to output buffer.
     152              :          */
     153            0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     154            0 :         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     155              : 
     156              :         /*
     157              :          * Find out the compression bound and make sure that output buffer has the
     158              :          * required capacity for the success of LZ4F_compressUpdate. If needed
     159              :          * forward the content to next streamer and empty the buffer.
     160              :          */
     161            0 :         out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
     162            0 :         if (avail_out < out_bound)
     163              :         {
     164            0 :                 astreamer_content(mystreamer->base.bbs_next, member,
     165            0 :                                                   mystreamer->base.bbs_buffer.data,
     166            0 :                                                   mystreamer->bytes_written,
     167            0 :                                                   context);
     168              : 
     169              :                 /* Enlarge buffer if it falls short of out bound. */
     170            0 :                 if (mystreamer->base.bbs_buffer.maxlen < out_bound)
     171            0 :                         enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
     172              : 
     173            0 :                 avail_out = mystreamer->base.bbs_buffer.maxlen;
     174            0 :                 mystreamer->bytes_written = 0;
     175            0 :                 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     176            0 :         }
     177              : 
     178              :         /*
     179              :          * This call compresses the data starting at next_in and generates the
     180              :          * output starting at next_out. It expects the caller to provide the size
     181              :          * of input buffer and capacity of output buffer by providing parameters
     182              :          * len and avail_out.
     183              :          *
     184              :          * It returns the number of bytes compressed to output buffer.
     185              :          */
     186            0 :         compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
     187            0 :                                                                                   next_out, avail_out,
     188            0 :                                                                                   next_in, len, NULL);
     189              : 
     190            0 :         if (LZ4F_isError(compressed_size))
     191            0 :                 pg_log_error("could not compress data: %s",
     192              :                                          LZ4F_getErrorName(compressed_size));
     193              : 
     194            0 :         mystreamer->bytes_written += compressed_size;
     195            0 : }
     196              : 
     197              : /*
     198              :  * End-of-stream processing.
     199              :  */
     200              : static void
     201            0 : astreamer_lz4_compressor_finalize(astreamer *streamer)
     202              : {
     203            0 :         astreamer_lz4_frame *mystreamer;
     204            0 :         uint8      *next_out;
     205            0 :         size_t          footer_bound,
     206              :                                 compressed_size,
     207              :                                 avail_out;
     208              : 
     209            0 :         mystreamer = (astreamer_lz4_frame *) streamer;
     210              : 
     211              :         /* Find out the footer bound and update the output buffer. */
     212            0 :         footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
     213            0 :         if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
     214            0 :                 footer_bound)
     215              :         {
     216            0 :                 astreamer_content(mystreamer->base.bbs_next, NULL,
     217            0 :                                                   mystreamer->base.bbs_buffer.data,
     218            0 :                                                   mystreamer->bytes_written,
     219              :                                                   ASTREAMER_UNKNOWN);
     220              : 
     221              :                 /* Enlarge buffer if it falls short of footer bound. */
     222            0 :                 if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
     223            0 :                         enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
     224              : 
     225            0 :                 avail_out = mystreamer->base.bbs_buffer.maxlen;
     226            0 :                 mystreamer->bytes_written = 0;
     227            0 :                 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     228            0 :         }
     229              :         else
     230              :         {
     231            0 :                 next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     232            0 :                 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     233              :         }
     234              : 
     235              :         /*
     236              :          * Finalize the frame and flush whatever data remaining in compression
     237              :          * context.
     238              :          */
     239            0 :         compressed_size = LZ4F_compressEnd(mystreamer->cctx,
     240            0 :                                                                            next_out, avail_out, NULL);
     241              : 
     242            0 :         if (LZ4F_isError(compressed_size))
     243            0 :                 pg_log_error("could not end lz4 compression: %s",
     244              :                                          LZ4F_getErrorName(compressed_size));
     245              : 
     246            0 :         mystreamer->bytes_written += compressed_size;
     247              : 
     248            0 :         astreamer_content(mystreamer->base.bbs_next, NULL,
     249            0 :                                           mystreamer->base.bbs_buffer.data,
     250            0 :                                           mystreamer->bytes_written,
     251              :                                           ASTREAMER_UNKNOWN);
     252              : 
     253            0 :         astreamer_finalize(mystreamer->base.bbs_next);
     254            0 : }
     255              : 
     256              : /*
     257              :  * Free memory.
     258              :  */
     259              : static void
     260            0 : astreamer_lz4_compressor_free(astreamer *streamer)
     261              : {
     262            0 :         astreamer_lz4_frame *mystreamer;
     263              : 
     264            0 :         mystreamer = (astreamer_lz4_frame *) streamer;
     265            0 :         astreamer_free(streamer->bbs_next);
     266            0 :         LZ4F_freeCompressionContext(mystreamer->cctx);
     267            0 :         pfree(streamer->bbs_buffer.data);
     268            0 :         pfree(streamer);
     269            0 : }
     270              : #endif
     271              : 
     272              : /*
     273              :  * Create a new base backup streamer that performs decompression of lz4
     274              :  * compressed blocks.
     275              :  */
     276              : astreamer *
     277            0 : astreamer_lz4_decompressor_new(astreamer *next)
     278              : {
     279              : #ifdef USE_LZ4
     280            0 :         astreamer_lz4_frame *streamer;
     281            0 :         LZ4F_errorCode_t ctxError;
     282              : 
     283            0 :         Assert(next != NULL);
     284              : 
     285            0 :         streamer = palloc0_object(astreamer_lz4_frame);
     286            0 :         *((const astreamer_ops **) &streamer->base.bbs_ops) =
     287              :                 &astreamer_lz4_decompressor_ops;
     288              : 
     289            0 :         streamer->base.bbs_next = next;
     290            0 :         initStringInfo(&streamer->base.bbs_buffer);
     291              : 
     292              :         /* Initialize internal stream state for decompression */
     293            0 :         ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
     294            0 :         if (LZ4F_isError(ctxError))
     295            0 :                 pg_fatal("could not initialize compression library: %s",
     296              :                                  LZ4F_getErrorName(ctxError));
     297              : 
     298            0 :         return &streamer->base;
     299              : #else
     300              :         pg_fatal("this build does not support compression with %s", "LZ4");
     301              :         return NULL;                            /* keep compiler quiet */
     302              : #endif
     303            0 : }
     304              : 
     305              : #ifdef USE_LZ4
     306              : /*
     307              :  * Decompress the input data to output buffer until we run out of input
     308              :  * data. Each time the output buffer is full, pass on the decompressed data
     309              :  * to the next streamer.
     310              :  */
     311              : static void
     312            0 : astreamer_lz4_decompressor_content(astreamer *streamer,
     313              :                                                                    astreamer_member *member,
     314              :                                                                    const char *data, int len,
     315              :                                                                    astreamer_archive_context context)
     316              : {
     317            0 :         astreamer_lz4_frame *mystreamer;
     318            0 :         uint8      *next_in,
     319              :                            *next_out;
     320            0 :         size_t          avail_in,
     321              :                                 avail_out;
     322              : 
     323            0 :         mystreamer = (astreamer_lz4_frame *) streamer;
     324            0 :         next_in = (uint8 *) data;
     325            0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     326            0 :         avail_in = len;
     327            0 :         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     328              : 
     329            0 :         while (avail_in > 0)
     330              :         {
     331            0 :                 size_t          ret,
     332              :                                         read_size,
     333              :                                         out_size;
     334              : 
     335            0 :                 read_size = avail_in;
     336            0 :                 out_size = avail_out;
     337              : 
     338              :                 /*
     339              :                  * This call decompresses the data starting at next_in and generates
     340              :                  * the output data starting at next_out. It expects the caller to
     341              :                  * provide size of the input buffer and total capacity of the output
     342              :                  * buffer by providing the read_size and out_size parameters
     343              :                  * respectively.
     344              :                  *
     345              :                  * Per the documentation of LZ4, parameters read_size and out_size
     346              :                  * behaves as dual parameters. On return, the number of bytes consumed
     347              :                  * from the input buffer will be written back to read_size and the
     348              :                  * number of bytes decompressed to output buffer will be written back
     349              :                  * to out_size respectively.
     350              :                  */
     351            0 :                 ret = LZ4F_decompress(mystreamer->dctx,
     352            0 :                                                           next_out, &out_size,
     353            0 :                                                           next_in, &read_size, NULL);
     354              : 
     355            0 :                 if (LZ4F_isError(ret))
     356            0 :                         pg_log_error("could not decompress data: %s",
     357              :                                                  LZ4F_getErrorName(ret));
     358              : 
     359              :                 /* Update input buffer based on number of bytes consumed */
     360            0 :                 avail_in -= read_size;
     361            0 :                 next_in += read_size;
     362              : 
     363            0 :                 mystreamer->bytes_written += out_size;
     364              : 
     365              :                 /*
     366              :                  * If output buffer is full then forward the content to next streamer
     367              :                  * and update the output buffer.
     368              :                  */
     369            0 :                 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     370              :                 {
     371            0 :                         astreamer_content(mystreamer->base.bbs_next, member,
     372            0 :                                                           mystreamer->base.bbs_buffer.data,
     373            0 :                                                           mystreamer->base.bbs_buffer.maxlen,
     374            0 :                                                           context);
     375              : 
     376            0 :                         avail_out = mystreamer->base.bbs_buffer.maxlen;
     377            0 :                         mystreamer->bytes_written = 0;
     378            0 :                         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     379            0 :                 }
     380              :                 else
     381              :                 {
     382            0 :                         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     383            0 :                         next_out += mystreamer->bytes_written;
     384              :                 }
     385            0 :         }
     386            0 : }
     387              : 
     388              : /*
     389              :  * End-of-stream processing.
     390              :  */
     391              : static void
     392            0 : astreamer_lz4_decompressor_finalize(astreamer *streamer)
     393              : {
     394            0 :         astreamer_lz4_frame *mystreamer;
     395              : 
     396            0 :         mystreamer = (astreamer_lz4_frame *) streamer;
     397              : 
     398              :         /*
     399              :          * End of the stream, if there is some pending data in output buffers then
     400              :          * we must forward it to next streamer.
     401              :          */
     402            0 :         astreamer_content(mystreamer->base.bbs_next, NULL,
     403            0 :                                           mystreamer->base.bbs_buffer.data,
     404            0 :                                           mystreamer->base.bbs_buffer.maxlen,
     405              :                                           ASTREAMER_UNKNOWN);
     406              : 
     407            0 :         astreamer_finalize(mystreamer->base.bbs_next);
     408            0 : }
     409              : 
     410              : /*
     411              :  * Free memory.
     412              :  */
     413              : static void
     414            0 : astreamer_lz4_decompressor_free(astreamer *streamer)
     415              : {
     416            0 :         astreamer_lz4_frame *mystreamer;
     417              : 
     418            0 :         mystreamer = (astreamer_lz4_frame *) streamer;
     419            0 :         astreamer_free(streamer->bbs_next);
     420            0 :         LZ4F_freeDecompressionContext(mystreamer->dctx);
     421            0 :         pfree(streamer->bbs_buffer.data);
     422            0 :         pfree(streamer);
     423            0 : }
     424              : #endif
        

Generated by: LCOV version 2.3.2-1