Coverage Report

Created: 2018-07-03 15:31

/home/travis/build/MoarVM/MoarVM/src/6model/reprs/ConcBlockingQueue.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* This representation's function pointer table. */
4
static const MVMREPROps ConcBlockingQueue_this_repr;
5
6
/* Creates a new type object of this representation, and associates it with
7
 * the given HOW. */
8
145
static MVMObject * type_object_for(MVMThreadContext *tc, MVMObject *HOW) {
9
145
    MVMSTable *st  = MVM_gc_allocate_stable(tc, &ConcBlockingQueue_this_repr, HOW);
10
145
11
145
    MVMROOT(tc, st, {
12
145
        MVMObject *obj = MVM_gc_allocate_type_object(tc, st);
13
145
        MVM_ASSIGN_REF(tc, &(st->header), st->WHAT, obj);
14
145
        st->size = sizeof(MVMConcBlockingQueue);
15
145
    });
16
145
17
145
    return st->WHAT;
18
145
}
19
20
/* Initializes a new instance. */
21
159
static void initialize(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) {
22
159
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
23
159
24
159
    /* Initialize locks. */
25
159
    int init_stat;
26
159
    cbq->locks = MVM_calloc(1, sizeof(MVMConcBlockingQueueLocks));
27
159
    if ((init_stat = uv_mutex_init(&cbq->locks->head_lock)) < 0)
28
0
        MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s",
29
0
            uv_strerror(init_stat));
30
159
    if ((init_stat = uv_mutex_init(&cbq->locks->tail_lock)) < 0)
31
0
        MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s",
32
0
            uv_strerror(init_stat));
33
159
    if ((init_stat = uv_cond_init(&cbq->locks->head_cond)) < 0)
34
0
        MVM_exception_throw_adhoc(tc, "Failed to initialize condition variable: %s",
35
0
            uv_strerror(init_stat));
36
159
37
159
    /* Head and tail point to a null node. */
38
159
    cbq->tail = cbq->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode));
39
159
}
40
41
/* Copies the body of one object to another. */
42
0
static void copy_to(MVMThreadContext *tc, MVMSTable *st, void *src, MVMObject *dest_root, void *dest) {
43
0
    MVM_exception_throw_adhoc(tc, "Cannot copy object with representation ConcBlockingQueue");
44
0
}
45
46
/* Called by the VM to mark any GCable items. */
47
143
static void gc_mark(MVMThreadContext *tc, MVMSTable *st, void *data, MVMGCWorklist *worklist) {
48
143
    /* At this point we know the world is stopped, and thus we can safely do a
49
143
     * traversal of the data structure without needing locks. */
50
143
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
51
143
    MVMConcBlockingQueueNode *cur = cbq->head;
52
327
    while (cur) {
53
184
        MVM_gc_worklist_add(tc, worklist, &cur->value);
54
184
        cur = cur->next;
55
184
    }
56
143
}
57
58
/* Called by the VM in order to free memory associated with this object. */
59
0
static void gc_free(MVMThreadContext *tc, MVMObject *obj) {
60
0
    MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)obj;
61
0
62
0
    /* First, free all the nodes. */
63
0
    MVMConcBlockingQueueNode *cur = cbq->body.head;
64
0
    while (cur) {
65
0
        MVMConcBlockingQueueNode *next = cur->next;
66
0
        MVM_free(cur);
67
0
        cur = next;
68
0
    }
69
0
    cbq->body.head = cbq->body.tail = NULL;
70
0
71
0
    /* Clean up locks. */
72
0
    uv_mutex_destroy(&cbq->body.locks->head_lock);
73
0
    uv_mutex_destroy(&cbq->body.locks->tail_lock);
74
0
    uv_cond_destroy(&cbq->body.locks->head_cond);
75
0
    MVM_free(cbq->body.locks);
76
0
    cbq->body.locks = NULL;
77
0
}
78
79
static const MVMStorageSpec storage_spec = {
80
    MVM_STORAGE_SPEC_REFERENCE, /* inlineable */
81
    0,                          /* bits */
82
    0,                          /* align */
83
    MVM_STORAGE_SPEC_BP_NONE,   /* boxed_primitive */
84
    0,                          /* can_box */
85
    0,                          /* is_unsigned */
86
};
87
88
/* Gets the storage specification for this representation. */
89
0
static const MVMStorageSpec * get_storage_spec(MVMThreadContext *tc, MVMSTable *st) {
90
0
    return &storage_spec;
91
0
}
92
93
/* Compose the representation. */
94
1
static void compose(MVMThreadContext *tc, MVMSTable *st, MVMObject *info) {
95
1
    /* Nothing to do for this REPR. */
96
1
}
97
98
0
/* Peeks at the front of the queue without removing it. Only index 0 and
 * object registers are accepted; anything else throws. Stores the front value
 * in value->o, or VMNull when the queue is empty. */
static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMint64 index, MVMRegister *value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *queue = (MVMConcBlockingQueueBody *)data;
    MVMConcBlockingQueueNode *front;
    unsigned int interval_id;

    if (index != 0)
        MVM_exception_throw_adhoc(tc,
            "Can only request (peek) head of a concurrent blocking queue");
    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc,
            "Can only get objects from a concurrent blocking queue");

    if (MVM_load(&queue->elems) > 0) {
        interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.at_pos");

        /* Acquire head_lock while marked as blocked for GC. The body pointer
         * is re-read from root on both sides of the lock call, presumably
         * because the object may move while this thread is blocked. */
        MVMROOT(tc, root, {
            MVM_gc_mark_thread_blocked(tc);
            queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
            uv_mutex_lock(&queue->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        });

        /* head is an empty node; the first real element hangs off it. It may
         * be absent by now even though elems was non-zero before locking. */
        front = queue->head->next;
        if (front)
            value->o = front->value;
        else
            value->o = tc->instance->VMNull;

        uv_mutex_unlock(&queue->locks->head_lock);
        MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.at_pos");
    }
    else {
        value->o = tc->instance->VMNull;
    }
}
130
131
0
/* Returns the queue's current element count, read via MVM_load without
 * taking either lock. */
static MVMuint64 elems(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) {
    MVMConcBlockingQueueBody *body = (MVMConcBlockingQueueBody *)data;
    return MVM_load(&(body->elems));
}
135
136
1.75k
/* Appends an object to the tail of the queue. Only object registers are
 * accepted and a NULL object is rejected; both cases throw. If the element
 * count was zero before this push, head_cond is signalled under head_lock. */
static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *queue   = (MVMConcBlockingQueueBody *)data;
    MVMObject                *payload = value.o;
    MVMConcBlockingQueueNode *node;
    AO_t                      count_before;
    unsigned int              interval_id;

    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc,
            "Can only push objects to a concurrent blocking queue");
    if (payload == NULL)
        MVM_exception_throw_adhoc(tc,
            "Cannot store a null value in a concurrent blocking queue");

    /* Allocate the new node up front, before any lock is taken. */
    node = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode));

    interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.push");

    /* Take tail_lock while marked as blocked for GC; both the queue object
     * and the value being pushed are rooted for the duration. The body
     * pointer is re-read from root on both sides of the lock call. */
    MVMROOT2(tc, root, payload, {
        MVM_gc_mark_thread_blocked(tc);
        queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        uv_mutex_lock(&queue->locks->tail_lock);
        MVM_gc_mark_thread_unblocked(tc);
        queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
    });

    /* Fill in the node, link it after the current tail, and count it. */
    MVM_ASSIGN_REF(tc, &(root->header), node->value, payload);
    queue->tail->next = node;
    queue->tail       = node;
    count_before      = MVM_incr(&queue->elems);
    uv_mutex_unlock(&queue->locks->tail_lock);

    /* Signal head_cond only on the empty -> non-empty transition. */
    if (count_before == 0) {
        MVMROOT(tc, root, {
            MVM_gc_mark_thread_blocked(tc);
            queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
            uv_mutex_lock(&queue->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        });
        uv_cond_signal(&queue->locks->head_cond);
        uv_mutex_unlock(&queue->locks->head_lock);
    }

    MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.push");
}
183
184
1.65k
/* Removes the front element and stores it in value->o, waiting on head_cond
 * for as long as the element count is zero. Only object registers are
 * accepted; any other kind throws. */
static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister *value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *queue = (MVMConcBlockingQueueBody *)data;
    MVMConcBlockingQueueNode *new_head;
    unsigned int interval_id;

    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc, "Can only shift objects from a ConcBlockingQueue");

    interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.shift");

    /* Both the lock acquisition and every wait on the condition variable are
     * bracketed by blocked/unblocked GC markers, and the body pointer is
     * re-read from root on each side of them. */
    MVMROOT(tc, root, {
        MVM_gc_mark_thread_blocked(tc);
        queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        uv_mutex_lock(&queue->locks->head_lock);
        MVM_gc_mark_thread_unblocked(tc);
        queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);

        while (MVM_load(&queue->elems) == 0) {
            MVM_gc_mark_thread_blocked(tc);
            queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
            uv_cond_wait(&queue->locks->head_cond, &queue->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            queue = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        }
    });

    /* Drop the old empty head; the node that carried the value becomes the
     * new empty head once its value has been handed out and cleared. */
    new_head = queue->head->next;
    MVM_free(queue->head);
    queue->head = new_head;
    MVM_barrier();
    value->o        = new_head->value;
    new_head->value = NULL;
    MVM_barrier();

    /* Pass the wake-up along when the pre-decrement count was above one. */
    if (MVM_decr(&queue->elems) > 1)
        uv_cond_signal(&queue->locks->head_cond);

    uv_mutex_unlock(&queue->locks->head_lock);
    MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.shift");
}
227
228
/* Set the size of the STable. */
229
2
static void deserialize_stable_size(MVMThreadContext *tc, MVMSTable *st, MVMSerializationReader *reader) {
230
2
    st->size = sizeof(MVMConcBlockingQueue);
231
2
}
232
233
/* Initializes the representation. */
234
144
const MVMREPROps * MVMConcBlockingQueue_initialize(MVMThreadContext *tc) {
235
144
    return &ConcBlockingQueue_this_repr;
236
144
}
237
238
static const MVMREPROps ConcBlockingQueue_this_repr = {
239
    type_object_for,
240
    MVM_gc_allocate_object,
241
    initialize,
242
    copy_to,
243
    MVM_REPR_DEFAULT_ATTR_FUNCS,
244
    MVM_REPR_DEFAULT_BOX_FUNCS,
245
    {
246
        at_pos,
247
        MVM_REPR_DEFAULT_BIND_POS,
248
        MVM_REPR_DEFAULT_SET_ELEMS,
249
        push,
250
        MVM_REPR_DEFAULT_POP,
251
        MVM_REPR_DEFAULT_UNSHIFT,
252
        shift,
253
        MVM_REPR_DEFAULT_SLICE,
254
        MVM_REPR_DEFAULT_SPLICE,
255
        MVM_REPR_DEFAULT_AT_POS_MULTIDIM,
256
        MVM_REPR_DEFAULT_BIND_POS_MULTIDIM,
257
        MVM_REPR_DEFAULT_DIMENSIONS,
258
        MVM_REPR_DEFAULT_SET_DIMENSIONS,
259
        MVM_REPR_DEFAULT_GET_ELEM_STORAGE_SPEC,
260
        MVM_REPR_DEFAULT_POS_AS_ATOMIC,
261
        MVM_REPR_DEFAULT_POS_AS_ATOMIC_MULTIDIM
262
    },    /* pos_funcs */
263
    MVM_REPR_DEFAULT_ASS_FUNCS,
264
    elems,
265
    get_storage_spec,
266
    NULL, /* change_type */
267
    NULL, /* serialize */
268
    NULL, /* deserialize */
269
    NULL, /* serialize_repr_data */
270
    NULL, /* deserialize_repr_data */
271
    deserialize_stable_size,
272
    gc_mark,
273
    gc_free,
274
    NULL, /* gc_cleanup */
275
    NULL, /* gc_mark_repr_data */
276
    NULL, /* gc_free_repr_data */
277
    compose,
278
    NULL, /* spesh */
279
    "ConcBlockingQueue", /* name */
280
    MVM_REPR_ID_ConcBlockingQueue,
281
    NULL, /* unmanaged_size */
282
    NULL, /* describe_refs */
283
};
284
285
0
/* Type-checking wrapper around MVM_concblockingqueue_poll: throws unless
 * queue is a concrete object whose REPR is ConcBlockingQueue, otherwise
 * forwards to the poll function and returns its result. */
MVMObject * MVM_concblockingqueue_jit_poll(MVMThreadContext *tc, MVMObject *queue) {
    /* Reject wrong REPRs and type objects first; the throw does not return
     * here (the original has no return statement on that path either). */
    if (REPR(queue)->ID != MVM_REPR_ID_ConcBlockingQueue || !IS_CONCRETE(queue))
        MVM_exception_throw_adhoc(tc,
                "queuepoll requires a concrete object with REPR ConcBlockingQueue");

    return MVM_concblockingqueue_poll(tc, (MVMConcBlockingQueue *)queue);
}
292
293
/* Polls a queue for a value, returning NULL if none is available. */
294
42
MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueue *queue) {
295
42
    MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)queue;
296
42
    MVMConcBlockingQueueNode *taken;
297
42
    MVMObject *result = tc->instance->VMNull;
298
42
    unsigned int interval_id;
299
42
300
42
    interval_id = MVM_telemetry_interval_start(tc, "ConcBlockingQueue.poll");
301
42
    MVMROOT(tc, cbq, {
302
42
        MVM_gc_mark_thread_blocked(tc);
303
42
        uv_mutex_lock(&cbq->body.locks->head_lock);
304
42
        MVM_gc_mark_thread_unblocked(tc);
305
42
    });
306
42
307
42
    if (MVM_load(&cbq->body.elems) > 0) {
308
21
        taken = cbq->body.head->next;
309
21
        MVM_free(cbq->body.head);
310
21
        cbq->body.head = taken;
311
21
        MVM_barrier();
312
21
        result = taken->value;
313
21
        taken->value = NULL;
314
21
        MVM_barrier();
315
21
        if (MVM_decr(&cbq->body.elems) > 1)
316
7
            uv_cond_signal(&cbq->body.locks->head_cond);
317
21
    }
318
42
319
42
    uv_mutex_unlock(&cbq->body.locks->head_lock);
320
42
321
42
    MVM_telemetry_interval_stop(tc, interval_id, "ConcBlockingQueue.poll");
322
42
    return result;
323
42
}