blob: fa705e4ea629285701475a41b92fe93e0165c07d [file] [log] [blame]
fei.dengf7a0cd32023-08-29 09:36:37 +00001/*
2 * Copyright (C) 2021 Amlogic Corporation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#include <unistd.h>
17#include "Queue.h"
18
19namespace Tls {
20
21Queue::Queue()
22{
23 _init();
24}
25
26Queue::Queue(uint32_t maxElement)
27{
28 _init();
29 if (maxElement > 0) {
30 mCapability = maxElement;
31 }
32}
33
34Queue::Queue(bool ascendingOrder, int (*cmp)(void *, void *))
35{
36 _init();
37 if (cmp) {
38 mSort = true;
39 mAscendingOrder = ascendingOrder;
40 cmpEleFun = cmp;
41 }
42}
43
44Queue::Queue(uint32_t maxElement, bool ascendingOrder, int (*cmp)(void *, void *))
45{
46 _init();
47 if (maxElement > 0) {
48 mCapability = maxElement;
49 }
50 if (cmp) {
51 mSort = true;
52 mAscendingOrder = ascendingOrder;
53 cmpEleFun = cmp;
54 }
55}
56
57Queue::~Queue()
58{
59 _release();
60}
61
62int32_t Queue::push(void *ele)
63{
64 if (Q_OK != _lock())
65 return Q_ERR_LOCK;
66
67 int32_t ret = _pushElement(ele, false);
68
69 if (Q_OK != _unlock())
70 return Q_ERR_LOCK;
71
72 return ret;
73}
74
75int32_t Queue::pushAndWait(void *ele)
76{
77 if (Q_OK != _lock())
78 return Q_ERR_LOCK;
79
80 int32_t ret = _pushElement(ele, true);
81
82 if (Q_OK != _unlock())
83 return Q_ERR_LOCK;
84
85 return ret;
86}
87
88int32_t Queue::pop(void **e)
89{
90 if (Q_OK != _lock())
91 return Q_ERR_LOCK;
92
93 int32_t ret = _popElement(e, false, NULL, NULL);
94
95 if (Q_OK != _unlock())
96 return Q_ERR_LOCK;
97
98 return ret;
99}
100
101int32_t Queue::popAndWait(void **e)
102{
103 *e = NULL;
104
105 if (Q_OK != _lock())
106 return Q_ERR_LOCK;
107
108 int32_t ret = _popElement(e, true, NULL, NULL);
109
110 if (Q_OK != _unlock())
111 return Q_ERR_LOCK;
112
113 return ret;
114}
115
116int32_t Queue::peek(void **e, int32_t pos)
117{
118 *e = NULL;
119
120 if (Q_OK != _lock())
121 return Q_ERR_LOCK;
122
123 int32_t ret = _peekElement(e, pos);
124
125 if (Q_OK != _unlock())
126 return Q_ERR_LOCK;
127
128 return ret;
129}
130
131
132int32_t Queue::flush()
133{
134 if (Q_OK != _lock())
135 return Q_ERR_LOCK;
136
137 int32_t ret = _flushElements(NULL, NULL);
138
139 if (Q_OK != _unlock())
140 return Q_ERR_LOCK;
141 return ret;
142}
143int32_t Queue::flushAndCallback(void *userdata, void (*fcb)(void *userdata, void *ele))
144{
145 if (Q_OK != _lock())
146 return Q_ERR_LOCK;
147
148 int32_t ret = _flushElements(userdata, fcb);
149
150 if (Q_OK != _unlock())
151 return Q_ERR_LOCK;
152
153 return ret;
154}
155
156
157int32_t Queue::getCnt()
158{
159 int32_t cnt = 0;
160 if (Q_OK != _lock())
161 return Q_ERR_LOCK;
162
163 cnt = mUsedElementCnts;
164
165 if (Q_OK != _unlock())
166 return Q_ERR_LOCK;
167
168 return cnt;
169}
170
171
172bool Queue::isEmpty()
173{
174 bool isEmpty;
175
176 _lock();
177
178 if (mUsedFirstElement == NULL || mUsedLastElement == NULL)
179 isEmpty = true;
180 else
181 isEmpty = false;
182
183 _unlock();
184
185 return isEmpty;
186}
187
188
189int32_t Queue::setAllowedNewData(bool allowed)
190{
191 if (Q_OK != _lock())
192 return Q_ERR_LOCK;
193
194 mAllowedNewData = allowed;
195
196 if (Q_OK != _unlock())
197 return Q_ERR_LOCK;
198
199 if (mAllowedNewData == false) {
200 // notify waiting threads, when new data isn't accepted
201 pthread_cond_broadcast(&mCondGet);
202 pthread_cond_broadcast(&mCondPut);
203 }
204
205 return Q_OK;
206}
207
208bool Queue::isAllowedNewData()
209{
210 if (Q_OK != _lock())
211 return Q_ERR_LOCK;
212
213 bool allowed = mAllowedNewData;
214
215 if (Q_OK != _unlock())
216 return Q_ERR_LOCK;
217
218 return allowed;
219}
220
221
222int32_t Queue::_init()
223{
224 pthread_mutex_init(&mMutex, NULL);
225
226 pthread_cond_init(&mCondGet, NULL);
227
228 pthread_cond_init(&mCondPut, NULL);
229
230 mUsedFirstElement = NULL;
231 mUsedLastElement = NULL;
232 mUsedElementCnts = 0;
233 mCapability = INT32_MAX - 1;//max capability
234 mAllowedNewData = true;
235 mSort = false;
236 mAscendingOrder = 1;
237 cmpEleFun = NULL;
238 return Q_OK;
239}
240
241int32_t Queue::_release()
242{
243 // this method will not immediately return on error,
244 // it will try to release all the memory that was allocated.
245 int error = Q_OK;
246 // make sure no new data comes and wake all waiting threads
247 error = setAllowedNewData(false);
248 error = _lock();
249
250 error = _flushElements(NULL, NULL);
251
252 mUsedFirstElement = NULL;
253 mUsedLastElement = NULL;
254
255 // destroy lock and queue etc
256 error = pthread_cond_destroy(&mCondGet);
257 error = pthread_cond_destroy(&mCondPut);
258
259 error = _unlock();
260 while (EBUSY == (error = pthread_mutex_destroy(&mMutex)))
261 usleep(100*1000);
262
263 return error;
264}
265
266int32_t Queue::_lock()
267{
268 // all errors are unrecoverable for us
269 if (0 != pthread_mutex_lock(&mMutex))
270 return Q_ERR_LOCK;
271 return Q_OK;
272}
273int32_t Queue::_unlock()
274{
275 // all errors are unrecoverable for us
276 if (0 != pthread_mutex_unlock(&mMutex))
277 return Q_ERR_LOCK;
278 return Q_OK;
279}
280
281int32_t Queue::_flushElements(void *userdata, void (*fcb)(void *userdata, void *ele))
282{
283 Element *ele = NULL;
284 while (mUsedFirstElement != NULL) {
285 ele = mUsedFirstElement;
286 mUsedFirstElement = mUsedFirstElement->next;
287 if (fcb != NULL) {
288 fcb(userdata,ele->data);
289 }
290 free(ele);
291 }
292 mUsedFirstElement = NULL;
293 mUsedLastElement = NULL;
294 mUsedElementCnts = 0;
295
296 return Q_OK;
297}
298
299int32_t Queue::_pushElement(void *ele, bool isWait)
300{
301 Element *newEle = NULL;
302 if (mAllowedNewData == false) { // no new data allowed
303 return Q_ERR_NONEWDATA;
304 }
305
306 // max_elements already reached?
307 // if condition _needs_ to be in sync with while loop below!
308 if (mUsedElementCnts == mCapability) {
309 if (isWait == false) {
310 return Q_ERR_NUM_ELEMENTS;
311 } else {
312 while ((mUsedElementCnts == mCapability) && mAllowedNewData) {
313 pthread_cond_wait(&mCondPut, &mMutex);
314 }
315 if (mAllowedNewData == false) {
316 return Q_ERR_NONEWDATA;
317 }
318 }
319 }
320
321 newEle = (Element *)calloc(1,sizeof(Element));
322 if (newEle == NULL) { // could not allocate memory for new elements
323 return Q_ERR_MEM;
324 }
325 newEle->data = ele;
326 newEle->next = NULL;
327//printf("_pushElement,%p,%p,data:%p\n",mUsedLastElement,newEle,ele);
328 if (mSort == false || mUsedFirstElement == NULL) {
329 // insert at the end when we don't want to sort or the queue is empty
330 if (mUsedLastElement == NULL)
331 mUsedFirstElement = newEle;
332 else
333 mUsedLastElement->next = newEle;
334 mUsedLastElement = newEle;
335 } else {
336 // search appropriate place to sort element in
337 Element *s = mUsedFirstElement; // s != NULL, because of if condition above
338 Element *t = NULL;
339 //check if insert new element to the first element
340 int asc_first_el = (mAscendingOrder == true && cmpEleFun(s->data, ele) >= 0);
341 int desc_first_el = (mAscendingOrder == false && cmpEleFun(s->data, ele) <= 0);
342
343 if (asc_first_el == 0 && desc_first_el == 0) {
344 // element will be inserted between s and t
345 for (s = mUsedFirstElement, t = s->next; s != NULL && t != NULL; s = t, t = t->next) {
346 if (mAscendingOrder == true && cmpEleFun(s->data, ele) <= 0 && cmpEleFun(ele, t->data) <= 0) {
347 // asc: s <= e <= t
348 break;
349 } else if(mAscendingOrder == false && cmpEleFun(s->data, ele) >= 0 && cmpEleFun(ele, t->data) >= 0) {
350 // desc: s >= e >= t
351 break;
352 }
353 }
354 // actually insert
355 s->next = newEle;
356 newEle->next = t;
357 if (t == NULL)
358 mUsedLastElement = newEle;
359 } else if(asc_first_el != 0 || desc_first_el != 0) {
360 // add at front
361 newEle->next = mUsedFirstElement;
362 mUsedFirstElement = newEle;
363 }
364 }
365 mUsedElementCnts++;
366 //printf("_pushElement,%p,data:%p,%d\n",mUsedLastElement,mUsedLastElement->data,mUsedElementCnts);
367 // notify only one waiting thread, so that we don't have to check and fall to sleep because we were to slow
368 pthread_cond_signal(&mCondGet);
369
370 return Q_OK;
371}
372
373int32_t Queue::_popElement(void **e, bool isWait, int (*cmp)(void *, void *), void *cmpEle)
374{
375 // are elements in the queue?
376 if (mUsedElementCnts == 0) {
377 if (isWait == false) {
378 *e = NULL;
379 return Q_ERR_NUM_ELEMENTS;
380 } else {
381 while (mUsedElementCnts == 0 && mAllowedNewData) {
382 pthread_cond_wait(&mCondGet, &mMutex);
383 }
384 if (mUsedElementCnts == 0 && mAllowedNewData == false) {
385 return Q_ERR_NONEWDATA;
386 }
387 }
388 }
389
390 // get first element (which fulfills the requirements)
391 Element *elePrev = NULL, *ele = mUsedFirstElement;
392 while (cmp != NULL && ele != NULL && 0 != cmp(ele, cmpEle)) {
393 elePrev = ele;
394 ele = ele->next;
395 }
396 if (ele != NULL && elePrev == NULL) {
397 *e = mUsedFirstElement->data;
398 //element is at first, remove this node
399 mUsedFirstElement = mUsedFirstElement->next;
400 --mUsedElementCnts;
401 if (mUsedFirstElement == NULL) {
402 mUsedLastElement = NULL;
403 }
404 //printf("_popElement,%p,data:%p,%p\n",ele,*e,mUsedFirstElement);
405 free(ele);
406 } else if (ele != NULL && elePrev != NULL) {
407 // element is in the middle,remove this node
408 elePrev->next = ele->next;
409 --mUsedElementCnts;
410 *e = ele->data;
411 free(ele);
412 } else {
413 // element is invalid
414 *e = NULL;
415 return Q_ERR_INVALID_ELEMENT;
416 }
417
418 // notify only one waiting thread
419 pthread_cond_signal(&mCondPut);
420
421 return Q_OK;
422}
423
424int32_t Queue::_peekElement(void **e, int32_t pos)
425{
426 Element *ele = mUsedFirstElement;
427 int32_t elePos = 0;
428 if (mUsedElementCnts == 0) {
429 return Q_ERR_INVALID_ELEMENT;
430 }
431
432 while (ele != NULL && elePos++ < pos) {
433 ele = ele->next;
434 }
435
436 if (ele == NULL) {
437 return Q_ERR_INVALID_ELEMENT;
438 }
439 *e = ele->data;
440 return Q_OK;
441}
442
443}