Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
4 : */
5 : /**
6 : * @file mqttsink.c
7 : * @date 01 Apr 2021
8 : * @brief Publish incoming data streams as a MQTT topic
9 : * @see https://github.com/nnstreamer/nnstreamer
10 : * @author Wook Song <wook16.song@samsung.com>
11 : * @bug No known bugs except for NYI items
12 : */
13 :
14 : #ifdef HAVE_CONFIG_H
15 : #include <config.h>
16 : #endif
17 :
18 : #include <stdlib.h>
19 : #include <string.h>
20 :
21 : #ifdef G_OS_WIN32
22 : #include <process.h>
23 : #else
24 : #include <sys/types.h>
25 : #include <unistd.h>
26 : #endif
27 :
28 : #include <gst/base/gstbasesink.h>
29 : #include <MQTTAsync.h>
30 : #include <nnstreamer_util.h>
31 :
32 : #include "mqttsink.h"
33 : #include "ntputil.h"
34 :
35 : static GstStaticPadTemplate sink_pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
36 : GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
37 :
38 : #define gst_mqtt_sink_parent_class parent_class
39 230 : G_DEFINE_TYPE (GstMqttSink, gst_mqtt_sink, GST_TYPE_BASE_SINK);
40 :
41 : GST_DEBUG_CATEGORY_STATIC (gst_mqtt_sink_debug);
42 : #define GST_CAT_DEFAULT gst_mqtt_sink_debug
43 :
44 : enum
45 : {
46 : PROP_0,
47 :
48 : PROP_DEBUG,
49 : PROP_MQTT_CLIENT_ID,
50 : PROP_MQTT_HOST_ADDRESS,
51 : PROP_MQTT_HOST_PORT,
52 : PROP_MQTT_PUB_TOPIC,
53 : PROP_MQTT_PUB_WAIT_TIMEOUT,
54 : PROP_MQTT_OPT_CLEANSESSION,
55 : PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
56 : PROP_NUM_BUFFERS,
57 : PROP_MAX_MSG_BUF_SIZE,
58 : PROP_MQTT_QOS,
59 : PROP_MQTT_NTP_SYNC,
60 : PROP_MQTT_NTP_SRVS,
61 :
62 : PROP_LAST
63 : };
64 :
65 : enum
66 : {
67 : DEFAULT_DEBUG = FALSE,
68 : DEFAULT_NUM_BUFFERS = -1,
69 : DEFAULT_QOS = TRUE,
70 : DEFAULT_SYNC = FALSE,
71 : DEFAULT_MQTT_OPT_CLEANSESSION = TRUE,
72 : DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60, /* 1 minute */
73 : DEFAULT_MQTT_DISCONNECT_TIMEOUT = G_TIME_SPAN_SECOND * 3, /* 3 secs */
74 : DEFAULT_MQTT_PUB_WAIT_TIMEOUT = 1, /* 1 secs */
75 : DEFAULT_MAX_MSG_BUF_SIZE = 0, /* Buffer size is not fixed */
76 : DEFAULT_MQTT_QOS = 0, /* fire and forget */
77 : DEFAULT_MQTT_NTP_SYNC = FALSE,
78 : MAX_LEN_PROP_NTP_SRVS = 4096,
79 : };
80 :
81 : static guint8 sink_client_id = 0;
82 : static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
83 : static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
84 : static const gchar TAG_ERR_MQTTSINK[] = "ERROR: MQTTSink";
85 : static const gchar DEFAULT_MQTT_CLIENT_ID[] = "$HOST_$PID_^[0-9][0-9]?$|^255$";
86 : static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_sink%u";
87 : static const gchar DEFAULT_MQTT_PUB_TOPIC[] = "$client-id/topic";
88 : static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
89 : static const gchar DEFAULT_MQTT_NTP_SERVERS[] = "pool.ntp.org:123";
90 :
91 : /** Function prototype declarations */
92 : static void
93 : gst_mqtt_sink_set_property (GObject * object, guint prop_id,
94 : const GValue * value, GParamSpec * pspec);
95 : static void
96 : gst_mqtt_sink_get_property (GObject * object, guint prop_id,
97 : GValue * value, GParamSpec * pspec);
98 : static void gst_mqtt_sink_class_finalize (GObject * object);
99 :
100 : static GstStateChangeReturn
101 : gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition);
102 :
103 : static gboolean gst_mqtt_sink_start (GstBaseSink * basesink);
104 : static gboolean gst_mqtt_sink_stop (GstBaseSink * basesink);
105 : static gboolean gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query);
106 : static GstFlowReturn
107 : gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * buffer);
108 : static GstFlowReturn
109 : gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list);
110 : static gboolean gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event);
111 : static gboolean gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps);
112 :
113 : static gboolean gst_mqtt_sink_get_debug (GstMqttSink * self);
114 : static void gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag);
115 : static gchar *gst_mqtt_sink_get_client_id (GstMqttSink * self);
116 : static void gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id);
117 : static gchar *gst_mqtt_sink_get_host_address (GstMqttSink * self);
118 : static void gst_mqtt_sink_set_host_address (GstMqttSink * self,
119 : const gchar * addr);
120 : static gchar *gst_mqtt_sink_get_host_port (GstMqttSink * self);
121 : static void gst_mqtt_sink_set_host_port (GstMqttSink * self,
122 : const gchar * port);
123 : static gchar *gst_mqtt_sink_get_pub_topic (GstMqttSink * self);
124 : static void gst_mqtt_sink_set_pub_topic (GstMqttSink * self,
125 : const gchar * topic);
126 : static gulong gst_mqtt_sink_get_pub_wait_timeout (GstMqttSink * self);
127 : static void gst_mqtt_sink_set_pub_wait_timeout (GstMqttSink * self,
128 : const gulong to);
129 : static gboolean gst_mqtt_sink_get_opt_cleansession (GstMqttSink * self);
130 : static void gst_mqtt_sink_set_opt_cleansession (GstMqttSink * self,
131 : const gboolean val);
132 : static gint gst_mqtt_sink_get_opt_keep_alive_interval (GstMqttSink * self);
133 : static void gst_mqtt_sink_set_opt_keep_alive_interval (GstMqttSink * self,
134 : const gint num);
135 :
136 : static gsize gst_mqtt_sink_get_max_msg_buf_size (GstMqttSink * self);
137 : static void gst_mqtt_sink_set_max_msg_buf_size (GstMqttSink * self,
138 : const gsize size);
139 : static gint gst_mqtt_sink_get_num_buffers (GstMqttSink * self);
140 : static void gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num);
141 : static gint gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self);
142 : static void gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos);
143 : static gboolean gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self);
144 : static void gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self,
145 : const gboolean flag);
146 : static gchar *gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self);
147 : static void gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self,
148 : const gchar * pairs);
149 :
150 : static void cb_mqtt_on_connect (void *context,
151 : MQTTAsync_successData * response);
152 : static void cb_mqtt_on_connect_failure (void *context,
153 : MQTTAsync_failureData * response);
154 : static void cb_mqtt_on_disconnect (void *context,
155 : MQTTAsync_successData * response);
156 : static void cb_mqtt_on_disconnect_failure (void *context,
157 : MQTTAsync_failureData * response);
158 : static void cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token);
159 : static void cb_mqtt_on_connection_lost (void *context, char *cause);
160 : static int cb_mqtt_on_message_arrived (void *context, char *topicName,
161 : int topicLen, MQTTAsync_message * message);
162 : static void cb_mqtt_on_send_success (void *context,
163 : MQTTAsync_successData * response);
164 : static void cb_mqtt_on_send_failure (void *context,
165 : MQTTAsync_failureData * response);
166 :
167 : /**
168 : * @brief Initialize GstMqttSink object
169 : */
170 : static void
171 10 : gst_mqtt_sink_init (GstMqttSink * self)
172 : {
173 10 : GstBaseSink *basesink = GST_BASE_SINK (self);
174 10 : MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
175 10 : MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
176 :
177 : /** init MQTT related variables */
178 10 : self->mqtt_client_handle = NULL;
179 10 : self->mqtt_conn_opts = conn_opts;
180 10 : self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
181 10 : self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
182 10 : self->mqtt_conn_opts.context = self;
183 10 : self->mqtt_respn_opts = respn_opts;
184 10 : self->mqtt_respn_opts.onSuccess = cb_mqtt_on_send_success;
185 10 : self->mqtt_respn_opts.onFailure = cb_mqtt_on_send_failure;
186 10 : self->mqtt_respn_opts.context = self;
187 :
188 : /** init private variables */
189 10 : self->mqtt_sink_state = SINK_INITIALIZING;
190 10 : self->err = NULL;
191 10 : self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSINK);
192 10 : g_mutex_init (&self->mqtt_sink_mutex);
193 10 : g_cond_init (&self->mqtt_sink_gcond);
194 10 : self->mqtt_msg_buf = NULL;
195 10 : self->mqtt_msg_buf_size = 0;
196 10 : memset (&self->mqtt_msg_hdr, 0x0, sizeof (self->mqtt_msg_hdr));
197 10 : self->base_time_epoch = GST_CLOCK_TIME_NONE;
198 10 : self->in_caps = NULL;
199 :
200 : /** init mqttsink properties */
201 10 : self->debug = DEFAULT_DEBUG;
202 10 : self->num_buffers = DEFAULT_NUM_BUFFERS;
203 10 : self->max_msg_buf_size = DEFAULT_MAX_MSG_BUF_SIZE;
204 10 : self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
205 10 : self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
206 10 : self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
207 10 : self->mqtt_topic = g_strdup (DEFAULT_MQTT_PUB_TOPIC);
208 10 : self->mqtt_pub_wait_timeout = DEFAULT_MQTT_PUB_WAIT_TIMEOUT;
209 10 : self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
210 10 : self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
211 10 : self->mqtt_qos = DEFAULT_MQTT_QOS;
212 10 : self->mqtt_ntp_sync = DEFAULT_MQTT_NTP_SYNC;
213 10 : self->mqtt_ntp_srvs = g_strdup (DEFAULT_MQTT_NTP_SERVERS);
214 10 : self->mqtt_ntp_hnames = NULL;
215 10 : self->mqtt_ntp_ports = NULL;
216 10 : self->mqtt_ntp_num_srvs = 0;
217 10 : self->get_epoch_func = default_mqtt_get_unix_epoch;
218 10 : self->is_connected = FALSE;
219 :
220 : /** init basesink properties */
221 10 : gst_base_sink_set_qos_enabled (basesink, DEFAULT_QOS);
222 10 : gst_base_sink_set_sync (basesink, DEFAULT_SYNC);
223 10 : }
224 :
225 : /**
226 : * @brief Initialize GstMqttSinkClass object
227 : */
228 : static void
229 22 : gst_mqtt_sink_class_init (GstMqttSinkClass * klass)
230 : {
231 22 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
232 22 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
233 22 : GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
234 :
235 22 : GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SINK, 0,
236 : "MQTT sink");
237 :
238 22 : gobject_class->set_property = gst_mqtt_sink_set_property;
239 22 : gobject_class->get_property = gst_mqtt_sink_get_property;
240 22 : gobject_class->finalize = gst_mqtt_sink_class_finalize;
241 :
242 22 : g_object_class_install_property (gobject_class, PROP_DEBUG,
243 : g_param_spec_boolean ("debug", "Debug",
244 : "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
245 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
246 :
247 22 : g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
248 : g_param_spec_string ("client-id", "Client ID",
249 : "The client identifier passed to the server (broker).", NULL,
250 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251 :
252 22 : g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
253 : g_param_spec_string ("host", "Host", "Host (broker) to connect to",
254 : DEFAULT_MQTT_HOST_ADDRESS,
255 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
256 :
257 22 : g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
258 : g_param_spec_string ("port", "Port",
259 : "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
260 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261 :
262 22 : g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SYNC,
263 : g_param_spec_boolean ("ntp-sync", "NTP Synchronization",
264 : "Synchronize received streams to the NTP clock",
265 : DEFAULT_MQTT_NTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266 :
267 22 : g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SRVS,
268 : g_param_spec_string ("ntp-srvs", "NTP Server Host Name and Port Pairs",
269 : "NTP Servers' HOST_NAME:PORT pairs to use (valid only if ntp-sync is true)\n"
270 : "\t\t\tUse ',' to separate each pair if there are more pairs than one",
271 : DEFAULT_MQTT_NTP_SERVERS,
272 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273 :
274 22 : g_object_class_install_property (gobject_class, PROP_MQTT_PUB_TOPIC,
275 : g_param_spec_string ("pub-topic", "Topic to Publish",
276 : "The topic's name to publish", NULL,
277 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278 :
279 22 : g_object_class_install_property (gobject_class,
280 : PROP_MQTT_PUB_WAIT_TIMEOUT,
281 : g_param_spec_ulong ("pub-wait-timeout", "Timeout for Publish a message",
282 : "Timeout for execution of the main thread with completed publication of a message",
283 : 1UL, G_MAXULONG, DEFAULT_MQTT_PUB_WAIT_TIMEOUT,
284 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285 :
286 22 : g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
287 : g_param_spec_boolean ("cleansession", "Cleansession",
288 : "When it is TRUE, the state information is discarded at connect and disconnect.",
289 : DEFAULT_MQTT_OPT_CLEANSESSION,
290 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
291 :
292 22 : g_object_class_install_property (gobject_class,
293 : PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
294 : g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
295 : "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
296 : 1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
297 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298 :
299 22 : g_object_class_install_property (gobject_class, PROP_MAX_MSG_BUF_SIZE,
300 : g_param_spec_ulong ("max-buffer-size",
301 : "The maximum size of a message buffer",
302 : "The maximum size in bytes of a message buffer (0 = dynamic buffer size)",
303 : 0, G_MAXULONG, DEFAULT_MAX_MSG_BUF_SIZE,
304 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305 :
306 22 : g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
307 : g_param_spec_int ("num-buffers", "Num Buffers",
308 : "Number of (remaining) buffers to accept until sending EOS event (-1 = no limit)",
309 : -1, G_MAXINT32, DEFAULT_NUM_BUFFERS,
310 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
311 :
312 22 : g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
313 : g_param_spec_int ("mqtt-qos", "mqtt QoS level",
314 : "The QoS level of MQTT.\n"
315 : "\t\t\t 0: At most once\n"
316 : "\t\t\t 1: At least once\n"
317 : "\t\t\t 2: Exactly once\n"
318 : "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
319 : 0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
320 :
321 22 : gstelement_class->change_state = gst_mqtt_sink_change_state;
322 :
323 22 : gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_sink_start);
324 22 : gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_sink_stop);
325 22 : gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_sink_query);
326 22 : gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_mqtt_sink_render);
327 22 : gstbasesink_class->render_list =
328 22 : GST_DEBUG_FUNCPTR (gst_mqtt_sink_render_list);
329 22 : gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_mqtt_sink_event);
330 22 : gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_mqtt_sink_set_caps);
331 :
332 22 : gst_element_class_set_static_metadata (gstelement_class,
333 : "MQTT sink", "Sink/MQTT",
334 : "Publish incoming data streams as a MQTT topic",
335 : "Wook Song <wook16.song@samsung.com>");
336 22 : gst_element_class_add_static_pad_template (gstelement_class,
337 : &sink_pad_template);
338 22 : }
339 :
340 : /**
341 : * @brief The setter for the mqttsink's properties
342 : */
343 : static void
344 26 : gst_mqtt_sink_set_property (GObject * object, guint prop_id,
345 : const GValue * value, GParamSpec * pspec)
346 : {
347 26 : GstMqttSink *self = GST_MQTT_SINK (object);
348 :
349 26 : switch (prop_id) {
350 7 : case PROP_DEBUG:
351 7 : gst_mqtt_sink_set_debug (self, g_value_get_boolean (value));
352 7 : break;
353 1 : case PROP_MQTT_CLIENT_ID:
354 1 : gst_mqtt_sink_set_client_id (self, g_value_get_string (value));
355 1 : break;
356 2 : case PROP_MQTT_HOST_ADDRESS:
357 2 : gst_mqtt_sink_set_host_address (self, g_value_get_string (value));
358 2 : break;
359 2 : case PROP_MQTT_HOST_PORT:
360 2 : gst_mqtt_sink_set_host_port (self, g_value_get_string (value));
361 2 : break;
362 3 : case PROP_MQTT_PUB_TOPIC:
363 3 : gst_mqtt_sink_set_pub_topic (self, g_value_get_string (value));
364 3 : break;
365 1 : case PROP_MQTT_PUB_WAIT_TIMEOUT:
366 1 : gst_mqtt_sink_set_pub_wait_timeout (self, g_value_get_ulong (value));
367 1 : break;
368 1 : case PROP_MQTT_OPT_CLEANSESSION:
369 1 : gst_mqtt_sink_set_opt_cleansession (self, g_value_get_boolean (value));
370 1 : break;
371 1 : case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
372 1 : gst_mqtt_sink_set_opt_keep_alive_interval (self, g_value_get_int (value));
373 1 : break;
374 1 : case PROP_MAX_MSG_BUF_SIZE:
375 1 : gst_mqtt_sink_set_max_msg_buf_size (self, g_value_get_ulong (value));
376 1 : break;
377 3 : case PROP_NUM_BUFFERS:
378 3 : gst_mqtt_sink_set_num_buffers (self, g_value_get_int (value));
379 3 : break;
380 1 : case PROP_MQTT_QOS:
381 1 : gst_mqtt_sink_set_mqtt_qos (self, g_value_get_int (value));
382 1 : break;
383 2 : case PROP_MQTT_NTP_SYNC:
384 2 : gst_mqtt_sink_set_mqtt_ntp_sync (self, g_value_get_boolean (value));
385 2 : break;
386 1 : case PROP_MQTT_NTP_SRVS:
387 1 : gst_mqtt_sink_set_mqtt_ntp_srvs (self, g_value_get_string (value));
388 1 : break;
389 0 : default:
390 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
391 0 : break;
392 : }
393 26 : }
394 :
395 : /**
396 : * @brief The getter for the mqttsink's properties
397 : */
398 : static void
399 18 : gst_mqtt_sink_get_property (GObject * object, guint prop_id,
400 : GValue * value, GParamSpec * pspec)
401 : {
402 18 : GstMqttSink *self = GST_MQTT_SINK (object);
403 :
404 18 : switch (prop_id) {
405 2 : case PROP_DEBUG:
406 2 : g_value_set_boolean (value, gst_mqtt_sink_get_debug (self));
407 2 : break;
408 1 : case PROP_MQTT_CLIENT_ID:
409 1 : g_value_set_string (value, gst_mqtt_sink_get_client_id (self));
410 1 : break;
411 1 : case PROP_MQTT_HOST_ADDRESS:
412 1 : g_value_set_string (value, gst_mqtt_sink_get_host_address (self));
413 1 : break;
414 1 : case PROP_MQTT_HOST_PORT:
415 1 : g_value_set_string (value, gst_mqtt_sink_get_host_port (self));
416 1 : break;
417 1 : case PROP_MQTT_PUB_TOPIC:
418 1 : g_value_set_string (value, gst_mqtt_sink_get_pub_topic (self));
419 1 : break;
420 2 : case PROP_MQTT_PUB_WAIT_TIMEOUT:
421 2 : g_value_set_ulong (value, gst_mqtt_sink_get_pub_wait_timeout (self));
422 2 : break;
423 1 : case PROP_MQTT_OPT_CLEANSESSION:
424 1 : g_value_set_boolean (value, gst_mqtt_sink_get_opt_cleansession (self));
425 1 : break;
426 2 : case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
427 2 : g_value_set_int (value, gst_mqtt_sink_get_opt_keep_alive_interval (self));
428 2 : break;
429 1 : case PROP_MAX_MSG_BUF_SIZE:
430 1 : g_value_set_ulong (value, gst_mqtt_sink_get_max_msg_buf_size (self));
431 1 : break;
432 2 : case PROP_NUM_BUFFERS:
433 2 : g_value_set_int (value, gst_mqtt_sink_get_num_buffers (self));
434 2 : break;
435 2 : case PROP_MQTT_QOS:
436 2 : g_value_set_int (value, gst_mqtt_sink_get_mqtt_qos (self));
437 2 : break;
438 1 : case PROP_MQTT_NTP_SYNC:
439 1 : g_value_set_boolean (value, gst_mqtt_sink_get_mqtt_ntp_sync (self));
440 1 : break;
441 1 : case PROP_MQTT_NTP_SRVS:
442 1 : g_value_set_string (value, gst_mqtt_sink_get_mqtt_ntp_srvs (self));
443 1 : break;
444 0 : default:
445 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
446 0 : break;
447 : }
448 18 : }
449 :
450 : /**
451 : * @brief Finalize GstMqttSinkClass object
452 : */
453 : static void
454 10 : gst_mqtt_sink_class_finalize (GObject * object)
455 : {
456 10 : GstMqttSink *self = GST_MQTT_SINK (object);
457 :
458 10 : g_free (self->mqtt_host_address);
459 10 : self->mqtt_host_address = NULL;
460 10 : g_free (self->mqtt_host_port);
461 10 : self->mqtt_host_port = NULL;
462 10 : if (self->mqtt_client_handle) {
463 0 : MQTTAsync_destroy (&self->mqtt_client_handle);
464 0 : self->mqtt_client_handle = NULL;
465 : }
466 10 : g_free (self->mqtt_client_id);
467 10 : self->mqtt_client_id = NULL;
468 10 : g_free (self->mqtt_msg_buf);
469 10 : self->mqtt_msg_buf = NULL;
470 10 : g_free (self->mqtt_topic);
471 10 : self->mqtt_topic = NULL;
472 10 : gst_caps_replace (&self->in_caps, NULL);
473 10 : g_free (self->mqtt_ntp_srvs);
474 10 : self->mqtt_ntp_srvs = NULL;
475 10 : self->mqtt_ntp_num_srvs = 0;
476 10 : g_strfreev (self->mqtt_ntp_hnames);
477 10 : self->mqtt_ntp_hnames = NULL;
478 10 : g_free (self->mqtt_ntp_ports);
479 10 : self->mqtt_ntp_ports = NULL;
480 :
481 10 : if (self->err)
482 0 : g_error_free (self->err);
483 10 : g_mutex_clear (&self->mqtt_sink_mutex);
484 10 : G_OBJECT_CLASS (parent_class)->finalize (object);
485 10 : }
486 :
487 : /**
488 : * @brief Handle mqttsink's state change
489 : */
490 : static GstStateChangeReturn
491 52 : gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition)
492 : {
493 52 : GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
494 52 : GstMqttSink *self = GST_MQTT_SINK (element);
495 : GstClock *elem_clock;
496 : GstClockTime base_time;
497 : GstClockTime cur_time;
498 : GstClockTimeDiff diff;
499 :
500 52 : switch (transition) {
501 10 : case GST_STATE_CHANGE_NULL_TO_READY:
502 10 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
503 10 : if (self->err) {
504 0 : g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
505 0 : self->err->message);
506 0 : return GST_STATE_CHANGE_FAILURE;
507 : }
508 10 : break;
509 8 : case GST_STATE_CHANGE_READY_TO_PAUSED:
510 8 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
511 8 : break;
512 8 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
513 8 : if (self->mqtt_ntp_sync)
514 0 : self->get_epoch_func = ntputil_get_epoch;
515 8 : self->base_time_epoch = GST_CLOCK_TIME_NONE;
516 8 : elem_clock = gst_element_get_clock (element);
517 8 : if (!elem_clock)
518 0 : break;
519 8 : base_time = gst_element_get_base_time (element);
520 8 : cur_time = gst_clock_get_time (elem_clock);
521 8 : gst_object_unref (elem_clock);
522 8 : diff = GST_CLOCK_DIFF (base_time, cur_time);
523 8 : self->base_time_epoch =
524 16 : self->get_epoch_func (self->mqtt_ntp_num_srvs, self->mqtt_ntp_hnames,
525 8 : self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER - diff;
526 8 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
527 8 : break;
528 26 : default:
529 26 : break;
530 : }
531 :
532 52 : ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
533 :
534 52 : switch (transition) {
535 8 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
536 8 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
537 8 : break;
538 8 : case GST_STATE_CHANGE_PAUSED_TO_READY:
539 8 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
540 8 : break;
541 8 : case GST_STATE_CHANGE_READY_TO_NULL:
542 8 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
543 : default:
544 36 : break;
545 : }
546 :
547 52 : return ret;
548 : }
549 :
550 : /**
551 : * @brief Start mqttsink, called when state changed null to ready
552 : */
553 : static gboolean
554 10 : gst_mqtt_sink_start (GstBaseSink * basesink)
555 : {
556 10 : GstMqttSink *self = GST_MQTT_SINK (basesink);
557 10 : gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
558 : self->mqtt_host_port);
559 : int ret;
560 : gint64 end_time;
561 :
562 10 : if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
563 10 : g_free (self->mqtt_client_id);
564 10 : self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
565 10 : g_get_host_name (), getpid (), sink_client_id++);
566 : }
567 :
568 10 : if (!g_strcmp0 (DEFAULT_MQTT_PUB_TOPIC, self->mqtt_topic)) {
569 8 : self->mqtt_topic = g_strdup_printf (DEFAULT_MQTT_PUB_TOPIC_FORMAT,
570 : self->mqtt_client_id);
571 : }
572 :
573 : /**
574 : * @todo Support other persistence mechanisms
575 : * MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
576 : * MQTTCLIENT_PERSISTENCE_DEFAULT: The default file system-based
577 : * persistence mechanism
578 : * MQTTCLIENT_PERSISTENCE_USER: An application-specific persistence
579 : * mechanism
580 : */
581 20 : ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
582 10 : self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
583 10 : g_free (haddr);
584 10 : if (ret != MQTTASYNC_SUCCESS)
585 0 : return FALSE;
586 :
587 10 : MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
588 : cb_mqtt_on_connection_lost, cb_mqtt_on_message_arrived,
589 : cb_mqtt_on_delivery_complete);
590 :
591 10 : ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
592 10 : if (ret != MQTTASYNC_SUCCESS) {
593 0 : goto error;
594 : }
595 :
596 : /* Waiting for the connection */
597 10 : end_time = g_get_monotonic_time () +
598 : DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
599 10 : g_mutex_lock (&self->mqtt_sink_mutex);
600 12 : while (!self->is_connected) {
601 4 : if (!g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
602 : end_time)) {
603 2 : g_mutex_unlock (&self->mqtt_sink_mutex);
604 2 : g_critical ("Failed to connect to MQTT broker from mqttsink."
605 : "Please check broker is running status or broker host address.");
606 2 : goto error;
607 : }
608 : }
609 8 : g_mutex_unlock (&self->mqtt_sink_mutex);
610 :
611 8 : return TRUE;
612 :
613 2 : error:
614 2 : MQTTAsync_destroy (&self->mqtt_client_handle);
615 2 : self->mqtt_client_handle = NULL;
616 2 : return FALSE;
617 : }
618 :
619 : /**
620 : * @brief Stop mqttsink, called when state changed ready to null
621 : */
622 : static gboolean
623 8 : gst_mqtt_sink_stop (GstBaseSink * basesink)
624 : {
625 8 : GstMqttSink *self = GST_MQTT_SINK (basesink);
626 8 : MQTTAsync_disconnectOptions disconn_opts =
627 : MQTTAsync_disconnectOptions_initializer;
628 :
629 8 : disconn_opts.timeout = DEFAULT_MQTT_DISCONNECT_TIMEOUT;
630 8 : disconn_opts.onSuccess = cb_mqtt_on_disconnect;
631 8 : disconn_opts.onFailure = cb_mqtt_on_disconnect_failure;
632 8 : disconn_opts.context = self;
633 :
634 8 : g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_STOPPED);
635 8 : while (MQTTAsync_isConnected (self->mqtt_client_handle)) {
636 8 : gint64 end_time = g_get_monotonic_time () + DEFAULT_MQTT_DISCONNECT_TIMEOUT;
637 : mqtt_sink_state_t cur_state;
638 :
639 8 : MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts);
640 8 : g_mutex_lock (&self->mqtt_sink_mutex);
641 8 : self->is_connected = FALSE;
642 8 : g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
643 : end_time);
644 8 : g_mutex_unlock (&self->mqtt_sink_mutex);
645 8 : cur_state = g_atomic_int_get (&self->mqtt_sink_state);
646 :
647 8 : if ((cur_state == MQTT_DISCONNECTED) ||
648 0 : (cur_state == MQTT_DISCONNECT_FAILED) ||
649 0 : (cur_state == SINK_RENDER_EOS) || (cur_state == SINK_RENDER_ERROR))
650 : break;
651 : }
652 8 : MQTTAsync_destroy (&self->mqtt_client_handle);
653 8 : self->mqtt_client_handle = NULL;
654 8 : return TRUE;
655 : }
656 :
657 : /**
658 : * @brief Perform queries on the element
659 : */
660 : static gboolean
661 18 : gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query)
662 : {
663 18 : gboolean ret = FALSE;
664 :
665 18 : switch (GST_QUERY_TYPE (query)) {
666 0 : case GST_QUERY_SEEKING:{
667 : GstFormat fmt;
668 :
669 : /* GST_QUERY_SEEKING is not supported */
670 0 : gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
671 0 : gst_query_set_seeking (query, fmt, FALSE, 0, -1);
672 0 : ret = TRUE;
673 0 : break;
674 : }
675 18 : default:{
676 18 : ret = GST_BASE_SINK_CLASS (parent_class)->query (basesink, query);
677 18 : break;
678 : }
679 : }
680 :
681 18 : return ret;
682 : }
683 :
684 : /**
685 : * @brief A utility function to set the timestamp information onto the given buffer
686 : */
687 : static void
688 24 : _put_timestamp_to_msg_buf_hdr (GstMqttSink * self, GstBuffer * gst_buf,
689 : GstMQTTMessageHdr * hdr)
690 : {
691 24 : hdr->base_time_epoch = self->base_time_epoch;
692 48 : hdr->sent_time_epoch = self->get_epoch_func (self->mqtt_ntp_num_srvs,
693 24 : self->mqtt_ntp_hnames, self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER;
694 :
695 24 : hdr->duration = GST_BUFFER_DURATION_IS_VALID (gst_buf) ?
696 24 : GST_BUFFER_DURATION (gst_buf) : GST_CLOCK_TIME_NONE;
697 :
698 24 : hdr->dts = GST_BUFFER_DTS_IS_VALID (gst_buf) ?
699 24 : GST_BUFFER_DTS (gst_buf) : GST_CLOCK_TIME_NONE;
700 :
701 24 : hdr->pts = GST_BUFFER_PTS_IS_VALID (gst_buf) ?
702 24 : GST_BUFFER_PTS (gst_buf) : GST_CLOCK_TIME_NONE;
703 :
704 24 : if (self->debug) {
705 24 : GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
706 : GstClock *clock;
707 :
708 24 : clock = gst_element_get_clock (GST_ELEMENT (self));
709 :
710 24 : GST_DEBUG_OBJECT (self,
711 : "%s now %" GST_TIME_FORMAT " ts %" GST_TIME_FORMAT " sent %"
712 : GST_TIME_FORMAT, self->mqtt_topic,
713 : GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
714 : GST_TIME_ARGS (hdr->pts),
715 : GST_TIME_ARGS (hdr->sent_time_epoch - hdr->base_time_epoch));
716 :
717 24 : gst_object_unref (clock);
718 : }
719 24 : }
720 :
721 : /**
722 : * @brief A utility function to set the message header
723 : */
724 : static gboolean
725 24 : _mqtt_set_msg_buf_hdr (GstBuffer * gst_buf, GstMQTTMessageHdr * hdr)
726 : {
727 24 : gboolean ret = TRUE;
728 : guint i;
729 :
730 24 : hdr->num_mems = gst_buffer_n_memory (gst_buf);
731 47 : for (i = 0; i < hdr->num_mems; ++i) {
732 : GstMemory *each_mem;
733 :
734 23 : each_mem = gst_buffer_peek_memory (gst_buf, i);
735 23 : if (!each_mem) {
736 0 : memset (hdr, 0x0, sizeof (*hdr));
737 0 : ret = FALSE;
738 0 : break;
739 : }
740 :
741 23 : hdr->size_mems[i] = each_mem->size;
742 : }
743 :
744 24 : return ret;
745 : }
746 :
747 : /**
748 : * @brief The callback to process each buffer receiving on the sink pad
749 : */
750 : static GstFlowReturn
751 25 : gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
752 : {
753 25 : const gsize in_buf_size = gst_buffer_get_size (in_buf);
754 : static gboolean is_static_sized_buf = FALSE;
755 25 : GstMqttSink *self = GST_MQTT_SINK (basesink);
756 25 : GstFlowReturn ret = GST_FLOW_ERROR;
757 : mqtt_sink_state_t cur_state;
758 : GstMemory *in_buf_mem;
759 : GstMapInfo in_buf_map;
760 : gint mqtt_rc;
761 : guint8 *msg_pub;
762 :
763 25 : while ((cur_state =
764 25 : g_atomic_int_get (&self->mqtt_sink_state)) != MQTT_CONNECTED) {
765 0 : gint64 end_time = g_get_monotonic_time ();
766 : mqtt_sink_state_t _state;
767 :
768 0 : end_time += (self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND);
769 0 : g_mutex_lock (&self->mqtt_sink_mutex);
770 0 : g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
771 : end_time);
772 0 : g_mutex_unlock (&self->mqtt_sink_mutex);
773 :
774 0 : _state = g_atomic_int_get (&self->mqtt_sink_state);
775 0 : switch (_state) {
776 0 : case MQTT_CONNECT_FAILURE:
777 : case MQTT_DISCONNECTED:
778 : case MQTT_CONNECTION_LOST:
779 : case SINK_RENDER_ERROR:
780 0 : ret = GST_FLOW_ERROR;
781 0 : break;
782 0 : case SINK_RENDER_EOS:
783 0 : ret = GST_FLOW_EOS;
784 0 : break;
785 0 : default:
786 0 : continue;
787 : }
788 0 : goto ret_with;
789 : }
790 :
791 25 : if (self->num_buffers == 0) {
792 1 : ret = GST_FLOW_EOS;
793 1 : goto ret_with;
794 : }
795 :
796 24 : if (self->num_buffers != -1) {
797 20 : self->num_buffers -= 1;
798 : }
799 :
800 24 : if ((!is_static_sized_buf) && (self->mqtt_msg_buf) &&
801 18 : (self->mqtt_msg_buf_size != 0) &&
802 18 : (self->mqtt_msg_buf_size < in_buf_size + GST_MQTT_LEN_MSG_HDR)) {
803 0 : g_free (self->mqtt_msg_buf);
804 0 : self->mqtt_msg_buf = NULL;
805 0 : self->mqtt_msg_buf_size = 0;
806 : }
807 :
808 : /** Allocate a message buffer */
809 24 : if ((!self->mqtt_msg_buf) && (self->mqtt_msg_buf_size == 0)) {
810 6 : if (self->max_msg_buf_size == 0) {
811 6 : self->mqtt_msg_buf_size = in_buf_size + GST_MQTT_LEN_MSG_HDR;
812 : } else {
813 0 : if (self->max_msg_buf_size < in_buf_size) {
814 0 : g_printerr ("%s: The given size for a message buffer is too small: "
815 : "given (%" G_GSIZE_FORMAT " bytes) vs. incoming (%" G_GSIZE_FORMAT
816 : " bytes)\n", TAG_ERR_MQTTSINK, self->max_msg_buf_size, in_buf_size);
817 0 : ret = GST_FLOW_ERROR;
818 0 : goto ret_with;
819 : }
820 0 : self->mqtt_msg_buf_size = self->max_msg_buf_size + GST_MQTT_LEN_MSG_HDR;
821 0 : is_static_sized_buf = TRUE;
822 : }
823 :
824 6 : self->mqtt_msg_buf = g_try_malloc0 (self->mqtt_msg_buf_size);
825 : }
826 :
827 24 : if (!_mqtt_set_msg_buf_hdr (in_buf, &self->mqtt_msg_hdr)) {
828 0 : ret = GST_FLOW_ERROR;
829 0 : goto ret_with;
830 : }
831 :
832 24 : msg_pub = self->mqtt_msg_buf;
833 24 : if (!msg_pub) {
834 0 : self->mqtt_msg_buf_size = 0;
835 0 : ret = GST_FLOW_ERROR;
836 0 : goto ret_with;
837 : }
838 24 : memcpy (msg_pub, &self->mqtt_msg_hdr, sizeof (self->mqtt_msg_hdr));
839 24 : _put_timestamp_to_msg_buf_hdr (self, in_buf, (GstMQTTMessageHdr *) msg_pub);
840 :
841 24 : in_buf_mem = gst_buffer_get_all_memory (in_buf);
842 24 : if (!in_buf_mem) {
843 1 : ret = GST_FLOW_ERROR;
844 1 : goto ret_with;
845 : }
846 :
847 23 : if (!gst_memory_map (in_buf_mem, &in_buf_map, GST_MAP_READ)) {
848 0 : ret = GST_FLOW_ERROR;
849 0 : goto ret_unref_in_buf_mem;
850 : }
851 :
852 23 : ret = GST_FLOW_OK;
853 :
854 23 : memcpy (&msg_pub[sizeof (self->mqtt_msg_hdr)], in_buf_map.data,
855 : in_buf_map.size);
856 46 : mqtt_rc = MQTTAsync_send (self->mqtt_client_handle, self->mqtt_topic,
857 23 : GST_MQTT_LEN_MSG_HDR + in_buf_map.size, self->mqtt_msg_buf,
858 : self->mqtt_qos, 1, &self->mqtt_respn_opts);
859 23 : if (mqtt_rc != MQTTASYNC_SUCCESS) {
860 1 : ret = GST_FLOW_ERROR;
861 : }
862 :
863 23 : gst_memory_unmap (in_buf_mem, &in_buf_map);
864 :
865 23 : ret_unref_in_buf_mem:
866 23 : gst_memory_unref (in_buf_mem);
867 :
868 25 : ret_with:
869 25 : return ret;
870 : }
871 :
872 : /**
873 : * @brief The callback to process GstBufferList (instead of a single buffer)
874 : * on the sink pad
875 : */
876 : static GstFlowReturn
877 0 : gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list)
878 : {
879 0 : guint num_buffers = gst_buffer_list_length (list);
880 : GstFlowReturn ret;
881 : GstBuffer *buffer;
882 : guint i;
883 :
884 0 : for (i = 0; i < num_buffers; ++i) {
885 0 : buffer = gst_buffer_list_get (list, i);
886 0 : ret = gst_mqtt_sink_render (basesink, buffer);
887 0 : if (ret != GST_FLOW_OK)
888 0 : break;
889 : }
890 :
891 0 : return ret;
892 : }
893 :
894 : /**
895 : * @brief Handle events arriving on the sink pad
896 : */
897 : static gboolean
898 12 : gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event)
899 : {
900 12 : GstMqttSink *self = GST_MQTT_SINK (basesink);
901 12 : GstEventType type = GST_EVENT_TYPE (event);
902 12 : gboolean ret = FALSE;
903 :
904 12 : switch (type) {
905 0 : case GST_EVENT_EOS:
906 0 : g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
907 0 : g_mutex_lock (&self->mqtt_sink_mutex);
908 0 : g_cond_broadcast (&self->mqtt_sink_gcond);
909 0 : g_mutex_unlock (&self->mqtt_sink_mutex);
910 0 : break;
911 12 : default:
912 12 : break;
913 : }
914 :
915 12 : ret = GST_BASE_SINK_CLASS (parent_class)->event (basesink, event);
916 :
917 12 : return ret;
918 : }
919 :
920 : /**
921 : * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
922 : */
923 : static gboolean
924 3 : gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps)
925 : {
926 3 : GstMqttSink *self = GST_MQTT_SINK (basesink);
927 : gboolean ret;
928 :
929 3 : ret = gst_caps_replace (&self->in_caps, caps);
930 :
931 3 : if (ret && gst_caps_is_fixed (self->in_caps)) {
932 3 : gchar *caps_str = gst_caps_to_string (caps);
933 : gsize len;
934 :
935 3 : if (caps_str == NULL) {
936 0 : g_critical ("Fail to convert caps to string representation");
937 0 : return FALSE;
938 : }
939 :
940 3 : len = g_strlcpy (self->mqtt_msg_hdr.gst_caps_str, caps_str,
941 : GST_MQTT_MAX_LEN_GST_CAPS_STR);
942 :
943 3 : if (len >= GST_MQTT_MAX_LEN_GST_CAPS_STR) {
944 0 : g_critical ("Fail to copy caps_str.");
945 0 : ret = FALSE;
946 : }
947 :
948 3 : g_free (caps_str);
949 : }
950 :
951 3 : return ret;
952 : }
953 :
954 : /**
955 : * @brief Getter for the 'debug' property.
956 : */
957 : static gboolean
958 2 : gst_mqtt_sink_get_debug (GstMqttSink * self)
959 : {
960 2 : return self->debug;
961 : }
962 :
963 : /**
964 : * @brief Setter for the 'debug' property.
965 : */
966 : static void
967 7 : gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag)
968 : {
969 7 : self->debug = flag;
970 7 : }
971 :
972 : /**
973 : * @brief Getter for the 'client-id' property.
974 : */
975 : static gchar *
976 1 : gst_mqtt_sink_get_client_id (GstMqttSink * self)
977 : {
978 1 : return self->mqtt_client_id;
979 : }
980 :
981 : /**
982 : * @brief Setter for the 'client-id' property.
983 : */
984 : static void
985 1 : gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id)
986 : {
987 1 : g_free (self->mqtt_client_id);
988 1 : self->mqtt_client_id = g_strdup (id);
989 1 : }
990 :
991 : /**
992 : * @brief Getter for the 'host' property.
993 : */
994 : static gchar *
995 1 : gst_mqtt_sink_get_host_address (GstMqttSink * self)
996 : {
997 1 : return self->mqtt_host_address;
998 : }
999 :
1000 : /**
1001 : * @brief Setter for the 'host' property
1002 : */
1003 : static void
1004 2 : gst_mqtt_sink_set_host_address (GstMqttSink * self, const gchar * addr)
1005 : {
1006 : /**
1007 : * @todo Handle the case where the addr is changed at runtime
1008 : */
1009 2 : g_free (self->mqtt_host_address);
1010 2 : self->mqtt_host_address = g_strdup (addr);
1011 2 : }
1012 :
1013 : /**
1014 : * @brief Getter for the 'port' property.
1015 : */
1016 : static gchar *
1017 1 : gst_mqtt_sink_get_host_port (GstMqttSink * self)
1018 : {
1019 1 : return self->mqtt_host_port;
1020 : }
1021 :
1022 : /**
1023 : * @brief Setter for the 'port' property
1024 : */
1025 : static void
1026 2 : gst_mqtt_sink_set_host_port (GstMqttSink * self, const gchar * port)
1027 : {
1028 2 : g_free (self->mqtt_host_port);
1029 2 : self->mqtt_host_port = g_strdup (port);
1030 2 : }
1031 :
1032 : /**
1033 : * @brief Getter for the 'pub-topic' property
1034 : */
1035 : static gchar *
1036 1 : gst_mqtt_sink_get_pub_topic (GstMqttSink * self)
1037 : {
1038 1 : return self->mqtt_topic;
1039 : }
1040 :
1041 : /**
1042 : * @brief Setter for the 'pub-topic' property
1043 : */
1044 : static void
1045 3 : gst_mqtt_sink_set_pub_topic (GstMqttSink * self, const gchar * topic)
1046 : {
1047 3 : g_free (self->mqtt_topic);
1048 3 : self->mqtt_topic = g_strdup (topic);
1049 3 : }
1050 :
1051 : /**
1052 : * @brief Getter for the 'cleansession' property.
1053 : */
1054 : static gboolean
1055 1 : gst_mqtt_sink_get_opt_cleansession (GstMqttSink * self)
1056 : {
1057 1 : return self->mqtt_conn_opts.cleansession;
1058 : }
1059 :
1060 : /**
1061 : * @brief Setter for the 'cleansession' property.
1062 : */
1063 : static void
1064 1 : gst_mqtt_sink_set_opt_cleansession (GstMqttSink * self, const gboolean val)
1065 : {
1066 1 : self->mqtt_conn_opts.cleansession = val;
1067 1 : }
1068 :
1069 : /**
1070 : * @brief Getter for the 'pub-wait-timeout' property.
1071 : */
1072 : static gulong
1073 2 : gst_mqtt_sink_get_pub_wait_timeout (GstMqttSink * self)
1074 : {
1075 2 : return self->mqtt_pub_wait_timeout;
1076 : }
1077 :
1078 : /**
1079 : * @brief Setter for the 'pub-wait-timeout' property.
1080 : */
1081 : static void
1082 1 : gst_mqtt_sink_set_pub_wait_timeout (GstMqttSink * self, const gulong to)
1083 : {
1084 1 : self->mqtt_pub_wait_timeout = to;
1085 1 : }
1086 :
1087 : /**
1088 : * @brief Getter for the 'keep-alive-interval' property
1089 : */
1090 : static gint
1091 2 : gst_mqtt_sink_get_opt_keep_alive_interval (GstMqttSink * self)
1092 : {
1093 2 : return self->mqtt_conn_opts.keepAliveInterval;
1094 : }
1095 :
1096 : /**
1097 : * @brief Setter for the 'keep-alive-interval' property
1098 : */
1099 : static void
1100 1 : gst_mqtt_sink_set_opt_keep_alive_interval (GstMqttSink * self, const gint num)
1101 : {
1102 1 : self->mqtt_conn_opts.keepAliveInterval = num;
1103 1 : }
1104 :
1105 : /**
1106 : * @brief Getter for the 'max-buffer-size' property.
1107 : */
1108 : static gsize
1109 1 : gst_mqtt_sink_get_max_msg_buf_size (GstMqttSink * self)
1110 : {
1111 1 : return self->max_msg_buf_size;
1112 : }
1113 :
1114 : /**
1115 : * @brief Setter for the 'max-buffer-size' property.
1116 : */
1117 : static void
1118 1 : gst_mqtt_sink_set_max_msg_buf_size (GstMqttSink * self, const gsize size)
1119 : {
1120 1 : self->max_msg_buf_size = size;
1121 1 : }
1122 :
1123 : /**
1124 : * @brief Getter for the 'num-buffers' property.
1125 : */
1126 : static gint
1127 2 : gst_mqtt_sink_get_num_buffers (GstMqttSink * self)
1128 : {
1129 : gint num_buffers;
1130 :
1131 2 : num_buffers = self->num_buffers;
1132 :
1133 2 : return num_buffers;
1134 : }
1135 :
1136 : /**
1137 : * @brief Setter for the 'num-buffers' property
1138 : */
1139 : static void
1140 3 : gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num)
1141 : {
1142 3 : self->num_buffers = num;
1143 3 : }
1144 :
1145 : /**
1146 : * @brief Getter for the 'mqtt-qos' property.
1147 : */
1148 : static gint
1149 2 : gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self)
1150 : {
1151 2 : return self->mqtt_qos;
1152 : }
1153 :
1154 : /**
1155 : * @brief Setter for the 'mqtt-qos' property
1156 : */
1157 : static void
1158 1 : gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos)
1159 : {
1160 1 : self->mqtt_qos = qos;
1161 1 : }
1162 :
1163 : /**
1164 : * @brief Getter for the 'ntp-sync' property.
1165 : */
1166 : static gboolean
1167 1 : gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self)
1168 : {
1169 1 : return self->mqtt_ntp_sync;
1170 : }
1171 :
1172 : /**
1173 : * @brief Setter for the 'ntp-sync' property
1174 : */
1175 : static void
1176 2 : gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self, const gboolean flag)
1177 : {
1178 2 : self->mqtt_ntp_sync = flag;
1179 2 : }
1180 :
1181 : /**
1182 : * @brief Getter for the 'ntp-srvs' property.
1183 : */
1184 : static gchar *
1185 1 : gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self)
1186 : {
1187 1 : return self->mqtt_ntp_srvs;
1188 : }
1189 :
1190 : /**
1191 : * @brief Setter for the 'ntp-srvs' property
1192 : */
1193 : static void
1194 1 : gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self, const gchar * pairs)
1195 : {
1196 1 : gchar **pair_arrs = NULL;
1197 1 : guint hnum = 0;
1198 : gchar *pair;
1199 : guint i, j;
1200 :
1201 1 : if (g_strcmp0 (self->mqtt_ntp_srvs, pairs) == 0)
1202 0 : return;
1203 :
1204 1 : g_free (self->mqtt_ntp_srvs);
1205 1 : self->mqtt_ntp_srvs = g_strdup (pairs);
1206 :
1207 1 : pair_arrs = g_strsplit (pairs, ",", -1);
1208 1 : if (pair_arrs == NULL)
1209 0 : return;
1210 :
1211 1 : hnum = g_strv_length (pair_arrs);
1212 1 : if (hnum == 0)
1213 0 : goto err_free_pair_arrs;
1214 :
1215 1 : g_free (self->mqtt_ntp_hnames);
1216 1 : self->mqtt_ntp_hnames = g_try_malloc0 ((hnum + 1) * sizeof (gchar *));
1217 1 : if (!self->mqtt_ntp_hnames)
1218 0 : goto err_free_pair_arrs;
1219 :
1220 1 : g_free (self->mqtt_ntp_ports);
1221 1 : self->mqtt_ntp_ports = g_try_malloc0 (hnum * sizeof (guint16));
1222 1 : if (!self->mqtt_ntp_ports)
1223 0 : goto err_free_mqtt_ntp_hnames;
1224 :
1225 1 : self->mqtt_ntp_num_srvs = hnum;
1226 2 : for (i = 0, j = 0; i < hnum; i++) {
1227 : gchar **hname_port;
1228 : gchar *hname;
1229 : gchar *eport;
1230 : gulong port_ul;
1231 :
1232 1 : pair = pair_arrs[i];
1233 1 : hname_port = g_strsplit (pair, ":", 2);
1234 1 : hname = hname_port[0];
1235 1 : port_ul = strtoul (hname_port[1], &eport, 10);
1236 1 : if ((port_ul == 0) || (port_ul > UINT16_MAX)) {
1237 0 : self->mqtt_ntp_num_srvs--;
1238 : } else {
1239 1 : self->mqtt_ntp_hnames[j] = g_strdup (hname);
1240 1 : self->mqtt_ntp_ports[j] = (uint16_t) port_ul;
1241 1 : ++j;
1242 : }
1243 :
1244 1 : g_strfreev (hname_port);
1245 : }
1246 :
1247 1 : g_strfreev (pair_arrs);
1248 1 : return;
1249 :
1250 0 : err_free_mqtt_ntp_hnames:
1251 0 : g_strfreev (self->mqtt_ntp_hnames);
1252 0 : self->mqtt_ntp_hnames = NULL;
1253 :
1254 0 : err_free_pair_arrs:
1255 0 : g_strfreev (pair_arrs);
1256 :
1257 0 : return;
1258 : }
1259 :
1260 : /** Callback function definitions */
1261 : /**
1262 : * @brief A callback function corresponding to MQTTAsync_connectOptions's
1263 : * onSuccess. This callback is invoked when the connection between
1264 : * this element and the broker is properly established.
1265 : */
1266 : static void
1267 8 : cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
1268 : {
1269 8 : GstMqttSink *self = (GstMqttSink *) context;
1270 : UNUSED (response);
1271 :
1272 8 : g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTED);
1273 8 : g_mutex_lock (&self->mqtt_sink_mutex);
1274 8 : self->is_connected = TRUE;
1275 8 : g_cond_broadcast (&self->mqtt_sink_gcond);
1276 8 : g_mutex_unlock (&self->mqtt_sink_mutex);
1277 8 : }
1278 :
1279 : /**
1280 : * @brief A callback function corresponding to MQTTAsync_connectOptions's
1281 : * onFailure. This callback is invoked when it is failed to connect to
1282 : * the broker.
1283 : */
1284 : static void
1285 2 : cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
1286 : {
1287 2 : GstMqttSink *self = (GstMqttSink *) context;
1288 : UNUSED (response);
1289 :
1290 2 : g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECT_FAILURE);
1291 2 : g_mutex_lock (&self->mqtt_sink_mutex);
1292 2 : self->is_connected = FALSE;
1293 2 : g_cond_broadcast (&self->mqtt_sink_gcond);
1294 2 : g_mutex_unlock (&self->mqtt_sink_mutex);
1295 2 : }
1296 :
1297 : /**
1298 : * @brief A callback function corresponding to MQTTAsync_disconnectOptions's
1299 : * onSuccess. Regardless of the MQTTAsync_disconnect function's result,
1300 : * the pipeline should be stopped after this callback.
1301 : */
1302 : static void
1303 8 : cb_mqtt_on_disconnect (void *context, MQTTAsync_successData * response)
1304 : {
1305 8 : GstMqttSink *self = (GstMqttSink *) context;
1306 : UNUSED (response);
1307 :
1308 8 : g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECTED);
1309 8 : g_mutex_lock (&self->mqtt_sink_mutex);
1310 8 : g_cond_broadcast (&self->mqtt_sink_gcond);
1311 8 : g_mutex_unlock (&self->mqtt_sink_mutex);
1312 8 : }
1313 :
1314 : /**
1315 : * @brief A callback function corresponding to MQTTAsync_disconnectOptions's
1316 : * onFailure. Regardless of the MQTTAsync_disconnect function's result,
1317 : * the pipeline should be stopped after this callback.
1318 : */
1319 : static void
1320 0 : cb_mqtt_on_disconnect_failure (void *context, MQTTAsync_failureData * response)
1321 : {
1322 0 : GstMqttSink *self = (GstMqttSink *) context;
1323 : UNUSED (response);
1324 :
1325 0 : g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECT_FAILED);
1326 0 : g_mutex_lock (&self->mqtt_sink_mutex);
1327 0 : g_cond_broadcast (&self->mqtt_sink_gcond);
1328 0 : g_mutex_unlock (&self->mqtt_sink_mutex);
1329 0 : }
1330 :
1331 : /**
1332 : * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
1333 : * This callback is activated when `mqtt-qos` is higher then 0.
1334 : */
1335 : static void
1336 0 : cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token)
1337 : {
1338 0 : GstMqttSink *self = (GstMqttSink *) context;
1339 :
1340 0 : GST_DEBUG_OBJECT (self,
1341 : "%s: the message with token(%d) has been delivered.", self->mqtt_topic,
1342 : token);
1343 0 : }
1344 :
1345 : /**
1346 : * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
1347 : * When the connection between this element and the broker is broken,
1348 : * this callback will be invoked.
1349 : */
1350 : static void
1351 0 : cb_mqtt_on_connection_lost (void *context, char *cause)
1352 : {
1353 0 : GstMqttSink *self = (GstMqttSink *) context;
1354 : UNUSED (cause);
1355 :
1356 0 : g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTION_LOST);
1357 0 : g_mutex_lock (&self->mqtt_sink_mutex);
1358 0 : self->is_connected = FALSE;
1359 0 : g_cond_broadcast (&self->mqtt_sink_gcond);
1360 0 : g_mutex_unlock (&self->mqtt_sink_mutex);
1361 0 : }
1362 :
1363 : /**
1364 : * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
1365 : * In the case of the publisher, this callback is not used.
1366 : */
1367 : static int
1368 0 : cb_mqtt_on_message_arrived (void *context, char *topicName, int topicLen,
1369 : MQTTAsync_message * message)
1370 : {
1371 : UNUSED (context);
1372 : UNUSED (topicName);
1373 : UNUSED (topicLen);
1374 : UNUSED (message);
1375 : /* nothing to do */
1376 0 : return 1;
1377 : }
1378 :
1379 : /**
1380 : * @brief A callback function corresponding to MQTTAsync_responseOptions's
1381 : * onSuccess.
1382 : */
1383 : static void
1384 22 : cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response)
1385 : {
1386 22 : GstMqttSink *self = (GstMqttSink *) context;
1387 22 : mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
1388 : UNUSED (response);
1389 :
1390 22 : if (state == SINK_RENDER_STOPPED) {
1391 0 : g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
1392 :
1393 0 : g_mutex_lock (&self->mqtt_sink_mutex);
1394 0 : g_cond_broadcast (&self->mqtt_sink_gcond);
1395 0 : g_mutex_unlock (&self->mqtt_sink_mutex);
1396 : }
1397 22 : }
1398 :
1399 : /**
1400 : * @brief A callback function corresponding to MQTTAsync_responseOptions's
1401 : * onFailure.
1402 : */
1403 : static void
1404 1 : cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response)
1405 : {
1406 1 : GstMqttSink *self = (GstMqttSink *) context;
1407 1 : mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
1408 : UNUSED (response);
1409 :
1410 1 : if (state == SINK_RENDER_STOPPED) {
1411 0 : g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_ERROR);
1412 :
1413 0 : g_mutex_lock (&self->mqtt_sink_mutex);
1414 0 : g_cond_broadcast (&self->mqtt_sink_gcond);
1415 0 : g_mutex_unlock (&self->mqtt_sink_mutex);
1416 : }
1417 :
1418 1 : }
|