/home/travis/build/MoarVM/MoarVM/src/6model/reprs/ConcBlockingQueue.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include "moar.h" |
2 | | |
3 | | /* This representation's function pointer table. */ |
4 | | static const MVMREPROps ConcBlockingQueue_this_repr; |
5 | | |
6 | | /* Creates a new type object of this representation, and associates it with |
7 | | * the given HOW. */ |
8 | 130 | static MVMObject * type_object_for(MVMThreadContext *tc, MVMObject *HOW) { |
9 | 130 | MVMSTable *st = MVM_gc_allocate_stable(tc, &ConcBlockingQueue_this_repr, HOW); |
10 | 130 | |
11 | 130 | MVMROOT(tc, st, { |
12 | 130 | MVMObject *obj = MVM_gc_allocate_type_object(tc, st); |
13 | 130 | MVM_ASSIGN_REF(tc, &(st->header), st->WHAT, obj); |
14 | 130 | st->size = sizeof(MVMConcBlockingQueue); |
15 | 130 | }); |
16 | 130 | |
17 | 130 | return st->WHAT; |
18 | 130 | } |
19 | | |
20 | | /* Initializes a new instance. */ |
21 | 0 | static void initialize(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) { |
22 | 0 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
23 | 0 |
|
24 | 0 | /* Initialize locks. */ |
25 | 0 | int init_stat; |
26 | 0 | cbq->locks = MVM_calloc(1, sizeof(MVMConcBlockingQueueLocks)); |
27 | 0 | if ((init_stat = uv_mutex_init(&cbq->locks->head_lock)) < 0) |
28 | 0 | MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s", |
29 | 0 | uv_strerror(init_stat)); |
30 | 0 | if ((init_stat = uv_mutex_init(&cbq->locks->tail_lock)) < 0) |
31 | 0 | MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s", |
32 | 0 | uv_strerror(init_stat)); |
33 | 0 | if ((init_stat = uv_cond_init(&cbq->locks->head_cond)) < 0) |
34 | 0 | MVM_exception_throw_adhoc(tc, "Failed to initialize condition variable: %s", |
35 | 0 | uv_strerror(init_stat)); |
36 | 0 |
|
37 | 0 | /* Head and tail point to a null node. */ |
38 | 0 | cbq->tail = cbq->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode)); |
39 | 0 | } |
40 | | |
41 | | /* Copies the body of one object to another. */ |
42 | 0 | static void copy_to(MVMThreadContext *tc, MVMSTable *st, void *src, MVMObject *dest_root, void *dest) { |
43 | 0 | MVM_exception_throw_adhoc(tc, "Cannot copy object with representation ConcBlockingQueue"); |
44 | 0 | } |
45 | | |
46 | | /* Called by the VM to mark any GCable items. */ |
47 | 0 | static void gc_mark(MVMThreadContext *tc, MVMSTable *st, void *data, MVMGCWorklist *worklist) { |
48 | 0 | /* At this point we know the world is stopped, and thus we can safely do a |
49 | 0 | * traversal of the data structure without needing locks. */ |
50 | 0 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
51 | 0 | MVMConcBlockingQueueNode *cur = cbq->head; |
52 | 0 | while (cur) { |
53 | 0 | MVM_gc_worklist_add(tc, worklist, &cur->value); |
54 | 0 | cur = cur->next; |
55 | 0 | } |
56 | 0 | } |
57 | | |
58 | | /* Called by the VM in order to free memory associated with this object. */ |
59 | 0 | static void gc_free(MVMThreadContext *tc, MVMObject *obj) { |
60 | 0 | MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)obj; |
61 | 0 |
|
62 | 0 | /* First, free all the nodes. */ |
63 | 0 | MVMConcBlockingQueueNode *cur = cbq->body.head; |
64 | 0 | while (cur) { |
65 | 0 | MVMConcBlockingQueueNode *next = cur->next; |
66 | 0 | MVM_free(cur); |
67 | 0 | cur = next; |
68 | 0 | } |
69 | 0 | cbq->body.head = cbq->body.tail = NULL; |
70 | 0 |
|
71 | 0 | /* Clean up locks. */ |
72 | 0 | uv_mutex_destroy(&cbq->body.locks->head_lock); |
73 | 0 | uv_mutex_destroy(&cbq->body.locks->tail_lock); |
74 | 0 | uv_cond_destroy(&cbq->body.locks->head_cond); |
75 | 0 | MVM_free(cbq->body.locks); |
76 | 0 | cbq->body.locks = NULL; |
77 | 0 | } |
78 | | |
79 | | static const MVMStorageSpec storage_spec = { |
80 | | MVM_STORAGE_SPEC_REFERENCE, /* inlineable */ |
81 | | 0, /* bits */ |
82 | | 0, /* align */ |
83 | | MVM_STORAGE_SPEC_BP_NONE, /* boxed_primitive */ |
84 | | 0, /* can_box */ |
85 | | 0, /* is_unsigned */ |
86 | | }; |
87 | | |
88 | | /* Gets the storage specification for this representation. */ |
89 | 0 | static const MVMStorageSpec * get_storage_spec(MVMThreadContext *tc, MVMSTable *st) { |
90 | 0 | return &storage_spec; |
91 | 0 | } |
92 | | |
93 | | /* Compose the representation. */ |
94 | 0 | static void compose(MVMThreadContext *tc, MVMSTable *st, MVMObject *info) { |
95 | 0 | /* Nothing to do for this REPR. */ |
96 | 0 | } |
97 | | |
98 | 0 | static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMint64 index, MVMRegister *value, MVMuint16 kind) { |
99 | 0 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
100 | 0 |
|
101 | 0 | if (index != 0) |
102 | 0 | MVM_exception_throw_adhoc(tc, |
103 | 0 | "Can only request (peek) head of a concurrent blocking queue"); |
104 | 0 | if (kind != MVM_reg_obj) |
105 | 0 | MVM_exception_throw_adhoc(tc, |
106 | 0 | "Can only get objects from a concurrent blocking queue"); |
107 | 0 |
|
108 | 0 | if (MVM_load(&cbq->elems) > 0) { |
109 | 0 | MVMConcBlockingQueueNode *peeked; |
110 | 0 | MVMROOT(tc, root, { |
111 | 0 | MVM_gc_mark_thread_blocked(tc); |
112 | 0 | uv_mutex_lock(&cbq->locks->head_lock); |
113 | 0 | MVM_gc_mark_thread_unblocked(tc); |
114 | 0 | data = OBJECT_BODY(root); |
115 | 0 | cbq = (MVMConcBlockingQueueBody *)data; |
116 | 0 | }); |
117 | 0 | peeked = cbq->head->next; |
118 | 0 | value->o = peeked ? peeked->value : tc->instance->VMNull; |
119 | 0 | uv_mutex_unlock(&cbq->locks->head_lock); |
120 | 0 | } |
121 | 0 | else { |
122 | 0 | value->o = tc->instance->VMNull; |
123 | 0 | } |
124 | 0 | } |
125 | | |
126 | 0 | static MVMuint64 elems(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) { |
127 | 0 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
128 | 0 | return MVM_load(cbq->elems); |
129 | 0 | } |
130 | | |
131 | 0 | static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister value, MVMuint16 kind) { |
132 | 0 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
133 | 0 | MVMConcBlockingQueueNode *add; |
134 | 0 | AO_t orig_elems; |
135 | 0 | MVMObject *to_add = value.o; |
136 | 0 |
|
137 | 0 | if (kind != MVM_reg_obj) |
138 | 0 | MVM_exception_throw_adhoc(tc, |
139 | 0 | "Can only push objects to a concurrent blocking queue"); |
140 | 0 | if (value.o == NULL) |
141 | 0 | MVM_exception_throw_adhoc(tc, |
142 | 0 | "Cannot store a null value in a concurrent blocking queue"); |
143 | 0 |
|
144 | 0 | add = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode)); |
145 | 0 |
|
146 | 0 | MVMROOT(tc, root, { |
147 | 0 | MVMROOT(tc, to_add, { |
148 | 0 | MVM_gc_mark_thread_blocked(tc); |
149 | 0 | uv_mutex_lock(&cbq->locks->tail_lock); |
150 | 0 | MVM_gc_mark_thread_unblocked(tc); |
151 | 0 | }); |
152 | 0 | }); |
153 | 0 | data = OBJECT_BODY(root); |
154 | 0 | cbq = (MVMConcBlockingQueueBody *)data; |
155 | 0 | MVM_ASSIGN_REF(tc, &(root->header), add->value, to_add); |
156 | 0 | cbq->tail->next = add; |
157 | 0 | cbq->tail = add; |
158 | 0 | orig_elems = MVM_incr(&cbq->elems); |
159 | 0 | uv_mutex_unlock(&cbq->locks->tail_lock); |
160 | 0 |
|
161 | 0 | if (orig_elems == 0) { |
162 | 0 | MVMROOT(tc, root, { |
163 | 0 | MVM_gc_mark_thread_blocked(tc); |
164 | 0 | uv_mutex_lock(&cbq->locks->head_lock); |
165 | 0 | MVM_gc_mark_thread_unblocked(tc); |
166 | 0 | }); |
167 | 0 | data = OBJECT_BODY(root); |
168 | 0 | cbq = (MVMConcBlockingQueueBody *)data; |
169 | 0 | uv_cond_signal(&cbq->locks->head_cond); |
170 | 0 | uv_mutex_unlock(&cbq->locks->head_lock); |
171 | 0 | } |
172 | 0 | } |
173 | | |
174 | 0 | static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister *value, MVMuint16 kind) { |
175 | 0 | MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data; |
176 | 0 | MVMConcBlockingQueueNode *taken; |
177 | 0 |
|
178 | 0 | if (kind != MVM_reg_obj) |
179 | 0 | MVM_exception_throw_adhoc(tc, "Can only shift objects from a ConcBlockingQueue"); |
180 | 0 |
|
181 | 0 | MVMROOT(tc, root, { |
182 | 0 | MVM_gc_mark_thread_blocked(tc); |
183 | 0 | uv_mutex_lock(&cbq->locks->head_lock); |
184 | 0 | MVM_gc_mark_thread_unblocked(tc); |
185 | 0 | data = OBJECT_BODY(root); |
186 | 0 | cbq = (MVMConcBlockingQueueBody *)data; |
187 | 0 |
|
188 | 0 | while (MVM_load(&cbq->elems) == 0) { |
189 | 0 | MVM_gc_mark_thread_blocked(tc); |
190 | 0 | uv_cond_wait(&cbq->locks->head_cond, &cbq->locks->head_lock); |
191 | 0 | MVM_gc_mark_thread_unblocked(tc); |
192 | 0 | data = OBJECT_BODY(root); |
193 | 0 | cbq = (MVMConcBlockingQueueBody *)data; |
194 | 0 | } |
195 | 0 | }); |
196 | 0 |
|
197 | 0 | taken = cbq->head->next; |
198 | 0 | MVM_free(cbq->head); |
199 | 0 | cbq->head = taken; |
200 | 0 | MVM_barrier(); |
201 | 0 | value->o = taken->value; |
202 | 0 | taken->value = NULL; |
203 | 0 | MVM_barrier(); |
204 | 0 |
|
205 | 0 | if (MVM_decr(&cbq->elems) > 1) |
206 | 0 | uv_cond_signal(&cbq->locks->head_cond); |
207 | 0 |
|
208 | 0 | uv_mutex_unlock(&cbq->locks->head_lock); |
209 | 0 | } |
210 | | |
211 | | /* Set the size of the STable. */ |
212 | 0 | static void deserialize_stable_size(MVMThreadContext *tc, MVMSTable *st, MVMSerializationReader *reader) { |
213 | 0 | st->size = sizeof(MVMConcBlockingQueue); |
214 | 0 | } |
215 | | |
216 | | /* Initializes the representation. */ |
217 | 130 | const MVMREPROps * MVMConcBlockingQueue_initialize(MVMThreadContext *tc) { |
218 | 130 | return &ConcBlockingQueue_this_repr; |
219 | 130 | } |
220 | | |
221 | | static const MVMREPROps ConcBlockingQueue_this_repr = { |
222 | | type_object_for, |
223 | | MVM_gc_allocate_object, |
224 | | initialize, |
225 | | copy_to, |
226 | | MVM_REPR_DEFAULT_ATTR_FUNCS, |
227 | | MVM_REPR_DEFAULT_BOX_FUNCS, |
228 | | { |
229 | | at_pos, |
230 | | MVM_REPR_DEFAULT_BIND_POS, |
231 | | MVM_REPR_DEFAULT_SET_ELEMS, |
232 | | push, |
233 | | MVM_REPR_DEFAULT_POP, |
234 | | MVM_REPR_DEFAULT_UNSHIFT, |
235 | | shift, |
236 | | MVM_REPR_DEFAULT_SPLICE, |
237 | | MVM_REPR_DEFAULT_AT_POS_MULTIDIM, |
238 | | MVM_REPR_DEFAULT_BIND_POS_MULTIDIM, |
239 | | MVM_REPR_DEFAULT_DIMENSIONS, |
240 | | MVM_REPR_DEFAULT_SET_DIMENSIONS, |
241 | | MVM_REPR_DEFAULT_GET_ELEM_STORAGE_SPEC |
242 | | }, /* pos_funcs */ |
243 | | MVM_REPR_DEFAULT_ASS_FUNCS, |
244 | | elems, |
245 | | get_storage_spec, |
246 | | NULL, /* change_type */ |
247 | | NULL, /* serialize */ |
248 | | NULL, /* deserialize */ |
249 | | NULL, /* serialize_repr_data */ |
250 | | NULL, /* deserialize_repr_data */ |
251 | | deserialize_stable_size, |
252 | | gc_mark, |
253 | | gc_free, |
254 | | NULL, /* gc_cleanup */ |
255 | | NULL, /* gc_mark_repr_data */ |
256 | | NULL, /* gc_free_repr_data */ |
257 | | compose, |
258 | | NULL, /* spesh */ |
259 | | "ConcBlockingQueue", /* name */ |
260 | | MVM_REPR_ID_ConcBlockingQueue, |
261 | | NULL, /* unmanaged_size */ |
262 | | NULL, /* describe_refs */ |
263 | | }; |
264 | | |
265 | | /* Polls a queue for a value, returning NULL if none is available. */ |
266 | 0 | MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueue *queue) { |
267 | 0 | MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)queue; |
268 | 0 | MVMConcBlockingQueueNode *taken; |
269 | 0 | MVMObject *result = tc->instance->VMNull; |
270 | 0 |
|
271 | 0 | MVMROOT(tc, cbq, { |
272 | 0 | MVM_gc_mark_thread_blocked(tc); |
273 | 0 | uv_mutex_lock(&cbq->body.locks->head_lock); |
274 | 0 | MVM_gc_mark_thread_unblocked(tc); |
275 | 0 | }); |
276 | 0 |
|
277 | 0 | if (MVM_load(&cbq->body.elems) > 0) { |
278 | 0 | taken = cbq->body.head->next; |
279 | 0 | MVM_free(cbq->body.head); |
280 | 0 | cbq->body.head = taken; |
281 | 0 | MVM_barrier(); |
282 | 0 | result = taken->value; |
283 | 0 | taken->value = NULL; |
284 | 0 | MVM_barrier(); |
285 | 0 | if (MVM_decr(&cbq->body.elems) > 1) |
286 | 0 | uv_cond_signal(&cbq->body.locks->head_cond); |
287 | 0 | } |
288 | 0 |
|
289 | 0 | uv_mutex_unlock(&cbq->body.locks->head_lock); |
290 | 0 |
|
291 | 0 | return result; |
292 | 0 | } |