Coverage Report

Created: 2018-07-03 15:31

/home/travis/build/MoarVM/MoarVM/src/io/asyncsocketudp.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* Number of bytes we accept per read. */
4
#define CHUNK_SIZE 65536
5
6
/* Data that we keep for an asynchronous UDP socket handle. */
7
typedef struct {
8
    /* The libuv handle to the socket. */
9
    uv_udp_t *handle;
10
} MVMIOAsyncUDPSocketData;
11
12
/* Info we convey about a read task. */
13
typedef struct {
14
    MVMOSHandle      *handle;
15
    MVMObject        *buf_type;
16
    int               seq_number;
17
    MVMThreadContext *tc;
18
    int               work_idx;
19
} ReadInfo;
20
21
/* Allocates a buffer of the suggested size. */
22
0
static void on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
23
0
    size_t size = suggested_size > 0 ? suggested_size : 4;
24
0
    buf->base   = MVM_malloc(size);
25
0
    buf->len    = size;
26
0
}
27
28
/* Callback used to simply free memory on close. */
29
0
static void free_on_close_cb(uv_handle_t *handle) {
30
0
    MVM_free(handle);
31
0
}
32
33
/* XXX this is duplicated from asyncsocket.c; put it in some shared file */
34
0
static void push_name_and_port(MVMThreadContext *tc, struct sockaddr_storage *name, MVMObject *arr) {
35
0
    char addrstr[INET6_ADDRSTRLEN + 1];
36
0
    /* XXX windows support kludge. 64 bit is much too big, but we'll
37
0
     * get the proper data from the struct anyway, however windows
38
0
     * decides to declare it. */
39
0
    MVMuint64 port;
40
0
    MVMObject *host_o;
41
0
    MVMObject *port_o;
42
0
    if (name) {
43
0
        switch (name->ss_family) {
44
0
            case AF_INET6: {
45
0
                uv_ip6_name((struct sockaddr_in6*)name, addrstr, INET6_ADDRSTRLEN + 1);
46
0
                port = ntohs(((struct sockaddr_in6*)name)->sin6_port);
47
0
                break;
48
0
            }
49
0
            case AF_INET: {
50
0
                uv_ip4_name((struct sockaddr_in*)name, addrstr, INET6_ADDRSTRLEN + 1);
51
0
                port = ntohs(((struct sockaddr_in*)name)->sin_port);
52
0
                break;
53
0
            }
54
0
            default:
55
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
56
0
                MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
57
0
                return;
58
0
                break;
59
0
        }
60
0
        MVMROOT(tc, arr, {
61
0
            port_o = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, port);
62
0
            MVMROOT(tc, port_o, {
63
0
                host_o = (MVMObject *)MVM_repr_box_str(tc, tc->instance->boot_types.BOOTStr,
64
0
                        MVM_string_ascii_decode_nt(tc, tc->instance->VMString, addrstr));
65
0
            });
66
0
        });
67
0
    } else {
68
0
        host_o = tc->instance->boot_types.BOOTStr;
69
0
        port_o = tc->instance->boot_types.BOOTInt;
70
0
    }
71
0
    MVM_repr_push_o(tc, arr, host_o);
72
0
    MVM_repr_push_o(tc, arr, port_o);
73
0
}
74
75
/* Read handler. */
76
0
static void on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
77
0
    ReadInfo         *ri  = (ReadInfo *)handle->data;
78
0
    MVMThreadContext *tc  = ri->tc;
79
0
    MVMObject        *arr;
80
0
    MVMAsyncTask     *t;
81
0
82
0
    /* libuv will call on_read once after all datagram read operations
83
0
     * to "give us back a buffer". in that case, nread and addr are NULL.
84
0
     * This is an artifact of the underlying implementation and we shouldn't
85
0
     * pass it through to the user. */
86
0
87
0
    if (nread == 0 && addr == NULL)
88
0
        return;
89
0
90
0
    arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
91
0
    t = MVM_io_eventloop_get_active_work(tc, ri->work_idx);
92
0
93
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
94
0
    if (nread >= 0) {
95
0
        MVMROOT2(tc, t, arr, {
96
0
            MVMArray *res_buf;
97
0
98
0
            /* Push the sequence number. */
99
0
            MVMObject *seq_boxed = MVM_repr_box_int(tc,
100
0
                tc->instance->boot_types.BOOTInt, ri->seq_number++);
101
0
            MVM_repr_push_o(tc, arr, seq_boxed);
102
0
103
0
            /* Produce a buffer and push it. */
104
0
            res_buf      = (MVMArray *)MVM_repr_alloc_init(tc, ri->buf_type);
105
0
            res_buf->body.slots.i8 = (MVMint8 *)buf->base;
106
0
            res_buf->body.start    = 0;
107
0
            res_buf->body.ssize    = buf->len;
108
0
            res_buf->body.elems    = nread;
109
0
            MVM_repr_push_o(tc, arr, (MVMObject *)res_buf);
110
0
111
0
            /* next, no error. */
112
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
113
0
114
0
            /* and finally, address and port */
115
0
            push_name_and_port(tc, (struct sockaddr_storage *)addr, arr);
116
0
        });
117
0
    }
118
0
    else if (nread == UV_EOF) {
119
0
        MVMROOT2(tc, t, arr, {
120
0
            MVMObject *final = MVM_repr_box_int(tc,
121
0
                tc->instance->boot_types.BOOTInt, ri->seq_number);
122
0
            MVM_repr_push_o(tc, arr, final);
123
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
124
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
125
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
126
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
127
0
        });
128
0
        if (buf->base)
129
0
            MVM_free(buf->base);
130
0
        uv_udp_recv_stop(handle);
131
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
132
0
    }
133
0
    else {
134
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
135
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
136
0
        MVMROOT2(tc, t, arr, {
137
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
138
0
                tc->instance->VMString, uv_strerror(nread));
139
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
140
0
                tc->instance->boot_types.BOOTStr, msg_str);
141
0
            MVM_repr_push_o(tc, arr, msg_box);
142
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
143
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
144
0
        });
145
0
        if (buf->base)
146
0
            MVM_free(buf->base);
147
0
        uv_udp_recv_stop(handle);
148
0
        MVM_io_eventloop_remove_active_work(tc, &(ri->work_idx));
149
0
    }
150
0
    MVM_repr_push_o(tc, t->body.queue, arr);
151
0
}
152
153
/* Does setup work for setting up asynchronous reads. */
154
0
static void read_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
155
0
    MVMIOAsyncUDPSocketData *handle_data;
156
0
    int                   r;
157
0
158
0
    /* Add to work in progress. */
159
0
    ReadInfo *ri  = (ReadInfo *)data;
160
0
    ri->tc        = tc;
161
0
    ri->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
162
0
163
0
    /* Start reading the stream. */
164
0
    handle_data = (MVMIOAsyncUDPSocketData *)ri->handle->body.data;
165
0
    handle_data->handle->data = data;
166
0
    if ((r = uv_udp_recv_start(handle_data->handle, on_alloc, on_read)) < 0) {
167
0
        /* Error; need to notify. */
168
0
        MVMROOT(tc, async_task, {
169
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
170
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
171
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
172
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
173
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
174
0
            MVMROOT(tc, arr, {
175
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
176
0
                    tc->instance->VMString, uv_strerror(r));
177
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
178
0
                    tc->instance->boot_types.BOOTStr, msg_str);
179
0
                MVM_repr_push_o(tc, arr, msg_box);
180
0
            });
181
0
            MVM_repr_push_o(tc, t->body.queue, arr);
182
0
        });
183
0
    }
184
0
}
185
186
/* Marks objects for a read task. */
187
0
static void read_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
188
0
    ReadInfo *ri = (ReadInfo *)data;
189
0
    MVM_gc_worklist_add(tc, worklist, &ri->buf_type);
190
0
    MVM_gc_worklist_add(tc, worklist, &ri->handle);
191
0
}
192
193
/* Frees info for a read task. */
194
0
static void read_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
195
0
    if (data)
196
0
        MVM_free(data);
197
0
}
198
199
/* Operations table for async read task. */
200
static const MVMAsyncTaskOps read_op_table = {
201
    read_setup,
202
    NULL,
203
    NULL,
204
    read_gc_mark,
205
    read_gc_free
206
};
207
208
/* Creates an asynchronous read task for the socket handle h and queues it
 * on the event loop. Results are delivered to queue, tagged with schedulee;
 * each received datagram is wrapped in an instance of buf_type. Throws if
 * queue is not a ConcBlockingQueue, async_type is not an AsyncTask, or
 * buf_type is not an array of uint8 or int8. Returns the new task. */
static MVMAsyncTask * read_bytes(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
                                 MVMObject *schedulee, MVMObject *buf_type, MVMObject *async_type) {
    MVMAsyncTask *new_task;
    ReadInfo     *info;

    /* Validate REPRs. */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
        MVM_exception_throw_adhoc(tc,
            "asyncreadbytes target queue must have ConcBlockingQueue REPR (got %s)",
             MVM_6model_get_stable_debug_name(tc, queue->st));
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
        MVM_exception_throw_adhoc(tc,
            "asyncreadbytes result type must have REPR AsyncTask");
    if (REPR(buf_type)->ID != MVM_REPR_ID_VMArray) {
        MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array");
    }
    else {
        MVMArrayREPRData *array_data = (MVMArrayREPRData *)STABLE(buf_type)->REPR_data;
        MVMint32          slot_type  = array_data->slot_type;
        if (slot_type != MVM_ARRAY_U8 && slot_type != MVM_ARRAY_I8)
            MVM_exception_throw_adhoc(tc, "asyncreadbytes buffer type must be an array of uint8 or int8");
    }

    /* Create async task handle; the arguments are rooted across the
     * allocation. */
    MVMROOT4(tc, queue, schedulee, h, buf_type, {
        new_task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
    });
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.queue, queue);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), new_task->body.schedulee, schedulee);
    new_task->body.ops = &read_op_table;

    /* Attach the read state to the task. */
    info = MVM_calloc(1, sizeof(ReadInfo));
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->buf_type, buf_type);
    MVM_ASSIGN_REF(tc, &(new_task->common.header), info->handle, h);
    new_task->body.data = info;

    /* Hand the task off to the event loop. */
    MVMROOT(tc, new_task, {
        MVM_io_eventloop_queue_work(tc, (MVMObject *)new_task);
    });

    return new_task;
}
249
250
/* Info we convey about a write task. */
251
typedef struct {
252
    MVMOSHandle      *handle;
253
    MVMObject        *buf_data;
254
    uv_udp_send_t    *req;
255
    uv_buf_t          buf;
256
    MVMThreadContext *tc;
257
    int               work_idx;
258
    struct sockaddr  *dest_addr;
259
} WriteInfo;
260
261
/* Completion handler for an asynchronous write. */
262
0
static void on_write(uv_udp_send_t *req, int status) {
263
0
    WriteInfo        *wi  = (WriteInfo *)req->data;
264
0
    MVMThreadContext *tc  = wi->tc;
265
0
    MVMObject        *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
266
0
    MVMAsyncTask     *t   = MVM_io_eventloop_get_active_work(tc, wi->work_idx);
267
0
    MVM_repr_push_o(tc, arr, t->body.schedulee);
268
0
    if (status >= 0) {
269
0
        MVMROOT2(tc, arr, t, {
270
0
            MVMObject *bytes_box = MVM_repr_box_int(tc,
271
0
                tc->instance->boot_types.BOOTInt,
272
0
                wi->buf.len);
273
0
            MVM_repr_push_o(tc, arr, bytes_box);
274
0
        });
275
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
276
0
    }
277
0
    else {
278
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
279
0
        MVMROOT2(tc, arr, t, {
280
0
            MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
281
0
                tc->instance->VMString, uv_strerror(status));
282
0
            MVMObject *msg_box = MVM_repr_box_str(tc,
283
0
                tc->instance->boot_types.BOOTStr, msg_str);
284
0
            MVM_repr_push_o(tc, arr, msg_box);
285
0
        });
286
0
    }
287
0
    MVM_repr_push_o(tc, t->body.queue, arr);
288
0
    MVM_free(wi->req);
289
0
    MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
290
0
}
291
292
/* Does setup work for an asynchronous write. */
293
0
static void write_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
294
0
    MVMIOAsyncUDPSocketData *handle_data;
295
0
    MVMArray                *buffer;
296
0
    char                    *output;
297
0
    int                      output_size, r;
298
0
299
0
    /* Add to work in progress. */
300
0
    WriteInfo *wi = (WriteInfo *)data;
301
0
    wi->tc        = tc;
302
0
    wi->work_idx  = MVM_io_eventloop_add_active_work(tc, async_task);
303
0
304
0
    /* Extract buf data. */
305
0
    buffer = (MVMArray *)wi->buf_data;
306
0
    output = (char *)(buffer->body.slots.i8 + buffer->body.start);
307
0
    output_size = (int)buffer->body.elems;
308
0
309
0
    /* Create and initialize write request. */
310
0
    wi->req           = MVM_malloc(sizeof(uv_udp_send_t));
311
0
    wi->buf           = uv_buf_init(output, output_size);
312
0
    wi->req->data     = data;
313
0
    handle_data       = (MVMIOAsyncUDPSocketData *)wi->handle->body.data;
314
0
315
0
    if (uv_is_closing((uv_handle_t *)handle_data->handle))
316
0
        MVM_exception_throw_adhoc(tc, "cannot write to a closed socket");
317
0
318
0
    if ((r = uv_udp_send(wi->req, handle_data->handle, &(wi->buf), 1, wi->dest_addr, on_write)) < 0) {
319
0
        /* Error; need to notify. */
320
0
        MVMROOT(tc, async_task, {
321
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
322
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
323
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
324
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTInt);
325
0
            MVMROOT(tc, arr, {
326
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
327
0
                    tc->instance->VMString, uv_strerror(r));
328
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
329
0
                    tc->instance->boot_types.BOOTStr, msg_str);
330
0
                MVM_repr_push_o(tc, arr, msg_box);
331
0
            });
332
0
            MVM_repr_push_o(tc, t->body.queue, arr);
333
0
        });
334
0
335
0
        /* Cleanup handle. */
336
0
        MVM_free(wi->req);
337
0
        wi->req = NULL;
338
0
        MVM_io_eventloop_remove_active_work(tc, &(wi->work_idx));
339
0
    }
340
0
}
341
342
/* Marks objects for a write task. */
343
0
static void write_gc_mark(MVMThreadContext *tc, void *data, MVMGCWorklist *worklist) {
344
0
    WriteInfo *wi = (WriteInfo *)data;
345
0
    MVM_gc_worklist_add(tc, worklist, &wi->handle);
346
0
    MVM_gc_worklist_add(tc, worklist, &wi->buf_data);
347
0
}
348
349
/* Frees info for a write task. */
350
0
static void write_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
351
0
    if (data) {
352
0
        WriteInfo *wi = (WriteInfo *)data;
353
0
        if (wi->dest_addr)
354
0
            MVM_free(wi->dest_addr);
355
0
        MVM_free(data);
356
0
    }
357
0
}
358
359
/* Operations table for async write task. */
360
static const MVMAsyncTaskOps write_op_table = {
361
    write_setup,
362
    NULL,
363
    NULL,
364
    write_gc_mark,
365
    write_gc_free
366
};
367
368
static MVMAsyncTask * write_bytes_to(MVMThreadContext *tc, MVMOSHandle *h, MVMObject *queue,
369
                                     MVMObject *schedulee, MVMObject *buffer, MVMObject *async_type,
370
0
                                     MVMString *host, MVMint64 port) {
371
0
    MVMAsyncTask    *task;
372
0
    WriteInfo       *wi;
373
0
    struct sockaddr *dest_addr;
374
0
375
0
    /* Validate REPRs. */
376
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
377
0
        MVM_exception_throw_adhoc(tc,
378
0
            "asyncwritebytesto target queue must have ConcBlockingQueue REPR");
379
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
380
0
        MVM_exception_throw_adhoc(tc,
381
0
            "asyncwritebytesto result type must have REPR AsyncTask");
382
0
    if (!IS_CONCRETE(buffer) || REPR(buffer)->ID != MVM_REPR_ID_VMArray)
383
0
        MVM_exception_throw_adhoc(tc, "asyncwritebytesto requires a native array to read from");
384
0
    if (((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_U8
385
0
        && ((MVMArrayREPRData *)STABLE(buffer)->REPR_data)->slot_type != MVM_ARRAY_I8)
386
0
        MVM_exception_throw_adhoc(tc, "asyncwritebytesto requires a native array of uint8 or int8");
387
0
388
0
    /* Resolve destination and create async task handle. */
389
0
    MVMROOT4(tc, queue, schedulee, h, buffer, {
390
0
        MVMROOT(tc, async_type, {
391
0
            dest_addr = MVM_io_resolve_host_name(tc, host, port);
392
0
        });
393
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
394
0
    });
395
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
396
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
397
0
    task->body.ops  = &write_op_table;
398
0
    wi              = MVM_calloc(1, sizeof(WriteInfo));
399
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->handle, h);
400
0
    MVM_ASSIGN_REF(tc, &(task->common.header), wi->buf_data, buffer);
401
0
    wi->dest_addr = dest_addr;
402
0
    task->body.data = wi;
403
0
404
0
    /* Hand the task off to the event loop. */
405
0
    MVMROOT(tc, task, {
406
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
407
0
    });
408
0
409
0
    return task;
410
0
}
411
412
/* Does an asynchronous close (since it must run on the event loop). */
413
0
static void close_perform(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
414
0
    uv_handle_t *handle = (uv_handle_t *)data;
415
0
416
0
    if (uv_is_closing(handle))
417
0
        MVM_exception_throw_adhoc(tc, "cannot close a closed socket");
418
0
419
0
    uv_close(handle, free_on_close_cb);
420
0
}
421
422
/* Operations table for async close task. */
423
static const MVMAsyncTaskOps close_op_table = {
424
    close_perform,
425
    NULL,
426
    NULL,
427
    NULL,
428
    NULL
429
};
430
431
0
/* Closes the socket behind h by queueing a close task, carrying the
 * socket's libuv handle, on the event loop. Always returns 0. */
static MVMint64 close_socket(MVMThreadContext *tc, MVMOSHandle *h) {
    MVMIOAsyncUDPSocketData *sock_data = (MVMIOAsyncUDPSocketData *)h->body.data;
    MVMAsyncTask            *close_task;

    /* Keep the handle rooted while the task object is allocated. */
    MVMROOT(tc, h, {
        close_task = (MVMAsyncTask *)MVM_repr_alloc_init(tc,
            tc->instance->boot_types.BOOTAsync);
    });
    close_task->body.ops  = &close_op_table;
    close_task->body.data = sock_data->handle;
    MVM_io_eventloop_queue_work(tc, (MVMObject *)close_task);

    return 0;
}
445
446
/* IO ops table, populated with functions. */
447
static const MVMIOClosable        closable          = { close_socket };
448
static const MVMIOAsyncReadable   async_readable    = { read_bytes };
449
static const MVMIOAsyncWritableTo async_writable_to = { write_bytes_to };
450
static const MVMIOOps op_table = {
451
    &closable,
452
    NULL,
453
    NULL,
454
    &async_readable,
455
    NULL,
456
    &async_writable_to,
457
    NULL,
458
    NULL,
459
    NULL,
460
    NULL,
461
    NULL,
462
    NULL,
463
    NULL,
464
    NULL
465
};
466
467
/* Info we convey about a socket setup task. */
468
typedef struct {
469
    struct sockaddr  *bind_addr;
470
    MVMint64          flags;
471
} SocketSetupInfo;
472
473
/* Initilalize the UDP socket on the event loop. */
474
0
static void setup_setup(MVMThreadContext *tc, uv_loop_t *loop, MVMObject *async_task, void *data) {
475
0
    /* Set up the UDP handle. */
476
0
    SocketSetupInfo *ssi = (SocketSetupInfo *)data;
477
0
    uv_udp_t *udp_handle = MVM_malloc(sizeof(uv_udp_t));
478
0
    int r;
479
0
    if ((r = uv_udp_init(loop, udp_handle)) >= 0) {
480
0
        if (ssi->bind_addr)
481
0
            r = uv_udp_bind(udp_handle, ssi->bind_addr, 0);
482
0
        if (r >= 0 && (ssi->flags & 1))
483
0
            r = uv_udp_set_broadcast(udp_handle, 1);
484
0
    }
485
0
486
0
    if (r >= 0) {
487
0
        /* UDP handle initialized; wrap it up in an I/O handle and send. */
488
0
        MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
489
0
        MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
490
0
        MVM_repr_push_o(tc, arr, t->body.schedulee);
491
0
        MVMROOT2(tc, arr, t, {
492
0
            MVMOSHandle          *result = (MVMOSHandle *)MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTIO);
493
0
            MVMIOAsyncUDPSocketData *data   = MVM_calloc(1, sizeof(MVMIOAsyncUDPSocketData));
494
0
            data->handle                 = udp_handle;
495
0
            result->body.ops             = &op_table;
496
0
            result->body.data            = data;
497
0
            MVM_repr_push_o(tc, arr, (MVMObject *)result);
498
0
        });
499
0
        MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTStr);
500
0
        MVM_repr_push_o(tc, t->body.queue, arr);
501
0
    }
502
0
    else {
503
0
        /* Something failed; need to notify. */
504
0
        MVMROOT(tc, async_task, {
505
0
            MVMObject    *arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
506
0
            MVMAsyncTask *t   = (MVMAsyncTask *)async_task;
507
0
            MVM_repr_push_o(tc, arr, t->body.schedulee);
508
0
            MVM_repr_push_o(tc, arr, tc->instance->boot_types.BOOTIO);
509
0
            MVMROOT2(tc, arr, t, {
510
0
                MVMString *msg_str = MVM_string_ascii_decode_nt(tc,
511
0
                    tc->instance->VMString, uv_strerror(r));
512
0
                MVMObject *msg_box = MVM_repr_box_str(tc,
513
0
                    tc->instance->boot_types.BOOTStr, msg_str);
514
0
                MVM_repr_push_o(tc, arr, msg_box);
515
0
            });
516
0
            MVM_repr_push_o(tc, t->body.queue, arr);
517
0
            uv_close((uv_handle_t *)udp_handle, free_on_close_cb);
518
0
        });
519
0
    }
520
0
}
521
522
/* Frees info for a connection task. */
523
0
static void setup_gc_free(MVMThreadContext *tc, MVMObject *t, void *data) {
524
0
    if (data) {
525
0
        SocketSetupInfo *ssi = (SocketSetupInfo *)data;
526
0
        if (ssi->bind_addr)
527
0
            MVM_free(ssi->bind_addr);
528
0
        MVM_free(ssi);
529
0
    }
530
0
}
531
532
/* Operations table for async connect task. */
533
static const MVMAsyncTaskOps setup_op_table = {
534
    setup_setup,
535
    NULL,
536
    NULL,
537
    NULL,
538
    setup_gc_free
539
};
540
541
/* Creates a UDP socket and binds it to the specified host/port. */
542
MVMObject * MVM_io_socket_udp_async(MVMThreadContext *tc, MVMObject *queue,
543
                                    MVMObject *schedulee, MVMString *host,
544
                                    MVMint64 port, MVMint64 flags,
545
0
                                    MVMObject *async_type) {
546
0
    MVMAsyncTask    *task;
547
0
    SocketSetupInfo *ssi;
548
0
    struct sockaddr *bind_addr = NULL;
549
0
550
0
    /* Validate REPRs. */
551
0
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue)
552
0
        MVM_exception_throw_adhoc(tc,
553
0
            "asyncudp target queue must have ConcBlockingQueue REPR");
554
0
    if (REPR(async_type)->ID != MVM_REPR_ID_MVMAsyncTask)
555
0
        MVM_exception_throw_adhoc(tc,
556
0
            "asyncudp result type must have REPR AsyncTask");
557
0
558
0
    /* Resolve hostname. (Could be done asynchronously too.) */
559
0
    if (host && IS_CONCRETE(host)) {
560
0
        MVMROOT3(tc, queue, schedulee, async_type, {
561
0
            bind_addr = MVM_io_resolve_host_name(tc, host, port);
562
0
        });
563
0
    }
564
0
565
0
    /* Create async task handle. */
566
0
    MVMROOT2(tc, queue, schedulee, {
567
0
        task = (MVMAsyncTask *)MVM_repr_alloc_init(tc, async_type);
568
0
    });
569
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.queue, queue);
570
0
    MVM_ASSIGN_REF(tc, &(task->common.header), task->body.schedulee, schedulee);
571
0
    task->body.ops  = &setup_op_table;
572
0
    ssi             = MVM_calloc(1, sizeof(SocketSetupInfo));
573
0
    ssi->bind_addr  = bind_addr;
574
0
    ssi->flags      = flags;
575
0
    task->body.data = ssi;
576
0
577
0
    /* Hand the task off to the event loop. */
578
0
    MVMROOT(tc, task, {
579
0
        MVM_io_eventloop_queue_work(tc, (MVMObject *)task);
580
0
    });
581
0
582
0
    return (MVMObject *)task;
583
0
}