/home/travis/build/MoarVM/MoarVM/src/io/syncstream.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include "moar.h" |
2 | | |
3 | | /* We only get asynchronous forms of various kinds of I/O with libuv, yet we |
4 | | * also need to provide synchronous I/O on those. Here we do the work of that |
5 | | * adaptation. Since many things are exposed as streams in libuv, the code in |
6 | | * here is used to implement synchronous handling of TTYs, pipes, and sockets. |
7 | | * TTYs and (libuv) pipes use this directly; (real) pipes and sockets use many |
8 | | * of the functions, but have their own tables. |
9 | | */ |
10 | | |
11 | | /* Number of bytes we pull in at a time to the buffer. */ |
12 | | #define CHUNK_SIZE 65536 |
13 | | |
14 | | /* Cannot seek a TTY of named pipe (could fake the forward case, probably). */ |
15 | 0 | void MVM_io_syncstream_seek(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 offset, MVMint64 whence) { |
16 | 0 | MVM_exception_throw_adhoc(tc, "Cannot seek this kind of handle"); |
17 | 0 | } |
18 | | |
19 | | /* If we've been reading, the total number of bytes read. Otherwise, the total |
20 | | * number of bytes we've written. */ |
21 | 0 | MVMint64 MVM_io_syncstream_tell(MVMThreadContext *tc, MVMOSHandle *h) { |
22 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
23 | 0 | return data->position; |
24 | 0 | } |
25 | | |
26 | | /* Reads the specified number of bytes into a the supplied buffer, returing |
27 | | * the number actually read. */ |
28 | 0 | static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { |
29 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data; |
30 | 0 | buf->base = MVM_malloc(data->to_read); |
31 | 0 | buf->len = data->to_read; |
32 | 0 | MVM_telemetry_interval_annotate(data->to_read, data->interval_id, "alloced this much space"); |
33 | 0 | } |
34 | 0 | static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { |
35 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)handle->data; |
36 | 0 | if (nread > 0) { |
37 | 0 | data->buf = buf->base; |
38 | 0 | data->nread = nread; |
39 | 0 | } |
40 | 0 | else if (nread == UV_EOF) { |
41 | 0 | data->buf = NULL; |
42 | 0 | data->nread = 0; |
43 | 0 | data->eof = 1; |
44 | 0 | if (buf->base) |
45 | 0 | MVM_free(buf->base); |
46 | 0 | } |
47 | 0 | uv_read_stop(handle); |
48 | 0 | uv_unref((uv_handle_t*)handle); |
49 | 0 | } |
50 | | MVMint64 MVM_io_syncstream_read_bytes(MVMThreadContext *tc, MVMOSHandle *h, char **buf_out, |
51 | 0 | MVMint64 bytes) { |
52 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
53 | 0 | if (bytes > 0 && !data->eof) { |
54 | 0 | int r; |
55 | 0 | unsigned int interval_id; |
56 | 0 |
|
57 | 0 | interval_id = MVM_telemetry_interval_start(tc, "syncstream.read_bytes"); |
58 | 0 | data->handle->data = data; |
59 | 0 | data->cur_tc = tc; |
60 | 0 | data->to_read = bytes; |
61 | 0 | if ((r = uv_read_start(data->handle, on_alloc, on_read)) < 0) |
62 | 0 | MVM_exception_throw_adhoc(tc, "Reading from stream failed: %s", |
63 | 0 | uv_strerror(r)); |
64 | 0 | uv_ref((uv_handle_t *)data->handle); |
65 | 0 | if (tc->loop != data->handle->loop) |
66 | 0 | MVM_exception_throw_adhoc(tc, "Tried to read() from an IO handle outside its originating thread"); |
67 | 0 | MVM_gc_mark_thread_blocked(tc); |
68 | 0 | uv_run(tc->loop, UV_RUN_DEFAULT); |
69 | 0 | MVM_gc_mark_thread_unblocked(tc); |
70 | 0 | MVM_telemetry_interval_annotate(data->nread, data->interval_id, "read this many bytes"); |
71 | 0 | MVM_telemetry_interval_stop(tc, interval_id, "syncstream.read_to_buffer"); |
72 | 0 | *buf_out = data->buf; |
73 | 0 | return data->nread; |
74 | 0 | } |
75 | 0 | else { |
76 | 0 | *buf_out = NULL; |
77 | 0 | return 0; |
78 | 0 | } |
79 | 0 | } |
80 | | |
81 | | /* Checks if the end of stream has been reached. */ |
82 | 0 | MVMint64 MVM_io_syncstream_eof(MVMThreadContext *tc, MVMOSHandle *h) { |
83 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
84 | 0 | return data->eof; |
85 | 0 | } |
86 | | |
87 | | /* Writes the specified bytes to the stream. */ |
88 | 0 | static void write_cb(uv_write_t* req, int status) { |
89 | 0 | uv_unref((uv_handle_t *)req->handle); |
90 | 0 | MVM_free(req); |
91 | 0 | } |
92 | 0 | MVMint64 MVM_io_syncstream_write_bytes(MVMThreadContext *tc, MVMOSHandle *h, char *buf, MVMint64 bytes) { |
93 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
94 | 0 | uv_write_t *req = MVM_malloc(sizeof(uv_write_t)); |
95 | 0 | uv_buf_t write_buf = uv_buf_init(buf, bytes); |
96 | 0 | int r; |
97 | 0 | unsigned int interval_id; |
98 | 0 |
|
99 | 0 | interval_id = MVM_telemetry_interval_start(tc, "syncstream.write_bytes"); |
100 | 0 | uv_ref((uv_handle_t *)data->handle); |
101 | 0 | if ((r = uv_write(req, data->handle, &write_buf, 1, write_cb)) < 0) { |
102 | 0 | uv_unref((uv_handle_t *)data->handle); |
103 | 0 | MVM_free(req); |
104 | 0 | MVM_telemetry_interval_stop(tc, interval_id, "syncstream.write_bytes failed"); |
105 | 0 | MVM_exception_throw_adhoc(tc, "Failed to write bytes to stream: %s", uv_strerror(r)); |
106 | 0 | } |
107 | 0 | else { |
108 | 0 | MVM_gc_mark_thread_blocked(tc); |
109 | 0 | uv_run(tc->loop, UV_RUN_DEFAULT); |
110 | 0 | MVM_gc_mark_thread_unblocked(tc); |
111 | 0 | } |
112 | 0 | MVM_telemetry_interval_annotate(bytes, interval_id, "written this many bytes"); |
113 | 0 | MVM_telemetry_interval_stop(tc, interval_id, "syncstream.write_bytes"); |
114 | 0 | data->position += bytes; |
115 | 0 | return bytes; |
116 | 0 | } |
117 | | |
118 | | /* No flush available for stream. */ |
119 | 0 | void MVM_io_syncstream_flush(MVMThreadContext *tc, MVMOSHandle *h){ |
120 | 0 | } |
121 | | |
122 | | /* Cannot truncate a stream. */ |
123 | 0 | void MVM_io_syncstream_truncate(MVMThreadContext *tc, MVMOSHandle *h, MVMint64 bytes) { |
124 | 0 | MVM_exception_throw_adhoc(tc, "Cannot truncate this kind of handle"); |
125 | 0 | } |
126 | | |
127 | | /* A close function for the case when we simply have a handle. */ |
128 | 0 | static MVMint64 not_std_handle(MVMThreadContext *tc, MVMObject *h) { |
129 | 0 | return h != tc->instance->stdin_handle && |
130 | 0 | h != tc->instance->stdout_handle && |
131 | 0 | h != tc->instance->stderr_handle; |
132 | 0 | } |
133 | 0 | static MVMint64 closefh(MVMThreadContext *tc, MVMOSHandle *h) { |
134 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
135 | 0 | if (data->handle && not_std_handle(tc, (MVMObject *)h)) { |
136 | 0 | uv_close((uv_handle_t *)data->handle, NULL); |
137 | 0 | data->handle = NULL; |
138 | 0 | } |
139 | 0 | return 0; |
140 | 0 | } |
141 | | |
142 | 0 | static MVMint64 is_tty(MVMThreadContext *tc, MVMOSHandle *h) { |
143 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
144 | 0 | return data->is_tty; |
145 | 0 | } |
146 | | |
147 | | /* Get native file descriptor. */ |
148 | 0 | static MVMint64 mvm_fileno(MVMThreadContext *tc, MVMOSHandle *h) { |
149 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
150 | 0 | uv_os_fd_t fd; |
151 | 0 | if (uv_fileno((uv_handle_t *)data->handle, &fd) >= 0) |
152 | 0 | return (MVMint64)fd; |
153 | 0 | return -1; |
154 | 0 | } |
155 | | |
156 | | /* Operations aiding process spawning and I/O handling. */ |
157 | 0 | static void bind_stdio_handle(MVMThreadContext *tc, MVMOSHandle *h, uv_stdio_container_t *stdio) { |
158 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)h->body.data; |
159 | 0 | stdio->flags = UV_INHERIT_STREAM; |
160 | 0 | stdio->data.stream = data->handle; |
161 | 0 | } |
162 | | |
163 | | /* Frees data associated with the handle, closing it if needed. */ |
164 | 0 | static void gc_free(MVMThreadContext *tc, MVMObject *h, void *d) { |
165 | 0 | MVMIOSyncStreamData *data = (MVMIOSyncStreamData *)d; |
166 | 0 | if (data) { |
167 | 0 | if (data->handle) { |
168 | 0 | uv_close((uv_handle_t *)data->handle, NULL); |
169 | 0 | uv_run(tc->loop, UV_RUN_DEFAULT); |
170 | 0 | MVM_free(data->handle); |
171 | 0 | data->handle = NULL; |
172 | 0 | } |
173 | 0 | MVM_free(data); |
174 | 0 | } |
175 | 0 | } |
176 | | |
177 | | /* IO ops table, populated with functions. */ |
178 | | static const MVMIOClosable closable = { closefh }; |
179 | | static const MVMIOSyncReadable sync_readable = { MVM_io_syncstream_read_bytes, |
180 | | MVM_io_syncstream_eof }; |
181 | | static const MVMIOSyncWritable sync_writable = { MVM_io_syncstream_write_bytes, |
182 | | MVM_io_syncstream_flush, |
183 | | MVM_io_syncstream_truncate }; |
184 | | static const MVMIOSeekable seekable = { MVM_io_syncstream_seek, |
185 | | MVM_io_syncstream_tell }; |
186 | | static const MVMIOPipeable pipeable = { bind_stdio_handle }; |
187 | | |
188 | | static const MVMIOIntrospection introspection = { is_tty, mvm_fileno }; |
189 | | |
190 | | static const MVMIOOps op_table = { |
191 | | &closable, |
192 | | &sync_readable, |
193 | | &sync_writable, |
194 | | NULL, |
195 | | NULL, |
196 | | NULL, |
197 | | &seekable, |
198 | | NULL, |
199 | | &pipeable, |
200 | | NULL, |
201 | | &introspection, |
202 | | NULL, |
203 | | NULL, |
204 | | gc_free |
205 | | }; |