/home/travis/build/MoarVM/MoarVM/src/io/asyncsocket.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include "moar.h" |
2 | | |
3 | | /* Data that we keep for an asynchronous socket handle. */ |
/* Data that we keep for an asynchronous socket handle. */
typedef struct {
    /* The libuv handle to the socket; NULL once the socket has been closed
     * (see close_perform / on_read's error path). */
    uv_stream_t *handle;
} MVMIOAsyncSocketData;
8 | | |
9 | | /* Info we convey about a read task. */ |
/* Info we convey about a read task. */
typedef struct {
    /* The handle being read from (GC-marked by read_gc_mark). */
    MVMOSHandle *handle;
    /* Type used to allocate result buffers (VMArray of uint8/int8). */
    MVMObject *buf_type;
    /* Monotonic sequence number pushed with each chunk delivered. */
    int seq_number;
    /* Thread context of the event loop thread, set in read_setup. */
    MVMThreadContext *tc;
    /* Index into the event loop's active work list. */
    int work_idx;
} ReadInfo;
17 | | |
18 | | /* Allocates a buffer of the suggested size. */ |
19 | 0 | static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { |
20 | 0 | size_t size = suggested_size > 0 ? suggested_size : 4; |
21 | 0 | buf->base = MVM_malloc(size); |
22 | 0 | buf->len = size; |
23 | 0 | } |
24 | | |
25 | | /* Callback used to simply free memory on close. */ |
26 | 0 | static void free_on_close_cb(uv_handle_t *handle) { |
27 | 0 | MVM_free(handle); |
28 | 0 | } |
29 | | |
30 | | /* Read handler. */ |
/* Read handler, invoked by libuv on the event loop thread whenever data,
 * EOF, or an error arrives on the stream. Builds a message array of the
 * shape [schedulee, seq-or-type, buffer-or-error, error-str] and pushes it
 * onto the async task's queue for the scheduler to dispatch. */
static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
    ReadInfo *ri = (ReadInfo *)handle->data;
    MVMThreadContext *tc = ri->tc;
    MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
    MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ri->work_idx);
    MVM_repr_push_o(tc, arr, t->body.schedulee);
    if (nread >= 0) {
        /* Data (possibly zero bytes) was read successfully. */
        MVMROOT2(tc, t, arr, {
            MVMArray *res_buf;

            /* Push the sequence number (post-incremented per chunk). */
            MVMObject *seq_boxed = MVM_repr_box_int(tc,
                tc->instance->boot_types.BOOTInt, ri->seq_number++);
            MVM_repr_push_o(tc, arr, seq_boxed);

            /* Produce a buffer and push it. Ownership of buf->base is
             * transferred into the VMArray; the GC frees it later, so it
             * must NOT be freed here on this path. */
            res_buf = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type);
            res_buf->body.slots.i8 = (MVMint8 *)buf->base;
            res_buf->body.start = 0;
            res_buf->body.ssize = buf->len;
            res_buf->body.elems = nread;
            MVM_repr_push_o(tc, arr, (MVMObject *)res_buf);

            /* Finally, no error (type object in the error slot). */
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
        });
    }
    else {
        /* EOF or error: either way the read is over and the connection
         * handle gets closed below. */
        MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
        uv_handle_t *conn_handle = (uv_handle_t *)handle_data->handle;
        if (nread == UV_EOF) {
            /* End of stream: final sequence number, then type objects in
             * the buffer and error slots to signal completion. */
            MVMROOT2(tc, t, arr, {
                MVMObject *final = MVM_repr_box_int(tc,
                    tc->instance->boot_types.BOOTInt, ri->seq_number);
                MVM_repr_push_o(tc, arr, final);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            });
        }
        else {
            /* Read error: push type objects for seq/buffer, then a boxed
             * libuv error message string. */
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            MVMROOT2(tc, t, arr, {
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                    tc->instance->VMString, uv_strerror(nread));
                MVMObject *msg_box = MVM_repr_box_str(tc,
                    tc->instance->boot_types.BOOTStr, msg_str);
                MVM_repr_push_o(tc, arr, msg_box);
            });
        }
        /* The allocated buffer was not handed off to a result array on
         * this path, so free it here. */
        if (buf->base)
            MVM_free(buf->base);
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
        /* Detach and close the connection handle; free_on_close_cb frees
         * the uv handle memory once libuv completes the close. */
        if (conn_handle && !uv_is_closing(conn_handle)) {
            handle_data->handle = NULL;
            uv_close(conn_handle, free_on_close_cb);
        }
    }
    MVM_repr_push_o(tc, t->body.queue, arr);
}
91 | | |
92 | | /* Does setup work for setting up asynchronous reads. */ |
/* Does setup work for setting up asynchronous reads. Runs on the event
 * loop thread. If the socket is already closed, immediately delivers a
 * "done" message; otherwise registers the task as active work and starts
 * the libuv read, reporting any start-up error back on the queue. */
static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
    MVMIOAsyncSocketData *handle_data;
    ReadInfo *ri;
    int r;

    /* Ensure not closed. */
    ri = (ReadInfo *)data;
    handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
    if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
        /* Closed, so immediately send done: [schedulee, seq, type, type],
         * the same shape the EOF path of on_read produces. */
        MVMAsyncTask *t = (MVMAsyncTask *)async_task;
        MVMROOT(tc, t, {
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
            MVM_repr_push_o(tc, arr, t->body.schedulee);
            MVMROOT(tc, arr, {
                MVMObject *final = MVM_repr_box_int(tc,
                    tc->instance->boot_types.BOOTInt, ri->seq_number);
                MVM_repr_push_o(tc, arr, final);
            });
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            MVM_repr_push_o(tc, t->body.queue, arr);
        });
        return;
    }

    /* Add to work in progress. */
    ri->tc = tc;
    ri->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

    /* Start reading the stream; on_read will find the ReadInfo via
     * handle->data. */
    handle_data->handle->data = data;
    if ((r = uv_read_start(handle_data->handle, on_alloc, on_read)) < 0) {
        /* Error; need to notify: [schedulee, type, type, error-message]. */
        MVMROOT(tc, async_task, {
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
            MVMAsyncTask *t = (MVMAsyncTask *)async_task;
            MVM_repr_push_o(tc, arr, t->body.schedulee);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            MVMROOT(tc, arr, {
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                    tc->instance->VMString, uv_strerror(r));
                MVMObject *msg_box = MVM_repr_box_str(tc,
                    tc->instance->boot_types.BOOTStr, msg_str);
                MVM_repr_push_o(tc, arr, msg_box);
            });
            MVM_repr_push_o(tc, t->body.queue, arr);
        });
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
    }
}
145 | | |
146 | | /* Stops reading. */ |
147 | 0 | static void read_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { |
148 | 0 | ReadInfo *ri = (ReadInfo *)data; |
149 | 0 | if (ri->work_idx >= 0) { |
150 | 0 | MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data; |
151 | 0 | if (handle_data->handle && !uv_is_closing((uv_handle_t *)handle_data->handle)) |
152 | 0 | uv_read_stop(handle_data->handle); |
153 | 0 | MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx)); |
154 | 0 | } |
155 | 0 | } |
156 | | |
157 | | /* Marks objects for a read task. */ |
158 | 0 | static void read_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) { |
159 | 0 | ReadInfo *ri = (ReadInfo *)data; |
160 | 0 | MVM_gc_worklist_add(tc, worklist, &ri->buf_type); |
161 | 0 | MVM_gc_worklist_add(tc, worklist, &ri->handle); |
162 | 0 | } |
163 | | |
164 | | /* Frees info for a read task. */ |
165 | 0 | static void read_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { |
166 | 0 | if (data) |
167 | 0 | MVM_free(data); |
168 | 0 | } |
169 | | |
170 | | /* Operations table for async read task. */ |
/* Operations table for async read task: setup starts the read, cancel
 * stops it, and the GC hooks mark/free the ReadInfo. */
static const MVMAsyncTaskOps read_op_table = {
    read_setup,
    NULL,
    read_cancel,
    read_gc_mark,
    read_gc_free
};
178 | | |
/* Implements asyncreadbytes: validates argument REPRs, creates an async
 * task wrapping a ReadInfo, and queues it on the event loop. Results (or
 * errors) are later delivered to `queue` via `schedulee`. Returns the
 * task, which callers may use for cancellation. */
static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
                                 MVMObject *schedulee, MVMObject *buf_type, MVMObject *async_type) {
    MVMAsyncTask *task;
    ReadInfo *ri;

    /* Validate REPRs: queue must be a concurrent blocking queue, the
     * result type an AsyncTask, and the buffer type a uint8/int8 array. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncreadbytes target queue must have ConcBlockingQueue REPR (got %s)",
            MVM_6model_get_stable_debug_name(tc, queue->st));
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncreadbytes result type must have REPR AsyncTask");
    if (REPR(buf_type)->ID == MVM_REPR_ID_VMArray) {
        MVMint32 slot_type = ((MVMArrayREPRData *)STABLE(buf_type)->REPR_data)->slot_type;
        if (slot_type != MVM_ARRAY_U8 && slot_type != MVM_ARRAY_I8)
            MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array of uint8 or int8");
    }
    else {
        MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array");
    }

    /* Create async task handle (roots held across the allocation). */
    MVMROOT4(tc, queue, schedulee, h, buf_type, {
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
    task->body.ops  = &read_op_table;
    ri              = MVM_calloc(1, sizeof(ReadInfo));
    MVM_ASSIGN_REF(tc, &(task->common.header), ri->buf_type, buf_type);
    MVM_ASSIGN_REF(tc, &(task->common.header), ri->handle, h);
    task->body.data = ri;

    /* Hand the task off to the event loop. */
    MVMROOT(tc, task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
    });

    return task;
}
220 | | |
221 | | /* Info we convey about a write task. */ |
/* Info we convey about a write task. */
typedef struct {
    /* The handle being written to (GC-marked by write_gc_mark). */
    MVMOSHandle *handle;
    /* The buffer object; kept alive so wi->buf can point into its
     * storage for the duration of the write. */
    MVMObject *buf_data;
    /* The libuv write request, allocated in write_setup and freed once
     * the write completes or fails. */
    uv_write_t *req;
    /* libuv view over the buffer's bytes. */
    uv_buf_t buf;
    /* Thread context of the event loop thread, set in write_setup. */
    MVMThreadContext *tc;
    /* Index into the event loop's active work list. */
    int work_idx;
} WriteInfo;
230 | | |
231 | | /* Completion handler for an asynchronous write. */ |
/* Completion handler for an asynchronous write. Delivers either
 * [schedulee, bytes-written, no-error] or [schedulee, type, error-message]
 * to the task's queue, then releases the write request and retires the
 * task from the active work list. */
static void on_write(uv_write_t *req, int status) {
    WriteInfo *wi = (WriteInfo *)req->data;
    MVMThreadContext *tc = wi->tc;
    MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
    MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
    MVM_repr_push_o(tc, arr, t->body.schedulee);
    if (status >= 0) {
        /* Success: report the byte count that was submitted. */
        MVMROOT2(tc, arr, t, {
            MVMObject *bytes_box = MVM_repr_box_int(tc,
                tc->instance->boot_types.BOOTInt,
                wi->buf.len);
            MVM_repr_push_o(tc, arr, bytes_box);
        });
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
    }
    else {
        /* Failure: type object in the count slot, boxed libuv error. */
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
        MVMROOT2(tc, arr, t, {
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                tc->instance->VMString, uv_strerror(status));
            MVMObject *msg_box = MVM_repr_box_str(tc,
                tc->instance->boot_types.BOOTStr, msg_str);
            MVM_repr_push_o(tc, arr, msg_box);
        });
    }
    MVM_repr_push_o(tc, t->body.queue, arr);
    MVM_free(wi->req);
    MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
}
261 | | |
262 | | /* Does setup work for an asynchronous write. */ |
/* Does setup work for an asynchronous write. Runs on the event loop
 * thread. Rejects writes on closed sockets, otherwise points a uv_buf_t
 * directly at the buffer object's storage (the GC-marked wi->buf_data
 * keeps it alive) and submits the write; on_write finishes the task. */
static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
    MVMIOAsyncSocketData *handle_data;
    MVMArray *buffer;
    WriteInfo *wi;
    char *output;
    int output_size, r;

    /* Ensure not closed. */
    wi = (WriteInfo *)data;
    handle_data = (MVMIOAsyncSocketData *)wi->handle->body.data;
    if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
        /* Closed: deliver an error message immediately. */
        MVMROOT(tc, async_task, {
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
            MVMAsyncTask *t = (MVMAsyncTask *)async_task;
            MVM_repr_push_o(tc, arr, t->body.schedulee);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
            MVMROOT(tc, arr, {
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                    tc->instance->VMString, "Cannot write to a closed socket");
                MVMObject *msg_box = MVM_repr_box_str(tc,
                    tc->instance->boot_types.BOOTStr, msg_str);
                MVM_repr_push_o(tc, arr, msg_box);
            });
            MVM_repr_push_o(tc, t->body.queue, arr);
        });
        return;
    }

    /* Add to work in progress. */
    wi->tc = tc;
    wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

    /* Extract buf data: point at the live elements of the array. */
    buffer = (MVMArray *)wi->buf_data;
    output = (char *)(buffer->body.slots.i8 + buffer->body.start);
    output_size = (int)buffer->body.elems;

    /* Create and initialize write request; on_write finds the WriteInfo
     * via req->data. */
    wi->req = MVM_malloc(sizeof(uv_write_t));
    wi->buf = uv_buf_init(output, output_size);
    wi->req->data = data;

    if ((r = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) < 0) {
        /* Error; need to notify with the libuv error message. */
        MVMROOT(tc, async_task, {
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
            MVMAsyncTask *t = (MVMAsyncTask *)async_task;
            MVM_repr_push_o(tc, arr, t->body.schedulee);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
            MVMROOT(tc, arr, {
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                    tc->instance->VMString, uv_strerror(r));
                MVMObject *msg_box = MVM_repr_box_str(tc,
                    tc->instance->boot_types.BOOTStr, msg_str);
                MVM_repr_push_o(tc, arr, msg_box);
            });
            MVM_repr_push_o(tc, t->body.queue, arr);
        });

        /* Cleanup handle: on_write will never run for a failed submit. */
        MVM_free(wi->req);
        wi->req = NULL;
        MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
    }
}
328 | | |
329 | | /* Marks objects for a write task. */ |
330 | 0 | static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) { |
331 | 0 | WriteInfo *wi = (WriteInfo *)data; |
332 | 0 | MVM_gc_worklist_add(tc, worklist, &wi->handle); |
333 | 0 | MVM_gc_worklist_add(tc, worklist, &wi->buf_data); |
334 | 0 | } |
335 | | |
336 | | /* Frees info for a write task. */ |
337 | 0 | static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { |
338 | 0 | if (data) |
339 | 0 | MVM_free(data); |
340 | 0 | } |
341 | | |
342 | | /* Operations table for async write task. */ |
/* Operations table for async write task: only setup and the GC hooks are
 * needed; a submitted write cannot be cancelled. */
static const MVMAsyncTaskOps write_op_table = {
    write_setup,
    NULL,
    NULL,
    write_gc_mark,
    write_gc_free
};
350 | | |
/* Implements asyncwritebytes: validates argument REPRs, creates an async
 * task wrapping a WriteInfo, and queues it on the event loop. The result
 * (bytes written or an error) is later delivered to `queue`. */
static MVMAsyncTask * write_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
                                  MVMObject *schedulee, MVMObject *buffer, MVMObject *async_type) {
    MVMAsyncTask *task;
    WriteInfo *wi;

    /* Validate REPRs: queue, task type, and a concrete uint8/int8 array
     * to write from. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncwritebytes target queue must have ConcBlockingQueue REPR");
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncwritebytes result type must have REPR AsyncTask");
    if (!IS_CONCRETE(buffer) || REPR(buffer)->ID != MVM_REPR_ID_VMArray)
        MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array to read from");
    if (((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_U8
        && ((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_I8)
        MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array of uint8 or int8");

    /* Create async task handle (roots held across the allocation). */
    MVMROOT4(tc, queue, schedulee, h, buffer, {
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
    task->body.ops  = &write_op_table;
    wi              = MVM_calloc(1, sizeof(WriteInfo));
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->buf_data, buffer);
    task->body.data = wi;

    /* Hand the task off to the event loop. */
    MVMROOT(tc, task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
    });

    return task;
}
388 | | |
389 | | /* Info we convey about a socket close task. */ |
/* Info we convey about a socket close task. */
typedef struct {
    /* The handle to close (GC-marked by close_gc_mark). */
    MVMOSHandle *handle;
} CloseInfo;
393 | | |
394 | | /* Does an asynchronous close (since it must run on the event loop). */ |
395 | 0 | static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { |
396 | 0 | CloseInfo *ci = (CloseInfo *)data; |
397 | 0 | MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ci->handle->body.data; |
398 | 0 | uv_handle_t *handle = (uv_handle_t *)handle_data->handle; |
399 | 0 | if (handle && !uv_is_closing(handle)) { |
400 | 0 | handle_data->handle = NULL; |
401 | 0 | uv_close(handle, free_on_close_cb); |
402 | 0 | } |
403 | 0 | } |
404 | | |
405 | | /* Marks objects for a close task. */ |
406 | 0 | static void close_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) { |
407 | 0 | CloseInfo *ci = (CloseInfo *)data; |
408 | 0 | MVM_gc_worklist_add(tc, worklist, &ci->handle); |
409 | 0 | } |
410 | | |
411 | | /* Frees info for a close task. */ |
412 | 0 | static void close_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { |
413 | 0 | if (data) |
414 | 0 | MVM_free(data); |
415 | 0 | } |
416 | | |
417 | | /* Operations table for async close task. */ |
/* Operations table for async close task: the close runs in the setup
 * slot; there is nothing to cancel. */
static const MVMAsyncTaskOps close_op_table = {
    close_perform,
    NULL,
    NULL,
    close_gc_mark,
    close_gc_free
};
425 | | |
/* Initiates an asynchronous socket close by queueing a close task on the
 * event loop. Returns 0 immediately; the actual close happens later in
 * close_perform on the event loop thread. */
static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) {
    MVMAsyncTask *task;
    CloseInfo *ci;

    MVMROOT(tc, h, {
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc,
            tc->instance->boot_types.BOOTAsync);
    });
    task->body.ops = &close_op_table;
    ci = MVM_calloc(1, sizeof(CloseInfo));
    MVM_ASSIGN_REF(tc, &(task->common.header), ci->handle, h);
    task->body.data = ci;
    MVM_io_eventloop_queue_work(tc, (MVMObject *)task);

    return 0;
}
442 | | |
443 | | /* IO ops table, populated with functions. */ |
/* IO ops table, populated with functions. Async sockets support closing,
 * async reading, and async writing; all other IO operations are
 * unsupported (NULL). */
static const MVMIOClosable      closable       = { close_socket };
static const MVMIOAsyncReadable async_readable = { read_bytes };
static const MVMIOAsyncWritable async_writable = { write_bytes };
static const MVMIOOps op_table = {
    &closable,
    NULL,
    NULL,
    &async_readable,
    &async_writable,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL
};
463 | | |
464 | 0 | static void push_name_and_port(MVMThreadContext *tc, struct sockaddr_storage *name, MVMObject *arr) { |
465 | 0 | char addrstr[INET6_ADDRSTRLEN + 1]; |
466 | 0 | /* XXX windows support kludge. 64 bit is much too big, but we'll |
467 | 0 | * get the proper data from the struct anyway, however windows |
468 | 0 | * decides to declare it. */ |
469 | 0 | MVMuint64 port; |
470 | 0 | MVMObject *host_o; |
471 | 0 | MVMObject *port_o; |
472 | 0 | switch (name->ss_family) { |
473 | 0 | case AF_INET6: { |
474 | 0 | uv_ip6_name((struct sockaddr_in6*)name, addrstr, INET6_ADDRSTRLEN + 1); |
475 | 0 | port = ntohs(((struct sockaddr_in6*)name)->sin6_port); |
476 | 0 | break; |
477 | 0 | } |
478 | 0 | case AF_INET: { |
479 | 0 | uv_ip4_name((struct sockaddr_in*)name, addrstr, INET6_ADDRSTRLEN + 1); |
480 | 0 | port = ntohs(((struct sockaddr_in*)name)->sin_port); |
481 | 0 | break; |
482 | 0 | } |
483 | 0 | default: |
484 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); |
485 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); |
486 | 0 | return; |
487 | 0 | break; |
488 | 0 | } |
489 | 0 | MVMROOT(tc, arr, { |
490 | 0 | port_o = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, port); |
491 | 0 | MVMROOT(tc, port_o, { |
492 | 0 | host_o = (MVMObject *)MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, |
493 | 0 | MVM_string_ascii_decode_nt(tc, tc->instance->VMString, addrstr)); |
494 | 0 | }); |
495 | 0 | }); |
496 | 0 | MVM_repr_push_o(tc, arr, host_o); |
497 | 0 | MVM_repr_push_o(tc, arr, port_o); |
498 | 0 | } |
499 | | |
500 | | /* Info we convey about a connection attempt task. */ |
/* Info we convey about a connection attempt task. */
typedef struct {
    /* Resolved destination address; owned by this struct and freed in
     * connect_gc_free. */
    struct sockaddr *dest;
    /* The TCP handle being connected; freed via free_on_close_cb. */
    uv_tcp_t *socket;
    /* The libuv connect request; freed once the attempt completes. */
    uv_connect_t *connect;
    /* Thread context of the event loop thread, set in connect_setup. */
    MVMThreadContext *tc;
    /* Index into the event loop's active work list. */
    int work_idx;
} ConnectInfo;
508 | | |
509 | | /* When a connection takes place, need to send result. */ |
/* When a connection takes place, need to send result. On success the
 * message is [schedulee, handle, no-error, peer-host, peer-port,
 * local-host, local-port]; on failure the handle slot holds a type object
 * and the error message is followed by type-object placeholders for the
 * four host/port slots. */
static void on_connect(uv_connect_t* req, int status) {
    ConnectInfo *ci = (ConnectInfo *)req->data;
    MVMThreadContext *tc = ci->tc;
    MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
    MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, ci->work_idx);
    MVM_repr_push_o(tc, arr, t->body.schedulee);
    if (status >= 0) {
        /* Allocate and set up handle wrapping the connected socket. */
        MVMROOT2(tc, arr, t, {
            MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
            MVMIOAsyncSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncSocketData));
            data->handle = (uv_stream_t *)ci->socket;
            result->body.ops = &op_table;
            result->body.data = data;
            MVM_repr_push_o(tc, arr, (MVMObject *)result);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            {
                /* Report peer and local endpoint names/ports. */
                struct sockaddr_storage sockaddr;
                int name_len = sizeof(struct sockaddr_storage);

                uv_tcp_getpeername(ci->socket, (struct sockaddr *)&sockaddr, &name_len);
                push_name_and_port(tc, &sockaddr, arr);

                uv_tcp_getsockname(ci->socket, (struct sockaddr *)&sockaddr, &name_len);
                push_name_and_port(tc, &sockaddr, arr);
            }
        });
    }
    else {
        /* Connection failed: push type object for handle, then the libuv
         * error message, then placeholders for host/port pairs. */
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
        MVMROOT2(tc, arr, t, {
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                tc->instance->VMString, uv_strerror(status));
            MVMObject *msg_box = MVM_repr_box_str(tc,
                tc->instance->boot_types.BOOTStr, msg_str);
            MVM_repr_push_o(tc, arr, msg_box);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
        });
    }
    MVM_repr_push_o(tc, t->body.queue, arr);
    MVM_free(req);
    MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
}
556 | | |
557 | | /* Initilalize the connection on the event loop. */ |
/* Initialize the connection on the event loop: allocates the TCP handle
 * and connect request, then starts the connect; on_connect delivers the
 * result. Any immediate failure is reported on the queue. */
static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
    int r;

    /* Add to work in progress. */
    ConnectInfo *ci = (ConnectInfo *)data;
    ci->tc = tc;
    ci->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

    /* Create and initialize socket and connection. */
    ci->socket        = MVM_malloc(sizeof(uv_tcp_t));
    ci->connect       = MVM_malloc(sizeof(uv_connect_t));
    ci->connect->data = data;
    if ((r = uv_tcp_init(loop, ci->socket)) < 0 ||
        (r = uv_tcp_connect(ci->connect, ci->socket, ci->dest, on_connect)) < 0) {
        /* Error; need to notify with the libuv error message plus
         * type-object placeholders for the handle and host/port slots. */
        MVMROOT(tc, async_task, {
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
            MVMAsyncTask *t = (MVMAsyncTask *)async_task;
            MVM_repr_push_o(tc, arr, t->body.schedulee);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
            MVMROOT(tc, arr, {
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                    tc->instance->VMString, uv_strerror(r));
                MVMObject *msg_box = MVM_repr_box_str(tc,
                    tc->instance->boot_types.BOOTStr, msg_str);
                MVM_repr_push_o(tc, arr, msg_box);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
            });
            MVM_repr_push_o(tc, t->body.queue, arr);
        });

        /* Cleanup handles. NOTE(review): if uv_tcp_init itself failed,
         * uv_close is invoked on a handle that was never initialized —
         * confirm against libuv's contract for failed init. */
        MVM_free(ci->connect);
        ci->connect = NULL;
        uv_close((uv_handle_t *)ci->socket, free_on_close_cb);
        ci->socket = NULL;
        MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
    }
}
600 | | |
601 | | /* Frees info for a connection task. */ |
602 | 0 | static void connect_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { |
603 | 0 | if (data) { |
604 | 0 | ConnectInfo *ci = (ConnectInfo *)data; |
605 | 0 | if (ci->dest) |
606 | 0 | MVM_free(ci->dest); |
607 | 0 | MVM_free(ci); |
608 | 0 | } |
609 | 0 | } |
610 | | |
611 | | /* Operations table for async connect task. */ |
/* Operations table for async connect task: no cancel and no GC marking
 * needed (ConnectInfo holds no GC-managed references), just setup and
 * freeing of the info struct. */
static const MVMAsyncTaskOps connect_op_table = {
    connect_setup,
    NULL,
    NULL,
    NULL,
    connect_gc_free
};
619 | | |
620 | | /* Sets off an asynchronous socket connection. */ |
/* Sets off an asynchronous socket connection. Resolves the host name
 * synchronously, then queues a connect task on the event loop; the
 * connected handle (or an error) is later delivered to `queue` via
 * `schedulee`. Returns the async task object. */
MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue,
                                        MVMObject *schedulee, MVMString *host,
                                        MVMint64 port, MVMObject *async_type) {
    MVMAsyncTask *task;
    ConnectInfo *ci;
    struct sockaddr *dest;

    /* Validate REPRs. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncconnect target queue must have ConcBlockingQueue REPR");
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncconnect result type must have REPR AsyncTask");

    /* Resolve hostname. (Could be done asynchronously too.) */
    MVMROOT3(tc, queue, schedulee, async_type, {
        dest = MVM_io_resolve_host_name(tc, host, port);
    });

    /* Create async task handle. */
    MVMROOT2(tc, queue, schedulee, {
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
    task->body.ops  = &connect_op_table;
    ci              = MVM_calloc(1, sizeof(ConnectInfo));
    ci->dest        = dest;
    task->body.data = ci;

    /* Hand the task off to the event loop. */
    MVMROOT(tc, task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
    });

    return (MVMObject *)task;
}
659 | | |
660 | | /* Info we convey about a socket listen task. */ |
/* Info we convey about a socket listen task. */
typedef struct {
    /* Resolved address to bind to. */
    struct sockaddr *dest;
    /* The listening TCP handle; NULL once closed. */
    uv_tcp_t *socket;
    /* Thread context of the event loop thread, set in listen_setup. */
    MVMThreadContext *tc;
    /* Index into the event loop's active work list. */
    int work_idx;
    /* Backlog passed through to uv_listen. */
    int backlog;
} ListenInfo;
668 | | |
669 | | |
670 | | /* Handles an incoming connection. */ |
671 | 0 | static void on_connection(uv_stream_t *server, int status) { |
672 | 0 | ListenInfo *li = (ListenInfo *)server->data; |
673 | 0 | MVMThreadContext *tc = li->tc; |
674 | 0 | MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray); |
675 | 0 | MVMAsyncTask *t = MVM_io_eventloop_get_active_work(tc, li->work_idx); |
676 | 0 |
|
677 | 0 | uv_tcp_t *client = MVM_malloc(sizeof(uv_tcp_t)); |
678 | 0 | int r; |
679 | 0 | uv_tcp_init(tc->loop, client); |
680 | 0 |
|
681 | 0 | MVM_repr_push_o(tc, arr, t->body.schedulee); |
682 | 0 | if ((r = uv_accept(server, (uv_stream_t *)client)) == 0) { |
683 | 0 | /* Allocate and set up handle. */ |
684 | 0 | MVMROOT2(tc, arr, t, { |
685 | 0 | MVMOSHandle *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO); |
686 | 0 | MVMIOAsyncSocketData *data = MVM_calloc(1, sizeof(MVMIOAsyncSocketData)); |
687 | 0 | data->handle = (uv_stream_t *)client; |
688 | 0 | result->body.ops = &op_table; |
689 | 0 | result->body.data = data; |
690 | 0 |
|
691 | 0 | MVM_repr_push_o(tc, arr, (MVMObject *)result); |
692 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); |
693 | 0 |
|
694 | 0 | { |
695 | 0 | struct sockaddr_storage sockaddr; |
696 | 0 | int name_len = sizeof(struct sockaddr_storage); |
697 | 0 |
|
698 | 0 | uv_tcp_getpeername(client, (struct sockaddr *)&sockaddr, &name_len); |
699 | 0 | push_name_and_port(tc, &sockaddr, arr); |
700 | 0 |
|
701 | 0 | uv_tcp_getsockname(client, (struct sockaddr *)&sockaddr, &name_len); |
702 | 0 | push_name_and_port(tc, &sockaddr, arr); |
703 | 0 | } |
704 | 0 | }); |
705 | 0 | } |
706 | 0 | else { |
707 | 0 | uv_close((uv_handle_t*)client, NULL); |
708 | 0 | MVM_free(client); |
709 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO); |
710 | 0 | MVMROOT2(tc, arr, t, { |
711 | 0 | MVMString *msg_str = MVM_string_ascii_decode_nt(tc, |
712 | 0 | tc->instance->VMString, uv_strerror(r)); |
713 | 0 | MVMObject *msg_box = MVM_repr_box_str(tc, |
714 | 0 | tc->instance->boot_types.BOOTStr, msg_str); |
715 | 0 | MVM_repr_push_o(tc, arr, msg_box); |
716 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); |
717 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); |
718 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr); |
719 | 0 | MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt); |
720 | 0 | }); |
721 | 0 | } |
722 | 0 | MVM_repr_push_o(tc, t->body.queue, arr); |
723 | 0 | } |
724 | | |
725 | | /* Sets up a socket listener. */ |
/* Sets up a socket listener on the event loop: creates the TCP handle,
 * binds it to the resolved address, and starts listening with the
 * requested backlog; on_connection handles each incoming client. Any
 * failure is reported on the queue and the handle torn down. */
static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
    int r;

    /* Add to work in progress. */
    ListenInfo *li = (ListenInfo *)data;
    li->tc = tc;
    li->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);

    /* Create and initialize socket and connection, and start listening;
     * on_connection will find the ListenInfo via socket->data. */
    li->socket = MVM_malloc(sizeof(uv_tcp_t));
    li->socket->data = data;
    if ((r = uv_tcp_init(loop, li->socket)) < 0 ||
        (r = uv_tcp_bind(li->socket, li->dest, 0)) < 0 ||
        (r = uv_listen((uv_stream_t *)li->socket, li->backlog, on_connection))) {
        /* Error; need to notify with the libuv error message plus
         * type-object placeholders for the handle and host/port slots. */
        MVMROOT(tc, async_task, {
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
            MVMAsyncTask *t = (MVMAsyncTask *)async_task;
            MVM_repr_push_o(tc, arr, t->body.schedulee);
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
            MVMROOT(tc, arr, {
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
                    tc->instance->VMString, uv_strerror(r));
                MVMObject *msg_box = MVM_repr_box_str(tc,
                    tc->instance->boot_types.BOOTStr, msg_str);

                MVM_repr_push_o(tc, arr, msg_box);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
            });
            MVM_repr_push_o(tc, t->body.queue, arr);
        });
        /* NOTE(review): if uv_tcp_init itself failed, uv_close is invoked
         * on a handle that was never initialized — confirm against
         * libuv's contract for failed init. */
        uv_close((uv_handle_t *)li->socket, free_on_close_cb);
        li->socket = NULL;
        MVM_io_eventloop_remove_active_work(tc, &(li->work_idx));
        return;
    }
}
766 | | |
767 | | /* Stops listening. */ |
768 | 0 | static void on_listen_cancelled(uv_handle_t *handle) { |
769 | 0 | ListenInfo *li = (ListenInfo *)handle->data; |
770 | 0 | MVMThreadContext *tc = li->tc; |
771 | 0 | MVM_io_eventloop_send_cancellation_notification(tc, |
772 | 0 | MVM_io_eventloop_get_active_work(tc, li->work_idx)); |
773 | 0 | MVM_io_eventloop_remove_active_work(tc, &(li->work_idx)); |
774 | 0 | } |
775 | 0 | static void listen_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) { |
776 | 0 | ListenInfo *li = (ListenInfo *)data; |
777 | 0 | if (li->socket) { |
778 | 0 | uv_close((uv_handle_t *)li->socket, on_listen_cancelled); |
779 | 0 | li->socket = NULL; |
780 | 0 | } |
781 | 0 | } |
782 | | |
783 | | /* Frees info for a listen task. */ |
784 | 0 | static void listen_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) { |
785 | 0 | if (data) { |
786 | 0 | ListenInfo *li = (ListenInfo *)data; |
787 | 0 | if (li->dest) |
788 | 0 | MVM_free(li->dest); |
789 | 0 | MVM_free(li); |
790 | 0 | } |
791 | 0 | } |
792 | | |
/* Operations table for async listen task. */
static const MVMAsyncTaskOps listen_op_table = {
    listen_setup,   /* setup: create the uv_tcp_t, bind, and start listening */
    NULL,           /* unused slot (NOTE(review): confirm against MVMAsyncTaskOps field order) */
    listen_cancel,  /* cancel: close the listening handle and notify */
    NULL,           /* unused slot: ListenInfo holds no GC-managed references to mark here */
    listen_gc_free  /* free the ListenInfo and its owned sockaddr */
};
801 | | |
/* Initiates an async socket listener. Validates the queue/result types,
 * resolves the host name (synchronously), builds an MVMAsyncTask whose
 * setup (listen_setup) will bind and start listening once the event loop
 * picks it up, and returns the task object. */
MVMObject * MVM_io_socket_listen_async(MVMThreadContext *tc, MVMObject *queue,
                                       MVMObject *schedulee, MVMString *host,
                                       MVMint64 port, MVMint32 backlog, MVMObject *async_type) {
    MVMAsyncTask *task;
    ListenInfo *li;
    struct sockaddr *dest;

    /* Validate REPRs. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asynclisten target queue must have ConcBlockingQueue REPR");
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asynclisten result type must have REPR AsyncTask");

    /* Resolve hostname. (Could be done asynchronously too.) Rooted so the
     * arguments survive any GC during resolution. NOTE(review): if an
     * allocation below throws before li takes ownership, dest would leak —
     * confirm whether that path is possible. */
    MVMROOT3(tc, queue, schedulee, async_type, {
        dest = MVM_io_resolve_host_name(tc, host, port);
    });

    /* Create async task handle. */
    MVMROOT2(tc, queue, schedulee, {
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
    task->body.ops = &listen_op_table;
    /* li takes ownership of dest; both are released in listen_gc_free. */
    li = MVM_calloc(1, sizeof(ListenInfo));
    li->dest = dest;
    li->backlog = backlog;
    task->body.data = li;

    /* Hand the task off to the event loop. */
    MVMROOT(tc, task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
    });

    return (MVMObject *)task;
}