Coverage Report

Created: 2017-04-15 07:07

/home/travis/build/MoarVM/MoarVM/src/io/asyncsocket.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* Data that we keep for an asynchronous socket handle. */
4
typedef struct {
5
    /* The libuv handle to the socket. */
6
    uv_stream_t *handle;
7
8
    /* Decode stream, for turning bytes into strings. */
9
    MVMDecodeStream *ds;
10
} MVMIOAsyncSocketData;
11
12
/* Info we convey about a read task. */
13
typedef struct {
14
    MVMOSHandle      *handle;
15
    MVMDecodeStream  *ds;
16
    MVMObject        *buf_type;
17
    int               seq_number;
18
    MVMThreadContext *tc;
19
    int               work_idx;
20
} ReadInfo;
21
22
/* Allocates a buffer of the suggested size. */
23
0
static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
24
0
    size_t size = suggested_size > 0 ? suggested_size : 4;
25
0
    buf->base   = MVM_malloc(size);
26
0
    buf->len    = size;
27
0
}
28
29
/* Callback used to simply free memory on close. */
30
0
static void free_on_close_cb(uv_handle_t *handle) {
31
0
    MVM_free(handle);
32
0
}
33
34
/* Read handler. */
35
0
static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
36
0
    ReadInfo         *ri  = (ReadInfo *)handle->data;
37
0
    MVMThreadContext *tc  = ri->tc;
38
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
39
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, ri->work_idx);
40
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
41
0
    if (nread >= 0) {
42
0
        MVMROOT(tc, t, {
43
0
        MVMROOT(tc, arr, {
44
0
            /* Push the sequence number. */
45
0
            MVMObject *seq_boxed = MVM_repr_box_int(tc,
46
0
                tc->instance->boot_types.BOOTInt, ri->seq_number++);
47
0
            MVM_repr_push_o(tc, arr, seq_boxed);
48
0
49
0
            /* Either need to produce a buffer or decode characters. */
50
0
            if (ri->ds) {
51
0
                MVMString *str;
52
0
                MVMObject *boxed_str;
53
0
                MVM_string_decodestream_add_bytes(tc, ri->ds, buf->base, nread);
54
0
                str = MVM_string_decodestream_get_all(tc, ri->ds);
55
0
                boxed_str = MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, str);
56
0
                MVM_repr_push_o(tc, arr, boxed_str);
57
0
            }
58
0
            else {
59
0
                MVMArray *res_buf      = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type);
60
0
                res_buf->body.slots.i8 = (MVMint8 *)buf->base;
61
0
                res_buf->body.start    = 0;
62
0
                res_buf->body.ssize    = buf->len;
63
0
                res_buf->body.elems    = nread;
64
0
                MVM_repr_push_o(tc, arr, (MVMObject *)res_buf);
65
0
            }
66
0
67
0
            /* Finally, no error. */
68
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
69
0
        });
70
0
        });
71
0
    }
72
0
    else if (nread == UV_EOF) {
73
0
        MVMROOT(tc, t, {
74
0
        MVMROOT(tc, arr, {
75
0
            MVMObject *final = MVM_repr_box_int(tc,
76
0
                tc->instance->boot_types.BOOTInt, ri->seq_number);
77
0
            MVM_repr_push_o(tc, arr, final);
78
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
79
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
80
0
        });
81
0
        });
82
0
        if (buf->base)
83
0
            MVM_free(buf->base);
84
0
        uv_read_stop(handle);
85
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
86
0
    }
87
0
    else {
88
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
89
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
90
0
        MVMROOT(tc, t, {
91
0
        MVMROOT(tc, arr, {
92
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
93
0
                tc->instance->VMString, uv_strerror(nread));
94
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
95
0
                tc->instance->boot_types.BOOTStr, msg_str);
96
0
            MVM_repr_push_o(tc, arr, msg_box);
97
0
        });
98
0
        });
99
0
        if (buf->base)
100
0
            MVM_free(buf->base);
101
0
        uv_read_stop(handle);
102
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
103
0
    }
104
0
    MVM_repr_push_o(tc, t->body.queue, arr);
105
0
}
106
107
/* Does setup work for setting up asynchronous reads. */
108
0
static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
109
0
    MVMIOAsyncSocketData *handle_data;
110
0
    ReadInfo             *ri;
111
0
    int                   r;
112
0
113
0
    /* Ensure not closed. */
114
0
    ri = (ReadInfo *)data;
115
0
    handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
116
0
    if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
117
0
        /* Closed, so immediately send done. */
118
0
        MVMAsyncTask *t = (MVMAsyncTask *)async_task;
119
0
        MVMROOT(tc, t, {
120
0
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
121
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
122
0
            MVMROOT(tc, arr, {
123
0
                MVMObject *final = MVM_repr_box_int(tc,
124
0
                    tc->instance->boot_types.BOOTInt, ri->seq_number);
125
0
                MVM_repr_push_o(tc, arr, final);
126
0
            });
127
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
128
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
129
0
            MVM_repr_push_o(tc, t->body.queue, arr);
130
0
        });
131
0
        return;
132
0
    }
133
0
134
0
    /* Add to work in progress. */
135
0
    ri->tc        = tc;
136
0
    ri->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
137
0
138
0
    /* Start reading the stream. */
139
0
    handle_data->handle->data = data;
140
0
    if ((r = uv_read_start(handle_data->handle, on_alloc, on_read)) < 0) {
141
0
        /* Error; need to notify. */
142
0
        MVMROOT(tc, async_task, {
143
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
144
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
145
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
146
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
147
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
148
0
            MVMROOT(tc, arr, {
149
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
150
0
                    tc->instance->VMString, uv_strerror(r));
151
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
152
0
                    tc->instance->boot_types.BOOTStr, msg_str);
153
0
                MVM_repr_push_o(tc, arr, msg_box);
154
0
            });
155
0
            MVM_repr_push_o(tc, t->body.queue, arr);
156
0
        });
157
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
158
0
    }
159
0
}
160
161
/* Marks objects for a read task. */
162
0
static void read_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
163
0
    ReadInfo *ri = (ReadInfo *)data;
164
0
    MVM_gc_worklist_add(tc, worklist, &ri->buf_type);
165
0
    MVM_gc_worklist_add(tc, worklist, &ri->handle);
166
0
}
167
168
/* Frees info for a read task. */
169
0
static void read_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
170
0
    if (data) {
171
0
        ReadInfo *ri = (ReadInfo *)data;
172
0
        if (ri->ds)
173
0
            MVM_string_decodestream_destroy(tc, ri->ds);
174
0
        MVM_free(data);
175
0
    }
176
0
}
177
178
/* Operations table for async read task. */
179
static const MVMAsyncTaskOps read_op_table = {
180
    read_setup,
181
    NULL,
182
    read_gc_mark,
183
    read_gc_free
184
};
185
186
/* Creates and queues an asynchronous task that reads characters from the
 * socket handle h. Results are delivered to queue, tagged with schedulee.
 * Throws if queue is not a ConcBlockingQueue or async_type is not an
 * AsyncTask REPR. Returns the newly created task. */
static MVMAsyncTask * read_chars(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
                                 MVMObject *schedulee, MVMObject *async_type) {
    MVMAsyncTask *new_task;
    ReadInfo     *info;

    /* Validate REPRs. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncreadchars target queue must have ConcBlockingQueue REPR");
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncreadchars result type must have REPR AsyncTask");

    /* Allocate the task; keep the arguments rooted across the allocation. */
    MVMROOT(tc, queue, {
    MVMROOT(tc, schedulee, {
    MVMROOT(tc, h, {
        new_task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    });
    });
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.schedulee, schedulee);
    new_task->body.ops = &read_op_table;

    /* Character reads decode incoming bytes as UTF-8. */
    info     = MVM_calloc(1, sizeof(ReadInfo));
    info->ds = MVM_string_decodestream_create(tc, MVM_encoding_type_utf8, 0, 0);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->handle, h);
    new_task->body.data = info;

    /* Pass the task to the event loop. */
    MVMROOT(tc, new_task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)new_task);
    });

    return new_task;
}
222
223
/* Creates and queues an asynchronous task that reads raw bytes from the
 * socket handle h, delivering each chunk as an instance of buf_type.
 * Throws if queue is not a ConcBlockingQueue, async_type is not an AsyncTask
 * REPR, or buf_type is not a VMArray of uint8/int8. Returns the new task. */
static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
                                 MVMObject *schedulee, MVMObject *buf_type, MVMObject *async_type) {
    MVMAsyncTask *new_task;
    ReadInfo     *info;

    /* Validate REPRs. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncreadbytes target queue must have ConcBlockingQueue REPR");
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncreadbytes result type must have REPR AsyncTask");
    if (REPR(buf_type)->ID != MVM_REPR_ID_VMArray) {
        MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array");
    }
    else {
        MVMint32 elem_kind = ((MVMArrayREPRData *)STABLE(buf_type)->REPR_data)->slot_type;
        if (!(elem_kind == MVM_ARRAY_U8 || elem_kind == MVM_ARRAY_I8))
            MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array of uint8 or int8");
    }

    /* Allocate the task; keep the arguments rooted across the allocation. */
    MVMROOT(tc, queue, {
    MVMROOT(tc, schedulee, {
    MVMROOT(tc, h, {
    MVMROOT(tc, buf_type, {
        new_task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    });
    });
    });
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.schedulee, schedulee);
    new_task->body.ops = &read_op_table;

    /* No decode stream is set, so results are delivered as buffers. */
    info = MVM_calloc(1, sizeof(ReadInfo));
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->buf_type, buf_type);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->handle, h);
    new_task->body.data = info;

    /* Pass the task to the event loop. */
    MVMROOT(tc, new_task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)new_task);
    });

    return new_task;
}
269
270
/* Info we convey about a write task. */
271
typedef struct {
272
    MVMOSHandle      *handle;
273
    MVMString        *str_data;
274
    MVMObject        *buf_data;
275
    uv_write_t       *req;
276
    uv_buf_t          buf;
277
    MVMThreadContext *tc;
278
    int               work_idx;
279
} WriteInfo;
280
281
/* Completion handler for an asynchronous write. */
282
0
static void on_write(uv_write_t *req, int status) {
283
0
    WriteInfo        *wi  = (WriteInfo *)req->data;
284
0
    MVMThreadContext *tc  = wi->tc;
285
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
286
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
287
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
288
0
    if (status >= 0) {
289
0
        MVMROOT(tc, arr, {
290
0
        MVMROOT(tc, t, {
291
0
            MVMObject *bytes_box = MVM_repr_box_int(tc,
292
0
                tc->instance->boot_types.BOOTInt,
293
0
                wi->buf.len);
294
0
            MVM_repr_push_o(tc, arr, bytes_box);
295
0
        });
296
0
        });
297
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
298
0
    }
299
0
    else {
300
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
301
0
        MVMROOT(tc, arr, {
302
0
        MVMROOT(tc, t, {
303
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
304
0
                tc->instance->VMString, uv_strerror(status));
305
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
306
0
                tc->instance->boot_types.BOOTStr, msg_str);
307
0
            MVM_repr_push_o(tc, arr, msg_box);
308
0
        });
309
0
        });
310
0
    }
311
0
    MVM_repr_push_o(tc, t->body.queue, arr);
312
0
    if (wi->str_data)
313
0
        MVM_free(wi->buf.base);
314
0
    MVM_free(wi->req);
315
0
    MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
316
0
}
317
318
/* Does setup work for an asynchronous write. */
319
0
static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
320
0
    MVMIOAsyncSocketData *handle_data;
321
0
    WriteInfo            *wi;
322
0
    char                 *output;
323
0
    int                   output_size, r;
324
0
325
0
    /* Ensure not closed. */
326
0
    wi = (WriteInfo *)data;
327
0
    handle_data = (MVMIOAsyncSocketData *)wi->handle->body.data;
328
0
    if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
329
0
        MVMROOT(tc, async_task, {
330
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
331
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
332
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
333
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
334
0
            MVMROOT(tc, arr, {
335
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
336
0
                    tc->instance->VMString, "Cannot write to a closed socket");
337
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
338
0
                    tc->instance->boot_types.BOOTStr, msg_str);
339
0
                MVM_repr_push_o(tc, arr, msg_box);
340
0
            });
341
0
            MVM_repr_push_o(tc, t->body.queue, arr);
342
0
        });
343
0
        return;
344
0
    }
345
0
346
0
    /* Add to work in progress. */
347
0
    wi->tc = tc;
348
0
    wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);
349
0
350
0
    /* Encode the string, or extract buf data. */
351
0
    if (wi->str_data) {
352
0
        MVMuint64 output_size_64;
353
0
        output = MVM_string_utf8_encode(tc, wi->str_data, &output_size_64, 0);
354
0
        output_size = (int)output_size_64;
355
0
    }
356
0
    else {
357
0
        MVMArray *buffer = (MVMArray *)wi->buf_data;
358
0
        output = (char *)(buffer->body.slots.i8 + buffer->body.start);
359
0
        output_size = (int)buffer->body.elems;
360
0
    }
361
0
362
0
    /* Create and initialize write request. */
363
0
    wi->req           = MVM_malloc(sizeof(uv_write_t));
364
0
    wi->buf           = uv_buf_init(output, output_size);
365
0
    wi->req->data     = data;
366
0
367
0
    if ((r = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) < 0) {
368
0
        /* Error; need to notify. */
369
0
        MVMROOT(tc, async_task, {
370
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
371
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
372
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
373
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
374
0
            MVMROOT(tc, arr, {
375
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
376
0
                    tc->instance->VMString, uv_strerror(r));
377
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
378
0
                    tc->instance->boot_types.BOOTStr, msg_str);
379
0
                MVM_repr_push_o(tc, arr, msg_box);
380
0
            });
381
0
            MVM_repr_push_o(tc, t->body.queue, arr);
382
0
        });
383
0
384
0
        /* Cleanup handle. */
385
0
        MVM_free(wi->req);
386
0
        wi->req = NULL;
387
0
        MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
388
0
    }
389
0
}
390
391
/* Marks objects for a write task. */
392
0
static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
393
0
    WriteInfo *wi = (WriteInfo *)data;
394
0
    MVM_gc_worklist_add(tc, worklist, &wi->handle);
395
0
    MVM_gc_worklist_add(tc, worklist, &wi->str_data);
396
0
    MVM_gc_worklist_add(tc, worklist, &wi->buf_data);
397
0
}
398
399
/* Frees info for a write task. */
400
0
static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
401
0
    if (data)
402
0
        MVM_free(data);
403
0
}
404
405
/* Operations table for async write task. */
406
static const MVMAsyncTaskOps write_op_table = {
407
    write_setup,
408
    NULL,
409
    write_gc_mark,
410
    write_gc_free
411
};
412
413
/* Creates and queues an asynchronous task that writes the string s to the
 * socket handle h. The outcome is delivered to queue, tagged with schedulee.
 * Throws if queue is not a ConcBlockingQueue or async_type is not an
 * AsyncTask REPR. Returns the newly created task. */
static MVMAsyncTask * write_str(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
                                MVMObject *schedulee, MVMString *s, MVMObject *async_type) {
    MVMAsyncTask *new_task;
    WriteInfo    *info;

    /* Validate REPRs. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncwritestr target queue must have ConcBlockingQueue REPR");
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncwritestr result type must have REPR AsyncTask");

    /* Allocate the task; keep the arguments rooted across the allocation. */
    MVMROOT(tc, queue, {
    MVMROOT(tc, schedulee, {
    MVMROOT(tc, h, {
    MVMROOT(tc, s, {
        new_task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    });
    });
    });
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.schedulee, schedulee);
    new_task->body.ops = &write_op_table;

    /* Record what to write and where. */
    info = MVM_calloc(1, sizeof(WriteInfo));
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->handle, h);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->str_data, s);
    new_task->body.data = info;

    /* Pass the task to the event loop. */
    MVMROOT(tc, new_task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)new_task);
    });

    return new_task;
}
451
452
/* Creates and queues an asynchronous task that writes the contents of the
 * byte array buffer to the socket handle h. Throws if queue is not a
 * ConcBlockingQueue, async_type is not an AsyncTask REPR, or buffer is not a
 * concrete VMArray of uint8/int8. Returns the newly created task. */
static MVMAsyncTask * write_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
                                  MVMObject *schedulee, MVMObject *buffer, MVMObject *async_type) {
    MVMAsyncTask *new_task;
    WriteInfo    *info;

    /* Validate REPRs. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncwritebytes target queue must have ConcBlockingQueue REPR");
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncwritebytes result type must have REPR AsyncTask");
    if (!(IS_CONCRETE(buffer) && REPR(buffer)->ID == MVM_REPR_ID_VMArray))
        MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array to read from");
    if (!(((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type == MVM_ARRAY_U8
        || ((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type == MVM_ARRAY_I8))
        MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array of uint8 or int8");

    /* Allocate the task; keep the arguments rooted across the allocation. */
    MVMROOT(tc, queue, {
    MVMROOT(tc, schedulee, {
    MVMROOT(tc, h, {
    MVMROOT(tc, buffer, {
        new_task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    });
    });
    });
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.schedulee, schedulee);
    new_task->body.ops = &write_op_table;

    /* Record what to write and where. */
    info = MVM_calloc(1, sizeof(WriteInfo));
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->handle, h);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->buf_data, buffer);
    new_task->body.data = info;

    /* Pass the task to the event loop. */
    MVMROOT(tc, new_task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)new_task);
    });

    return new_task;
}
495
496
/* Info we convey about a socket close task. */
497
typedef struct {
498
    MVMOSHandle *handle;
499
} CloseInfo;
500
501
/* Does an asynchronous close (since it must run on the event loop). */
502
0
static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
503
0
    CloseInfo *ci = (CloseInfo *)data;
504
0
    MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ci->handle->body.data;
505
0
    uv_handle_t *handle = (uv_handle_t *)handle_data->handle;
506
0
    if (handle && !uv_is_closing(handle)) {
507
0
        handle_data->handle = NULL;
508
0
        uv_close(handle, free_on_close_cb);
509
0
    }
510
0
}
511
512
/* Marks objects for a close task. */
513
0
static void close_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
514
0
    CloseInfo *ci = (CloseInfo *)data;
515
0
    MVM_gc_worklist_add(tc, worklist, &ci->handle);
516
0
}
517
518
/* Frees info for a close task. */
519
0
static void close_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
520
0
    if (data)
521
0
        MVM_free(data);
522
0
}
523
524
/* Operations table for async close task. */
525
static const MVMAsyncTaskOps close_op_table = {
526
    close_perform,
527
    NULL,
528
    close_gc_mark,
529
    close_gc_free
530
};
531
532
0
/* Requests that the socket behind h be closed. The close itself must run on
 * the event loop, so this only creates a close task (of the BOOTAsync type)
 * and queues it; close_perform does the real work. Always returns 0. */
static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) {
    MVMAsyncTask *task;
    CloseInfo *ci;

    MVMROOT(tc, h, {
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc,
            tc->instance->boot_types.BOOTAsync);
    });
    task->body.ops = &close_op_table;
    ci = MVM_calloc(1, sizeof(CloseInfo));
    MVM_ASSIGN_REF(tc, &(task->common.header), ci->handle, h);
    task->body.data = ci;

    /* Keep the task rooted while handing it over, as the other task
     * creators in this file do. */
    MVMROOT(tc, task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
    });

    return 0;
}
549
550
0
/* GC free callback for the socket handle: destroys the decode stream, if
 * any, and clears the pointer to it.
 * NOTE(review): the MVMIOAsyncSocketData itself is not freed here - confirm
 * whether the caller releases it. */
static void gc_free(MVMThreadContext *tc, MVMObject *h, void *d) {
    MVMIOAsyncSocketData *sock_data = (MVMIOAsyncSocketData *)d;
    if (!sock_data->ds)
        return;
    MVM_string_decodestream_destroy(tc, sock_data->ds);
    sock_data->ds = NULL;
}
557
558
/* IO ops table, populated with functions. */
559
static const MVMIOClosable      closable       = { close_socket };
560
static const MVMIOAsyncReadable async_readable = { read_chars, read_bytes };
561
static const MVMIOAsyncWritable async_writable = { write_str, write_bytes };
562
static const MVMIOOps op_table = {
563
    &closable,
564
    NULL,
565
    NULL,
566
    NULL,
567
    &async_readable,
568
    &async_writable,
569
    NULL,
570
    NULL,
571
    NULL,
572
    NULL,
573
    NULL,
574
    NULL,
575
    NULL,
576
    gc_free
577
};
578
579
/* Info we convey about a connection attempt task. */
580
typedef struct {
581
    struct sockaddr  *dest;
582
    uv_tcp_t         *socket;
583
    uv_connect_t     *connect;
584
    MVMThreadContext *tc;
585
    int               work_idx;
586
} ConnectInfo;
587
588
/* When a connection takes place, need to send result. */
589
0
static void on_connect(uv_connect_t* req, int status) {
590
0
    ConnectInfo      *ci  = (ConnectInfo *)req->data;
591
0
    MVMThreadContext *tc  = ci->tc;
592
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
593
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, ci->work_idx);
594
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
595
0
    if (status >= 0) {
596
0
        /* Allocate and set up handle. */
597
0
        MVMROOT(tc, arr, {
598
0
        MVMROOT(tc, t, {
599
0
            MVMOSHandle          *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
600
0
            MVMIOAsyncSocketData *data   = MVM_calloc(1, sizeof(MVMIOAsyncSocketData));
601
0
            data->handle                 = (uv_stream_t *)ci->socket;
602
0
            result->body.ops             = &op_table;
603
0
            result->body.data            = data;
604
0
            MVM_repr_push_o(tc, arr, (MVMObject *)result);
605
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
606
0
        });
607
0
        });
608
0
    }
609
0
    else {
610
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
611
0
        MVMROOT(tc, arr, {
612
0
        MVMROOT(tc, t, {
613
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
614
0
                tc->instance->VMString, uv_strerror(status));
615
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
616
0
                tc->instance->boot_types.BOOTStr, msg_str);
617
0
            MVM_repr_push_o(tc, arr, msg_box);
618
0
        });
619
0
        });
620
0
    }
621
0
    MVM_repr_push_o(tc, t->body.queue, arr);
622
0
    MVM_free(req);
623
0
    MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
624
0
}
625
626
/* Initilalize the connection on the event loop. */
627
0
static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
628
0
    int r;
629
0
630
0
    /* Add to work in progress. */
631
0
    ConnectInfo *ci = (ConnectInfo *)data;
632
0
    ci->tc        = tc;
633
0
    ci->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
634
0
635
0
    /* Create and initialize socket and connection. */
636
0
    ci->socket        = MVM_malloc(sizeof(uv_tcp_t));
637
0
    ci->connect       = MVM_malloc(sizeof(uv_connect_t));
638
0
    ci->connect->data = data;
639
0
    if ((r = uv_tcp_init(loop, ci->socket)) < 0 ||
640
0
        (r = uv_tcp_connect(ci->connect, ci->socket, ci->dest, on_connect)) < 0) {
641
0
        /* Error; need to notify. */
642
0
        MVMROOT(tc, async_task, {
643
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
644
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
645
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
646
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
647
0
            MVMROOT(tc, arr, {
648
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
649
0
                    tc->instance->VMString, uv_strerror(r));
650
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
651
0
                    tc->instance->boot_types.BOOTStr, msg_str);
652
0
                MVM_repr_push_o(tc, arr, msg_box);
653
0
            });
654
0
            MVM_repr_push_o(tc, t->body.queue, arr);
655
0
        });
656
0
657
0
        /* Cleanup handles. */
658
0
        MVM_free(ci->connect);
659
0
        ci->connect = NULL;
660
0
        uv_close((uv_handle_t *)ci->socket, free_on_close_cb);
661
0
        ci->socket = NULL;
662
0
        MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
663
0
    }
664
0
}
665
666
/* Frees info for a connection task. */
667
0
static void connect_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
668
0
    if (data) {
669
0
        ConnectInfo *ci = (ConnectInfo *)data;
670
0
        if (ci->dest)
671
0
            MVM_free(ci->dest);
672
0
        MVM_free(ci);
673
0
    }
674
0
}
675
676
/* Operations table for async connect task. */
677
static const MVMAsyncTaskOps connect_op_table = {
678
    connect_setup,
679
    NULL,
680
    NULL,
681
    connect_gc_free
682
};
683
684
/* Sets off an asynchronous socket connection. */
685
MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue,
686
                                        MVMObject *schedulee, MVMString *host,
687
0
                                        MVMint64 port, MVMObject *async_type) {
688
0
    MVMAsyncTask *task;
689
0
    ConnectInfo  *ci;
690
0
691
0
    /* Resolve hostname. (Could be done asynchronously too.) */
692
0
    struct sockaddr *dest = MVM_io_resolve_host_name(tc, host, port);
693
0
694
0
    /* Validate REPRs. */
695
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
696
0
        MVM_exception_throw_adhoc(tc,
697
0
            "asyncconnect target queue must have ConcBlockingQueue REPR");
698
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
699
0
        MVM_exception_throw_adhoc(tc,
700
0
            "asyncconnect result type must have REPR AsyncTask");
701
0
702
0
    /* Create async task handle. */
703
0
    MVMROOT(tc, queue, {
704
0
    MVMROOT(tc, schedulee, {
705
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
706
0
    });
707
0
    });
708
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
709
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
710
0
    task->body.ops  = &connect_op_table;
711
0
    ci              = MVM_calloc(1, sizeof(ConnectInfo));
712
0
    ci->dest        = dest;
713
0
    task->body.data = ci;
714
0
715
0
    /* Hand the task off to the event loop. */
716
0
    MVMROOT(tc, task, {
717
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
718
0
    });
719
0
720
0
    return (MVMObject *)task;
721
0
}
722
723
/* Info we convey about a socket listen task. */
724
typedef struct {
725
    struct sockaddr  *dest;
726
    uv_tcp_t         *socket;
727
    MVMThreadContext *tc;
728
    int               work_idx;
729
    int               backlog;
730
} ListenInfo;
731
732
/* Handles an incoming connection. */
733
0
static void on_connection(uv_stream_t *server, int status) {
734
0
    ListenInfo       *li     = (ListenInfo *)server->data;
735
0
    MVMThreadContext *tc     = li->tc;
736
0
    MVMObject        *arr    = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
737
0
    MVMAsyncTask     *t      = MVM_io_eventloop_get_active_work(tc, li->work_idx);
738
0
739
0
    uv_tcp_t         *client = MVM_malloc(sizeof(uv_tcp_t));
740
0
    int               r;
741
0
    uv_tcp_init(tc->loop, client);
742
0
743
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
744
0
    if ((r = uv_accept(server, (uv_stream_t *)client)) == 0) {
745
0
        /* Allocate and set up handle. */
746
0
        MVMROOT(tc, arr, {
747
0
        MVMROOT(tc, t, {
748
0
            MVMOSHandle          *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
749
0
            MVMIOAsyncSocketData *data   = MVM_calloc(1, sizeof(MVMIOAsyncSocketData));
750
0
            data->handle                 = (uv_stream_t *)client;
751
0
            result->body.ops             = &op_table;
752
0
            result->body.data            = data;
753
0
            MVM_repr_push_o(tc, arr, (MVMObject *)result);
754
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
755
0
        });
756
0
        });
757
0
    }
758
0
    else {
759
0
        uv_close((uv_handle_t*)client, NULL);
760
0
        MVM_free(client);
761
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
762
0
        MVMROOT(tc, arr, {
763
0
        MVMROOT(tc, t, {
764
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
765
0
                tc->instance->VMString, uv_strerror(r));
766
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
767
0
                tc->instance->boot_types.BOOTStr, msg_str);
768
0
            MVM_repr_push_o(tc, arr, msg_box);
769
0
        });
770
0
        });
771
0
    }
772
0
    MVM_repr_push_o(tc, t->body.queue, arr);
773
0
}
774
775
/* Sets up a socket listener. */
776
0
static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
777
0
    int r;
778
0
779
0
    /* Add to work in progress. */
780
0
    ListenInfo *li = (ListenInfo *)data;
781
0
    li->tc         = tc;
782
0
    li->work_idx   = MVM_io_eventloop_add_active_work(tc, async_task);
783
0
784
0
    /* Create and initialize socket and connection, and start listening. */
785
0
    li->socket        = MVM_malloc(sizeof(uv_tcp_t));
786
0
    li->socket->data  = data;
787
0
    if ((r = uv_tcp_init(loop, li->socket)) < 0 ||
788
0
        (r = uv_tcp_bind(li->socket, li->dest, 0)) < 0 ||
789
0
        (r = uv_listen((uv_stream_t *)li->socket, li->backlog, on_connection))) {
790
0
        /* Error; need to notify. */
791
0
        MVMROOT(tc, async_task, {
792
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
793
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
794
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
795
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
796
0
            MVMROOT(tc, arr, {
797
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
798
0
                    tc->instance->VMString, uv_strerror(r));
799
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
800
0
                    tc->instance->boot_types.BOOTStr, msg_str);
801
0
                MVM_repr_push_o(tc, arr, msg_box);
802
0
            });
803
0
            MVM_repr_push_o(tc, t->body.queue, arr);
804
0
        });
805
0
        uv_close((uv_handle_t *)li->socket, free_on_close_cb);
806
0
        li->socket = NULL;
807
0
        MVM_io_eventloop_remove_active_work(tc, &(li->work_idx));
808
0
        return;
809
0
    }
810
0
}
811
812
/* Stops listening. */
813
0
static void on_listen_cancelled(uv_handle_t *handle) {
814
0
    ListenInfo       *li = (ListenInfo *)handle->data;
815
0
    MVMThreadContext *tc = li->tc;
816
0
    MVM_io_eventloop_send_cancellation_notification(tc,
817
0
        MVM_io_eventloop_get_active_work(tc, li->work_idx));
818
0
    MVM_io_eventloop_remove_active_work(tc, &(li->work_idx));
819
0
}
820
0
/* Cancels a listen task by closing its listening socket.
 *
 * Does nothing when there is no socket: the ListenInfo starts zeroed and
 * listen_setup resets the pointer to NULL when setup fails. The pointer is
 * cleared after the close request so a repeated cancel cannot close the
 * same handle twice. */
static void listen_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
    ListenInfo *li = (ListenInfo *)data;
    if (li->socket) {
        uv_close((uv_handle_t *)li->socket, on_listen_cancelled);
        li->socket = NULL;
    }
}
824
825
/* Frees info for a listen task. */
826
0
static void listen_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
827
0
    if (data) {
828
0
        ListenInfo *li = (ListenInfo *)data;
829
0
        if (li->dest)
830
0
            MVM_free(li->dest);
831
0
        MVM_free(li);
832
0
    }
833
0
}
834
835
/* Operations table for async listen task. */
836
static const MVMAsyncTaskOps listen_op_table = {
837
    listen_setup,
838
    listen_cancel,
839
    NULL,
840
    listen_gc_free
841
};
842
843
/* Initiates an async socket listener. */
844
MVMObject * MVM_io_socket_listen_async(MVMThreadContext *tc, MVMObject *queue,
845
                                       MVMObject *schedulee, MVMString *host,
846
0
                                       MVMint64 port, MVMint32 backlog, MVMObject *async_type) {
847
0
    MVMAsyncTask *task;
848
0
    ListenInfo   *li;
849
0
850
0
    /* Resolve hostname. (Could be done asynchronously too.) */
851
0
    struct sockaddr *dest = MVM_io_resolve_host_name(tc, host, port);
852
0
853
0
    /* Validate REPRs. */
854
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
855
0
        MVM_exception_throw_adhoc(tc,
856
0
            "asynclisten target queue must have ConcBlockingQueue REPR");
857
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
858
0
        MVM_exception_throw_adhoc(tc,
859
0
            "asynclisten result type must have REPR AsyncTask");
860
0
861
0
    /* Create async task handle. */
862
0
    MVMROOT(tc, queue, {
863
0
    MVMROOT(tc, schedulee, {
864
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
865
0
    });
866
0
    });
867
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
868
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
869
0
    task->body.ops  = &listen_op_table;
870
0
    li              = MVM_calloc(1, sizeof(ListenInfo));
871
0
    li->dest        = dest;
872
0
    li->backlog     = backlog;
873
0
    task->body.data = li;
874
0
875
0
    /* Hand the task off to the event loop. */
876
0
    MVMROOT(tc, task, {
877
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
878
0
    });
879
0
880
0
    return (MVMObject *)task;
881
0
}