/home/travis/build/MoarVM/MoarVM/src/6model/reprs/ConcBlockingQueue.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include "moar.h" |
2 | | |
3 | | /* This representation's function pointer table. */ |
4 | | static const MVMREPROps ConcBlockingQueue_this_repr; |
5 | | |
6 | | /* Creates a new type object of this representation, and associates it with |
7 | | * the given HOW. */ |
8 | 145 | static MVMObject * type_object_for(MVMThreadContext *tc, MVMObject *HOW) { |
9 | 145 | MVMSTable *st = MVM_gc_allocate_stable(tc, &ConcBlockingQueue_this_repr, HOW); |
10 | 145 | |
11 | 145 | MVMROOT(tc, st, { |
12 | 145 | MVMObject *obj = MVM_gc_allocate_type_object(tc, st); |
13 | 145 | MVM_ASSIGN_REF(tc, &(st->header), st->WHAT, obj); |
14 | 145 | st->size = sizeof(MVMConcBlockingQueue); |
15 | 145 | }); |
16 | 145 | |
17 | 145 | return st->WHAT; |
18 | 145 | } |
19 | | |
20 | | /* Initializes a new instance. */ |
21 | 159 | static void initialize(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) { |
22 | 159 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
23 | 159 | |
24 | 159 | /* Initialize locks. */ |
25 | 159 | int init_stat; |
26 | 159 | cbq->locks = MVM_calloc(1, sizeof(MVMConcBlockingQueueLocks)); |
27 | 159 | if ((init_stat = uv_mutex_init(&cbq->locks->head_lock)) < 0) |
28 | 0 | MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s", |
29 | 0 | uv_strerror(init_stat)); |
30 | 159 | if ((init_stat = uv_mutex_init(&cbq->locks->tail_lock)) < 0) |
31 | 0 | MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s", |
32 | 0 | uv_strerror(init_stat)); |
33 | 159 | if ((init_stat = uv_cond_init(&cbq->locks->head_cond)) < 0) |
34 | 0 | MVM_exception_throw_adhoc(tc, "Failed to initialize condition variable: %s", |
35 | 0 | uv_strerror(init_stat)); |
36 | 159 | |
37 | 159 | /* Head and tail point to a null node. */ |
38 | 159 | cbq->tail = cbq->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode)); |
39 | 159 | } |
40 | | |
41 | | /* Copies the body of one object to another. */ |
42 | 0 | static void copy_to(MVMThreadContext *tc, MVMSTable *st, void *src, MVMObject *dest_root, void *dest) { |
43 | 0 | MVM_exception_throw_adhoc(tc, "Cannot copy object with representation ConcBlockingQueue"); |
44 | 0 | } |
45 | | |
46 | | /* Called by the VM to mark any GCable items. */ |
47 | 143 | static void gc_mark(MVMThreadContext *tc, MVMSTable *st, void *data, MVMGCWorklist *worklist) { |
48 | 143 | /* At this point we know the world is stopped, and thus we can safely do a |
49 | 143 | * traversal of the data structure without needing locks. */ |
50 | 143 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
51 | 143 | MVMConcBlockingQueueNode *cur = cbq->head; |
52 | 327 | while (cur) { |
53 | 184 | MVM_gc_worklist_add(tc, worklist, &cur->value); |
54 | 184 | cur = cur->next; |
55 | 184 | } |
56 | 143 | } |
57 | | |
58 | | /* Called by the VM in order to free memory associated with this object. */ |
59 | 0 | static void gc_free(MVMThreadContext *tc, MVMObject *obj) { |
60 | 0 | MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)obj; |
61 | 0 |
|
62 | 0 | /* First, free all the nodes. */ |
63 | 0 | MVMConcBlockingQueueNode *cur = cbq->body.head; |
64 | 0 | while (cur) { |
65 | 0 | MVMConcBlockingQueueNode *next = cur->next; |
66 | 0 | MVM_free(cur); |
67 | 0 | cur = next; |
68 | 0 | } |
69 | 0 | cbq->body.head = cbq->body.tail = NULL; |
70 | 0 |
|
71 | 0 | /* Clean up locks. */ |
72 | 0 | uv_mutex_destroy(&cbq->body.locks->head_lock); |
73 | 0 | uv_mutex_destroy(&cbq->body.locks->tail_lock); |
74 | 0 | uv_cond_destroy(&cbq->body.locks->head_cond); |
75 | 0 | MVM_free(cbq->body.locks); |
76 | 0 | cbq->body.locks = NULL; |
77 | 0 | } |
78 | | |
/* Reference semantics: queue instances are always boxed objects and box no
 * primitive value themselves. */
static const MVMStorageSpec storage_spec = {
    MVM_STORAGE_SPEC_REFERENCE, /* inlineable */
    0,                          /* bits */
    0,                          /* align */
    MVM_STORAGE_SPEC_BP_NONE,   /* boxed_primitive */
    0,                          /* can_box */
    0,                          /* is_unsigned */
};
87 | | |
88 | | /* Gets the storage specification for this representation. */ |
89 | 0 | static const MVMStorageSpec * get_storage_spec(MVMThreadContext *tc, MVMSTable *st) { |
90 | 0 | return &storage_spec; |
91 | 0 | } |
92 | | |
93 | | /* Compose the representation. */ |
94 | 1 | static void compose(MVMThreadContext *tc, MVMSTable *st, MVMObject *info) { |
95 | 1 | /* Nothing to do for this REPR. */ |
96 | 1 | } |
97 | | |
/* Peeks at the head of the queue (index 0 only) without removing it.
 * Writes the head value, or VMNull when the queue is empty, into *value.
 * Only object registers are supported. */
static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMint64 index, MVMRegister *value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;

    if (index != 0)
        MVM_exception_throw_adhoc(tc,
            "Can only request (peek) head of a concurrent blocking queue");
    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc,
            "Can only get objects from a concurrent blocking queue");

    /* Unlocked fast-path emptiness check; if non-empty we take the head
     * lock to peek safely. */
    if (MVM_load(&cbq->elems) > 0) {
        MVMConcBlockingQueueNode *peeked;
        unsigned int interval_id;
        interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.at_pos");
        MVMROOT(tc, root, {
            /* Mark this thread blocked while acquiring the lock so GC can
             * proceed; root may move during that window, so the body
             * pointer must be re-fetched after each potential safepoint. */
            MVM_gc_mark_thread_blocked(tc);
            data = OBJECT_BODY(root);
            cbq = (MVMConcBlockingQueueBody *)data;
            uv_mutex_lock(&cbq->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            data = OBJECT_BODY(root);
            cbq = (MVMConcBlockingQueueBody *)data;
        });
        /* head is a dummy node; the real front element is head->next. It may
         * have been consumed between the elems check and taking the lock. */
        peeked = cbq->head->next;
        value->o = peeked ? peeked->value : tc->instance->VMNull;
        uv_mutex_unlock(&cbq->locks->head_lock);
        MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.at_pos");
    }
    else {
        value->o = tc->instance->VMNull;
    }
}
130 | | |
131 | 0 | static MVMuint64 elems(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) { |
132 | 0 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
133 | 0 | return MVM_load(&(cbq->elems)); |
134 | 0 | } |
135 | | |
/* Appends an object to the tail of the queue (two-lock queue: producers
 * only contend on tail_lock). If the queue was previously empty, signals
 * head_cond to wake a consumer blocked in shift. Only non-null objects in
 * object registers are accepted. */
static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
    MVMConcBlockingQueueNode *add;
    AO_t orig_elems;
    MVMObject *to_add = value.o;
    unsigned int interval_id;

    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc,
            "Can only push objects to a concurrent blocking queue");
    if (value.o == NULL)
        MVM_exception_throw_adhoc(tc,
            "Cannot store a null value in a concurrent blocking queue");

    /* Allocate the node before taking any lock to keep the critical
     * section short. */
    add = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode));

    interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.push");
    MVMROOT2(tc, root, to_add, {
        /* Mark blocked while acquiring the lock so GC is not held up; both
         * root and to_add may move, so body pointers are re-fetched after
         * each potential safepoint. */
        MVM_gc_mark_thread_blocked(tc);
        data = OBJECT_BODY(root);
        cbq = (MVMConcBlockingQueueBody *)data;
        uv_mutex_lock(&cbq->locks->tail_lock);
        MVM_gc_mark_thread_unblocked(tc);
        data = OBJECT_BODY(root);
        cbq = (MVMConcBlockingQueueBody *)data;
    });
    /* Link the new node at the tail; write barrier needed since the node
     * holds a reference owned by root. */
    MVM_ASSIGN_REF(tc, &(root->header), add->value, to_add);
    cbq->tail->next = add;
    cbq->tail = add;
    orig_elems = MVM_incr(&cbq->elems);
    uv_mutex_unlock(&cbq->locks->tail_lock);

    /* Empty-to-non-empty transition: take head_lock and signal so a
     * blocked consumer observes the new element. */
    if (orig_elems == 0) {
        MVMROOT(tc, root, {
            MVM_gc_mark_thread_blocked(tc);
            data = OBJECT_BODY(root);
            cbq = (MVMConcBlockingQueueBody *)data;
            uv_mutex_lock(&cbq->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            data = OBJECT_BODY(root);
            cbq = (MVMConcBlockingQueueBody *)data;
        });
        uv_cond_signal(&cbq->locks->head_cond);
        uv_mutex_unlock(&cbq->locks->head_lock);
    }
    MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.push");
}
183 | | |
/* Removes and returns the head element, blocking on head_cond while the
 * queue is empty. The old dummy head node is freed and the taken node
 * becomes the new dummy. Only object registers are supported. */
static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister *value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
    MVMConcBlockingQueueNode *taken;
    unsigned int interval_id;

    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc, "Can only shift objects from a ConcBlockingQueue");

    interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.shift");
    MVMROOT(tc, root, {
        /* Mark blocked around the lock acquisition and around each cond
         * wait so GC can run while we sleep; root may move across any of
         * these, so the body pointer is re-fetched every time. */
        MVM_gc_mark_thread_blocked(tc);
        data = OBJECT_BODY(root);
        cbq = (MVMConcBlockingQueueBody *)data;
        uv_mutex_lock(&cbq->locks->head_lock);
        MVM_gc_mark_thread_unblocked(tc);
        data = OBJECT_BODY(root);
        cbq = (MVMConcBlockingQueueBody *)data;

        /* Standard condvar loop: re-check emptiness after every wakeup
         * (spurious wakeups and racing consumers are both possible). */
        while (MVM_load(&cbq->elems) == 0) {
            MVM_gc_mark_thread_blocked(tc);
            data = OBJECT_BODY(root);
            cbq = (MVMConcBlockingQueueBody *)data;
            uv_cond_wait(&cbq->locks->head_cond, &cbq->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            data = OBJECT_BODY(root);
            cbq = (MVMConcBlockingQueueBody *)data;
        }
    });

    /* Unlink: the taken node becomes the new dummy head; clear its value
     * so the queue no longer keeps the object alive. Barriers keep the
     * pointer swaps ordered for the lock-free gc_mark walk. */
    taken = cbq->head->next;
    MVM_free(cbq->head);
    cbq->head = taken;
    MVM_barrier();
    value->o = taken->value;
    taken->value = NULL;
    MVM_barrier();

    /* If elements remain, pass the signal on to the next waiting consumer. */
    if (MVM_decr(&cbq->elems) > 1)
        uv_cond_signal(&cbq->locks->head_cond);

    uv_mutex_unlock(&cbq->locks->head_lock);
    MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.shift");
}
227 | | |
228 | | /* Set the size of the STable. */ |
229 | 2 | static void deserialize_stable_size(MVMThreadContext *tc, MVMSTable *st, MVMSerializationReader *reader) { |
230 | 2 | st->size = sizeof(MVMConcBlockingQueue); |
231 | 2 | } |
232 | | |
233 | | /* Initializes the representation. */ |
234 | 144 | const MVMREPROps * MVMConcBlockingQueue_initialize(MVMThreadContext *tc) { |
235 | 144 | return &ConcBlockingQueue_this_repr; |
236 | 144 | } |
237 | | |
/* The REPR function table. Slots are positional per MVMREPROps; unsupported
 * positional operations fall back to the default (throwing) stubs. */
static const MVMREPROps ConcBlockingQueue_this_repr = {
    type_object_for,
    MVM_gc_allocate_object,
    initialize,
    copy_to,
    MVM_REPR_DEFAULT_ATTR_FUNCS,
    MVM_REPR_DEFAULT_BOX_FUNCS,
    {
        at_pos,
        MVM_REPR_DEFAULT_BIND_POS,
        MVM_REPR_DEFAULT_SET_ELEMS,
        push,
        MVM_REPR_DEFAULT_POP,
        MVM_REPR_DEFAULT_UNSHIFT,
        shift,
        MVM_REPR_DEFAULT_SLICE,
        MVM_REPR_DEFAULT_SPLICE,
        MVM_REPR_DEFAULT_AT_POS_MULTIDIM,
        MVM_REPR_DEFAULT_BIND_POS_MULTIDIM,
        MVM_REPR_DEFAULT_DIMENSIONS,
        MVM_REPR_DEFAULT_SET_DIMENSIONS,
        MVM_REPR_DEFAULT_GET_ELEM_STORAGE_SPEC,
        MVM_REPR_DEFAULT_POS_AS_ATOMIC,
        MVM_REPR_DEFAULT_POS_AS_ATOMIC_MULTIDIM
    }, /* pos_funcs */
    MVM_REPR_DEFAULT_ASS_FUNCS,
    elems,
    get_storage_spec,
    NULL, /* change_type */
    NULL, /* serialize */
    NULL, /* deserialize */
    NULL, /* serialize_repr_data */
    NULL, /* deserialize_repr_data */
    deserialize_stable_size,
    gc_mark,
    gc_free,
    NULL, /* gc_cleanup */
    NULL, /* gc_mark_repr_data */
    NULL, /* gc_free_repr_data */
    compose,
    NULL, /* spesh */
    "ConcBlockingQueue", /* name */
    MVM_REPR_ID_ConcBlockingQueue,
    NULL, /* unmanaged_size */
    NULL, /* describe_refs */
};
284 | | |
285 | 0 | MVMObject * MVM_concblockingqueue_jit_poll(MVMThreadContext *tc, MVMObject *queue) { |
286 | 0 | if (REPR(queue)->ID == MVM_REPR_ID_ConcBlockingQueue && IS_CONCRETE(queue)) |
287 | 0 | return MVM_concblockingqueue_poll(tc, (MVMConcBlockingQueue *)queue); |
288 | 0 | else |
289 | 0 | MVM_exception_throw_adhoc(tc, |
290 | 0 | "queuepoll requires a concrete object with REPR ConcBlockingQueue"); |
291 | 0 | } |
292 | | |
293 | | /* Polls a queue for a value, returning NULL if none is available. */ |
294 | 42 | MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueue *queue) { |
295 | 42 | MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)queue; |
296 | 42 | MVMConcBlockingQueueNode *taken; |
297 | 42 | MVMObject *result = tc->instance->VMNull; |
298 | 42 | unsigned int interval_id; |
299 | 42 | |
300 | 42 | interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.poll"); |
301 | 42 | MVMROOT(tc, cbq, { |
302 | 42 | MVM_gc_mark_thread_blocked(tc); |
303 | 42 | uv_mutex_lock(&cbq->body.locks->head_lock); |
304 | 42 | MVM_gc_mark_thread_unblocked(tc); |
305 | 42 | }); |
306 | 42 | |
307 | 42 | if (MVM_load(&cbq->body.elems) > 0) { |
308 | 21 | taken = cbq->body.head->next; |
309 | 21 | MVM_free(cbq->body.head); |
310 | 21 | cbq->body.head = taken; |
311 | 21 | MVM_barrier(); |
312 | 21 | result = taken->value; |
313 | 21 | taken->value = NULL; |
314 | 21 | MVM_barrier(); |
315 | 21 | if (MVM_decr(&cbq->body.elems) > 1) |
316 | 7 | uv_cond_signal(&cbq->body.locks->head_cond); |
317 | 21 | } |
318 | 42 | |
319 | 42 | uv_mutex_unlock(&cbq->body.locks->head_lock); |
320 | 42 | |
321 | 42 | MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.poll"); |
322 | 42 | return result; |
323 | 42 | } |