/home/travis/build/MoarVM/MoarVM/src/io/syncstream.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include "moar.h" |
2 | | |
3 | | /* We only get asynchronous forms of various kinds of I/O with libuv, yet we |
4 | | * also need to provide synchronous I/O on those. Here we do the work of that |
5 | | * adaptation. Since many things are exposed as streams in libuv, the code in |
6 | | * here is used to implement synchronous handling of TTYs, pipes, and sockets. |
7 | | * TTYs and (libuv) pipes use this directly; (real) pipes and sockets use many |
8 | | * of the functions, but have their own tables. |
9 | | */ |
10 | | |
11 | | /* Number of bytes we pull in at a time to the buffer. */ |
12 | 16 | #define CHUNK_SIZE 65536 |
13 | | |
14 | | /* Sets the encoding used for string-based I/O. */ |
15 | 6 | void MVM_io_syncstream_set_encoding(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 encoding) { |
16 | 6 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
17 | 6 | if (data->ds) { |
18 | 0 | if (data->ds->chars_head) |
19 | 0 | MVM_exception_throw_adhoc(tc, "Too late to change handle encoding"); |
20 | 0 | data->ds->encoding = encoding; |
21 | 0 | } |
22 | 6 | data->encoding = encoding; |
23 | 6 | } |
24 | | |
25 | | /* Cannot seek a TTY of named pipe (could fake the forward case, probably). */ |
26 | 0 | void MVM_io_syncstream_seek(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 offset, MVMint64 whence) { |
27 | 0 | MVM_exception_throw_adhoc(tc, "Cannot seek this kind of handle"); |
28 | 0 | } |
29 | | |
30 | | /* If we've been reading, the total number of bytes read. Otherwise, the total |
31 | | * number of bytes we've written. */ |
32 | 0 | MVMint64 MVM_io_syncstream_tell(MVMThreadContext *tc, MVMOSHandle *h) { |
33 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
34 | 0 | return data->ds |
35 | 0 | ? MVM_string_decodestream_tell_bytes(tc, data->ds) |
36 | 0 | : data->total_bytes_written; |
37 | 0 | } |
38 | | |
39 | | /* Set the line separator. */ |
40 | 6 | void MVM_io_syncstream_set_separator(MVMThreadContext *tc, MVMOSHandle *h, MVMString **seps, MVMint32 num_seps) { |
41 | 6 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
42 | 6 | MVM_string_decode_stream_sep_from_strings(tc, &(data->sep_spec), seps, num_seps); |
43 | 6 | } |
44 | | |
45 | | /* Read a bunch of bytes into the current decode stream. Returns true if we |
46 | | * read some data, and false if we hit EOF. */ |
47 | 10 | static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { |
48 | 10 | size_t size = suggested_size > 0 ? suggested_size : 4; |
49 | 10 | buf->base = MVM_malloc(size); |
50 | 10 | buf->len = size; |
51 | 10 | } |
52 | 10 | static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { |
53 | 10 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data; |
54 | 10 | if (nread > 0) { |
55 | 4 | MVM_string_decodestream_add_bytes(data->cur_tc, data->ds, buf->base, nread); |
56 | 4 | } |
57 | 6 | else if (nread == UV_EOF) { |
58 | 6 | data->eof = 1; |
59 | 6 | if (buf->base) |
60 | 6 | MVM_free(buf->base); |
61 | 6 | } |
62 | 10 | uv_read_stop(handle); |
63 | 10 | uv_unref((uv_handle_t*)handle); |
64 | 10 | } |
65 | 16 | static MVMint32 read_to_buffer(MVMThreadContext *tc, MVMIOSyncStreamData *data, MVMint32 bytes) { |
66 | 16 | /* Don't try and read again if we already saw EOF. */ |
67 | 16 | if (!data->eof) { |
68 | 10 | int r; |
69 | 10 | data->handle->data = data; |
70 | 10 | data->cur_tc = tc; |
71 | 10 | if ((r = uv_read_start(data->handle, on_alloc, on_read)) < 0) |
72 | 0 | MVM_exception_throw_adhoc(tc, "Reading from stream failed: %s", |
73 | 0 | uv_strerror(r)); |
74 | 10 | uv_ref((uv_handle_t *)data->handle); |
75 | 10 | if (tc->loop != data->handle->loop) { |
76 | 0 | MVM_exception_throw_adhoc(tc, "Tried to read() from an IO handle outside its originating thread"); |
77 | 0 | } |
78 | 10 | MVM_gc_mark_thread_blocked(tc); |
79 | 10 | uv_run(tc->loop, UV_RUN_DEFAULT); |
80 | 10 | MVM_gc_mark_thread_unblocked(tc); |
81 | 10 | return 1; |
82 | 10 | } |
83 | 6 | else { |
84 | 6 | return 0; |
85 | 6 | } |
86 | 16 | } |
87 | | |
88 | | /* Ensures we have a decode stream, creating it if we're missing one. */ |
89 | 6 | static void ensure_decode_stream(MVMThreadContext *tc, MVMIOSyncStreamData *data) { |
90 | 6 | if (!data->ds) |
91 | 6 | data->ds = MVM_string_decodestream_create(tc, data->encoding, 0, |
92 | 6 | data->translate_newlines); |
93 | 6 | } |
94 | | |
95 | | /* Reads a single line from the stream. May serve it from a buffer, if we |
96 | | * already read enough data. */ |
97 | 0 | MVMString * MVM_io_syncstream_read_line(MVMThreadContext *tc, MVMOSHandle *h, MVMint32 chomp) { |
98 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
99 | 0 | ensure_decode_stream(tc, data); |
100 | 0 |
|
101 | 0 | /* Pull data until we can read a line. */ |
102 | 0 | do { |
103 | 0 | MVMString *line = MVM_string_decodestream_get_until_sep(tc, |
104 | 0 | data->ds, &(data->sep_spec), chomp); |
105 | 0 | if (line != NULL) |
106 | 0 | return line; |
107 | 0 | } while (read_to_buffer(tc, data, CHUNK_SIZE) > 0); |
108 | 0 |
|
109 | 0 | /* Reached end of stream, or last (non-termianted) line. */ |
110 | 0 | return MVM_string_decodestream_get_until_sep_eof(tc, data->ds, |
111 | 0 | &(data->sep_spec), chomp); |
112 | 0 | } |
113 | | |
114 | | /* Reads the stream from the current position to the end into a string, |
115 | | * fetching as much data is available. */ |
116 | 6 | MVMString * MVM_io_syncstream_slurp(MVMThreadContext *tc, MVMOSHandle *h) { |
117 | 6 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
118 | 6 | ensure_decode_stream(tc, data); |
119 | 6 | |
120 | 6 | /* Fetch as much data as we can (XXX this can be more efficient, by |
121 | 6 | * passing on down that we want to get many buffers from libuv). */ |
122 | 16 | while (read_to_buffer(tc, data, CHUNK_SIZE)) |
123 | 10 | ; |
124 | 6 | return MVM_string_decodestream_get_all(tc, data->ds); |
125 | 6 | } |
126 | | |
127 | | /* Gets the specified number of characters from the stream. */ |
128 | 0 | MVMString * MVM_io_syncstream_read_chars(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 chars) { |
129 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
130 | 0 | MVMString *result; |
131 | 0 | ensure_decode_stream(tc, data); |
132 | 0 |
|
133 | 0 | /* Do we already have the chars available? */ |
134 | 0 | result = MVM_string_decodestream_get_chars(tc, data->ds, chars); |
135 | 0 | if (result) { |
136 | 0 | return result; |
137 | 0 | } |
138 | 0 | else { |
139 | 0 | /* No; read and try again. */ |
140 | 0 | read_to_buffer(tc, data, CHUNK_SIZE); |
141 | 0 | result = MVM_string_decodestream_get_chars(tc, data->ds, chars); |
142 | 0 | if (result != NULL) |
143 | 0 | return result; |
144 | 0 | } |
145 | 0 |
|
146 | 0 | /* Fetched all we immediately can, so just take what we have. */ |
147 | 0 | return MVM_string_decodestream_get_all(tc, data->ds); |
148 | 0 | } |
149 | | |
150 | | /* Reads the specified number of bytes into a the supplied buffer, returing |
151 | | * the number actually read. */ |
152 | 0 | MVMint64 MVM_io_syncstream_read_bytes(MVMThreadContext *tc, MVMOSHandle *h, char **buf, MVMint64 bytes) { |
153 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
154 | 0 | ensure_decode_stream(tc, data); |
155 | 0 |
|
156 | 0 | /* See if we've already enough; if not, try and grab more. */ |
157 | 0 | if (!MVM_string_decodestream_have_bytes(tc, data->ds, bytes)) |
158 | 0 | read_to_buffer(tc, data, bytes > CHUNK_SIZE ? bytes : CHUNK_SIZE); |
159 | 0 |
|
160 | 0 | /* Read as many as we can, up to the limit. */ |
161 | 0 | return MVM_string_decodestream_bytes_to_buf(tc, data->ds, buf, bytes); |
162 | 0 | } |
163 | | |
164 | | /* Checks if the end of stream has been reached. */ |
165 | 0 | MVMint64 MVM_io_syncstream_eof(MVMThreadContext *tc, MVMOSHandle *h) { |
166 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
167 | 0 |
|
168 | 0 | /* If we still have stuff in the buffer, certainly not the end (even if |
169 | 0 | * data->eof is set; that just means we read all we can from libuv, not |
170 | 0 | * that we processed it all). */ |
171 | 0 | if (data->ds && !MVM_string_decodestream_is_empty(tc, data->ds)) |
172 | 0 | return 0; |
173 | 0 |
|
174 | 0 | /* Otherwise, go on the EOF flag from the underlying stream. */ |
175 | 0 | return data->eof; |
176 | 0 | } |
177 | | |
178 | | /* Writes the specified string to the stream, maybe with a newline. */ |
179 | 11.3k | static void write_cb(uv_write_t* req, int status) { |
180 | 11.3k | uv_unref((uv_handle_t *)req->handle); |
181 | 11.3k | MVM_free(req); |
182 | 11.3k | } |
183 | 11.3k | MVMint64 MVM_io_syncstream_write_str(MVMThreadContext *tc, MVMOSHandle *h, MVMString *str, MVMint64 newline) { |
184 | 11.3k | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
185 | 11.3k | char *output; |
186 | 11.3k | MVMuint64 output_size; |
187 | 11.3k | uv_write_t *req; |
188 | 11.3k | uv_buf_t write_buf; |
189 | 11.3k | int r; |
190 | 11.3k | |
191 | 11.3k | output = MVM_string_encode(tc, str, 0, -1, &output_size, data->encoding, NULL, |
192 | 11.3k | data->translate_newlines ? MVM_TRANSLATE_NEWLINE_OUTPUT : 0); |
193 | 11.3k | if (newline) { |
194 | 11.3k | output = (char *)MVM_realloc(output, ++output_size); |
195 | 11.3k | output[output_size - 1] = '\n'; |
196 | 11.3k | } |
197 | 11.3k | req = MVM_malloc(sizeof(uv_write_t)); |
198 | 11.3k | write_buf = uv_buf_init(output, output_size); |
199 | 11.3k | uv_ref((uv_handle_t *)data->handle); |
200 | 11.3k | if ((r = uv_write(req, data->handle, &write_buf, 1, write_cb)) < 0) { |
201 | 0 | uv_unref((uv_handle_t *)data->handle); |
202 | 0 | MVM_free(req); |
203 | 0 | MVM_free(output); |
204 | 0 | MVM_exception_throw_adhoc(tc, "Failed to write string to stream: %s", uv_strerror(r)); |
205 | 0 | } |
206 | 11.3k | else { |
207 | 11.3k | uv_run(tc->loop, UV_RUN_DEFAULT); |
208 | 11.3k | MVM_free(output); |
209 | 11.3k | } |
210 | 11.3k | |
211 | 11.3k | data->total_bytes_written += output_size; |
212 | 11.3k | return output_size; |
213 | 11.3k | } |
214 | | |
215 | | /* Writes the specified bytes to the stream. */ |
216 | 0 | MVMint64 MVM_io_syncstream_write_bytes(MVMThreadContext *tc, MVMOSHandle *h, char *buf, MVMint64 bytes) { |
217 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
218 | 0 | uv_write_t *req = MVM_malloc(sizeof(uv_write_t)); |
219 | 0 | uv_buf_t write_buf = uv_buf_init(buf, bytes); |
220 | 0 | int r; |
221 | 0 | uv_ref((uv_handle_t *)data->handle); |
222 | 0 | if ((r = uv_write(req, data->handle, &write_buf, 1, write_cb)) < 0) { |
223 | 0 | uv_unref((uv_handle_t *)data->handle); |
224 | 0 | MVM_free(req); |
225 | 0 | MVM_exception_throw_adhoc(tc, "Failed to write bytes to stream: %s", uv_strerror(r)); |
226 | 0 | } |
227 | 0 | else { |
228 | 0 | uv_run(tc->loop, UV_RUN_DEFAULT); |
229 | 0 | } |
230 | 0 | data->total_bytes_written += bytes; |
231 | 0 | return bytes; |
232 | 0 | } |
233 | | |
234 | | /* No flush available for stream. */ |
235 | 0 | void MVM_io_syncstream_flush(MVMThreadContext *tc, MVMOSHandle *h){ |
236 | 0 | } |
237 | | |
238 | | /* Cannot truncate a stream. */ |
239 | 0 | void MVM_io_syncstream_truncate(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 bytes) { |
240 | 0 | MVM_exception_throw_adhoc(tc, "Cannot truncate this kind of handle"); |
241 | 0 | } |
242 | | |
243 | | /* A close function for the case when we simply have a handle. */ |
244 | 0 | static MVMint64 not_std_handle(MVMThreadContext *tc, MVMObject *h) { |
245 | 0 | return h != tc->instance->stdin_handle && |
246 | 0 | h != tc->instance->stdout_handle && |
247 | 0 | h != tc->instance->stderr_handle; |
248 | 0 | } |
249 | 0 | static MVMint64 closefh(MVMThreadContext *tc, MVMOSHandle *h) { |
250 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
251 | 0 | if (data->handle && not_std_handle(tc, (MVMObject *)h)) { |
252 | 0 | uv_close((uv_handle_t *)data->handle, NULL); |
253 | 0 | data->handle = NULL; |
254 | 0 | if (data->ds) { |
255 | 0 | MVM_string_decodestream_destroy(tc, data->ds); |
256 | 0 | data->ds = NULL; |
257 | 0 | } |
258 | 0 | } |
259 | 0 | return 0; |
260 | 0 | } |
261 | | |
262 | 0 | static MVMint64 is_tty(MVMThreadContext *tc, MVMOSHandle *h) { |
263 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
264 | 0 | return data->is_tty; |
265 | 0 | } |
266 | | |
267 | | /* Get native file descriptor. */ |
268 | 0 | static MVMint64 mvm_fileno(MVMThreadContext *tc, MVMOSHandle *h) { |
269 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
270 | 0 | uv_os_fd_t fd; |
271 | 0 | if (uv_fileno((uv_handle_t *)data->handle, &fd) >= 0) |
272 | 0 | return (MVMint64)fd; |
273 | 0 | return -1; |
274 | 0 | } |
275 | | |
276 | | /* Operations aiding process spawning and I/O handling. */ |
277 | | static void bind_stdio_handle(MVMThreadContext *tc, MVMOSHandle *h, uv_stdio_container_t *stdio, |
278 | 0 | uv_process_t *process) { |
279 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
280 | 0 | stdio->flags = UV_INHERIT_STREAM; |
281 | 0 | stdio->data.stream = data->handle; |
282 | 0 | } |
283 | | |
284 | | /* Frees data associated with the handle, closing it if needed. */ |
285 | 0 | static void gc_free(MVMThreadContext *tc, MVMObject *h, void *d) { |
286 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)d; |
287 | 0 | if (data) { |
288 | 0 | if (data->handle) { |
289 | 0 | uv_close((uv_handle_t *)data->handle, NULL); |
290 | 0 | uv_run(tc->loop, UV_RUN_DEFAULT); |
291 | 0 | MVM_free(data->handle); |
292 | 0 | data->handle = NULL; |
293 | 0 | } |
294 | 0 | if (data->ds) { |
295 | 0 | MVM_string_decodestream_destroy(tc, data->ds); |
296 | 0 | data->ds = NULL; |
297 | 0 | } |
298 | 0 | MVM_string_decode_stream_sep_destroy(tc, &(data->sep_spec)); |
299 | 0 | MVM_free(data); |
300 | 0 | } |
301 | 0 | } |
302 | | |
303 | | /* IO ops table, populated with functions. */ |
304 | | static const MVMIOClosable closable = { closefh }; |
305 | | static const MVMIOEncodable encodable = { MVM_io_syncstream_set_encoding }; |
306 | | static const MVMIOSyncReadable sync_readable = { MVM_io_syncstream_set_separator, |
307 | | MVM_io_syncstream_read_line, |
308 | | MVM_io_syncstream_slurp, |
309 | | MVM_io_syncstream_read_chars, |
310 | | MVM_io_syncstream_read_bytes, |
311 | | MVM_io_syncstream_eof }; |
312 | | static const MVMIOSyncWritable sync_writable = { MVM_io_syncstream_write_str, |
313 | | MVM_io_syncstream_write_bytes, |
314 | | MVM_io_syncstream_flush, |
315 | | MVM_io_syncstream_truncate }; |
316 | | static const MVMIOSeekable seekable = { MVM_io_syncstream_seek, |
317 | | MVM_io_syncstream_tell }; |
318 | | static const MVMIOPipeable pipeable = { bind_stdio_handle }; |
319 | | |
320 | | static const MVMIOIntrospection introspection = { is_tty, mvm_fileno }; |
321 | | |
322 | | static const MVMIOOps op_table = { |
323 | | &closable, |
324 | | &encodable, |
325 | | &sync_readable, |
326 | | &sync_writable, |
327 | | NULL, |
328 | | NULL, |
329 | | NULL, |
330 | | &seekable, |
331 | | NULL, |
332 | | &pipeable, |
333 | | NULL, |
334 | | &introspection, |
335 | | NULL, |
336 | | gc_free |
337 | | }; |
338 | | |
339 | | /* Wraps a libuv stream (likely, libuv pipe or TTY) up in a sync stream. */ |
340 | 390 | MVMObject * MVM_io_syncstream_from_uvstream(MVMThreadContext *tc, uv_stream_t *handle, MVMint8 is_tty) { |
341 | 390 | MVMOSHandle * const result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO); |
342 | 390 | MVMIOSyncStreamData * const data = MVM_calloc(1, sizeof(MVMIOSyncStreamData)); |
343 | 390 | data->handle = handle; |
344 | 390 | data->encoding = MVM_encoding_type_utf8; |
345 | 390 | data->is_tty = is_tty; |
346 | 390 | data->translate_newlines = 1; |
347 | 390 | MVM_string_decode_stream_sep_default(tc, &(data->sep_spec)); |
348 | 390 | result->body.ops = &op_table; |
349 | 390 | result->body.data = data; |
350 | 390 | return (MVMObject *)result; |
351 | 390 | } |