Coverage Report

Created: 2018-07-03 15:31

/home/travis/build/MoarVM/MoarVM/src/io/asyncsocket.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* Data that we keep for an asynchronous socket handle. */
4
typedef struct {
5
    /* The libuv handle to the socket. */
6
    uv_stream_t *handle;
7
} MVMIOAsyncSocketData;
8
9
/* Info we convey about a read task. */
10
typedef struct {
11
    MVMOSHandle      *handle;
12
    MVMObject        *buf_type;
13
    int               seq_number;
14
    MVMThreadContext *tc;
15
    int               work_idx;
16
} ReadInfo;
17
18
/* Allocates a buffer of the suggested size. */
19
0
static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
20
0
    size_t size = suggested_size > 0 ? suggested_size : 4;
21
0
    buf->base   = MVM_malloc(size);
22
0
    buf->len    = size;
23
0
}
24
25
/* Callback used to simply free memory on close. */
26
0
static void free_on_close_cb(uv_handle_t *handle) {
27
0
    MVM_free(handle);
28
0
}
29
30
/* Read handler. */
31
0
static void on_read(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
32
0
    ReadInfo         *ri  = (ReadInfo *)handle->data;
33
0
    MVMThreadContext *tc  = ri->tc;
34
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
35
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, ri->work_idx);
36
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
37
0
    if (nread >= 0) {
38
0
        MVMROOT2(tc, t, arr, {
39
0
            MVMArray *res_buf;
40
0
41
0
            /* Push the sequence number. */
42
0
            MVMObject *seq_boxed = MVM_repr_box_int(tc,
43
0
                tc->instance->boot_types.BOOTInt, ri->seq_number++);
44
0
            MVM_repr_push_o(tc, arr, seq_boxed);
45
0
46
0
            /* Produce a buffer and push it. */
47
0
            res_buf      = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type);
48
0
            res_buf->body.slots.i8 = (MVMint8 *)buf->base;
49
0
            res_buf->body.start    = 0;
50
0
            res_buf->body.ssize    = buf->len;
51
0
            res_buf->body.elems    = nread;
52
0
            MVM_repr_push_o(tc, arr, (MVMObject *)res_buf);
53
0
54
0
            /* Finally, no error. */
55
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
56
0
        });
57
0
    }
58
0
    else {
59
0
        MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
60
0
        uv_handle_t *conn_handle = (uv_handle_t *)handle_data->handle;
61
0
        if (nread == UV_EOF) {
62
0
            MVMROOT2(tc, t, arr, {
63
0
                MVMObject *final = MVM_repr_box_int(tc,
64
0
                    tc->instance->boot_types.BOOTInt, ri->seq_number);
65
0
                MVM_repr_push_o(tc, arr, final);
66
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
67
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
68
0
            });
69
0
        }
70
0
        else {
71
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
72
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
73
0
            MVMROOT2(tc, t, arr, {
74
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
75
0
                    tc->instance->VMString, uv_strerror(nread));
76
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
77
0
                    tc->instance->boot_types.BOOTStr, msg_str);
78
0
                MVM_repr_push_o(tc, arr, msg_box);
79
0
            });
80
0
        }
81
0
        if (buf->base)
82
0
            MVM_free(buf->base);
83
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
84
0
        if (conn_handle && !uv_is_closing(conn_handle)) {
85
0
            handle_data->handle = NULL;
86
0
            uv_close(conn_handle, free_on_close_cb);
87
0
        }
88
0
    }
89
0
    MVM_repr_push_o(tc, t->body.queue, arr);
90
0
}
91
92
/* Does setup work for setting up asynchronous reads. */
93
0
static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
94
0
    MVMIOAsyncSocketData *handle_data;
95
0
    ReadInfo             *ri;
96
0
    int                   r;
97
0
98
0
    /* Ensure not closed. */
99
0
    ri = (ReadInfo *)data;
100
0
    handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
101
0
    if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
102
0
        /* Closed, so immediately send done. */
103
0
        MVMAsyncTask *t = (MVMAsyncTask *)async_task;
104
0
        MVMROOT(tc, t, {
105
0
            MVMObject *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
106
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
107
0
            MVMROOT(tc, arr, {
108
0
                MVMObject *final = MVM_repr_box_int(tc,
109
0
                    tc->instance->boot_types.BOOTInt, ri->seq_number);
110
0
                MVM_repr_push_o(tc, arr, final);
111
0
            });
112
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
113
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
114
0
            MVM_repr_push_o(tc, t->body.queue, arr);
115
0
        });
116
0
        return;
117
0
    }
118
0
119
0
    /* Add to work in progress. */
120
0
    ri->tc        = tc;
121
0
    ri->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
122
0
123
0
    /* Start reading the stream. */
124
0
    handle_data->handle->data = data;
125
0
    if ((r = uv_read_start(handle_data->handle, on_alloc, on_read)) < 0) {
126
0
        /* Error; need to notify. */
127
0
        MVMROOT(tc, async_task, {
128
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
129
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
130
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
131
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
132
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
133
0
            MVMROOT(tc, arr, {
134
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
135
0
                    tc->instance->VMString, uv_strerror(r));
136
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
137
0
                    tc->instance->boot_types.BOOTStr, msg_str);
138
0
                MVM_repr_push_o(tc, arr, msg_box);
139
0
            });
140
0
            MVM_repr_push_o(tc, t->body.queue, arr);
141
0
        });
142
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
143
0
    }
144
0
}
145
146
/* Stops reading. */
147
0
static void read_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
148
0
    ReadInfo *ri = (ReadInfo *)data;
149
0
    if (ri->work_idx >= 0) {
150
0
        MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ri->handle->body.data;
151
0
        if (handle_data->handle && !uv_is_closing((uv_handle_t *)handle_data->handle))
152
0
            uv_read_stop(handle_data->handle);
153
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
154
0
    }
155
0
}
156
157
/* Marks objects for a read task. */
158
0
static void read_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
159
0
    ReadInfo *ri = (ReadInfo *)data;
160
0
    MVM_gc_worklist_add(tc, worklist, &ri->buf_type);
161
0
    MVM_gc_worklist_add(tc, worklist, &ri->handle);
162
0
}
163
164
/* Frees info for a read task. */
165
0
static void read_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
166
0
    if (data)
167
0
        MVM_free(data);
168
0
}
169
170
/* Operations table for async read task. */
171
static const MVMAsyncTaskOps read_op_table = {
172
    read_setup,
173
    NULL,
174
    read_cancel,
175
    read_gc_mark,
176
    read_gc_free
177
};
178
179
static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
180
0
                                 MVMObject *schedulee, MVMObject *buf_type, MVMObject *async_type) {
181
0
    MVMAsyncTask *task;
182
0
    ReadInfo    *ri;
183
0
184
0
    /* Validate REPRs. */
185
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
186
0
        MVM_exception_throw_adhoc(tc,
187
0
            "asyncreadbytes target queue must have ConcBlockingQueue REPR (got %s)",
188
0
             MVM_6model_get_stable_debug_name(tc, queue->st));
189
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
190
0
        MVM_exception_throw_adhoc(tc,
191
0
            "asyncreadbytes result type must have REPR AsyncTask");
192
0
    if (REPR(buf_type)->ID == MVM_REPR_ID_VMArray) {
193
0
        MVMint32 slot_type = ((MVMArrayREPRData *)STABLE(buf_type)->REPR_data)->slot_type;
194
0
        if (slot_type != MVM_ARRAY_U8 && slot_type != MVM_ARRAY_I8)
195
0
            MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array of uint8 or int8");
196
0
    }
197
0
    else {
198
0
        MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array");
199
0
    }
200
0
201
0
    /* Create async task handle. */
202
0
    MVMROOT4(tc, queue, schedulee, h, buf_type, {
203
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
204
0
    });
205
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
206
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
207
0
    task->body.ops  = &read_op_table;
208
0
    ri              = MVM_calloc(1, sizeof(ReadInfo));
209
0
    MVM_ASSIGN_REF(tc, &(task->common.header), ri->buf_type, buf_type);
210
0
    MVM_ASSIGN_REF(tc, &(task->common.header), ri->handle, h);
211
0
    task->body.data = ri;
212
0
213
0
    /* Hand the task off to the event loop. */
214
0
    MVMROOT(tc, task, {
215
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
216
0
    });
217
0
218
0
    return task;
219
0
}
220
221
/* Info we convey about a write task. */
222
typedef struct {
223
    MVMOSHandle      *handle;
224
    MVMObject        *buf_data;
225
    uv_write_t       *req;
226
    uv_buf_t          buf;
227
    MVMThreadContext *tc;
228
    int               work_idx;
229
} WriteInfo;
230
231
/* Completion handler for an asynchronous write. */
232
0
static void on_write(uv_write_t *req, int status) {
233
0
    WriteInfo        *wi  = (WriteInfo *)req->data;
234
0
    MVMThreadContext *tc  = wi->tc;
235
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
236
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
237
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
238
0
    if (status >= 0) {
239
0
        MVMROOT2(tc, arr, t, {
240
0
            MVMObject *bytes_box = MVM_repr_box_int(tc,
241
0
                tc->instance->boot_types.BOOTInt,
242
0
                wi->buf.len);
243
0
            MVM_repr_push_o(tc, arr, bytes_box);
244
0
        });
245
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
246
0
    }
247
0
    else {
248
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
249
0
        MVMROOT2(tc, arr, t, {
250
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
251
0
                tc->instance->VMString, uv_strerror(status));
252
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
253
0
                tc->instance->boot_types.BOOTStr, msg_str);
254
0
            MVM_repr_push_o(tc, arr, msg_box);
255
0
        });
256
0
    }
257
0
    MVM_repr_push_o(tc, t->body.queue, arr);
258
0
    MVM_free(wi->req);
259
0
    MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
260
0
}
261
262
/* Does setup work for an asynchronous write. */
263
0
static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
264
0
    MVMIOAsyncSocketData *handle_data;
265
0
    MVMArray             *buffer;
266
0
    WriteInfo            *wi;
267
0
    char                 *output;
268
0
    int                   output_size, r;
269
0
270
0
    /* Ensure not closed. */
271
0
    wi = (WriteInfo *)data;
272
0
    handle_data = (MVMIOAsyncSocketData *)wi->handle->body.data;
273
0
    if (!handle_data->handle || uv_is_closing((uv_handle_t *)handle_data->handle)) {
274
0
        MVMROOT(tc, async_task, {
275
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
276
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
277
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
278
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
279
0
            MVMROOT(tc, arr, {
280
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
281
0
                    tc->instance->VMString, "Cannot write to a closed socket");
282
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
283
0
                    tc->instance->boot_types.BOOTStr, msg_str);
284
0
                MVM_repr_push_o(tc, arr, msg_box);
285
0
            });
286
0
            MVM_repr_push_o(tc, t->body.queue, arr);
287
0
        });
288
0
        return;
289
0
    }
290
0
291
0
    /* Add to work in progress. */
292
0
    wi->tc = tc;
293
0
    wi->work_idx = MVM_io_eventloop_add_active_work(tc, async_task);
294
0
295
0
    /* Extract buf data. */
296
0
    buffer = (MVMArray *)wi->buf_data;
297
0
    output = (char *)(buffer->body.slots.i8 + buffer->body.start);
298
0
    output_size = (int)buffer->body.elems;
299
0
300
0
    /* Create and initialize write request. */
301
0
    wi->req           = MVM_malloc(sizeof(uv_write_t));
302
0
    wi->buf           = uv_buf_init(output, output_size);
303
0
    wi->req->data     = data;
304
0
305
0
    if ((r = uv_write(wi->req, handle_data->handle, &(wi->buf), 1, on_write)) < 0) {
306
0
        /* Error; need to notify. */
307
0
        MVMROOT(tc, async_task, {
308
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
309
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
310
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
311
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
312
0
            MVMROOT(tc, arr, {
313
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
314
0
                    tc->instance->VMString, uv_strerror(r));
315
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
316
0
                    tc->instance->boot_types.BOOTStr, msg_str);
317
0
                MVM_repr_push_o(tc, arr, msg_box);
318
0
            });
319
0
            MVM_repr_push_o(tc, t->body.queue, arr);
320
0
        });
321
0
322
0
        /* Cleanup handle. */
323
0
        MVM_free(wi->req);
324
0
        wi->req = NULL;
325
0
        MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
326
0
    }
327
0
}
328
329
/* Marks objects for a write task. */
330
0
static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
331
0
    WriteInfo *wi = (WriteInfo *)data;
332
0
    MVM_gc_worklist_add(tc, worklist, &wi->handle);
333
0
    MVM_gc_worklist_add(tc, worklist, &wi->buf_data);
334
0
}
335
336
/* Frees info for a write task. */
337
0
static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
338
0
    if (data)
339
0
        MVM_free(data);
340
0
}
341
342
/* Operations table for async write task. */
343
static const MVMAsyncTaskOps write_op_table = {
344
    write_setup,
345
    NULL,
346
    NULL,
347
    write_gc_mark,
348
    write_gc_free
349
};
350
351
static MVMAsyncTask * write_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
352
0
                                  MVMObject *schedulee, MVMObject *buffer, MVMObject *async_type) {
353
0
    MVMAsyncTask *task;
354
0
    WriteInfo    *wi;
355
0
356
0
    /* Validate REPRs. */
357
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
358
0
        MVM_exception_throw_adhoc(tc,
359
0
            "asyncwritebytes target queue must have ConcBlockingQueue REPR");
360
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
361
0
        MVM_exception_throw_adhoc(tc,
362
0
            "asyncwritebytes result type must have REPR AsyncTask");
363
0
    if (!IS_CONCRETE(buffer) || REPR(buffer)->ID != MVM_REPR_ID_VMArray)
364
0
        MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array to read from");
365
0
    if (((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_U8
366
0
        && ((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_I8)
367
0
        MVM_exception_throw_adhoc(tc, "asyncwritebytes requires a native array of uint8 or int8");
368
0
369
0
    /* Create async task handle. */
370
0
    MVMROOT4(tc, queue, schedulee, h, buffer, {
371
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
372
0
    });
373
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
374
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
375
0
    task->body.ops  = &write_op_table;
376
0
    wi              = MVM_calloc(1, sizeof(WriteInfo));
377
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
378
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->buf_data, buffer);
379
0
    task->body.data = wi;
380
0
381
0
    /* Hand the task off to the event loop. */
382
0
    MVMROOT(tc, task, {
383
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
384
0
    });
385
0
386
0
    return task;
387
0
}
388
389
/* Info we convey about a socket close task. */
390
typedef struct {
391
    MVMOSHandle *handle;
392
} CloseInfo;
393
394
/* Does an asynchronous close (since it must run on the event loop). */
395
0
static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
396
0
    CloseInfo *ci = (CloseInfo *)data;
397
0
    MVMIOAsyncSocketData *handle_data = (MVMIOAsyncSocketData *)ci->handle->body.data;
398
0
    uv_handle_t *handle = (uv_handle_t *)handle_data->handle;
399
0
    if (handle && !uv_is_closing(handle)) {
400
0
        handle_data->handle = NULL;
401
0
        uv_close(handle, free_on_close_cb);
402
0
    }
403
0
}
404
405
/* Marks objects for a close task. */
406
0
static void close_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
407
0
    CloseInfo *ci = (CloseInfo *)data;
408
0
    MVM_gc_worklist_add(tc, worklist, &ci->handle);
409
0
}
410
411
/* Frees info for a close task. */
412
0
static void close_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
413
0
    if (data)
414
0
        MVM_free(data);
415
0
}
416
417
/* Operations table for async close task. */
418
static const MVMAsyncTaskOps close_op_table = {
419
    close_perform,
420
    NULL,
421
    NULL,
422
    close_gc_mark,
423
    close_gc_free
424
};
425
426
0
static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) {
427
0
    MVMAsyncTask *task;
428
0
    CloseInfo *ci;
429
0
430
0
    MVMROOT(tc, h, {
431
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc,
432
0
            tc->instance->boot_types.BOOTAsync);
433
0
    });
434
0
    task->body.ops = &close_op_table;
435
0
    ci = MVM_calloc(1, sizeof(CloseInfo));
436
0
    MVM_ASSIGN_REF(tc, &(task->common.header), ci->handle, h);
437
0
    task->body.data = ci;
438
0
    MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
439
0
440
0
    return 0;
441
0
}
442
443
/* IO ops table, populated with functions. */
444
static const MVMIOClosable      closable       = { close_socket };
445
static const MVMIOAsyncReadable async_readable = { read_bytes };
446
static const MVMIOAsyncWritable async_writable = { write_bytes };
447
static const MVMIOOps op_table = {
448
    &closable,
449
    NULL,
450
    NULL,
451
    &async_readable,
452
    &async_writable,
453
    NULL,
454
    NULL,
455
    NULL,
456
    NULL,
457
    NULL,
458
    NULL,
459
    NULL,
460
    NULL,
461
    NULL
462
};
463
464
0
static void push_name_and_port(MVMThreadContext *tc, struct sockaddr_storage *name, MVMObject *arr) {
465
0
    char addrstr[INET6_ADDRSTRLEN + 1];
466
0
    /* XXX windows support kludge. 64 bit is much too big, but we'll
467
0
     * get the proper data from the struct anyway, however windows
468
0
     * decides to declare it. */
469
0
    MVMuint64 port;
470
0
    MVMObject *host_o;
471
0
    MVMObject *port_o;
472
0
    switch (name->ss_family) {
473
0
        case AF_INET6: {
474
0
            uv_ip6_name((struct sockaddr_in6*)name, addrstr, INET6_ADDRSTRLEN + 1);
475
0
            port = ntohs(((struct sockaddr_in6*)name)->sin6_port);
476
0
            break;
477
0
        }
478
0
        case AF_INET: {
479
0
            uv_ip4_name((struct sockaddr_in*)name, addrstr, INET6_ADDRSTRLEN + 1);
480
0
            port = ntohs(((struct sockaddr_in*)name)->sin_port);
481
0
            break;
482
0
        }
483
0
        default:
484
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
485
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
486
0
            return;
487
0
            break;
488
0
    }
489
0
    MVMROOT(tc, arr, {
490
0
        port_o = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, port);
491
0
        MVMROOT(tc, port_o, {
492
0
            host_o = (MVMObject *)MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr,
493
0
                    MVM_string_ascii_decode_nt(tc, tc->instance->VMString, addrstr));
494
0
        });
495
0
    });
496
0
    MVM_repr_push_o(tc, arr, host_o);
497
0
    MVM_repr_push_o(tc, arr, port_o);
498
0
}
499
500
/* Info we convey about a connection attempt task. */
501
typedef struct {
502
    struct sockaddr  *dest;
503
    uv_tcp_t         *socket;
504
    uv_connect_t     *connect;
505
    MVMThreadContext *tc;
506
    int               work_idx;
507
} ConnectInfo;
508
509
/* When a connection takes place, need to send result. */
510
0
static void on_connect(uv_connect_t* req, int status) {
511
0
    ConnectInfo      *ci  = (ConnectInfo *)req->data;
512
0
    MVMThreadContext *tc  = ci->tc;
513
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
514
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, ci->work_idx);
515
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
516
0
    if (status >= 0) {
517
0
        /* Allocate and set up handle. */
518
0
        MVMROOT2(tc, arr, t, {
519
0
            MVMOSHandle          *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
520
0
            MVMIOAsyncSocketData *data   = MVM_calloc(1, sizeof(MVMIOAsyncSocketData));
521
0
            data->handle                 = (uv_stream_t *)ci->socket;
522
0
            result->body.ops             = &op_table;
523
0
            result->body.data            = data;
524
0
            MVM_repr_push_o(tc, arr, (MVMObject *)result);
525
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
526
0
            {
527
0
                struct sockaddr_storage sockaddr;
528
0
                int name_len = sizeof(struct sockaddr_storage);
529
0
530
0
                uv_tcp_getpeername(ci->socket, (struct sockaddr *)&sockaddr, &name_len);
531
0
                push_name_and_port(tc, &sockaddr, arr);
532
0
533
0
                uv_tcp_getsockname(ci->socket, (struct sockaddr *)&sockaddr, &name_len);
534
0
                push_name_and_port(tc, &sockaddr, arr);
535
0
            }
536
0
        });
537
0
    }
538
0
    else {
539
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
540
0
        MVMROOT2(tc, arr, t, {
541
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
542
0
                tc->instance->VMString, uv_strerror(status));
543
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
544
0
                tc->instance->boot_types.BOOTStr, msg_str);
545
0
            MVM_repr_push_o(tc, arr, msg_box);
546
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
547
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
548
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
549
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
550
0
        });
551
0
    }
552
0
    MVM_repr_push_o(tc, t->body.queue, arr);
553
0
    MVM_free(req);
554
0
    MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
555
0
}
556
557
/* Initilalize the connection on the event loop. */
558
0
static void connect_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
559
0
    int r;
560
0
561
0
    /* Add to work in progress. */
562
0
    ConnectInfo *ci = (ConnectInfo *)data;
563
0
    ci->tc        = tc;
564
0
    ci->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
565
0
566
0
    /* Create and initialize socket and connection. */
567
0
    ci->socket        = MVM_malloc(sizeof(uv_tcp_t));
568
0
    ci->connect       = MVM_malloc(sizeof(uv_connect_t));
569
0
    ci->connect->data = data;
570
0
    if ((r = uv_tcp_init(loop, ci->socket)) < 0 ||
571
0
        (r = uv_tcp_connect(ci->connect, ci->socket, ci->dest, on_connect)) < 0) {
572
0
        /* Error; need to notify. */
573
0
        MVMROOT(tc, async_task, {
574
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
575
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
576
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
577
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
578
0
            MVMROOT(tc, arr, {
579
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
580
0
                    tc->instance->VMString, uv_strerror(r));
581
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
582
0
                    tc->instance->boot_types.BOOTStr, msg_str);
583
0
                MVM_repr_push_o(tc, arr, msg_box);
584
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
585
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
586
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
587
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
588
0
            });
589
0
            MVM_repr_push_o(tc, t->body.queue, arr);
590
0
        });
591
0
592
0
        /* Cleanup handles. */
593
0
        MVM_free(ci->connect);
594
0
        ci->connect = NULL;
595
0
        uv_close((uv_handle_t *)ci->socket, free_on_close_cb);
596
0
        ci->socket = NULL;
597
0
        MVM_io_eventloop_remove_active_work(tc, &(ci->work_idx));
598
0
    }
599
0
}
600
601
/* Frees info for a connection task. */
602
0
static void connect_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
603
0
    if (data) {
604
0
        ConnectInfo *ci = (ConnectInfo *)data;
605
0
        if (ci->dest)
606
0
            MVM_free(ci->dest);
607
0
        MVM_free(ci);
608
0
    }
609
0
}
610
611
/* Operations table for async connect task. */
612
static const MVMAsyncTaskOps connect_op_table = {
613
    connect_setup,
614
    NULL,
615
    NULL,
616
    NULL,
617
    connect_gc_free
618
};
619
620
/* Sets off an asynchronous socket connection. */
621
MVMObject * MVM_io_socket_connect_async(MVMThreadContext *tc, MVMObject *queue,
622
                                        MVMObject *schedulee, MVMString *host,
623
0
                                        MVMint64 port, MVMObject *async_type) {
624
0
    MVMAsyncTask *task;
625
0
    ConnectInfo  *ci;
626
0
    struct sockaddr *dest;
627
0
628
0
    /* Validate REPRs. */
629
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
630
0
        MVM_exception_throw_adhoc(tc,
631
0
            "asyncconnect target queue must have ConcBlockingQueue REPR");
632
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
633
0
        MVM_exception_throw_adhoc(tc,
634
0
            "asyncconnect result type must have REPR AsyncTask");
635
0
636
0
    /* Resolve hostname. (Could be done asynchronously too.) */
637
0
    MVMROOT3(tc, queue, schedulee, async_type, {
638
0
        dest = MVM_io_resolve_host_name(tc, host, port);
639
0
    });
640
0
641
0
    /* Create async task handle. */
642
0
    MVMROOT2(tc, queue, schedulee, {
643
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
644
0
    });
645
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
646
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
647
0
    task->body.ops  = &connect_op_table;
648
0
    ci              = MVM_calloc(1, sizeof(ConnectInfo));
649
0
    ci->dest        = dest;
650
0
    task->body.data = ci;
651
0
652
0
    /* Hand the task off to the event loop. */
653
0
    MVMROOT(tc, task, {
654
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
655
0
    });
656
0
657
0
    return (MVMObject *)task;
658
0
}
659
660
/* Info we convey about a socket listen task. */
661
typedef struct {
662
    struct sockaddr  *dest;
663
    uv_tcp_t         *socket;
664
    MVMThreadContext *tc;
665
    int               work_idx;
666
    int               backlog;
667
} ListenInfo;
668
669
670
/* Handles an incoming connection. */
671
0
static void on_connection(uv_stream_t *server, int status) {
672
0
    ListenInfo       *li     = (ListenInfo *)server->data;
673
0
    MVMThreadContext *tc     = li->tc;
674
0
    MVMObject        *arr    = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
675
0
    MVMAsyncTask     *t      = MVM_io_eventloop_get_active_work(tc, li->work_idx);
676
0
677
0
    uv_tcp_t         *client = MVM_malloc(sizeof(uv_tcp_t));
678
0
    int               r;
679
0
    uv_tcp_init(tc->loop, client);
680
0
681
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
682
0
    if ((r = uv_accept(server, (uv_stream_t *)client)) == 0) {
683
0
        /* Allocate and set up handle. */
684
0
        MVMROOT2(tc, arr, t, {
685
0
            MVMOSHandle          *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
686
0
            MVMIOAsyncSocketData *data   = MVM_calloc(1, sizeof(MVMIOAsyncSocketData));
687
0
            data->handle                 = (uv_stream_t *)client;
688
0
            result->body.ops             = &op_table;
689
0
            result->body.data            = data;
690
0
691
0
            MVM_repr_push_o(tc, arr, (MVMObject *)result);
692
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
693
0
694
0
            {
695
0
                struct sockaddr_storage sockaddr;
696
0
                int name_len = sizeof(struct sockaddr_storage);
697
0
698
0
                uv_tcp_getpeername(client, (struct sockaddr *)&sockaddr, &name_len);
699
0
                push_name_and_port(tc, &sockaddr, arr);
700
0
701
0
                uv_tcp_getsockname(client, (struct sockaddr *)&sockaddr, &name_len);
702
0
                push_name_and_port(tc, &sockaddr, arr);
703
0
            }
704
0
        });
705
0
    }
706
0
    else {
707
0
        uv_close((uv_handle_t*)client, NULL);
708
0
        MVM_free(client);
709
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
710
0
        MVMROOT2(tc, arr, t, {
711
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
712
0
                tc->instance->VMString, uv_strerror(r));
713
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
714
0
                tc->instance->boot_types.BOOTStr, msg_str);
715
0
            MVM_repr_push_o(tc, arr, msg_box);
716
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
717
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
718
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
719
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
720
0
        });
721
0
    }
722
0
    MVM_repr_push_o(tc, t->body.queue, arr);
723
0
}
724
725
/* Sets up a socket listener. */
726
0
static void listen_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
727
0
    int r;
728
0
729
0
    /* Add to work in progress. */
730
0
    ListenInfo *li = (ListenInfo *)data;
731
0
    li->tc         = tc;
732
0
    li->work_idx   = MVM_io_eventloop_add_active_work(tc, async_task);
733
0
734
0
    /* Create and initialize socket and connection, and start listening. */
735
0
    li->socket        = MVM_malloc(sizeof(uv_tcp_t));
736
0
    li->socket->data  = data;
737
0
    if ((r = uv_tcp_init(loop, li->socket)) < 0 ||
738
0
        (r = uv_tcp_bind(li->socket, li->dest, 0)) < 0 ||
739
0
        (r = uv_listen((uv_stream_t *)li->socket, li->backlog, on_connection))) {
740
0
        /* Error; need to notify. */
741
0
        MVMROOT(tc, async_task, {
742
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
743
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
744
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
745
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
746
0
            MVMROOT(tc, arr, {
747
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
748
0
                    tc->instance->VMString, uv_strerror(r));
749
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
750
0
                    tc->instance->boot_types.BOOTStr, msg_str);
751
0
752
0
                MVM_repr_push_o(tc, arr, msg_box);
753
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
754
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
755
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
756
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
757
0
            });
758
0
            MVM_repr_push_o(tc, t->body.queue, arr);
759
0
        });
760
0
        uv_close((uv_handle_t *)li->socket, free_on_close_cb);
761
0
        li->socket = NULL;
762
0
        MVM_io_eventloop_remove_active_work(tc, &(li->work_idx));
763
0
        return;
764
0
    }
765
0
}
766
767
/* Stops listening. */
768
0
static void on_listen_cancelled(uv_handle_t *handle) {
769
0
    ListenInfo       *li = (ListenInfo *)handle->data;
770
0
    MVMThreadContext *tc = li->tc;
771
0
    MVM_io_eventloop_send_cancellation_notification(tc,
772
0
        MVM_io_eventloop_get_active_work(tc, li->work_idx));
773
0
    MVM_io_eventloop_remove_active_work(tc, &(li->work_idx));
774
0
}
775
0
static void listen_cancel(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
776
0
    ListenInfo *li = (ListenInfo *)data;
777
0
    if (li->socket) {
778
0
        uv_close((uv_handle_t *)li->socket, on_listen_cancelled);
779
0
        li->socket = NULL;
780
0
    }
781
0
}
782
783
/* Frees info for a listen task. */
784
0
static void listen_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
785
0
    if (data) {
786
0
        ListenInfo *li = (ListenInfo *)data;
787
0
        if (li->dest)
788
0
            MVM_free(li->dest);
789
0
        MVM_free(li);
790
0
    }
791
0
}
792
793
/* Operations table for async listen task. */
794
static const MVMAsyncTaskOps listen_op_table = {
795
    listen_setup,
796
    NULL,
797
    listen_cancel,
798
    NULL,
799
    listen_gc_free
800
};
801
802
/* Initiates an async socket listener. */
803
MVMObject * MVM_io_socket_listen_async(MVMThreadContext *tc, MVMObject *queue,
804
                                       MVMObject *schedulee, MVMString *host,
805
0
                                       MVMint64 port, MVMint32 backlog, MVMObject *async_type) {
806
0
    MVMAsyncTask *task;
807
0
    ListenInfo   *li;
808
0
    struct sockaddr *dest;
809
0
810
0
    /* Validate REPRs. */
811
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
812
0
        MVM_exception_throw_adhoc(tc,
813
0
            "asynclisten target queue must have ConcBlockingQueue REPR");
814
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
815
0
        MVM_exception_throw_adhoc(tc,
816
0
            "asynclisten result type must have REPR AsyncTask");
817
0
818
0
    /* Resolve hostname. (Could be done asynchronously too.) */
819
0
    MVMROOT3(tc, queue, schedulee, async_type, {
820
0
        dest = MVM_io_resolve_host_name(tc, host, port);
821
0
    });
822
0
823
0
    /* Create async task handle. */
824
0
    MVMROOT2(tc, queue, schedulee, {
825
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
826
0
    });
827
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
828
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
829
0
    task->body.ops  = &listen_op_table;
830
0
    li              = MVM_calloc(1, sizeof(ListenInfo));
831
0
    li->dest        = dest;
832
0
    li->backlog     = backlog;
833
0
    task->body.data = li;
834
0
835
0
    /* Hand the task off to the event loop. */
836
0
    MVMROOT(tc, task, {
837
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
838
0
    });
839
0
840
0
    return (MVMObject *)task;
841
0
}