amldemux: CF1 add load balance for es streams [1/1]
PD#SWPL-135165
Problem:
add load balance for es streams
disable by default
Solution:
(detail info)
Verify:
(detail info)
Change-Id: I4e246ac331a52f1270d9d3ab415f07747265df24
Signed-off-by: xuesong.jiang <xuesong.jiang@amlogic.com>
diff --git a/src/aml_defs.h b/src/aml_defs.h
index 68c809b..4421b7f 100644
--- a/src/aml_defs.h
+++ b/src/aml_defs.h
@@ -53,6 +53,7 @@
/** dmx es ring buffer threshold **/
#define AMLHWDMX_CLEAR_VIDEO_ES_RING_BUF_SIZE AMLHWDMX_MAX_ES_BUF_SIZE
#define AMLHWDMX_DEFAULT_ES_RING_BUF_SIZE 2 * 1024 * 1024
+#define AMLHWDMX_DEFAULT_LBM_THRESHOLD -1 // ms
/* clear adaptor size */
#define MAX_BUFFER_SIZE (AMLHWDMX_MAX_ES_BUF_SIZE / 188 * 188)
diff --git a/src/gstamldmx.c b/src/gstamldmx.c
index 801f2b0..e7e8a1f 100644
--- a/src/gstamldmx.c
+++ b/src/gstamldmx.c
@@ -92,6 +92,7 @@
static void gst_amlhwdmx_wait(GstAmlhwdmx *amlhwdmx);
static gboolean gst_amlhwdmx_write(GstAmlhwdmx *amlhwdmx, GstBuffer *buffer);
static gboolean gst_amlhwdmx_check_filters_mem_status(GstAmlhwdmx *amlhwdmx);
+void gst_amlhwdmx_load_balance_control(GstAmlhwdmx *amlhwdmx, gint idx, gint ts);
static void gst_amlhwdmx_update_pts_cb(gpointer elem, guint64 pts, gpointer user_data);
static void gst_amlhwdmx_dump_es(GstDmxSrcStreamPad *srcpad, GstBuffer *buf);
static void gst_amlhwdmx_dump_ts(GstBuffer *buffer, const char *file_name);
@@ -107,6 +108,7 @@
PROP_IS_LIVE,
PROP_IS_SECURE,
PROP_IS_SECURE_ES,
+ PROP_LOAD_BALANCE,
PROP_CC_DESC
};
@@ -258,6 +260,12 @@
DEFAULT_IS_SECURE_ES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property(gobject_klass, PROP_LOAD_BALANCE,
+ g_param_spec_int("load balance threshold", "load balance threshold in ms",
+ "cfg streams output ts diff is less than the threshold. -1 means disable",
+ G_MININT, G_MAXINT, AMLHWDMX_DEFAULT_LBM_THRESHOLD,
+ (GParamFlags)G_PARAM_READWRITE));
+
gst_element_class_add_pad_template(gstelement_klass, gst_static_pad_template_get(&sink_template));
gst_element_class_add_static_pad_template(gstelement_klass, &video_src_template);
gst_element_class_add_static_pad_template(gstelement_klass, &audio_src_template);
@@ -387,6 +395,12 @@
GST_DEBUG_OBJECT(amlhwdmx, "get PROP_IS_SECURE_ES:%d", amlhwdmx->is_secure_es);
break;
}
+ case PROP_LOAD_BALANCE:
+ {
+ g_value_set_int(value, amlhwdmx->lb_th);
+ GST_DEBUG_OBJECT(amlhwdmx, "get PROP_LOAD_BALANCE:%d", amlhwdmx->lb_th);
+ break;
+ }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
@@ -462,6 +476,12 @@
GST_DEBUG_OBJECT(amlhwdmx, "set PROP_IS_SECURE_ES:%d", amlhwdmx->is_secure_es);
break;
}
+ case PROP_LOAD_BALANCE:
+ {
+ amlhwdmx->lb_th = g_value_get_int(value);
+ GST_DEBUG_OBJECT(amlhwdmx, "set PROP_LOAD_BALANCE:%d", amlhwdmx->lb_th);
+ break;
+ }
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
@@ -503,6 +523,7 @@
amlhwdmx->is_live = DEFAULT_IS_LIVE;
amlhwdmx->is_secure = DEFAULT_IS_SECURE;
amlhwdmx->is_secure_es = DEFAULT_IS_SECURE_ES;
+ amlhwdmx->lb_th = AMLHWDMX_DEFAULT_LBM_THRESHOLD;
amlhwdmx->caps = NULL;
amlhwdmx->seek_event = NULL;
amlhwdmx->needQueryTsInfo = TRUE;
@@ -1344,6 +1365,7 @@
stream_pad->stream_object = stream_object;
stream_pad->stream_type = gst_stream_get_stream_type(stream_object);
stream_pad->newest_pts = GST_CLOCK_TIME_NONE;
+ stream_pad->idx = amlhwdmx->srcpad_num;
amlhwdmx->srcpad_num++;
ret = TRUE;
@@ -1510,6 +1532,51 @@
return ret;
}
+void gst_amlhwdmx_load_balance_control(GstAmlhwdmx *amlhwdmx, gint idx, gint ts)
+{
+ g_return_if_fail(amlhwdmx);
+ g_return_if_fail(idx >= 0 && idx < MAX_STREAM_NUM);
+ g_return_if_fail(GST_CLOCK_TIME_NONE != ts);
+
+ if (-1 == amlhwdmx->lb_th)
+ return;
+
+ GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts(%d ms)", idx, ts);
+
+ if (ts <= g_atomic_int_get(&amlhwdmx->max_ts[idx]))
+ {
+ GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts((%d ms) <= it's max ts(%d ms). control done", idx, ts, amlhwdmx->max_ts[idx]);
+ return;
+ }
+
+ g_atomic_int_set(&amlhwdmx->max_ts[idx], ts);
+
+ while (!(amlhwdmx->flags & GST_AML_DMX_FLUSHING))
+ {
+ gint i = 0;
+ gboolean need_wait = FALSE;
+
+ for (i = 0; i < MAX_STREAM_NUM; i++)
+ {
+ gint stream_ts = g_atomic_int_get(&amlhwdmx->max_ts[i]);
+ if ((stream_ts != 0) && (i != idx) && (ts > stream_ts) && (ts - stream_ts > AMLHWDMX_DEFAULT_LBM_THRESHOLD))
+ {
+ need_wait = TRUE;
+ break;
+ }
+ }
+ if (need_wait)
+ {
+ GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts(%d ms) compare stream[%d]'s max ts(%d) overflow. need wait", idx, ts, i, amlhwdmx->max_ts[i]);
+ g_usleep(50 * 1000);
+ continue;
+ }
+ break;
+ }
+
+ GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts(%d ms) control done", idx, ts);
+}
+
static void gst_amlhwdmx_loop(GstAmlhwdmx *amlhwdmx)
{
GstFlowReturn ret = GST_FLOW_OK;
@@ -1754,6 +1821,8 @@
GST_TIME_ARGS(GST_BUFFER_PTS(buf)));
}
+ gst_amlhwdmx_load_balance_control(amlhwdmx, srcpad->idx, GST_BUFFER_PTS(buf) / GST_MSECOND);
+
gst_pad_push(srcpad->pad, buf);
GST_DEBUG_OBJECT(srcpad->pad, "pushed buf:%p (size:%d start: %" GST_TIME_FORMAT ", duration: %" GST_TIME_FORMAT ")",
diff --git a/src/gstamldmx.h b/src/gstamldmx.h
index f04d9bb..4642697 100644
--- a/src/gstamldmx.h
+++ b/src/gstamldmx.h
@@ -106,6 +106,7 @@
{
GstPad *pad;
GstAdapterPipe *adapter_pipe;
+ gint idx;
gint es_fid;
guint pid;
GstStream *stream_object;
@@ -124,6 +125,9 @@
GstAllocator *fd_allocator;
+ gint max_ts[MAX_STREAM_NUM]; // for load balance control(ms)
+ gint lb_th; // load balance threshold(ms)
+
gint prog_no; /* ReadWriteable */
char available_languages[256]; /* Readable */
char preferred_language[8]; /* Writeable */