Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * local_source.c
4 : * Functions for using a local data directory as the source.
5 : *
6 : * Portions Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : *
8 : *-------------------------------------------------------------------------
9 : */
10 : #include "postgres_fe.h"
11 :
12 : #include <fcntl.h>
13 : #include <unistd.h>
14 :
15 : #include "common/logging.h"
16 : #include "file_ops.h"
17 : #include "rewind_source.h"
18 :
19 : typedef struct
20 : {
21 : rewind_source common; /* common interface functions */
22 :
23 : const char *datadir; /* path to the source data directory */
24 : } local_source;
25 :
26 : static void local_traverse_files(rewind_source *source,
27 : process_file_callback_t callback);
28 : static char *local_fetch_file(rewind_source *source, const char *path,
29 : size_t *filesize);
30 : static void local_queue_fetch_file(rewind_source *source, const char *path,
31 : size_t len);
32 : static void local_queue_fetch_range(rewind_source *source, const char *path,
33 : off_t off, size_t len);
34 : static void local_finish_fetch(rewind_source *source);
35 : static void local_destroy(rewind_source *source);
36 :
37 : rewind_source *
38 0 : init_local_source(const char *datadir)
39 : {
40 0 : local_source *src;
41 :
42 0 : src = pg_malloc0(sizeof(local_source));
43 :
44 0 : src->common.traverse_files = local_traverse_files;
45 0 : src->common.fetch_file = local_fetch_file;
46 0 : src->common.queue_fetch_file = local_queue_fetch_file;
47 0 : src->common.queue_fetch_range = local_queue_fetch_range;
48 0 : src->common.finish_fetch = local_finish_fetch;
49 0 : src->common.get_current_wal_insert_lsn = NULL;
50 0 : src->common.destroy = local_destroy;
51 :
52 0 : src->datadir = datadir;
53 :
54 0 : return &src->common;
55 0 : }
56 :
57 : static void
58 0 : local_traverse_files(rewind_source *source, process_file_callback_t callback)
59 : {
60 0 : traverse_datadir(((local_source *) source)->datadir, callback);
61 0 : }
62 :
63 : static char *
64 0 : local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
65 : {
66 0 : return slurpFile(((local_source *) source)->datadir, path, filesize);
67 : }
68 :
69 : /*
70 : * Copy a file from source to target.
71 : *
72 : * 'len' is the expected length of the file.
73 : */
74 : static void
75 0 : local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
76 : {
77 0 : const char *datadir = ((local_source *) source)->datadir;
78 0 : PGIOAlignedBlock buf;
79 0 : char srcpath[MAXPGPATH];
80 0 : int srcfd;
81 0 : size_t written_len;
82 :
83 0 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
84 :
85 : /* Open source file for reading */
86 0 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
87 0 : if (srcfd < 0)
88 0 : pg_fatal("could not open source file \"%s\": %m",
89 : srcpath);
90 :
91 : /* Truncate and open the target file for writing */
92 0 : open_target_file(path, true);
93 :
94 0 : written_len = 0;
95 0 : for (;;)
96 : {
97 0 : ssize_t read_len;
98 :
99 0 : read_len = read(srcfd, buf.data, sizeof(buf));
100 :
101 0 : if (read_len < 0)
102 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
103 0 : else if (read_len == 0)
104 0 : break; /* EOF reached */
105 :
106 0 : write_target_range(buf.data, written_len, read_len);
107 0 : written_len += read_len;
108 0 : }
109 :
110 : /*
111 : * A local source is not expected to change while we're rewinding, so
112 : * check that the size of the file matches our earlier expectation.
113 : */
114 0 : if (written_len != len)
115 0 : pg_fatal("size of source file \"%s\" changed concurrently: %zu bytes expected, %zu copied",
116 : srcpath, len, written_len);
117 :
118 0 : if (close(srcfd) != 0)
119 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
120 0 : }
121 :
122 : /*
123 : * Copy a file from source to target, starting at 'off', for 'len' bytes.
124 : */
125 : static void
126 0 : local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
127 : size_t len)
128 : {
129 0 : const char *datadir = ((local_source *) source)->datadir;
130 0 : PGIOAlignedBlock buf;
131 0 : char srcpath[MAXPGPATH];
132 0 : int srcfd;
133 0 : off_t begin = off;
134 0 : off_t end = off + len;
135 :
136 0 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
137 :
138 0 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
139 0 : if (srcfd < 0)
140 0 : pg_fatal("could not open source file \"%s\": %m",
141 : srcpath);
142 :
143 0 : if (lseek(srcfd, begin, SEEK_SET) == -1)
144 0 : pg_fatal("could not seek in source file: %m");
145 :
146 0 : open_target_file(path, false);
147 :
148 0 : while (end - begin > 0)
149 : {
150 0 : ssize_t readlen;
151 0 : size_t thislen;
152 :
153 0 : if (end - begin > sizeof(buf))
154 0 : thislen = sizeof(buf);
155 : else
156 0 : thislen = end - begin;
157 :
158 0 : readlen = read(srcfd, buf.data, thislen);
159 :
160 0 : if (readlen < 0)
161 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
162 0 : else if (readlen == 0)
163 0 : pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
164 :
165 0 : write_target_range(buf.data, begin, readlen);
166 0 : begin += readlen;
167 0 : }
168 :
169 0 : if (close(srcfd) != 0)
170 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
171 0 : }
172 :
173 : static void
174 0 : local_finish_fetch(rewind_source *source)
175 : {
176 : /*
177 : * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
178 : */
179 0 : }
180 :
181 : static void
182 0 : local_destroy(rewind_source *source)
183 : {
184 0 : pfree(source);
185 0 : }
|