Coverage Report

Created: 2017-04-15 07:07

/home/travis/build/MoarVM/MoarVM/src/6model/reprs/ConcBlockingQueue.c
Line
Count
Source (jump to first uncovered line)
1
#include "moar.h"
2
3
/* This representation's function pointer table. */
4
static const MVMREPROps ConcBlockingQueue_this_repr;
5
6
/* Creates a new type object of this representation, and associates it with
7
 * the given HOW. */
8
130
static MVMObject * type_object_for(MVMThreadContext *tc, MVMObject *HOW) {
9
130
    MVMSTable *st  = MVM_gc_allocate_stable(tc, &ConcBlockingQueue_this_repr, HOW);
10
130
11
130
    MVMROOT(tc, st, {
12
130
        MVMObject *obj = MVM_gc_allocate_type_object(tc, st);
13
130
        MVM_ASSIGN_REF(tc, &(st->header), st->WHAT, obj);
14
130
        st->size = sizeof(MVMConcBlockingQueue);
15
130
    });
16
130
17
130
    return st->WHAT;
18
130
}
19
20
/* Initializes a new instance. */
21
0
static void initialize(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) {
22
0
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
23
0
24
0
    /* Initialize locks. */
25
0
    int init_stat;
26
0
    cbq->locks = MVM_calloc(1, sizeof(MVMConcBlockingQueueLocks));
27
0
    if ((init_stat = uv_mutex_init(&cbq->locks->head_lock)) < 0)
28
0
        MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s",
29
0
            uv_strerror(init_stat));
30
0
    if ((init_stat = uv_mutex_init(&cbq->locks->tail_lock)) < 0)
31
0
        MVM_exception_throw_adhoc(tc, "Failed to initialize mutex: %s",
32
0
            uv_strerror(init_stat));
33
0
    if ((init_stat = uv_cond_init(&cbq->locks->head_cond)) < 0)
34
0
        MVM_exception_throw_adhoc(tc, "Failed to initialize condition variable: %s",
35
0
            uv_strerror(init_stat));
36
0
37
0
    /* Head and tail point to a null node. */
38
0
    cbq->tail = cbq->head = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode));
39
0
}
40
41
/* Copies the body of one object to another. */
42
0
static void copy_to(MVMThreadContext *tc, MVMSTable *st, void *src, MVMObject *dest_root, void *dest) {
43
0
    MVM_exception_throw_adhoc(tc, "Cannot copy object with representation ConcBlockingQueue");
44
0
}
45
46
/* Called by the VM to mark any GCable items. */
47
0
static void gc_mark(MVMThreadContext *tc, MVMSTable *st, void *data, MVMGCWorklist *worklist) {
48
0
    /* At this point we know the world is stopped, and thus we can safely do a
49
0
     * traversal of the data structure without needing locks. */
50
0
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
51
0
    MVMConcBlockingQueueNode *cur = cbq->head;
52
0
    while (cur) {
53
0
        MVM_gc_worklist_add(tc, worklist, &cur->value);
54
0
        cur = cur->next;
55
0
    }
56
0
}
57
58
/* Called by the VM in order to free memory associated with this object. */
59
0
static void gc_free(MVMThreadContext *tc, MVMObject *obj) {
60
0
    MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)obj;
61
0
62
0
    /* First, free all the nodes. */
63
0
    MVMConcBlockingQueueNode *cur = cbq->body.head;
64
0
    while (cur) {
65
0
        MVMConcBlockingQueueNode *next = cur->next;
66
0
        MVM_free(cur);
67
0
        cur = next;
68
0
    }
69
0
    cbq->body.head = cbq->body.tail = NULL;
70
0
71
0
    /* Clean up locks. */
72
0
    uv_mutex_destroy(&cbq->body.locks->head_lock);
73
0
    uv_mutex_destroy(&cbq->body.locks->tail_lock);
74
0
    uv_cond_destroy(&cbq->body.locks->head_cond);
75
0
    MVM_free(cbq->body.locks);
76
0
    cbq->body.locks = NULL;
77
0
}
78
79
static const MVMStorageSpec storage_spec = {
80
    MVM_STORAGE_SPEC_REFERENCE, /* inlineable */
81
    0,                          /* bits */
82
    0,                          /* align */
83
    MVM_STORAGE_SPEC_BP_NONE,   /* boxed_primitive */
84
    0,                          /* can_box */
85
    0,                          /* is_unsigned */
86
};
87
88
/* Gets the storage specification for this representation. */
89
0
static const MVMStorageSpec * get_storage_spec(MVMThreadContext *tc, MVMSTable *st) {
90
0
    return &storage_spec;
91
0
}
92
93
/* Compose the representation. */
94
0
static void compose(MVMThreadContext *tc, MVMSTable *st, MVMObject *info) {
95
0
    /* Nothing to do for this REPR. */
96
0
}
97
98
0
/* Peeks at the element at the front of the queue without removing it.
 * Only index 0 and object registers are accepted; anything else throws.
 * Stores VMNull into value when there is nothing to peek at. */
static void at_pos(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMint64 index, MVMRegister *value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *body = (MVMConcBlockingQueueBody *)data;

    if (index != 0)
        MVM_exception_throw_adhoc(tc,
            "Can only request (peek) head of a concurrent blocking queue");
    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc,
            "Can only get objects from a concurrent blocking queue");

    if (MVM_load(&body->elems) > 0) {
        MVMConcBlockingQueueNode *front;

        /* Mark the thread blocked while waiting on the head lock, then
         * re-fetch the body from the rooted object once unblocked. */
        MVMROOT(tc, root, {
            MVM_gc_mark_thread_blocked(tc);
            uv_mutex_lock(&body->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            body = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        });

        /* head is a placeholder; the first real element is head->next. */
        front = body->head->next;
        if (front)
            value->o = front->value;
        else
            value->o = tc->instance->VMNull;
        uv_mutex_unlock(&body->locks->head_lock);
        return;
    }

    value->o = tc->instance->VMNull;
}
125
126
0
/* Returns the number of elements currently held in the queue.
 * The count is read with an atomic load of the body's elems counter.
 * MVM_load must be given the ADDRESS of the counter, as at every other call
 * site in this file; passing the counter's value would make the load treat
 * the element count itself as a pointer. */
static MVMuint64 elems(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data) {
    MVMConcBlockingQueueBody *cbq = (MVMConcBlockingQueueBody *)data;
    return MVM_load(&cbq->elems);
}
130
131
0
/* Appends an object to the tail of the queue.
 * Throws if kind is not an object register or if the value is NULL. When
 * the queue held no elements before this push, the head condition variable
 * is signalled under the head lock. */
static void push(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *body = (MVMConcBlockingQueueBody *)data;
    MVMObject *to_add = value.o;
    MVMConcBlockingQueueNode *node;
    AO_t prev_count;

    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc,
            "Can only push objects to a concurrent blocking queue");
    if (to_add == NULL)
        MVM_exception_throw_adhoc(tc,
            "Cannot store a null value in a concurrent blocking queue");

    /* Allocate the new node before taking any lock. */
    node = MVM_calloc(1, sizeof(MVMConcBlockingQueueNode));

    /* Both the queue and the value stay rooted while this thread is marked
     * blocked waiting for the tail lock. */
    MVMROOT(tc, root, {
    MVMROOT(tc, to_add, {
        MVM_gc_mark_thread_blocked(tc);
        uv_mutex_lock(&body->locks->tail_lock);
        MVM_gc_mark_thread_unblocked(tc);
    });
    });

    /* Re-fetch the body from the rooted object before touching it. */
    body = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
    MVM_ASSIGN_REF(tc, &(root->header), node->value, to_add);
    body->tail->next = node;
    body->tail = node;
    prev_count = MVM_incr(&body->elems);
    uv_mutex_unlock(&body->locks->tail_lock);

    /* Queue held nothing before this push: signal the head condition. */
    if (prev_count == 0) {
        MVMROOT(tc, root, {
            MVM_gc_mark_thread_blocked(tc);
            uv_mutex_lock(&body->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
        });
        body = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        uv_cond_signal(&body->locks->head_cond);
        uv_mutex_unlock(&body->locks->head_lock);
    }
}
173
174
0
/* Removes the element at the front of the queue and stores it in value,
 * waiting on the head condition variable for as long as the element count is
 * zero. Throws if kind is not an object register. */
static void shift(MVMThreadContext *tc, MVMSTable *st, MVMObject *root, void *data, MVMRegister *value, MVMuint16 kind) {
    MVMConcBlockingQueueBody *body = (MVMConcBlockingQueueBody *)data;
    MVMConcBlockingQueueNode *new_head;

    if (kind != MVM_reg_obj)
        MVM_exception_throw_adhoc(tc, "Can only shift objects from a ConcBlockingQueue");

    MVMROOT(tc, root, {
        /* Acquire the head lock with the thread marked blocked, then
         * re-fetch the body from the rooted object. */
        MVM_gc_mark_thread_blocked(tc);
        uv_mutex_lock(&body->locks->head_lock);
        MVM_gc_mark_thread_unblocked(tc);
        body = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);

        /* Wait until there is at least one element; the body is re-fetched
         * after every wake-up. */
        while (MVM_load(&body->elems) == 0) {
            MVM_gc_mark_thread_blocked(tc);
            uv_cond_wait(&body->locks->head_cond, &body->locks->head_lock);
            MVM_gc_mark_thread_unblocked(tc);
            body = (MVMConcBlockingQueueBody *)OBJECT_BODY(root);
        }
    });

    /* Drop the old placeholder; the node that held the value becomes the new
     * placeholder once its value has been taken. */
    new_head = body->head->next;
    MVM_free(body->head);
    body->head = new_head;
    MVM_barrier();
    value->o = new_head->value;
    new_head->value = NULL;
    MVM_barrier();

    /* More than one element was present before this removal: signal the
     * head condition again. */
    if (MVM_decr(&body->elems) > 1)
        uv_cond_signal(&body->locks->head_cond);

    uv_mutex_unlock(&body->locks->head_lock);
}
210
211
/* Set the size of the STable. */
212
0
static void deserialize_stable_size(MVMThreadContext *tc, MVMSTable *st, MVMSerializationReader *reader) {
213
0
    st->size = sizeof(MVMConcBlockingQueue);
214
0
}
215
216
/* Initializes the representation. */
217
130
const MVMREPROps * MVMConcBlockingQueue_initialize(MVMThreadContext *tc) {
218
130
    return &ConcBlockingQueue_this_repr;
219
130
}
220
221
static const MVMREPROps ConcBlockingQueue_this_repr = {
222
    type_object_for,
223
    MVM_gc_allocate_object,
224
    initialize,
225
    copy_to,
226
    MVM_REPR_DEFAULT_ATTR_FUNCS,
227
    MVM_REPR_DEFAULT_BOX_FUNCS,
228
    {
229
        at_pos,
230
        MVM_REPR_DEFAULT_BIND_POS,
231
        MVM_REPR_DEFAULT_SET_ELEMS,
232
        push,
233
        MVM_REPR_DEFAULT_POP,
234
        MVM_REPR_DEFAULT_UNSHIFT,
235
        shift,
236
        MVM_REPR_DEFAULT_SPLICE,
237
        MVM_REPR_DEFAULT_AT_POS_MULTIDIM,
238
        MVM_REPR_DEFAULT_BIND_POS_MULTIDIM,
239
        MVM_REPR_DEFAULT_DIMENSIONS,
240
        MVM_REPR_DEFAULT_SET_DIMENSIONS,
241
        MVM_REPR_DEFAULT_GET_ELEM_STORAGE_SPEC
242
    },    /* pos_funcs */
243
    MVM_REPR_DEFAULT_ASS_FUNCS,
244
    elems,
245
    get_storage_spec,
246
    NULL, /* change_type */
247
    NULL, /* serialize */
248
    NULL, /* deserialize */
249
    NULL, /* serialize_repr_data */
250
    NULL, /* deserialize_repr_data */
251
    deserialize_stable_size,
252
    gc_mark,
253
    gc_free,
254
    NULL, /* gc_cleanup */
255
    NULL, /* gc_mark_repr_data */
256
    NULL, /* gc_free_repr_data */
257
    compose,
258
    NULL, /* spesh */
259
    "ConcBlockingQueue", /* name */
260
    MVM_REPR_ID_ConcBlockingQueue,
261
    NULL, /* unmanaged_size */
262
    NULL, /* describe_refs */
263
};
264
265
/* Polls a queue for a value, returning NULL if none is available. */
266
0
MVMObject * MVM_concblockingqueue_poll(MVMThreadContext *tc, MVMConcBlockingQueue *queue) {
267
0
    MVMConcBlockingQueue *cbq = (MVMConcBlockingQueue *)queue;
268
0
    MVMConcBlockingQueueNode *taken;
269
0
    MVMObject *result = tc->instance->VMNull;
270
0
271
0
    MVMROOT(tc, cbq, {
272
0
        MVM_gc_mark_thread_blocked(tc);
273
0
        uv_mutex_lock(&cbq->body.locks->head_lock);
274
0
        MVM_gc_mark_thread_unblocked(tc);
275
0
    });
276
0
277
0
    if (MVM_load(&cbq->body.elems) > 0) {
278
0
        taken = cbq->body.head->next;
279
0
        MVM_free(cbq->body.head);
280
0
        cbq->body.head = taken;
281
0
        MVM_barrier();
282
0
        result = taken->value;
283
0
        taken->value = NULL;
284
0
        MVM_barrier();
285
0
        if (MVM_decr(&cbq->body.elems) > 1)
286
0
            uv_cond_signal(&cbq->body.locks->head_cond);
287
0
    }
288
0
289
0
    uv_mutex_unlock(&cbq->body.locks->head_lock);
290
0
291
0
    return result;
292
0
}