amldemux: CF1 add probe pmt table flow [1/1]

PD#SWPL-129898

Problem:
start es filter after push ts data
this will case es before pmt won't be dmx

Solution:
cache ts buffer before pmt pkt
re-push cached buffers after started es filters

Verify:
(detail info)

Change-Id: Ic789fd529fcf53e932986b23d3e9cdcec69aaa8e
Signed-off-by: xuesong.jiang <xuesong.jiang@amlogic.com>
diff --git a/src/gstamladapterpipe.c b/src/gstamladapterpipe.c
index f78db51..1ea9b78 100644
--- a/src/gstamladapterpipe.c
+++ b/src/gstamladapterpipe.c
@@ -390,6 +390,46 @@
     goto done;
 }
 
+GstFlowReturn gst_amladapterpipe_peek(GstAdapterPipe *pipe, guint idx, GstBuffer **buffer)
+{
+    g_return_val_if_fail(pipe, GST_FLOW_ERROR);
+    g_return_val_if_fail(buffer, GST_FLOW_ERROR);
+
+    GstFlowReturn ret;
+    guint available;
+
+    ret = GST_FLOW_OK;
+    *buffer = NULL;
+
+    GST_ADAPTER_PIPE_MUTEX_LOCK(pipe);
+
+    if (pipe->secure)
+    {
+        if (gst_queue_array_get_length(pipe->s.secure_ap.sec_buf_q) <= idx)
+        {
+            GST_DEBUG("pipe:%p, type:%d, peek err. q len < idx", pipe, pipe->type);
+        }
+        else
+        {
+            *buffer = (GstBuffer *)gst_queue_array_peek_nth(pipe->s.secure_ap.sec_buf_q, idx);
+        }
+    }
+    else
+    {
+        // TODO: add peek flow for using adaptor
+        *buffer = gst_adapter_take_buffer(pipe->s.clear_ap.adapter, BUFFER_SIZE);
+    }
+
+    if (*buffer == NULL)
+        ret = GST_FLOW_AMLAP_EMPTY;
+    else
+        gst_buffer_ref(*buffer);
+
+    GST_DEBUG("pipe:%p, type:%d, peeked buf:%p(idx:%d, %d bytes), ret:%d", pipe, pipe->type, *buffer, idx, *buffer ? gst_buffer_get_size(*buffer) : -1, ret);
+    GST_ADAPTER_PIPE_MUTEX_UNLOCK(pipe);
+    return ret;
+}
+
 GstFlowReturn gst_amladapterpipe_set_flushing(GstAdapterPipe *pipe, gboolean flushing)
 {
     g_return_val_if_fail(pipe, GST_FLOW_ERROR);
diff --git a/src/gstamladapterpipe.h b/src/gstamladapterpipe.h
index 378a905..fd1eaa6 100644
--- a/src/gstamladapterpipe.h
+++ b/src/gstamladapterpipe.h
@@ -88,6 +88,7 @@
 GstFlowReturn gst_amladapterpipe_push(GstAdapterPipe *pipe, GstBuffer *buffer);
 GstFlowReturn gst_amladapterpipe_push_uncheck(GstAdapterPipe *pipe, GstBuffer *buffer);
 GstFlowReturn gst_amladapterpipe_pop(GstAdapterPipe *pipe, GstBuffer **buffer);
+GstFlowReturn gst_amladapterpipe_peek(GstAdapterPipe *pipe, guint idx, GstBuffer **buffer);
 GstFlowReturn gst_amladapterpipe_set_flushing(GstAdapterPipe *pipe, gboolean flushing);
 GstFlowReturn gst_amladapterpipe_is_flushing(GstAdapterPipe *pipe, gboolean *flushing);
 void gst_amladapterpipe_set_eos(GstAdapterPipe *pipe);
diff --git a/src/gstamldmx.c b/src/gstamldmx.c
index c90f7dd..773637b 100644
--- a/src/gstamldmx.c
+++ b/src/gstamldmx.c
@@ -176,6 +176,8 @@
     }
     case GST_STATE_CHANGE_READY_TO_PAUSED:
     {
+        amlhwdmx->flags |= GST_AML_DMX_PROB;
+
         if (amlhwdmx->suf & GST_INIT_PLAY)
         {
             gst_amldmxwrap_start_filter(amlhwdmx->dmx_dev_id, amlhwdmx->sinkpad.pat_para.fid);
@@ -503,6 +505,7 @@
     amlhwdmx->selected_audio_stream = DEFAULT_SELECTED_AUDIO_TRACK;
     amlhwdmx->suf = GST_INIT_PLAY;
     amlhwdmx->updated_pmt_num = 0;
+    amlhwdmx->sinkpad.prob_peek_idx = 0;
 
     gst_amladapterpipe_reset(amlhwdmx->sinkpad.adapter_pipe);
 
@@ -1471,8 +1474,25 @@
     }
     else if (!amlhwdmx->seekable)
     {
-        ret = gst_amladapterpipe_pop(amlhwdmx->sinkpad.adapter_pipe, &buf);
-        GST_DEBUG("pop buffer:%p(%d bytes) is_secure:%d, ret:%d", buf, gst_buffer_get_size(buf), amlhwdmx->is_secure, ret);
+        GST_DEBUG_OBJECT(amlhwdmx, "flags:0x%x", amlhwdmx->flags);
+        if (amlhwdmx->flags & GST_AML_DMX_PROB)
+        {
+            ret = gst_amladapterpipe_peek(amlhwdmx->sinkpad.adapter_pipe, amlhwdmx->sinkpad.prob_peek_idx, &buf);
+            GST_DEBUG("peek buffer:%p(idx:%d, %d bytes) is_secure:%d, ret:%d",
+                      buf,
+                      amlhwdmx->sinkpad.prob_peek_idx, buf ? gst_buffer_get_size(buf) : -1,
+                      amlhwdmx->is_secure, ret);
+            if (buf)
+                amlhwdmx->sinkpad.prob_peek_idx++;
+        }
+        else
+        {
+            ret = gst_amladapterpipe_pop(amlhwdmx->sinkpad.adapter_pipe, &buf);
+            GST_DEBUG("pop buffer:%p(%d bytes) is_secure:%d, ret:%d",
+                      buf,
+                      buf ? gst_buffer_get_size(buf) : -1,
+                      amlhwdmx->is_secure, ret);
+        }
     }
     else
     {
@@ -1600,7 +1620,8 @@
 
     GST_DEBUG("got pts:%llu - %" GST_TIME_FORMAT, pts, GST_TIME_ARGS(pts));
     AMLHWDMX_CONTEXT_LOCK(amlhwdmx);
-    amlhwdmx->sinkpad.decoded_pts = pts;
+    if (pts != GST_CLOCK_TIME_NONE)
+        amlhwdmx->sinkpad.decoded_pts = pts;
     AMLHWDMX_CONTEXT_UNLOCK(amlhwdmx);
 }
 
diff --git a/src/gstamldmx.h b/src/gstamldmx.h
index 472497c..e5eeeb9 100644
--- a/src/gstamldmx.h
+++ b/src/gstamldmx.h
@@ -54,6 +54,7 @@
     GST_AML_DMX_NONE = 0,
     GST_AML_DMX_FLUSHING = (1 << 0),
     GST_AML_DMX_EOS = (1 << 1),
+    GST_AML_DMX_PROB = (1 << 2),
 } GstAmlDmxFlags;
 
 typedef struct _GstAmlhwdmx GstAmlhwdmx;
@@ -87,6 +88,8 @@
     GstDmxPatPara pat_para;
     GstDmxPmtPara pmt_para[MAX_PROGRAM_NUM];
 
+    guint prob_peek_idx;
+
     /* context para for push mode */
     GstTask *task;
     GRecMutex task_lock;
diff --git a/src/gstamldmxfilter.c b/src/gstamldmxfilter.c
index ee82827..591a67c 100644
--- a/src/gstamldmxfilter.c
+++ b/src/gstamldmxfilter.c
@@ -160,7 +160,8 @@
 
         gst_pad_start_task(amlhwdmx->srcpads[i].pad, (GstTaskFunction)gst_amlhwdmx_src_loop, &amlhwdmx->srcpads[i], NULL);
     }
-    GST_LOG_OBJECT(amlhwdmx, "found stream:%d", amlhwdmx->srcpad_num);
+    amlhwdmx->flags &= ~GST_AML_DMX_PROB;
+    GST_LOG_OBJECT(amlhwdmx, "found %d streams", amlhwdmx->srcpad_num);
 }
 
 void gst_parse_es_cb(int dev_no, int fid, const guint *data, int len, void *user_data)
@@ -178,6 +179,11 @@
     amlhwdmx = (GstAmlhwdmx *)gst_pad_get_parent(dmx_src_pad->pad);
     adapter_pipe = dmx_src_pad->adapter_pipe;
 
+    if (amlhwdmx->flags | GST_AML_DMX_PROB)
+    {
+        GST_DEBUG_OBJECT(amlhwdmx, "ignore es buffer in probe state ");
+    }
+
     GST_DEBUG_OBJECT(amlhwdmx, "trace in | dev_no:%d, fid:%d, data:%p, len:%d", dev_no, fid, data, len);
 
     buf = gst_es_filter_construct_buf(amlhwdmx, dev_no, fid, data, len);
diff --git a/src/gstamldmxwrap.c b/src/gstamldmxwrap.c
index f8eb795..02def2d 100644
--- a/src/gstamldmxwrap.c
+++ b/src/gstamldmxwrap.c
@@ -699,8 +699,7 @@
     }
 
     filter_mem_info.filter_num = 1;
-    // filter_mem_info.info[0].type = stream_type;
-    filter_mem_info.info[0].type = DMX_VIDEO_TYPE;
+    filter_mem_info.info[0].type = stream_type;
     filter_mem_info.info[0].pid = pid;
     if (ioctl(filter->fd, DMX_GET_FILTER_MEM_INFO, &filter_mem_info) < 0)
     {