Coverage Report

Created: 2018-07-03 15:31

/home/travis/build/MoarVM/MoarVM/src/io/eventloop.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* Asynchronous I/O, timers, file system notifications and signal handlers
4
 * have their callbacks processed by this event loop. Its job is mostly to
5
 * fire off work, receive the callbacks, and put stuff into the concurrent
6
 * work queue of some scheduler or other. It's backed by a thread that is
7
 * started in the usual way, but never actually ends up in interpreter;
8
 * instead, it enters a libuv event loop "forever", until program exit.
9
 *
10
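 * Work is sent to the event loop by pushing it onto one of the instance's
 * queues (todo, permit or cancel) and then waking the loop up with
 * uv_async_send on its wake-up handle.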
11
 */
12
13
/* Sets up an async task to be done on the loop. */
14
7
static void setup_work(MVMThreadContext *tc) {
15
7
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_todo_queue;
16
7
    MVMObject *task_obj;
17
7
18
7
    MVMROOT(tc, queue, {
19
7
        while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) {
20
7
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
21
7
            MVM_ASSERT_NOT_FROMSPACE(tc, task);
22
7
            if (task->body.state == MVM_ASYNC_TASK_STATE_NEW) {
23
7
                MVMROOT(tc, task, {
24
7
                    task->body.ops->setup(tc, tc->loop, task_obj, task->body.data);
25
7
                    task->body.state = MVM_ASYNC_TASK_STATE_SETUP;
26
7
                });
27
7
            }
28
7
        }
29
7
    });
30
7
}
31
32
/* Performs an async emit permit grant on the loop. */
33
7
static void permit_work(MVMThreadContext *tc) {
34
7
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_permit_queue;
35
7
    MVMObject *task_arr;
36
7
37
7
    MVMROOT(tc, queue, {
38
7
        while (!MVM_is_null(tc, task_arr = MVM_concblockingqueue_poll(tc, queue))) {
39
7
            MVMObject *task_obj = MVM_repr_at_pos_o(tc, task_arr, 0);
40
7
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
41
7
            MVM_ASSERT_NOT_FROMSPACE(tc, task);
42
7
            if (task->body.ops->permit) {
43
7
                MVMint64 channel = MVM_repr_get_int(tc, MVM_repr_at_pos_o(tc, task_arr, 1));
44
7
                MVMint64 permit = MVM_repr_get_int(tc, MVM_repr_at_pos_o(tc, task_arr, 2));
45
7
                task->body.ops->permit(tc, tc->loop, task_obj, task->body.data, channel, permit);
46
7
            }
47
7
        }
48
7
    });
49
7
}
50
51
/* Performs an async cancellation on the loop. */
52
7
static void cancel_work(MVMThreadContext *tc) {
53
7
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_cancel_queue;
54
7
    MVMObject *task_obj;
55
7
56
7
    MVMROOT(tc, queue, {
57
7
        while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) {
58
7
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
59
7
            MVM_ASSERT_NOT_FROMSPACE(tc, task);
60
7
            if (task->body.state == MVM_ASYNC_TASK_STATE_SETUP) {
61
7
                MVMROOT(tc, task, {
62
7
                    if (task->body.ops->cancel)
63
7
                        task->body.ops->cancel(tc, tc->loop, task_obj, task->body.data);
64
7
                });
65
7
            }
66
7
            task->body.state = MVM_ASYNC_TASK_STATE_CANCELLED;
67
7
        }
68
7
    });
69
7
}
70
71
/* Fired whenever we were signalled that there is a new task or a new
72
 * cancellation for the event loop to process. */
73
7
static void async_handler(uv_async_t *handle) {
74
7
    MVMThreadContext *tc = (MVMThreadContext *)handle->data;
75
7
    GC_SYNC_POINT(tc);
76
7
    setup_work(tc);
77
7
    permit_work(tc);
78
7
    cancel_work(tc);
79
7
}
80
81
/* Enters the event loop. */
82
3
static void enter_loop(MVMThreadContext *tc, MVMCallsite *callsite, MVMRegister *args) {
83
3
    uv_async_t   *async;
84
3
85
3
    /* Set up async handler so we can be woken up when there's new tasks. */
86
3
    async = MVM_malloc(sizeof(uv_async_t));
87
3
    if (uv_async_init(tc->loop, async, async_handler) != 0)
88
0
        MVM_panic(1, "Unable to initialize async wake-up handle for event loop");
89
3
    async->data = tc;
90
3
    tc->instance->event_loop_wakeup = async;
91
3
92
3
    /* Signal that the event loop is ready for processing. */
93
3
    uv_sem_post(&(tc->instance->sem_event_loop_started));
94
3
95
3
    /* Enter event loop; should never leave it. */
96
3
    uv_run(tc->loop, UV_RUN_DEFAULT);
97
3
    MVM_panic(1, "Supposedly unending event loop thread ended");
98
3
}
99
100
/* Sees if we have an event loop processing thread set up already, and
101
 * sets it up if not. */
102
21
static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) {
103
21
    MVMInstance *instance = tc->instance;
104
21
105
21
    if (!instance->event_loop_thread) {
106
3
        /* Grab starting mutex and ensure we didn't lose the race. */
107
3
        MVM_telemetry_timestamp(tc, "hoping to start an event loop thread");
108
3
        MVM_gc_mark_thread_blocked(tc);
109
3
        uv_mutex_lock(&instance->mutex_event_loop_start);
110
3
        MVM_gc_mark_thread_unblocked(tc);
111
3
        if (!instance->event_loop_thread) {
112
3
            MVMObject *thread, *loop_runner;
113
3
            int r;
114
3
            unsigned int interval_id;
115
3
116
3
            interval_id = MVM_telemetry_interval_start(tc, "creating the event loop thread");
117
3
118
3
            /* Create various bits of state the async event loop thread needs. */
119
3
            instance->event_loop_todo_queue   = MVM_repr_alloc_init(tc,
120
3
                instance->boot_types.BOOTQueue);
121
3
            instance->event_loop_permit_queue = MVM_repr_alloc_init(tc,
122
3
                instance->boot_types.BOOTQueue);
123
3
            instance->event_loop_cancel_queue = MVM_repr_alloc_init(tc,
124
3
                instance->boot_types.BOOTQueue);
125
3
            instance->event_loop_active       = MVM_repr_alloc_init(tc,
126
3
                instance->boot_types.BOOTArray);
127
3
128
3
            /* We need to wait until we know the event loop has started; we'll
129
3
             * use a semaphore for this purpose. */
130
3
            if ((r = uv_sem_init(&(instance->sem_event_loop_started), 0)) < 0) {
131
0
                uv_mutex_unlock(&instance->mutex_event_loop_start);
132
0
                MVM_exception_throw_adhoc(tc, "Failed to initialize event loop start semaphore: %s",
133
0
                    uv_strerror(r));
134
0
            }
135
3
136
3
            /* Start the event loop thread, which will call a C function that
137
3
             * sits in the uv loop, never leaving. */
138
3
            loop_runner = MVM_repr_alloc_init(tc, instance->boot_types.BOOTCCode);
139
3
            ((MVMCFunction *)loop_runner)->body.func = enter_loop;
140
3
            thread = MVM_thread_new(tc, loop_runner, 1);
141
3
            MVMROOT(tc, thread, {
142
3
                MVM_thread_run(tc, thread);
143
3
144
3
                /* Block until we know it's fully started and initialized. */
145
3
                MVM_gc_mark_thread_blocked(tc);
146
3
                uv_sem_wait(&(instance->sem_event_loop_started));
147
3
                MVM_gc_mark_thread_unblocked(tc);
148
3
                uv_sem_destroy(&(instance->sem_event_loop_started));
149
3
150
3
                /* Make the started event loop thread visible to others. */
151
3
                instance->event_loop_thread = ((MVMThread *)thread)->body.tc;
152
3
            });
153
3
154
3
            MVM_telemetry_interval_stop(tc, interval_id, "created the event loop thread");
155
3
        }
156
3
        uv_mutex_unlock(&instance->mutex_event_loop_start);
157
3
    }
158
21
159
21
    return instance->event_loop_thread->loop;
160
21
}
161
162
/* Adds a work item into the event loop work queue. */
163
7
void MVM_io_eventloop_queue_work(MVMThreadContext *tc, MVMObject *work) {
164
7
    MVMROOT(tc, work, {
165
7
        get_or_vivify_loop(tc);
166
7
        MVM_repr_push_o(tc, tc->instance->event_loop_todo_queue, work);
167
7
        uv_async_send(tc->instance->event_loop_wakeup);
168
7
    });
169
7
}
170
171
/* Permits an asynchronous task to emit more events. This is used to provide a
172
 * back-pressure mechanism. */
173
void MVM_io_eventloop_permit(MVMThreadContext *tc, MVMObject *task_obj,
174
14
                              MVMint64 channel, MVMint64 permits) {
175
14
    if (REPR(task_obj)->ID == MVM_REPR_ID_MVMOSHandle)
176
14
        task_obj = MVM_io_get_async_task_handle(tc, task_obj);
177
14
    if (REPR(task_obj)->ID == MVM_REPR_ID_MVMAsyncTask) {
178
14
        MVMROOT(tc, task_obj, {
179
14
            MVMObject *channel_box = NULL;
180
14
            MVMObject *permits_box = NULL;
181
14
            MVMObject *arr = NULL;
182
14
            MVMROOT3(tc, channel_box, permits_box, arr, {
183
14
                channel_box = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, channel);
184
14
                permits_box = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, permits);
185
14
                arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
186
14
                MVM_repr_push_o(tc, arr, task_obj);
187
14
                MVM_repr_push_o(tc, arr, channel_box);
188
14
                MVM_repr_push_o(tc, arr, permits_box);
189
14
                get_or_vivify_loop(tc);
190
14
                MVM_repr_push_o(tc, tc->instance->event_loop_permit_queue, arr);
191
14
                uv_async_send(tc->instance->event_loop_wakeup);
192
14
            });
193
14
        });
194
14
    }
195
0
    else {
196
0
        MVM_exception_throw_adhoc(tc, "Can only permit an AsyncTask handle");
197
0
    }
198
14
}
199
200
/* Cancels a piece of async work. */
201
void MVM_io_eventloop_cancel_work(MVMThreadContext *tc, MVMObject *task_obj,
202
0
        MVMObject *notify_queue, MVMObject *notify_schedulee) {
203
0
    if (REPR(task_obj)->ID == MVM_REPR_ID_MVMAsyncTask) {
204
0
        if (notify_queue && notify_schedulee) {
205
0
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
206
0
            MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_queue,
207
0
                notify_queue);
208
0
            MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_schedulee,
209
0
                notify_schedulee);
210
0
        }
211
0
        MVMROOT(tc, task_obj, {
212
0
            get_or_vivify_loop(tc);
213
0
            MVM_repr_push_o(tc, tc->instance->event_loop_cancel_queue, task_obj);
214
0
            uv_async_send(tc->instance->event_loop_wakeup);
215
0
        });
216
0
    }
217
0
    else {
218
0
        MVM_exception_throw_adhoc(tc, "Can only cancel an AsyncTask handle");
219
0
    }
220
0
}
221
222
/* Sends a task cancellation notification if requested for the specified task. */
223
0
void MVM_io_eventloop_send_cancellation_notification(MVMThreadContext *tc, MVMAsyncTask *task) {
224
0
    MVMObject *notify_queue = task->body.cancel_notify_queue;
225
0
    MVMObject *notify_schedulee = task->body.cancel_notify_schedulee;
226
0
    if (notify_queue && notify_schedulee)
227
0
        MVM_repr_push_o(tc, notify_queue, notify_schedulee);
228
0
}
229
230
/* Adds a work item to the active async task set. */
231
7
int MVM_io_eventloop_add_active_work(MVMThreadContext *tc, MVMObject *async_task) {
232
7
    int work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
233
7
    MVM_ASSERT_NOT_FROMSPACE(tc, async_task);
234
7
    MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
235
7
    return work_idx;
236
7
}
237
238
/* Gets an active work item from the active work eventloop. */
239
32
MVMAsyncTask * MVM_io_eventloop_get_active_work(MVMThreadContext *tc, int work_idx) {
240
32
    if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) {
241
32
        MVMObject *task_obj = MVM_repr_at_pos_o(tc, tc->instance->event_loop_active, work_idx);
242
32
        if (REPR(task_obj)->ID != MVM_REPR_ID_MVMAsyncTask)
243
0
            MVM_panic(1, "non-AsyncTask fetched from eventloop active work list");
244
32
        MVM_ASSERT_NOT_FROMSPACE(tc, task_obj);
245
32
        return (MVMAsyncTask *)task_obj;
246
32
    }
247
0
    else {
248
0
        MVM_panic(1, "use of invalid eventloop work item index %d", work_idx);
249
0
    }
250
32
}
251
252
/* Removes an active work index from the active work list, enabling any
253
 * memory associated with it to be collected. Replaces the work index with -1
254
 * so that any future use of the task will be a failed lookup. */
255
6
void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_clear) {
256
6
    int work_idx = *work_idx_to_clear;
257
6
    if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) {
258
6
        *work_idx_to_clear = -1;
259
6
        MVM_repr_bind_pos_o(tc, tc->instance->event_loop_active, work_idx, tc->instance->VMNull);
260
6
        /* TODO: start to re-use the indices */
261
6
    }
262
0
    else {
263
0
        MVM_panic(1, "cannot remove invalid eventloop work item index %d", work_idx);
264
0
    }
265
6
}