amldemux: CF1 add load balance for es streams [1/1]

PD#SWPL-135165

Problem:
add load balance for es streams
disable by default

Solution:
(detail info)

Verify:
(detail info)

Change-Id: I4e246ac331a52f1270d9d3ab415f07747265df24
Signed-off-by: xuesong.jiang <xuesong.jiang@amlogic.com>
diff --git a/src/aml_defs.h b/src/aml_defs.h
index 68c809b..4421b7f 100644
--- a/src/aml_defs.h
+++ b/src/aml_defs.h
@@ -53,6 +53,7 @@
 /** dmx es ring buffer threshold **/
 #define AMLHWDMX_CLEAR_VIDEO_ES_RING_BUF_SIZE AMLHWDMX_MAX_ES_BUF_SIZE
 #define AMLHWDMX_DEFAULT_ES_RING_BUF_SIZE 2 * 1024 * 1024
+#define AMLHWDMX_DEFAULT_LBM_THRESHOLD -1 // ms
 
 /* clear adaptor size */
 #define MAX_BUFFER_SIZE (AMLHWDMX_MAX_ES_BUF_SIZE / 188 * 188)
diff --git a/src/gstamldmx.c b/src/gstamldmx.c
index 801f2b0..e7e8a1f 100644
--- a/src/gstamldmx.c
+++ b/src/gstamldmx.c
@@ -92,6 +92,7 @@
 static void gst_amlhwdmx_wait(GstAmlhwdmx *amlhwdmx);
 static gboolean gst_amlhwdmx_write(GstAmlhwdmx *amlhwdmx, GstBuffer *buffer);
 static gboolean gst_amlhwdmx_check_filters_mem_status(GstAmlhwdmx *amlhwdmx);
+void gst_amlhwdmx_load_balance_control(GstAmlhwdmx *amlhwdmx, gint idx, gint ts);
 static void gst_amlhwdmx_update_pts_cb(gpointer elem, guint64 pts, gpointer user_data);
 static void gst_amlhwdmx_dump_es(GstDmxSrcStreamPad *srcpad, GstBuffer *buf);
 static void gst_amlhwdmx_dump_ts(GstBuffer *buffer, const char *file_name);
@@ -107,6 +108,7 @@
     PROP_IS_LIVE,
     PROP_IS_SECURE,
     PROP_IS_SECURE_ES,
+    PROP_LOAD_BALANCE,
     PROP_CC_DESC
 };
 
@@ -258,6 +260,12 @@
                                                          DEFAULT_IS_SECURE_ES,
                                                          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+    g_object_class_install_property(gobject_klass, PROP_LOAD_BALANCE,
+                                    g_param_spec_int("load balance threshold", "load balance threshold in ms",
+                                                     "cfg streams output ts diff is less than the threshold. -1 means disable",
+                                                     G_MININT, G_MAXINT, AMLHWDMX_DEFAULT_LBM_THRESHOLD,
+                                                     (GParamFlags)G_PARAM_READWRITE));
+
     gst_element_class_add_pad_template(gstelement_klass, gst_static_pad_template_get(&sink_template));
     gst_element_class_add_static_pad_template(gstelement_klass, &video_src_template);
     gst_element_class_add_static_pad_template(gstelement_klass, &audio_src_template);
@@ -387,6 +395,12 @@
         GST_DEBUG_OBJECT(amlhwdmx, "get PROP_IS_SECURE_ES:%d", amlhwdmx->is_secure_es);
         break;
     }
+    case PROP_LOAD_BALANCE:
+    {
+        g_value_set_int(value, amlhwdmx->lb_th);
+        GST_DEBUG_OBJECT(amlhwdmx, "get PROP_LOAD_BALANCE:%d", amlhwdmx->lb_th);
+        break;
+    }
     default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
         break;
@@ -462,6 +476,12 @@
         GST_DEBUG_OBJECT(amlhwdmx, "set PROP_IS_SECURE_ES:%d", amlhwdmx->is_secure_es);
         break;
     }
+    case PROP_LOAD_BALANCE:
+    {
+        amlhwdmx->lb_th = g_value_get_int(value);
+        GST_DEBUG_OBJECT(amlhwdmx, "set PROP_LOAD_BALANCE:%d", amlhwdmx->lb_th);
+        break;
+    }
     default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
         break;
@@ -503,6 +523,7 @@
     amlhwdmx->is_live = DEFAULT_IS_LIVE;
     amlhwdmx->is_secure = DEFAULT_IS_SECURE;
     amlhwdmx->is_secure_es = DEFAULT_IS_SECURE_ES;
+    amlhwdmx->lb_th = AMLHWDMX_DEFAULT_LBM_THRESHOLD;
     amlhwdmx->caps = NULL;
     amlhwdmx->seek_event = NULL;
     amlhwdmx->needQueryTsInfo = TRUE;
@@ -1344,6 +1365,7 @@
         stream_pad->stream_object = stream_object;
         stream_pad->stream_type = gst_stream_get_stream_type(stream_object);
         stream_pad->newest_pts = GST_CLOCK_TIME_NONE;
+        stream_pad->idx = amlhwdmx->srcpad_num;
         amlhwdmx->srcpad_num++;
 
         ret = TRUE;
@@ -1510,6 +1532,51 @@
     return ret;
 }
 
+void gst_amlhwdmx_load_balance_control(GstAmlhwdmx *amlhwdmx, gint idx, gint ts)
+{
+    g_return_if_fail(amlhwdmx);
+    g_return_if_fail(idx >= 0 && idx < MAX_STREAM_NUM);
+    g_return_if_fail(GST_CLOCK_TIME_NONE != ts);
+
+    if (-1 == amlhwdmx->lb_th)
+        return;
+
+    GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts(%d ms)", idx, ts);
+
+    if (ts <= g_atomic_int_get(&amlhwdmx->max_ts[idx]))
+    {
+        GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts((%d ms) <= it's max ts(%d ms). control done", idx, ts, amlhwdmx->max_ts[idx]);
+        return;
+    }
+
+    g_atomic_int_set(&amlhwdmx->max_ts[idx], ts);
+
+    while (!(amlhwdmx->flags & GST_AML_DMX_FLUSHING))
+    {
+        gint i = 0;
+        gboolean need_wait = FALSE;
+
+        for (i = 0; i < MAX_STREAM_NUM; i++)
+        {
+            gint stream_ts = g_atomic_int_get(&amlhwdmx->max_ts[i]);
+            if ((stream_ts != 0) && (i != idx) && (ts > stream_ts) && (ts - stream_ts > AMLHWDMX_DEFAULT_LBM_THRESHOLD))
+            {
+                need_wait = TRUE;
+                break;
+            }
+        }
+        if (need_wait)
+        {
+            GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts(%d ms) compare stream[%d]'s max ts(%d) overflow. need wait", idx, ts, i, amlhwdmx->max_ts[i]);
+            g_usleep(50 * 1000);
+            continue;
+        }
+        break;
+    }
+
+    GST_DEBUG_OBJECT(amlhwdmx, "check stream[%d] | check ts(%d ms) control done", idx, ts);
+}
+
 static void gst_amlhwdmx_loop(GstAmlhwdmx *amlhwdmx)
 {
     GstFlowReturn ret = GST_FLOW_OK;
@@ -1754,6 +1821,8 @@
                          GST_TIME_ARGS(GST_BUFFER_PTS(buf)));
     }
 
+    gst_amlhwdmx_load_balance_control(amlhwdmx, srcpad->idx, GST_BUFFER_PTS(buf) / GST_MSECOND);
+
     gst_pad_push(srcpad->pad, buf);
 
     GST_DEBUG_OBJECT(srcpad->pad, "pushed buf:%p (size:%d start: %" GST_TIME_FORMAT ", duration: %" GST_TIME_FORMAT ")",
diff --git a/src/gstamldmx.h b/src/gstamldmx.h
index f04d9bb..4642697 100644
--- a/src/gstamldmx.h
+++ b/src/gstamldmx.h
@@ -106,6 +106,7 @@
 {
     GstPad *pad;
     GstAdapterPipe *adapter_pipe;
+    gint idx;
     gint es_fid;
     guint pid;
     GstStream *stream_object;
@@ -124,6 +125,9 @@
 
     GstAllocator *fd_allocator;
 
+    gint max_ts[MAX_STREAM_NUM]; // for load balance control(ms)
+    gint lb_th;                  // load balance threshold(ms)
+
     gint prog_no;                  /* ReadWriteable */
     char available_languages[256]; /* Readable */
     char preferred_language[8];    /* Writeable */