blob: 273ef4397d1c035c50b05d30b57e12a8e3a5511b [file] [log] [blame]
hanghang.luofa7b16f2024-05-31 14:44:31 +08001/*
2 * mpegtsparse.c -
3 * Copyright (C) 2007 Alessandro Decina
4 *
5 * Authors:
6 * Alessandro Decina <alessandro@nnva.org>
7 * Zaheer Abbas Merali <zaheerabbas at merali dot org>
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
23 */
24
25#ifdef HAVE_CONFIG_H
26#include "config.h"
27#endif
28
29#include <stdio.h>
30#include <stdlib.h>
31#include <string.h>
32#include "amltsbase.h"
33#include "amltsparse.h"
34#include "gstamldesc.h"
hanghang.luofa7b16f2024-05-31 14:44:31 +080035/* latency in mseconds is maximum 100 ms between PCR */
36#define TS_LATENCY 100
37
38#define TABLE_ID_UNSET 0xFF
39#define RUNNING_STATUS_RUNNING 4
40#define SYNC_BYTE 0x47
41
42GST_DEBUG_CATEGORY_STATIC (mpegts_parse_debug);
43#define GST_CAT_DEFAULT mpegts_parse_debug
44
45typedef struct _AmlTSParsePad AmlTSParsePad;
46
47typedef struct
48{
49 AmlTSBaseProgram program;
50 AmlTSParsePad *tspad;
51} AmlTSParseProgram;
52
53struct _AmlTSParsePad
54{
55 GstPad *pad;
56
57 /* the program number that the peer wants on this pad */
58 gint program_number;
59 AmlTSParseProgram *program;
60
61 /* set to FALSE before a push and TRUE after */
62 gboolean pushed;
63
64 /* the return of the latest push */
65 GstFlowReturn flow_return;
66
67 AmlTSParse2Adapter ts_adapter;
68};
69
70static GstStaticPadTemplate src_template =
71GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
72 GST_PAD_ALWAYS,
73 GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
74 );
75
76static GstStaticPadTemplate program_template =
77GST_STATIC_PAD_TEMPLATE ("program_%u", GST_PAD_SRC,
78 GST_PAD_REQUEST,
79 GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
80 );
81
82enum
83{
84 PROP_0,
85 PROP_SET_TIMESTAMPS,
86 PROP_SMOOTHING_LATENCY,
87 PROP_PCR_PID,
88 PROP_ALIGNMENT,
89 PROP_SPLIT_ON_RAI,
90 /* FILL ME */
91};
92
93static void amlts_parse_set_property (GObject * object, guint prop_id,
94 const GValue * value, GParamSpec * pspec);
95static void amlts_parse_get_property (GObject * object, guint prop_id,
96 GValue * value, GParamSpec * pspec);
97
98static void
99amlts_parse_program_started (AmlTSBase * base, AmlTSBaseProgram * program);
100static void
101amlts_parse_program_stopped (AmlTSBase * base, AmlTSBaseProgram * program);
102
103static GstFlowReturn
104amlts_parse_push (AmlTSBase * base, AmlTSPacketizerPacket * packet,
105 GstMpegtsSection * section);
106static void amlts_parse_inspect_packet (AmlTSBase * base,
107 AmlTSPacketizerPacket * packet);
108static GstFlowReturn amlts_parse_have_buffer (AmlTSBase * base,
109 GstBuffer * buffer);
110
111static AmlTSParsePad *amlts_parse_create_tspad (AmlTSParse2 * parse,
112 const gchar * name);
113static void amlts_parse_destroy_tspad (AmlTSParse2 * parse,
114 AmlTSParsePad * tspad);
115
116static void amlts_parse_pad_removed (GstElement * element, GstPad * pad);
117static GstPad *amlts_parse_request_new_pad (GstElement * element,
118 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
119static void amlts_parse_release_pad (GstElement * element, GstPad * pad);
120static gboolean amlts_parse_src_pad_query (GstPad * pad, GstObject * parent,
121 GstQuery * query);
122static gboolean push_event (AmlTSBase * base, GstEvent * event);
123
124#define amlts_parse_parent_class parent_class
125G_DEFINE_TYPE (AmlTSParse2, amlts_parse, GST_TYPE_AMLTS_BASE);
bo.xiaoff2de692024-08-07 14:38:55 +0800126/* aml mark:new method at new version
127GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tsparse, "tsparse",
128 GST_RANK_NONE, GST_TYPE_MPEGTS_PARSE,
129 GST_DEBUG_CATEGORY_INIT (mpegts_parse_debug, "tsparse", 0,
130 "MPEG transport stream parser"));
131*/
hanghang.luofa7b16f2024-05-31 14:44:31 +0800132static void amlts_parse_reset (AmlTSBase * base);
133static GstFlowReturn amlts_parse_input_done (AmlTSBase * base);
134static GstFlowReturn
bo.xiaoff2de692024-08-07 14:38:55 +0800135aml_drain_pending_buffers (AmlTSParse2 * parse, gboolean drain_all);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800136
137static void
138amlts_parse_finalize (GObject * object)
139{
140 AmlTSParse2 *parse = (AmlTSParse2 *) object;
141
142 gst_flow_combiner_free (parse->flowcombiner);
143
144 gst_adapter_clear (parse->ts_adapter.adapter);
145 g_object_unref (parse->ts_adapter.adapter);
146
147 GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
148}
149
150static void
151amlts_parse_class_init (AmlTSParse2Class * klass)
152{
153 GObjectClass *gobject_class = (GObjectClass *) (klass);
154 GstElementClass *element_class;
155 AmlTSBaseClass *ts_class;
156
157 gobject_class->set_property = amlts_parse_set_property;
158 gobject_class->get_property = amlts_parse_get_property;
159 gobject_class->finalize = amlts_parse_finalize;
160
161 g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS,
162 g_param_spec_boolean ("set-timestamps",
163 "Timestamp (or re-timestamp) the output stream",
164 "If set, timestamps will be set on the output buffers using "
165 "PCRs and smoothed over the smoothing-latency period", FALSE,
166 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167 g_object_class_install_property (gobject_class, PROP_SMOOTHING_LATENCY,
168 g_param_spec_uint ("smoothing-latency", "Smoothing Latency",
169 "Additional latency in microseconds for smoothing jitter in input timestamps on live capture",
170 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
171 g_object_class_install_property (gobject_class, PROP_PCR_PID,
172 g_param_spec_int ("pcr-pid", "PID containing PCR",
173 "Set the PID to use for PCR values (-1 for auto)",
174 -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
175 g_object_class_install_property (gobject_class, PROP_ALIGNMENT,
176 g_param_spec_uint ("alignment", "Alignment",
177 "Number of packets per buffer (padded with dummy packets on EOS) (0 = auto)",
178 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
179 g_object_class_install_property (gobject_class, PROP_SPLIT_ON_RAI,
180 g_param_spec_boolean ("split-on-rai", "Split on RAI",
181 "If set, buffers sized smaller than the alignment will be sent "
182 "so that RAI packets are at the start of a new buffer", FALSE,
183 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
184
185 element_class = GST_ELEMENT_CLASS (klass);
186 element_class->pad_removed = amlts_parse_pad_removed;
187 element_class->request_new_pad = amlts_parse_request_new_pad;
188 element_class->release_pad = amlts_parse_release_pad;
189
190 gst_element_class_add_static_pad_template (element_class, &src_template);
191 gst_element_class_add_static_pad_template (element_class, &program_template);
192
193 gst_element_class_set_static_metadata (element_class,
194 "MPEG transport stream parser", "Codec/Parser",
195 "Parses MPEG2 transport streams",
196 "Alessandro Decina <alessandro@nnva.org>, "
197 "Zaheer Abbas Merali <zaheerabbas at merali dot org>");
198
199 ts_class = GST_AMLTS_BASE_CLASS (klass);
200 ts_class->push = GST_DEBUG_FUNCPTR (amlts_parse_push);
201 ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
202 ts_class->program_started = GST_DEBUG_FUNCPTR (amlts_parse_program_started);
203 ts_class->program_stopped = GST_DEBUG_FUNCPTR (amlts_parse_program_stopped);
204 ts_class->reset = GST_DEBUG_FUNCPTR (amlts_parse_reset);
205 ts_class->input_done = GST_DEBUG_FUNCPTR (amlts_parse_input_done);
206 ts_class->inspect_packet = GST_DEBUG_FUNCPTR (amlts_parse_inspect_packet);
207}
208
209static void
210amlts_parse_init (AmlTSParse2 * parse)
211{
212 AmlTSBase *base = (AmlTSBase *) parse;
213
214 base->program_size = sizeof (AmlTSParseProgram);
215 base->push_data = TRUE;
216 base->push_section = TRUE;
217 base->push_unknown = TRUE;
218
219 parse->user_pcr_pid = parse->pcr_pid = -1;
220
221 parse->flowcombiner = gst_flow_combiner_new ();
222
223 parse->srcpad = gst_pad_new_from_static_template (&src_template, "src");
224 gst_flow_combiner_add_pad (parse->flowcombiner, parse->srcpad);
225 parse->first = TRUE;
226 gst_pad_set_query_function (parse->srcpad,
227 GST_DEBUG_FUNCPTR (amlts_parse_src_pad_query));
228 gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
229
230 parse->have_group_id = FALSE;
231 parse->group_id = G_MAXUINT;
232
233 parse->ts_adapter.adapter = gst_adapter_new ();
234 parse->ts_adapter.packets_in_adapter = 0;
235 parse->ts_adapter.first_is_keyframe = TRUE;
236 parse->alignment = 0;
237 parse->is_eos = FALSE;
238 parse->header = 0;
239 parse->split_on_rai = FALSE;
240}
241
242static void
243amlts_parse_reset (AmlTSBase * base)
244{
245 AmlTSParse2 *parse = (AmlTSParse2 *) base;
246
247 /* Set the various know PIDs we are interested in */
248
249 /* CAT */
250 MPEGTS_BIT_SET (base->known_psi, 1);
251 /* NIT, ST */
252 MPEGTS_BIT_SET (base->known_psi, 0x10);
253 /* SDT, BAT, ST */
254 MPEGTS_BIT_SET (base->known_psi, 0x11);
255 /* EIT, ST, CIT (TS 102 323) */
256 MPEGTS_BIT_SET (base->known_psi, 0x12);
257 /* RST, ST */
258 MPEGTS_BIT_SET (base->known_psi, 0x13);
259 /* RNT (TS 102 323) */
260 MPEGTS_BIT_SET (base->known_psi, 0x16);
261 /* inband signalling */
262 MPEGTS_BIT_SET (base->known_psi, 0x1c);
263 /* measurement */
264 MPEGTS_BIT_SET (base->known_psi, 0x1d);
265 /* DIT */
266 MPEGTS_BIT_SET (base->known_psi, 0x1e);
267 /* SIT */
268 MPEGTS_BIT_SET (base->known_psi, 0x1f);
269
270 parse->first = TRUE;
271 parse->have_group_id = FALSE;
272 parse->group_id = G_MAXUINT;
273
274 g_list_free_full (parse->pending_buffers, (GDestroyNotify) gst_buffer_unref);
275 parse->pending_buffers = NULL;
276
277 parse->current_pcr = GST_CLOCK_TIME_NONE;
278 parse->previous_pcr = GST_CLOCK_TIME_NONE;
279 parse->base_pcr = GST_CLOCK_TIME_NONE;
280 parse->bytes_since_pcr = 0;
281 parse->pcr_pid = parse->user_pcr_pid;
282 parse->ts_offset = 0;
283
284 gst_adapter_clear (parse->ts_adapter.adapter);
285 parse->ts_adapter.packets_in_adapter = 0;
286 parse->ts_adapter.first_is_keyframe = TRUE;
287 parse->is_eos = FALSE;
288 parse->header = 0;
289}
290
291static void
292amlts_parse_set_property (GObject * object, guint prop_id,
293 const GValue * value, GParamSpec * pspec)
294{
295 AmlTSParse2 *parse = (AmlTSParse2 *) object;
296
297 switch (prop_id) {
298 case PROP_SET_TIMESTAMPS:
299 parse->set_timestamps = g_value_get_boolean (value);
300 break;
301 case PROP_SMOOTHING_LATENCY:
302 parse->smoothing_latency = GST_USECOND * g_value_get_uint (value);
303 amlts_packetizer_set_pcr_discont_threshold (GST_AMLTS_BASE
304 (parse)->packetizer, parse->smoothing_latency);
305 break;
306 case PROP_PCR_PID:
307 parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value);
308 break;
309 case PROP_ALIGNMENT:
310 parse->alignment = g_value_get_uint (value);
311 break;
312 case PROP_SPLIT_ON_RAI:
313 parse->split_on_rai = g_value_get_boolean (value);
314 break;
315 default:
316 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
317 }
318}
319
320static void
321amlts_parse_get_property (GObject * object, guint prop_id,
322 GValue * value, GParamSpec * pspec)
323{
324 AmlTSParse2 *parse = (AmlTSParse2 *) object;
325
326 switch (prop_id) {
327 case PROP_SET_TIMESTAMPS:
328 g_value_set_boolean (value, parse->set_timestamps);
329 break;
330 case PROP_SMOOTHING_LATENCY:
331 g_value_set_uint (value, parse->smoothing_latency / GST_USECOND);
332 break;
333 case PROP_PCR_PID:
334 g_value_set_int (value, parse->pcr_pid);
335 break;
336 case PROP_ALIGNMENT:
337 g_value_set_uint (value, parse->alignment);
338 break;
339 case PROP_SPLIT_ON_RAI:
340 g_value_set_boolean (value, parse->split_on_rai);
341 break;
342 default:
343 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
344 }
345}
346
347static gboolean
bo.xiaoff2de692024-08-07 14:38:55 +0800348aml_prepare_src_pad (AmlTSBase * base, AmlTSParse2 * parse)
hanghang.luofa7b16f2024-05-31 14:44:31 +0800349{
350 GstEvent *event;
351 gchar *stream_id;
352 GstCaps *caps;
353
354 if (!parse->first)
355 return TRUE;
356
357 /* If there's no packet_size yet, we can't set caps yet */
358 if (G_UNLIKELY (base->packetizer->packet_size == 0))
359 return FALSE;
360
361 stream_id =
362 gst_pad_create_stream_id (parse->srcpad, GST_ELEMENT_CAST (base),
363 "multi-program");
364
365 event =
366 gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
367 0);
368 if (event) {
369 if (gst_event_parse_group_id (event, &parse->group_id))
370 parse->have_group_id = TRUE;
371 else
372 parse->have_group_id = FALSE;
373 gst_event_unref (event);
374 } else if (!parse->have_group_id) {
375 parse->have_group_id = TRUE;
376 parse->group_id = gst_util_group_id_next ();
377 }
378 event = gst_event_new_stream_start (stream_id);
379 if (parse->have_group_id)
380 gst_event_set_group_id (event, parse->group_id);
381
382 gst_pad_push_event (parse->srcpad, event);
383 g_free (stream_id);
384
385 caps = gst_caps_new_simple ("video/mpegts",
386 "systemstream", G_TYPE_BOOLEAN, TRUE,
387 "packetsize", G_TYPE_INT, base->packetizer->packet_size, NULL);
388
389 gst_pad_set_caps (parse->srcpad, caps);
390 gst_caps_unref (caps);
391
392 /* If setting output timestamps, ensure that the output segment is TIME */
393 if (parse->set_timestamps == FALSE || base->segment.format == GST_FORMAT_TIME)
394 /* Just use the upstream segment */
395 base->out_segment = base->segment;
396 else {
397 GstSegment *seg = &base->out_segment;
398 gst_segment_init (seg, GST_FORMAT_TIME);
399 GST_DEBUG_OBJECT (parse,
400 "Generating time output segment %" GST_SEGMENT_FORMAT, seg);
401 }
402 gst_pad_push_event (parse->srcpad,
403 gst_event_new_segment (&base->out_segment));
404
405 parse->first = FALSE;
406
407 return TRUE;
408}
409
410static gboolean
411push_event (AmlTSBase * base, GstEvent * event)
412{
413 AmlTSParse2 *parse = (AmlTSParse2 *) base;
414 GList *tmp;
415
416 if (G_UNLIKELY (parse->first)) {
417 /* We will send the segment when really starting */
418 if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
419 gst_event_unref (event);
420 return TRUE;
421 }
bo.xiaoff2de692024-08-07 14:38:55 +0800422 aml_prepare_src_pad (base, parse);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800423 }
424 if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS)) {
425 gsize packet_size = base->packetizer->packet_size;
426
427 parse->is_eos = TRUE;
428
429 if (packet_size > 0 && parse->alignment > 0 &&
430 parse->ts_adapter.packets_in_adapter > 0
431 && parse->ts_adapter.packets_in_adapter < parse->alignment) {
432 GstBuffer *buf;
433 GstMapInfo map;
434 guint8 *data;
435 gint missing_packets =
436 parse->alignment - parse->ts_adapter.packets_in_adapter;
437 gint i = missing_packets;
438
439 GST_DEBUG_OBJECT (parse, "Adding %d dummy packets", missing_packets);
440
441 buf = gst_buffer_new_and_alloc (missing_packets * packet_size);
442 gst_buffer_map (buf, &map, GST_MAP_READWRITE);
443 data = map.data;
444
445 for (; i > 0; i--) {
446 gint offset;
447
448 if (packet_size > MPEGTS_NORMAL_PACKETSIZE) {
449 parse->header++;
450 GST_WRITE_UINT32_BE (data, parse->header);
451 offset = 4;
452 } else {
453 offset = 0;
454 }
455 GST_WRITE_UINT8 (data + offset, SYNC_BYTE);
456 /* null packet PID */
457 GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
458 /* no adaptation field exists | continuity counter undefined */
459 GST_WRITE_UINT8 (data + offset + 3, 0x10);
460 /* payload */
461 memset (data + offset + 4, 0, MPEGTS_NORMAL_PACKETSIZE - 4);
462 data += packet_size;
463 }
464 gst_buffer_unmap (buf, &map);
465 gst_adapter_push (parse->ts_adapter.adapter, buf);
466 parse->ts_adapter.packets_in_adapter += missing_packets;
467 }
bo.xiaoff2de692024-08-07 14:38:55 +0800468 aml_drain_pending_buffers (parse, TRUE);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800469 }
470
471 if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
472 parse->ts_offset = 0;
473
474 for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
475 GstPad *pad = (GstPad *) tmp->data;
476 if (pad) {
477 gst_event_ref (event);
478 gst_pad_push_event (pad, event);
479 }
480 }
481
482 gst_pad_push_event (parse->srcpad, event);
483
484 return TRUE;
485}
486
487static AmlTSParsePad *
488amlts_parse_create_tspad (AmlTSParse2 * parse, const gchar * pad_name)
489{
490 GstPad *pad;
491 AmlTSParsePad *tspad;
492
493 pad = gst_pad_new_from_static_template (&program_template, pad_name);
494 gst_pad_set_query_function (pad,
495 GST_DEBUG_FUNCPTR (amlts_parse_src_pad_query));
496
497 /* create our wrapper */
498 tspad = g_new0 (AmlTSParsePad, 1);
499 tspad->pad = pad;
500 tspad->program_number = -1;
501 tspad->program = NULL;
502 tspad->pushed = FALSE;
503 tspad->flow_return = GST_FLOW_NOT_LINKED;
504 tspad->ts_adapter.adapter = gst_adapter_new ();
505 tspad->ts_adapter.packets_in_adapter = 0;
506 tspad->ts_adapter.first_is_keyframe = TRUE;
507 gst_pad_set_element_private (pad, tspad);
508 gst_flow_combiner_add_pad (parse->flowcombiner, pad);
509
510 return tspad;
511}
512
513static void
514amlts_parse_destroy_tspad (AmlTSParse2 * parse, AmlTSParsePad * tspad)
515{
516 gst_adapter_clear (tspad->ts_adapter.adapter);
517 g_object_unref (tspad->ts_adapter.adapter);
518
519 /* free the wrapper */
520 g_free (tspad);
521}
522
523static void
524amlts_parse_pad_removed (GstElement * element, GstPad * pad)
525{
526 AmlTSParsePad *tspad;
527 AmlTSParse2 *parse = GST_AMLTS_PARSE (element);
528
529 if (gst_pad_get_direction (pad) == GST_PAD_SINK)
530 return;
531
532 tspad = (AmlTSParsePad *) gst_pad_get_element_private (pad);
533 if (tspad) {
534 amlts_parse_destroy_tspad (parse, tspad);
535
536 parse->srcpads = g_list_remove_all (parse->srcpads, pad);
537 }
538
539 if (GST_ELEMENT_CLASS (parent_class)->pad_removed)
540 GST_ELEMENT_CLASS (parent_class)->pad_removed (element, pad);
541}
542
543static GstPad *
544amlts_parse_request_new_pad (GstElement * element, GstPadTemplate * template,
545 const gchar * padname, const GstCaps * caps)
546{
547 AmlTSParse2 *parse;
548 AmlTSParsePad *tspad;
549 AmlTSParseProgram *parseprogram;
550 GstPad *pad;
551 gint program_num = -1;
552 GstEvent *event;
553 gchar *stream_id;
554
555 g_return_val_if_fail (template != NULL, NULL);
556 g_return_val_if_fail (GST_IS_AMLTS_PARSE (element), NULL);
557 g_return_val_if_fail (padname != NULL, NULL);
558
559 sscanf (padname + 8, "%d", &program_num);
560
561 GST_DEBUG_OBJECT (element, "padname:%s, program:%d", padname, program_num);
562
563 parse = GST_AMLTS_PARSE (element);
564
565 tspad = amlts_parse_create_tspad (parse, padname);
566 tspad->program_number = program_num;
567
568 /* Find if the program is already active */
569 parseprogram =
570 (AmlTSParseProgram *) amlts_base_get_program (GST_AMLTS_BASE (parse),
571 program_num);
572 if (parseprogram) {
573 tspad->program = parseprogram;
574 parseprogram->tspad = tspad;
575 }
576
577 pad = tspad->pad;
578 parse->srcpads = g_list_append (parse->srcpads, pad);
579
580 gst_pad_set_active (pad, TRUE);
581
582 stream_id = gst_pad_create_stream_id (pad, element, padname + 8);
583
584 event =
585 gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
586 0);
587 if (event) {
588 if (gst_event_parse_group_id (event, &parse->group_id))
589 parse->have_group_id = TRUE;
590 else
591 parse->have_group_id = FALSE;
592 gst_event_unref (event);
593 } else if (!parse->have_group_id) {
594 parse->have_group_id = TRUE;
595 parse->group_id = gst_util_group_id_next ();
596 }
597 event = gst_event_new_stream_start (stream_id);
598 if (parse->have_group_id)
599 gst_event_set_group_id (event, parse->group_id);
600
601 gst_pad_push_event (pad, event);
602 g_free (stream_id);
603
604 gst_element_add_pad (element, pad);
605
606 return pad;
607}
608
609static GstBuffer *
610amlts_packet_to_buffer (AmlTSPacketizerPacket * packet)
611{
612 GstBuffer *buf =
613 gst_buffer_new_and_alloc (packet->data_end - packet->data_start);
614 gst_buffer_fill (buf, 0, packet->data_start,
615 packet->data_end - packet->data_start);
616 return buf;
617}
618
619static void
620amlts_parse_release_pad (GstElement * element, GstPad * pad)
621{
622 AmlTSParse2 *parse = (AmlTSParse2 *) element;
623
624 gst_pad_set_active (pad, FALSE);
625 /* we do the cleanup in GstElement::pad-removed */
626 gst_flow_combiner_remove_pad (parse->flowcombiner, pad);
627 gst_element_remove_pad (element, pad);
628}
629
630static GstFlowReturn
bo.xiaoff2de692024-08-07 14:38:55 +0800631aml_empty_adapter_into_pad (AmlTSParse2 * parse, AmlTSParse2Adapter * ts_adapter,
hanghang.luofa7b16f2024-05-31 14:44:31 +0800632 GstPad * pad)
633{
634 GstAdapter *adapter = ts_adapter->adapter;
635 GstBuffer *buf = NULL;
636 guint64 pts_dist, dts_dist;
637 GstClockTime pts, dts;
638 gsize avail = gst_adapter_available (adapter);
639 GstFlowReturn ret = GST_FLOW_OK;
640 gsize offset;
641
642 if (avail > 0)
643 buf = gst_adapter_take_buffer (adapter, avail);
644 /* Find the previous PTS/DTS. We also handle un-aligned input since want to
645 * use the most recent PTS/DTS if present */
646 offset = MIN (GST_AMLTS_BASE (parse)->packetizer->packet_size, 188);
647 pts = gst_adapter_prev_pts_at_offset (adapter, offset, &pts_dist);
648 dts = gst_adapter_prev_dts_at_offset (adapter, offset, &dts_dist);
649
650 GST_LOG_OBJECT (pad,
651 "prev pts:%" GST_TIME_FORMAT " (dist:%" G_GUINT64_FORMAT ") dts:%"
652 GST_TIME_FORMAT " (dist:%" G_GUINT64_FORMAT ")",
653 GST_TIME_ARGS (pts), pts_dist, GST_TIME_ARGS (dts), dts_dist);
654
655 ts_adapter->packets_in_adapter = 0;
656
657 if (buf) {
658 GST_BUFFER_PTS (buf) = pts;
659 GST_BUFFER_DTS (buf) = dts;
660 if (!ts_adapter->first_is_keyframe)
661 gst_buffer_set_flags (buf, GST_BUFFER_FLAG_DELTA_UNIT);
662 ret = gst_pad_push (pad, buf);
663 }
664
665 return ret;
666}
667
668static GstFlowReturn
669enqueue_and_maybe_push_buffer (AmlTSParse2 * parse, GstPad * pad,
670 AmlTSParse2Adapter * ts_adapter, GstBuffer * buffer)
671{
672 GstFlowReturn ret = GST_FLOW_OK;
673
674 if (buffer != NULL) {
675 if (parse->alignment == 1) {
676 ret = gst_pad_push (pad, buffer);
677 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
678 } else {
679 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)
680 && parse->split_on_rai) {
bo.xiaoff2de692024-08-07 14:38:55 +0800681 ret = aml_empty_adapter_into_pad (parse, ts_adapter, pad);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800682 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
683 }
684 gst_adapter_push (ts_adapter->adapter, buffer);
685 ts_adapter->packets_in_adapter++;
686 if (ts_adapter->packets_in_adapter == 1 && parse->split_on_rai) {
687 ts_adapter->first_is_keyframe =
688 !GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
689 }
690
691 if (ts_adapter->packets_in_adapter == parse->alignment
692 && ts_adapter->packets_in_adapter > 0) {
bo.xiaoff2de692024-08-07 14:38:55 +0800693 ret = aml_empty_adapter_into_pad (parse, ts_adapter, pad);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800694 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
695 }
696 }
697 }
698
699 return ret;
700}
701
702static GstFlowReturn
703amlts_parse_tspad_push_section (AmlTSParse2 * parse, AmlTSParsePad * tspad,
704 GstMpegtsSection * section, AmlTSPacketizerPacket * packet,
705 GstBuffer * buf)
706{
707 GstFlowReturn ret = GST_FLOW_OK;
708 gboolean to_push = TRUE;
709
710 if (tspad->program_number != -1) {
711 if (tspad->program) {
712 /* we push all sections to all pads except PMTs which we
713 * only push to pads meant to receive that program number */
714 if (section->table_id == 0x02) {
715 /* PMT */
716 if (section->subtable_extension != tspad->program_number)
717 to_push = FALSE;
718 }
719 } else if (section->table_id != 0x00) {
720 /* there's a program filter on the pad but the PMT for the program has not
721 * been parsed yet, ignore the pad until we get a PMT.
722 * But we always allow PAT to go through */
723 to_push = FALSE;
724 }
725 }
726
727 GST_DEBUG_OBJECT (parse,
728 "pushing section: %d program number: %d table_id: %d", to_push,
729 tspad->program_number, section->table_id);
730
731 if (to_push) {
732 ret =
733 enqueue_and_maybe_push_buffer (parse, tspad->pad,
734 &tspad->ts_adapter, gst_buffer_ref (buf));
735 }
736
737 GST_LOG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
738 return ret;
739}
740
741static GstFlowReturn
742amlts_parse_tspad_push (AmlTSParse2 * parse, AmlTSParsePad * tspad,
743 AmlTSPacketizerPacket * packet, GstBuffer * buf)
744{
745 GstFlowReturn ret = GST_FLOW_OK;
746 AmlTSBaseProgram *bp = NULL;
747
748 if (tspad->program_number != -1) {
749 if (tspad->program)
750 bp = (AmlTSBaseProgram *) tspad->program;
751 else
752 bp = amlts_base_get_program ((AmlTSBase *) parse,
753 tspad->program_number);
754 }
755
756 if (bp) {
757 if (packet->pid == bp->pmt_pid || bp->streams == NULL
758 || bp->streams[packet->pid]) {
759 /* push if there's no filter or if the pid is in the filter */
760 ret = gst_pad_push (tspad->pad, gst_buffer_ref (buf));
761 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
762 }
763 }
764 GST_DEBUG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
765
766 return ret;
767}
768
769static void
bo.xiaoff2de692024-08-07 14:38:55 +0800770aml_pad_clear_for_push (GstPad * pad, AmlTSParse2 * parse)
hanghang.luofa7b16f2024-05-31 14:44:31 +0800771{
772 AmlTSParsePad *tspad = (AmlTSParsePad *) gst_pad_get_element_private (pad);
773
774 tspad->flow_return = GST_FLOW_NOT_LINKED;
775 tspad->pushed = FALSE;
776}
777
778static GstFlowReturn
779amlts_parse_push (AmlTSBase * base, AmlTSPacketizerPacket * packet,
780 GstMpegtsSection * section)
781{
782 AmlTSParse2 *parse = (AmlTSParse2 *) base;
783 guint32 pads_cookie;
784 gboolean done = FALSE;
785 GstPad *pad = NULL;
786 AmlTSParsePad *tspad;
787 GstFlowReturn ret;
788 GList *srcpads;
789 GstBuffer *buf;
790
791 GST_OBJECT_LOCK (parse);
792 srcpads = parse->srcpads;
793
794 /* clear tspad->pushed on pads */
bo.xiaoff2de692024-08-07 14:38:55 +0800795 g_list_foreach (srcpads, (GFunc) aml_pad_clear_for_push, parse);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800796 if (srcpads)
797 ret = GST_FLOW_NOT_LINKED;
798 else
799 ret = GST_FLOW_OK;
800
801 /* Get cookie and source pads list */
802 pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
803 if (G_LIKELY (srcpads)) {
804 pad = GST_PAD_CAST (srcpads->data);
805 g_object_ref (pad);
806 }
807 GST_OBJECT_UNLOCK (parse);
808
809 buf = amlts_packet_to_buffer (packet);
810 if (parse->split_on_rai
811 && !(packet->afc_flags & MPEGTS_AFC_RANDOM_ACCESS_FLAG)) {
812 gst_buffer_set_flags (buf, GST_BUFFER_FLAG_DELTA_UNIT);
813 }
814
815 /* Copy over input PTS/DTS (if present) */
816 GST_BUFFER_DTS (buf) = base->packetizer->last_dts;
817 GST_BUFFER_PTS (buf) = base->packetizer->last_pts;
818 ret = amlts_parse_have_buffer (base, gst_buffer_ref (buf));
819
820 while (pad && !done) {
821 tspad = gst_pad_get_element_private (pad);
822
823 if (G_LIKELY (!tspad->pushed)) {
824 if (section) {
825 tspad->flow_return =
826 amlts_parse_tspad_push_section (parse, tspad, section, packet,
827 buf);
828 } else {
829 tspad->flow_return =
830 amlts_parse_tspad_push (parse, tspad, packet, buf);
831 }
832 tspad->pushed = TRUE;
833
834 if (G_UNLIKELY (tspad->flow_return != GST_FLOW_OK
835 && tspad->flow_return != GST_FLOW_NOT_LINKED)) {
836 /* return the error upstream */
837 ret = tspad->flow_return;
838 done = TRUE;
839 }
840
841 }
842
843 if (ret == GST_FLOW_NOT_LINKED)
844 ret = tspad->flow_return;
845
846 g_object_unref (pad);
847
848 if (G_UNLIKELY (!done)) {
849 GST_OBJECT_LOCK (parse);
850 if (G_UNLIKELY (pads_cookie != GST_ELEMENT_CAST (parse)->pads_cookie)) {
851 /* resync */
852 GST_DEBUG ("resync");
853 pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
854 srcpads = parse->srcpads;
855 } else {
856 GST_DEBUG ("getting next pad");
857 /* Get next pad */
858 srcpads = g_list_next (srcpads);
859 }
860
861 if (srcpads) {
862 pad = GST_PAD_CAST (srcpads->data);
863 g_object_ref (pad);
864 } else
865 done = TRUE;
866 GST_OBJECT_UNLOCK (parse);
867 }
868 }
869
870 gst_buffer_unref (buf);
871 return ret;
872}
873
874static void
875amlts_parse_inspect_packet (AmlTSBase * base, AmlTSPacketizerPacket * packet)
876{
877 AmlTSParse2 *parse = GST_AMLTS_PARSE (base);
878 GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p PCR %"
879 G_GUINT64_FORMAT, packet->pid, packet->payload_unit_start_indicator,
880 packet->scram_afc_cc & 0x30,
881 FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload,
882 packet->pcr);
883
884 /* Store the PCR if desired */
885 if (parse->current_pcr == GST_CLOCK_TIME_NONE &&
886 packet->afc_flags & MPEGTS_AFC_PCR_FLAG) {
887 /* Take this as the pcr_pid if set to auto-select */
888 if (parse->pcr_pid == -1)
889 parse->pcr_pid = packet->pid;
890 /* Check the PCR-PID matches the program we want for multiple programs */
891 if (parse->pcr_pid == packet->pid) {
892 parse->current_pcr = amlts_packetizer_pts_to_ts (base->packetizer,
893 PCRTIME_TO_GSTTIME (packet->pcr), parse->pcr_pid);
894 GST_DEBUG ("Got new PCR %" GST_TIME_FORMAT " raw %" G_GUINT64_FORMAT,
895 GST_TIME_ARGS (parse->current_pcr), packet->pcr);
896 if (parse->base_pcr == GST_CLOCK_TIME_NONE) {
897 parse->base_pcr = parse->current_pcr;
898 }
899 }
900 }
901}
902
903static GstClockTime
bo.xiaoff2de692024-08-07 14:38:55 +0800904aml_get_pending_timestamp_diff (AmlTSParse2 * parse)
hanghang.luofa7b16f2024-05-31 14:44:31 +0800905{
906 GList *l;
907 GstClockTime first_ts, last_ts;
908
909 if (parse->pending_buffers == NULL)
910 return GST_CLOCK_TIME_NONE;
911
912 l = g_list_last (parse->pending_buffers);
913 first_ts = GST_BUFFER_PTS (l->data);
914 if (first_ts == GST_CLOCK_TIME_NONE)
915 return GST_CLOCK_TIME_NONE;
916
917 l = g_list_first (parse->pending_buffers);
918 last_ts = GST_BUFFER_PTS (l->data);
919 if (last_ts == GST_CLOCK_TIME_NONE)
920 return GST_CLOCK_TIME_NONE;
921
922 return last_ts - first_ts;
923}
924
925static GstFlowReturn
bo.xiaoff2de692024-08-07 14:38:55 +0800926aml_drain_pending_buffers (AmlTSParse2 * parse, gboolean drain_all)
hanghang.luofa7b16f2024-05-31 14:44:31 +0800927{
928 GstFlowReturn ret = GST_FLOW_OK;
929 GstClockTime start_ts;
930 GstClockTime pcr = GST_CLOCK_TIME_NONE;
931 GstClockTime pcr_diff = 0;
932 gsize pcr_bytes, bytes_since_pcr, pos;
933 GstBuffer *buffer;
934 GList *l, *end = NULL;
935
936 if (parse->pending_buffers == NULL)
937 return GST_FLOW_OK; /* Nothing to push */
938
939 /*
940 * There are 4 cases:
941 * 1 We get a buffer with no PCR -> it's the head of the list
942 * -> Do nothing, unless it's EOS
943 * 2 We get a buffer with a PCR, it's the first PCR we've seen, and belongs
944 * to the buffer at the head of the list
945 * -> Push any buffers in the list except the head,
946 * using a smoothing of their timestamps to land at the PCR
947 * -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
948 * 3 It's EOS (drain_all == TRUE, current_pcr == NONE)
949 * -> Push any buffers in the list using a smoothing of their timestamps
950 * starting at the previous PCR or first TS
951 * 4 We get a buffer with a PCR, and have a previous PCR
952 * -> If distance > smoothing_latency,
953 * output buffers except the last in the pending queue using
954 * piecewise-linear timestamps
955 * -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
956 */
957
958 /* Case 1 */
959 if (!GST_CLOCK_TIME_IS_VALID (parse->current_pcr) && !drain_all)
960 return GST_FLOW_OK;
961
962 if (GST_CLOCK_TIME_IS_VALID (parse->current_pcr)) {
963 pcr = parse->current_pcr;
964 parse->current_pcr = GST_CLOCK_TIME_NONE;
965 }
966
967 /* The bytes of the last buffer are after the PCR */
968 buffer = GST_BUFFER (g_list_nth_data (parse->pending_buffers, 0));
969 bytes_since_pcr = gst_buffer_get_size (buffer);
970
971 pcr_bytes = parse->bytes_since_pcr - bytes_since_pcr;
972
973 if (!drain_all)
974 end = g_list_first (parse->pending_buffers);
975
976 /* Case 2 */
977 if (!GST_CLOCK_TIME_IS_VALID (parse->previous_pcr)) {
bo.xiaoff2de692024-08-07 14:38:55 +0800978 pcr_diff = aml_get_pending_timestamp_diff (parse);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800979
980 /* Calculate the start_ts that ends at the end timestamp */
981 start_ts = GST_CLOCK_TIME_NONE;
982 if (end) {
983 start_ts = GST_BUFFER_PTS (GST_BUFFER (end->data));
984 if (start_ts > pcr_diff)
985 start_ts -= pcr_diff;
986 }
987 } else if (drain_all) { /* Case 3 */
988 start_ts = parse->previous_pcr;
bo.xiaoff2de692024-08-07 14:38:55 +0800989 pcr_diff = aml_get_pending_timestamp_diff (parse);
hanghang.luofa7b16f2024-05-31 14:44:31 +0800990 } else { /* Case 4 */
991 start_ts = parse->previous_pcr;
992 if (GST_CLOCK_TIME_IS_VALID (pcr) && pcr > start_ts)
993 pcr_diff = GST_CLOCK_DIFF (start_ts, pcr);
994
995 /* Make sure PCR observations are sufficiently far apart */
996 if (drain_all == FALSE && pcr_diff < parse->smoothing_latency)
997 return GST_FLOW_OK;
998 }
999
1000 GST_INFO_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT
1001 " duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes",
1002 GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes);
1003
1004 /* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */
1005 pos = 0;
1006 l = g_list_last (parse->pending_buffers);
1007 while (l != end) {
1008 GList *p;
1009 GstClockTime out_ts = start_ts;
1010
1011 buffer = gst_buffer_make_writable (GST_BUFFER (l->data));
1012
1013 if (out_ts != GST_CLOCK_TIME_NONE && pcr_diff != GST_CLOCK_TIME_NONE &&
1014 pcr_bytes && pos)
1015 out_ts += gst_util_uint64_scale (pcr_diff, pos, pcr_bytes);
1016
1017 pos += gst_buffer_get_size (buffer);
1018
1019 GST_DEBUG_OBJECT (parse,
1020 "InputTS %" GST_TIME_FORMAT " out %" GST_TIME_FORMAT,
1021 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (out_ts));
1022
1023 GST_BUFFER_PTS (buffer) = out_ts + parse->ts_offset;
1024 GST_BUFFER_DTS (buffer) = out_ts + parse->ts_offset;
1025 if (ret == GST_FLOW_OK) {
1026 ret =
1027 enqueue_and_maybe_push_buffer (parse, parse->srcpad,
1028 &parse->ts_adapter, buffer);
1029 } else {
1030 gst_buffer_unref (buffer);
1031 }
1032
1033 /* Free this list node and move to the next */
1034 p = g_list_previous (l);
1035 parse->pending_buffers = g_list_delete_link (parse->pending_buffers, l);
1036 l = p;
1037 }
1038
1039 if (parse->is_eos) {
bo.xiaoff2de692024-08-07 14:38:55 +08001040 aml_empty_adapter_into_pad (parse, &parse->ts_adapter, parse->srcpad);
hanghang.luofa7b16f2024-05-31 14:44:31 +08001041 }
1042
1043 parse->pending_buffers = end;
1044 parse->bytes_since_pcr = bytes_since_pcr;
1045 parse->previous_pcr = pcr;
1046 return ret;
1047}
1048
1049static GstFlowReturn
1050amlts_parse_have_buffer (AmlTSBase * base, GstBuffer * buffer)
1051{
1052 AmlTSParse2 *parse = GST_AMLTS_PARSE (base);
1053 GstFlowReturn ret = GST_FLOW_OK;
1054
1055 GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer);
1056
1057 /* Assume all packets have equal size */
1058 if (parse->alignment > 0 &&
1059 base->packetizer->packet_size != MPEGTS_NORMAL_PACKETSIZE) {
1060 GstMapInfo map;
1061 guint8 *data;
1062
1063 gst_buffer_map (buffer, &map, GST_MAP_READ);
1064 data = map.data;
1065
1066 parse->header = GST_READ_UINT32_BE (data);
1067 gst_buffer_unmap (buffer, &map);
1068 }
1069
1070 if (parse->current_pcr != GST_CLOCK_TIME_NONE) {
1071 GST_DEBUG_OBJECT (parse,
1072 "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT,
1073 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
1074 GST_TIME_ARGS (parse->current_pcr));
1075 }
1076
1077 if (parse->set_timestamps || parse->first) {
1078 parse->pending_buffers = g_list_prepend (parse->pending_buffers, buffer);
1079 parse->bytes_since_pcr += gst_buffer_get_size (buffer);
1080 buffer = NULL;
1081 }
1082
bo.xiaoff2de692024-08-07 14:38:55 +08001083 if (!aml_prepare_src_pad (base, parse))
hanghang.luofa7b16f2024-05-31 14:44:31 +08001084 return GST_FLOW_OK;
1085
1086 if (parse->pending_buffers != NULL) {
1087 /* Don't keep pending_buffers if not setting output timestamps */
1088 gboolean drain_all = (parse->set_timestamps == FALSE);
bo.xiaoff2de692024-08-07 14:38:55 +08001089 ret = aml_drain_pending_buffers (parse, drain_all);
hanghang.luofa7b16f2024-05-31 14:44:31 +08001090 if (ret != GST_FLOW_OK) {
1091 if (buffer)
1092 gst_buffer_unref (buffer);
1093 return ret;
1094 }
1095 }
1096
1097 ret =
1098 enqueue_and_maybe_push_buffer (parse, parse->srcpad,
1099 &parse->ts_adapter, buffer);
1100 return ret;
1101}
1102
1103static void
bo.xiaoff2de692024-08-07 14:38:55 +08001104aml_empty_pad (GstPad * pad, AmlTSParse2 * parse)
hanghang.luofa7b16f2024-05-31 14:44:31 +08001105{
1106 AmlTSParsePad *tspad = (AmlTSParsePad *) gst_pad_get_element_private (pad);
1107 GstFlowReturn ret;
bo.xiaoff2de692024-08-07 14:38:55 +08001108 ret = aml_empty_adapter_into_pad (parse, &tspad->ts_adapter, tspad->pad);
hanghang.luofa7b16f2024-05-31 14:44:31 +08001109 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
1110}
1111
1112static GstFlowReturn
1113amlts_parse_input_done (AmlTSBase * base)
1114{
1115 AmlTSParse2 *parse = GST_AMLTS_PARSE (base);
1116 GstFlowReturn ret = GST_FLOW_OK;
1117
bo.xiaoff2de692024-08-07 14:38:55 +08001118 if (!aml_prepare_src_pad (base, parse))
hanghang.luofa7b16f2024-05-31 14:44:31 +08001119 return GST_FLOW_OK;
1120
1121 if (parse->alignment == 0) {
bo.xiaoff2de692024-08-07 14:38:55 +08001122 ret = aml_empty_adapter_into_pad (parse, &parse->ts_adapter, parse->srcpad);
hanghang.luofa7b16f2024-05-31 14:44:31 +08001123 ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
bo.xiaoff2de692024-08-07 14:38:55 +08001124 g_list_foreach (parse->srcpads, (GFunc) aml_empty_pad, parse);
hanghang.luofa7b16f2024-05-31 14:44:31 +08001125 }
1126 return ret;
1127}
1128
1129static AmlTSParsePad *
bo.xiaoff2de692024-08-07 14:38:55 +08001130aml_find_pad_for_program (AmlTSParse2 * parse, guint program_number)
hanghang.luofa7b16f2024-05-31 14:44:31 +08001131{
1132 GList *tmp;
1133
1134 for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
1135 AmlTSParsePad *tspad = gst_pad_get_element_private ((GstPad *) tmp->data);
1136
1137 if (tspad->program_number == program_number)
1138 return tspad;
1139 }
1140
1141 return NULL;
1142}
1143
1144static void
1145amlts_parse_program_started (AmlTSBase * base, AmlTSBaseProgram * program)
1146{
1147 AmlTSParse2 *parse = GST_AMLTS_PARSE (base);
1148 AmlTSParseProgram *parseprogram = (AmlTSParseProgram *) program;
1149 AmlTSParsePad *tspad;
1150
1151 /* If we have a request pad for that program, activate it */
bo.xiaoff2de692024-08-07 14:38:55 +08001152 tspad = aml_find_pad_for_program (parse, program->program_number);
hanghang.luofa7b16f2024-05-31 14:44:31 +08001153
1154 if (tspad) {
1155 tspad->program = parseprogram;
1156 parseprogram->tspad = tspad;
1157 }
1158}
1159
1160static void
1161amlts_parse_program_stopped (AmlTSBase * base, AmlTSBaseProgram * program)
1162{
1163 AmlTSParse2 *parse = GST_AMLTS_PARSE (base);
1164 AmlTSParseProgram *parseprogram = (AmlTSParseProgram *) program;
1165 AmlTSParsePad *tspad;
1166
1167 /* If we have a request pad for that program, activate it */
bo.xiaoff2de692024-08-07 14:38:55 +08001168 tspad = aml_find_pad_for_program (parse, program->program_number);
hanghang.luofa7b16f2024-05-31 14:44:31 +08001169
1170 if (tspad) {
1171 tspad->program = NULL;
1172 parseprogram->tspad = NULL;
1173 }
1174
1175 parse->pcr_pid = -1;
1176 parse->ts_offset += parse->current_pcr - parse->base_pcr;
1177 parse->base_pcr = GST_CLOCK_TIME_NONE;
1178}
1179
1180static gboolean
1181amlts_parse_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
1182{
1183 AmlTSParse2 *parse = GST_AMLTS_PARSE (parent);
1184 gboolean res;
1185
1186 switch (GST_QUERY_TYPE (query)) {
1187 case GST_QUERY_LATENCY:
1188 {
1189 if ((res = gst_pad_peer_query (((AmlTSBase *) parse)->sinkpad, query))) {
1190 gboolean is_live;
1191 GstClockTime min_latency, max_latency;
1192
1193 gst_query_parse_latency (query, &is_live, &min_latency, &max_latency);
1194 if (is_live) {
1195 GstClockTime extra_latency = TS_LATENCY * GST_MSECOND;
1196 if (parse->set_timestamps) {
1197 extra_latency = MAX (extra_latency, parse->smoothing_latency);
1198 }
1199 min_latency += extra_latency;
1200 if (max_latency != GST_CLOCK_TIME_NONE)
1201 max_latency += extra_latency;
1202 }
1203
1204 gst_query_set_latency (query, is_live, min_latency, max_latency);
1205 }
1206 break;
1207 }
1208 default:
1209 res = gst_pad_query_default (pad, parent, query);
1210 }
1211 return res;
1212}
1213
1214gboolean
1215gst_amltsparse_plugin_init (GstPlugin * plugin)
1216{
1217 GST_DEBUG_CATEGORY_INIT (mpegts_parse_debug, "amltsparse", 0,
1218 "MPEG transport stream parser");
1219
1220 return gst_element_register (plugin, "amltsparse",
1221 GST_RANK_NONE, GST_TYPE_AMLTS_PARSE);
1222}
1223