21 #include <sys/types.h>
25 #include <gst/base/gstbasesrc.h>
26 #include <MQTTAsync.h>
32 GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
34 #define gst_mqtt_src_parent_class parent_class
38 #define GST_CAT_DEFAULT gst_mqtt_src_debug
74 "$HOSTNAME_$PID_^[0-9][0-9]?$|^255$";
80 const GValue * value, GParamSpec * pspec);
83 GValue * value, GParamSpec * pspec);
86 static GstStateChangeReturn
96 GstClockTime * start, GstClockTime * end);
129 int topic_len, MQTTAsync_message * message);
131 MQTTAsync_successData * response);
133 MQTTAsync_failureData * response);
135 MQTTAsync_successData * response);
137 MQTTAsync_failureData * response);
139 MQTTAsync_successData * response);
141 MQTTAsync_failureData * response);
146 GstMemory ** hdr_mem, GstMapInfo * hdr_map_info);
155 static inline gboolean
158 if (!GST_BUFFER_PTS_IS_VALID (buf) && !GST_BUFFER_DTS_IS_VALID (buf) &&
159 !GST_BUFFER_DURATION_IS_VALID (buf))
171 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
172 MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
173 GstBaseSrc *basesrc = GST_BASE_SRC (
self);
177 gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
178 gst_base_src_set_async (basesrc,
FALSE);
181 self->mqtt_client_handle = NULL;
187 self->mqtt_topic = NULL;
189 self->mqtt_conn_opts = conn_opts;
194 self->mqtt_conn_opts.context =
self;
195 self->mqtt_respn_opts = respn_opts;
196 self->mqtt_respn_opts.onSuccess = NULL;
197 self->mqtt_respn_opts.onFailure = NULL;
198 self->mqtt_respn_opts.context =
self;
203 self->aqueue = g_async_queue_new ();
204 g_cond_init (&self->mqtt_src_gcond);
205 g_mutex_init (&self->mqtt_src_mutex);
206 g_mutex_lock (&self->mqtt_src_mutex);
207 self->is_connected =
FALSE;
208 self->is_subscribed =
FALSE;
209 self->latency = GST_CLOCK_TIME_NONE;
210 g_mutex_unlock (&self->mqtt_src_mutex);
211 self->base_time_epoch = GST_CLOCK_TIME_NONE;
213 self->num_dumped = 0;
215 gst_base_src_set_live (basesrc, self->is_live);
224 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
225 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
226 GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
235 g_object_class_install_property (gobject_class,
PROP_DEBUG,
236 g_param_spec_boolean (
"debug",
"Debug",
237 "Produce extra verbose output for debug purpose",
DEFAULT_DEBUG,
238 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
240 g_object_class_install_property (gobject_class,
PROP_IS_LIVE,
241 g_param_spec_boolean (
"is-live",
"Is Live",
242 "Synchronize the incoming buffers' timestamp with the current running time",
246 g_param_spec_string (
"client-id",
"Client ID",
247 "The client identifier passed to the server (broker)",
251 g_param_spec_string (
"host",
"Host",
"Host (broker) to connect to",
253 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
256 g_param_spec_string (
"port",
"Port",
258 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261 g_param_spec_int64 (
"sub-timeout",
"Timeout for receiving a message",
262 "The timeout (in microseconds) for receiving a message from subscribed topic",
264 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
267 g_param_spec_string (
"sub-topic",
"Topic to Subscribe (mandatory)",
268 "The topic's name to subscribe", NULL,
269 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
272 g_param_spec_boolean (
"cleansession",
"Cleansession",
273 "When it is TRUE, the state information is discarded at connect and disconnect.",
275 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
277 g_object_class_install_property (gobject_class,
279 g_param_spec_int (
"keep-alive-interval",
"Keep Alive Interval",
280 "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
282 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
284 g_object_class_install_property (gobject_class,
PROP_MQTT_QOS,
285 g_param_spec_int (
"mqtt-qos",
"mqtt QoS level",
286 "The QoS level of MQTT.\n"
287 "\t\t\t 0: At most once\n"
288 "\t\t\t 1: At least once\n"
289 "\t\t\t 2: Exactly once\n"
290 "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
293 gstelement_class->change_state =
304 gst_element_class_set_static_metadata (gstelement_class,
305 "MQTT source",
"Source/MQTT",
306 "Subscribe a MQTT topic and push incoming data to the GStreamer pipeline",
307 "Wook Song <wook16.song@samsung.com>");
308 gst_element_class_add_static_pad_template (gstelement_class,
317 const GValue * value, GParamSpec * pspec)
353 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
363 GValue * value, GParamSpec * pspec)
399 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
413 if (self->mqtt_client_handle) {
414 MQTTAsync_destroy (&self->mqtt_client_handle);
415 self->mqtt_client_handle = NULL;
418 g_free (self->mqtt_client_id);
419 g_free (self->mqtt_host_address);
420 g_free (self->mqtt_host_port);
421 g_free (self->mqtt_topic);
422 gst_caps_replace (&self->caps, NULL);
425 g_error_free (self->err);
427 while ((remained = g_async_queue_try_pop (self->aqueue))) {
428 gst_buffer_unref (remained);
430 g_clear_pointer (&self->aqueue, g_async_queue_unref);
432 g_mutex_clear (&self->mqtt_src_mutex);
433 G_OBJECT_CLASS (parent_class)->finalize (
object);
439 static GstStateChangeReturn
442 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
444 gboolean no_preroll =
FALSE;
445 GstClock *elem_clock;
446 GstClockTime base_time;
447 GstClockTime cur_time;
448 GstClockTimeDiff diff;
450 switch (transition) {
451 case GST_STATE_CHANGE_NULL_TO_READY:
452 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_NULL_TO_READY");
454 g_printerr (
"%s: %s\n", g_quark_to_string (self->err->domain),
456 return GST_STATE_CHANGE_FAILURE;
459 case GST_STATE_CHANGE_READY_TO_PAUSED:
460 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_READY_TO_PAUSED");
464 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
465 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_PAUSED_TO_PLAYING");
466 self->base_time_epoch = GST_CLOCK_TIME_NONE;
467 elem_clock = gst_element_get_clock (element);
470 base_time = gst_element_get_base_time (element);
471 cur_time = gst_clock_get_time (elem_clock);
472 gst_object_unref (elem_clock);
473 diff = GST_CLOCK_DIFF (base_time, cur_time);
474 self->base_time_epoch =
478 if (GST_BASE_SRC_IS_STARTED (GST_BASE_SRC (
self)) &&
479 (self->is_connected ==
FALSE)) {
480 int conn = MQTTAsync_reconnect (self->mqtt_client_handle);
482 if (conn != MQTTASYNC_SUCCESS) {
483 GST_ERROR_OBJECT (
self,
"Failed to re-subscribe to %s",
486 return GST_STATE_CHANGE_FAILURE;
494 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
496 switch (transition) {
497 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
499 GST_ERROR_OBJECT (
self,
"Cannot unsubscribe to %s", self->mqtt_topic);
501 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_PLAYING_TO_PAUSED");
503 case GST_STATE_CHANGE_PAUSED_TO_READY:
504 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_PAUSED_TO_READY");
506 case GST_STATE_CHANGE_READY_TO_NULL:
507 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_READY_TO_NULL");
512 if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
513 ret = GST_STATE_CHANGE_NO_PREROLL;
525 gchar *haddr = g_strdup_printf (
"%s:%s", self->mqtt_host_address,
526 self->mqtt_host_port);
531 g_free (self->mqtt_client_id);
544 ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
545 self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
547 if (ret != MQTTASYNC_SUCCESS)
550 MQTTAsync_setCallbacks (self->mqtt_client_handle,
self,
553 ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
554 if (ret != MQTTASYNC_SUCCESS)
558 end_time = g_get_monotonic_time () +
560 g_mutex_lock (&self->mqtt_src_mutex);
561 while (!self->is_connected) {
562 if (!g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex,
564 g_mutex_unlock (&self->mqtt_src_mutex);
565 g_critical (
"Failed to connect to MQTT broker from mqttsrc."
566 "Please check broker is running status or broker host address.");
570 g_mutex_unlock (&self->mqtt_src_mutex);
574 MQTTAsync_destroy (&self->mqtt_client_handle);
575 self->mqtt_client_handle = NULL;
588 MQTTAsync_disconnect (self->mqtt_client_handle, NULL);
589 g_mutex_lock (&self->mqtt_src_mutex);
590 self->is_connected =
FALSE;
591 g_mutex_unlock (&self->mqtt_src_mutex);
592 MQTTAsync_destroy (&self->mqtt_client_handle);
593 self->mqtt_client_handle = NULL;
603 GstPad *pad = GST_BASE_SRC_PAD (basesrc);
604 GstCaps *cur_caps = gst_pad_get_current_caps (pad);
605 GstCaps *caps = gst_caps_new_any ();
609 GstCaps *intersection =
610 gst_caps_intersect_full (cur_caps, caps, GST_CAPS_INTERSECT_FIRST);
612 gst_caps_unref (cur_caps);
613 gst_caps_unref (caps);
627 GstCaps *peercaps = NULL;
631 if (self->caps == NULL || gst_caps_is_any (self->caps))
634 thiscaps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (basesrc));
635 if (thiscaps && gst_caps_is_equal (self->caps, thiscaps)) {
636 gst_caps_unref (thiscaps);
641 gst_caps_unref (thiscaps);
643 peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), self->caps);
644 if (gst_caps_is_empty (peercaps) || peercaps == self->caps) {
645 gst_caps_unref (peercaps);
649 if (gst_caps_is_any (peercaps)) {
652 peercaps = gst_caps_fixate (peercaps);
653 if (gst_caps_is_fixed (peercaps)) {
654 result = gst_base_src_set_caps (basesrc, peercaps);
658 gst_caps_unref (peercaps);
664 GST_DEBUG_OBJECT (
self,
"no negotiation needed");
675 GstClockTime * start, GstClockTime * end)
677 GstClockTime sync_ts;
678 GstClockTime duration;
681 sync_ts = GST_BUFFER_DTS (buffer);
682 duration = GST_BUFFER_DURATION (buffer);
684 if (!GST_CLOCK_TIME_IS_VALID (sync_ts))
685 sync_ts = GST_BUFFER_PTS (buffer);
687 if (GST_CLOCK_TIME_IS_VALID (sync_ts)) {
689 if (GST_CLOCK_TIME_IS_VALID (duration)) {
690 *end = sync_ts + duration;
714 gint64 elapsed =
self->mqtt_sub_timeout;
718 g_mutex_lock (&self->mqtt_src_mutex);
719 while ((!self->is_connected) || (!self->is_subscribed)) {
720 gint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
722 g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex, end_time);
724 g_mutex_unlock (&self->mqtt_src_mutex);
728 g_mutex_unlock (&self->mqtt_src_mutex);
730 while (elapsed > 0) {
732 *buf = g_async_queue_timeout_pop (self->aqueue,
735 GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (
self));
736 GstClockTime ulatency = GST_CLOCK_TIME_NONE;
742 GST_DEBUG_OBJECT (
self,
743 "%s: Dumped the received buffer! (total: %" G_GUINT64_FORMAT
")",
744 self->mqtt_topic, ++self->num_dumped);
746 elapsed =
self->mqtt_sub_timeout;
747 gst_buffer_unref (*buf);
752 clock = gst_element_get_clock (GST_ELEMENT (
self));
754 GstClockTime cur_time = gst_clock_get_time (clock);
755 GstClockTime buf_ts = GST_BUFFER_TIMESTAMP (*buf);
756 GstClockTimeDiff latency = 0;
758 if ((base_time != GST_CLOCK_TIME_NONE) &&
759 (cur_time != GST_CLOCK_TIME_NONE) &&
760 (buf_ts != GST_CLOCK_TIME_NONE)) {
761 GstClockTimeDiff now = GST_CLOCK_DIFF (base_time, cur_time);
763 latency = GST_CLOCK_DIFF (buf_ts, (GstClockTime) now);
767 ulatency = (GstClockTime) latency;
769 if (GST_BUFFER_DURATION_IS_VALID (*buf)) {
770 GstClockTime duration = GST_BUFFER_DURATION (*buf);
772 if (duration >= ulatency) {
773 ulatency = GST_CLOCK_TIME_NONE;
777 gst_object_unref (clock);
780 g_mutex_lock (&self->mqtt_src_mutex);
781 self->latency = ulatency;
782 g_mutex_unlock (&self->mqtt_src_mutex);
790 }
else if (self->err) {
799 self->err = g_error_new (self->gquark_err_tag, GST_FLOW_EOS,
800 "%s: Timeout for receiving a message has been expired. Regarding as an error",
809 g_printerr (
"%s: %s\n", g_quark_to_string (self->err->domain),
812 return GST_FLOW_ERROR;
821 GstQueryType
type = GST_QUERY_TYPE (query);
823 gboolean res =
FALSE;
826 GST_DEBUG_OBJECT (
self,
"Got %s event", gst_query_type_get_name (
type));
829 case GST_QUERY_LATENCY:{
830 GstClockTime min_latency = 0;
831 GstClockTime max_latency = GST_CLOCK_TIME_NONE;
833 g_mutex_lock (&self->mqtt_src_mutex);
834 if (self->latency != GST_CLOCK_TIME_NONE) {
835 min_latency =
self->latency;
837 g_mutex_unlock (&self->mqtt_src_mutex);
840 GST_DEBUG_OBJECT (
self,
841 "Reporting latency min %" GST_TIME_FORMAT
", max %" GST_TIME_FORMAT,
842 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
848 gst_query_set_latency (query,
TRUE, min_latency, max_latency);
854 res = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
885 return self->is_live;
894 self->is_live = flag;
895 gst_base_src_set_live (GST_BASE_SRC (
self), self->is_live);
904 return self->mqtt_client_id;
913 g_free (self->mqtt_client_id);
914 self->mqtt_client_id = g_strdup (
id);
923 return self->mqtt_host_address;
935 g_free (self->mqtt_host_address);
936 self->mqtt_host_address = g_strdup (addr);
945 return self->mqtt_host_port;
954 g_free (self->mqtt_host_port);
955 self->mqtt_host_port = g_strdup (port);
964 return self->mqtt_sub_timeout;
973 self->mqtt_sub_timeout = t;
982 return self->mqtt_topic;
991 g_free (self->mqtt_topic);
992 self->mqtt_topic = g_strdup (topic);
1001 return self->mqtt_conn_opts.cleansession;
1010 self->mqtt_conn_opts.cleansession = val;
1019 return self->mqtt_conn_opts.keepAliveInterval;
1028 self->mqtt_conn_opts.keepAliveInterval = num;
1037 return self->mqtt_qos;
1046 self->mqtt_qos = qos;
1058 g_mutex_lock (&self->mqtt_src_mutex);
1059 self->is_connected =
FALSE;
1060 self->is_subscribed =
FALSE;
1061 g_cond_broadcast (&self->mqtt_src_gcond);
1063 self->err = g_error_new (self->gquark_err_tag, EHOSTDOWN,
1064 "Connection to the host (broker) has been lost: %s \n"
1065 "\t\tfor detail, please check the log message of the broker",
1066 g_strerror (EHOSTDOWN));
1068 g_mutex_unlock (&self->mqtt_src_mutex);
1076 MQTTAsync_message * message)
1078 const int size = message->payloadlen;
1079 guint8 *
data = message->payload;
1081 GstMapInfo hdr_map_info;
1082 GstMemory *received_mem;
1085 GstBaseSrc *basesrc;
1095 g_mutex_lock (&self->mqtt_src_mutex);
1096 if (!self->is_subscribed) {
1097 g_mutex_unlock (&self->mqtt_src_mutex);
1101 g_mutex_unlock (&self->mqtt_src_mutex);
1103 basesrc = GST_BASE_SRC (
self);
1104 clock = gst_element_get_clock (GST_ELEMENT (
self));
1105 received_mem = gst_memory_new_wrapped (0,
data, size, 0, size, message,
1107 if (!received_mem) {
1109 self->err = g_error_new (self->gquark_err_tag, ENODATA,
1110 "%s: failed to wrap the raw data of received message in GstMemory: %s",
1111 __func__, g_strerror (ENODATA));
1118 if (!mqtt_msg_hdr) {
1120 self->err = g_error_new (self->gquark_err_tag, ENODATA,
1121 "%s: failed to extract header information from received message: %s",
1122 __func__, g_strerror (ENODATA));
1124 goto ret_unref_received_mem;
1127 recv_caps = gst_caps_from_string (mqtt_msg_hdr->
gst_caps_str);
1129 if (!self->caps || !gst_caps_is_equal (self->caps, recv_caps)) {
1130 gst_caps_replace (&self->caps, recv_caps);
1134 gst_caps_unref (recv_caps);
1137 buffer = gst_buffer_new ();
1139 for (i = 0; i < mqtt_msg_hdr->
num_mems; ++i) {
1140 GstMemory *each_memory;
1144 each_memory = gst_memory_share (received_mem, offset, each_size);
1145 gst_buffer_append_memory (buffer, each_memory);
1146 offset += each_size;
1151 GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (
self));
1154 GST_DEBUG_OBJECT (
self,
1155 "A message has been arrived at %" GST_TIME_FORMAT
1156 " and queue length is %d",
1157 GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1158 g_async_queue_length (self->aqueue));
1160 gst_object_unref (clock);
1164 g_async_queue_push (self->aqueue, buffer);
1166 gst_memory_unmap (hdr_mem, &hdr_map_info);
1167 gst_memory_unref (hdr_mem);
1169 ret_unref_received_mem:
1170 gst_memory_unref (received_mem);
1181 MQTTAsync_message *msg = p;
1183 MQTTAsync_freeMessage (&msg);
1193 GstBaseSrc *basesrc = GST_BASE_SRC (
self);
1197 g_mutex_lock (&self->mqtt_src_mutex);
1198 self->is_connected =
TRUE;
1199 g_cond_broadcast (&self->mqtt_src_gcond);
1200 g_mutex_unlock (&self->mqtt_src_mutex);
1203 if (gst_base_src_is_async (basesrc) &&
1204 (ret = gst_base_src_start_wait (basesrc)) != GST_FLOW_OK) {
1205 g_mutex_lock (&self->mqtt_src_mutex);
1206 self->err = g_error_new (self->gquark_err_tag, ret,
1207 "%s: the virtual method, start (), in the GstBaseSrc class fails with return code %d",
1209 g_cond_broadcast (&self->mqtt_src_gcond);
1210 g_mutex_unlock (&self->mqtt_src_mutex);
1215 GST_ERROR_OBJECT (
self,
"Failed to subscribe to %s", self->mqtt_topic);
1227 g_mutex_lock (&self->mqtt_src_mutex);
1228 self->is_connected =
FALSE;
1231 self->err = g_error_new (self->gquark_err_tag, response->code,
1232 "%s: failed to connect to the broker: %s", __func__, response->message);
1234 g_cond_broadcast (&self->mqtt_src_gcond);
1235 g_mutex_unlock (&self->mqtt_src_mutex);
1247 g_mutex_lock (&self->mqtt_src_mutex);
1248 self->is_subscribed =
TRUE;
1249 g_cond_broadcast (&self->mqtt_src_gcond);
1250 g_mutex_unlock (&self->mqtt_src_mutex);
1261 g_mutex_lock (&self->mqtt_src_mutex);
1263 self->err = g_error_new (self->gquark_err_tag, response->code,
1264 "%s: failed to subscribe the given topic, %s: %s", __func__,
1265 self->mqtt_topic, response->message);
1267 g_cond_broadcast (&self->mqtt_src_gcond);
1268 g_mutex_unlock (&self->mqtt_src_mutex);
1280 g_mutex_lock (&self->mqtt_src_mutex);
1281 self->is_subscribed =
FALSE;
1282 g_cond_broadcast (&self->mqtt_src_gcond);
1283 g_mutex_unlock (&self->mqtt_src_mutex);
1294 g_mutex_lock (&self->mqtt_src_mutex);
1296 self->err = g_error_new (self->gquark_err_tag, response->code,
1297 "%s: failed to unsubscribe the given topic, %s: %s", __func__,
1298 self->mqtt_topic, response->message);
1300 g_cond_broadcast (&self->mqtt_src_gcond);
1301 g_mutex_unlock (&self->mqtt_src_mutex);
1310 MQTTAsync_responseOptions opts =
self->mqtt_respn_opts;
1315 opts.subscribeOptions.retainHandling = 1;
1317 mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
1318 self->mqtt_topic, self->mqtt_qos, &opts);
1319 if (mqttasync_ret != MQTTASYNC_SUCCESS)
1330 MQTTAsync_responseOptions opts =
self->mqtt_respn_opts;
1336 mqttasync_ret = MQTTAsync_unsubscribe (self->mqtt_client_handle,
1337 self->mqtt_topic, &opts);
1338 if (mqttasync_ret != MQTTASYNC_SUCCESS)
1348 GstMapInfo * hdr_map_info)
1351 g_return_val_if_fail (*hdr_mem != NULL, NULL);
1353 if (!gst_memory_map (*hdr_mem, hdr_map_info, GST_MAP_READ)) {
1354 gst_memory_unref (*hdr_mem);
1369 gint64 diff_base_epoch = hdr->
base_time_epoch -
self->base_time_epoch;
1371 buf->pts = GST_CLOCK_TIME_NONE;
1372 buf->dts = GST_CLOCK_TIME_NONE;
1373 buf->duration = GST_CLOCK_TIME_NONE;
1378 if (((GstClockTimeDiff) hdr->
pts + diff_base_epoch) < 0)
1381 if (hdr->
pts != GST_CLOCK_TIME_NONE) {
1382 buf->pts = hdr->
pts + diff_base_epoch;
1385 if (hdr->
dts != GST_CLOCK_TIME_NONE) {
1386 buf->dts = hdr->
dts + diff_base_epoch;
1392 GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (
self));
1395 clock = gst_element_get_clock (GST_ELEMENT (
self));
1398 GST_DEBUG_OBJECT (
self,
1399 "%s diff %" GST_STIME_FORMAT
" now %" GST_TIME_FORMAT
" ts (%"
1400 GST_TIME_FORMAT
" -> %" GST_TIME_FORMAT
")", self->mqtt_topic,
1401 GST_STIME_ARGS (diff_base_epoch),
1402 GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1403 GST_TIME_ARGS (hdr->
pts), GST_TIME_ARGS (buf->pts));
1405 gst_object_unref (clock);