/home/travis/build/MoarVM/MoarVM/src/io/eventloop.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include "moar.h" |
2 | | |
3 | | /* Asynchronous I/O, timers, file system notifications and signal handlers |
4 | | * have their callbacks processed by this event loop. Its job is mostly to |
5 | | * fire off work, receive the callbacks, and put stuff into the concurrent |
6 | | * work queue of some scheduler or other. It's backed by a thread that is |
7 | | * started in the usual way, but never actually ends up in interpreter; |
8 | | * instead, it enters a libuv event loop "forever", until program exit. |
9 | | * |
 * Work is sent to the event loop by pushing tasks into its concurrent
 * work/permit/cancel queues and then signalling its uv_async_t wake-up
 * handle.
 */
12 | | |
/* Sets up an async task to be done on the loop. Drains the instance-wide
 * "todo" queue and invokes each task's setup hook. Runs on the event loop
 * thread (tc is the loop thread's context), triggered via async_handler. */
static void setup_work(MVMThreadContext *tc) {
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_todo_queue;
    MVMObject *task_obj;

    /* Root the queue: the setup hooks below may allocate and so trigger GC. */
    MVMROOT(tc, queue, {
        /* Non-blocking poll; MVM_is_null is true once the queue is drained. */
        while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) {
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
            MVM_ASSERT_NOT_FROMSPACE(tc, task);
            /* Only NEW tasks get set up; anything already SETUP or CANCELLED
             * (e.g. cancelled before the loop saw it) is skipped, so a task's
             * setup hook runs at most once. */
            if (task->body.state == MVM_ASYNC_TASK_STATE_NEW) {
                /* Root the task over the setup call, which may allocate. */
                MVMROOT(tc, task, {
                    task->body.ops->setup(tc, tc->loop, task_obj, task->body.data);
                    task->body.state = MVM_ASYNC_TASK_STATE_SETUP;
                });
            }
        }
    });
}
31 | | |
/* Performs an async emit permit grant on the loop. Drains the permit queue;
 * each entry is a 3-element array of [task, channel, permit-count] boxed by
 * MVM_io_eventloop_permit. Runs on the event loop thread. */
static void permit_work(MVMThreadContext *tc) {
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_permit_queue;
    MVMObject *task_arr;

    /* Root the queue: the permit hooks below may allocate and trigger GC. */
    MVMROOT(tc, queue, {
        while (!MVM_is_null(tc, task_arr = MVM_concblockingqueue_poll(tc, queue))) {
            MVMObject *task_obj = MVM_repr_at_pos_o(tc, task_arr, 0);
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
            MVM_ASSERT_NOT_FROMSPACE(tc, task);
            /* Not every task type supports back-pressure; the permit hook is
             * optional, so entries for tasks without one are dropped. */
            if (task->body.ops->permit) {
                MVMint64 channel = MVM_repr_get_int(tc, MVM_repr_at_pos_o(tc, task_arr, 1));
                MVMint64 permit = MVM_repr_get_int(tc, MVM_repr_at_pos_o(tc, task_arr, 2));
                task->body.ops->permit(tc, tc->loop, task_obj, task->body.data, channel, permit);
            }
        }
    });
}
50 | | |
/* Performs an async cancellation on the loop. Drains the cancel queue and
 * invokes each task's (optional) cancel hook. Runs on the event loop
 * thread. */
static void cancel_work(MVMThreadContext *tc) {
    MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_cancel_queue;
    MVMObject *task_obj;

    /* Root the queue: the cancel hooks below may allocate and trigger GC. */
    MVMROOT(tc, queue, {
        while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) {
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
            MVM_ASSERT_NOT_FROMSPACE(tc, task);
            /* The cancel hook is only run for tasks that reached SETUP; a
             * task cancelled while still NEW has nothing to tear down. */
            if (task->body.state == MVM_ASYNC_TASK_STATE_SETUP) {
                MVMROOT(tc, task, {
                    if (task->body.ops->cancel)
                        task->body.ops->cancel(tc, tc->loop, task_obj, task->body.data);
                });
            }
            /* Marked CANCELLED unconditionally, so a NEW task later popped by
             * setup_work will be skipped there. */
            task->body.state = MVM_ASYNC_TASK_STATE_CANCELLED;
        }
    });
}
70 | | |
71 | | /* Fired whenever we were signalled that there is a new task or a new |
72 | | * cancellation for the event loop to process. */ |
73 | 7 | static void async_handler(uv_async_t *handle) { |
74 | 7 | MVMThreadContext *tc = (MVMThreadContext *)handle->data; |
75 | 7 | GC_SYNC_POINT(tc); |
76 | 7 | setup_work(tc); |
77 | 7 | permit_work(tc); |
78 | 7 | cancel_work(tc); |
79 | 7 | } |
80 | | |
/* Enters the event loop. Runs as the body of the dedicated event loop
 * thread; the callsite/args parameters are part of the C-function thread
 * entry signature and are unused here. Never returns normally. */
static void enter_loop(MVMThreadContext *tc, MVMCallsite *callsite, MVMRegister *args) {
    uv_async_t *async;

    /* Set up async handler so we can be woken up when there's new tasks.
     * The handle lives for the life of the process; its data pointer lets
     * async_handler recover this thread's context. */
    async = MVM_malloc(sizeof(uv_async_t));
    if (uv_async_init(tc->loop, async, async_handler) != 0)
        MVM_panic(1, "Unable to initialize async wake-up handle for event loop");
    async->data = tc;
    tc->instance->event_loop_wakeup = async;

    /* Signal that the event loop is ready for processing; the vivifying
     * thread blocks on this semaphore in get_or_vivify_loop. */
    uv_sem_post(&(tc->instance->sem_event_loop_started));

    /* Enter event loop; should never leave it. */
    uv_run(tc->loop, UV_RUN_DEFAULT);
    MVM_panic(1, "Supposedly unending event loop thread ended");
}
99 | | |
/* Sees if we have an event loop processing thread set up already, and
 * sets it up if not. Uses double-checked locking on event_loop_thread so
 * the common (already-started) case takes no lock. Returns the loop owned
 * by the event loop thread. */
static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) {
    MVMInstance *instance = tc->instance;

    if (!instance->event_loop_thread) {
        /* Grab starting mutex and ensure we didn't lose the race. Mark this
         * thread GC-blocked around the lock acquisition so a concurrent GC
         * isn't held up waiting on us. */
        MVM_telemetry_timestamp(tc, "hoping to start an event loop thread");
        MVM_gc_mark_thread_blocked(tc);
        uv_mutex_lock(&instance->mutex_event_loop_start);
        MVM_gc_mark_thread_unblocked(tc);
        /* Re-check under the lock: another thread may have started it. */
        if (!instance->event_loop_thread) {
            MVMObject *thread, *loop_runner;
            int r;
            unsigned int interval_id;

            interval_id = MVM_telemetry_interval_start(tc, "creating the event loop thread");

            /* Create various bits of state the async event loop thread needs. */
            instance->event_loop_todo_queue = MVM_repr_alloc_init(tc,
                instance->boot_types.BOOTQueue);
            instance->event_loop_permit_queue = MVM_repr_alloc_init(tc,
                instance->boot_types.BOOTQueue);
            instance->event_loop_cancel_queue = MVM_repr_alloc_init(tc,
                instance->boot_types.BOOTQueue);
            instance->event_loop_active = MVM_repr_alloc_init(tc,
                instance->boot_types.BOOTArray);

            /* We need to wait until we know the event loop has started; we'll
             * use a semaphore for this purpose. */
            if ((r = uv_sem_init(&(instance->sem_event_loop_started), 0)) < 0) {
                /* Release the start mutex before throwing so other threads
                 * aren't deadlocked by the failure. */
                uv_mutex_unlock(&instance->mutex_event_loop_start);
                MVM_exception_throw_adhoc(tc, "Failed to initialize event loop start semaphore: %s",
                    uv_strerror(r));
            }

            /* Start the event loop thread, which will call a C function that
             * sits in the uv loop, never leaving. */
            loop_runner = MVM_repr_alloc_init(tc, instance->boot_types.BOOTCCode);
            ((MVMCFunction *)loop_runner)->body.func = enter_loop;
            thread = MVM_thread_new(tc, loop_runner, 1);
            MVMROOT(tc, thread, {
                MVM_thread_run(tc, thread);

                /* Block until we know it's fully started and initialized
                 * (enter_loop posts the semaphore once its async handle is
                 * installed). Mark GC-blocked around the wait. */
                MVM_gc_mark_thread_blocked(tc);
                uv_sem_wait(&(instance->sem_event_loop_started));
                MVM_gc_mark_thread_unblocked(tc);
                uv_sem_destroy(&(instance->sem_event_loop_started));

                /* Make the started event loop thread visible to others; this
                 * is the publish step of the double-checked locking. */
                instance->event_loop_thread = ((MVMThread *)thread)->body.tc;
            });

            MVM_telemetry_interval_stop(tc, interval_id, "created the event loop thread");
        }
        uv_mutex_unlock(&instance->mutex_event_loop_start);
    }

    return instance->event_loop_thread->loop;
}
161 | | |
/* Adds a work item into the event loop work queue. Ensures the loop thread
 * exists first, then pushes the task and pokes the loop's wake-up handle so
 * async_handler runs setup_work for it. `work` is rooted across the calls,
 * which may allocate/GC. */
void MVM_io_eventloop_queue_work(MVMThreadContext *tc, MVMObject *work) {
    MVMROOT(tc, work, {
        get_or_vivify_loop(tc);
        MVM_repr_push_o(tc, tc->instance->event_loop_todo_queue, work);
        /* uv_async_send is safe to call from any thread. */
        uv_async_send(tc->instance->event_loop_wakeup);
    });
}
170 | | |
/* Permits an asynchronous task to emit more events. This is used to provide a
 * back-pressure mechanism. Accepts either an async task directly or an OS
 * handle (whose underlying task is extracted); anything else throws. The
 * [task, channel, permits] triple is boxed into an array and queued for
 * permit_work on the loop thread. */
void MVM_io_eventloop_permit(MVMThreadContext *tc, MVMObject *task_obj,
                             MVMint64 channel, MVMint64 permits) {
    if (REPR(task_obj)->ID == MVM_REPR_ID_MVMOSHandle)
        task_obj = MVM_io_get_async_task_handle(tc, task_obj);
    if (REPR(task_obj)->ID == MVM_REPR_ID_MVMAsyncTask) {
        MVMROOT(tc, task_obj, {
            MVMObject *channel_box = NULL;
            MVMObject *permits_box = NULL;
            MVMObject *arr = NULL;
            /* Root all three temporaries: each allocation below can GC and
             * move the ones allocated earlier. */
            MVMROOT3(tc, channel_box, permits_box, arr, {
                channel_box = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, channel);
                permits_box = MVM_repr_box_int(tc, tc->instance->boot_types.BOOTInt, permits);
                arr = MVM_repr_alloc_init(tc, tc->instance->boot_types.BOOTArray);
                MVM_repr_push_o(tc, arr, task_obj);
                MVM_repr_push_o(tc, arr, channel_box);
                MVM_repr_push_o(tc, arr, permits_box);
                get_or_vivify_loop(tc);
                MVM_repr_push_o(tc, tc->instance->event_loop_permit_queue, arr);
                /* Wake the loop so async_handler processes the grant. */
                uv_async_send(tc->instance->event_loop_wakeup);
            });
        });
    }
    else {
        MVM_exception_throw_adhoc(tc, "Can only permit an AsyncTask handle");
    }
}
199 | | |
/* Cancels a piece of async work. Optionally records a queue/schedulee pair
 * on the task so a cancellation notification can be delivered later (see
 * MVM_io_eventloop_send_cancellation_notification). The actual cancel hook
 * runs on the loop thread via cancel_work. Throws unless given an
 * MVMAsyncTask. */
void MVM_io_eventloop_cancel_work(MVMThreadContext *tc, MVMObject *task_obj,
                                  MVMObject *notify_queue, MVMObject *notify_schedulee) {
    if (REPR(task_obj)->ID == MVM_REPR_ID_MVMAsyncTask) {
        if (notify_queue && notify_schedulee) {
            MVMAsyncTask *task = (MVMAsyncTask *)task_obj;
            /* MVM_ASSIGN_REF applies the GC write barrier for the stores. */
            MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_queue,
                notify_queue);
            MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_schedulee,
                notify_schedulee);
        }
        MVMROOT(tc, task_obj, {
            get_or_vivify_loop(tc);
            MVM_repr_push_o(tc, tc->instance->event_loop_cancel_queue, task_obj);
            /* Wake the loop so async_handler processes the cancellation. */
            uv_async_send(tc->instance->event_loop_wakeup);
        });
    }
    else {
        MVM_exception_throw_adhoc(tc, "Can only cancel an AsyncTask handle");
    }
}
221 | | |
222 | | /* Sends a task cancellation notification if requested for the specified task. */ |
223 | 0 | void MVM_io_eventloop_send_cancellation_notification(MVMThreadContext *tc, MVMAsyncTask *task) { |
224 | 0 | MVMObject *notify_queue = task->body.cancel_notify_queue; |
225 | 0 | MVMObject *notify_schedulee = task->body.cancel_notify_schedulee; |
226 | 0 | if (notify_queue && notify_schedulee) |
227 | 0 | MVM_repr_push_o(tc, notify_queue, notify_schedulee); |
228 | 0 | } |
229 | | |
230 | | /* Adds a work item to the active async task set. */ |
231 | 7 | int MVM_io_eventloop_add_active_work(MVMThreadContext *tc, MVMObject *async_task) { |
232 | 7 | int work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active); |
233 | 7 | MVM_ASSERT_NOT_FROMSPACE(tc, async_task); |
234 | 7 | MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task); |
235 | 7 | return work_idx; |
236 | 7 | } |
237 | | |
238 | | /* Gets an active work item from the active work eventloop. */ |
239 | 32 | MVMAsyncTask * MVM_io_eventloop_get_active_work(MVMThreadContext *tc, int work_idx) { |
240 | 32 | if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) { |
241 | 32 | MVMObject *task_obj = MVM_repr_at_pos_o(tc, tc->instance->event_loop_active, work_idx); |
242 | 32 | if (REPR(task_obj)->ID != MVM_REPR_ID_MVMAsyncTask) |
243 | 0 | MVM_panic(1, "non-AsyncTask fetched from eventloop active work list"); |
244 | 32 | MVM_ASSERT_NOT_FROMSPACE(tc, task_obj); |
245 | 32 | return (MVMAsyncTask *)task_obj; |
246 | 32 | } |
247 | 0 | else { |
248 | 0 | MVM_panic(1, "use of invalid eventloop work item index %d", work_idx); |
249 | 0 | } |
250 | 32 | } |
251 | | |
252 | | /* Removes an active work index from the active work list, enabling any |
253 | | * memory associated with it to be collected. Replaces the work index with -1 |
254 | | * so that any future use of the task will be a failed lookup. */ |
255 | 6 | void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_clear) { |
256 | 6 | int work_idx = *work_idx_to_clear; |
257 | 6 | if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) { |
258 | 6 | *work_idx_to_clear = -1; |
259 | 6 | MVM_repr_bind_pos_o(tc, tc->instance->event_loop_active, work_idx, tc->instance->VMNull); |
260 | 6 | /* TODO: start to re-use the indices */ |
261 | 6 | } |
262 | 0 | else { |
263 | 0 | MVM_panic(1, "cannot remove invalid eventloop work item index %d", work_idx); |
264 | 0 | } |
265 | 6 | } |