Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_zstd.c
4 : * Routines for archivers to read and write a Zstd compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_zstd.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 : #include <unistd.h>
17 :
18 : #include "compress_zstd.h"
19 : #include "pg_backup_utils.h"
20 :
21 : #ifndef USE_ZSTD
22 :
23 : void
24 : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
25 : {
26 : pg_fatal("this build does not support compression with %s", "ZSTD");
27 : }
28 :
29 : void
30 : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
31 : {
32 : pg_fatal("this build does not support compression with %s", "ZSTD");
33 : }
34 :
35 : #else
36 :
37 : #include <zstd.h>
38 :
39 : typedef struct ZstdCompressorState
40 : {
41 : /* This is a normal file to which we read/write compressed data */
42 : FILE *fp;
43 :
44 : ZSTD_CStream *cstream;
45 : ZSTD_DStream *dstream;
46 : ZSTD_outBuffer output;
47 : ZSTD_inBuffer input;
48 :
49 : /* pointer to a static string like from strerror(), for Zstd_write() */
50 : const char *zstderror;
51 : } ZstdCompressorState;
52 :
53 : static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
54 : static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
55 : static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
56 : const void *data, size_t dLen);
57 : static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
58 :
59 : static void
60 0 : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
61 : ZSTD_cParameter param, int value, char *paramname)
62 : {
63 0 : size_t res;
64 :
65 0 : res = ZSTD_CCtx_setParameter(cstream, param, value);
66 0 : if (ZSTD_isError(res))
67 0 : pg_fatal("could not set compression parameter \"%s\": %s",
68 : paramname, ZSTD_getErrorName(res));
69 0 : }
70 :
71 : /* Return a compression stream with parameters set per argument */
72 : static ZSTD_CStream *
73 0 : _ZstdCStreamParams(pg_compress_specification compress)
74 : {
75 0 : ZSTD_CStream *cstream;
76 :
77 0 : cstream = ZSTD_createCStream();
78 0 : if (cstream == NULL)
79 0 : pg_fatal("could not initialize compression library");
80 :
81 0 : _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
82 0 : compress.level, "level");
83 :
84 0 : if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
85 0 : _Zstd_CCtx_setParam_or_die(cstream,
86 : ZSTD_c_enableLongDistanceMatching,
87 0 : compress.long_distance, "long");
88 :
89 0 : return cstream;
90 0 : }
91 :
92 : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
93 : static void
94 0 : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
95 : {
96 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
97 0 : ZSTD_inBuffer *input = &zstdcs->input;
98 0 : ZSTD_outBuffer *output = &zstdcs->output;
99 :
100 : /* Loop while there's any input or until flushed */
101 0 : while (input->pos < input->size || flush)
102 : {
103 0 : size_t res;
104 :
105 0 : res = ZSTD_compressStream2(zstdcs->cstream, output,
106 0 : input, flush ? ZSTD_e_end : ZSTD_e_continue);
107 :
108 0 : if (ZSTD_isError(res))
109 0 : pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
110 :
111 : /* Dump output buffer if full, or if we're told to flush */
112 0 : if (output->pos >= output->size || flush)
113 : {
114 0 : cs->writeF(AH, output->dst, output->pos);
115 0 : output->pos = 0;
116 0 : }
117 :
118 0 : if (res == 0)
119 0 : break; /* End of frame or all input consumed */
120 0 : }
121 0 : }
122 :
123 : static void
124 0 : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
125 : {
126 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
127 :
128 0 : if (cs->readF != NULL)
129 : {
130 0 : Assert(zstdcs->cstream == NULL);
131 0 : ZSTD_freeDStream(zstdcs->dstream);
132 0 : pg_free(unconstify(void *, zstdcs->input.src));
133 0 : }
134 0 : else if (cs->writeF != NULL)
135 : {
136 0 : Assert(zstdcs->dstream == NULL);
137 0 : _ZstdWriteCommon(AH, cs, true);
138 0 : ZSTD_freeCStream(zstdcs->cstream);
139 0 : }
140 :
141 : /* output buffer may be allocated in either mode */
142 0 : pg_free(zstdcs->output.dst);
143 0 : pg_free(zstdcs);
144 0 : cs->private_data = NULL;
145 0 : }
146 :
147 : static void
148 0 : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
149 : const void *data, size_t dLen)
150 : {
151 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
152 :
153 0 : zstdcs->input.src = data;
154 0 : zstdcs->input.size = dLen;
155 0 : zstdcs->input.pos = 0;
156 :
157 0 : _ZstdWriteCommon(AH, cs, false);
158 0 : }
159 :
160 : static void
161 0 : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
162 : {
163 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
164 0 : ZSTD_outBuffer *output = &zstdcs->output;
165 0 : ZSTD_inBuffer *input = &zstdcs->input;
166 0 : size_t input_allocated_size = ZSTD_DStreamInSize();
167 0 : size_t res;
168 :
169 0 : for (;;)
170 : {
171 0 : size_t cnt;
172 :
173 : /*
174 : * Read compressed data. Note that readF can resize the buffer; the
175 : * new size is tracked and used for future loops.
176 : */
177 0 : input->size = input_allocated_size;
178 0 : cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
179 :
180 : /* ensure that readF didn't *shrink* the buffer */
181 0 : Assert(input->size >= input_allocated_size);
182 0 : input_allocated_size = input->size;
183 0 : input->size = cnt;
184 0 : input->pos = 0;
185 :
186 0 : if (cnt == 0)
187 0 : break;
188 :
189 : /* Now decompress */
190 0 : while (input->pos < input->size)
191 : {
192 0 : output->pos = 0;
193 0 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
194 0 : if (ZSTD_isError(res))
195 0 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
196 :
197 : /*
198 : * then write the decompressed data to the output handle
199 : */
200 0 : ((char *) output->dst)[output->pos] = '\0';
201 0 : ahwrite(output->dst, 1, output->pos, AH);
202 :
203 0 : if (res == 0)
204 0 : break; /* End of frame */
205 : }
206 0 : }
207 0 : }
208 :
209 : /* Public routine that supports Zstd compressed data I/O */
210 : void
211 0 : InitCompressorZstd(CompressorState *cs,
212 : const pg_compress_specification compression_spec)
213 : {
214 0 : ZstdCompressorState *zstdcs;
215 :
216 0 : cs->readData = ReadDataFromArchiveZstd;
217 0 : cs->writeData = WriteDataToArchiveZstd;
218 0 : cs->end = EndCompressorZstd;
219 :
220 0 : cs->compression_spec = compression_spec;
221 :
222 0 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
223 0 : cs->private_data = zstdcs;
224 :
225 : /* We expect that exactly one of readF/writeF is specified */
226 0 : Assert((cs->readF == NULL) != (cs->writeF == NULL));
227 :
228 0 : if (cs->readF != NULL)
229 : {
230 0 : zstdcs->dstream = ZSTD_createDStream();
231 0 : if (zstdcs->dstream == NULL)
232 0 : pg_fatal("could not initialize compression library");
233 :
234 0 : zstdcs->input.size = ZSTD_DStreamInSize();
235 0 : zstdcs->input.src = pg_malloc(zstdcs->input.size);
236 :
237 : /*
238 : * output.size is the buffer size we tell zstd it can output to.
239 : * Allocate an additional byte such that ReadDataFromArchiveZstd() can
240 : * call ahwrite() with a null-terminated string, which is an optimized
241 : * case in ExecuteSqlCommandBuf().
242 : */
243 0 : zstdcs->output.size = ZSTD_DStreamOutSize();
244 0 : zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
245 0 : }
246 0 : else if (cs->writeF != NULL)
247 : {
248 0 : zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
249 :
250 0 : zstdcs->output.size = ZSTD_CStreamOutSize();
251 0 : zstdcs->output.dst = pg_malloc(zstdcs->output.size);
252 0 : zstdcs->output.pos = 0;
253 0 : }
254 0 : }
255 :
256 : /*
257 : * Compressed stream API
258 : */
259 :
260 : static size_t
261 0 : Zstd_read_internal(void *ptr, size_t size, CompressFileHandle *CFH, bool exit_on_error)
262 : {
263 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
264 0 : ZSTD_inBuffer *input = &zstdcs->input;
265 0 : ZSTD_outBuffer *output = &zstdcs->output;
266 0 : size_t input_allocated_size = ZSTD_DStreamInSize();
267 0 : size_t res,
268 : cnt;
269 :
270 : /*
271 : * If this is the first call to the reading function, initialize the
272 : * required datastructures.
273 : */
274 0 : if (zstdcs->dstream == NULL)
275 : {
276 0 : zstdcs->input.src = pg_malloc0(input_allocated_size);
277 0 : zstdcs->dstream = ZSTD_createDStream();
278 0 : if (zstdcs->dstream == NULL)
279 : {
280 0 : if (exit_on_error)
281 0 : pg_fatal("could not initialize compression library");
282 0 : return -1;
283 : }
284 0 : }
285 :
286 0 : output->size = size;
287 0 : output->dst = ptr;
288 0 : output->pos = 0;
289 :
290 0 : while (output->pos < output->size)
291 : {
292 0 : Assert(input->pos <= input->size);
293 0 : Assert(input->size <= input_allocated_size);
294 :
295 : /*
296 : * If the input is completely consumed, start back at the beginning
297 : */
298 0 : if (input->pos == input->size)
299 : {
300 : /* input->size is size produced by "fread" */
301 0 : input->size = 0;
302 : /* input->pos is position consumed by decompress */
303 0 : input->pos = 0;
304 0 : }
305 :
306 : /* read compressed data if we must produce more input */
307 0 : if (input->pos == input->size)
308 : {
309 0 : cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
310 0 : if (ferror(zstdcs->fp))
311 : {
312 0 : if (exit_on_error)
313 0 : pg_fatal("could not read from input file: %m");
314 0 : return -1;
315 : }
316 :
317 0 : input->size = cnt;
318 :
319 0 : Assert(cnt <= input_allocated_size);
320 :
321 : /* If we have no more input to consume, we're done */
322 0 : if (cnt == 0)
323 0 : break;
324 0 : }
325 :
326 0 : while (input->pos < input->size)
327 : {
328 : /* now decompress */
329 0 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
330 :
331 0 : if (ZSTD_isError(res))
332 : {
333 0 : if (exit_on_error)
334 0 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
335 0 : return -1;
336 : }
337 :
338 0 : if (output->pos == output->size)
339 0 : break; /* No more room for output */
340 :
341 0 : if (res == 0)
342 0 : break; /* End of frame */
343 : }
344 : }
345 :
346 0 : return output->pos;
347 0 : }
348 :
349 : static void
350 0 : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
351 : {
352 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
353 0 : ZSTD_inBuffer *input = &zstdcs->input;
354 0 : ZSTD_outBuffer *output = &zstdcs->output;
355 0 : size_t res,
356 : cnt;
357 :
358 0 : input->src = ptr;
359 0 : input->size = size;
360 0 : input->pos = 0;
361 :
362 0 : if (zstdcs->cstream == NULL)
363 : {
364 0 : zstdcs->output.size = ZSTD_CStreamOutSize();
365 0 : zstdcs->output.dst = pg_malloc(zstdcs->output.size);
366 0 : zstdcs->output.pos = 0;
367 0 : zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
368 0 : if (zstdcs->cstream == NULL)
369 0 : pg_fatal("could not initialize compression library");
370 0 : }
371 :
372 : /* Consume all input, to be flushed later */
373 0 : while (input->pos < input->size)
374 : {
375 0 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
376 0 : if (ZSTD_isError(res))
377 0 : pg_fatal("could not write to file: %s", ZSTD_getErrorName(res));
378 :
379 : /* Dump output buffer if full */
380 0 : if (output->pos >= output->size)
381 : {
382 0 : errno = 0;
383 0 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
384 0 : if (cnt != output->pos)
385 : {
386 0 : errno = (errno) ? errno : ENOSPC;
387 0 : pg_fatal("could not write to file: %m");
388 0 : }
389 0 : output->pos = 0;
390 0 : }
391 : }
392 0 : }
393 :
394 : static int
395 0 : Zstd_getc(CompressFileHandle *CFH)
396 : {
397 0 : unsigned char ret;
398 :
399 0 : if (CFH->read_func(&ret, 1, CFH) != 1)
400 0 : pg_fatal("could not read from input file: end of file");
401 0 : return ret;
402 0 : }
403 :
404 : static char *
405 0 : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
406 : {
407 0 : int i;
408 :
409 0 : Assert(len > 0);
410 :
411 : /*
412 : * Read one byte at a time until newline or EOF. This is only used to read
413 : * the list of LOs, and the I/O is buffered anyway.
414 : */
415 0 : for (i = 0; i < len - 1; ++i)
416 : {
417 0 : if (Zstd_read_internal(&buf[i], 1, CFH, false) != 1)
418 0 : break;
419 0 : if (buf[i] == '\n')
420 : {
421 0 : ++i;
422 0 : break;
423 : }
424 0 : }
425 0 : buf[i] = '\0';
426 0 : return i > 0 ? buf : NULL;
427 0 : }
428 :
429 : static size_t
430 0 : Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
431 : {
432 0 : return Zstd_read_internal(ptr, size, CFH, true);
433 : }
434 :
435 : static bool
436 0 : Zstd_close(CompressFileHandle *CFH)
437 : {
438 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
439 0 : bool success = true;
440 :
441 0 : if (zstdcs->cstream)
442 : {
443 0 : size_t res,
444 : cnt;
445 0 : ZSTD_inBuffer *input = &zstdcs->input;
446 0 : ZSTD_outBuffer *output = &zstdcs->output;
447 :
448 : /* Loop until the compression buffers are fully consumed */
449 0 : for (;;)
450 : {
451 0 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
452 0 : if (ZSTD_isError(res))
453 : {
454 0 : zstdcs->zstderror = ZSTD_getErrorName(res);
455 0 : success = false;
456 0 : break;
457 : }
458 :
459 0 : errno = 0;
460 0 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
461 0 : if (cnt != output->pos)
462 : {
463 0 : errno = (errno) ? errno : ENOSPC;
464 0 : zstdcs->zstderror = strerror(errno);
465 0 : success = false;
466 0 : break;
467 : }
468 0 : output->pos = 0;
469 :
470 0 : if (res == 0)
471 0 : break; /* End of frame */
472 : }
473 :
474 0 : ZSTD_freeCStream(zstdcs->cstream);
475 0 : pg_free(zstdcs->output.dst);
476 0 : }
477 :
478 0 : if (zstdcs->dstream)
479 : {
480 0 : ZSTD_freeDStream(zstdcs->dstream);
481 0 : pg_free(unconstify(void *, zstdcs->input.src));
482 0 : }
483 :
484 0 : errno = 0;
485 0 : if (fclose(zstdcs->fp) != 0)
486 : {
487 0 : zstdcs->zstderror = strerror(errno);
488 0 : success = false;
489 0 : }
490 :
491 0 : pg_free(zstdcs);
492 0 : CFH->private_data = NULL;
493 0 : return success;
494 0 : }
495 :
496 : static bool
497 0 : Zstd_eof(CompressFileHandle *CFH)
498 : {
499 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
500 :
501 0 : return feof(zstdcs->fp);
502 0 : }
503 :
504 : static bool
505 0 : Zstd_open(const char *path, int fd, const char *mode,
506 : CompressFileHandle *CFH)
507 : {
508 0 : FILE *fp;
509 0 : ZstdCompressorState *zstdcs;
510 :
511 : /*
512 : * Clear state storage to avoid having the fd point to non-NULL memory on
513 : * error return.
514 : */
515 0 : CFH->private_data = NULL;
516 :
517 0 : zstdcs = (ZstdCompressorState *) pg_malloc_extended(sizeof(*zstdcs),
518 : MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
519 0 : if (!zstdcs)
520 : {
521 0 : errno = ENOMEM;
522 0 : return false;
523 : }
524 :
525 0 : if (fd >= 0)
526 0 : fp = fdopen(dup(fd), mode);
527 : else
528 0 : fp = fopen(path, mode);
529 :
530 0 : if (fp == NULL)
531 : {
532 0 : pg_free(zstdcs);
533 0 : return false;
534 : }
535 :
536 0 : zstdcs->fp = fp;
537 0 : CFH->private_data = zstdcs;
538 :
539 0 : return true;
540 0 : }
541 :
542 : static bool
543 0 : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
544 : {
545 0 : char fname[MAXPGPATH];
546 :
547 0 : sprintf(fname, "%s.zst", path);
548 0 : return CFH->open_func(fname, -1, mode, CFH);
549 0 : }
550 :
551 : static const char *
552 0 : Zstd_get_error(CompressFileHandle *CFH)
553 : {
554 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
555 :
556 0 : return zstdcs->zstderror;
557 0 : }
558 :
559 : void
560 0 : InitCompressFileHandleZstd(CompressFileHandle *CFH,
561 : const pg_compress_specification compression_spec)
562 : {
563 0 : CFH->open_func = Zstd_open;
564 0 : CFH->open_write_func = Zstd_open_write;
565 0 : CFH->read_func = Zstd_read;
566 0 : CFH->write_func = Zstd_write;
567 0 : CFH->gets_func = Zstd_gets;
568 0 : CFH->getc_func = Zstd_getc;
569 0 : CFH->close_func = Zstd_close;
570 0 : CFH->eof_func = Zstd_eof;
571 0 : CFH->get_error_func = Zstd_get_error;
572 :
573 0 : CFH->compression_spec = compression_spec;
574 :
575 0 : CFH->private_data = NULL;
576 0 : }
577 :
578 : #endif /* USE_ZSTD */
|