Coverage Report

Created: 2017-04-15 07:07

/home/travis/build/MoarVM/MoarVM/src/io/syncstream.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* We only get asynchronous forms of various kinds of I/O with libuv, yet we
4
 * also need to provide synchronous I/O on those. Here we do the work of that
5
 * adaptation. Since many things are exposed as streams in libuv, the code in
6
 * here is used to implement synchronous handling of TTYs, pipes, and sockets.
7
 * TTYs and (libuv) pipes use this directly; (real) pipes and sockets use many
8
 * of the functions, but have their own tables.
9
 */
10
11
/* Number of bytes we pull in at a time to the buffer. */
12
16
#define CHUNK_SIZE 65536
13
14
/* Sets the encoding used for string-based I/O. */
15
6
void MVM_io_syncstream_set_encoding(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 encoding) {
16
6
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
17
6
    if (data->ds) {
18
0
        if (data->ds->chars_head)
19
0
            MVM_exception_throw_adhoc(tc, "Too late to change handle encoding");
20
0
        data->ds->encoding = encoding;
21
0
    }
22
6
    data->encoding = encoding;
23
6
}
24
25
/* Cannot seek a TTY or named pipe (could fake the forward case, probably). */
26
0
void MVM_io_syncstream_seek(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 offset, MVMint64 whence) {
27
0
    MVM_exception_throw_adhoc(tc, "Cannot seek this kind of handle");
28
0
}
29
30
/* If we've been reading, the total number of bytes read. Otherwise, the total
31
 * number of bytes we've written. */
32
0
MVMint64 MVM_io_syncstream_tell(MVMThreadContext *tc, MVMOSHandle *h) {
33
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
34
0
    return data->ds
35
0
        ? MVM_string_decodestream_tell_bytes(tc, data->ds)
36
0
        : data->total_bytes_written;
37
0
}
38
39
/* Set the line separator. */
40
6
void MVM_io_syncstream_set_separator(MVMThreadContext *tc, MVMOSHandle *h, MVMString **seps, MVMint32 num_seps) {
41
6
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
42
6
    MVM_string_decode_stream_sep_from_strings(tc, &(data->sep_spec), seps, num_seps);
43
6
}
44
45
/* Read a bunch of bytes into the current decode stream. Returns true if we
46
 * attempted a read, and false if EOF had already been seen. */
47
10
static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
48
10
    size_t size = suggested_size > 0 ? suggested_size : 4;
49
10
    buf->base   = MVM_malloc(size);
50
10
    buf->len    = size;
51
10
}
52
10
static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
53
10
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data;
54
10
    if (nread > 0) {
55
4
        MVM_string_decodestream_add_bytes(data->cur_tc, data->ds, buf->base, nread);
56
4
    }
57
6
    else if (nread == UV_EOF) {
58
6
        data->eof = 1;
59
6
        if (buf->base)
60
6
            MVM_free(buf->base);
61
6
    }
62
10
    uv_read_stop(handle);
63
10
    uv_unref((uv_handle_t*)handle);
64
10
}
65
16
static MVMint32 read_to_buffer(MVMThreadContext *tc, MVMIOSyncStreamData *data, MVMint32 bytes) {
66
16
    /* Don't try and read again if we already saw EOF. */
67
16
    if (!data->eof) {
68
10
        int r;
69
10
        data->handle->data = data;
70
10
        data->cur_tc = tc;
71
10
        if ((r = uv_read_start(data->handle, on_alloc, on_read)) < 0)
72
0
            MVM_exception_throw_adhoc(tc, "Reading from stream failed: %s",
73
0
                uv_strerror(r));
74
10
        uv_ref((uv_handle_t *)data->handle);
75
10
        if (tc->loop != data->handle->loop) {
76
0
            MVM_exception_throw_adhoc(tc, "Tried to read() from an IO handle outside its originating thread");
77
0
        }
78
10
        MVM_gc_mark_thread_blocked(tc);
79
10
        uv_run(tc->loop, UV_RUN_DEFAULT);
80
10
        MVM_gc_mark_thread_unblocked(tc);
81
10
        return 1;
82
10
    }
83
6
    else {
84
6
        return 0;
85
6
    }
86
16
}
87
88
/* Ensures we have a decode stream, creating it if we're missing one. */
89
6
static void ensure_decode_stream(MVMThreadContext *tc, MVMIOSyncStreamData *data) {
90
6
    if (!data->ds)
91
6
        data->ds = MVM_string_decodestream_create(tc, data->encoding, 0,
92
6
            data->translate_newlines);
93
6
}
94
95
/* Reads a single line from the stream. May serve it from a buffer, if we
96
 * already read enough data. */
97
0
MVMString * MVM_io_syncstream_read_line(MVMThreadContext *tc, MVMOSHandle *h, MVMint32 chomp) {
98
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
99
0
    ensure_decode_stream(tc, data);
100
0
101
0
    /* Pull data until we can read a line. */
102
0
    do {
103
0
        MVMString *line = MVM_string_decodestream_get_until_sep(tc,
104
0
            data->ds, &(data->sep_spec), chomp);
105
0
        if (line != NULL)
106
0
            return line;
107
0
    } while (read_to_buffer(tc, data, CHUNK_SIZE) > 0);
108
0
109
0
    /* Reached end of stream, or last (non-terminated) line. */
110
0
    return MVM_string_decodestream_get_until_sep_eof(tc, data->ds,
111
0
        &(data->sep_spec), chomp);
112
0
}
113
114
/* Reads the stream from the current position to the end into a string,
115
 * fetching as much data as is available. */
116
6
MVMString * MVM_io_syncstream_slurp(MVMThreadContext *tc, MVMOSHandle *h) {
117
6
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
118
6
    ensure_decode_stream(tc, data);
119
6
120
6
    /* Fetch as much data as we can (XXX this can be more efficient, by
121
6
     * passing on down that we want to get many buffers from libuv). */
122
16
    while (read_to_buffer(tc, data, CHUNK_SIZE))
123
10
        ;
124
6
    return MVM_string_decodestream_get_all(tc, data->ds);
125
6
}
126
127
/* Gets the specified number of characters from the stream. */
128
0
MVMString * MVM_io_syncstream_read_chars(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 chars) {
129
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
130
0
    MVMString *result;
131
0
    ensure_decode_stream(tc, data);
132
0
133
0
    /* Do we already have the chars available? */
134
0
    result = MVM_string_decodestream_get_chars(tc, data->ds, chars);
135
0
    if (result) {
136
0
        return result;
137
0
    }
138
0
    else {
139
0
        /* No; read and try again. */
140
0
        read_to_buffer(tc, data, CHUNK_SIZE);
141
0
        result = MVM_string_decodestream_get_chars(tc, data->ds, chars);
142
0
        if (result != NULL)
143
0
            return result;
144
0
    }
145
0
146
0
    /* Fetched all we immediately can, so just take what we have. */
147
0
    return MVM_string_decodestream_get_all(tc, data->ds);
148
0
}
149
150
/* Reads the specified number of bytes into the supplied buffer, returning
151
 * the number actually read. */
152
0
MVMint64 MVM_io_syncstream_read_bytes(MVMThreadContext *tc, MVMOSHandle *h, char **buf, MVMint64 bytes) {
153
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
154
0
    ensure_decode_stream(tc, data);
155
0
156
0
    /* See if we've already enough; if not, try and grab more. */
157
0
    if (!MVM_string_decodestream_have_bytes(tc, data->ds, bytes))
158
0
        read_to_buffer(tc, data, bytes > CHUNK_SIZE ? bytes : CHUNK_SIZE);
159
0
160
0
    /* Read as many as we can, up to the limit. */
161
0
    return MVM_string_decodestream_bytes_to_buf(tc, data->ds, buf, bytes);
162
0
}
163
164
/* Checks if the end of stream has been reached. */
165
0
MVMint64 MVM_io_syncstream_eof(MVMThreadContext *tc, MVMOSHandle *h) {
166
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
167
0
168
0
    /* If we still have stuff in the buffer, certainly not the end (even if
169
0
     * data->eof is set; that just means we read all we can from libuv, not
170
0
     * that we processed it all). */
171
0
    if (data->ds && !MVM_string_decodestream_is_empty(tc, data->ds))
172
0
        return 0;
173
0
174
0
    /* Otherwise, go on the EOF flag from the underlying stream. */
175
0
    return data->eof;
176
0
}
177
178
/* Writes the specified string to the stream, maybe with a newline. */
179
11.3k
static void write_cb(uv_write_t* req, int status) {
180
11.3k
    uv_unref((uv_handle_t *)req->handle);
181
11.3k
    MVM_free(req);
182
11.3k
}
183
11.3k
MVMint64 MVM_io_syncstream_write_str(MVMThreadContext *tc, MVMOSHandle *h, MVMString *str, MVMint64 newline) {
184
11.3k
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
185
11.3k
    char *output;
186
11.3k
    MVMuint64 output_size;
187
11.3k
    uv_write_t *req;
188
11.3k
    uv_buf_t write_buf;
189
11.3k
    int r;
190
11.3k
191
11.3k
    output = MVM_string_encode(tc, str, 0, -1, &output_size, data->encoding, NULL,
192
11.3k
        data->translate_newlines ? MVM_TRANSLATE_NEWLINE_OUTPUT : 0);
193
11.3k
    if (newline) {
194
11.3k
        output = (char *)MVM_realloc(output, ++output_size);
195
11.3k
        output[output_size - 1] = '\n';
196
11.3k
    }
197
11.3k
    req = MVM_malloc(sizeof(uv_write_t));
198
11.3k
    write_buf = uv_buf_init(output, output_size);
199
11.3k
    uv_ref((uv_handle_t *)data->handle);
200
11.3k
    if ((r = uv_write(req, data->handle, &write_buf, 1, write_cb)) < 0) {
201
0
        uv_unref((uv_handle_t *)data->handle);
202
0
        MVM_free(req);
203
0
        MVM_free(output);
204
0
        MVM_exception_throw_adhoc(tc, "Failed to write string to stream: %s", uv_strerror(r));
205
0
    }
206
11.3k
    else {
207
11.3k
        uv_run(tc->loop, UV_RUN_DEFAULT);
208
11.3k
        MVM_free(output);
209
11.3k
    }
210
11.3k
211
11.3k
    data->total_bytes_written += output_size;
212
11.3k
    return output_size;
213
11.3k
}
214
215
/* Writes the specified bytes to the stream. */
216
0
MVMint64 MVM_io_syncstream_write_bytes(MVMThreadContext *tc, MVMOSHandle *h, char *buf, MVMint64 bytes) {
217
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
218
0
    uv_write_t *req = MVM_malloc(sizeof(uv_write_t));
219
0
    uv_buf_t write_buf = uv_buf_init(buf, bytes);
220
0
    int r;
221
0
    uv_ref((uv_handle_t *)data->handle);
222
0
    if ((r = uv_write(req, data->handle, &write_buf, 1, write_cb)) < 0) {
223
0
        uv_unref((uv_handle_t *)data->handle);
224
0
        MVM_free(req);
225
0
        MVM_exception_throw_adhoc(tc, "Failed to write bytes to stream: %s", uv_strerror(r));
226
0
    }
227
0
    else {
228
0
        uv_run(tc->loop, UV_RUN_DEFAULT);
229
0
    }
230
0
    data->total_bytes_written += bytes;
231
0
    return bytes;
232
0
}
233
234
/* No flush available for stream. */
235
0
void MVM_io_syncstream_flush(MVMThreadContext *tc, MVMOSHandle *h){
236
0
}
237
238
/* Cannot truncate a stream. */
239
0
void MVM_io_syncstream_truncate(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 bytes) {
240
0
    MVM_exception_throw_adhoc(tc, "Cannot truncate this kind of handle");
241
0
}
242
243
/* A close function for the case when we simply have a handle. */
244
0
static MVMint64 not_std_handle(MVMThreadContext *tc, MVMObject *h) {
245
0
    return h != tc->instance->stdin_handle &&
246
0
           h != tc->instance->stdout_handle &&
247
0
           h != tc->instance->stderr_handle;
248
0
}
249
0
static MVMint64 closefh(MVMThreadContext *tc, MVMOSHandle *h) {
250
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
251
0
    if (data->handle && not_std_handle(tc, (MVMObject *)h)) {
252
0
        uv_close((uv_handle_t *)data->handle, NULL);
253
0
        data->handle = NULL;
254
0
        if (data->ds) {
255
0
            MVM_string_decodestream_destroy(tc, data->ds);
256
0
            data->ds = NULL;
257
0
        }
258
0
    }
259
0
    return 0;
260
0
}
261
262
0
static MVMint64 is_tty(MVMThreadContext *tc, MVMOSHandle *h) {
263
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
264
0
    return data->is_tty;
265
0
}
266
267
/* Get native file descriptor. */
268
0
static MVMint64 mvm_fileno(MVMThreadContext *tc, MVMOSHandle *h) {
269
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
270
0
    uv_os_fd_t fd;
271
0
    if (uv_fileno((uv_handle_t *)data->handle, &fd) >= 0)
272
0
        return (MVMint64)fd;
273
0
    return -1;
274
0
}
275
276
/* Operations aiding process spawning and I/O handling. */
277
static void bind_stdio_handle(MVMThreadContext *tc, MVMOSHandle *h, uv_stdio_container_t *stdio,
278
0
        uv_process_t *process) {
279
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
280
0
    stdio->flags              = UV_INHERIT_STREAM;
281
0
    stdio->data.stream        = data->handle;
282
0
}
283
284
/* Frees data associated with the handle, closing it if needed. */
285
0
static void gc_free(MVMThreadContext *tc, MVMObject *h, void *d) {
286
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)d;
287
0
    if (data) {
288
0
        if (data->handle) {
289
0
            uv_close((uv_handle_t *)data->handle, NULL);
290
0
            uv_run(tc->loop, UV_RUN_DEFAULT);
291
0
            MVM_free(data->handle);
292
0
            data->handle = NULL;
293
0
        }
294
0
        if (data->ds) {
295
0
            MVM_string_decodestream_destroy(tc, data->ds);
296
0
            data->ds = NULL;
297
0
        }
298
0
        MVM_string_decode_stream_sep_destroy(tc, &(data->sep_spec));
299
0
        MVM_free(data);
300
0
    }
301
0
}
302
303
/* IO ops table, populated with functions. */
304
static const MVMIOClosable     closable      = { closefh };
305
static const MVMIOEncodable    encodable     = { MVM_io_syncstream_set_encoding };
306
static const MVMIOSyncReadable sync_readable = { MVM_io_syncstream_set_separator,
307
                                                 MVM_io_syncstream_read_line,
308
                                                 MVM_io_syncstream_slurp,
309
                                                 MVM_io_syncstream_read_chars,
310
                                                 MVM_io_syncstream_read_bytes,
311
                                                 MVM_io_syncstream_eof };
312
static const MVMIOSyncWritable sync_writable = { MVM_io_syncstream_write_str,
313
                                                 MVM_io_syncstream_write_bytes,
314
                                                 MVM_io_syncstream_flush,
315
                                                 MVM_io_syncstream_truncate };
316
static const MVMIOSeekable          seekable = { MVM_io_syncstream_seek,
317
                                                 MVM_io_syncstream_tell };
318
static const MVMIOPipeable     pipeable      = { bind_stdio_handle };
319
320
static const MVMIOIntrospection introspection = { is_tty, mvm_fileno };
321
322
static const MVMIOOps op_table = {
323
    &closable,
324
    &encodable,
325
    &sync_readable,
326
    &sync_writable,
327
    NULL,
328
    NULL,
329
    NULL,
330
    &seekable,
331
    NULL,
332
    &pipeable,
333
    NULL,
334
    &introspection,
335
    NULL,
336
    gc_free
337
};
338
339
/* Wraps a libuv stream (likely, libuv pipe or TTY) up in a sync stream. */
340
390
MVMObject * MVM_io_syncstream_from_uvstream(MVMThreadContext *tc, uv_stream_t *handle, MVMint8 is_tty) {
341
390
    MVMOSHandle         * const result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
342
390
    MVMIOSyncStreamData * const data   = MVM_calloc(1, sizeof(MVMIOSyncStreamData));
343
390
    data->handle      = handle;
344
390
    data->encoding    = MVM_encoding_type_utf8;
345
390
    data->is_tty      = is_tty;
346
390
    data->translate_newlines = 1;
347
390
    MVM_string_decode_stream_sep_default(tc, &(data->sep_spec));
348
390
    result->body.ops  = &op_table;
349
390
    result->body.data = data;
350
390
    return (MVMObject *)result;
351
390
}