Coverage Report

Created: 2017-04-15 07:07

/home/travis/build/MoarVM/MoarVM/src/io/asyncsocketudp.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* Number of bytes we accept per read. */
4
#define CHUNK_SIZE 65536
5
6
/* Data that we keep for an asynchronous UDP socket handle. */
7
typedef struct {
8
    /* The libuv handle to the socket. */
9
    uv_udp_t *handle;
10
11
    /* Decode stream, for turning bytes into strings. */
12
    MVMDecodeStream *ds;
13
} MVMIOAsyncUDPSocketData;
14
15
/* Info we convey about a read task. */
16
typedef struct {
17
    MVMOSHandle      *handle;
18
    MVMDecodeStream  *ds;
19
    MVMObject        *buf_type;
20
    int               seq_number;
21
    MVMThreadContext *tc;
22
    int               work_idx;
23
} ReadInfo;
24
25
/* Allocates a buffer of the suggested size. */
26
0
static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
27
0
    size_t size = suggested_size > 0 ? suggested_size : 4;
28
0
    buf->base   = MVM_malloc(size);
29
0
    buf->len    = size;
30
0
}
31
32
/* Callback used to simply free memory on close. */
33
0
static void free_on_close_cb(uv_handle_t *handle) {
34
0
    MVM_free(handle);
35
0
}
36
37
/* Read handler. */
38
0
static void on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
39
0
    ReadInfo         *ri  = (ReadInfo *)handle->data;
40
0
    MVMThreadContext *tc  = ri->tc;
41
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
42
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, ri->work_idx);
43
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
44
0
    if (nread >= 0) {
45
0
        MVMROOT(tc, t, {
46
0
        MVMROOT(tc, arr, {
47
0
            /* Push the sequence number. */
48
0
            MVMObject *seq_boxed = MVM_repr_box_int(tc,
49
0
                tc->instance->boot_types.BOOTInt, ri->seq_number++);
50
0
            MVM_repr_push_o(tc, arr, seq_boxed);
51
0
52
0
            /* Either need to produce a buffer or decode characters. */
53
0
            if (ri->ds) {
54
0
                MVMString *str;
55
0
                MVMObject *boxed_str;
56
0
                MVM_string_decodestream_add_bytes(tc, ri->ds, buf->base, nread);
57
0
                str = MVM_string_decodestream_get_all(tc, ri->ds);
58
0
                boxed_str = MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr, str);
59
0
                MVM_repr_push_o(tc, arr, boxed_str);
60
0
            }
61
0
            else {
62
0
                MVMArray *res_buf      = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type);
63
0
                res_buf->body.slots.i8 = (MVMint8 *)buf->base;
64
0
                res_buf->body.start    = 0;
65
0
                res_buf->body.ssize    = buf->len;
66
0
                res_buf->body.elems    = nread;
67
0
                MVM_repr_push_o(tc, arr, (MVMObject *)res_buf);
68
0
            }
69
0
70
0
            /* Finally, no error. */
71
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
72
0
        });
73
0
        });
74
0
    }
75
0
    else if (nread == UV_EOF) {
76
0
        MVMROOT(tc, t, {
77
0
        MVMROOT(tc, arr, {
78
0
            MVMObject *final = MVM_repr_box_int(tc,
79
0
                tc->instance->boot_types.BOOTInt, ri->seq_number);
80
0
            MVM_repr_push_o(tc, arr, final);
81
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
82
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
83
0
        });
84
0
        });
85
0
        if (buf->base)
86
0
            MVM_free(buf->base);
87
0
        uv_udp_recv_stop(handle);
88
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
89
0
    }
90
0
    else {
91
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
92
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
93
0
        MVMROOT(tc, t, {
94
0
        MVMROOT(tc, arr, {
95
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
96
0
                tc->instance->VMString, uv_strerror(nread));
97
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
98
0
                tc->instance->boot_types.BOOTStr, msg_str);
99
0
            MVM_repr_push_o(tc, arr, msg_box);
100
0
        });
101
0
        });
102
0
        if (buf->base)
103
0
            MVM_free(buf->base);
104
0
        uv_udp_recv_stop(handle);
105
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
106
0
    }
107
0
    MVM_repr_push_o(tc, t->body.queue, arr);
108
0
}
109
110
/* Does setup work for setting up asynchronous reads. */
111
0
static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
112
0
    MVMIOAsyncUDPSocketData *handle_data;
113
0
    int                   r;
114
0
115
0
    /* Add to work in progress. */
116
0
    ReadInfo *ri  = (ReadInfo *)data;
117
0
    ri->tc        = tc;
118
0
    ri->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
119
0
120
0
    /* Start reading the stream. */
121
0
    handle_data = (MVMIOAsyncUDPSocketData *)ri->handle->body.data;
122
0
    handle_data->handle->data = data;
123
0
    if ((r = uv_udp_recv_start(handle_data->handle, on_alloc, on_read)) < 0) {
124
0
        /* Error; need to notify. */
125
0
        MVMROOT(tc, async_task, {
126
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
127
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
128
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
129
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
130
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
131
0
            MVMROOT(tc, arr, {
132
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
133
0
                    tc->instance->VMString, uv_strerror(r));
134
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
135
0
                    tc->instance->boot_types.BOOTStr, msg_str);
136
0
                MVM_repr_push_o(tc, arr, msg_box);
137
0
            });
138
0
            MVM_repr_push_o(tc, t->body.queue, arr);
139
0
        });
140
0
    }
141
0
}
142
143
/* Marks objects for a read task. */
144
0
static void read_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
145
0
    ReadInfo *ri = (ReadInfo *)data;
146
0
    MVM_gc_worklist_add(tc, worklist, &ri->buf_type);
147
0
    MVM_gc_worklist_add(tc, worklist, &ri->handle);
148
0
}
149
150
/* Frees info for a read task. */
151
0
static void read_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
152
0
    if (data) {
153
0
        ReadInfo *ri = (ReadInfo *)data;
154
0
        if (ri->ds)
155
0
            MVM_string_decodestream_destroy(tc, ri->ds);
156
0
        MVM_free(data);
157
0
    }
158
0
}
159
160
/* Operations table for async read task. */
161
static const MVMAsyncTaskOps read_op_table = {
162
    read_setup,
163
    NULL,
164
    read_gc_mark,
165
    read_gc_free
166
};
167
168
static MVMAsyncTask * read_chars(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
169
0
                                 MVMObject *schedulee, MVMObject *async_type) {
170
0
    MVMAsyncTask *task;
171
0
    ReadInfo    *ri;
172
0
173
0
    /* Validate REPRs. */
174
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
175
0
        MVM_exception_throw_adhoc(tc,
176
0
            "asyncreadchars target queue must have ConcBlockingQueue REPR");
177
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
178
0
        MVM_exception_throw_adhoc(tc,
179
0
            "asyncreadchars result type must have REPR AsyncTask");
180
0
181
0
    /* Create async task handle. */
182
0
    MVMROOT(tc, queue, {
183
0
    MVMROOT(tc, schedulee, {
184
0
    MVMROOT(tc, h, {
185
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
186
0
    });
187
0
    });
188
0
    });
189
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
190
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
191
0
    task->body.ops  = &read_op_table;
192
0
    ri              = MVM_calloc(1, sizeof(ReadInfo));
193
0
    ri->ds          = MVM_string_decodestream_create(tc, MVM_encoding_type_utf8, 0, 0);
194
0
    MVM_ASSIGN_REF(tc, &(task->common.header), ri->handle, h);
195
0
    task->body.data = ri;
196
0
197
0
    /* Hand the task off to the event loop. */
198
0
    MVMROOT(tc, task, {
199
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
200
0
    });
201
0
202
0
    return task;
203
0
}
204
205
static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
206
0
                                 MVMObject *schedulee, MVMObject *buf_type, MVMObject *async_type) {
207
0
    MVMAsyncTask *task;
208
0
    ReadInfo    *ri;
209
0
210
0
    /* Validate REPRs. */
211
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
212
0
        MVM_exception_throw_adhoc(tc,
213
0
            "asyncreadbytes target queue must have ConcBlockingQueue REPR");
214
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
215
0
        MVM_exception_throw_adhoc(tc,
216
0
            "asyncreadbytes result type must have REPR AsyncTask");
217
0
    if (REPR(buf_type)->ID == MVM_REPR_ID_VMArray) {
218
0
        MVMint32 slot_type = ((MVMArrayREPRData *)STABLE(buf_type)->REPR_data)->slot_type;
219
0
        if (slot_type != MVM_ARRAY_U8 && slot_type != MVM_ARRAY_I8)
220
0
            MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array of uint8 or int8");
221
0
    }
222
0
    else {
223
0
        MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array");
224
0
    }
225
0
226
0
    /* Create async task handle. */
227
0
    MVMROOT(tc, queue, {
228
0
    MVMROOT(tc, schedulee, {
229
0
    MVMROOT(tc, h, {
230
0
    MVMROOT(tc, buf_type, {
231
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
232
0
    });
233
0
    });
234
0
    });
235
0
    });
236
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
237
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
238
0
    task->body.ops  = &read_op_table;
239
0
    ri              = MVM_calloc(1, sizeof(ReadInfo));
240
0
    MVM_ASSIGN_REF(tc, &(task->common.header), ri->buf_type, buf_type);
241
0
    MVM_ASSIGN_REF(tc, &(task->common.header), ri->handle, h);
242
0
    task->body.data = ri;
243
0
244
0
    /* Hand the task off to the event loop. */
245
0
    MVMROOT(tc, task, {
246
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
247
0
    });
248
0
249
0
    return task;
250
0
}
251
252
/* Info we convey about a write task. */
253
typedef struct {
254
    MVMOSHandle      *handle;
255
    MVMString        *str_data;
256
    MVMObject        *buf_data;
257
    uv_udp_send_t    *req;
258
    uv_buf_t          buf;
259
    MVMThreadContext *tc;
260
    int               work_idx;
261
    struct sockaddr  *dest_addr;
262
} WriteInfo;
263
264
/* Completion handler for an asynchronous write. */
265
0
static void on_write(uv_udp_send_t *req, int status) {
266
0
    WriteInfo        *wi  = (WriteInfo *)req->data;
267
0
    MVMThreadContext *tc  = wi->tc;
268
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
269
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
270
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
271
0
    if (status >= 0) {
272
0
        MVMROOT(tc, arr, {
273
0
        MVMROOT(tc, t, {
274
0
            MVMObject *bytes_box = MVM_repr_box_int(tc,
275
0
                tc->instance->boot_types.BOOTInt,
276
0
                wi->buf.len);
277
0
            MVM_repr_push_o(tc, arr, bytes_box);
278
0
        });
279
0
        });
280
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
281
0
    }
282
0
    else {
283
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
284
0
        MVMROOT(tc, arr, {
285
0
        MVMROOT(tc, t, {
286
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
287
0
                tc->instance->VMString, uv_strerror(status));
288
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
289
0
                tc->instance->boot_types.BOOTStr, msg_str);
290
0
            MVM_repr_push_o(tc, arr, msg_box);
291
0
        });
292
0
        });
293
0
    }
294
0
    MVM_repr_push_o(tc, t->body.queue, arr);
295
0
    if (wi->str_data)
296
0
        MVM_free(wi->buf.base);
297
0
    MVM_free(wi->req);
298
0
    MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
299
0
}
300
301
/* Does setup work for an asynchronous write. */
302
0
static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
303
0
    MVMIOAsyncUDPSocketData *handle_data;
304
0
    char                 *output;
305
0
    int                   output_size, r;
306
0
307
0
    /* Add to work in progress. */
308
0
    WriteInfo *wi = (WriteInfo *)data;
309
0
    wi->tc        = tc;
310
0
    wi->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
311
0
312
0
    /* Encode the string, or extract buf data. */
313
0
    if (wi->str_data) {
314
0
        MVMuint64 output_size_64;
315
0
        output = MVM_string_utf8_encode(tc, wi->str_data, &output_size_64, 0);
316
0
        output_size = (int)output_size_64;
317
0
    }
318
0
    else {
319
0
        MVMArray *buffer = (MVMArray *)wi->buf_data;
320
0
        output = (char *)(buffer->body.slots.i8 + buffer->body.start);
321
0
        output_size = (int)buffer->body.elems;
322
0
    }
323
0
324
0
    /* Create and initialize write request. */
325
0
    wi->req           = MVM_malloc(sizeof(uv_udp_send_t));
326
0
    wi->buf           = uv_buf_init(output, output_size);
327
0
    wi->req->data     = data;
328
0
    handle_data       = (MVMIOAsyncUDPSocketData *)wi->handle->body.data;
329
0
330
0
    if (uv_is_closing((uv_handle_t *)handle_data->handle))
331
0
        MVM_exception_throw_adhoc(tc, "cannot write to a closed socket");
332
0
333
0
    if ((r = uv_udp_send(wi->req, handle_data->handle, &(wi->buf), 1, wi->dest_addr, on_write)) < 0) {
334
0
        /* Error; need to notify. */
335
0
        MVMROOT(tc, async_task, {
336
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
337
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
338
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
339
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
340
0
            MVMROOT(tc, arr, {
341
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
342
0
                    tc->instance->VMString, uv_strerror(r));
343
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
344
0
                    tc->instance->boot_types.BOOTStr, msg_str);
345
0
                MVM_repr_push_o(tc, arr, msg_box);
346
0
            });
347
0
            MVM_repr_push_o(tc, t->body.queue, arr);
348
0
        });
349
0
350
0
        /* Cleanup handle. */
351
0
        MVM_free(wi->req);
352
0
        wi->req = NULL;
353
0
        MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
354
0
    }
355
0
}
356
357
/* Marks objects for a write task. */
358
0
static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
359
0
    WriteInfo *wi = (WriteInfo *)data;
360
0
    MVM_gc_worklist_add(tc, worklist, &wi->handle);
361
0
    MVM_gc_worklist_add(tc, worklist, &wi->str_data);
362
0
    MVM_gc_worklist_add(tc, worklist, &wi->buf_data);
363
0
}
364
365
/* Frees info for a write task. */
366
0
static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
367
0
    if (data) {
368
0
        WriteInfo *wi = (WriteInfo *)data;
369
0
        if (wi->dest_addr)
370
0
            MVM_free(wi->dest_addr);
371
0
        MVM_free(data);
372
0
    }
373
0
}
374
375
/* Operations table for async write task. */
376
static const MVMAsyncTaskOps write_op_table = {
377
    write_setup,
378
    NULL,
379
    write_gc_mark,
380
    write_gc_free
381
};
382
383
static MVMAsyncTask * write_str_to(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
384
                                   MVMObject *schedulee, MVMString *s, MVMObject *async_type,
385
0
                                   MVMString *host, MVMint64 port) {
386
0
    MVMAsyncTask    *task;
387
0
    WriteInfo       *wi;
388
0
    struct sockaddr *dest_addr;
389
0
390
0
    /* Validate REPRs. */
391
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
392
0
        MVM_exception_throw_adhoc(tc,
393
0
            "asyncwritestrto target queue must have ConcBlockingQueue REPR");
394
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
395
0
        MVM_exception_throw_adhoc(tc,
396
0
            "asyncwritestrto result type must have REPR AsyncTask");
397
0
398
0
    /* Resolve destination. */
399
0
    dest_addr = MVM_io_resolve_host_name(tc, host, port);
400
0
401
0
    /* Create async task handle. */
402
0
    MVMROOT(tc, queue, {
403
0
    MVMROOT(tc, schedulee, {
404
0
    MVMROOT(tc, h, {
405
0
    MVMROOT(tc, s, {
406
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
407
0
    });
408
0
    });
409
0
    });
410
0
    });
411
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
412
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
413
0
    task->body.ops  = &write_op_table;
414
0
    wi              = MVM_calloc(1, sizeof(WriteInfo));
415
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
416
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->str_data, s);
417
0
    wi->dest_addr = dest_addr;
418
0
    task->body.data = wi;
419
0
420
0
    /* Hand the task off to the event loop. */
421
0
    MVMROOT(tc, task, {
422
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
423
0
    });
424
0
425
0
    return task;
426
0
}
427
428
static MVMAsyncTask * write_bytes_to(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
429
                                     MVMObject *schedulee, MVMObject *buffer, MVMObject *async_type,
430
0
                                     MVMString *host, MVMint64 port) {
431
0
    MVMAsyncTask    *task;
432
0
    WriteInfo       *wi;
433
0
    struct sockaddr *dest_addr;
434
0
435
0
    /* Validate REPRs. */
436
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
437
0
        MVM_exception_throw_adhoc(tc,
438
0
            "asyncwritebytesto target queue must have ConcBlockingQueue REPR");
439
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
440
0
        MVM_exception_throw_adhoc(tc,
441
0
            "asyncwritebytesto result type must have REPR AsyncTask");
442
0
    if (!IS_CONCRETE(buffer) || REPR(buffer)->ID != MVM_REPR_ID_VMArray)
443
0
        MVM_exception_throw_adhoc(tc, "asyncwritebytesto requires a native array to read from");
444
0
    if (((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_U8
445
0
        && ((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_I8)
446
0
        MVM_exception_throw_adhoc(tc, "asyncwritebytesto requires a native array of uint8 or int8");
447
0
448
0
    /* Resolve destination. */
449
0
    dest_addr = MVM_io_resolve_host_name(tc, host, port);
450
0
451
0
    /* Create async task handle. */
452
0
    MVMROOT(tc, queue, {
453
0
    MVMROOT(tc, schedulee, {
454
0
    MVMROOT(tc, h, {
455
0
    MVMROOT(tc, buffer, {
456
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
457
0
    });
458
0
    });
459
0
    });
460
0
    });
461
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
462
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
463
0
    task->body.ops  = &write_op_table;
464
0
    wi              = MVM_calloc(1, sizeof(WriteInfo));
465
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
466
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->buf_data, buffer);
467
0
    wi->dest_addr = dest_addr;
468
0
    task->body.data = wi;
469
0
470
0
    /* Hand the task off to the event loop. */
471
0
    MVMROOT(tc, task, {
472
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
473
0
    });
474
0
475
0
    return task;
476
0
}
477
478
/* Does an asynchronous close (since it must run on the event loop). */
479
0
static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
480
0
    uv_handle_t *handle = (uv_handle_t *)data;
481
0
482
0
    if (uv_is_closing(handle))
483
0
        MVM_exception_throw_adhoc(tc, "cannot close a closed socket");
484
0
485
0
    uv_close(handle, free_on_close_cb);
486
0
}
487
488
/* Operations table for async close task. */
489
static const MVMAsyncTaskOps close_op_table = {
490
    close_perform,
491
    NULL,
492
    NULL,
493
    NULL
494
};
495
496
0
static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) {
497
0
    MVMIOAsyncUDPSocketData *data = (MVMIOAsyncUDPSocketData *)h->body.data;
498
0
    MVMAsyncTask *task;
499
0
500
0
    MVMROOT(tc, h, {
501
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc,
502
0
            tc->instance->boot_types.BOOTAsync);
503
0
    });
504
0
    task->body.ops  = &close_op_table;
505
0
    task->body.data = data->handle;
506
0
    MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
507
0
508
0
    return 0;
509
0
}
510
511
0
static void gc_free(MVMThreadContext *tc, MVMObject *h, void *d) {
512
0
    MVMIOAsyncUDPSocketData *data = (MVMIOAsyncUDPSocketData *)d;
513
0
    if (data->ds) {
514
0
        MVM_string_decodestream_destroy(tc, data->ds);
515
0
        data->ds = NULL;
516
0
    }
517
0
}
518
519
/* IO ops table, populated with functions. */
520
static const MVMIOClosable        closable          = { close_socket };
521
static const MVMIOAsyncReadable   async_readable    = { read_chars, read_bytes };
522
static const MVMIOAsyncWritableTo async_writable_to = { write_str_to, write_bytes_to };
523
static const MVMIOOps op_table = {
524
    &closable,
525
    NULL,
526
    NULL,
527
    NULL,
528
    &async_readable,
529
    NULL,
530
    &async_writable_to,
531
    NULL,
532
    NULL,
533
    NULL,
534
    NULL,
535
    NULL,
536
    NULL,
537
    gc_free
538
};
539
540
/* Info we convey about a socket setup task. */
541
typedef struct {
542
    struct sockaddr  *bind_addr;
543
    MVMint64          flags;
544
} SocketSetupInfo;
545
546
/* Initilalize the UDP socket on the event loop. */
547
0
static void setup_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
548
0
    /* Set up the UDP handle. */
549
0
    SocketSetupInfo *ssi = (SocketSetupInfo *)data;
550
0
    uv_udp_t *udp_handle = MVM_malloc(sizeof(uv_udp_t));
551
0
    int r;
552
0
    if ((r = uv_udp_init(loop, udp_handle)) >= 0) {
553
0
        if (ssi->bind_addr)
554
0
            r = uv_udp_bind(udp_handle, ssi->bind_addr, 0);
555
0
        if (r >= 0 && (ssi->flags & 1))
556
0
            r = uv_udp_set_broadcast(udp_handle, 1);
557
0
    }
558
0
559
0
    if (r >= 0) {
560
0
        /* UDP handle initialized; wrap it up in an I/O handle and send. */
561
0
        MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
562
0
        MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
563
0
        MVM_repr_push_o(tc, arr, t->body.schedulee);
564
0
        MVMROOT(tc, arr, {
565
0
        MVMROOT(tc, t, {
566
0
            MVMOSHandle          *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
567
0
            MVMIOAsyncUDPSocketData *data   = MVM_calloc(1, sizeof(MVMIOAsyncUDPSocketData));
568
0
            data->handle                 = udp_handle;
569
0
            result->body.ops             = &op_table;
570
0
            result->body.data            = data;
571
0
            MVM_repr_push_o(tc, arr, (MVMObject *)result);
572
0
        });
573
0
        });
574
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
575
0
        MVM_repr_push_o(tc, t->body.queue, arr);
576
0
    }
577
0
    else {
578
0
        /* Something failed; need to notify. */
579
0
        MVMROOT(tc, async_task, {
580
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
581
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
582
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
583
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
584
0
            MVMROOT(tc, arr, {
585
0
            MVMROOT(tc, t, {
586
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
587
0
                    tc->instance->VMString, uv_strerror(r));
588
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
589
0
                    tc->instance->boot_types.BOOTStr, msg_str);
590
0
                MVM_repr_push_o(tc, arr, msg_box);
591
0
            });
592
0
            });
593
0
            MVM_repr_push_o(tc, t->body.queue, arr);
594
0
            uv_close((uv_handle_t *)udp_handle, free_on_close_cb);
595
0
        });
596
0
    }
597
0
}
598
599
/* Frees info for a connection task. */
600
0
static void setup_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
601
0
    if (data) {
602
0
        SocketSetupInfo *ssi = (SocketSetupInfo *)data;
603
0
        if (ssi->bind_addr)
604
0
            MVM_free(ssi->bind_addr);
605
0
        MVM_free(ssi);
606
0
    }
607
0
}
608
609
/* Operations table for async connect task. */
610
static const MVMAsyncTaskOps setup_op_table = {
611
    setup_setup,
612
    NULL,
613
    NULL,
614
    setup_gc_free
615
};
616
617
/* Creates a UDP socket and binds it to the specified host/port. */
618
MVMObject * MVM_io_socket_udp_async(MVMThreadContext *tc, MVMObject *queue,
619
                                    MVMObject *schedulee, MVMString *host,
620
                                    MVMint64 port, MVMint64 flags,
621
0
                                    MVMObject *async_type) {
622
0
    MVMAsyncTask    *task;
623
0
    SocketSetupInfo *ssi;
624
0
    struct sockaddr *bind_addr = NULL;
625
0
626
0
    /* Validate REPRs. */
627
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
628
0
        MVM_exception_throw_adhoc(tc,
629
0
            "asyncudp target queue must have ConcBlockingQueue REPR");
630
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
631
0
        MVM_exception_throw_adhoc(tc,
632
0
            "asyncudp result type must have REPR AsyncTask");
633
0
634
0
    /* Resolve hostname. (Could be done asynchronously too.) */
635
0
    if (host && IS_CONCRETE(host))
636
0
        bind_addr = MVM_io_resolve_host_name(tc, host, port);
637
0
638
0
    /* Create async task handle. */
639
0
    MVMROOT(tc, queue, {
640
0
    MVMROOT(tc, schedulee, {
641
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
642
0
    });
643
0
    });
644
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
645
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
646
0
    task->body.ops  = &setup_op_table;
647
0
    ssi             = MVM_calloc(1, sizeof(SocketSetupInfo));
648
0
    ssi->bind_addr  = bind_addr;
649
0
    ssi->flags      = flags;
650
0
    task->body.data = ssi;
651
0
652
0
    /* Hand the task off to the event loop. */
653
0
    MVMROOT(tc, task, {
654
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
655
0
    });
656
0
657
0
    return (MVMObject *)task;
658
0
}