Coverage Report

Created: 2017-04-15 07:07

/home/travis/build/MoarVM/MoarVM/src/io/eventloop.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* Asynchronous I/O, timers, file system notifications and signal handlers
4
 * have their callbacks processed by this event loop. Its job is mostly to
5
 * fire off work, receive the callbacks, and put stuff into the concurrent
6
 * work queue of some scheduler or other. It's backed by a thread that is
7
 * started in the usual way, but never actually ends up in interpreter;
8
 * instead, it enters a libuv event loop "forever", until program exit.
9
 *
10
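 * Work is sent to the event loop by pushing it onto the loop's todo queue
 * (or its cancel queue, for cancellations) and then signalling the loop's
 * async wake-up handle.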
11
 */
12
13
/* Sets up an async task to be done on the loop. */
14
0
static void setup_work(MVMThreadContext *tc) {
15
0
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_todo_queue;
16
0
    MVMObject *task_obj;
17
0
18
0
    MVMROOT(tc, queue, {
19
0
        while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) {
20
0
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
21
0
            task->body.ops->setup(tc, tc->loop, task_obj, task->body.data);
22
0
        }
23
0
    });
24
0
}
25
26
/* Performs an async cancellation on the loop. */
27
0
static void cancel_work(MVMThreadContext *tc) {
28
0
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_cancel_queue;
29
0
    MVMObject *task_obj;
30
0
31
0
    MVMROOT(tc, queue, {
32
0
        while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) {
33
0
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
34
0
            if (task->body.ops->cancel)
35
0
                task->body.ops->cancel(tc, tc->loop, task_obj, task->body.data);
36
0
        }
37
0
    });
38
0
}
39
40
/* Fired whenever we were signalled that there is a new task or a new
41
 * cancellation for the event loop to process. */
42
0
static void async_handler(uv_async_t *handle) {
43
0
    MVMThreadContext *tc = (MVMThreadContext *)handle->data;
44
0
    GC_SYNC_POINT(tc);
45
0
    setup_work(tc);
46
0
    cancel_work(tc);
47
0
}
48
49
/* Enters the event loop. */
50
0
static void enter_loop(MVMThreadContext *tc, MVMCallsite *callsite, MVMRegister *args) {
51
0
    uv_async_t   *async;
52
0
53
0
    /* Set up async handler so we can be woken up when there's new tasks. */
54
0
    async = MVM_malloc(sizeof(uv_async_t));
55
0
    if (uv_async_init(tc->loop, async, async_handler) != 0)
56
0
        MVM_panic(1, "Unable to initialize async wake-up handle for event loop");
57
0
    async->data = tc;
58
0
    tc->instance->event_loop_wakeup = async;
59
0
60
0
    /* Signal that the event loop is ready for processing. */
61
0
    uv_sem_post(&(tc->instance->sem_event_loop_started));
62
0
63
0
    /* Enter event loop; should never leave it. */
64
0
    uv_run(tc->loop, UV_RUN_DEFAULT);
65
0
    MVM_panic(1, "Supposedly unending event loop thread ended");
66
0
}
67
68
/* Sees if we have an event loop processing thread set up already, and
69
 * sets it up if not. */
70
0
static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) {
71
0
    MVMInstance *instance = tc->instance;
72
0
73
0
    if (!instance->event_loop_thread) {
74
0
        /* Grab starting mutex and ensure we didn't lose the race. */
75
0
        MVM_gc_mark_thread_blocked(tc);
76
0
        uv_mutex_lock(&instance->mutex_event_loop_start);
77
0
        MVM_gc_mark_thread_unblocked(tc);
78
0
        if (!instance->event_loop_thread) {
79
0
            MVMObject *thread, *loop_runner;
80
0
            int r;
81
0
82
0
            /* Create various bits of state the async event loop thread needs. */
83
0
            instance->event_loop_todo_queue   = MVM_repr_alloc_init(tc,
84
0
                instance->boot_types.BOOTQueue);
85
0
            instance->event_loop_cancel_queue = MVM_repr_alloc_init(tc,
86
0
                instance->boot_types.BOOTQueue);
87
0
            instance->event_loop_active       = MVM_repr_alloc_init(tc,
88
0
                instance->boot_types.BOOTArray);
89
0
90
0
            /* We need to wait until we know the event loop has started; we'll
91
0
             * use a semaphore for this purpose. */
92
0
            if ((r = uv_sem_init(&(instance->sem_event_loop_started), 0)) < 0) {
93
0
                uv_mutex_unlock(&instance->mutex_event_loop_start);
94
0
                MVM_exception_throw_adhoc(tc, "Failed to initialize event loop start semaphore: %s",
95
0
                    uv_strerror(r));
96
0
            }
97
0
98
0
            /* Start the event loop thread, which will call a C function that
99
0
             * sits in the uv loop, never leaving. */
100
0
            loop_runner = MVM_repr_alloc_init(tc, instance->boot_types.BOOTCCode);
101
0
            ((MVMCFunction *)loop_runner)->body.func = enter_loop;
102
0
            thread = MVM_thread_new(tc, loop_runner, 1);
103
0
            MVMROOT(tc, thread, {
104
0
                MVM_thread_run(tc, thread);
105
0
106
0
                /* Block until we know it's fully started and initialized. */
107
0
                MVM_gc_mark_thread_blocked(tc);
108
0
                uv_sem_wait(&(instance->sem_event_loop_started));
109
0
                MVM_gc_mark_thread_unblocked(tc);
110
0
                uv_sem_destroy(&(instance->sem_event_loop_started));
111
0
112
0
                /* Make the started event loop thread visible to others. */
113
0
                instance->event_loop_thread = ((MVMThread *)thread)->body.tc;
114
0
            });
115
0
        }
116
0
        uv_mutex_unlock(&instance->mutex_event_loop_start);
117
0
    }
118
0
119
0
    return instance->event_loop_thread->loop;
120
0
}
121
122
/* Adds a work item into the event loop work queue. */
123
0
void MVM_io_eventloop_queue_work(MVMThreadContext *tc, MVMObject *work) {
124
0
    MVMROOT(tc, work, {
125
0
        get_or_vivify_loop(tc);
126
0
        MVM_repr_push_o(tc, tc->instance->event_loop_todo_queue, work);
127
0
        uv_async_send(tc->instance->event_loop_wakeup);
128
0
    });
129
0
}
130
131
/* Cancels a piece of async work. */
132
void MVM_io_eventloop_cancel_work(MVMThreadContext *tc, MVMObject *task_obj,
133
0
        MVMObject *notify_queue, MVMObject *notify_schedulee) {
134
0
    if (REPR(task_obj)->ID == MVM_REPR_ID_MVMAsyncTask) {
135
0
        if (notify_queue && notify_schedulee) {
136
0
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
137
0
            MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_queue,
138
0
                notify_queue);
139
0
            MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_schedulee,
140
0
                notify_schedulee);
141
0
        }
142
0
        MVMROOT(tc, task_obj, {
143
0
            get_or_vivify_loop(tc);
144
0
            MVM_repr_push_o(tc, tc->instance->event_loop_cancel_queue, task_obj);
145
0
            uv_async_send(tc->instance->event_loop_wakeup);
146
0
        });
147
0
    }
148
0
    else {
149
0
        MVM_exception_throw_adhoc(tc, "Can only cancel an AsyncTask handle");
150
0
    }
151
0
}
152
153
/* Sends a task cancellation notification if requested for the specified task. */
154
0
void MVM_io_eventloop_send_cancellation_notification(MVMThreadContext *tc, MVMAsyncTask *task) {
155
0
    MVMObject *notify_queue = task->body.cancel_notify_queue;
156
0
    MVMObject *notify_schedulee = task->body.cancel_notify_schedulee;
157
0
    if (notify_queue && notify_schedulee)
158
0
        MVM_repr_push_o(tc, notify_queue, notify_schedulee);
159
0
}
160
161
/* Adds a work item to the active async task set. */
162
0
int MVM_io_eventloop_add_active_work(MVMThreadContext *tc, MVMObject *async_task) {
163
0
    int work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active);
164
0
    MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task);
165
0
    return work_idx;
166
0
}
167
168
/* Gets an active work item from the active work eventloop. */
169
0
MVMAsyncTask * MVM_io_eventloop_get_active_work(MVMThreadContext *tc, int work_idx) {
170
0
    if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) {
171
0
        MVMObject *task_obj = MVM_repr_at_pos_o(tc, tc->instance->event_loop_active, work_idx);
172
0
        if (REPR(task_obj)->ID != MVM_REPR_ID_MVMAsyncTask)
173
0
            MVM_panic(1, "non-AsyncTask fetched from eventloop active work list");
174
0
        return (MVMAsyncTask *)task_obj;
175
0
    }
176
0
    else {
177
0
        MVM_panic(1, "use of invalid eventloop work item index %d", work_idx);
178
0
    }
179
0
}
180
181
/* Removes an active work index from the active work list, enabling any
182
 * memory associated with it to be collected. Replaces the work index with -1
183
 * so that any future use of the task will be a failed lookup. */
184
0
void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_clear) {
185
0
    int work_idx = *work_idx_to_clear;
186
0
    if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) {
187
0
        *work_idx_to_clear = -1;
188
0
        MVM_repr_bind_pos_o(tc, tc->instance->event_loop_active, work_idx, tc->instance->VMNull);
189
0
        /* TODO: start to re-use the indices */
190
0
    }
191
0
    else {
192
0
        MVM_panic(1, "cannot remove invalid eventloop work item index %d", work_idx);
193
0
    }
194
0
}