Doxygen Book
mqttsink.c
Go to the documentation of this file.
1 /* SPDX-License-Identifier: LGPL-2.1-only */
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17 
18 #include <stdlib.h>
19 #include <string.h>
20 
21 #ifdef G_OS_WIN32
22 #include <process.h>
23 #else
24 #include <sys/types.h>
25 #include <unistd.h>
26 #endif
27 
28 #include <gst/base/gstbasesink.h>
29 #include <MQTTAsync.h>
30 #include <nnstreamer_util.h>
31 
32 #include "mqttsink.h"
33 #include "ntputil.h"
34 
35 static GstStaticPadTemplate sink_pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
36  GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
37 
38 #define gst_mqtt_sink_parent_class parent_class
39 G_DEFINE_TYPE (GstMqttSink, gst_mqtt_sink, GST_TYPE_BASE_SINK);
40 
41 GST_DEBUG_CATEGORY_STATIC (gst_mqtt_sink_debug);
42 #define GST_CAT_DEFAULT gst_mqtt_sink_debug
43 
44 enum
45 {
47 
61 
63 };
64 
65 enum
66 {
73  DEFAULT_MQTT_DISCONNECT_TIMEOUT = G_TIME_SPAN_SECOND * 3, /* 3 secs */
75  DEFAULT_MAX_MSG_BUF_SIZE = 0, /* Buffer size is not fixed */
76  DEFAULT_MQTT_QOS = 0, /* fire and forget */
79 };
80 
81 static guint8 sink_client_id = 0;
82 static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
83 static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
84 static const gchar TAG_ERR_MQTTSINK[] = "ERROR: MQTTSink";
85 static const gchar DEFAULT_MQTT_CLIENT_ID[] = "$HOST_$PID_^[0-9][0-9]?$|^255$";
86 static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_sink%u";
87 static const gchar DEFAULT_MQTT_PUB_TOPIC[] = "$client-id/topic";
88 static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
89 static const gchar DEFAULT_MQTT_NTP_SERVERS[] = "pool.ntp.org:123";
90 
92 static void
93 gst_mqtt_sink_set_property (GObject * object, guint prop_id,
94  const GValue * value, GParamSpec * pspec);
95 static void
96 gst_mqtt_sink_get_property (GObject * object, guint prop_id,
97  GValue * value, GParamSpec * pspec);
98 static void gst_mqtt_sink_class_finalize (GObject * object);
99 
100 static GstStateChangeReturn
101 gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition);
102 
103 static gboolean gst_mqtt_sink_start (GstBaseSink * basesink);
104 static gboolean gst_mqtt_sink_stop (GstBaseSink * basesink);
105 static gboolean gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query);
106 static GstFlowReturn
107 gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * buffer);
108 static GstFlowReturn
109 gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list);
110 static gboolean gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event);
111 static gboolean gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps);
112 
113 static gboolean gst_mqtt_sink_get_debug (GstMqttSink * self);
114 static void gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag);
115 static gchar *gst_mqtt_sink_get_client_id (GstMqttSink * self);
116 static void gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id);
117 static gchar *gst_mqtt_sink_get_host_address (GstMqttSink * self);
118 static void gst_mqtt_sink_set_host_address (GstMqttSink * self,
119  const gchar * addr);
120 static gchar *gst_mqtt_sink_get_host_port (GstMqttSink * self);
121 static void gst_mqtt_sink_set_host_port (GstMqttSink * self,
122  const gchar * port);
123 static gchar *gst_mqtt_sink_get_pub_topic (GstMqttSink * self);
124 static void gst_mqtt_sink_set_pub_topic (GstMqttSink * self,
125  const gchar * topic);
126 static gulong gst_mqtt_sink_get_pub_wait_timeout (GstMqttSink * self);
128  const gulong to);
129 static gboolean gst_mqtt_sink_get_opt_cleansession (GstMqttSink * self);
131  const gboolean val);
134  const gint num);
135 
136 static gsize gst_mqtt_sink_get_max_msg_buf_size (GstMqttSink * self);
138  const gsize size);
139 static gint gst_mqtt_sink_get_num_buffers (GstMqttSink * self);
140 static void gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num);
141 static gint gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self);
142 static void gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos);
143 static gboolean gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self);
144 static void gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self,
145  const gboolean flag);
146 static gchar *gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self);
147 static void gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self,
148  const gchar * pairs);
149 
150 static void cb_mqtt_on_connect (void *context,
151  MQTTAsync_successData * response);
152 static void cb_mqtt_on_connect_failure (void *context,
153  MQTTAsync_failureData * response);
154 static void cb_mqtt_on_disconnect (void *context,
155  MQTTAsync_successData * response);
156 static void cb_mqtt_on_disconnect_failure (void *context,
157  MQTTAsync_failureData * response);
158 static void cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token);
159 static void cb_mqtt_on_connection_lost (void *context, char *cause);
160 static int cb_mqtt_on_message_arrived (void *context, char *topicName,
161  int topicLen, MQTTAsync_message * message);
162 static void cb_mqtt_on_send_success (void *context,
163  MQTTAsync_successData * response);
164 static void cb_mqtt_on_send_failure (void *context,
165  MQTTAsync_failureData * response);
166 
170 static void
172 {
173  GstBaseSink *basesink = GST_BASE_SINK (self);
174  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
175  MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
176 
178  self->mqtt_client_handle = NULL;
179  self->mqtt_conn_opts = conn_opts;
180  self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
181  self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
182  self->mqtt_conn_opts.context = self;
183  self->mqtt_respn_opts = respn_opts;
184  self->mqtt_respn_opts.onSuccess = cb_mqtt_on_send_success;
185  self->mqtt_respn_opts.onFailure = cb_mqtt_on_send_failure;
186  self->mqtt_respn_opts.context = self;
187 
189  self->mqtt_sink_state = SINK_INITIALIZING;
190  self->err = NULL;
191  self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSINK);
192  g_mutex_init (&self->mqtt_sink_mutex);
193  g_cond_init (&self->mqtt_sink_gcond);
194  self->mqtt_msg_buf = NULL;
195  self->mqtt_msg_buf_size = 0;
196  memset (&self->mqtt_msg_hdr, 0x0, sizeof (self->mqtt_msg_hdr));
197  self->base_time_epoch = GST_CLOCK_TIME_NONE;
198  self->in_caps = NULL;
199 
201  self->debug = DEFAULT_DEBUG;
202  self->num_buffers = DEFAULT_NUM_BUFFERS;
203  self->max_msg_buf_size = DEFAULT_MAX_MSG_BUF_SIZE;
204  self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
205  self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
206  self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
207  self->mqtt_topic = g_strdup (DEFAULT_MQTT_PUB_TOPIC);
208  self->mqtt_pub_wait_timeout = DEFAULT_MQTT_PUB_WAIT_TIMEOUT;
209  self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
210  self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
211  self->mqtt_qos = DEFAULT_MQTT_QOS;
212  self->mqtt_ntp_sync = DEFAULT_MQTT_NTP_SYNC;
213  self->mqtt_ntp_srvs = g_strdup (DEFAULT_MQTT_NTP_SERVERS);
214  self->mqtt_ntp_hnames = NULL;
215  self->mqtt_ntp_ports = NULL;
216  self->mqtt_ntp_num_srvs = 0;
217  self->get_epoch_func = default_mqtt_get_unix_epoch;
218  self->is_connected = FALSE;
219 
221  gst_base_sink_set_qos_enabled (basesink, DEFAULT_QOS);
222  gst_base_sink_set_sync (basesink, DEFAULT_SYNC);
223 }
224 
228 static void
230 {
231  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
232  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
233  GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
234 
235  GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SINK, 0,
236  "MQTT sink");
237 
238  gobject_class->set_property = gst_mqtt_sink_set_property;
239  gobject_class->get_property = gst_mqtt_sink_get_property;
240  gobject_class->finalize = gst_mqtt_sink_class_finalize;
241 
242  g_object_class_install_property (gobject_class, PROP_DEBUG,
243  g_param_spec_boolean ("debug", "Debug",
244  "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
245  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
246 
247  g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
248  g_param_spec_string ("client-id", "Client ID",
249  "The client identifier passed to the server (broker).", NULL,
250  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 
252  g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
253  g_param_spec_string ("host", "Host", "Host (broker) to connect to",
255  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
256 
257  g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
258  g_param_spec_string ("port", "Port",
259  "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
260  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261 
262  g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SYNC,
263  g_param_spec_boolean ("ntp-sync", "NTP Synchronization",
264  "Synchronize received streams to the NTP clock",
265  DEFAULT_MQTT_NTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266 
267  g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SRVS,
268  g_param_spec_string ("ntp-srvs", "NTP Server Host Name and Port Pairs",
269  "NTP Servers' HOST_NAME:PORT pairs to use (valid only if ntp-sync is true)\n"
270  "\t\t\tUse ',' to separate each pair if there are more pairs than one",
272  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273 
274  g_object_class_install_property (gobject_class, PROP_MQTT_PUB_TOPIC,
275  g_param_spec_string ("pub-topic", "Topic to Publish",
276  "The topic's name to publish", NULL,
277  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278 
279  g_object_class_install_property (gobject_class,
281  g_param_spec_ulong ("pub-wait-timeout", "Timeout for Publish a message",
282  "Timeout for execution of the main thread with completed publication of a message",
283  1UL, G_MAXULONG, DEFAULT_MQTT_PUB_WAIT_TIMEOUT,
284  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285 
286  g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
287  g_param_spec_boolean ("cleansession", "Cleansession",
288  "When it is TRUE, the state information is discarded at connect and disconnect.",
290  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
291 
292  g_object_class_install_property (gobject_class,
294  g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
295  "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
297  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298 
299  g_object_class_install_property (gobject_class, PROP_MAX_MSG_BUF_SIZE,
300  g_param_spec_ulong ("max-buffer-size",
301  "The maximum size of a message buffer",
302  "The maximum size in bytes of a message buffer (0 = dynamic buffer size)",
303  0, G_MAXULONG, DEFAULT_MAX_MSG_BUF_SIZE,
304  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305 
306  g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
307  g_param_spec_int ("num-buffers", "Num Buffers",
308  "Number of (remaining) buffers to accept until sending EOS event (-1 = no limit)",
309  -1, G_MAXINT32, DEFAULT_NUM_BUFFERS,
310  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
311 
312  g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
313  g_param_spec_int ("mqtt-qos", "mqtt QoS level",
314  "The QoS level of MQTT.\n"
315  "\t\t\t 0: At most once\n"
316  "\t\t\t 1: At least once\n"
317  "\t\t\t 2: Exactly once\n"
318  "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
319  0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
320 
321  gstelement_class->change_state = gst_mqtt_sink_change_state;
322 
323  gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_sink_start);
324  gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_sink_stop);
325  gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_sink_query);
326  gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_mqtt_sink_render);
327  gstbasesink_class->render_list =
328  GST_DEBUG_FUNCPTR (gst_mqtt_sink_render_list);
329  gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_mqtt_sink_event);
330  gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_mqtt_sink_set_caps);
331 
332  gst_element_class_set_static_metadata (gstelement_class,
333  "MQTT sink", "Sink/MQTT",
334  "Publish incoming data streams as a MQTT topic",
335  "Wook Song <wook16.song@samsung.com>");
336  gst_element_class_add_static_pad_template (gstelement_class,
338 }
339 
343 static void
344 gst_mqtt_sink_set_property (GObject * object, guint prop_id,
345  const GValue * value, GParamSpec * pspec)
346 {
347  GstMqttSink *self = GST_MQTT_SINK (object);
348 
349  switch (prop_id) {
350  case PROP_DEBUG:
351  gst_mqtt_sink_set_debug (self, g_value_get_boolean (value));
352  break;
353  case PROP_MQTT_CLIENT_ID:
354  gst_mqtt_sink_set_client_id (self, g_value_get_string (value));
355  break;
357  gst_mqtt_sink_set_host_address (self, g_value_get_string (value));
358  break;
359  case PROP_MQTT_HOST_PORT:
360  gst_mqtt_sink_set_host_port (self, g_value_get_string (value));
361  break;
362  case PROP_MQTT_PUB_TOPIC:
363  gst_mqtt_sink_set_pub_topic (self, g_value_get_string (value));
364  break;
366  gst_mqtt_sink_set_pub_wait_timeout (self, g_value_get_ulong (value));
367  break;
369  gst_mqtt_sink_set_opt_cleansession (self, g_value_get_boolean (value));
370  break;
372  gst_mqtt_sink_set_opt_keep_alive_interval (self, g_value_get_int (value));
373  break;
375  gst_mqtt_sink_set_max_msg_buf_size (self, g_value_get_ulong (value));
376  break;
377  case PROP_NUM_BUFFERS:
378  gst_mqtt_sink_set_num_buffers (self, g_value_get_int (value));
379  break;
380  case PROP_MQTT_QOS:
381  gst_mqtt_sink_set_mqtt_qos (self, g_value_get_int (value));
382  break;
383  case PROP_MQTT_NTP_SYNC:
384  gst_mqtt_sink_set_mqtt_ntp_sync (self, g_value_get_boolean (value));
385  break;
386  case PROP_MQTT_NTP_SRVS:
387  gst_mqtt_sink_set_mqtt_ntp_srvs (self, g_value_get_string (value));
388  break;
389  default:
390  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
391  break;
392  }
393 }
394 
398 static void
399 gst_mqtt_sink_get_property (GObject * object, guint prop_id,
400  GValue * value, GParamSpec * pspec)
401 {
402  GstMqttSink *self = GST_MQTT_SINK (object);
403 
404  switch (prop_id) {
405  case PROP_DEBUG:
406  g_value_set_boolean (value, gst_mqtt_sink_get_debug (self));
407  break;
408  case PROP_MQTT_CLIENT_ID:
410  break;
413  break;
414  case PROP_MQTT_HOST_PORT:
416  break;
417  case PROP_MQTT_PUB_TOPIC:
419  break;
421  g_value_set_ulong (value, gst_mqtt_sink_get_pub_wait_timeout (self));
422  break;
424  g_value_set_boolean (value, gst_mqtt_sink_get_opt_cleansession (self));
425  break;
427  g_value_set_int (value, gst_mqtt_sink_get_opt_keep_alive_interval (self));
428  break;
430  g_value_set_ulong (value, gst_mqtt_sink_get_max_msg_buf_size (self));
431  break;
432  case PROP_NUM_BUFFERS:
433  g_value_set_int (value, gst_mqtt_sink_get_num_buffers (self));
434  break;
435  case PROP_MQTT_QOS:
436  g_value_set_int (value, gst_mqtt_sink_get_mqtt_qos (self));
437  break;
438  case PROP_MQTT_NTP_SYNC:
439  g_value_set_boolean (value, gst_mqtt_sink_get_mqtt_ntp_sync (self));
440  break;
441  case PROP_MQTT_NTP_SRVS:
443  break;
444  default:
445  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
446  break;
447  }
448 }
449 
453 static void
455 {
456  GstMqttSink *self = GST_MQTT_SINK (object);
457 
458  g_free (self->mqtt_host_address);
459  self->mqtt_host_address = NULL;
460  g_free (self->mqtt_host_port);
461  self->mqtt_host_port = NULL;
462  if (self->mqtt_client_handle) {
463  MQTTAsync_destroy (&self->mqtt_client_handle);
464  self->mqtt_client_handle = NULL;
465  }
466  g_free (self->mqtt_client_id);
467  self->mqtt_client_id = NULL;
468  g_free (self->mqtt_msg_buf);
469  self->mqtt_msg_buf = NULL;
470  g_free (self->mqtt_topic);
471  self->mqtt_topic = NULL;
472  gst_caps_replace (&self->in_caps, NULL);
473  g_free (self->mqtt_ntp_srvs);
474  self->mqtt_ntp_srvs = NULL;
475  self->mqtt_ntp_num_srvs = 0;
476  g_strfreev (self->mqtt_ntp_hnames);
477  self->mqtt_ntp_hnames = NULL;
478  g_free (self->mqtt_ntp_ports);
479  self->mqtt_ntp_ports = NULL;
480 
481  if (self->err)
482  g_error_free (self->err);
483  g_mutex_clear (&self->mqtt_sink_mutex);
484  G_OBJECT_CLASS (parent_class)->finalize (object);
485 }
486 
490 static GstStateChangeReturn
491 gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition)
492 {
493  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
494  GstMqttSink *self = GST_MQTT_SINK (element);
495  GstClock *elem_clock;
496  GstClockTime base_time;
497  GstClockTime cur_time;
498  GstClockTimeDiff diff;
499 
500  switch (transition) {
501  case GST_STATE_CHANGE_NULL_TO_READY:
502  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
503  if (self->err) {
504  g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
505  self->err->message);
506  return GST_STATE_CHANGE_FAILURE;
507  }
508  break;
509  case GST_STATE_CHANGE_READY_TO_PAUSED:
510  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
511  break;
512  case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
513  if (self->mqtt_ntp_sync)
514  self->get_epoch_func = ntputil_get_epoch;
515  self->base_time_epoch = GST_CLOCK_TIME_NONE;
516  elem_clock = gst_element_get_clock (element);
517  if (!elem_clock)
518  break;
519  base_time = gst_element_get_base_time (element);
520  cur_time = gst_clock_get_time (elem_clock);
521  gst_object_unref (elem_clock);
522  diff = GST_CLOCK_DIFF (base_time, cur_time);
523  self->base_time_epoch =
524  self->get_epoch_func (self->mqtt_ntp_num_srvs, self->mqtt_ntp_hnames,
525  self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER - diff;
526  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
527  break;
528  default:
529  break;
530  }
531 
532  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
533 
534  switch (transition) {
535  case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
536  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
537  break;
538  case GST_STATE_CHANGE_PAUSED_TO_READY:
539  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
540  break;
541  case GST_STATE_CHANGE_READY_TO_NULL:
542  GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
543  default:
544  break;
545  }
546 
547  return ret;
548 }
549 
553 static gboolean
554 gst_mqtt_sink_start (GstBaseSink * basesink)
555 {
556  GstMqttSink *self = GST_MQTT_SINK (basesink);
557  gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
558  self->mqtt_host_port);
559  int ret;
560  gint64 end_time;
561 
562  if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
563  g_free (self->mqtt_client_id);
564  self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
565  g_get_host_name (), getpid (), sink_client_id++);
566  }
567 
568  if (!g_strcmp0 (DEFAULT_MQTT_PUB_TOPIC, self->mqtt_topic)) {
569  self->mqtt_topic = g_strdup_printf (DEFAULT_MQTT_PUB_TOPIC_FORMAT,
570  self->mqtt_client_id);
571  }
572 
581  ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
582  self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
583  g_free (haddr);
584  if (ret != MQTTASYNC_SUCCESS)
585  return FALSE;
586 
587  MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
590 
591  ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
592  if (ret != MQTTASYNC_SUCCESS) {
593  goto error;
594  }
595 
596  /* Waiting for the connection */
597  end_time = g_get_monotonic_time () +
598  DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
599  g_mutex_lock (&self->mqtt_sink_mutex);
600  while (!self->is_connected) {
601  if (!g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
602  end_time)) {
603  g_mutex_unlock (&self->mqtt_sink_mutex);
604  g_critical ("Failed to connect to MQTT broker from mqttsink."
605  "Please check broker is running status or broker host address.");
606  goto error;
607  }
608  }
609  g_mutex_unlock (&self->mqtt_sink_mutex);
610 
611  return TRUE;
612 
613 error:
614  MQTTAsync_destroy (&self->mqtt_client_handle);
615  self->mqtt_client_handle = NULL;
616  return FALSE;
617 }
618 
622 static gboolean
623 gst_mqtt_sink_stop (GstBaseSink * basesink)
624 {
625  GstMqttSink *self = GST_MQTT_SINK (basesink);
626  MQTTAsync_disconnectOptions disconn_opts =
627  MQTTAsync_disconnectOptions_initializer;
628 
629  disconn_opts.timeout = DEFAULT_MQTT_DISCONNECT_TIMEOUT;
630  disconn_opts.onSuccess = cb_mqtt_on_disconnect;
631  disconn_opts.onFailure = cb_mqtt_on_disconnect_failure;
632  disconn_opts.context = self;
633 
634  g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_STOPPED);
635  while (MQTTAsync_isConnected (self->mqtt_client_handle)) {
636  gint64 end_time = g_get_monotonic_time () + DEFAULT_MQTT_DISCONNECT_TIMEOUT;
637  mqtt_sink_state_t cur_state;
638 
639  MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts);
640  g_mutex_lock (&self->mqtt_sink_mutex);
641  self->is_connected = FALSE;
642  g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
643  end_time);
644  g_mutex_unlock (&self->mqtt_sink_mutex);
645  cur_state = g_atomic_int_get (&self->mqtt_sink_state);
646 
647  if ((cur_state == MQTT_DISCONNECTED) ||
648  (cur_state == MQTT_DISCONNECT_FAILED) ||
649  (cur_state == SINK_RENDER_EOS) || (cur_state == SINK_RENDER_ERROR))
650  break;
651  }
652  MQTTAsync_destroy (&self->mqtt_client_handle);
653  self->mqtt_client_handle = NULL;
654  return TRUE;
655 }
656 
660 static gboolean
661 gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query)
662 {
663  gboolean ret = FALSE;
664 
665  switch (GST_QUERY_TYPE (query)) {
666  case GST_QUERY_SEEKING:{
667  GstFormat fmt;
668 
669  /* GST_QUERY_SEEKING is not supported */
670  gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
671  gst_query_set_seeking (query, fmt, FALSE, 0, -1);
672  ret = TRUE;
673  break;
674  }
675  default:{
676  ret = GST_BASE_SINK_CLASS (parent_class)->query (basesink, query);
677  break;
678  }
679  }
680 
681  return ret;
682 }
683 
687 static void
688 _put_timestamp_to_msg_buf_hdr (GstMqttSink * self, GstBuffer * gst_buf,
689  GstMQTTMessageHdr * hdr)
690 {
691  hdr->base_time_epoch = self->base_time_epoch;
692  hdr->sent_time_epoch = self->get_epoch_func (self->mqtt_ntp_num_srvs,
693  self->mqtt_ntp_hnames, self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER;
694 
695  hdr->duration = GST_BUFFER_DURATION_IS_VALID (gst_buf) ?
696  GST_BUFFER_DURATION (gst_buf) : GST_CLOCK_TIME_NONE;
697 
698  hdr->dts = GST_BUFFER_DTS_IS_VALID (gst_buf) ?
699  GST_BUFFER_DTS (gst_buf) : GST_CLOCK_TIME_NONE;
700 
701  hdr->pts = GST_BUFFER_PTS_IS_VALID (gst_buf) ?
702  GST_BUFFER_PTS (gst_buf) : GST_CLOCK_TIME_NONE;
703 
704  if (self->debug) {
705  GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
706  GstClock *clock;
707 
708  clock = gst_element_get_clock (GST_ELEMENT (self));
709 
710  GST_DEBUG_OBJECT (self,
711  "%s now %" GST_TIME_FORMAT " ts %" GST_TIME_FORMAT " sent %"
712  GST_TIME_FORMAT, self->mqtt_topic,
713  GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
714  GST_TIME_ARGS (hdr->pts),
715  GST_TIME_ARGS (hdr->sent_time_epoch - hdr->base_time_epoch));
716 
717  gst_object_unref (clock);
718  }
719 }
720 
724 static gboolean
725 _mqtt_set_msg_buf_hdr (GstBuffer * gst_buf, GstMQTTMessageHdr * hdr)
726 {
727  gboolean ret = TRUE;
728  guint i;
729 
730  hdr->num_mems = gst_buffer_n_memory (gst_buf);
731  for (i = 0; i < hdr->num_mems; ++i) {
732  GstMemory *each_mem;
733 
734  each_mem = gst_buffer_peek_memory (gst_buf, i);
735  if (!each_mem) {
736  memset (hdr, 0x0, sizeof (*hdr));
737  ret = FALSE;
738  break;
739  }
740 
741  hdr->size_mems[i] = each_mem->size;
742  }
743 
744  return ret;
745 }
746 
/**
 * @brief GstBaseSink render handler: publishes one buffer as an MQTT message.
 *
 * Waits until the sink state is MQTT_CONNECTED, honours the num-buffers
 * limit, (re)allocates the message buffer if needed, writes the message
 * header followed by the buffer payload into it, and hands it to
 * MQTTAsync_send ().
 *
 * NOTE(review): this listing is missing lines (776, 778 and 808 in the
 * embedded numbering); the switch below may have additional case labels.
 *
 * @param basesink The sink rendering the buffer
 * @param in_buf The buffer to publish
 * @return GST_FLOW_OK on success, GST_FLOW_EOS when the sink reached EOS or
 *         the buffer limit, GST_FLOW_ERROR otherwise
 */
750 static GstFlowReturn
751 gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
752 {
753  const gsize in_buf_size = gst_buffer_get_size (in_buf);
  /* NOTE(review): function-local static, hence shared by every mqttsink
   * instance in the process -- looks like it should be per-instance state. */
754  static gboolean is_static_sized_buf = FALSE;
755  GstMqttSink *self = GST_MQTT_SINK (basesink);
756  GstFlowReturn ret = GST_FLOW_ERROR;
757  mqtt_sink_state_t cur_state;
758  GstMemory *in_buf_mem;
759  GstMapInfo in_buf_map;
760  gint mqtt_rc;
761  guint8 *msg_pub;
762 
  /* Block until connected; each wait lasts at most mqtt_pub_wait_timeout
   * seconds, then the state is inspected again. */
763  while ((cur_state =
764  g_atomic_int_get (&self->mqtt_sink_state)) != MQTT_CONNECTED) {
765  gint64 end_time = g_get_monotonic_time ();
766  mqtt_sink_state_t _state;
767 
768  end_time += (self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND);
769  g_mutex_lock (&self->mqtt_sink_mutex);
770  g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
771  end_time);
772  g_mutex_unlock (&self->mqtt_sink_mutex);
773 
774  _state = g_atomic_int_get (&self->mqtt_sink_state);
775  switch (_state) {
777  case MQTT_DISCONNECTED:
779  case SINK_RENDER_ERROR:
780  ret = GST_FLOW_ERROR;
781  break;
782  case SINK_RENDER_EOS:
783  ret = GST_FLOW_EOS;
784  break;
785  default:
  /* Any other state: keep waiting */
786  continue;
787  }
788  goto ret_with;
789  }
790 
  /* num_buffers counts down to 0; -1 means unlimited */
791  if (self->num_buffers == 0) {
792  ret = GST_FLOW_EOS;
793  goto ret_with;
794  }
795 
796  if (self->num_buffers != -1) {
797  self->num_buffers -= 1;
798  }
799 
  /* Dynamically sized buffer that is too small for this input: drop it so
   * that it is reallocated below. */
800  if ((!is_static_sized_buf) && (self->mqtt_msg_buf) &&
801  (self->mqtt_msg_buf_size != 0) &&
802  (self->mqtt_msg_buf_size < in_buf_size + GST_MQTT_LEN_MSG_HDR)) {
803  g_free (self->mqtt_msg_buf);
804  self->mqtt_msg_buf = NULL;
805  self->mqtt_msg_buf_size = 0;
806  }
807 
  /* Allocate the message buffer: header plus either the incoming size
   * (max_msg_buf_size == 0) or the configured maximum. */
809  if ((!self->mqtt_msg_buf) && (self->mqtt_msg_buf_size == 0)) {
810  if (self->max_msg_buf_size == 0) {
811  self->mqtt_msg_buf_size = in_buf_size + GST_MQTT_LEN_MSG_HDR;
812  } else {
813  if (self->max_msg_buf_size < in_buf_size) {
814  g_printerr ("%s: The given size for a message buffer is too small: "
815  "given (%" G_GSIZE_FORMAT " bytes) vs. incoming (%" G_GSIZE_FORMAT
816  " bytes)\n", TAG_ERR_MQTTSINK, self->max_msg_buf_size, in_buf_size);
817  ret = GST_FLOW_ERROR;
818  goto ret_with;
819  }
820  self->mqtt_msg_buf_size = self->max_msg_buf_size + GST_MQTT_LEN_MSG_HDR;
821  is_static_sized_buf = TRUE;
822  }
823 
  /* Allocation failure is detected further down via the NULL check on
   * msg_pub. */
824  self->mqtt_msg_buf = g_try_malloc0 (self->mqtt_msg_buf_size);
825  }
826 
827  if (!_mqtt_set_msg_buf_hdr (in_buf, &self->mqtt_msg_hdr)) {
828  ret = GST_FLOW_ERROR;
829  goto ret_with;
830  }
831 
832  msg_pub = self->mqtt_msg_buf;
833  if (!msg_pub) {
834  self->mqtt_msg_buf_size = 0;
835  ret = GST_FLOW_ERROR;
836  goto ret_with;
837  }
  /* Header first; the timing fields are then written in place */
838  memcpy (msg_pub, &self->mqtt_msg_hdr, sizeof (self->mqtt_msg_hdr));
839  _put_timestamp_to_msg_buf_hdr (self, in_buf, (GstMQTTMessageHdr *) msg_pub);
840 
841  in_buf_mem = gst_buffer_get_all_memory (in_buf);
842  if (!in_buf_mem) {
843  ret = GST_FLOW_ERROR;
844  goto ret_with;
845  }
846 
847  if (!gst_memory_map (in_buf_mem, &in_buf_map, GST_MAP_READ)) {
848  ret = GST_FLOW_ERROR;
849  goto ret_unref_in_buf_mem;
850  }
851 
852  ret = GST_FLOW_OK;
853 
  /* NOTE(review): once is_static_sized_buf is TRUE, neither branch above
   * re-checks the size of later buffers, so a buffer larger than
   * max_msg_buf_size appears to overflow mqtt_msg_buf here -- confirm. */
854  memcpy (&msg_pub[sizeof (self->mqtt_msg_hdr)], in_buf_map.data,
855  in_buf_map.size);
856  mqtt_rc = MQTTAsync_send (self->mqtt_client_handle, self->mqtt_topic,
857  GST_MQTT_LEN_MSG_HDR + in_buf_map.size, self->mqtt_msg_buf,
858  self->mqtt_qos, 1, &self->mqtt_respn_opts);
859  if (mqtt_rc != MQTTASYNC_SUCCESS) {
860  ret = GST_FLOW_ERROR;
861  }
862 
863  gst_memory_unmap (in_buf_mem, &in_buf_map);
864 
865 ret_unref_in_buf_mem:
866  gst_memory_unref (in_buf_mem);
867 
868 ret_with:
869  return ret;
870 }
871 
876 static GstFlowReturn
877 gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list)
878 {
879  guint num_buffers = gst_buffer_list_length (list);
880  GstFlowReturn ret;
881  GstBuffer *buffer;
882  guint i;
883 
884  for (i = 0; i < num_buffers; ++i) {
885  buffer = gst_buffer_list_get (list, i);
886  ret = gst_mqtt_sink_render (basesink, buffer);
887  if (ret != GST_FLOW_OK)
888  break;
889  }
890 
891  return ret;
892 }
893 
897 static gboolean
898 gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event)
899 {
900  GstMqttSink *self = GST_MQTT_SINK (basesink);
901  GstEventType type = GST_EVENT_TYPE (event);
902  gboolean ret = FALSE;
903 
904  switch (type) {
905  case GST_EVENT_EOS:
906  g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
907  g_mutex_lock (&self->mqtt_sink_mutex);
908  g_cond_broadcast (&self->mqtt_sink_gcond);
909  g_mutex_unlock (&self->mqtt_sink_mutex);
910  break;
911  default:
912  break;
913  }
914 
915  ret = GST_BASE_SINK_CLASS (parent_class)->event (basesink, event);
916 
917  return ret;
918 }
919 
923 static gboolean
924 gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps)
925 {
926  GstMqttSink *self = GST_MQTT_SINK (basesink);
927  gboolean ret;
928 
929  ret = gst_caps_replace (&self->in_caps, caps);
930 
931  if (ret && gst_caps_is_fixed (self->in_caps)) {
932  gchar *caps_str = gst_caps_to_string (caps);
933  gsize len;
934 
935  if (caps_str == NULL) {
936  g_critical ("Fail to convert caps to string representation");
937  return FALSE;
938  }
939 
940  len = g_strlcpy (self->mqtt_msg_hdr.gst_caps_str, caps_str,
942 
943  if (len >= GST_MQTT_MAX_LEN_GST_CAPS_STR) {
944  g_critical ("Fail to copy caps_str.");
945  ret = FALSE;
946  }
947 
948  g_free (caps_str);
949  }
950 
951  return ret;
952 }
953 
957 static gboolean
959 {
960  return self->debug;
961 }
962 
966 static void
967 gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag)
968 {
969  self->debug = flag;
970 }
971 
975 static gchar *
977 {
978  return self->mqtt_client_id;
979 }
980 
984 static void
985 gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id)
986 {
987  g_free (self->mqtt_client_id);
988  self->mqtt_client_id = g_strdup (id);
989 }
990 
994 static gchar *
996 {
997  return self->mqtt_host_address;
998 }
999 
1003 static void
1004 gst_mqtt_sink_set_host_address (GstMqttSink * self, const gchar * addr)
1005 {
1009  g_free (self->mqtt_host_address);
1010  self->mqtt_host_address = g_strdup (addr);
1011 }
1012 
1016 static gchar *
1018 {
1019  return self->mqtt_host_port;
1020 }
1021 
1025 static void
1026 gst_mqtt_sink_set_host_port (GstMqttSink * self, const gchar * port)
1027 {
1028  g_free (self->mqtt_host_port);
1029  self->mqtt_host_port = g_strdup (port);
1030 }
1031 
1035 static gchar *
1037 {
1038  return self->mqtt_topic;
1039 }
1040 
1044 static void
1045 gst_mqtt_sink_set_pub_topic (GstMqttSink * self, const gchar * topic)
1046 {
1047  g_free (self->mqtt_topic);
1048  self->mqtt_topic = g_strdup (topic);
1049 }
1050 
1054 static gboolean
1056 {
1057  return self->mqtt_conn_opts.cleansession;
1058 }
1059 
1063 static void
1065 {
1066  self->mqtt_conn_opts.cleansession = val;
1067 }
1068 
1072 static gulong
1074 {
1075  return self->mqtt_pub_wait_timeout;
1076 }
1077 
1081 static void
1083 {
1084  self->mqtt_pub_wait_timeout = to;
1085 }
1086 
1090 static gint
1092 {
1093  return self->mqtt_conn_opts.keepAliveInterval;
1094 }
1095 
1099 static void
1101 {
1102  self->mqtt_conn_opts.keepAliveInterval = num;
1103 }
1104 
1108 static gsize
1110 {
1111  return self->max_msg_buf_size;
1112 }
1113 
1117 static void
1119 {
1120  self->max_msg_buf_size = size;
1121 }
1122 
1126 static gint
1128 {
1129  gint num_buffers;
1130 
1131  num_buffers = self->num_buffers;
1132 
1133  return num_buffers;
1134 }
1135 
1139 static void
1141 {
1142  self->num_buffers = num;
1143 }
1144 
1148 static gint
1150 {
1151  return self->mqtt_qos;
1152 }
1153 
1157 static void
1158 gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos)
1159 {
1160  self->mqtt_qos = qos;
1161 }
1162 
1166 static gboolean
1168 {
1169  return self->mqtt_ntp_sync;
1170 }
1171 
1175 static void
1176 gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self, const gboolean flag)
1177 {
1178  self->mqtt_ntp_sync = flag;
1179 }
1180 
1184 static gchar *
1186 {
1187  return self->mqtt_ntp_srvs;
1188 }
1189 
1193 static void
1194 gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self, const gchar * pairs)
1195 {
1196  gchar **pair_arrs = NULL;
1197  guint hnum = 0;
1198  gchar *pair;
1199  guint i, j;
1200 
1201  if (g_strcmp0 (self->mqtt_ntp_srvs, pairs) == 0)
1202  return;
1203 
1204  g_free (self->mqtt_ntp_srvs);
1205  self->mqtt_ntp_srvs = g_strdup (pairs);
1206 
1207  pair_arrs = g_strsplit (pairs, ",", -1);
1208  if (pair_arrs == NULL)
1209  return;
1210 
1211  hnum = g_strv_length (pair_arrs);
1212  if (hnum == 0)
1213  goto err_free_pair_arrs;
1214 
1215  g_free (self->mqtt_ntp_hnames);
1216  self->mqtt_ntp_hnames = g_try_malloc0 ((hnum + 1) * sizeof (gchar *));
1217  if (!self->mqtt_ntp_hnames)
1218  goto err_free_pair_arrs;
1219 
1220  g_free (self->mqtt_ntp_ports);
1221  self->mqtt_ntp_ports = g_try_malloc0 (hnum * sizeof (guint16));
1222  if (!self->mqtt_ntp_ports)
1223  goto err_free_mqtt_ntp_hnames;
1224 
1225  self->mqtt_ntp_num_srvs = hnum;
1226  for (i = 0, j = 0; i < hnum; i++) {
1227  gchar **hname_port;
1228  gchar *hname;
1229  gchar *eport;
1230  gulong port_ul;
1231 
1232  pair = pair_arrs[i];
1233  hname_port = g_strsplit (pair, ":", 2);
1234  hname = hname_port[0];
1235  port_ul = strtoul (hname_port[1], &eport, 10);
1236  if ((port_ul == 0) || (port_ul > UINT16_MAX)) {
1237  self->mqtt_ntp_num_srvs--;
1238  } else {
1239  self->mqtt_ntp_hnames[j] = g_strdup (hname);
1240  self->mqtt_ntp_ports[j] = (uint16_t) port_ul;
1241  ++j;
1242  }
1243 
1244  g_strfreev (hname_port);
1245  }
1246 
1247  g_strfreev (pair_arrs);
1248  return;
1249 
1250 err_free_mqtt_ntp_hnames:
1251  g_strfreev (self->mqtt_ntp_hnames);
1252  self->mqtt_ntp_hnames = NULL;
1253 
1254 err_free_pair_arrs:
1255  g_strfreev (pair_arrs);
1256 
1257  return;
1258 }
1259 
1266 static void
1267 cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
1268 {
1269  GstMqttSink *self = (GstMqttSink *) context;
1270  UNUSED (response);
1271 
1272  g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTED);
1273  g_mutex_lock (&self->mqtt_sink_mutex);
1274  self->is_connected = TRUE;
1275  g_cond_broadcast (&self->mqtt_sink_gcond);
1276  g_mutex_unlock (&self->mqtt_sink_mutex);
1277 }
1278 
1284 static void
1285 cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
1286 {
1287  GstMqttSink *self = (GstMqttSink *) context;
1288  UNUSED (response);
1289 
1290  g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECT_FAILURE);
1291  g_mutex_lock (&self->mqtt_sink_mutex);
1292  self->is_connected = FALSE;
1293  g_cond_broadcast (&self->mqtt_sink_gcond);
1294  g_mutex_unlock (&self->mqtt_sink_mutex);
1295 }
1296 
1302 static void
1303 cb_mqtt_on_disconnect (void *context, MQTTAsync_successData * response)
1304 {
1305  GstMqttSink *self = (GstMqttSink *) context;
1306  UNUSED (response);
1307 
1308  g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECTED);
1309  g_mutex_lock (&self->mqtt_sink_mutex);
1310  g_cond_broadcast (&self->mqtt_sink_gcond);
1311  g_mutex_unlock (&self->mqtt_sink_mutex);
1312 }
1313 
1319 static void
1320 cb_mqtt_on_disconnect_failure (void *context, MQTTAsync_failureData * response)
1321 {
1322  GstMqttSink *self = (GstMqttSink *) context;
1323  UNUSED (response);
1324 
1325  g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECT_FAILED);
1326  g_mutex_lock (&self->mqtt_sink_mutex);
1327  g_cond_broadcast (&self->mqtt_sink_gcond);
1328  g_mutex_unlock (&self->mqtt_sink_mutex);
1329 }
1330 
1335 static void
1336 cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token)
1337 {
1338  GstMqttSink *self = (GstMqttSink *) context;
1339 
1340  GST_DEBUG_OBJECT (self,
1341  "%s: the message with token(%d) has been delivered.", self->mqtt_topic,
1342  token);
1343 }
1344 
1350 static void
1351 cb_mqtt_on_connection_lost (void *context, char *cause)
1352 {
1353  GstMqttSink *self = (GstMqttSink *) context;
1354  UNUSED (cause);
1355 
1356  g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTION_LOST);
1357  g_mutex_lock (&self->mqtt_sink_mutex);
1358  self->is_connected = FALSE;
1359  g_cond_broadcast (&self->mqtt_sink_gcond);
1360  g_mutex_unlock (&self->mqtt_sink_mutex);
1361 }
1362 
1367 static int
1368 cb_mqtt_on_message_arrived (void *context, char *topicName, int topicLen,
1369  MQTTAsync_message * message)
1370 {
1371  UNUSED (context);
1372  UNUSED (topicName);
1373  UNUSED (topicLen);
1374  UNUSED (message);
1375  /* nothing to do */
1376  return 1;
1377 }
1378 
1383 static void
1384 cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response)
1385 {
1386  GstMqttSink *self = (GstMqttSink *) context;
1387  mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
1388  UNUSED (response);
1389 
1390  if (state == SINK_RENDER_STOPPED) {
1391  g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
1392 
1393  g_mutex_lock (&self->mqtt_sink_mutex);
1394  g_cond_broadcast (&self->mqtt_sink_gcond);
1395  g_mutex_unlock (&self->mqtt_sink_mutex);
1396  }
1397 }
1398 
1403 static void
1404 cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response)
1405 {
1406  GstMqttSink *self = (GstMqttSink *) context;
1407  mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
1408  UNUSED (response);
1409 
1410  if (state == SINK_RENDER_STOPPED) {
1411  g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_ERROR);
1412 
1413  g_mutex_lock (&self->mqtt_sink_mutex);
1414  g_cond_broadcast (&self->mqtt_sink_gcond);
1415  g_mutex_unlock (&self->mqtt_sink_mutex);
1416  }
1417 
1418 }
DEFAULT_MQTT_NTP_SERVERS
static const gchar DEFAULT_MQTT_NTP_SERVERS[]
Definition: mqttsink.c:89
DEFAULT_MAX_MSG_BUF_SIZE
@ DEFAULT_MAX_MSG_BUF_SIZE
Definition: mqttsink.c:75
PROP_MQTT_NTP_SYNC
@ PROP_MQTT_NTP_SYNC
Definition: mqttsink.c:59
DEFAULT_MQTT_CONN_TIMEOUT_SEC
#define DEFAULT_MQTT_CONN_TIMEOUT_SEC
Definition: mqttcommon.h:40
gst_mqtt_sink_set_caps
static gboolean gst_mqtt_sink_set_caps(GstBaseSink *basesink, GstCaps *caps)
An implementation of the set_caps vmethod in GstBaseSinkClass.
Definition: mqttsink.c:924
MQTT_CONNECT_FAILURE
@ MQTT_CONNECT_FAILURE
Definition: mqttsink.h:45
GST_MQTT_ELEM_NAME_SINK
#define GST_MQTT_ELEM_NAME_SINK
Definition: mqttcommon.h:26
TAG_ERR_MQTTSINK
static const gchar TAG_ERR_MQTTSINK[]
Definition: mqttsink.c:84
PROP_0
@ PROP_0
Definition: mqttsink.c:46
gst_mqtt_sink_get_client_id
static gchar * gst_mqtt_sink_get_client_id(GstMqttSink *self)
Getter for the 'client-id' property.
Definition: mqttsink.c:976
mqttsink.h
Publish incoming data streams as a MQTT topic.
SINK_RENDER_STOPPED
@ SINK_RENDER_STOPPED
Definition: mqttsink.h:47
DEFAULT_SYNC
@ DEFAULT_SYNC
Definition: mqttsink.c:70
FALSE
return FALSE
Definition: gsttensor_transform.c:596
gst_mqtt_sink_class_finalize
static void gst_mqtt_sink_class_finalize(GObject *object)
Finalize GstMqttSinkClass object.
Definition: mqttsink.c:454
MQTT_DISCONNECTED
@ MQTT_DISCONNECTED
Definition: mqttsink.h:51
gst_mqtt_sink_get_mqtt_qos
static gint gst_mqtt_sink_get_mqtt_qos(GstMqttSink *self)
Getter for the 'mqtt-qos' property.
Definition: mqttsink.c:1149
PROP_NUM_BUFFERS
@ PROP_NUM_BUFFERS
Definition: mqttsink.c:56
PROP_MQTT_CLIENT_ID
@ PROP_MQTT_CLIENT_ID
Definition: mqttsink.c:49
_GstMQTTMessageHdr::sent_time_epoch
gint64 sent_time_epoch
Definition: mqttcommon.h:55
PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL
@ PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL
Definition: mqttsink.c:55
_GstMqttSinkClass
GstMqttSinkClass data structure.
Definition: mqttsink.h:101
GST_MQTT_LEN_MSG_HDR
#define GST_MQTT_LEN_MSG_HDR
Definition: mqttcommon.h:29
_GstMQTTMessageHdr::size_mems
gsize size_mems[GST_MQTT_MAX_NUM_MEMS]
Definition: mqttcommon.h:53
PROP_MQTT_HOST_PORT
@ PROP_MQTT_HOST_PORT
Definition: mqttsink.c:51
gst_mqtt_sink_set_opt_cleansession
static void gst_mqtt_sink_set_opt_cleansession(GstMqttSink *self, const gboolean val)
Setter for the 'cleansession' property.
Definition: mqttsink.c:1064
G_DEFINE_TYPE
G_DEFINE_TYPE(GstMqttSink, gst_mqtt_sink, GST_TYPE_BASE_SINK)
gst_mqtt_sink_stop
static gboolean gst_mqtt_sink_stop(GstBaseSink *basesink)
Stop mqttsink, called when state changed ready to null.
Definition: mqttsink.c:623
cb_mqtt_on_disconnect
static void cb_mqtt_on_disconnect(void *context, MQTTAsync_successData *response)
A callback function corresponding to MQTTAsync_disconnectOptions's onSuccess. Regardless of the MQTTA...
Definition: mqttsink.c:1303
SINK_RENDER_EOS
@ SINK_RENDER_EOS
Definition: mqttsink.h:48
gst_mqtt_sink_render_list
static GstFlowReturn gst_mqtt_sink_render_list(GstBaseSink *basesink, GstBufferList *list)
The callback to process GstBufferList (instead of a single buffer) on the sink pad.
Definition: mqttsink.c:877
cb_mqtt_on_message_arrived
static int cb_mqtt_on_message_arrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
A callback function to be given to the MQTTAsync_setCallbacks function. In the case of the publisher,...
Definition: mqttsink.c:1368
PROP_DEBUG
@ PROP_DEBUG
Definition: mqttsink.c:48
sink_client_id
static guint8 sink_client_id
Definition: mqttsink.c:81
cb_mqtt_on_send_failure
static void cb_mqtt_on_send_failure(void *context, MQTTAsync_failureData *response)
A callback function corresponding to MQTTAsync_responseOptions's onFailure.
Definition: mqttsink.c:1404
gst_mqtt_sink_get_opt_keep_alive_interval
static gint gst_mqtt_sink_get_opt_keep_alive_interval(GstMqttSink *self)
Getter for the 'keep-alive-interval' property.
Definition: mqttsink.c:1091
PROP_MQTT_PUB_WAIT_TIMEOUT
@ PROP_MQTT_PUB_WAIT_TIMEOUT
Definition: mqttsink.c:53
DEFAULT_MQTT_NTP_SYNC
@ DEFAULT_MQTT_NTP_SYNC
Definition: mqttsink.c:77
MQTT_CONNECTED
@ MQTT_CONNECTED
Definition: mqttsink.h:50
cb_mqtt_on_connection_lost
static void cb_mqtt_on_connection_lost(void *context, char *cause)
A callback function to be given to the MQTTAsync_setCallbacks function. When the connection between t...
Definition: mqttsink.c:1351
gst_mqtt_sink_set_mqtt_ntp_sync
static void gst_mqtt_sink_set_mqtt_ntp_sync(GstMqttSink *self, const gboolean flag)
Setter for the 'ntp-sync' property.
Definition: mqttsink.c:1176
PROP_MQTT_NTP_SRVS
@ PROP_MQTT_NTP_SRVS
Definition: mqttsink.c:60
DEFAULT_QOS
@ DEFAULT_QOS
Definition: mqttsink.c:69
g_free
g_free(self->option[(opnum) - 1])
opnum: \
gst_mqtt_sink_get_debug
static gboolean gst_mqtt_sink_get_debug(GstMqttSink *self)
Getter for the 'debug' property.
Definition: mqttsink.c:958
PROP_MQTT_QOS
@ PROP_MQTT_QOS
Definition: mqttsink.c:58
MQTT_CONNECTION_LOST
@ MQTT_CONNECTION_LOST
Definition: mqttsink.h:44
gst_mqtt_sink_get_mqtt_ntp_sync
static gboolean gst_mqtt_sink_get_mqtt_ntp_sync(GstMqttSink *self)
Getter for the 'ntp-sync' property.
Definition: mqttsink.c:1167
PROP_MQTT_HOST_ADDRESS
@ PROP_MQTT_HOST_ADDRESS
Definition: mqttsink.c:50
g_value_set_string
g_value_set_string(value, self->option[opnum - 1])
opnum: \
sink_pad_template
static GstStaticPadTemplate sink_pad_template
Definition: mqttsink.c:35
DEFAULT_MQTT_OPT_CLEANSESSION
@ DEFAULT_MQTT_OPT_CLEANSESSION
Definition: mqttsink.c:71
MAX_LEN_PROP_NTP_SRVS
@ MAX_LEN_PROP_NTP_SRVS
Definition: mqttsink.c:78
gst_mqtt_sink_get_mqtt_ntp_srvs
static gchar * gst_mqtt_sink_get_mqtt_ntp_srvs(GstMqttSink *self)
Getter for the 'ntp-srvs' property.
Definition: mqttsink.c:1185
GST_DEBUG_CATEGORY_STATIC
GST_DEBUG_CATEGORY_STATIC(gst_mqtt_sink_debug)
default_mqtt_get_unix_epoch
static int64_t default_mqtt_get_unix_epoch(uint32_t hnum, char **hnames, uint16_t *hports)
A wrapper function of g_get_real_time () to assign it to the function pointer, mqtt_get_unix_epoch.
Definition: mqttcommon.h:71
cb_mqtt_on_delivery_complete
static void cb_mqtt_on_delivery_complete(void *context, MQTTAsync_token token)
A callback function to be given to the MQTTAsync_setCallbacks function. This callback is activated wh...
Definition: mqttsink.c:1336
gst_mqtt_sink_render
static GstFlowReturn gst_mqtt_sink_render(GstBaseSink *basesink, GstBuffer *buffer)
The callback to process each buffer receiving on the sink pad.
Definition: mqttsink.c:751
DEFAULT_MQTT_CLIENT_ID
static const gchar DEFAULT_MQTT_CLIENT_ID[]
Definition: mqttsink.c:85
gst_mqtt_sink_set_mqtt_qos
static void gst_mqtt_sink_set_mqtt_qos(GstMqttSink *self, const gint qos)
Setter for the 'mqtt-qos' property.
Definition: mqttsink.c:1158
PROP_MAX_MSG_BUF_SIZE
@ PROP_MAX_MSG_BUF_SIZE
Definition: mqttsink.c:57
gst_mqtt_sink_get_host_address
static gchar * gst_mqtt_sink_get_host_address(GstMqttSink *self)
Getter for the 'host' property.
Definition: mqttsink.c:995
_GstMQTTMessageHdr::base_time_epoch
gint64 base_time_epoch
Definition: mqttcommon.h:54
gst_mqtt_sink_set_pub_topic
static void gst_mqtt_sink_set_pub_topic(GstMqttSink *self, const gchar *topic)
Setter for the 'pub-topic' property.
Definition: mqttsink.c:1045
GST_US_TO_NS_MULTIPLIER
#define GST_US_TO_NS_MULTIPLIER
Definition: mqttcommon.h:38
gst_mqtt_sink_set_property
static void gst_mqtt_sink_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
The setter for the mqttsink's properties.
Definition: mqttsink.c:344
gst_mqtt_sink_set_host_address
static void gst_mqtt_sink_set_host_address(GstMqttSink *self, const gchar *addr)
Setter for the 'host' property.
Definition: mqttsink.c:1004
cb_mqtt_on_send_success
static void cb_mqtt_on_send_success(void *context, MQTTAsync_successData *response)
A callback function corresponding to MQTTAsync_responseOptions's onSuccess.
Definition: mqttsink.c:1384
gst_mqtt_sink_set_num_buffers
static void gst_mqtt_sink_set_num_buffers(GstMqttSink *self, const gint num)
Setter for the 'num-buffers' property.
Definition: mqttsink.c:1140
ntputil_get_epoch
int64_t ntputil_get_epoch(uint32_t hnums, char **hnames, uint16_t *ports)
Get NTP timestamps from the given or public NTP servers.
Definition: ntputil.c:140
_mqtt_set_msg_buf_hdr
static gboolean _mqtt_set_msg_buf_hdr(GstBuffer *gst_buf, GstMQTTMessageHdr *hdr)
A utility function to set the message header.
Definition: mqttsink.c:725
gst_mqtt_sink_get_host_port
static gchar * gst_mqtt_sink_get_host_port(GstMqttSink *self)
Getter for the 'port' property.
Definition: mqttsink.c:1017
DEFAULT_MQTT_HOST_ADDRESS
static const gchar DEFAULT_MQTT_HOST_ADDRESS[]
Definition: mqttsink.c:82
_put_timestamp_to_msg_buf_hdr
static void _put_timestamp_to_msg_buf_hdr(GstMqttSink *self, GstBuffer *gst_buf, GstMQTTMessageHdr *hdr)
A utility function to set the timestamp information onto the given buffer.
Definition: mqttsink.c:688
TRUE
return TRUE
Definition: gsttensor_if.c:879
UNUSED
#define UNUSED(expr)
Definition: mqttcommon.h:19
gst_mqtt_sink_set_max_msg_buf_size
static void gst_mqtt_sink_set_max_msg_buf_size(GstMqttSink *self, const gsize size)
Setter for the 'max-buffer-size' property.
Definition: mqttsink.c:1118
gst_mqtt_sink_init
static void gst_mqtt_sink_init(GstMqttSink *self)
Initialize GstMqttSink object.
Definition: mqttsink.c:171
nnstreamer_util.h
Optional NNStreamer utility functions for sub-plugin writers and users.
DEFAULT_MQTT_PUB_TOPIC_FORMAT
static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[]
Definition: mqttsink.c:88
PROP_MQTT_OPT_CLEANSESSION
@ PROP_MQTT_OPT_CLEANSESSION
Definition: mqttsink.c:54
gst_mqtt_sink_set_host_port
static void gst_mqtt_sink_set_host_port(GstMqttSink *self, const gchar *port)
Setter for the 'port' property.
Definition: mqttsink.c:1026
gst_mqtt_sink_get_opt_cleansession
static gboolean gst_mqtt_sink_get_opt_cleansession(GstMqttSink *self)
Getter for the 'cleansession' property.
Definition: mqttsink.c:1055
DEFAULT_MQTT_DISCONNECT_TIMEOUT
@ DEFAULT_MQTT_DISCONNECT_TIMEOUT
Definition: mqttsink.c:73
DEFAULT_MQTT_PUB_TOPIC
static const gchar DEFAULT_MQTT_PUB_TOPIC[]
Definition: mqttsink.c:87
MQTT_DISCONNECT_FAILED
@ MQTT_DISCONNECT_FAILED
Definition: mqttsink.h:52
gst_mqtt_sink_get_pub_topic
static gchar * gst_mqtt_sink_get_pub_topic(GstMqttSink *self)
Getter for the 'pub-topic' property.
Definition: mqttsink.c:1036
GST_MQTT_MAX_LEN_GST_CAPS_STR
#define GST_MQTT_MAX_LEN_GST_CAPS_STR
Definition: mqttcommon.h:30
gst_mqtt_sink_set_opt_keep_alive_interval
static void gst_mqtt_sink_set_opt_keep_alive_interval(GstMqttSink *self, const gint num)
Setter for the 'keep-alive-interval' property.
Definition: mqttsink.c:1100
gst_mqtt_sink_set_pub_wait_timeout
static void gst_mqtt_sink_set_pub_wait_timeout(GstMqttSink *self, const gulong to)
Setter for the 'pub-wait-timeout' property.
Definition: mqttsink.c:1082
DEFAULT_NUM_BUFFERS
@ DEFAULT_NUM_BUFFERS
Definition: mqttsink.c:68
_GstMQTTMessageHdr
Defined a custom data type, GstMQTTMessageHdr.
Definition: mqttcommon.h:49
cb_mqtt_on_connect_failure
static void cb_mqtt_on_connect_failure(void *context, MQTTAsync_failureData *response)
A callback function corresponding to MQTTAsync_connectOptions's onFailure. This callback is invoked w...
Definition: mqttsink.c:1285
gst_mqtt_sink_get_pub_wait_timeout
static gulong gst_mqtt_sink_get_pub_wait_timeout(GstMqttSink *self)
Getter for the 'pub-wait-timeout' property.
Definition: mqttsink.c:1073
gst_mqtt_sink_start
static gboolean gst_mqtt_sink_start(GstBaseSink *basesink)
Start mqttsink, called when state changed null to ready.
Definition: mqttsink.c:554
DEFAULT_MQTT_QOS
@ DEFAULT_MQTT_QOS
Definition: mqttsink.c:76
_GstMqttSink
GstMqttSink data structure.
Definition: mqttsink.h:60
gst_mqtt_sink_get_property
static void gst_mqtt_sink_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
The getter for the mqttsink's properties.
Definition: mqttsink.c:399
cb_mqtt_on_connect
static void cb_mqtt_on_connect(void *context, MQTTAsync_successData *response)
A callback function corresponding to MQTTAsync_connectOptions's onSuccess. This callback is invoked w...
Definition: mqttsink.c:1267
ntputil.h
A header file of NTP utility functions.
_GstMQTTMessageHdr::dts
GstClockTime dts
Definition: mqttcommon.h:57
DEFAULT_MQTT_CLIENT_ID_FORMAT
static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[]
Definition: mqttsink.c:86
GST_MQTT_SINK
#define GST_MQTT_SINK(obj)
Definition: mqttsink.h:26
SINK_INITIALIZING
@ SINK_INITIALIZING
Definition: mqttsink.h:46
PROP_LAST
@ PROP_LAST
Definition: mqttsink.c:62
DEFAULT_MQTT_PUB_WAIT_TIMEOUT
@ DEFAULT_MQTT_PUB_WAIT_TIMEOUT
Definition: mqttsink.c:74
gst_mqtt_sink_query
static gboolean gst_mqtt_sink_query(GstBaseSink *basesink, GstQuery *query)
Perform queries on the element.
Definition: mqttsink.c:661
gst_mqtt_sink_set_debug
static void gst_mqtt_sink_set_debug(GstMqttSink *self, const gboolean flag)
Setter for the 'debug' property.
Definition: mqttsink.c:967
gst_mqtt_sink_get_num_buffers
static gint gst_mqtt_sink_get_num_buffers(GstMqttSink *self)
Getter for the 'num-buffers' property.
Definition: mqttsink.c:1127
DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL
@ DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL
Definition: mqttsink.c:72
PROP_MQTT_PUB_TOPIC
@ PROP_MQTT_PUB_TOPIC
Definition: mqttsink.c:52
gst_mqtt_sink_change_state
static GstStateChangeReturn gst_mqtt_sink_change_state(GstElement *element, GstStateChange transition)
Handle mqttsink's state change.
Definition: mqttsink.c:491
_GstMQTTMessageHdr::pts
GstClockTime pts
Definition: mqttcommon.h:58
GST_CAT_DEFAULT
#define GST_CAT_DEFAULT
Definition: mqttsink.c:42
gst_mqtt_sink_event
static gboolean gst_mqtt_sink_event(GstBaseSink *basesink, GstEvent *event)
Handle events arriving on the sink pad.
Definition: mqttsink.c:898
gst_mqtt_sink_set_mqtt_ntp_srvs
static void gst_mqtt_sink_set_mqtt_ntp_srvs(GstMqttSink *self, const gchar *pairs)
Setter for the 'ntp-srvs' property.
Definition: mqttsink.c:1194
cb_mqtt_on_disconnect_failure
static void cb_mqtt_on_disconnect_failure(void *context, MQTTAsync_failureData *response)
A callback function corresponding to MQTTAsync_disconnectOptions's onFailure. Regardless of the MQTTA...
Definition: mqttsink.c:1320
type
svtc_1 type
Definition: gsttensor_if.c:825
_GstMQTTMessageHdr::num_mems
guint num_mems
Definition: mqttcommon.h:52
SINK_RENDER_ERROR
@ SINK_RENDER_ERROR
Definition: mqttsink.h:49
_GstMQTTMessageHdr::duration
GstClockTime duration
Definition: mqttcommon.h:56
mqtt_sink_state_t
enum _mqtt_sink_state_t mqtt_sink_state_t
A type definition to indicate the state of this element.
gst_mqtt_sink_class_init
static void gst_mqtt_sink_class_init(GstMqttSinkClass *klass)
Initialize GstMqttSinkClass object.
Definition: mqttsink.c:229
DEFAULT_DEBUG
@ DEFAULT_DEBUG
Definition: mqttsink.c:67
DEFAULT_MQTT_HOST_PORT
static const gchar DEFAULT_MQTT_HOST_PORT[]
Definition: mqttsink.c:83
gst_mqtt_sink_set_client_id
static void gst_mqtt_sink_set_client_id(GstMqttSink *self, const gchar *id)
Setter for the 'client-id' property.
Definition: mqttsink.c:985
gst_mqtt_sink_get_max_msg_buf_size
static gsize gst_mqtt_sink_get_max_msg_buf_size(GstMqttSink *self)
Getter for the 'max-buffer-size' property.
Definition: mqttsink.c:1109