/home/travis/build/MoarVM/MoarVM/src/io/eventloop.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include "moar.h" |
2 | | |
3 | | /* Asynchronous I/O, timers, file system notifications and signal handlers |
4 | | * have their callbacks processed by this event loop. Its job is mostly to |
5 | | * fire off work, receive the callbacks, and put stuff into the concurrent |
6 | | * work queue of some scheduler or other. It's backed by a thread that is |
7 | | * started in the usual way, but never actually ends up in interpreter; |
8 | | * instead, it enters a libuv event loop "forever", until program exit. |
9 | | * |
 * Work is sent to the event loop by pushing tasks onto the todo/cancel
 * queues and then signalling its async wake-up handle (see
 * MVM_io_eventloop_queue_work and MVM_io_eventloop_cancel_work below).
 */
12 | | |
13 | | /* Sets up an async task to be done on the loop. */ |
14 | 0 | static void setup_work(MVMThreadContext *tc) { |
15 | 0 | MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_todo_queue; |
16 | 0 | MVMObject *task_obj; |
17 | 0 |
|
18 | 0 | MVMROOT(tc, queue, { |
19 | 0 | while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) { |
20 | 0 | MVMAsyncTask *task = (MVMAsyncTask *)task_obj; |
21 | 0 | task->body.ops->setup(tc, tc->loop, task_obj, task->body.data); |
22 | 0 | } |
23 | 0 | }); |
24 | 0 | } |
25 | | |
26 | | /* Performs an async cancellation on the loop. */ |
27 | 0 | static void cancel_work(MVMThreadContext *tc) { |
28 | 0 | MVMConcBlockingQueue *queue = (MVMConcBlockingQueue *)tc->instance->event_loop_cancel_queue; |
29 | 0 | MVMObject *task_obj; |
30 | 0 |
|
31 | 0 | MVMROOT(tc, queue, { |
32 | 0 | while (!MVM_is_null(tc, task_obj = MVM_concblockingqueue_poll(tc, queue))) { |
33 | 0 | MVMAsyncTask *task = (MVMAsyncTask *)task_obj; |
34 | 0 | if (task->body.ops->cancel) |
35 | 0 | task->body.ops->cancel(tc, tc->loop, task_obj, task->body.data); |
36 | 0 | } |
37 | 0 | }); |
38 | 0 | } |
39 | | |
40 | | /* Fired whenever we were signalled that there is a new task or a new |
41 | | * cancellation for the event loop to process. */ |
42 | 0 | static void async_handler(uv_async_t *handle) { |
43 | 0 | MVMThreadContext *tc = (MVMThreadContext *)handle->data; |
44 | 0 | GC_SYNC_POINT(tc); |
45 | 0 | setup_work(tc); |
46 | 0 | cancel_work(tc); |
47 | 0 | } |
48 | | |
49 | | /* Enters the event loop. */ |
50 | 0 | static void enter_loop(MVMThreadContext *tc, MVMCallsite *callsite, MVMRegister *args) { |
51 | 0 | uv_async_t *async; |
52 | 0 |
|
53 | 0 | /* Set up async handler so we can be woken up when there's new tasks. */ |
54 | 0 | async = MVM_malloc(sizeof(uv_async_t)); |
55 | 0 | if (uv_async_init(tc->loop, async, async_handler) != 0) |
56 | 0 | MVM_panic(1, "Unable to initialize async wake-up handle for event loop"); |
57 | 0 | async->data = tc; |
58 | 0 | tc->instance->event_loop_wakeup = async; |
59 | 0 |
|
60 | 0 | /* Signal that the event loop is ready for processing. */ |
61 | 0 | uv_sem_post(&(tc->instance->sem_event_loop_started)); |
62 | 0 |
|
63 | 0 | /* Enter event loop; should never leave it. */ |
64 | 0 | uv_run(tc->loop, UV_RUN_DEFAULT); |
65 | 0 | MVM_panic(1, "Supposedly unending event loop thread ended"); |
66 | 0 | } |
67 | | |
68 | | /* Sees if we have an event loop processing thread set up already, and |
69 | | * sets it up if not. */ |
/* Sees if we have an event loop processing thread set up already, and
 * sets it up if not. Double-checked locking: the unlocked test of
 * event_loop_thread is repeated under mutex_event_loop_start so only one
 * thread performs the one-time setup. Returns the libuv loop owned by
 * the event loop thread. */
static uv_loop_t *get_or_vivify_loop(MVMThreadContext *tc) {
    MVMInstance *instance = tc->instance;

    if (!instance->event_loop_thread) {
        /* Grab starting mutex and ensure we didn't lose the race. Marked
         * GC-blocked around the lock acquisition so a collection can
         * proceed while we wait for it. */
        MVM_gc_mark_thread_blocked(tc);
        uv_mutex_lock(&instance->mutex_event_loop_start);
        MVM_gc_mark_thread_unblocked(tc);
        if (!instance->event_loop_thread) {
            MVMObject *thread, *loop_runner;
            int r;

            /* Create various bits of state the async event loop thread needs:
             * the incoming-work and cancellation queues, plus the list of
             * currently active tasks. */
            instance->event_loop_todo_queue = MVM_repr_alloc_init(tc,
                instance->boot_types.BOOTQueue);
            instance->event_loop_cancel_queue = MVM_repr_alloc_init(tc,
                instance->boot_types.BOOTQueue);
            instance->event_loop_active = MVM_repr_alloc_init(tc,
                instance->boot_types.BOOTArray);

            /* We need to wait until we know the event loop has started; we'll
             * use a semaphore for this purpose. */
            if ((r = uv_sem_init(&(instance->sem_event_loop_started), 0)) < 0) {
                /* Release the start mutex before throwing, or nobody else
                 * could ever retry the setup. */
                uv_mutex_unlock(&instance->mutex_event_loop_start);
                MVM_exception_throw_adhoc(tc, "Failed to initialize event loop start semaphore: %s",
                    uv_strerror(r));
            }

            /* Start the event loop thread, which will call a C function that
             * sits in the uv loop, never leaving. */
            loop_runner = MVM_repr_alloc_init(tc, instance->boot_types.BOOTCCode);
            ((MVMCFunction *)loop_runner)->body.func = enter_loop;
            thread = MVM_thread_new(tc, loop_runner, 1);
            MVMROOT(tc, thread, {
                MVM_thread_run(tc, thread);

                /* Block until we know it's fully started and initialized
                 * (enter_loop posts the semaphore once its wake-up handle is
                 * installed); again marked GC-blocked while waiting. */
                MVM_gc_mark_thread_blocked(tc);
                uv_sem_wait(&(instance->sem_event_loop_started));
                MVM_gc_mark_thread_unblocked(tc);
                uv_sem_destroy(&(instance->sem_event_loop_started));

                /* Make the started event loop thread visible to others; note
                 * this publish happens only after the loop is known ready. */
                instance->event_loop_thread = ((MVMThread *)thread)->body.tc;
            });
        }
        uv_mutex_unlock(&instance->mutex_event_loop_start);
    }

    return instance->event_loop_thread->loop;
}
121 | | |
122 | | /* Adds a work item into the event loop work queue. */ |
123 | 0 | void MVM_io_eventloop_queue_work(MVMThreadContext *tc, MVMObject *work) { |
124 | 0 | MVMROOT(tc, work, { |
125 | 0 | get_or_vivify_loop(tc); |
126 | 0 | MVM_repr_push_o(tc, tc->instance->event_loop_todo_queue, work); |
127 | 0 | uv_async_send(tc->instance->event_loop_wakeup); |
128 | 0 | }); |
129 | 0 | } |
130 | | |
131 | | /* Cancels a piece of async work. */ |
132 | | void MVM_io_eventloop_cancel_work(MVMThreadContext *tc, MVMObject *task_obj, |
133 | 0 | MVMObject *notify_queue, MVMObject *notify_schedulee) { |
134 | 0 | if (REPR(task_obj)->ID == MVM_REPR_ID_MVMAsyncTask) { |
135 | 0 | if (notify_queue && notify_schedulee) { |
136 | 0 | MVMAsyncTask *task = (MVMAsyncTask *)task_obj; |
137 | 0 | MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_queue, |
138 | 0 | notify_queue); |
139 | 0 | MVM_ASSIGN_REF(tc, &(task_obj->header), task->body.cancel_notify_schedulee, |
140 | 0 | notify_schedulee); |
141 | 0 | } |
142 | 0 | MVMROOT(tc, task_obj, { |
143 | 0 | get_or_vivify_loop(tc); |
144 | 0 | MVM_repr_push_o(tc, tc->instance->event_loop_cancel_queue, task_obj); |
145 | 0 | uv_async_send(tc->instance->event_loop_wakeup); |
146 | 0 | }); |
147 | 0 | } |
148 | 0 | else { |
149 | 0 | MVM_exception_throw_adhoc(tc, "Can only cancel an AsyncTask handle"); |
150 | 0 | } |
151 | 0 | } |
152 | | |
153 | | /* Sends a task cancellation notification if requested for the specified task. */ |
154 | 0 | void MVM_io_eventloop_send_cancellation_notification(MVMThreadContext *tc, MVMAsyncTask *task) { |
155 | 0 | MVMObject *notify_queue = task->body.cancel_notify_queue; |
156 | 0 | MVMObject *notify_schedulee = task->body.cancel_notify_schedulee; |
157 | 0 | if (notify_queue && notify_schedulee) |
158 | 0 | MVM_repr_push_o(tc, notify_queue, notify_schedulee); |
159 | 0 | } |
160 | | |
161 | | /* Adds a work item to the active async task set. */ |
162 | 0 | int MVM_io_eventloop_add_active_work(MVMThreadContext *tc, MVMObject *async_task) { |
163 | 0 | int work_idx = MVM_repr_elems(tc, tc->instance->event_loop_active); |
164 | 0 | MVM_repr_push_o(tc, tc->instance->event_loop_active, async_task); |
165 | 0 | return work_idx; |
166 | 0 | } |
167 | | |
168 | | /* Gets an active work item from the active work eventloop. */ |
169 | 0 | MVMAsyncTask * MVM_io_eventloop_get_active_work(MVMThreadContext *tc, int work_idx) { |
170 | 0 | if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) { |
171 | 0 | MVMObject *task_obj = MVM_repr_at_pos_o(tc, tc->instance->event_loop_active, work_idx); |
172 | 0 | if (REPR(task_obj)->ID != MVM_REPR_ID_MVMAsyncTask) |
173 | 0 | MVM_panic(1, "non-AsyncTask fetched from eventloop active work list"); |
174 | 0 | return (MVMAsyncTask *)task_obj; |
175 | 0 | } |
176 | 0 | else { |
177 | 0 | MVM_panic(1, "use of invalid eventloop work item index %d", work_idx); |
178 | 0 | } |
179 | 0 | } |
180 | | |
181 | | /* Removes an active work index from the active work list, enabling any |
182 | | * memory associated with it to be collected. Replaces the work index with -1 |
183 | | * so that any future use of the task will be a failed lookup. */ |
184 | 0 | void MVM_io_eventloop_remove_active_work(MVMThreadContext *tc, int *work_idx_to_clear) { |
185 | 0 | int work_idx = *work_idx_to_clear; |
186 | 0 | if (work_idx >= 0 && work_idx < MVM_repr_elems(tc, tc->instance->event_loop_active)) { |
187 | 0 | *work_idx_to_clear = -1; |
188 | 0 | MVM_repr_bind_pos_o(tc, tc->instance->event_loop_active, work_idx, tc->instance->VMNull); |
189 | 0 | /* TODO: start to re-use the indices */ |
190 | 0 | } |
191 | 0 | else { |
192 | 0 | MVM_panic(1, "cannot remove invalid eventloop work item index %d", work_idx); |
193 | 0 | } |
194 | 0 | } |