blob: be00a310b13d97205ccb92da2c0adc6ccd62c0db [file] [log] [blame]
/* GStreamer
* Copyright (C) 2023 <xuesong.jiang@amlogic.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "gstamladapterpipe.h"
#include "aml_defs.h"
/* Forward typedefs for the two backing stores a pipe can hold in its union:
 * one for clear data, one for secure buffers. */
typedef struct _GstClearAdapter GstClearAdapter;
typedef struct _GstSecureAdapter GstSecureAdapter;
/* Backing store used when the pipe was initialised with is_secure == FALSE. */
struct _GstClearAdapter
{
/* adapter collecting clear data */
GstAdapter *adapter;
/* byte offset into the adapter where the next gst_amladapterpipe_peek()
 * copy starts; advanced by BUFFER_SIZE per successful peek and reset to 0
 * whenever the pipe is cleared */
guint prob_peek_offset;
};
/* Backing store used when the pipe was initialised with is_secure == TRUE. */
struct _GstSecureAdapter
{
/* queue collecting secure dma buf */
GstQueueArray *sec_buf_q;
/* index of the queue entry the next gst_amladapterpipe_peek() returns;
 * incremented per successful peek and reset to 0 whenever the pipe is
 * cleared */
guint prob_peek_idx;
};
/* A mutex/cond protected pipe that hands GstBuffers from a pushing thread to
 * a popping thread, backed either by a GstAdapter (clear data) or by a
 * GstQueueArray (secure buffers). */
struct _GstAdapterPipe
{
/* data type tag given at creation; in this file it is only used in log output */
GstAdapterPipeDataType type;
/* lock for syncing */
GMutex tlock;
/* with TLOCK */
/* signals counterpart thread to have a look */
GCond cond;
/* seen eos; set by gst_amladapterpipe_set_eos(), cleared by reset */
gboolean eos;
/* selects which member of the union `s` is in use */
gboolean secure;
/* when TRUE, push/pop skip the watermark waits and the cond signalling */
gboolean unlimit;
/* seen flushing */
gboolean flushing;
/* flowreturn obtained by src task (this file only ever stores GST_FLOW_OK) */
GstFlowReturn srcresult;
/* counters and watermarks; copied out by gst_amladapterpipe_get_status() */
GstAdapterPipeStatus status;
/* backing store; both members overlap in memory, so only the one matching
 * `secure` may be accessed */
union
{
GstClearAdapter clear_ap;
GstSecureAdapter secure_ap;
} s;
};
/* Forward declarations of the helpers defined below. The *_unlock helpers
 * take no lock themselves; every call site in this file holds pipe->tlock
 * around them. Note that gst_amladapterpipe_init is declared without
 * `static` and therefore has external linkage. */
GstFlowReturn gst_amladapterpipe_init(GstAdapterPipe *pipe, GstAdapterPipeDataType type, gboolean is_secure, gboolean is_unlimit);
static GstFlowReturn gst_amladapterpipe_dispose(GstAdapterPipe *pipe);
static GstFlowReturn gst_amladapterpipe_clear_unlock(GstAdapterPipe *pipe);
static gboolean gst_amladapterpipe_overrun_unlock(GstAdapterPipe *pipe);
static gboolean gst_amladapterpipe_underrun_unlock(GstAdapterPipe *pipe);
static GstFlowReturn gst_amladapterpipe_push_uncheck_unlock(GstAdapterPipe *pipe, GstBuffer *buffer);
static GstFlowReturn gst_amladapterpipe_pop_uncheck_unlock(GstAdapterPipe *pipe, GstBuffer **buffer);
/* private interfaces */
/*
 * gst_amladapterpipe_init:
 * @pipe: pipe storage to initialise (must not be NULL)
 * @type: data type tag stored in the pipe
 * @is_secure: TRUE to back the pipe with a GstQueueArray of secure buffers,
 *             FALSE to back it with a GstAdapter
 * @is_unlimit: TRUE to disable watermark based blocking in push/pop
 *
 * Initialises the lock, the condition, every state flag, the watermarks and
 * the backing store selected by @is_secure.
 *
 * Returns: GST_FLOW_OK on success, GST_FLOW_ERROR when @pipe is NULL or the
 * backing store could not be created. In the latter case the mutex and cond
 * are already initialised.
 */
GstFlowReturn gst_amladapterpipe_init(GstAdapterPipe *pipe, GstAdapterPipeDataType type, gboolean is_secure, gboolean is_unlimit)
{
    g_return_val_if_fail(pipe, GST_FLOW_ERROR);

    g_mutex_init(&pipe->tlock);
    g_cond_init(&pipe->cond);

    pipe->eos = FALSE;
    /* Set explicitly so the function does not rely on zeroed storage. */
    pipe->flushing = FALSE;
    pipe->type = type;
    pipe->secure = is_secure;
    pipe->unlimit = is_unlimit;
    pipe->srcresult = GST_FLOW_OK;

    /* byte based accounting and watermarks */
    pipe->status.pushed_size = 0;
    pipe->status.poped_size = 0;
    pipe->status.cur_size = 0;
    pipe->status.low_size = LOW_BUFFER_SIZE;
    pipe->status.max_size = MAX_BUFFER_SIZE;
    pipe->status.needed_size = LOW_BUFFER_SIZE;

    /* buffer-count based watermarks */
    pipe->status.low_num = SEC_BUF_QUEUE_MIN_SIZE;
    pipe->status.max_num = SEC_BUF_QUEUE_MAX_SIZE;
    pipe->status.needed_num = SEC_BUF_QUEUE_NEEDED_SIZE;

    if (pipe->secure)
    {
        pipe->s.secure_ap.sec_buf_q = gst_queue_array_new(SEC_BUF_QUEUE_MAX_SIZE);
        g_return_val_if_fail(pipe->s.secure_ap.sec_buf_q, GST_FLOW_ERROR);
        pipe->status.cur_num = 0;
        pipe->status.pushed_num = 0;
        pipe->status.poped_num = 0;
        pipe->s.secure_ap.prob_peek_idx = 0;
    }
    else
    {
        pipe->s.clear_ap.adapter = gst_adapter_new();
        g_return_val_if_fail(pipe->s.clear_ap.adapter, GST_FLOW_ERROR);
        /* The clear path never updates the buffer counters afterwards, so
         * they stay parked at the midpoint of low_num and max_num.
         * NOTE(review): presumably this keeps the count based watermark
         * checks neutral for byte streams - confirm against the constants. */
        pipe->status.cur_num = (pipe->status.low_num + pipe->status.max_num) / 2;
        pipe->status.pushed_num = pipe->status.cur_num;
        pipe->status.poped_num = pipe->status.cur_num;
        pipe->s.clear_ap.prob_peek_offset = 0;
    }
    return GST_FLOW_OK;
}
/*
 * gst_amladapterpipe_dispose:
 * @pipe: pipe whose backing store is released
 *
 * Releases the backing store that matches pipe->secure and NULLs its pointer.
 *
 * clear_ap.adapter and secure_ap.sec_buf_q live in the same union and
 * therefore share storage, so only the active member may be touched:
 * testing the adapter pointer of a secure pipe would see the queue pointer
 * and unref a GstQueueArray as if it were a GObject.
 *
 * The queue is freed as-is; the only caller in this file drains it via
 * gst_amladapterpipe_reset() beforehand.
 *
 * Returns: always GST_FLOW_OK.
 */
static GstFlowReturn gst_amladapterpipe_dispose(GstAdapterPipe *pipe)
{
    if (pipe->secure)
    {
        if (pipe->s.secure_ap.sec_buf_q)
        {
            gst_queue_array_free(pipe->s.secure_ap.sec_buf_q);
            pipe->s.secure_ap.sec_buf_q = NULL;
        }
    }
    else if (pipe->s.clear_ap.adapter)
    {
        g_object_unref(G_OBJECT(pipe->s.clear_ap.adapter));
        pipe->s.clear_ap.adapter = NULL;
    }
    return GST_FLOW_OK;
}
/*
 * gst_amladapterpipe_clear_unlock:
 * @pipe: pipe to empty
 *
 * Drops everything currently stored in the pipe and zeroes the matching
 * counters. Takes no lock itself; callers in this file hold pipe->tlock.
 *
 * Secure pipes unref every queued object and reset the buffer counters and
 * the peek index. Clear pipes flush the adapter and reset the peek offset;
 * their buffer counters are left untouched. Byte counters are reset in both
 * modes.
 *
 * Returns: GST_FLOW_OK, or GST_FLOW_ERROR when @pipe is NULL.
 */
static GstFlowReturn gst_amladapterpipe_clear_unlock(GstAdapterPipe *pipe)
{
    g_return_val_if_fail(pipe, GST_FLOW_ERROR);

    if (!pipe->secure)
    {
        GstAdapter *store = pipe->s.clear_ap.adapter;

        if (store)
            gst_adapter_clear(store);
        pipe->s.clear_ap.prob_peek_offset = 0;
    }
    else
    {
        GstQueueArray *queue = pipe->s.secure_ap.sec_buf_q;

        if (queue)
        {
            GstMiniObject *entry;

            /* pop_head yields NULL once the queue is empty */
            for (entry = gst_queue_array_pop_head(queue); entry; entry = gst_queue_array_pop_head(queue))
                gst_mini_object_unref(entry);
        }
        pipe->s.secure_ap.prob_peek_idx = 0;
        pipe->status.poped_num = 0;
        pipe->status.pushed_num = 0;
        pipe->status.cur_num = 0;
    }

    pipe->status.poped_size = 0;
    pipe->status.pushed_size = 0;
    pipe->status.cur_size = 0;
    return GST_FLOW_OK;
}
/*
 * gst_amladapterpipe_overrun_unlock:
 * @pipe: pipe to inspect
 *
 * Reports whether the pipe currently exceeds its maximum buffer count or its
 * maximum byte size. Takes no lock itself.
 *
 * Returns: TRUE when either limit is exceeded, FALSE otherwise.
 */
static gboolean gst_amladapterpipe_overrun_unlock(GstAdapterPipe *pipe)
{
    gboolean too_many = pipe->status.cur_num > pipe->status.max_num;
    gboolean too_big = pipe->status.cur_size > pipe->status.max_size;

    if (!too_many && !too_big)
        return FALSE;

    GST_DEBUG("pipe:%p, type:%d, overrun, [status.cur_num:%d, status.max_num:%d], [status.cur_size:%d, status.max_size:%d]",
              pipe, pipe->type, pipe->status.cur_num, pipe->status.max_num, pipe->status.cur_size, pipe->status.max_size);
    return TRUE;
}
/*
 * gst_amladapterpipe_underrun_unlock:
 * @pipe: pipe to inspect
 *
 * Reports whether the pipe currently holds fewer buffers than needed_num or
 * fewer bytes than needed_size. Takes no lock itself.
 *
 * Returns: TRUE when either level is below its "needed" mark, FALSE otherwise.
 */
static gboolean gst_amladapterpipe_underrun_unlock(GstAdapterPipe *pipe)
{
    if (pipe->status.cur_num < pipe->status.needed_num || pipe->status.cur_size < pipe->status.needed_size)
    {
        /* The message used to read "overrun", which made underrun and
         * overrun indistinguishable in the logs. */
        GST_DEBUG("pipe:%p, type:%d, underrun, [status.cur_num:%d, status.needed_num:%d], [status.cur_size:%d, status.needed_size:%d]",
                  pipe, pipe->type, pipe->status.cur_num, pipe->status.needed_num, pipe->status.cur_size, pipe->status.needed_size);
        return TRUE;
    }
    return FALSE;
}
/*
 * gst_amladapterpipe_push_uncheck_unlock:
 * @pipe: destination pipe
 * @buffer: buffer to store; the backing store keeps this reference
 *
 * Appends @buffer to the backing store without any watermark check and
 * updates the accounting. Takes no lock itself. Buffer counters are only
 * maintained for secure pipes; byte counters are maintained in both modes.
 *
 * Returns: always GST_FLOW_OK.
 */
static GstFlowReturn gst_amladapterpipe_push_uncheck_unlock(GstAdapterPipe *pipe, GstBuffer *buffer)
{
    /* Read the size before the reference is handed to the store: once the
     * adapter/queue owns the buffer it must not be dereferenced here. */
    gsize size = gst_buffer_get_size(buffer);

    if (pipe->secure)
    {
        gst_queue_array_push_tail(pipe->s.secure_ap.sec_buf_q, buffer);
        pipe->status.cur_num++;
        pipe->status.pushed_num++;
    }
    else
    {
        gst_adapter_push(pipe->s.clear_ap.adapter, buffer);
    }

    pipe->status.cur_size += size;
    pipe->status.pushed_size += size;

    /* only the pointer value is logged below, the buffer is not accessed */
    GST_DEBUG("pipe:%p, type:%d, pushed in buffer %p, status.cur_num:%d, status.cur_size:%d", pipe, pipe->type, buffer, pipe->status.cur_num, pipe->status.cur_size);
    return GST_FLOW_OK;
}
/*
 * gst_amladapterpipe_pop_uncheck_unlock:
 * @pipe: source pipe
 * @buffer: (out) receives the removed buffer, or NULL when nothing was taken
 *
 * Removes one unit from the backing store without any watermark check and
 * updates the accounting. Takes no lock itself. A secure pipe hands out the
 * head of its queue; a clear pipe asks the adapter for BUFFER_SIZE bytes.
 * Buffer counters are only maintained for secure pipes.
 *
 * Returns: GST_FLOW_OK when a buffer was taken, GST_FLOW_AMLAP_EMPTY when the
 * store returned NULL.
 */
static GstFlowReturn gst_amladapterpipe_pop_uncheck_unlock(GstAdapterPipe *pipe, GstBuffer **buffer)
{
    GstFlowReturn result = GST_FLOW_AMLAP_EMPTY;
    GstBuffer *taken;

    taken = pipe->secure
                ? (GstBuffer *)gst_queue_array_pop_head(pipe->s.secure_ap.sec_buf_q)
                : gst_adapter_take_buffer(pipe->s.clear_ap.adapter, BUFFER_SIZE);
    *buffer = taken;

    if (taken != NULL)
    {
        gsize taken_size = gst_buffer_get_size(taken);

        if (pipe->secure)
        {
            pipe->status.cur_num--;
            pipe->status.poped_num++;
        }
        pipe->status.cur_size -= taken_size;
        pipe->status.poped_size += taken_size;
        result = GST_FLOW_OK;
    }

    GST_DEBUG("pipe:%p, type:%d, poped out buffer %p, status.cur_num:%d, status.cur_size:%d, ret:%d", pipe, pipe->type, *buffer, pipe->status.cur_num, pipe->status.cur_size, result);
    return result;
}
/* public interfaces */
GstAdapterPipe *gst_amladapterpipe_new()
{
GstAdapterPipe *pipe;
pipe = calloc(1, sizeof(GstAdapterPipe));
memset(pipe, 0, sizeof(GstAdapterPipe));
gst_amladapterpipe_init(pipe, GST_AP_TS, FALSE, FALSE);
return pipe;
}
/*
 * gst_amladapterpipe_new_full:
 * @type: data type tag stored in the pipe
 * @is_secure: TRUE for a secure (queue backed) pipe, FALSE for an adapter
 *             backed pipe
 * @is_unlimit: TRUE to disable watermark based blocking in push/pop
 *
 * Allocates a pipe and initialises it with the given settings.
 *
 * Returns: the new pipe, to be released with gst_amladapterpipe_free(), or
 * NULL when allocation or initialisation failed.
 */
GstAdapterPipe *gst_amladapterpipe_new_full(GstAdapterPipeDataType type, gboolean is_secure, gboolean is_unlimit)
{
    /* calloc already zero-fills, no extra memset is needed */
    GstAdapterPipe *pipe = calloc(1, sizeof(*pipe));

    /* Out-of-memory is a runtime condition, so use a real check rather than
     * g_return_val_if_fail(), which can be compiled out. */
    if (!pipe)
        return NULL;

    if (gst_amladapterpipe_init(pipe, type, is_secure, is_unlimit) != GST_FLOW_OK)
    {
        /* init sets up the lock and cond before it can fail, so undo that
         * instead of handing out a pipe without a backing store */
        g_mutex_clear(&pipe->tlock);
        g_cond_clear(&pipe->cond);
        free(pipe);
        return NULL;
    }
    return pipe;
}
/*
 * gst_amladapterpipe_free:
 * @pipe: pipe to destroy
 *
 * Empties the pipe, releases its backing store, tears down the lock and the
 * condition and finally frees the pipe itself. No other thread may be using
 * the pipe any more.
 */
void gst_amladapterpipe_free(GstAdapterPipe *pipe)
{
    g_return_if_fail(pipe);

    gst_amladapterpipe_reset(pipe);
    gst_amladapterpipe_dispose(pipe);

    /* The synchronisation primitives are destroyed here, at end of life,
     * rather than in reset(): a pipe must stay usable after a reset. */
    g_mutex_clear(&pipe->tlock);
    g_cond_clear(&pipe->cond);
    free(pipe);
}

/*
 * gst_amladapterpipe_reset:
 * @pipe: pipe to reset
 *
 * Drops all stored data and clears the eos flag and the stored flow result.
 * The flushing flag is left as it is. The pipe remains fully usable
 * afterwards.
 *
 * Returns: the result of clearing the pipe, or GST_FLOW_ERROR when @pipe is
 * NULL.
 */
GstFlowReturn gst_amladapterpipe_reset(GstAdapterPipe *pipe)
{
    GstFlowReturn ret;

    g_return_val_if_fail(pipe, GST_FLOW_ERROR);

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    ret = gst_amladapterpipe_clear_unlock(pipe);
    pipe->eos = FALSE;
    pipe->srcresult = GST_FLOW_OK;
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);

    return ret;
}
/*
 * gst_amladapterpipe_clear:
 * @pipe: pipe to empty
 *
 * Locked wrapper around gst_amladapterpipe_clear_unlock(): drops all stored
 * data while holding the pipe lock. The eos and flushing flags are not
 * touched.
 *
 * Returns: the result of the clear, or GST_FLOW_ERROR when @pipe is NULL.
 */
GstFlowReturn gst_amladapterpipe_clear(GstAdapterPipe *pipe)
{
    GstFlowReturn cleared;

    g_return_val_if_fail(pipe, GST_FLOW_ERROR);

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    cleared = gst_amladapterpipe_clear_unlock(pipe);
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);

    return cleared;
}
/*
 * gst_amladapterpipe_get_status:
 * @pipe: pipe to query
 * @status: (out) receives a copy of the pipe's counters and watermarks
 *
 * Takes a snapshot of the status structure while holding the pipe lock.
 *
 * Returns: TRUE on success, FALSE when @pipe or @status is NULL.
 */
gboolean gst_amladapterpipe_get_status(GstAdapterPipe *pipe, GstAdapterPipeStatus *status)
{
    g_return_val_if_fail(pipe, FALSE);
    g_return_val_if_fail(status, FALSE);

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    /* whole-struct copy under the lock */
    *status = pipe->status;
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);

    return TRUE;
}
/*
 * gst_amladapterpipe_push:
 * @pipe: destination pipe
 * @buffer: buffer to push
 *
 * Stores @buffer in the pipe. Unless the pipe is unlimited, the call blocks
 * while adding the buffer would exceed max_num or max_size, and it signals
 * the condition once both "needed" marks are reached.
 *
 * Ownership: once the NULL checks have passed, the caller's reference is
 * always consumed. It is either kept by the backing store or, when the pipe
 * is flushing, released here.
 *
 * Returns: GST_FLOW_OK when stored, GST_FLOW_FLUSHING when the pipe is
 * flushing (the pipe is cleared as well), GST_FLOW_ERROR when an argument is
 * NULL.
 */
GstFlowReturn gst_amladapterpipe_push(GstAdapterPipe *pipe, GstBuffer *buffer)
{
    GstFlowReturn ret;
    gsize buf_size;

    g_return_val_if_fail(pipe, GST_FLOW_ERROR);
    g_return_val_if_fail(buffer, GST_FLOW_ERROR);

    /* Read the size once while the reference is still ours; after the push
     * the buffer belongs to the store and must not be dereferenced. */
    buf_size = gst_buffer_get_size(buffer);

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    if (pipe->flushing)
        goto flushing;

    /* Also leave the loop on flushing: a buffer larger than max_size never
     * satisfies the size condition, not even on an emptied pipe, and would
     * otherwise keep this thread blocked through a flush. */
    while (!pipe->unlimit && !pipe->flushing &&
           (pipe->status.cur_num + 1 > pipe->status.max_num || pipe->status.cur_size + buf_size > pipe->status.max_size))
    {
        GST_DEBUG("pipe:%p, type:%d, reached max, [status.cur_num:%d, status.max_num:%d], [status.cur_size:%d, status.max_size:%d], waitting sig",
                  pipe, pipe->type, pipe->status.cur_num, pipe->status.max_num, pipe->status.cur_size, pipe->status.max_size);
        GST_ADAPTER_PIPE_WAIT(pipe);
        GST_DEBUG("pipe:%p, type:%d, waitted", pipe, pipe->type);
    }
    if (pipe->flushing)
        goto flushing;

    ret = gst_amladapterpipe_push_uncheck_unlock(pipe, buffer);

    /* wake the popping side as soon as it has enough to work with */
    if (!pipe->unlimit && (pipe->status.cur_num >= pipe->status.needed_num && pipe->status.cur_size >= pipe->status.needed_size))
    {
        GST_DEBUG("pipe:%p, type:%d, reachd needed, [status.cur_num:%d, status.needed_num:%d], [status.cur_size:%d, status.needed_size:%d], emit sig",
                  pipe, pipe->type, pipe->status.cur_num, pipe->status.needed_num, pipe->status.cur_size, pipe->status.needed_size);
        GST_ADAPTER_PIPE_SIGNAL(pipe);
    }
    if (ret != GST_FLOW_OK)
    {
        GST_ERROR("pipe:%p, type:%d, push return err:%d", pipe, pipe->type, ret);
        goto done;
    }
    /* gsize needs G_GSIZE_FORMAT; only the pointer value of buffer is used */
    GST_DEBUG("pipe:%p, type:%d, pushed buf:%p(%" G_GSIZE_FORMAT " bytes), status.cur_num:%d, status.cur_size:%d",
              pipe, pipe->type, buffer, buf_size, pipe->status.cur_num, pipe->status.cur_size);

done:
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);
    return ret;

flushing:
    GST_DEBUG("pipe:%p, type:%d, flushing, clear adapter", pipe, pipe->type);
    gst_buffer_unref(buffer);
    gst_amladapterpipe_clear_unlock(pipe);
    ret = GST_FLOW_FLUSHING;
    goto done;
}
/*
 * gst_amladapterpipe_pop:
 * @pipe: source pipe
 * @buffer: (out) receives the popped buffer; set to NULL whenever no buffer
 *          is returned
 *
 * Removes one unit from the pipe. Unless the pipe is unlimited or has seen
 * eos, the call first waits, in timed slices, until the pipe reaches both
 * "needed" marks or until it is signalled. After the pop it signals the
 * condition when the pipe is at or below a low mark.
 *
 * Returns: GST_FLOW_OK with *@buffer set, GST_FLOW_AMLAP_EMPTY when nothing
 * could be taken (caller should retry), GST_FLOW_FLUSHING when the pipe is
 * flushing (the pipe is cleared as well), GST_FLOW_ERROR when an argument is
 * NULL.
 */
GstFlowReturn gst_amladapterpipe_pop(GstAdapterPipe *pipe, GstBuffer **buffer)
{
    GstFlowReturn ret;
    gboolean waitted;

    g_return_val_if_fail(pipe, GST_FLOW_ERROR);
    g_return_val_if_fail(buffer, GST_FLOW_ERROR);

    /* Give the out parameter a defined value on every path, including the
     * flushing one, which never reaches the actual pop. */
    *buffer = NULL;

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    if (pipe->flushing)
        goto flushing;

    /* flushing is part of the condition so that a timed-out wait cannot keep
     * looping on an emptied pipe while a flush is in progress */
    while (!pipe->unlimit && !pipe->flushing && !pipe->eos &&
           (pipe->status.cur_num < pipe->status.needed_num || pipe->status.cur_size < pipe->status.needed_size))
    {
        GST_DEBUG("pipe:%p, type:%d, need more, [status.cur_num:%d, status.needed_num:%d], [status.cur_size:%d, status.needed_size:%d]",
                  pipe, pipe->type, pipe->status.cur_num, pipe->status.needed_num, pipe->status.cur_size, pipe->status.needed_size);
        /* timed wait; the unit of the timeout value is defined by the macro */
        GST_ADAPTER_PIPE_WAITT_TIME(pipe, 20, waitted);
        if (!waitted)
        {
            GST_DEBUG("pipe:%p, type:%d, timeout. need continue", pipe, pipe->type);
            continue;
        }
        GST_DEBUG("pipe:%p, type:%d, waitted", pipe, pipe->type);
        /* a signal ends the wait regardless of the current fill level */
        break;
    }
    if (pipe->flushing)
        goto flushing;

    ret = gst_amladapterpipe_pop_uncheck_unlock(pipe, buffer);
    if (ret == GST_FLOW_AMLAP_EMPTY)
    {
        GST_DEBUG("pipe:%p, type:%d, poped buf:%p, status.cur_num:%d, status.cur_size:%d, pipe empty, need retry",
                  pipe, pipe->type, *buffer, pipe->status.cur_num, pipe->status.cur_size);
        goto done;
    }
    /* gst_buffer_get_size() returns gsize, which needs G_GSIZE_FORMAT */
    GST_DEBUG("pipe:%p, type:%d, poped buf:%p(%" G_GSIZE_FORMAT " bytes), status.cur_num:%d, status.cur_size:%d",
              pipe, pipe->type, *buffer, gst_buffer_get_size(*buffer), pipe->status.cur_num, pipe->status.cur_size);

done:
    /* let a blocked pusher know there is room again */
    if (!pipe->unlimit && (pipe->status.cur_num <= pipe->status.low_num || pipe->status.cur_size <= pipe->status.low_size))
    {
        GST_DEBUG("pipe:%p, type:%d, buffer low, [status.cur_num:%d, status.low_num:%d], [status.cur_size:%d, status.low_size:%d]",
                  pipe, pipe->type, pipe->status.cur_num, pipe->status.low_num, pipe->status.cur_size, pipe->status.low_size);
        GST_ADAPTER_PIPE_SIGNAL(pipe);
    }
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);
    return ret;

flushing:
    GST_DEBUG("pipe:%p, type:%d, flushing, clear adapter", pipe, pipe->type);
    gst_amladapterpipe_clear_unlock(pipe);
    ret = GST_FLOW_FLUSHING;
    goto done;
}
/*
 * gst_amladapterpipe_peek:
 * @pipe: pipe to look into
 * @buffer: (out) receives the peeked buffer, or NULL when none is returned
 *
 * Returns the next not-yet-peeked unit without removing it from the pipe and
 * advances the peek cursor. The cursor is only rewound when the pipe is
 * cleared.
 *
 * Secure pipes return a new reference to the queued buffer at the cursor
 * index. Clear pipes return a freshly allocated buffer holding a copy of
 * BUFFER_SIZE bytes taken at the cursor offset. In both cases the caller
 * owns the returned reference.
 *
 * Returns: GST_FLOW_OK with *@buffer set, GST_FLOW_AMLAP_EMPTY when there is
 * not enough data beyond the cursor, GST_FLOW_ERROR when an argument is NULL
 * or the copy buffer could not be mapped.
 */
GstFlowReturn gst_amladapterpipe_peek(GstAdapterPipe *pipe, GstBuffer **buffer)
{
    GstFlowReturn ret = GST_FLOW_AMLAP_EMPTY;

    g_return_val_if_fail(pipe, GST_FLOW_ERROR);
    g_return_val_if_fail(buffer, GST_FLOW_ERROR);

    *buffer = NULL;

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    if (pipe->secure)
    {
        if (gst_queue_array_get_length(pipe->s.secure_ap.sec_buf_q) <= pipe->s.secure_ap.prob_peek_idx)
        {
            GST_DEBUG("pipe:%p, type:%d, peek err. q len < idx", pipe, pipe->type);
        }
        else
        {
            *buffer = (GstBuffer *)gst_queue_array_peek_nth(pipe->s.secure_ap.sec_buf_q, pipe->s.secure_ap.prob_peek_idx);
            if (*buffer)
            {
                /* the queue keeps its own reference, hand out a new one */
                gst_buffer_ref(*buffer);
                pipe->s.secure_ap.prob_peek_idx++;
                ret = GST_FLOW_OK;
            }
        }
    }
    else
    {
        gsize available = gst_adapter_available(pipe->s.clear_ap.adapter);
        /* widen before adding so the sum cannot wrap in guint */
        gsize wanted = (gsize)pipe->s.clear_ap.prob_peek_offset + BUFFER_SIZE;

        if (available < wanted)
        {
            GST_DEBUG("pipe:%p, type:%d, peek err. q available:%" G_GSIZE_FORMAT " < BUFFER_SIZE", pipe, pipe->type, available);
        }
        else
        {
            GstBuffer *copy = gst_buffer_new_and_alloc(BUFFER_SIZE);
            GstMapInfo map;

            memset(&map, 0, sizeof(map));
            if (copy && gst_buffer_map(copy, &map, GST_MAP_WRITE))
            {
                gst_adapter_copy(pipe->s.clear_ap.adapter, map.data, pipe->s.clear_ap.prob_peek_offset, BUFFER_SIZE);
                gst_buffer_unmap(copy, &map);
                pipe->s.clear_ap.prob_peek_offset += BUFFER_SIZE;
                *buffer = copy;
                ret = GST_FLOW_OK;
            }
            else if (copy)
            {
                /* Without a writable mapping nothing was copied: drop the
                 * buffer instead of returning uninitialised data as OK. */
                GST_ERROR("pipe:%p, type:%d, peek err. map failed", pipe, pipe->type);
                gst_buffer_unref(copy);
                ret = GST_FLOW_ERROR;
            }
        }
    }
    /* signed size type so that "no buffer" is really printed as -1 */
    GST_DEBUG("pipe:%p, type:%d, peeked buf:%p(%" G_GSSIZE_FORMAT " bytes), ret:%d", pipe, pipe->type, *buffer,
              *buffer ? (gssize)gst_buffer_get_size(*buffer) : (gssize)-1, ret);
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);
    return ret;
}
/*
 * gst_amladapterpipe_set_flushing:
 * @pipe: pipe to update
 * @flushing: new value of the flushing flag
 *
 * Stores the flag and empties the pipe, both when flushing is switched on
 * and when it is switched off. Unless the pipe is unlimited, the condition
 * is signalled so that a waiting thread re-evaluates its state.
 *
 * Returns: the result of clearing the pipe, or GST_FLOW_ERROR when @pipe is
 * NULL.
 */
GstFlowReturn gst_amladapterpipe_set_flushing(GstAdapterPipe *pipe, gboolean flushing)
{
    GstFlowReturn cleared;

    g_return_val_if_fail(pipe, GST_FLOW_ERROR);

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);

    GST_DEBUG("pipe:%p, type:%d, set flushing to %d", pipe, pipe->type, flushing);
    pipe->flushing = flushing;
    cleared = gst_amladapterpipe_clear_unlock(pipe);

    if (!pipe->unlimit)
    {
        GST_ADAPTER_PIPE_SIGNAL(pipe);
    }

    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);

    return cleared;
}
/*
 * gst_amladapterpipe_is_flushing:
 * @pipe: pipe to query
 * @flushing: (out) receives the current flushing flag
 *
 * Reads the flushing flag while holding the pipe lock.
 *
 * Returns: GST_FLOW_OK, or GST_FLOW_ERROR when an argument is NULL.
 */
GstFlowReturn gst_amladapterpipe_is_flushing(GstAdapterPipe *pipe, gboolean *flushing)
{
    g_return_val_if_fail(pipe, GST_FLOW_ERROR);
    g_return_val_if_fail(flushing, GST_FLOW_ERROR);

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    {
        gboolean state = pipe->flushing;

        *flushing = state;
        GST_DEBUG("pipe:%p, type:%d, is flushing:%d", pipe, pipe->type, state);
    }
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);

    return GST_FLOW_OK;
}
/*
 * gst_amladapterpipe_set_eos:
 * @pipe: pipe to mark
 *
 * Sets the eos flag under the pipe lock. No signal is emitted here; a thread
 * waiting in gst_amladapterpipe_pop() sees the flag when its timed wait
 * expires and the loop condition is re-evaluated.
 */
void gst_amladapterpipe_set_eos(GstAdapterPipe *pipe)
{
    if (pipe)
    {
        GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
        GST_DEBUG("pipe:%p, type:%d, set eos", pipe, pipe->type);
        pipe->eos = TRUE;
        GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);
        return;
    }
    /* NULL pipe: fall through to the standard GLib precondition warning */
    g_return_if_fail(pipe);
}
/*
 * gst_amladapterpipe_is_eos:
 * @pipe: pipe to query
 *
 * Reads the eos flag while holding the pipe lock.
 *
 * Returns: the current eos flag, or FALSE when @pipe is NULL.
 */
gboolean gst_amladapterpipe_is_eos(GstAdapterPipe *pipe)
{
    gboolean seen_eos;

    g_return_val_if_fail(pipe, FALSE);

    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
    seen_eos = pipe->eos;
    GST_DEBUG("pipe:%p, type:%d, is eos:%d", pipe, pipe->type, seen_eos);
    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);

    return seen_eos;
}