Doxygen Book
mqttsrc.c
Go to the documentation of this file.
1 /* SPDX-License-Identifier: LGPL-2.1-only */
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17 
18 #ifdef G_OS_WIN32
19 #include <process.h>
20 #else
21 #include <sys/types.h>
22 #include <unistd.h>
23 #endif
24 
25 #include <gst/base/gstbasesrc.h>
26 #include <MQTTAsync.h>
27 #include <nnstreamer_util.h>
28 
29 #include "mqttsrc.h"
30 
31 static GstStaticPadTemplate src_pad_template = GST_STATIC_PAD_TEMPLATE ("src",
32  GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
33 
34 #define gst_mqtt_src_parent_class parent_class
35 G_DEFINE_TYPE (GstMqttSrc, gst_mqtt_src, GST_TYPE_BASE_SRC);
36 
37 GST_DEBUG_CATEGORY_STATIC (gst_mqtt_src_debug);
38 #define GST_CAT_DEFAULT gst_mqtt_src_debug
39 
40 enum
41 {
43 
54 
56 };
57 
58 enum
59 {
64  DEFAULT_MQTT_SUB_TIMEOUT = 10000000, /* 10 seconds */
65  DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000, /* 1 seconds */
66  DEFAULT_MQTT_QOS = 2, /* Once and one only */
67 };
68 
69 static guint8 src_client_id = 0;
70 static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
71 static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
72 static const gchar TAG_ERR_MQTTSRC[] = "ERROR: MQTTSrc";
73 static const gchar DEFAULT_MQTT_CLIENT_ID[] =
74  "$HOSTNAME_$PID_^[0-9][0-9]?$|^255$";
75 static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_src%u";
76 
78 static void
79 gst_mqtt_src_set_property (GObject * object, guint prop_id,
80  const GValue * value, GParamSpec * pspec);
81 static void
82 gst_mqtt_src_get_property (GObject * object, guint prop_id,
83  GValue * value, GParamSpec * pspec);
84 static void gst_mqtt_src_class_finalize (GObject * object);
85 
86 static GstStateChangeReturn
87 gst_mqtt_src_change_state (GstElement * element, GstStateChange transition);
88 
89 static gboolean gst_mqtt_src_start (GstBaseSrc * basesrc);
90 static gboolean gst_mqtt_src_stop (GstBaseSrc * basesrc);
91 static GstCaps *gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
92 static gboolean gst_mqtt_src_renegotiate (GstBaseSrc * basesrc);
93 
94 static void
95 gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
96  GstClockTime * start, GstClockTime * end);
97 static gboolean gst_mqtt_src_is_seekable (GstBaseSrc * basesrc);
98 static GstFlowReturn
99 gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
100  GstBuffer ** buf);
101 static gboolean gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query);
102 
103 static gboolean gst_mqtt_src_get_debug (GstMqttSrc * self);
104 static void gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag);
105 static gboolean gst_mqtt_src_get_is_live (GstMqttSrc * self);
106 static void gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag);
107 static gchar *gst_mqtt_src_get_client_id (GstMqttSrc * self);
108 static void gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id);
109 static gchar *gst_mqtt_src_get_host_address (GstMqttSrc * self);
110 static void gst_mqtt_src_set_host_address (GstMqttSrc * self,
111  const gchar * addr);
112 static gchar *gst_mqtt_src_get_host_port (GstMqttSrc * self);
113 static void gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port);
114 static gint64 gst_mqtt_src_get_sub_timeout (GstMqttSrc * self);
115 static void gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t);
116 static gchar *gst_mqtt_src_get_sub_topic (GstMqttSrc * self);
117 static void gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic);
118 static gboolean gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self);
120  const gboolean val);
123  const gint num);
124 static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self);
125 static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos);
126 
127 static void cb_mqtt_on_connection_lost (void *context, char *cause);
128 static int cb_mqtt_on_message_arrived (void *context, char *topic_name,
129  int topic_len, MQTTAsync_message * message);
130 static void cb_mqtt_on_connect (void *context,
131  MQTTAsync_successData * response);
132 static void cb_mqtt_on_connect_failure (void *context,
133  MQTTAsync_failureData * response);
134 static void cb_mqtt_on_subscribe (void *context,
135  MQTTAsync_successData * response);
136 static void cb_mqtt_on_subscribe_failure (void *context,
137  MQTTAsync_failureData * response);
138 static void cb_mqtt_on_unsubscribe (void *context,
139  MQTTAsync_successData * response);
140 static void cb_mqtt_on_unsubscribe_failure (void *context,
141  MQTTAsync_failureData * response);
142 
143 static void cb_memory_wrapped_destroy (void *p);
144 
145 static GstMQTTMessageHdr *_extract_mqtt_msg_hdr_from (GstMemory * mem,
146  GstMemory ** hdr_mem, GstMapInfo * hdr_map_info);
147 static void _put_timestamp_on_gst_buf (GstMqttSrc * self,
148  GstMQTTMessageHdr * hdr, GstBuffer * buf);
149 static gboolean _subscribe (GstMqttSrc * self);
150 static gboolean _unsubscribe (GstMqttSrc * self);
151 
155 static inline gboolean
157 {
158  if (!GST_BUFFER_PTS_IS_VALID (buf) && !GST_BUFFER_DTS_IS_VALID (buf) &&
159  !GST_BUFFER_DURATION_IS_VALID (buf))
160  return FALSE;
161  return TRUE;
162 }
163 
168 static void
170 {
171  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
172  MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
173  GstBaseSrc *basesrc = GST_BASE_SRC (self);
174 
175  self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSRC);
176 
177  gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
178  gst_base_src_set_async (basesrc, FALSE);
179 
181  self->mqtt_client_handle = NULL;
182  self->debug = DEFAULT_DEBUG;
183  self->is_live = DEFAULT_IS_LIVE;
184  self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
185  self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
186  self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
187  self->mqtt_topic = NULL;
188  self->mqtt_sub_timeout = (gint64) DEFAULT_MQTT_SUB_TIMEOUT;
189  self->mqtt_conn_opts = conn_opts;
190  self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
191  self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
192  self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
193  self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
194  self->mqtt_conn_opts.context = self;
195  self->mqtt_respn_opts = respn_opts;
196  self->mqtt_respn_opts.onSuccess = NULL;
197  self->mqtt_respn_opts.onFailure = NULL;
198  self->mqtt_respn_opts.context = self;
199  self->mqtt_qos = DEFAULT_MQTT_QOS;
200 
202  self->err = NULL;
203  self->aqueue = g_async_queue_new ();
204  g_cond_init (&self->mqtt_src_gcond);
205  g_mutex_init (&self->mqtt_src_mutex);
206  g_mutex_lock (&self->mqtt_src_mutex);
207  self->is_connected = FALSE;
208  self->is_subscribed = FALSE;
209  self->latency = GST_CLOCK_TIME_NONE;
210  g_mutex_unlock (&self->mqtt_src_mutex);
211  self->base_time_epoch = GST_CLOCK_TIME_NONE;
212  self->caps = NULL;
213  self->num_dumped = 0;
214 
215  gst_base_src_set_live (basesrc, self->is_live);
216 }
217 
221 static void
223 {
224  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
225  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
226  GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
227 
228  GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SRC, 0,
229  "MQTT src");
230 
231  gobject_class->set_property = gst_mqtt_src_set_property;
232  gobject_class->get_property = gst_mqtt_src_get_property;
233  gobject_class->finalize = gst_mqtt_src_class_finalize;
234 
235  g_object_class_install_property (gobject_class, PROP_DEBUG,
236  g_param_spec_boolean ("debug", "Debug",
237  "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
238  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
239 
240  g_object_class_install_property (gobject_class, PROP_IS_LIVE,
241  g_param_spec_boolean ("is-live", "Is Live",
242  "Synchronize the incoming buffers' timestamp with the current running time",
243  DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
244 
245  g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
246  g_param_spec_string ("client-id", "Client ID",
247  "The client identifier passed to the server (broker)",
248  DEFAULT_MQTT_CLIENT_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249 
250  g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
251  g_param_spec_string ("host", "Host", "Host (broker) to connect to",
253  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254 
255  g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
256  g_param_spec_string ("port", "Port",
257  "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
258  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259 
260  g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TIMEOUT,
261  g_param_spec_int64 ("sub-timeout", "Timeout for receiving a message",
262  "The timeout (in microseconds) for receiving a message from subscribed topic",
263  1000000, G_MAXINT64, DEFAULT_MQTT_SUB_TIMEOUT,
264  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 
266  g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TOPIC,
267  g_param_spec_string ("sub-topic", "Topic to Subscribe (mandatory)",
268  "The topic's name to subscribe", NULL,
269  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270 
271  g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
272  g_param_spec_boolean ("cleansession", "Cleansession",
273  "When it is TRUE, the state information is discarded at connect and disconnect.",
275  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276 
277  g_object_class_install_property (gobject_class,
279  g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
280  "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
282  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283 
284  g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
285  g_param_spec_int ("mqtt-qos", "mqtt QoS level",
286  "The QoS level of MQTT.\n"
287  "\t\t\t 0: At most once\n"
288  "\t\t\t 1: At least once\n"
289  "\t\t\t 2: Exactly once\n"
290  "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
291  0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 
293  gstelement_class->change_state =
294  GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
295 
296  gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_src_start);
297  gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_src_stop);
298  gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_caps);
299  gstbasesrc_class->get_times = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_times);
300  gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_mqtt_src_is_seekable);
301  gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_mqtt_src_create);
302  gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_src_query);
303 
304  gst_element_class_set_static_metadata (gstelement_class,
305  "MQTT source", "Source/MQTT",
306  "Subscribe a MQTT topic and push incoming data to the GStreamer pipeline",
307  "Wook Song <wook16.song@samsung.com>");
308  gst_element_class_add_static_pad_template (gstelement_class,
310 }
311 
315 static void
316 gst_mqtt_src_set_property (GObject * object, guint prop_id,
317  const GValue * value, GParamSpec * pspec)
318 {
319  GstMqttSrc *self = GST_MQTT_SRC (object);
320 
321  switch (prop_id) {
322  case PROP_DEBUG:
323  gst_mqtt_src_set_debug (self, g_value_get_boolean (value));
324  break;
325  case PROP_IS_LIVE:
326  gst_mqtt_src_set_is_live (self, g_value_get_boolean (value));
327  break;
328  case PROP_MQTT_CLIENT_ID:
329  gst_mqtt_src_set_client_id (self, g_value_get_string (value));
330  break;
332  gst_mqtt_src_set_host_address (self, g_value_get_string (value));
333  break;
334  case PROP_MQTT_HOST_PORT:
335  gst_mqtt_src_set_host_port (self, g_value_get_string (value));
336  break;
338  gst_mqtt_src_set_sub_timeout (self, g_value_get_int64 (value));
339  break;
340  case PROP_MQTT_SUB_TOPIC:
341  gst_mqtt_src_set_sub_topic (self, g_value_get_string (value));
342  break;
344  gst_mqtt_src_set_opt_cleansession (self, g_value_get_boolean (value));
345  break;
347  gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value));
348  break;
349  case PROP_MQTT_QOS:
350  gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value));
351  break;
352  default:
353  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
354  break;
355  }
356 }
357 
361 static void
362 gst_mqtt_src_get_property (GObject * object, guint prop_id,
363  GValue * value, GParamSpec * pspec)
364 {
365  GstMqttSrc *self = GST_MQTT_SRC (object);
366 
367  switch (prop_id) {
368  case PROP_DEBUG:
369  g_value_set_boolean (value, gst_mqtt_src_get_debug (self));
370  break;
371  case PROP_IS_LIVE:
372  g_value_set_boolean (value, gst_mqtt_src_get_is_live (self));
373  break;
374  case PROP_MQTT_CLIENT_ID:
376  break;
379  break;
380  case PROP_MQTT_HOST_PORT:
382  break;
384  g_value_set_int64 (value, gst_mqtt_src_get_sub_timeout (self));
385  break;
386  case PROP_MQTT_SUB_TOPIC:
388  break;
390  g_value_set_boolean (value, gst_mqtt_src_get_opt_cleansession (self));
391  break;
393  g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self));
394  break;
395  case PROP_MQTT_QOS:
396  g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self));
397  break;
398  default:
399  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
400  break;
401  }
402 }
403 
407 static void
408 gst_mqtt_src_class_finalize (GObject * object)
409 {
410  GstMqttSrc *self = GST_MQTT_SRC (object);
411  GstBuffer *remained;
412 
413  if (self->mqtt_client_handle) {
414  MQTTAsync_destroy (&self->mqtt_client_handle);
415  self->mqtt_client_handle = NULL;
416  }
417 
418  g_free (self->mqtt_client_id);
419  g_free (self->mqtt_host_address);
420  g_free (self->mqtt_host_port);
421  g_free (self->mqtt_topic);
422  gst_caps_replace (&self->caps, NULL);
423 
424  if (self->err)
425  g_error_free (self->err);
426 
427  while ((remained = g_async_queue_try_pop (self->aqueue))) {
428  gst_buffer_unref (remained);
429  }
430  g_clear_pointer (&self->aqueue, g_async_queue_unref);
431 
432  g_mutex_clear (&self->mqtt_src_mutex);
433  G_OBJECT_CLASS (parent_class)->finalize (object);
434 }
435 
439 static GstStateChangeReturn
440 gst_mqtt_src_change_state (GstElement * element, GstStateChange transition)
441 {
442  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
443  GstMqttSrc *self = GST_MQTT_SRC (element);
444  gboolean no_preroll = FALSE;
445  GstClock *elem_clock;
446  GstClockTime base_time;
447  GstClockTime cur_time;
448  GstClockTimeDiff diff;
449 
450  switch (transition) {
451  case GST_STATE_CHANGE_NULL_TO_READY:
452  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
453  if (self->err) {
454  g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
455  self->err->message);
456  return GST_STATE_CHANGE_FAILURE;
457  }
458  break;
459  case GST_STATE_CHANGE_READY_TO_PAUSED:
460  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
461  /* Regardless of the 'is-live''s value, prerolling is not supported */
462  no_preroll = TRUE;
463  break;
464  case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
465  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
466  self->base_time_epoch = GST_CLOCK_TIME_NONE;
467  elem_clock = gst_element_get_clock (element);
468  if (!elem_clock)
469  break;
470  base_time = gst_element_get_base_time (element);
471  cur_time = gst_clock_get_time (elem_clock);
472  gst_object_unref (elem_clock);
473  diff = GST_CLOCK_DIFF (base_time, cur_time);
474  self->base_time_epoch =
475  g_get_real_time () * GST_US_TO_NS_MULTIPLIER - diff;
476 
478  if (GST_BASE_SRC_IS_STARTED (GST_BASE_SRC (self)) &&
479  (self->is_connected == FALSE)) {
480  int conn = MQTTAsync_reconnect (self->mqtt_client_handle);
481 
482  if (conn != MQTTASYNC_SUCCESS) {
483  GST_ERROR_OBJECT (self, "Failed to re-subscribe to %s",
484  self->mqtt_topic);
485 
486  return GST_STATE_CHANGE_FAILURE;
487  }
488  }
489  break;
490  default:
491  break;
492  }
493 
494  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
495 
496  switch (transition) {
497  case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
498  if (self->is_subscribed && !_unsubscribe (self)) {
499  GST_ERROR_OBJECT (self, "Cannot unsubscribe to %s", self->mqtt_topic);
500  }
501  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
502  break;
503  case GST_STATE_CHANGE_PAUSED_TO_READY:
504  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
505  break;
506  case GST_STATE_CHANGE_READY_TO_NULL:
507  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
508  default:
509  break;
510  }
511 
512  if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
513  ret = GST_STATE_CHANGE_NO_PREROLL;
514 
515  return ret;
516 }
517 
521 static gboolean
522 gst_mqtt_src_start (GstBaseSrc * basesrc)
523 {
524  GstMqttSrc *self = GST_MQTT_SRC (basesrc);
525  gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
526  self->mqtt_host_port);
527  int ret;
528  gint64 end_time;
529 
530  if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
531  g_free (self->mqtt_client_id);
532  self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
533  g_get_host_name (), getpid (), src_client_id++);
534  }
535 
544  ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
545  self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
546  g_free (haddr);
547  if (ret != MQTTASYNC_SUCCESS)
548  return FALSE;
549 
550  MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
552 
553  ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
554  if (ret != MQTTASYNC_SUCCESS)
555  goto error;
556 
557  /* Waiting for the connection */
558  end_time = g_get_monotonic_time () +
559  DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
560  g_mutex_lock (&self->mqtt_src_mutex);
561  while (!self->is_connected) {
562  if (!g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex,
563  end_time)) {
564  g_mutex_unlock (&self->mqtt_src_mutex);
565  g_critical ("Failed to connect to MQTT broker from mqttsrc."
566  "Please check broker is running status or broker host address.");
567  goto error;
568  }
569  }
570  g_mutex_unlock (&self->mqtt_src_mutex);
571  return TRUE;
572 
573 error:
574  MQTTAsync_destroy (&self->mqtt_client_handle);
575  self->mqtt_client_handle = NULL;
576  return FALSE;
577 }
578 
582 static gboolean
583 gst_mqtt_src_stop (GstBaseSrc * basesrc)
584 {
585  GstMqttSrc *self = GST_MQTT_SRC (basesrc);
586 
587  /* todo */
588  MQTTAsync_disconnect (self->mqtt_client_handle, NULL);
589  g_mutex_lock (&self->mqtt_src_mutex);
590  self->is_connected = FALSE;
591  g_mutex_unlock (&self->mqtt_src_mutex);
592  MQTTAsync_destroy (&self->mqtt_client_handle);
593  self->mqtt_client_handle = NULL;
594  return TRUE;
595 }
596 
600 static GstCaps *
601 gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
602 {
603  GstPad *pad = GST_BASE_SRC_PAD (basesrc);
604  GstCaps *cur_caps = gst_pad_get_current_caps (pad);
605  GstCaps *caps = gst_caps_new_any ();
606  UNUSED (filter);
607 
608  if (cur_caps) {
609  GstCaps *intersection =
610  gst_caps_intersect_full (cur_caps, caps, GST_CAPS_INTERSECT_FIRST);
611 
612  gst_caps_unref (cur_caps);
613  gst_caps_unref (caps);
614  caps = intersection;
615  }
616 
617  return caps;
618 }
619 
623 static gboolean
624 gst_mqtt_src_renegotiate (GstBaseSrc * basesrc)
625 {
626  GstMqttSrc *self = GST_MQTT_SRC (basesrc);
627  GstCaps *peercaps = NULL;
628  GstCaps *thiscaps;
629  gboolean result = FALSE;
630 
631  if (self->caps == NULL || gst_caps_is_any (self->caps))
632  goto no_nego_needed;
633 
634  thiscaps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (basesrc));
635  if (thiscaps && gst_caps_is_equal (self->caps, thiscaps)) {
636  gst_caps_unref (thiscaps);
637  goto no_nego_needed;
638  }
639 
640  if (thiscaps)
641  gst_caps_unref (thiscaps);
642 
643  peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), self->caps);
644  if (gst_caps_is_empty (peercaps) || peercaps == self->caps) {
645  gst_caps_unref (peercaps);
646  goto no_nego_needed;
647  }
648 
649  if (gst_caps_is_any (peercaps)) {
650  result = TRUE;
651  } else {
652  peercaps = gst_caps_fixate (peercaps);
653  if (gst_caps_is_fixed (peercaps)) {
654  result = gst_base_src_set_caps (basesrc, peercaps);
655  }
656  }
657 
658  gst_caps_unref (peercaps);
659 
660  return result;
661 
662 no_nego_needed:
663  {
664  GST_DEBUG_OBJECT (self, "no negotiation needed");
665 
666  return TRUE;
667  }
668 }
669 
673 static void
674 gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
675  GstClockTime * start, GstClockTime * end)
676 {
677  GstClockTime sync_ts;
678  GstClockTime duration;
679  UNUSED (basesrc);
680 
681  sync_ts = GST_BUFFER_DTS (buffer);
682  duration = GST_BUFFER_DURATION (buffer);
683 
684  if (!GST_CLOCK_TIME_IS_VALID (sync_ts))
685  sync_ts = GST_BUFFER_PTS (buffer);
686 
687  if (GST_CLOCK_TIME_IS_VALID (sync_ts)) {
688  *start = sync_ts;
689  if (GST_CLOCK_TIME_IS_VALID (duration)) {
690  *end = sync_ts + duration;
691  }
692  }
693 }
694 
699 static gboolean
700 gst_mqtt_src_is_seekable (GstBaseSrc * basesrc)
701 {
702  UNUSED (basesrc);
703  return FALSE;
704 }
705 
709 static GstFlowReturn
710 gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
711  GstBuffer ** buf)
712 {
713  GstMqttSrc *self = GST_MQTT_SRC (basesrc);
714  gint64 elapsed = self->mqtt_sub_timeout;
715  UNUSED (offset);
716  UNUSED (size);
717 
718  g_mutex_lock (&self->mqtt_src_mutex);
719  while ((!self->is_connected) || (!self->is_subscribed)) {
720  gint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
721 
722  g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex, end_time);
723  if (self->err) {
724  g_mutex_unlock (&self->mqtt_src_mutex);
725  goto ret_flow_err;
726  }
727  }
728  g_mutex_unlock (&self->mqtt_src_mutex);
729 
730  while (elapsed > 0) {
732  *buf = g_async_queue_timeout_pop (self->aqueue,
734  if (*buf) {
735  GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
736  GstClockTime ulatency = GST_CLOCK_TIME_NONE;
737  GstClock *clock;
738 
740  if (!_is_gst_buffer_timestamp_valid (*buf)) {
741  if (self->debug) {
742  GST_DEBUG_OBJECT (self,
743  "%s: Dumped the received buffer! (total: %" G_GUINT64_FORMAT ")",
744  self->mqtt_topic, ++self->num_dumped);
745  }
746  elapsed = self->mqtt_sub_timeout;
747  gst_buffer_unref (*buf);
748  continue;
749  }
750 
752  clock = gst_element_get_clock (GST_ELEMENT (self));
753  if (clock) {
754  GstClockTime cur_time = gst_clock_get_time (clock);
755  GstClockTime buf_ts = GST_BUFFER_TIMESTAMP (*buf);
756  GstClockTimeDiff latency = 0;
757 
758  if ((base_time != GST_CLOCK_TIME_NONE) &&
759  (cur_time != GST_CLOCK_TIME_NONE) &&
760  (buf_ts != GST_CLOCK_TIME_NONE)) {
761  GstClockTimeDiff now = GST_CLOCK_DIFF (base_time, cur_time);
762 
763  latency = GST_CLOCK_DIFF (buf_ts, (GstClockTime) now);
764  }
765 
766  if (latency > 0) {
767  ulatency = (GstClockTime) latency;
768 
769  if (GST_BUFFER_DURATION_IS_VALID (*buf)) {
770  GstClockTime duration = GST_BUFFER_DURATION (*buf);
771 
772  if (duration >= ulatency) {
773  ulatency = GST_CLOCK_TIME_NONE;
774  }
775  }
776  }
777  gst_object_unref (clock);
778  }
779 
780  g_mutex_lock (&self->mqtt_src_mutex);
781  self->latency = ulatency;
782  g_mutex_unlock (&self->mqtt_src_mutex);
789  break;
790  } else if (self->err) {
791  break;
792  }
793  elapsed = elapsed - DEFAULT_MQTT_SUB_TIMEOUT_MIN;
794  }
795 
796  if (*buf == NULL) {
798  if (!self->err)
799  self->err = g_error_new (self->gquark_err_tag, GST_FLOW_EOS,
800  "%s: Timeout for receiving a message has been expired. Regarding as an error",
801  __func__);
802  goto ret_flow_err;
803  }
804 
805  return GST_FLOW_OK;
806 
807 ret_flow_err:
808  if (self->err) {
809  g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
810  self->err->message);
811  }
812  return GST_FLOW_ERROR;
813 }
814 
818 static gboolean
819 gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query)
820 {
821  GstQueryType type = GST_QUERY_TYPE (query);
822  GstMqttSrc *self = GST_MQTT_SRC (basesrc);
823  gboolean res = FALSE;
824 
825  if (self->debug)
826  GST_DEBUG_OBJECT (self, "Got %s event", gst_query_type_get_name (type));
827 
828  switch (type) {
829  case GST_QUERY_LATENCY:{
830  GstClockTime min_latency = 0;
831  GstClockTime max_latency = GST_CLOCK_TIME_NONE;
832 
833  g_mutex_lock (&self->mqtt_src_mutex);
834  if (self->latency != GST_CLOCK_TIME_NONE) {
835  min_latency = self->latency;
836  }
837  g_mutex_unlock (&self->mqtt_src_mutex);
838 
839  if (self->debug) {
840  GST_DEBUG_OBJECT (self,
841  "Reporting latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT,
842  GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
843  }
848  gst_query_set_latency (query, TRUE, min_latency, max_latency);
849 
850  res = TRUE;
851  break;
852  }
853  default:{
854  res = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
855  }
856  }
857 
858  return res;
859 }
860 
864 static gboolean
866 {
867  return self->debug;
868 }
869 
873 static void
874 gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag)
875 {
876  self->debug = flag;
877 }
878 
882 static gboolean
884 {
885  return self->is_live;
886 }
887 
891 static void
892 gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag)
893 {
894  self->is_live = flag;
895  gst_base_src_set_live (GST_BASE_SRC (self), self->is_live);
896 }
897 
901 static gchar *
903 {
904  return self->mqtt_client_id;
905 }
906 
910 static void
911 gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id)
912 {
913  g_free (self->mqtt_client_id);
914  self->mqtt_client_id = g_strdup (id);
915 }
916 
920 static gchar *
922 {
923  return self->mqtt_host_address;
924 }
925 
929 static void
930 gst_mqtt_src_set_host_address (GstMqttSrc * self, const gchar * addr)
931 {
935  g_free (self->mqtt_host_address);
936  self->mqtt_host_address = g_strdup (addr);
937 }
938 
942 static gchar *
944 {
945  return self->mqtt_host_port;
946 }
947 
951 static void
952 gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port)
953 {
954  g_free (self->mqtt_host_port);
955  self->mqtt_host_port = g_strdup (port);
956 }
957 
961 static gint64
963 {
964  return self->mqtt_sub_timeout;
965 }
966 
970 static void
971 gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t)
972 {
973  self->mqtt_sub_timeout = t;
974 }
975 
979 static gchar *
981 {
982  return self->mqtt_topic;
983 }
984 
988 static void
989 gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic)
990 {
991  g_free (self->mqtt_topic);
992  self->mqtt_topic = g_strdup (topic);
993 }
994 
998 static gboolean
1000 {
1001  return self->mqtt_conn_opts.cleansession;
1002 }
1003 
1007 static void
1008 gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self, const gboolean val)
1009 {
1010  self->mqtt_conn_opts.cleansession = val;
1011 }
1012 
1016 static gint
1018 {
1019  return self->mqtt_conn_opts.keepAliveInterval;
1020 }
1021 
1025 static void
1027 {
1028  self->mqtt_conn_opts.keepAliveInterval = num;
1029 }
1030 
1034 static gint
1036 {
1037  return self->mqtt_qos;
1038 }
1039 
1043 static void
1044 gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos)
1045 {
1046  self->mqtt_qos = qos;
1047 }
1048 
1052 static void
1053 cb_mqtt_on_connection_lost (void *context, char *cause)
1054 {
1055  GstMqttSrc *self = GST_MQTT_SRC_CAST (context);
1056  UNUSED (cause);
1057 
1058  g_mutex_lock (&self->mqtt_src_mutex);
1059  self->is_connected = FALSE;
1060  self->is_subscribed = FALSE;
1061  g_cond_broadcast (&self->mqtt_src_gcond);
1062  if (!self->err) {
1063  self->err = g_error_new (self->gquark_err_tag, EHOSTDOWN,
1064  "Connection to the host (broker) has been lost: %s \n"
1065  "\t\tfor detail, please check the log message of the broker",
1066  g_strerror (EHOSTDOWN));
1067  }
1068  g_mutex_unlock (&self->mqtt_src_mutex);
1069 }
1070 
1074 static int
1075 cb_mqtt_on_message_arrived (void *context, char *topic_name, int topic_len,
1076  MQTTAsync_message * message)
1077 {
1078  const int size = message->payloadlen;
1079  guint8 *data = message->payload;
1080  GstMQTTMessageHdr *mqtt_msg_hdr;
1081  GstMapInfo hdr_map_info;
1082  GstMemory *received_mem;
1083  GstMemory *hdr_mem;
1084  GstBuffer *buffer;
1085  GstBaseSrc *basesrc;
1086  GstMqttSrc *self;
1087  GstClock *clock;
1088  GstCaps *recv_caps;
1089  gsize offset;
1090  guint i;
1091  UNUSED (topic_name);
1092  UNUSED (topic_len);
1093 
1094  self = GST_MQTT_SRC_CAST (context);
1095  g_mutex_lock (&self->mqtt_src_mutex);
1096  if (!self->is_subscribed) {
1097  g_mutex_unlock (&self->mqtt_src_mutex);
1098 
1099  return TRUE;
1100  }
1101  g_mutex_unlock (&self->mqtt_src_mutex);
1102 
1103  basesrc = GST_BASE_SRC (self);
1104  clock = gst_element_get_clock (GST_ELEMENT (self));
1105  received_mem = gst_memory_new_wrapped (0, data, size, 0, size, message,
1106  (GDestroyNotify) cb_memory_wrapped_destroy);
1107  if (!received_mem) {
1108  if (!self->err) {
1109  self->err = g_error_new (self->gquark_err_tag, ENODATA,
1110  "%s: failed to wrap the raw data of received message in GstMemory: %s",
1111  __func__, g_strerror (ENODATA));
1112  }
1113  return TRUE;
1114  }
1115 
1116  mqtt_msg_hdr = _extract_mqtt_msg_hdr_from (received_mem, &hdr_mem,
1117  &hdr_map_info);
1118  if (!mqtt_msg_hdr) {
1119  if (!self->err) {
1120  self->err = g_error_new (self->gquark_err_tag, ENODATA,
1121  "%s: failed to extract header information from received message: %s",
1122  __func__, g_strerror (ENODATA));
1123  }
1124  goto ret_unref_received_mem;
1125  }
1126 
1127  recv_caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
1128  if (recv_caps) {
1129  if (!self->caps || !gst_caps_is_equal (self->caps, recv_caps)) {
1130  gst_caps_replace (&self->caps, recv_caps);
1131  gst_mqtt_src_renegotiate (basesrc);
1132  }
1133 
1134  gst_caps_unref (recv_caps);
1135  }
1136 
1137  buffer = gst_buffer_new ();
1138  offset = GST_MQTT_LEN_MSG_HDR;
1139  for (i = 0; i < mqtt_msg_hdr->num_mems; ++i) {
1140  GstMemory *each_memory;
1141  int each_size;
1142 
1143  each_size = mqtt_msg_hdr->size_mems[i];
1144  each_memory = gst_memory_share (received_mem, offset, each_size);
1145  gst_buffer_append_memory (buffer, each_memory);
1146  offset += each_size;
1147  }
1148 
1150  if (self->debug) {
1151  GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
1152 
1153  if (clock) {
1154  GST_DEBUG_OBJECT (self,
1155  "A message has been arrived at %" GST_TIME_FORMAT
1156  " and queue length is %d",
1157  GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1158  g_async_queue_length (self->aqueue));
1159 
1160  gst_object_unref (clock);
1161  }
1162  }
1163  _put_timestamp_on_gst_buf (self, mqtt_msg_hdr, buffer);
1164  g_async_queue_push (self->aqueue, buffer);
1165 
1166  gst_memory_unmap (hdr_mem, &hdr_map_info);
1167  gst_memory_unref (hdr_mem);
1168 
1169 ret_unref_received_mem:
1170  gst_memory_unref (received_mem);
1171 
1172  return TRUE;
1173 }
1174 
1178 static void
1180 {
1181  MQTTAsync_message *msg = p;
1182 
1183  MQTTAsync_freeMessage (&msg);
1184 }
1185 
1189 static void
1190 cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
1191 {
1192  GstMqttSrc *self = GST_MQTT_SRC (context);
1193  GstBaseSrc *basesrc = GST_BASE_SRC (self);
1194  int ret;
1195  UNUSED (response);
1196 
1197  g_mutex_lock (&self->mqtt_src_mutex);
1198  self->is_connected = TRUE;
1199  g_cond_broadcast (&self->mqtt_src_gcond);
1200  g_mutex_unlock (&self->mqtt_src_mutex);
1201 
1203  if (gst_base_src_is_async (basesrc) &&
1204  (ret = gst_base_src_start_wait (basesrc)) != GST_FLOW_OK) {
1205  g_mutex_lock (&self->mqtt_src_mutex);
1206  self->err = g_error_new (self->gquark_err_tag, ret,
1207  "%s: the virtual method, start (), in the GstBaseSrc class fails with return code %d",
1208  __func__, ret);
1209  g_cond_broadcast (&self->mqtt_src_gcond);
1210  g_mutex_unlock (&self->mqtt_src_mutex);
1211  return;
1212  }
1213 
1214  if (!_subscribe (self)) {
1215  GST_ERROR_OBJECT (self, "Failed to subscribe to %s", self->mqtt_topic);
1216  }
1217 }
1218 
1222 static void
1223 cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
1224 {
1225  GstMqttSrc *self = GST_MQTT_SRC (context);
1226 
1227  g_mutex_lock (&self->mqtt_src_mutex);
1228  self->is_connected = FALSE;
1229 
1230  if (!self->err) {
1231  self->err = g_error_new (self->gquark_err_tag, response->code,
1232  "%s: failed to connect to the broker: %s", __func__, response->message);
1233  }
1234  g_cond_broadcast (&self->mqtt_src_gcond);
1235  g_mutex_unlock (&self->mqtt_src_mutex);
1236 }
1237 
1241 static void
1242 cb_mqtt_on_subscribe (void *context, MQTTAsync_successData * response)
1243 {
1244  GstMqttSrc *self = GST_MQTT_SRC (context);
1245  UNUSED (response);
1246 
1247  g_mutex_lock (&self->mqtt_src_mutex);
1248  self->is_subscribed = TRUE;
1249  g_cond_broadcast (&self->mqtt_src_gcond);
1250  g_mutex_unlock (&self->mqtt_src_mutex);
1251 }
1252 
1256 static void
1257 cb_mqtt_on_subscribe_failure (void *context, MQTTAsync_failureData * response)
1258 {
1259  GstMqttSrc *self = GST_MQTT_SRC (context);
1260 
1261  g_mutex_lock (&self->mqtt_src_mutex);
1262  if (!self->err) {
1263  self->err = g_error_new (self->gquark_err_tag, response->code,
1264  "%s: failed to subscribe the given topic, %s: %s", __func__,
1265  self->mqtt_topic, response->message);
1266  }
1267  g_cond_broadcast (&self->mqtt_src_gcond);
1268  g_mutex_unlock (&self->mqtt_src_mutex);
1269 }
1270 
1274 static void
1275 cb_mqtt_on_unsubscribe (void *context, MQTTAsync_successData * response)
1276 {
1277  GstMqttSrc *self = GST_MQTT_SRC (context);
1278  UNUSED (response);
1279 
1280  g_mutex_lock (&self->mqtt_src_mutex);
1281  self->is_subscribed = FALSE;
1282  g_cond_broadcast (&self->mqtt_src_gcond);
1283  g_mutex_unlock (&self->mqtt_src_mutex);
1284 }
1285 
1289 static void
1290 cb_mqtt_on_unsubscribe_failure (void *context, MQTTAsync_failureData * response)
1291 {
1292  GstMqttSrc *self = GST_MQTT_SRC (context);
1293 
1294  g_mutex_lock (&self->mqtt_src_mutex);
1295  if (!self->err) {
1296  self->err = g_error_new (self->gquark_err_tag, response->code,
1297  "%s: failed to unsubscribe the given topic, %s: %s", __func__,
1298  self->mqtt_topic, response->message);
1299  }
1300  g_cond_broadcast (&self->mqtt_src_gcond);
1301  g_mutex_unlock (&self->mqtt_src_mutex);
1302 }
1303 
1307 static gboolean
1309 {
1310  MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
1311  int mqttasync_ret;
1312 
1313  opts.onSuccess = cb_mqtt_on_subscribe;
1314  opts.onFailure = cb_mqtt_on_subscribe_failure;
1315  opts.subscribeOptions.retainHandling = 1;
1316 
1317  mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
1318  self->mqtt_topic, self->mqtt_qos, &opts);
1319  if (mqttasync_ret != MQTTASYNC_SUCCESS)
1320  return FALSE;
1321  return TRUE;
1322 }
1323 
1327 static gboolean
1329 {
1330  MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
1331  int mqttasync_ret;
1332 
1333  opts.onSuccess = cb_mqtt_on_unsubscribe;
1334  opts.onFailure = cb_mqtt_on_unsubscribe_failure;
1335 
1336  mqttasync_ret = MQTTAsync_unsubscribe (self->mqtt_client_handle,
1337  self->mqtt_topic, &opts);
1338  if (mqttasync_ret != MQTTASYNC_SUCCESS)
1339  return FALSE;
1340  return TRUE;
1341 }
1342 
1346 static GstMQTTMessageHdr *
1347 _extract_mqtt_msg_hdr_from (GstMemory * mem, GstMemory ** hdr_mem,
1348  GstMapInfo * hdr_map_info)
1349 {
1350  *hdr_mem = gst_memory_share (mem, 0, GST_MQTT_LEN_MSG_HDR);
1351  g_return_val_if_fail (*hdr_mem != NULL, NULL);
1352 
1353  if (!gst_memory_map (*hdr_mem, hdr_map_info, GST_MAP_READ)) {
1354  gst_memory_unref (*hdr_mem);
1355  return NULL;
1356  }
1357 
1358  return (GstMQTTMessageHdr *) hdr_map_info->data;
1359 }
1360 
1365 static void
1367  GstBuffer * buf)
1368 {
1369  gint64 diff_base_epoch = hdr->base_time_epoch - self->base_time_epoch;
1370 
1371  buf->pts = GST_CLOCK_TIME_NONE;
1372  buf->dts = GST_CLOCK_TIME_NONE;
1373  buf->duration = GST_CLOCK_TIME_NONE;
1374 
1375  if (hdr->sent_time_epoch < self->base_time_epoch)
1376  return;
1377 
1378  if (((GstClockTimeDiff) hdr->pts + diff_base_epoch) < 0)
1379  return;
1380 
1381  if (hdr->pts != GST_CLOCK_TIME_NONE) {
1382  buf->pts = hdr->pts + diff_base_epoch;
1383  }
1384 
1385  if (hdr->dts != GST_CLOCK_TIME_NONE) {
1386  buf->dts = hdr->dts + diff_base_epoch;
1387  }
1388 
1389  buf->duration = hdr->duration;
1390 
1391  if (self->debug) {
1392  GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
1393  GstClock *clock;
1394 
1395  clock = gst_element_get_clock (GST_ELEMENT (self));
1396 
1397  if (clock) {
1398  GST_DEBUG_OBJECT (self,
1399  "%s diff %" GST_STIME_FORMAT " now %" GST_TIME_FORMAT " ts (%"
1400  GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ")", self->mqtt_topic,
1401  GST_STIME_ARGS (diff_base_epoch),
1402  GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1403  GST_TIME_ARGS (hdr->pts), GST_TIME_ARGS (buf->pts));
1404 
1405  gst_object_unref (clock);
1406  }
1407  }
1408 }
_GstMQTTMessageHdr::gst_caps_str
gchar gst_caps_str[GST_MQTT_MAX_LEN_GST_CAPS_STR]
Definition: mqttcommon.h:59
DEFAULT_MQTT_CONN_TIMEOUT_SEC
#define DEFAULT_MQTT_CONN_TIMEOUT_SEC
Definition: mqttcommon.h:40
PROP_MQTT_HOST_ADDRESS
@ PROP_MQTT_HOST_ADDRESS
Definition: mqttsrc.c:47
PROP_MQTT_SUB_TOPIC
@ PROP_MQTT_SUB_TOPIC
Definition: mqttsrc.c:49
gst_mqtt_src_get_times
static void gst_mqtt_src_get_times(GstBaseSrc *basesrc, GstBuffer *buffer, GstClockTime *start, GstClockTime *end)
Return the time information of the given buffer.
Definition: mqttsrc.c:674
cb_mqtt_on_unsubscribe
static void cb_mqtt_on_unsubscribe(void *context, MQTTAsync_successData *response)
MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_unsubscribe ()
Definition: mqttsrc.c:1275
_GstMqttSrc
GstMqttSrc data structure.
Definition: mqttsrc.h:46
data
svtc_1 data
Definition: gsttensor_if.c:844
gst_mqtt_src_get_opt_keep_alive_interval
static gint gst_mqtt_src_get_opt_keep_alive_interval(GstMqttSrc *self)
Getter for the 'keep-alive-interval' property.
Definition: mqttsrc.c:1017
gst_mqtt_src_start
static gboolean gst_mqtt_src_start(GstBaseSrc *basesrc)
Start mqttsrc; called when the state changes from NULL to READY.
Definition: mqttsrc.c:522
cb_mqtt_on_connection_lost
static void cb_mqtt_on_connection_lost(void *context, char *cause)
A callback to handle the connection lost to the broker.
Definition: mqttsrc.c:1053
FALSE
return FALSE
Definition: gsttensor_transform.c:590
result
case tensor_data_s gboolean * result
Definition: gsttensor_if.c:839
_GstMQTTMessageHdr::sent_time_epoch
gint64 sent_time_epoch
Definition: mqttcommon.h:55
GST_MQTT_LEN_MSG_HDR
#define GST_MQTT_LEN_MSG_HDR
Definition: mqttcommon.h:29
_GstMQTTMessageHdr::size_mems
gsize size_mems[GST_MQTT_MAX_NUM_MEMS]
Definition: mqttcommon.h:53
gst_mqtt_src_set_debug
static void gst_mqtt_src_set_debug(GstMqttSrc *self, const gboolean flag)
Setter for the 'debug' property.
Definition: mqttsrc.c:874
gst_mqtt_src_get_host_port
static gchar * gst_mqtt_src_get_host_port(GstMqttSrc *self)
Getter for the 'port' property.
Definition: mqttsrc.c:943
GST_MQTT_SRC
#define GST_MQTT_SRC(obj)
Definition: mqttsrc.h:27
_put_timestamp_on_gst_buf
static void _put_timestamp_on_gst_buf(GstMqttSrc *self, GstMQTTMessageHdr *hdr, GstBuffer *buf)
A utility function to put the timestamp information onto a GstBuffer-typed buffer using the given pac...
Definition: mqttsrc.c:1366
cb_mqtt_on_subscribe
static void cb_mqtt_on_subscribe(void *context, MQTTAsync_successData *response)
MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_subscribe ()
Definition: mqttsrc.c:1242
gst_mqtt_src_get_opt_cleansession
static gboolean gst_mqtt_src_get_opt_cleansession(GstMqttSrc *self)
Getter for the 'cleansession' property.
Definition: mqttsrc.c:999
gst_mqtt_src_change_state
static GstStateChangeReturn gst_mqtt_src_change_state(GstElement *element, GstStateChange transition)
Handle mqttsrc's state change.
Definition: mqttsrc.c:440
gst_mqtt_src_get_caps
static GstCaps * gst_mqtt_src_get_caps(GstBaseSrc *basesrc, GstCaps *filter)
Get caps of subclass.
Definition: mqttsrc.c:601
PROP_MQTT_SUB_TIMEOUT
@ PROP_MQTT_SUB_TIMEOUT
Definition: mqttsrc.c:50
gst_mqtt_src_get_sub_topic
static gchar * gst_mqtt_src_get_sub_topic(GstMqttSrc *self)
Getter for the 'sub-topic' property.
Definition: mqttsrc.c:980
gst_mqtt_src_get_host_address
static gchar * gst_mqtt_src_get_host_address(GstMqttSrc *self)
Getter for the 'host' property.
Definition: mqttsrc.c:921
gst_mqtt_src_set_sub_timeout
static void gst_mqtt_src_set_sub_timeout(GstMqttSrc *self, const gint64 t)
Setter for the 'sub-timeout' property.
Definition: mqttsrc.c:971
DEFAULT_MQTT_CLIENT_ID_FORMAT
static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[]
Definition: mqttsrc.c:75
gst_mqtt_src_get_mqtt_qos
static gint gst_mqtt_src_get_mqtt_qos(GstMqttSrc *self)
Getter for the 'mqtt-qos' property.
Definition: mqttsrc.c:1035
gst_mqtt_src_get_is_live
static gboolean gst_mqtt_src_get_is_live(GstMqttSrc *self)
Getter for the 'is-live' property.
Definition: mqttsrc.c:883
gst_mqtt_src_set_opt_cleansession
static void gst_mqtt_src_set_opt_cleansession(GstMqttSrc *self, const gboolean val)
Setter for the 'cleansession' property.
Definition: mqttsrc.c:1008
gst_mqtt_src_query
static gboolean gst_mqtt_src_query(GstBaseSrc *basesrc, GstQuery *query)
An implementation of the GstBaseSrc vmethod that handles queries.
Definition: mqttsrc.c:819
gst_mqtt_src_get_debug
static gboolean gst_mqtt_src_get_debug(GstMqttSrc *self)
Getter for the 'debug' property.
Definition: mqttsrc.c:865
PROP_MQTT_HOST_PORT
@ PROP_MQTT_HOST_PORT
Definition: mqttsrc.c:48
DEFAULT_DEBUG
@ DEFAULT_DEBUG
Definition: mqttsrc.c:60
cb_mqtt_on_unsubscribe_failure
static void cb_mqtt_on_unsubscribe_failure(void *context, MQTTAsync_failureData *response)
MQTTAsync_responseOptions's onFailure callback for MQTTAsync_unsubscribe ()
Definition: mqttsrc.c:1290
DEFAULT_MQTT_OPT_CLEANSESSION
@ DEFAULT_MQTT_OPT_CLEANSESSION
Definition: mqttsrc.c:62
g_free
g_free(self->option[(opnum) - 1])
opnum: \
gst_mqtt_src_renegotiate
static gboolean gst_mqtt_src_renegotiate(GstBaseSrc *basesrc)
Perform the negotiation procedure again if it is needed.
Definition: mqttsrc.c:624
g_value_set_string
g_value_set_string(value, self->option[opnum - 1])
opnum: \
PROP_DEBUG
@ PROP_DEBUG
Definition: mqttsrc.c:44
DEFAULT_MQTT_HOST_ADDRESS
static const gchar DEFAULT_MQTT_HOST_ADDRESS[]
Definition: mqttsrc.c:70
cb_mqtt_on_message_arrived
static int cb_mqtt_on_message_arrived(void *context, char *topic_name, int topic_len, MQTTAsync_message *message)
A callback to handle the arrived message.
Definition: mqttsrc.c:1075
PROP_MQTT_CLIENT_ID
@ PROP_MQTT_CLIENT_ID
Definition: mqttsrc.c:46
GST_DEBUG_CATEGORY_STATIC
GST_DEBUG_CATEGORY_STATIC(gst_mqtt_src_debug)
_is_gst_buffer_timestamp_valid
static gboolean _is_gst_buffer_timestamp_valid(GstBuffer *buf)
A utility function to check whether the timestamp marked by _put_timestamp_on_gst_buf () is valid or ...
Definition: mqttsrc.c:156
GST_MQTT_SRC_CAST
#define GST_MQTT_SRC_CAST(obj)
Definition: mqttsrc.h:31
DEFAULT_MQTT_HOST_PORT
static const gchar DEFAULT_MQTT_HOST_PORT[]
Definition: mqttsrc.c:71
GST_MQTT_ELEM_NAME_SRC
#define GST_MQTT_ELEM_NAME_SRC
Definition: mqttcommon.h:27
gst_mqtt_src_create
static GstFlowReturn gst_mqtt_src_create(GstBaseSrc *basesrc, guint64 offset, guint size, GstBuffer **buf)
Create a buffer containing the subscribed data.
Definition: mqttsrc.c:710
cb_mqtt_on_connect
static void cb_mqtt_on_connect(void *context, MQTTAsync_successData *response)
A callback invoked when the connection is established.
Definition: mqttsrc.c:1190
GST_CAT_DEFAULT
#define GST_CAT_DEFAULT
Definition: mqttsrc.c:38
gst_mqtt_src_class_init
static void gst_mqtt_src_class_init(GstMqttSrcClass *klass)
Initialize GstMqttSrcClass object.
Definition: mqttsrc.c:222
cb_mqtt_on_connect_failure
static void cb_mqtt_on_connect_failure(void *context, MQTTAsync_failureData *response)
A callback invoked when it is failed to connect to the broker.
Definition: mqttsrc.c:1223
_GstMQTTMessageHdr::base_time_epoch
gint64 base_time_epoch
Definition: mqttcommon.h:54
gst_mqtt_src_set_mqtt_qos
static void gst_mqtt_src_set_mqtt_qos(GstMqttSrc *self, const gint qos)
Setter for the 'mqtt-qos' property.
Definition: mqttsrc.c:1044
gst_mqtt_src_set_is_live
static void gst_mqtt_src_set_is_live(GstMqttSrc *self, const gboolean flag)
Setter for the 'is-live' property.
Definition: mqttsrc.c:892
_extract_mqtt_msg_hdr_from
static GstMQTTMessageHdr * _extract_mqtt_msg_hdr_from(GstMemory *mem, GstMemory **hdr_mem, GstMapInfo *hdr_map_info)
A utility function to extract header information from a received message.
Definition: mqttsrc.c:1347
GST_US_TO_NS_MULTIPLIER
#define GST_US_TO_NS_MULTIPLIER
Definition: mqttcommon.h:38
PROP_LAST
@ PROP_LAST
Definition: mqttsrc.c:55
mqttsrc.h
Subscribe a MQTT topic and push incoming data to the GStreamer pipeline.
src_client_id
static guint8 src_client_id
Definition: mqttsrc.c:69
TRUE
return TRUE
Definition: gsttensor_if.c:897
UNUSED
#define UNUSED(expr)
Definition: mqttcommon.h:19
src_pad_template
static GstStaticPadTemplate src_pad_template
Definition: mqttsrc.c:31
nnstreamer_util.h
Optional NNStreamer utility functions for sub-plugin writers and users.
G_DEFINE_TYPE
G_DEFINE_TYPE(GstMqttSrc, gst_mqtt_src, GST_TYPE_BASE_SRC)
gst_mqtt_src_get_sub_timeout
static gint64 gst_mqtt_src_get_sub_timeout(GstMqttSrc *self)
Getter for the 'sub-timeout' property.
Definition: mqttsrc.c:962
DEFAULT_MQTT_SUB_TIMEOUT_MIN
@ DEFAULT_MQTT_SUB_TIMEOUT_MIN
Definition: mqttsrc.c:65
_subscribe
static gboolean _subscribe(GstMqttSrc *self)
A helper function to properly invoke MQTTAsync_subscribe ()
Definition: mqttsrc.c:1308
_GstMQTTMessageHdr
Defines a custom data type, GstMQTTMessageHdr.
Definition: mqttcommon.h:49
cb_memory_wrapped_destroy
static void cb_memory_wrapped_destroy(void *p)
A callback invoked when destroying the GstMemory which wrapped the arrived message.
Definition: mqttsrc.c:1179
TAG_ERR_MQTTSRC
static const gchar TAG_ERR_MQTTSRC[]
Definition: mqttsrc.c:72
PROP_0
@ PROP_0
Definition: mqttsrc.c:42
gst_mqtt_src_is_seekable
static gboolean gst_mqtt_src_is_seekable(GstBaseSrc *basesrc)
Check if source supports seeking.
Definition: mqttsrc.c:700
_GstMQTTMessageHdr::dts
GstClockTime dts
Definition: mqttcommon.h:57
_GstMqttSrcClass
GstMqttSrcClass data structure.
Definition: mqttsrc.h:79
gst_mqtt_src_set_host_address
static void gst_mqtt_src_set_host_address(GstMqttSrc *self, const gchar *addr)
Setter for the 'host' property.
Definition: mqttsrc.c:930
PROP_IS_LIVE
@ PROP_IS_LIVE
Definition: mqttsrc.c:45
gst_mqtt_src_get_client_id
static gchar * gst_mqtt_src_get_client_id(GstMqttSrc *self)
Getter for the 'client-id' property.
Definition: mqttsrc.c:902
gst_mqtt_src_set_opt_keep_alive_interval
static void gst_mqtt_src_set_opt_keep_alive_interval(GstMqttSrc *self, const gint num)
Setter for the 'keep-alive-interval' property.
Definition: mqttsrc.c:1026
PROP_MQTT_OPT_CLEANSESSION
@ PROP_MQTT_OPT_CLEANSESSION
Definition: mqttsrc.c:51
_unsubscribe
static gboolean _unsubscribe(GstMqttSrc *self)
A wrapper function that calls MQTTAsync_unsubscribe ()
Definition: mqttsrc.c:1328
DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL
@ DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL
Definition: mqttsrc.c:63
gst_mqtt_src_set_host_port
static void gst_mqtt_src_set_host_port(GstMqttSrc *self, const gchar *port)
Setter for the 'port' property.
Definition: mqttsrc.c:952
PROP_MQTT_QOS
@ PROP_MQTT_QOS
Definition: mqttsrc.c:53
DEFAULT_MQTT_QOS
@ DEFAULT_MQTT_QOS
Definition: mqttsrc.c:66
_GstMQTTMessageHdr::pts
GstClockTime pts
Definition: mqttcommon.h:58
PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL
@ PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL
Definition: mqttsrc.c:52
gst_mqtt_src_init
static void gst_mqtt_src_init(GstMqttSrc *self)
Initialize GstMqttSrc object.
Definition: mqttsrc.c:169
gst_mqtt_src_class_finalize
static void gst_mqtt_src_class_finalize(GObject *object)
Finalize GstMqttSrcClass object.
Definition: mqttsrc.c:408
type
svtc_1 type
Definition: gsttensor_if.c:843
_GstMQTTMessageHdr::num_mems
guint num_mems
Definition: mqttcommon.h:52
gst_mqtt_src_set_sub_topic
static void gst_mqtt_src_set_sub_topic(GstMqttSrc *self, const gchar *topic)
Setter for the 'sub-topic' property.
Definition: mqttsrc.c:989
_GstMQTTMessageHdr::duration
GstClockTime duration
Definition: mqttcommon.h:56
DEFAULT_IS_LIVE
@ DEFAULT_IS_LIVE
Definition: mqttsrc.c:61
DEFAULT_MQTT_CLIENT_ID
static const gchar DEFAULT_MQTT_CLIENT_ID[]
Definition: mqttsrc.c:73
gst_mqtt_src_set_property
static void gst_mqtt_src_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
The setter for the mqttsrc's properties.
Definition: mqttsrc.c:316
gst_mqtt_src_get_property
static void gst_mqtt_src_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
The getter for the mqttsrc's properties.
Definition: mqttsrc.c:362
cb_mqtt_on_subscribe_failure
static void cb_mqtt_on_subscribe_failure(void *context, MQTTAsync_failureData *response)
MQTTAsync_responseOptions's onFailure callback for MQTTAsync_subscribe ()
Definition: mqttsrc.c:1257
gst_mqtt_src_stop
static gboolean gst_mqtt_src_stop(GstBaseSrc *basesrc)
Stop mqttsrc; called when the state changes from READY to NULL.
Definition: mqttsrc.c:583
DEFAULT_MQTT_SUB_TIMEOUT
@ DEFAULT_MQTT_SUB_TIMEOUT
Definition: mqttsrc.c:64
gst_mqtt_src_set_client_id
static void gst_mqtt_src_set_client_id(GstMqttSrc *self, const gchar *id)
Setter for the 'client-id' property.
Definition: mqttsrc.c:911