24 #include <sys/types.h>
28 #include <gst/base/gstbasesink.h>
29 #include <MQTTAsync.h>
36 GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
38 #define gst_mqtt_sink_parent_class parent_class
42 #define GST_CAT_DEFAULT gst_mqtt_sink_debug
94 const GValue * value, GParamSpec * pspec);
97 GValue * value, GParamSpec * pspec);
100 static GstStateChangeReturn
125 const gchar * topic);
145 const gboolean flag);
148 const gchar * pairs);
151 MQTTAsync_successData * response);
153 MQTTAsync_failureData * response);
155 MQTTAsync_successData * response);
157 MQTTAsync_failureData * response);
161 int topicLen, MQTTAsync_message * message);
163 MQTTAsync_successData * response);
165 MQTTAsync_failureData * response);
/* Fragment of the instance initializer (its signature is outside this
 * excerpt): puts the MQTT, message-buffer and NTP related fields of the
 * element into a known default state. */
173 GstBaseSink *basesink = GST_BASE_SINK (
self);
/* Both option structs start from the library-provided default initializers
 * and are then copied into the instance. */
174 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
175 MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
178 self->mqtt_client_handle = NULL;
179 self->mqtt_conn_opts = conn_opts;
/* The element itself is stored as the context pointer of the connect
 * options and of the response options. */
182 self->mqtt_conn_opts.context =
self;
183 self->mqtt_respn_opts = respn_opts;
186 self->mqtt_respn_opts.context =
self;
/* Mutex/condition pair: other visible code waits on this condition with
 * g_cond_wait_until() and broadcasts it after changing is_connected. */
192 g_mutex_init (&self->mqtt_sink_mutex);
193 g_cond_init (&self->mqtt_sink_gcond);
/* No message buffer is allocated yet and the header is zero-filled. */
194 self->mqtt_msg_buf = NULL;
195 self->mqtt_msg_buf_size = 0;
196 memset (&self->mqtt_msg_hdr, 0x0, sizeof (self->mqtt_msg_hdr));
197 self->base_time_epoch = GST_CLOCK_TIME_NONE;
198 self->in_caps = NULL;
/* Empty NTP server list: no host names, no ports, count of zero. */
214 self->mqtt_ntp_hnames = NULL;
215 self->mqtt_ntp_ports = NULL;
216 self->mqtt_ntp_num_srvs = 0;
218 self->is_connected =
FALSE;
221 gst_base_sink_set_qos_enabled (basesink,
DEFAULT_QOS);
/* Fragment of the class initializer (signature outside this excerpt):
 * installs the element's GObject properties, then sets the element metadata
 * and adds the static sink pad template. Several
 * g_object_class_install_property() opening lines and default-value
 * arguments are on lines missing from this excerpt. */
231 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
232 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
233 GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
242 g_object_class_install_property (gobject_class,
PROP_DEBUG,
243 g_param_spec_boolean (
"debug",
"Debug",
244 "Produce extra verbose output for debug purpose",
DEFAULT_DEBUG,
245 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
248 g_param_spec_string (
"client-id",
"Client ID",
249 "The client identifier passed to the server (broker).", NULL,
250 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
253 g_param_spec_string (
"host",
"Host",
"Host (broker) to connect to",
255 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
258 g_param_spec_string (
"port",
"Port",
260 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263 g_param_spec_boolean (
"ntp-sync",
"NTP Synchronization",
264 "Synchronize received streams to the NTP clock",
268 g_param_spec_string (
"ntp-srvs",
"NTP Server Host Name and Port Pairs",
269 "NTP Servers' HOST_NAME:PORT pairs to use (valid only if ntp-sync is true)\n"
270 "\t\t\tUse ',' to separate each pair if there are more pairs than one",
272 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
275 g_param_spec_string (
"pub-topic",
"Topic to Publish",
276 "The topic's name to publish", NULL,
277 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
279 g_object_class_install_property (gobject_class,
281 g_param_spec_ulong (
"pub-wait-timeout",
"Timeout for Publish a message",
282 "Timeout for execution of the main thread with completed publication of a message",
284 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
287 g_param_spec_boolean (
"cleansession",
"Cleansession",
288 "When it is TRUE, the state information is discarded at connect and disconnect.",
290 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 g_object_class_install_property (gobject_class,
294 g_param_spec_int (
"keep-alive-interval",
"Keep Alive Interval",
295 "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
297 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300 g_param_spec_ulong (
"max-buffer-size",
301 "The maximum size of a message buffer",
302 "The maximum size in bytes of a message buffer (0 = dynamic buffer size)",
304 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
307 g_param_spec_int (
"num-buffers",
"Num Buffers",
308 "Number of (remaining) buffers to accept until sending EOS event (-1 = no limit)",
310 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
312 g_object_class_install_property (gobject_class,
PROP_MQTT_QOS,
313 g_param_spec_int (
"mqtt-qos",
"mqtt QoS level",
314 "The QoS level of MQTT.\n"
315 "\t\t\t 0: At most once\n"
316 "\t\t\t 1: At least once\n"
317 "\t\t\t 2: Exactly once\n"
318 "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
/* Base-sink virtual methods are assigned around here (only the start of the
 * render_list assignment is visible), followed by the static element
 * metadata and the sink pad template. */
327 gstbasesink_class->render_list =
332 gst_element_class_set_static_metadata (gstelement_class,
333 "MQTT sink",
"Sink/MQTT",
334 "Publish incoming data streams as a MQTT topic",
335 "Wook Song <wook16.song@samsung.com>");
336 gst_element_class_add_static_pad_template (gstelement_class,
345 const GValue * value, GParamSpec * pspec)
/* Fragment of the property setter; the per-property cases are on lines not
 * included in this excerpt. The visible statement emits the standard GObject
 * warning for an invalid property id (presumably the default case). */
390 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
400 GValue * value, GParamSpec * pspec)
/* Fragment of the property getter; the per-property cases are on lines not
 * included in this excerpt. The visible statement emits the standard GObject
 * warning for an invalid property id (presumably the default case). */
445 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
/* Fragment of the finalizer (signature outside this excerpt): releases the
 * heap-owned fields of the instance, resets them to NULL/0, clears the mutex
 * and finally chains up to the parent class finalize. */
458 g_free (self->mqtt_host_address);
459 self->mqtt_host_address = NULL;
460 g_free (self->mqtt_host_port);
461 self->mqtt_host_port = NULL;
/* The client handle is only destroyed when one exists. */
462 if (self->mqtt_client_handle) {
463 MQTTAsync_destroy (&self->mqtt_client_handle);
464 self->mqtt_client_handle = NULL;
466 g_free (self->mqtt_client_id);
467 self->mqtt_client_id = NULL;
468 g_free (self->mqtt_msg_buf);
469 self->mqtt_msg_buf = NULL;
470 g_free (self->mqtt_topic);
471 self->mqtt_topic = NULL;
/* Replacing the stored caps with NULL drops the reference held in in_caps. */
472 gst_caps_replace (&self->in_caps, NULL);
473 g_free (self->mqtt_ntp_srvs);
474 self->mqtt_ntp_srvs = NULL;
475 self->mqtt_ntp_num_srvs = 0;
/* Host names are a NULL-terminated string vector (freed element by element);
 * the ports array is a single flat allocation. */
476 g_strfreev (self->mqtt_ntp_hnames);
477 self->mqtt_ntp_hnames = NULL;
478 g_free (self->mqtt_ntp_ports);
479 self->mqtt_ntp_ports = NULL;
/* NOTE(review): g_error_free() expects a non-NULL GError; any guard on
 * self->err is on a line not present in this excerpt -- confirm. */
482 g_error_free (self->err);
/* NOTE(review): only the mutex is cleared in the visible lines; no
 * g_cond_clear() for mqtt_sink_gcond appears in this excerpt -- confirm the
 * condition initialised with g_cond_init() is released somewhere. */
483 g_mutex_clear (&self->mqtt_sink_mutex);
484 G_OBJECT_CLASS (parent_class)->finalize (
object);
/* Fragment of the state-change handler: a first switch does the work needed
 * before the transition, the parent class change_state is then invoked, and
 * a second switch logs the downward transitions afterwards. */
490 static GstStateChangeReturn
493 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
495 GstClock *elem_clock;
496 GstClockTime base_time;
497 GstClockTime cur_time;
498 GstClockTimeDiff diff;
500 switch (transition) {
501 case GST_STATE_CHANGE_NULL_TO_READY:
502 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_NULL_TO_READY");
/* Prints the domain of the stored GError and fails the transition. The
 * condition guarding this path is on a line not present in this excerpt. */
504 g_printerr (
"%s: %s\n", g_quark_to_string (self->err->domain),
506 return GST_STATE_CHANGE_FAILURE;
509 case GST_STATE_CHANGE_READY_TO_PAUSED:
510 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_READY_TO_PAUSED");
512 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
/* base_time_epoch is reset, then the difference between the element clock's
 * current time and the element's base time is computed and get_epoch_func is
 * called with the configured NTP servers to obtain the new base_time_epoch.
 * How `diff` enters that result is on lines not shown here.
 * NOTE(review): gst_element_get_clock() may return NULL; a check, if any, is
 * on lines not present in this excerpt -- confirm. */
513 if (self->mqtt_ntp_sync)
515 self->base_time_epoch = GST_CLOCK_TIME_NONE;
516 elem_clock = gst_element_get_clock (element);
519 base_time = gst_element_get_base_time (element);
520 cur_time = gst_clock_get_time (elem_clock);
/* The reference obtained from gst_element_get_clock() is dropped once the
 * current time has been sampled. */
521 gst_object_unref (elem_clock);
522 diff = GST_CLOCK_DIFF (base_time, cur_time);
523 self->base_time_epoch =
524 self->get_epoch_func (self->mqtt_ntp_num_srvs, self->mqtt_ntp_hnames,
526 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_PAUSED_TO_PLAYING");
/* Chain up so the parent class performs the actual state change. */
532 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
/* Post-transition handling; only logging is visible in this excerpt. */
534 switch (transition) {
535 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
536 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_PLAYING_TO_PAUSED");
538 case GST_STATE_CHANGE_PAUSED_TO_READY:
539 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_PAUSED_TO_READY");
541 case GST_STATE_CHANGE_READY_TO_NULL:
542 GST_INFO_OBJECT (
self,
"GST_STATE_CHANGE_READY_TO_NULL");
/* Fragment of the start routine (signature outside this excerpt): builds the
 * broker address, creates the asynchronous MQTT client, registers callbacks,
 * starts the connection and blocks until is_connected becomes TRUE or the
 * deadline expires. */
/* Broker address in "host:port" form.
 * NOTE(review): haddr is heap-allocated by g_strdup_printf(); its g_free()
 * is not visible in this excerpt -- confirm every exit path releases it. */
557 gchar *haddr = g_strdup_printf (
"%s:%s", self->mqtt_host_address,
558 self->mqtt_host_port);
563 g_free (self->mqtt_client_id);
570 self->mqtt_client_id);
/* The client is created without message persistence. */
581 ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
582 self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
584 if (ret != MQTTASYNC_SUCCESS)
/* The element is passed as the callback context. */
587 MQTTAsync_setCallbacks (self->mqtt_client_handle,
self,
591 ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
592 if (ret != MQTTASYNC_SUCCESS) {
/* Wait for the connection result. is_connected is written under the same
 * mutex by the callback fragments later in this file, and the predicate is
 * re-checked in a loop so spurious wake-ups are harmless.
 * g_cond_wait_until() returning FALSE means the deadline passed; in that
 * case the mutex is released and a critical message is logged. */
597 end_time = g_get_monotonic_time () +
599 g_mutex_lock (&self->mqtt_sink_mutex);
600 while (!self->is_connected) {
601 if (!g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
603 g_mutex_unlock (&self->mqtt_sink_mutex);
604 g_critical (
"Failed to connect to MQTT broker from mqttsink."
605 "Please check broker is running status or broker host address.");
609 g_mutex_unlock (&self->mqtt_sink_mutex);
/* Presumably the failure path: the client handle is destroyed and reset so
 * no stale handle is left behind -- the label/condition is not visible. */
614 MQTTAsync_destroy (&self->mqtt_client_handle);
615 self->mqtt_client_handle = NULL;
/* Fragment of the stop routine (signature outside this excerpt): keeps
 * requesting a disconnect while the client still reports being connected,
 * waiting on the condition between attempts, then destroys the client. */
626 MQTTAsync_disconnectOptions disconn_opts =
627 MQTTAsync_disconnectOptions_initializer;
/* The element is the context handed to the disconnect callbacks. */
632 disconn_opts.context =
self;
635 while (MQTTAsync_isConnected (self->mqtt_client_handle)) {
639 MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts);
/* is_connected is cleared under the mutex, then the thread sleeps until a
 * broadcast arrives or the (not visible) deadline passes; the return value
 * of g_cond_wait_until() is not used here. */
640 g_mutex_lock (&self->mqtt_sink_mutex);
641 self->is_connected =
FALSE;
642 g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
644 g_mutex_unlock (&self->mqtt_sink_mutex);
/* The sink state is read atomically after each wait; how it is used is on
 * lines not present in this excerpt. */
645 cur_state = g_atomic_int_get (&self->mqtt_sink_state);
652 MQTTAsync_destroy (&self->mqtt_client_handle);
653 self->mqtt_client_handle = NULL;
/* Fragment of the query handler (signature outside this excerpt). */
663 gboolean ret =
FALSE;
665 switch (GST_QUERY_TYPE (query)) {
/* SEEKING queries are answered in the format that was asked for, with
 * seekable = FALSE and the range 0 .. -1. */
666 case GST_QUERY_SEEKING:{
670 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
671 gst_query_set_seeking (query, fmt,
FALSE, 0, -1);
/* The parent class query handler is invoked here; presumably this is the
 * default case for all other query types -- the case label is not visible. */
676 ret = GST_BASE_SINK_CLASS (parent_class)->query (basesink, query);
/* Fragment that copies the timing information of a GstBuffer into the MQTT
 * message header. Each of duration, DTS and PTS is taken from the buffer
 * when valid and set to GST_CLOCK_TIME_NONE otherwise. */
695 hdr->
duration = GST_BUFFER_DURATION_IS_VALID (gst_buf) ?
696 GST_BUFFER_DURATION (gst_buf) : GST_CLOCK_TIME_NONE;
698 hdr->
dts = GST_BUFFER_DTS_IS_VALID (gst_buf) ?
699 GST_BUFFER_DTS (gst_buf) : GST_CLOCK_TIME_NONE;
701 hdr->
pts = GST_BUFFER_PTS_IS_VALID (gst_buf) ?
702 GST_BUFFER_PTS (gst_buf) : GST_CLOCK_TIME_NONE;
/* Debug trace: logs the topic, the current running time (clock time minus
 * base time) and the header PTS. The condition enabling this trace and the
 * remaining log arguments are on lines not present in this excerpt. */
705 GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (
self));
708 clock = gst_element_get_clock (GST_ELEMENT (
self));
710 GST_DEBUG_OBJECT (
self,
711 "%s now %" GST_TIME_FORMAT
" ts %" GST_TIME_FORMAT
" sent %"
712 GST_TIME_FORMAT, self->mqtt_topic,
713 GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
714 GST_TIME_ARGS (hdr->
pts),
/* Drop the clock reference taken for the trace above. */
717 gst_object_unref (clock);
/* Fragment that records the number of GstMemory blocks of the buffer in the
 * header and then visits each block without taking a new reference
 * (gst_buffer_peek_memory). What is stored per block is on lines not present
 * in this excerpt. */
730 hdr->
num_mems = gst_buffer_n_memory (gst_buf);
731 for (i = 0; i < hdr->
num_mems; ++i) {
734 each_mem = gst_buffer_peek_memory (gst_buf, i);
/* The whole header is zero-filled here; presumably this is an error path
 * inside the loop -- the surrounding condition is not visible. */
736 memset (hdr, 0x0,
sizeof (*hdr));
/* Fragment of the render routine (signature outside this excerpt): waits on
 * the condition, enforces the num-buffers limit, (re)allocates the message
 * buffer when needed, copies the header followed by the buffer payload into
 * it and publishes the result with MQTTAsync_send(). */
753 const gsize in_buf_size = gst_buffer_get_size (in_buf);
/* NOTE(review): a function-scope static is shared by every call in the
 * process, hence by every instance of this element -- confirm that sharing
 * this flag between instances is intended. */
754 static gboolean is_static_sized_buf =
FALSE;
756 GstFlowReturn ret = GST_FLOW_ERROR;
758 GstMemory *in_buf_mem;
759 GstMapInfo in_buf_map;
/* Deadline = now + pub-wait-timeout (converted with G_TIME_SPAN_SECOND);
 * the wait ends on a broadcast or when the deadline passes. The condition
 * that selects this wait is on lines not present in this excerpt. */
765 gint64 end_time = g_get_monotonic_time ();
768 end_time += (
self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND);
769 g_mutex_lock (&self->mqtt_sink_mutex);
770 g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
772 g_mutex_unlock (&self->mqtt_sink_mutex);
774 _state = g_atomic_int_get (&self->mqtt_sink_state);
780 ret = GST_FLOW_ERROR;
/* num_buffers: 0 is handled specially (body not visible), -1 means no
 * limit, any other value is decremented once per rendered buffer. */
791 if (self->num_buffers == 0) {
796 if (self->num_buffers != -1) {
797 self->num_buffers -= 1;
/* With a dynamically sized buffer, an existing allocation is released under
 * a further condition that is on a line not present in this excerpt. */
800 if ((!is_static_sized_buf) && (self->mqtt_msg_buf) &&
801 (self->mqtt_msg_buf_size != 0) &&
803 g_free (self->mqtt_msg_buf);
804 self->mqtt_msg_buf = NULL;
805 self->mqtt_msg_buf_size = 0;
/* No buffer yet: max_msg_buf_size == 0 takes the dynamic path (body not
 * visible); otherwise a configured maximum smaller than the incoming buffer
 * is reported as an error and the buffer is marked as statically sized. */
809 if ((!self->mqtt_msg_buf) && (self->mqtt_msg_buf_size == 0)) {
810 if (self->max_msg_buf_size == 0) {
813 if (self->max_msg_buf_size < in_buf_size) {
814 g_printerr (
"%s: The given size for a message buffer is too small: "
815 "given (%" G_GSIZE_FORMAT
" bytes) vs. incoming (%" G_GSIZE_FORMAT
817 ret = GST_FLOW_ERROR;
821 is_static_sized_buf =
TRUE;
/* Zero-filled allocation that returns NULL instead of aborting on failure. */
824 self->mqtt_msg_buf = g_try_malloc0 (self->mqtt_msg_buf_size);
828 ret = GST_FLOW_ERROR;
832 msg_pub =
self->mqtt_msg_buf;
834 self->mqtt_msg_buf_size = 0;
835 ret = GST_FLOW_ERROR;
/* Message layout: header first, payload immediately after it. */
838 memcpy (msg_pub, &self->mqtt_msg_hdr, sizeof (self->mqtt_msg_hdr));
/* A single GstMemory spanning the whole buffer; it is unreferenced at the
 * ret_unref_in_buf_mem label below. */
841 in_buf_mem = gst_buffer_get_all_memory (in_buf);
843 ret = GST_FLOW_ERROR;
/* A failed map jumps past gst_memory_unmap() straight to the unref label. */
847 if (!gst_memory_map (in_buf_mem, &in_buf_map, GST_MAP_READ)) {
848 ret = GST_FLOW_ERROR;
849 goto ret_unref_in_buf_mem;
854 memcpy (&msg_pub[
sizeof (self->mqtt_msg_hdr)], in_buf_map.data,
/* Publish on the configured topic with the configured QoS; the literal 1 is
 * presumably the `retained` argument of MQTTAsync_send() -- the preceding
 * argument line is not visible, so confirm against the full call. */
856 mqtt_rc = MQTTAsync_send (self->mqtt_client_handle, self->mqtt_topic,
858 self->mqtt_qos, 1, &self->mqtt_respn_opts);
859 if (mqtt_rc != MQTTASYNC_SUCCESS) {
860 ret = GST_FLOW_ERROR;
863 gst_memory_unmap (in_buf_mem, &in_buf_map);
865 ret_unref_in_buf_mem:
866 gst_memory_unref (in_buf_mem);
/* Fragment of the buffer-list render routine: walks the list by index and
 * checks the flow return after each buffer. The per-buffer call and the
 * action taken on a non-OK result are on lines not present in this excerpt
 * (presumably the loop stops at the first failure). */
879 guint num_buffers = gst_buffer_list_length (list);
884 for (i = 0; i < num_buffers; ++i) {
885 buffer = gst_buffer_list_get (list, i);
887 if (ret != GST_FLOW_OK)
/* Fragment of the event handler (signature outside this excerpt). */
901 GstEventType
type = GST_EVENT_TYPE (event);
902 gboolean ret =
FALSE;
/* Wakes every thread waiting on the sink condition; the event type that
 * triggers this is on a line not present in this excerpt. */
907 g_mutex_lock (&self->mqtt_sink_mutex);
908 g_cond_broadcast (&self->mqtt_sink_gcond);
909 g_mutex_unlock (&self->mqtt_sink_mutex);
/* The parent class event handler is invoked and its result stored in ret. */
915 ret = GST_BASE_SINK_CLASS (parent_class)->event (basesink, event);
/* Fragment of the caps setter: stores the new caps in in_caps and, when the
 * stored caps changed and are fixed, serialises them into the message
 * header's gst_caps_str field. */
929 ret = gst_caps_replace (&self->in_caps, caps);
931 if (ret && gst_caps_is_fixed (self->in_caps)) {
/* NOTE(review): the string returned by gst_caps_to_string() is owned by the
 * caller; its g_free() is not visible in this excerpt -- confirm. */
932 gchar *caps_str = gst_caps_to_string (caps);
935 if (caps_str == NULL) {
936 g_critical (
"Fail to convert caps to string representation");
/* g_strlcpy() returns the length of the source string, which lets the
 * caller detect truncation; the comparison itself is on a line not present
 * in this excerpt. */
940 len = g_strlcpy (self->mqtt_msg_hdr.gst_caps_str, caps_str,
944 g_critical (
"Fail to copy caps_str.");
/* Run of one-line accessor bodies (their signatures are outside this
 * excerpt). String getters return the internally stored pointer, with no
 * copy visible in these lines; string setters free the previous value and
 * store a g_strdup() copy of the argument. */
/* client id */
978 return self->mqtt_client_id;
987 g_free (self->mqtt_client_id);
988 self->mqtt_client_id = g_strdup (
id);
/* broker host address */
997 return self->mqtt_host_address;
1009 g_free (self->mqtt_host_address);
1010 self->mqtt_host_address = g_strdup (addr);
/* broker port (kept as a string) */
1019 return self->mqtt_host_port;
1028 g_free (self->mqtt_host_port);
1029 self->mqtt_host_port = g_strdup (port);
/* publish topic */
1038 return self->mqtt_topic;
1047 g_free (self->mqtt_topic);
1048 self->mqtt_topic = g_strdup (topic);
/* cleansession is stored directly inside the connect options. */
1057 return self->mqtt_conn_opts.cleansession;
1066 self->mqtt_conn_opts.cleansession = val;
/* publish wait timeout */
1075 return self->mqtt_pub_wait_timeout;
1084 self->mqtt_pub_wait_timeout = to;
/* keepAliveInterval is stored directly inside the connect options. */
1093 return self->mqtt_conn_opts.keepAliveInterval;
1102 self->mqtt_conn_opts.keepAliveInterval = num;
/* maximum message buffer size */
1111 return self->max_msg_buf_size;
1120 self->max_msg_buf_size = size;
/* remaining number of buffers */
1131 num_buffers =
self->num_buffers;
1142 self->num_buffers = num;
/* MQTT QoS level */
1151 return self->mqtt_qos;
1160 self->mqtt_qos = qos;
/* NTP synchronisation flag */
1169 return self->mqtt_ntp_sync;
1178 self->mqtt_ntp_sync = flag;
/* raw NTP server list string */
1187 return self->mqtt_ntp_srvs;
/* Fragment of the NTP server list setter (signature outside this excerpt):
 * stores a copy of the raw "host:port,host:port" string and parses it into
 * the mqtt_ntp_hnames vector and the mqtt_ntp_ports array. */
1196 gchar **pair_arrs = NULL;
/* Nothing to do when the new string equals the stored one (the statement
 * executed in that case is on a line not present in this excerpt). */
1201 if (g_strcmp0 (self->mqtt_ntp_srvs, pairs) == 0)
1204 g_free (self->mqtt_ntp_srvs);
1205 self->mqtt_ntp_srvs = g_strdup (pairs);
/* Split on ',' with no limit on the number of pairs. */
1207 pair_arrs = g_strsplit (pairs,
",", -1);
1208 if (pair_arrs == NULL)
1211 hnum = g_strv_length (pair_arrs);
1213 goto err_free_pair_arrs;
/* NOTE(review): the previous host-name vector is released with g_free()
 * here, while the finalizer and the error label below use g_strfreev(); if
 * an earlier call populated the vector, the individual host strings appear
 * to leak -- confirm. */
1215 g_free (self->mqtt_ntp_hnames);
/* One extra zeroed slot keeps the vector NULL-terminated. */
1216 self->mqtt_ntp_hnames = g_try_malloc0 ((hnum + 1) *
sizeof (gchar *));
1217 if (!self->mqtt_ntp_hnames)
1218 goto err_free_pair_arrs;
1220 g_free (self->mqtt_ntp_ports);
1221 self->mqtt_ntp_ports = g_try_malloc0 (hnum *
sizeof (guint16));
1222 if (!self->mqtt_ntp_ports)
1223 goto err_free_mqtt_ntp_hnames;
/* Start from the number of pairs; rejected pairs decrement the count. i
 * walks the input pairs and j indexes the accepted entries (the increment of
 * j is on a line not present in this excerpt). */
1225 self->mqtt_ntp_num_srvs = hnum;
1226 for (i = 0, j = 0; i < hnum; i++) {
1232 pair = pair_arrs[i];
/* At most two tokens: host name and port. */
1233 hname_port = g_strsplit (pair,
":", 2);
1234 hname = hname_port[0];
/* NOTE(review): when a pair contains no ':', the split yields one token and
 * hname_port[1] is NULL (an empty pair yields an empty vector); passing NULL
 * to strtoul() is undefined behaviour. No guard sits between the split and
 * this call (the original line numbers are consecutive) -- confirm. */
1235 port_ul = strtoul (hname_port[1], &eport, 10);
/* A port of 0 (also strtoul's result when nothing could be converted) or a
 * value above UINT16_MAX is rejected. */
1236 if ((port_ul == 0) || (port_ul > UINT16_MAX)) {
1237 self->mqtt_ntp_num_srvs--;
1239 self->mqtt_ntp_hnames[j] = g_strdup (hname);
1240 self->mqtt_ntp_ports[j] = (uint16_t) port_ul;
1244 g_strfreev (hname_port);
1247 g_strfreev (pair_arrs);
/* Error unwinding: release the host-name vector, then the split input. */
1250 err_free_mqtt_ntp_hnames:
1251 g_strfreev (self->mqtt_ntp_hnames);
1252 self->mqtt_ntp_hnames = NULL;
1255 g_strfreev (pair_arrs);
/* Fragments of several MQTT callback bodies (their signatures are mostly
 * outside this excerpt, so which callback each fragment belongs to cannot be
 * told from here). Every visible fragment takes mqtt_sink_mutex, optionally
 * updates is_connected, and broadcasts mqtt_sink_gcond so that threads
 * blocked in g_cond_wait_until() re-evaluate their condition. */
/* Marks the sink as connected and wakes the waiters. */
1273 g_mutex_lock (&self->mqtt_sink_mutex);
1274 self->is_connected =
TRUE;
1275 g_cond_broadcast (&self->mqtt_sink_gcond);
1276 g_mutex_unlock (&self->mqtt_sink_mutex);
/* Marks the sink as not connected and wakes the waiters. */
1291 g_mutex_lock (&self->mqtt_sink_mutex);
1292 self->is_connected =
FALSE;
1293 g_cond_broadcast (&self->mqtt_sink_gcond);
1294 g_mutex_unlock (&self->mqtt_sink_mutex);
/* Wake-up only; is_connected is not touched in the visible lines. */
1309 g_mutex_lock (&self->mqtt_sink_mutex);
1310 g_cond_broadcast (&self->mqtt_sink_gcond);
1311 g_mutex_unlock (&self->mqtt_sink_mutex);
/* Wake-up only; is_connected is not touched in the visible lines. */
1326 g_mutex_lock (&self->mqtt_sink_mutex);
1327 g_cond_broadcast (&self->mqtt_sink_gcond);
1328 g_mutex_unlock (&self->mqtt_sink_mutex);
/* Debug trace naming the topic and the token of a delivered message. */
1340 GST_DEBUG_OBJECT (
self,
1341 "%s: the message with token(%d) has been delivered.", self->mqtt_topic,
/* Marks the sink as not connected and wakes the waiters. */
1357 g_mutex_lock (&self->mqtt_sink_mutex);
1358 self->is_connected =
FALSE;
1359 g_cond_broadcast (&self->mqtt_sink_gcond);
1360 g_mutex_unlock (&self->mqtt_sink_mutex);
1369 MQTTAsync_message * message)
/* Wake-up only; is_connected is not touched in the visible lines. */
1393 g_mutex_lock (&self->mqtt_sink_mutex);
1394 g_cond_broadcast (&self->mqtt_sink_gcond);
1395 g_mutex_unlock (&self->mqtt_sink_mutex);
/* Wake-up only; is_connected is not touched in the visible lines. */
1413 g_mutex_lock (&self->mqtt_sink_mutex);
1414 g_cond_broadcast (&self->mqtt_sink_gcond);
1415 g_mutex_unlock (&self->mqtt_sink_mutex);