Coverage Report

Created: 2017-06-23 10:30

/home/travis/build/MoarVM/MoarVM/src/io/syncstream.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* We only get asynchronous forms of various kinds of I/O with libuv, yet we
4
 * also need to provide synchronous I/O on those. Here we do the work of that
5
 * adaptation. Since many things are exposed as streams in libuv, the code in
6
 * here is used to implement synchronous handling of TTYs, pipes, and sockets.
7
 * TTYs and (libuv) pipes use this directly; (real) pipes and sockets use many
8
 * of the functions, but have their own tables.
9
 */
10
11
/* Number of bytes we pull in at a time to the buffer. */
12
#define CHUNK_SIZE 65536
13
14
/* Cannot seek a TTY of named pipe (could fake the forward case, probably). */
15
0
void MVM_io_syncstream_seek(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 offset, MVMint64 whence) {
16
0
    MVM_exception_throw_adhoc(tc, "Cannot seek this kind of handle");
17
0
}
18
19
/* If we've been reading, the total number of bytes read. Otherwise, the total
20
 * number of bytes we've written. */
21
0
MVMint64 MVM_io_syncstream_tell(MVMThreadContext *tc, MVMOSHandle *h) {
22
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
23
0
    return data->position;
24
0
}
25
26
/* Reads the specified number of bytes into a the supplied buffer, returing
27
 * the number actually read. */
28
0
static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
29
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data;
30
0
    buf->base = MVM_malloc(data->to_read);
31
0
    buf->len = data->to_read;
32
0
    MVM_telemetry_interval_annotate(data->to_read, data->interval_id, "alloced this much space");
33
0
}
34
0
static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
35
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data;
36
0
    if (nread > 0) {
37
0
        data->buf = buf->base;
38
0
        data->nread = nread;
39
0
    }
40
0
    else if (nread == UV_EOF) {
41
0
        data->buf = NULL;
42
0
        data->nread = 0;
43
0
        data->eof = 1;
44
0
        if (buf->base)
45
0
            MVM_free(buf->base);
46
0
    }
47
0
    uv_read_stop(handle);
48
0
    uv_unref((uv_handle_t*)handle);
49
0
}
50
MVMint64 MVM_io_syncstream_read_bytes(MVMThreadContext *tc, MVMOSHandle *h, char **buf_out,
51
0
                                      MVMint64 bytes) {
52
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
53
0
    if (bytes > 0 && !data->eof) {
54
0
        int r;
55
0
        unsigned int interval_id;
56
0
57
0
        interval_id = MVM_telemetry_interval_start(tc, "syncstream.read_bytes");
58
0
        data->handle->data = data;
59
0
        data->cur_tc = tc;
60
0
        data->to_read = bytes;
61
0
        if ((r = uv_read_start(data->handle, on_alloc, on_read)) < 0)
62
0
            MVM_exception_throw_adhoc(tc, "Reading from stream failed: %s",
63
0
                uv_strerror(r));
64
0
        uv_ref((uv_handle_t *)data->handle);
65
0
        if (tc->loop != data->handle->loop)
66
0
            MVM_exception_throw_adhoc(tc, "Tried to read() from an IO handle outside its originating thread");
67
0
        MVM_gc_mark_thread_blocked(tc);
68
0
        uv_run(tc->loop, UV_RUN_DEFAULT);
69
0
        MVM_gc_mark_thread_unblocked(tc);
70
0
        MVM_telemetry_interval_annotate(data->nread, data->interval_id, "read this many bytes");
71
0
        MVM_telemetry_interval_stop(tc, interval_id, "syncstream.read_to_buffer");
72
0
        *buf_out = data->buf;
73
0
        return data->nread;
74
0
    }
75
0
    else {
76
0
        *buf_out = NULL;
77
0
        return 0;
78
0
    }
79
0
}
80
81
/* Checks if the end of stream has been reached. */
82
0
MVMint64 MVM_io_syncstream_eof(MVMThreadContext *tc, MVMOSHandle *h) {
83
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
84
0
    return data->eof;
85
0
}
86
87
/* Writes the specified bytes to the stream. */
88
0
static void write_cb(uv_write_t* req, int status) {
89
0
    uv_unref((uv_handle_t *)req->handle);
90
0
    MVM_free(req);
91
0
}
92
0
MVMint64 MVM_io_syncstream_write_bytes(MVMThreadContext *tc, MVMOSHandle *h, char *buf, MVMint64 bytes) {
93
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
94
0
    uv_write_t *req = MVM_malloc(sizeof(uv_write_t));
95
0
    uv_buf_t write_buf = uv_buf_init(buf, bytes);
96
0
    int r;
97
0
    unsigned int interval_id;
98
0
99
0
    interval_id = MVM_telemetry_interval_start(tc, "syncstream.write_bytes");
100
0
    uv_ref((uv_handle_t *)data->handle);
101
0
    if ((r = uv_write(req, data->handle, &write_buf, 1, write_cb)) < 0) {
102
0
        uv_unref((uv_handle_t *)data->handle);
103
0
        MVM_free(req);
104
0
        MVM_telemetry_interval_stop(tc, interval_id, "syncstream.write_bytes failed");
105
0
        MVM_exception_throw_adhoc(tc, "Failed to write bytes to stream: %s", uv_strerror(r));
106
0
    }
107
0
    else {
108
0
        MVM_gc_mark_thread_blocked(tc);
109
0
        uv_run(tc->loop, UV_RUN_DEFAULT);
110
0
        MVM_gc_mark_thread_unblocked(tc);
111
0
    }
112
0
    MVM_telemetry_interval_annotate(bytes, interval_id, "written this many bytes");
113
0
    MVM_telemetry_interval_stop(tc, interval_id, "syncstream.write_bytes");
114
0
    data->position += bytes;
115
0
    return bytes;
116
0
}
117
118
/* No flush available for stream. */
119
0
void MVM_io_syncstream_flush(MVMThreadContext *tc, MVMOSHandle *h){
120
0
}
121
122
/* Cannot truncate a stream. */
123
0
void MVM_io_syncstream_truncate(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 bytes) {
124
0
    MVM_exception_throw_adhoc(tc, "Cannot truncate this kind of handle");
125
0
}
126
127
/* A close function for the case when we simply have a handle. */
128
0
static MVMint64 not_std_handle(MVMThreadContext *tc, MVMObject *h) {
129
0
    return h != tc->instance->stdin_handle &&
130
0
           h != tc->instance->stdout_handle &&
131
0
           h != tc->instance->stderr_handle;
132
0
}
133
0
static MVMint64 closefh(MVMThreadContext *tc, MVMOSHandle *h) {
134
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
135
0
    if (data->handle && not_std_handle(tc, (MVMObject *)h)) {
136
0
        uv_close((uv_handle_t *)data->handle, NULL);
137
0
        data->handle = NULL;
138
0
    }
139
0
    return 0;
140
0
}
141
142
0
static MVMint64 is_tty(MVMThreadContext *tc, MVMOSHandle *h) {
143
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
144
0
    return data->is_tty;
145
0
}
146
147
/* Get native file descriptor. */
148
0
static MVMint64 mvm_fileno(MVMThreadContext *tc, MVMOSHandle *h) {
149
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
150
0
    uv_os_fd_t fd;
151
0
    if (uv_fileno((uv_handle_t *)data->handle, &fd) >= 0)
152
0
        return (MVMint64)fd;
153
0
    return -1;
154
0
}
155
156
/* Operations aiding process spawning and I/O handling. */
157
0
static void bind_stdio_handle(MVMThreadContext *tc, MVMOSHandle *h, uv_stdio_container_t *stdio) {
158
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data;
159
0
    stdio->flags              = UV_INHERIT_STREAM;
160
0
    stdio->data.stream        = data->handle;
161
0
}
162
163
/* Frees data associated with the handle, closing it if needed. */
164
0
static void gc_free(MVMThreadContext *tc, MVMObject *h, void *d) {
165
0
    MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)d;
166
0
    if (data) {
167
0
        if (data->handle) {
168
0
            uv_close((uv_handle_t *)data->handle, NULL);
169
0
            uv_run(tc->loop, UV_RUN_DEFAULT);
170
0
            MVM_free(data->handle);
171
0
            data->handle = NULL;
172
0
        }
173
0
        MVM_free(data);
174
0
    }
175
0
}
176
177
/* IO ops table, populated with functions. */
178
static const MVMIOClosable     closable      = { closefh };
179
static const MVMIOSyncReadable sync_readable = { MVM_io_syncstream_read_bytes,
180
                                                 MVM_io_syncstream_eof };
181
static const MVMIOSyncWritable sync_writable = { MVM_io_syncstream_write_bytes,
182
                                                 MVM_io_syncstream_flush,
183
                                                 MVM_io_syncstream_truncate };
184
static const MVMIOSeekable          seekable = { MVM_io_syncstream_seek,
185
                                                 MVM_io_syncstream_tell };
186
static const MVMIOPipeable     pipeable      = { bind_stdio_handle };
187
188
static const MVMIOIntrospection introspection = { is_tty, mvm_fileno };
189
190
static const MVMIOOps op_table = {
191
    &closable,
192
    &sync_readable,
193
    &sync_writable,
194
    NULL,
195
    NULL,
196
    NULL,
197
    &seekable,
198
    NULL,
199
    &pipeable,
200
    NULL,
201
    &introspection,
202
    NULL,
203
    NULL,
204
    gc_free
205
};