blob: fa705e4ea629285701475a41b92fe93e0165c07d [file] [log] [blame]
/*
* Copyright (C) 2021 Amlogic Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unistd.h>
#include "Queue.h"
namespace Tls {
Queue::Queue()
{
_init();
}
Queue::Queue(uint32_t maxElement)
{
_init();
if (maxElement > 0) {
mCapability = maxElement;
}
}
Queue::Queue(bool ascendingOrder, int (*cmp)(void *, void *))
{
_init();
if (cmp) {
mSort = true;
mAscendingOrder = ascendingOrder;
cmpEleFun = cmp;
}
}
Queue::Queue(uint32_t maxElement, bool ascendingOrder, int (*cmp)(void *, void *))
{
_init();
if (maxElement > 0) {
mCapability = maxElement;
}
if (cmp) {
mSort = true;
mAscendingOrder = ascendingOrder;
cmpEleFun = cmp;
}
}
Queue::~Queue()
{
_release();
}
int32_t Queue::push(void *ele)
{
if (Q_OK != _lock())
return Q_ERR_LOCK;
int32_t ret = _pushElement(ele, false);
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return ret;
}
int32_t Queue::pushAndWait(void *ele)
{
if (Q_OK != _lock())
return Q_ERR_LOCK;
int32_t ret = _pushElement(ele, true);
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return ret;
}
int32_t Queue::pop(void **e)
{
if (Q_OK != _lock())
return Q_ERR_LOCK;
int32_t ret = _popElement(e, false, NULL, NULL);
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return ret;
}
int32_t Queue::popAndWait(void **e)
{
*e = NULL;
if (Q_OK != _lock())
return Q_ERR_LOCK;
int32_t ret = _popElement(e, true, NULL, NULL);
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return ret;
}
int32_t Queue::peek(void **e, int32_t pos)
{
*e = NULL;
if (Q_OK != _lock())
return Q_ERR_LOCK;
int32_t ret = _peekElement(e, pos);
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return ret;
}
int32_t Queue::flush()
{
if (Q_OK != _lock())
return Q_ERR_LOCK;
int32_t ret = _flushElements(NULL, NULL);
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return ret;
}
int32_t Queue::flushAndCallback(void *userdata, void (*fcb)(void *userdata, void *ele))
{
if (Q_OK != _lock())
return Q_ERR_LOCK;
int32_t ret = _flushElements(userdata, fcb);
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return ret;
}
int32_t Queue::getCnt()
{
int32_t cnt = 0;
if (Q_OK != _lock())
return Q_ERR_LOCK;
cnt = mUsedElementCnts;
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return cnt;
}
bool Queue::isEmpty()
{
bool isEmpty;
_lock();
if (mUsedFirstElement == NULL || mUsedLastElement == NULL)
isEmpty = true;
else
isEmpty = false;
_unlock();
return isEmpty;
}
int32_t Queue::setAllowedNewData(bool allowed)
{
if (Q_OK != _lock())
return Q_ERR_LOCK;
mAllowedNewData = allowed;
if (Q_OK != _unlock())
return Q_ERR_LOCK;
if (mAllowedNewData == false) {
// notify waiting threads, when new data isn't accepted
pthread_cond_broadcast(&mCondGet);
pthread_cond_broadcast(&mCondPut);
}
return Q_OK;
}
bool Queue::isAllowedNewData()
{
if (Q_OK != _lock())
return Q_ERR_LOCK;
bool allowed = mAllowedNewData;
if (Q_OK != _unlock())
return Q_ERR_LOCK;
return allowed;
}
int32_t Queue::_init()
{
pthread_mutex_init(&mMutex, NULL);
pthread_cond_init(&mCondGet, NULL);
pthread_cond_init(&mCondPut, NULL);
mUsedFirstElement = NULL;
mUsedLastElement = NULL;
mUsedElementCnts = 0;
mCapability = INT32_MAX - 1;//max capability
mAllowedNewData = true;
mSort = false;
mAscendingOrder = 1;
cmpEleFun = NULL;
return Q_OK;
}
int32_t Queue::_release()
{
// this method will not immediately return on error,
// it will try to release all the memory that was allocated.
int error = Q_OK;
// make sure no new data comes and wake all waiting threads
error = setAllowedNewData(false);
error = _lock();
error = _flushElements(NULL, NULL);
mUsedFirstElement = NULL;
mUsedLastElement = NULL;
// destroy lock and queue etc
error = pthread_cond_destroy(&mCondGet);
error = pthread_cond_destroy(&mCondPut);
error = _unlock();
while (EBUSY == (error = pthread_mutex_destroy(&mMutex)))
usleep(100*1000);
return error;
}
int32_t Queue::_lock()
{
// all errors are unrecoverable for us
if (0 != pthread_mutex_lock(&mMutex))
return Q_ERR_LOCK;
return Q_OK;
}
int32_t Queue::_unlock()
{
// all errors are unrecoverable for us
if (0 != pthread_mutex_unlock(&mMutex))
return Q_ERR_LOCK;
return Q_OK;
}
int32_t Queue::_flushElements(void *userdata, void (*fcb)(void *userdata, void *ele))
{
Element *ele = NULL;
while (mUsedFirstElement != NULL) {
ele = mUsedFirstElement;
mUsedFirstElement = mUsedFirstElement->next;
if (fcb != NULL) {
fcb(userdata,ele->data);
}
free(ele);
}
mUsedFirstElement = NULL;
mUsedLastElement = NULL;
mUsedElementCnts = 0;
return Q_OK;
}
int32_t Queue::_pushElement(void *ele, bool isWait)
{
Element *newEle = NULL;
if (mAllowedNewData == false) { // no new data allowed
return Q_ERR_NONEWDATA;
}
// max_elements already reached?
// if condition _needs_ to be in sync with while loop below!
if (mUsedElementCnts == mCapability) {
if (isWait == false) {
return Q_ERR_NUM_ELEMENTS;
} else {
while ((mUsedElementCnts == mCapability) && mAllowedNewData) {
pthread_cond_wait(&mCondPut, &mMutex);
}
if (mAllowedNewData == false) {
return Q_ERR_NONEWDATA;
}
}
}
newEle = (Element *)calloc(1,sizeof(Element));
if (newEle == NULL) { // could not allocate memory for new elements
return Q_ERR_MEM;
}
newEle->data = ele;
newEle->next = NULL;
//printf("_pushElement,%p,%p,data:%p\n",mUsedLastElement,newEle,ele);
if (mSort == false || mUsedFirstElement == NULL) {
// insert at the end when we don't want to sort or the queue is empty
if (mUsedLastElement == NULL)
mUsedFirstElement = newEle;
else
mUsedLastElement->next = newEle;
mUsedLastElement = newEle;
} else {
// search appropriate place to sort element in
Element *s = mUsedFirstElement; // s != NULL, because of if condition above
Element *t = NULL;
//check if insert new element to the first element
int asc_first_el = (mAscendingOrder == true && cmpEleFun(s->data, ele) >= 0);
int desc_first_el = (mAscendingOrder == false && cmpEleFun(s->data, ele) <= 0);
if (asc_first_el == 0 && desc_first_el == 0) {
// element will be inserted between s and t
for (s = mUsedFirstElement, t = s->next; s != NULL && t != NULL; s = t, t = t->next) {
if (mAscendingOrder == true && cmpEleFun(s->data, ele) <= 0 && cmpEleFun(ele, t->data) <= 0) {
// asc: s <= e <= t
break;
} else if(mAscendingOrder == false && cmpEleFun(s->data, ele) >= 0 && cmpEleFun(ele, t->data) >= 0) {
// desc: s >= e >= t
break;
}
}
// actually insert
s->next = newEle;
newEle->next = t;
if (t == NULL)
mUsedLastElement = newEle;
} else if(asc_first_el != 0 || desc_first_el != 0) {
// add at front
newEle->next = mUsedFirstElement;
mUsedFirstElement = newEle;
}
}
mUsedElementCnts++;
//printf("_pushElement,%p,data:%p,%d\n",mUsedLastElement,mUsedLastElement->data,mUsedElementCnts);
// notify only one waiting thread, so that we don't have to check and fall to sleep because we were to slow
pthread_cond_signal(&mCondGet);
return Q_OK;
}
int32_t Queue::_popElement(void **e, bool isWait, int (*cmp)(void *, void *), void *cmpEle)
{
// are elements in the queue?
if (mUsedElementCnts == 0) {
if (isWait == false) {
*e = NULL;
return Q_ERR_NUM_ELEMENTS;
} else {
while (mUsedElementCnts == 0 && mAllowedNewData) {
pthread_cond_wait(&mCondGet, &mMutex);
}
if (mUsedElementCnts == 0 && mAllowedNewData == false) {
return Q_ERR_NONEWDATA;
}
}
}
// get first element (which fulfills the requirements)
Element *elePrev = NULL, *ele = mUsedFirstElement;
while (cmp != NULL && ele != NULL && 0 != cmp(ele, cmpEle)) {
elePrev = ele;
ele = ele->next;
}
if (ele != NULL && elePrev == NULL) {
*e = mUsedFirstElement->data;
//element is at first, remove this node
mUsedFirstElement = mUsedFirstElement->next;
--mUsedElementCnts;
if (mUsedFirstElement == NULL) {
mUsedLastElement = NULL;
}
//printf("_popElement,%p,data:%p,%p\n",ele,*e,mUsedFirstElement);
free(ele);
} else if (ele != NULL && elePrev != NULL) {
// element is in the middle,remove this node
elePrev->next = ele->next;
--mUsedElementCnts;
*e = ele->data;
free(ele);
} else {
// element is invalid
*e = NULL;
return Q_ERR_INVALID_ELEMENT;
}
// notify only one waiting thread
pthread_cond_signal(&mCondPut);
return Q_OK;
}
int32_t Queue::_peekElement(void **e, int32_t pos)
{
Element *ele = mUsedFirstElement;
int32_t elePos = 0;
if (mUsedElementCnts == 0) {
return Q_ERR_INVALID_ELEMENT;
}
while (ele != NULL && elePos++ < pos) {
ele = ele->next;
}
if (ele == NULL) {
return Q_ERR_INVALID_ELEMENT;
}
*e = ele->data;
return Q_OK;
}
}