Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
4 : */
5 : /**
6 : * @file mqttsrc.c
7 : * @date 08 Mar 2021
8 : * @brief Subscribe a MQTT topic and push incoming data to the GStreamer pipeline
9 : * @see https://github.com/nnstreamer/nnstreamer
10 : * @author Wook Song <wook16.song@samsung.com>
11 : * @bug No known bugs except for NYI items
12 : */
13 :
14 : #ifdef HAVE_CONFIG_H
15 : #include <config.h>
16 : #endif
17 :
18 : #ifdef G_OS_WIN32
19 : #include <process.h>
20 : #else
21 : #include <sys/types.h>
22 : #include <unistd.h>
23 : #endif
24 :
25 : #include <gst/base/gstbasesrc.h>
26 : #include <MQTTAsync.h>
27 : #include <nnstreamer_util.h>
28 :
29 : #include "mqttsrc.h"
30 :
31 : static GstStaticPadTemplate src_pad_template = GST_STATIC_PAD_TEMPLATE ("src",
32 : GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
33 :
34 : #define gst_mqtt_src_parent_class parent_class
35 240 : G_DEFINE_TYPE (GstMqttSrc, gst_mqtt_src, GST_TYPE_BASE_SRC);
36 :
37 : GST_DEBUG_CATEGORY_STATIC (gst_mqtt_src_debug);
38 : #define GST_CAT_DEFAULT gst_mqtt_src_debug
39 :
40 : enum
41 : {
42 : PROP_0,
43 :
44 : PROP_DEBUG,
45 : PROP_IS_LIVE,
46 : PROP_MQTT_CLIENT_ID,
47 : PROP_MQTT_HOST_ADDRESS,
48 : PROP_MQTT_HOST_PORT,
49 : PROP_MQTT_SUB_TOPIC,
50 : PROP_MQTT_SUB_TIMEOUT,
51 : PROP_MQTT_OPT_CLEANSESSION,
52 : PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
53 : PROP_MQTT_QOS,
54 :
55 : PROP_LAST
56 : };
57 :
58 : enum
59 : {
60 : DEFAULT_DEBUG = FALSE,
61 : DEFAULT_IS_LIVE = TRUE,
62 : DEFAULT_MQTT_OPT_CLEANSESSION = TRUE,
63 : DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60, /* 1 minute */
64 : DEFAULT_MQTT_SUB_TIMEOUT = 10000000, /* 10 seconds */
65 : DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000, /* 1 seconds */
66 : DEFAULT_MQTT_QOS = 2, /* Once and one only */
67 : };
68 :
69 : static guint8 src_client_id = 0;
70 : static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
71 : static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
72 : static const gchar TAG_ERR_MQTTSRC[] = "ERROR: MQTTSrc";
73 : static const gchar DEFAULT_MQTT_CLIENT_ID[] =
74 : "$HOSTNAME_$PID_^[0-9][0-9]?$|^255$";
75 : static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_src%u";
76 :
77 : /** Function prototype declarations */
78 : static void
79 : gst_mqtt_src_set_property (GObject * object, guint prop_id,
80 : const GValue * value, GParamSpec * pspec);
81 : static void
82 : gst_mqtt_src_get_property (GObject * object, guint prop_id,
83 : GValue * value, GParamSpec * pspec);
84 : static void gst_mqtt_src_class_finalize (GObject * object);
85 :
86 : static GstStateChangeReturn
87 : gst_mqtt_src_change_state (GstElement * element, GstStateChange transition);
88 :
89 : static gboolean gst_mqtt_src_start (GstBaseSrc * basesrc);
90 : static gboolean gst_mqtt_src_stop (GstBaseSrc * basesrc);
91 : static GstCaps *gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
92 : static gboolean gst_mqtt_src_renegotiate (GstBaseSrc * basesrc);
93 :
94 : static void
95 : gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
96 : GstClockTime * start, GstClockTime * end);
97 : static gboolean gst_mqtt_src_is_seekable (GstBaseSrc * basesrc);
98 : static GstFlowReturn
99 : gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
100 : GstBuffer ** buf);
101 : static gboolean gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query);
102 :
103 : static gboolean gst_mqtt_src_get_debug (GstMqttSrc * self);
104 : static void gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag);
105 : static gboolean gst_mqtt_src_get_is_live (GstMqttSrc * self);
106 : static void gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag);
107 : static gchar *gst_mqtt_src_get_client_id (GstMqttSrc * self);
108 : static void gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id);
109 : static gchar *gst_mqtt_src_get_host_address (GstMqttSrc * self);
110 : static void gst_mqtt_src_set_host_address (GstMqttSrc * self,
111 : const gchar * addr);
112 : static gchar *gst_mqtt_src_get_host_port (GstMqttSrc * self);
113 : static void gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port);
114 : static gint64 gst_mqtt_src_get_sub_timeout (GstMqttSrc * self);
115 : static void gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t);
116 : static gchar *gst_mqtt_src_get_sub_topic (GstMqttSrc * self);
117 : static void gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic);
118 : static gboolean gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self);
119 : static void gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self,
120 : const gboolean val);
121 : static gint gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self);
122 : static void gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self,
123 : const gint num);
124 : static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self);
125 : static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos);
126 :
127 : static void cb_mqtt_on_connection_lost (void *context, char *cause);
128 : static int cb_mqtt_on_message_arrived (void *context, char *topic_name,
129 : int topic_len, MQTTAsync_message * message);
130 : static void cb_mqtt_on_connect (void *context,
131 : MQTTAsync_successData * response);
132 : static void cb_mqtt_on_connect_failure (void *context,
133 : MQTTAsync_failureData * response);
134 : static void cb_mqtt_on_subscribe (void *context,
135 : MQTTAsync_successData * response);
136 : static void cb_mqtt_on_subscribe_failure (void *context,
137 : MQTTAsync_failureData * response);
138 : static void cb_mqtt_on_unsubscribe (void *context,
139 : MQTTAsync_successData * response);
140 : static void cb_mqtt_on_unsubscribe_failure (void *context,
141 : MQTTAsync_failureData * response);
142 :
143 : static void cb_memory_wrapped_destroy (void *p);
144 :
145 : static GstMQTTMessageHdr *_extract_mqtt_msg_hdr_from (GstMemory * mem,
146 : GstMemory ** hdr_mem, GstMapInfo * hdr_map_info);
147 : static void _put_timestamp_on_gst_buf (GstMqttSrc * self,
148 : GstMQTTMessageHdr * hdr, GstBuffer * buf);
149 : static gboolean _subscribe (GstMqttSrc * self);
150 : static gboolean _unsubscribe (GstMqttSrc * self);
151 :
152 : /**
153 : * @brief A utility function to check whether the timestamp marked by _put_timestamp_on_gst_buf () is valid or not
154 : */
155 : static inline gboolean
156 4 : _is_gst_buffer_timestamp_valid (GstBuffer * buf)
157 : {
158 4 : if (!GST_BUFFER_PTS_IS_VALID (buf) && !GST_BUFFER_DTS_IS_VALID (buf) &&
159 0 : !GST_BUFFER_DURATION_IS_VALID (buf))
160 0 : return FALSE;
161 4 : return TRUE;
162 : }
163 :
164 : /** Function definitions */
165 : /**
166 : * @brief Initialize GstMqttSrc object
167 : */
168 : static void
169 7 : gst_mqtt_src_init (GstMqttSrc * self)
170 : {
171 7 : MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
172 7 : MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
173 7 : GstBaseSrc *basesrc = GST_BASE_SRC (self);
174 :
175 7 : self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSRC);
176 :
177 7 : gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
178 7 : gst_base_src_set_async (basesrc, FALSE);
179 :
180 : /** init mqttsrc properties */
181 7 : self->mqtt_client_handle = NULL;
182 7 : self->debug = DEFAULT_DEBUG;
183 7 : self->is_live = DEFAULT_IS_LIVE;
184 7 : self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
185 7 : self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
186 7 : self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
187 7 : self->mqtt_topic = NULL;
188 7 : self->mqtt_sub_timeout = (gint64) DEFAULT_MQTT_SUB_TIMEOUT;
189 7 : self->mqtt_conn_opts = conn_opts;
190 7 : self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
191 7 : self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
192 7 : self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
193 7 : self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
194 7 : self->mqtt_conn_opts.context = self;
195 7 : self->mqtt_respn_opts = respn_opts;
196 7 : self->mqtt_respn_opts.onSuccess = NULL;
197 7 : self->mqtt_respn_opts.onFailure = NULL;
198 7 : self->mqtt_respn_opts.context = self;
199 7 : self->mqtt_qos = DEFAULT_MQTT_QOS;
200 :
201 : /** init private member variables */
202 7 : self->err = NULL;
203 7 : self->aqueue = g_async_queue_new ();
204 7 : g_cond_init (&self->mqtt_src_gcond);
205 7 : g_mutex_init (&self->mqtt_src_mutex);
206 7 : g_mutex_lock (&self->mqtt_src_mutex);
207 7 : self->is_connected = FALSE;
208 7 : self->is_subscribed = FALSE;
209 7 : self->latency = GST_CLOCK_TIME_NONE;
210 7 : g_mutex_unlock (&self->mqtt_src_mutex);
211 7 : self->base_time_epoch = GST_CLOCK_TIME_NONE;
212 7 : self->caps = NULL;
213 7 : self->num_dumped = 0;
214 :
215 7 : gst_base_src_set_live (basesrc, self->is_live);
216 7 : }
217 :
218 : /**
219 : * @brief Initialize GstMqttSrcClass object
220 : */
221 : static void
222 21 : gst_mqtt_src_class_init (GstMqttSrcClass * klass)
223 : {
224 21 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
225 21 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
226 21 : GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
227 :
228 21 : GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SRC, 0,
229 : "MQTT src");
230 :
231 21 : gobject_class->set_property = gst_mqtt_src_set_property;
232 21 : gobject_class->get_property = gst_mqtt_src_get_property;
233 21 : gobject_class->finalize = gst_mqtt_src_class_finalize;
234 :
235 21 : g_object_class_install_property (gobject_class, PROP_DEBUG,
236 : g_param_spec_boolean ("debug", "Debug",
237 : "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
238 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
239 :
240 21 : g_object_class_install_property (gobject_class, PROP_IS_LIVE,
241 : g_param_spec_boolean ("is-live", "Is Live",
242 : "Synchronize the incoming buffers' timestamp with the current running time",
243 : DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
244 :
245 21 : g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
246 : g_param_spec_string ("client-id", "Client ID",
247 : "The client identifier passed to the server (broker)",
248 : DEFAULT_MQTT_CLIENT_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249 :
250 21 : g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
251 : g_param_spec_string ("host", "Host", "Host (broker) to connect to",
252 : DEFAULT_MQTT_HOST_ADDRESS,
253 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
254 :
255 21 : g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
256 : g_param_spec_string ("port", "Port",
257 : "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
258 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259 :
260 21 : g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TIMEOUT,
261 : g_param_spec_int64 ("sub-timeout", "Timeout for receiving a message",
262 : "The timeout (in microseconds) for receiving a message from subscribed topic",
263 : 1000000, G_MAXINT64, DEFAULT_MQTT_SUB_TIMEOUT,
264 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 :
266 21 : g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TOPIC,
267 : g_param_spec_string ("sub-topic", "Topic to Subscribe (mandatory)",
268 : "The topic's name to subscribe", NULL,
269 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270 :
271 21 : g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
272 : g_param_spec_boolean ("cleansession", "Cleansession",
273 : "When it is TRUE, the state information is discarded at connect and disconnect.",
274 : DEFAULT_MQTT_OPT_CLEANSESSION,
275 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
276 :
277 21 : g_object_class_install_property (gobject_class,
278 : PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
279 : g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
280 : "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
281 : 1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
282 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283 :
284 21 : g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
285 : g_param_spec_int ("mqtt-qos", "mqtt QoS level",
286 : "The QoS level of MQTT.\n"
287 : "\t\t\t 0: At most once\n"
288 : "\t\t\t 1: At least once\n"
289 : "\t\t\t 2: Exactly once\n"
290 : "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
291 : 0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 :
293 21 : gstelement_class->change_state =
294 21 : GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
295 :
296 21 : gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_src_start);
297 21 : gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_src_stop);
298 21 : gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_caps);
299 21 : gstbasesrc_class->get_times = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_times);
300 21 : gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_mqtt_src_is_seekable);
301 21 : gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_mqtt_src_create);
302 21 : gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_src_query);
303 :
304 21 : gst_element_class_set_static_metadata (gstelement_class,
305 : "MQTT source", "Source/MQTT",
306 : "Subscribe a MQTT topic and push incoming data to the GStreamer pipeline",
307 : "Wook Song <wook16.song@samsung.com>");
308 21 : gst_element_class_add_static_pad_template (gstelement_class,
309 : &src_pad_template);
310 21 : }
311 :
312 : /**
313 : * @brief The setter for the mqttsrc's properties
314 : */
315 : static void
316 30 : gst_mqtt_src_set_property (GObject * object, guint prop_id,
317 : const GValue * value, GParamSpec * pspec)
318 : {
319 30 : GstMqttSrc *self = GST_MQTT_SRC (object);
320 :
321 30 : switch (prop_id) {
322 6 : case PROP_DEBUG:
323 6 : gst_mqtt_src_set_debug (self, g_value_get_boolean (value));
324 6 : break;
325 6 : case PROP_IS_LIVE:
326 6 : gst_mqtt_src_set_is_live (self, g_value_get_boolean (value));
327 6 : break;
328 1 : case PROP_MQTT_CLIENT_ID:
329 1 : gst_mqtt_src_set_client_id (self, g_value_get_string (value));
330 1 : break;
331 1 : case PROP_MQTT_HOST_ADDRESS:
332 1 : gst_mqtt_src_set_host_address (self, g_value_get_string (value));
333 1 : break;
334 1 : case PROP_MQTT_HOST_PORT:
335 1 : gst_mqtt_src_set_host_port (self, g_value_get_string (value));
336 1 : break;
337 6 : case PROP_MQTT_SUB_TIMEOUT:
338 6 : gst_mqtt_src_set_sub_timeout (self, g_value_get_int64 (value));
339 6 : break;
340 6 : case PROP_MQTT_SUB_TOPIC:
341 6 : gst_mqtt_src_set_sub_topic (self, g_value_get_string (value));
342 6 : break;
343 1 : case PROP_MQTT_OPT_CLEANSESSION:
344 1 : gst_mqtt_src_set_opt_cleansession (self, g_value_get_boolean (value));
345 1 : break;
346 1 : case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
347 1 : gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value));
348 1 : break;
349 1 : case PROP_MQTT_QOS:
350 1 : gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value));
351 1 : break;
352 0 : default:
353 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
354 0 : break;
355 : }
356 30 : }
357 :
358 : /**
359 : * @brief The getter for the mqttsrc's properties
360 : */
361 : static void
362 14 : gst_mqtt_src_get_property (GObject * object, guint prop_id,
363 : GValue * value, GParamSpec * pspec)
364 : {
365 14 : GstMqttSrc *self = GST_MQTT_SRC (object);
366 :
367 14 : switch (prop_id) {
368 2 : case PROP_DEBUG:
369 2 : g_value_set_boolean (value, gst_mqtt_src_get_debug (self));
370 2 : break;
371 1 : case PROP_IS_LIVE:
372 1 : g_value_set_boolean (value, gst_mqtt_src_get_is_live (self));
373 1 : break;
374 1 : case PROP_MQTT_CLIENT_ID:
375 1 : g_value_set_string (value, gst_mqtt_src_get_client_id (self));
376 1 : break;
377 1 : case PROP_MQTT_HOST_ADDRESS:
378 1 : g_value_set_string (value, gst_mqtt_src_get_host_address (self));
379 1 : break;
380 1 : case PROP_MQTT_HOST_PORT:
381 1 : g_value_set_string (value, gst_mqtt_src_get_host_port (self));
382 1 : break;
383 2 : case PROP_MQTT_SUB_TIMEOUT:
384 2 : g_value_set_int64 (value, gst_mqtt_src_get_sub_timeout (self));
385 2 : break;
386 1 : case PROP_MQTT_SUB_TOPIC:
387 1 : g_value_set_string (value, gst_mqtt_src_get_sub_topic (self));
388 1 : break;
389 1 : case PROP_MQTT_OPT_CLEANSESSION:
390 1 : g_value_set_boolean (value, gst_mqtt_src_get_opt_cleansession (self));
391 1 : break;
392 2 : case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
393 2 : g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self));
394 2 : break;
395 2 : case PROP_MQTT_QOS:
396 2 : g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self));
397 2 : break;
398 0 : default:
399 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
400 0 : break;
401 : }
402 14 : }
403 :
404 : /**
405 : * @brief Finalize GstMqttSrcClass object
406 : */
407 : static void
408 7 : gst_mqtt_src_class_finalize (GObject * object)
409 : {
410 7 : GstMqttSrc *self = GST_MQTT_SRC (object);
411 : GstBuffer *remained;
412 :
413 7 : if (self->mqtt_client_handle) {
414 0 : MQTTAsync_destroy (&self->mqtt_client_handle);
415 0 : self->mqtt_client_handle = NULL;
416 : }
417 :
418 7 : g_free (self->mqtt_client_id);
419 7 : g_free (self->mqtt_host_address);
420 7 : g_free (self->mqtt_host_port);
421 7 : g_free (self->mqtt_topic);
422 7 : gst_caps_replace (&self->caps, NULL);
423 :
424 7 : if (self->err)
425 2 : g_error_free (self->err);
426 :
427 7 : while ((remained = g_async_queue_try_pop (self->aqueue))) {
428 0 : gst_buffer_unref (remained);
429 : }
430 7 : g_clear_pointer (&self->aqueue, g_async_queue_unref);
431 :
432 7 : g_mutex_clear (&self->mqtt_src_mutex);
433 7 : G_OBJECT_CLASS (parent_class)->finalize (object);
434 7 : }
435 :
436 : /**
437 : * @brief Handle mqttsrc's state change
438 : */
439 : static GstStateChangeReturn
440 32 : gst_mqtt_src_change_state (GstElement * element, GstStateChange transition)
441 : {
442 32 : GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
443 32 : GstMqttSrc *self = GST_MQTT_SRC (element);
444 32 : gboolean no_preroll = FALSE;
445 : GstClock *elem_clock;
446 : GstClockTime base_time;
447 : GstClockTime cur_time;
448 : GstClockTimeDiff diff;
449 :
450 32 : switch (transition) {
451 5 : case GST_STATE_CHANGE_NULL_TO_READY:
452 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
453 5 : if (self->err) {
454 0 : g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
455 0 : self->err->message);
456 0 : return GST_STATE_CHANGE_FAILURE;
457 : }
458 5 : break;
459 5 : case GST_STATE_CHANGE_READY_TO_PAUSED:
460 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
461 : /* Regardless of the 'is-live''s value, prerolling is not supported */
462 5 : no_preroll = TRUE;
463 5 : break;
464 5 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
465 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
466 5 : self->base_time_epoch = GST_CLOCK_TIME_NONE;
467 5 : elem_clock = gst_element_get_clock (element);
468 5 : if (!elem_clock)
469 0 : break;
470 5 : base_time = gst_element_get_base_time (element);
471 5 : cur_time = gst_clock_get_time (elem_clock);
472 5 : gst_object_unref (elem_clock);
473 5 : diff = GST_CLOCK_DIFF (base_time, cur_time);
474 5 : self->base_time_epoch =
475 5 : g_get_real_time () * GST_US_TO_NS_MULTIPLIER - diff;
476 :
477 : /** This handles the case when the state is changed to PLAYING again */
478 5 : if (GST_BASE_SRC_IS_STARTED (GST_BASE_SRC (self)) &&
479 5 : (self->is_connected == FALSE)) {
480 0 : int conn = MQTTAsync_reconnect (self->mqtt_client_handle);
481 :
482 0 : if (conn != MQTTASYNC_SUCCESS) {
483 0 : GST_ERROR_OBJECT (self, "Failed to re-subscribe to %s",
484 : self->mqtt_topic);
485 :
486 0 : return GST_STATE_CHANGE_FAILURE;
487 : }
488 : }
489 5 : break;
490 17 : default:
491 17 : break;
492 : }
493 :
494 32 : ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
495 :
496 32 : switch (transition) {
497 5 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
498 5 : if (self->is_subscribed && !_unsubscribe (self)) {
499 0 : GST_ERROR_OBJECT (self, "Cannot unsubscribe to %s", self->mqtt_topic);
500 : }
501 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
502 5 : break;
503 5 : case GST_STATE_CHANGE_PAUSED_TO_READY:
504 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
505 5 : break;
506 5 : case GST_STATE_CHANGE_READY_TO_NULL:
507 5 : GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
508 : default:
509 22 : break;
510 : }
511 :
512 32 : if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
513 0 : ret = GST_STATE_CHANGE_NO_PREROLL;
514 :
515 32 : return ret;
516 : }
517 :
518 : /**
519 : * @brief Start mqttsrc, called when state changed null to ready
520 : */
521 : static gboolean
522 5 : gst_mqtt_src_start (GstBaseSrc * basesrc)
523 : {
524 5 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
525 5 : gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
526 : self->mqtt_host_port);
527 : int ret;
528 : gint64 end_time;
529 :
530 5 : if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
531 5 : g_free (self->mqtt_client_id);
532 5 : self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
533 5 : g_get_host_name (), getpid (), src_client_id++);
534 : }
535 :
536 : /**
537 : * @todo Support other persistence mechanisms
538 : * MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
539 : * MQTTCLIENT_PERSISTENCE_DEFAULT: The default file system-based
540 : * persistence mechanism
541 : * MQTTCLIENT_PERSISTENCE_USER: An application-specific persistence
542 : * mechanism
543 : */
544 10 : ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
545 5 : self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
546 5 : g_free (haddr);
547 5 : if (ret != MQTTASYNC_SUCCESS)
548 0 : return FALSE;
549 :
550 5 : MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
551 : cb_mqtt_on_connection_lost, cb_mqtt_on_message_arrived, NULL);
552 :
553 5 : ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
554 5 : if (ret != MQTTASYNC_SUCCESS)
555 0 : goto error;
556 :
557 : /* Waiting for the connection */
558 5 : end_time = g_get_monotonic_time () +
559 : DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
560 5 : g_mutex_lock (&self->mqtt_src_mutex);
561 5 : while (!self->is_connected) {
562 0 : if (!g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex,
563 : end_time)) {
564 0 : g_mutex_unlock (&self->mqtt_src_mutex);
565 0 : g_critical ("Failed to connect to MQTT broker from mqttsrc."
566 : "Please check broker is running status or broker host address.");
567 0 : goto error;
568 : }
569 : }
570 5 : g_mutex_unlock (&self->mqtt_src_mutex);
571 5 : return TRUE;
572 :
573 0 : error:
574 0 : MQTTAsync_destroy (&self->mqtt_client_handle);
575 0 : self->mqtt_client_handle = NULL;
576 0 : return FALSE;
577 : }
578 :
579 : /**
580 : * @brief Stop mqttsrc, called when state changed ready to null
581 : */
582 : static gboolean
583 5 : gst_mqtt_src_stop (GstBaseSrc * basesrc)
584 : {
585 5 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
586 :
587 : /* todo */
588 5 : MQTTAsync_disconnect (self->mqtt_client_handle, NULL);
589 5 : g_mutex_lock (&self->mqtt_src_mutex);
590 5 : self->is_connected = FALSE;
591 5 : g_mutex_unlock (&self->mqtt_src_mutex);
592 5 : MQTTAsync_destroy (&self->mqtt_client_handle);
593 5 : self->mqtt_client_handle = NULL;
594 5 : return TRUE;
595 : }
596 :
597 : /**
598 : * @brief Get caps of subclass
599 : */
600 : static GstCaps *
601 51 : gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
602 : {
603 51 : GstPad *pad = GST_BASE_SRC_PAD (basesrc);
604 51 : GstCaps *cur_caps = gst_pad_get_current_caps (pad);
605 51 : GstCaps *caps = gst_caps_new_any ();
606 : UNUSED (filter);
607 :
608 51 : if (cur_caps) {
609 : GstCaps *intersection =
610 1 : gst_caps_intersect_full (cur_caps, caps, GST_CAPS_INTERSECT_FIRST);
611 :
612 1 : gst_caps_unref (cur_caps);
613 1 : gst_caps_unref (caps);
614 1 : caps = intersection;
615 : }
616 :
617 51 : return caps;
618 : }
619 :
620 : /**
621 : * @brief Do negotiation procedure again if it needed
622 : */
623 : static gboolean
624 4 : gst_mqtt_src_renegotiate (GstBaseSrc * basesrc)
625 : {
626 4 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
627 4 : GstCaps *caps = NULL;
628 4 : GstCaps *peercaps = NULL;
629 : GstCaps *thiscaps;
630 4 : gboolean result = FALSE;
631 4 : GstCaps *fixed_caps = NULL;
632 :
633 4 : if (self->caps == NULL || gst_caps_is_any (self->caps))
634 0 : goto no_nego_needed;
635 :
636 4 : thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
637 4 : if (thiscaps && gst_caps_is_equal (self->caps, thiscaps)) {
638 0 : gst_caps_unref (thiscaps);
639 0 : goto no_nego_needed;
640 : }
641 :
642 4 : peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), self->caps);
643 4 : if (peercaps && !gst_caps_is_empty (peercaps)) {
644 3 : caps = gst_caps_ref (peercaps);
645 3 : if (peercaps != self->caps)
646 3 : gst_caps_unref (peercaps);
647 : } else {
648 1 : caps = gst_caps_ref (self->caps);
649 : }
650 :
651 4 : if (caps && !gst_caps_is_empty (caps)) {
652 4 : if (gst_caps_is_any (caps)) {
653 0 : result = TRUE;
654 : } else {
655 4 : fixed_caps = gst_caps_fixate (caps);
656 4 : if (fixed_caps && gst_caps_is_fixed (fixed_caps)) {
657 4 : result = gst_base_src_set_caps (basesrc, fixed_caps);
658 4 : if (peercaps == self->caps)
659 0 : gst_caps_unref (fixed_caps);
660 : }
661 : }
662 4 : gst_caps_unref (caps);
663 : } else {
664 0 : result = FALSE;
665 0 : if (caps && gst_caps_is_empty (caps))
666 0 : gst_caps_unref (caps);
667 : }
668 :
669 4 : if (thiscaps)
670 4 : gst_caps_unref (thiscaps);
671 :
672 4 : return result;
673 :
674 0 : no_nego_needed:
675 : {
676 0 : GST_DEBUG_OBJECT (self, "no negotiation needed");
677 :
678 0 : return TRUE;
679 : }
680 : }
681 :
682 : /**
683 : * @brief Return the time information of the given buffer
684 : */
685 : static void
686 4 : gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
687 : GstClockTime * start, GstClockTime * end)
688 : {
689 : GstClockTime sync_ts;
690 : GstClockTime duration;
691 : UNUSED (basesrc);
692 :
693 4 : sync_ts = GST_BUFFER_DTS (buffer);
694 4 : duration = GST_BUFFER_DURATION (buffer);
695 :
696 4 : if (!GST_CLOCK_TIME_IS_VALID (sync_ts))
697 0 : sync_ts = GST_BUFFER_PTS (buffer);
698 :
699 4 : if (GST_CLOCK_TIME_IS_VALID (sync_ts)) {
700 4 : *start = sync_ts;
701 4 : if (GST_CLOCK_TIME_IS_VALID (duration)) {
702 4 : *end = sync_ts + duration;
703 : }
704 : }
705 4 : }
706 :
707 : /**
708 : * @brief Check if source supports seeking
709 : * @note Seeking is not supported since this element handles live subscription data.
710 : */
711 : static gboolean
712 5 : gst_mqtt_src_is_seekable (GstBaseSrc * basesrc)
713 : {
714 : UNUSED (basesrc);
715 5 : return FALSE;
716 : }
717 :
718 : /**
719 : * @brief Create a buffer containing the subscribed data
720 : */
721 : static GstFlowReturn
722 6 : gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
723 : GstBuffer ** buf)
724 : {
725 6 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
726 6 : gint64 elapsed = self->mqtt_sub_timeout;
727 : UNUSED (offset);
728 : UNUSED (size);
729 :
730 6 : g_mutex_lock (&self->mqtt_src_mutex);
731 6 : while ((!self->is_connected) || (!self->is_subscribed)) {
732 2 : gint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
733 :
734 2 : g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex, end_time);
735 2 : if (self->err) {
736 2 : g_mutex_unlock (&self->mqtt_src_mutex);
737 2 : goto ret_flow_err;
738 : }
739 : }
740 4 : g_mutex_unlock (&self->mqtt_src_mutex);
741 :
742 4 : while (elapsed > 0) {
743 : /** @todo DEFAULT_MQTT_SUB_TIMEOUT_MIN is too long */
744 4 : *buf = g_async_queue_timeout_pop (self->aqueue,
745 : DEFAULT_MQTT_SUB_TIMEOUT_MIN);
746 4 : if (*buf) {
747 4 : GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
748 4 : GstClockTime ulatency = GST_CLOCK_TIME_NONE;
749 : GstClock *clock;
750 :
751 : /** This buffer is coming from the past. Drop it. */
752 4 : if (!_is_gst_buffer_timestamp_valid (*buf)) {
753 0 : if (self->debug) {
754 0 : GST_DEBUG_OBJECT (self,
755 : "%s: Dumped the received buffer! (total: %" G_GUINT64_FORMAT ")",
756 : self->mqtt_topic, ++self->num_dumped);
757 : }
758 0 : elapsed = self->mqtt_sub_timeout;
759 0 : gst_buffer_unref (*buf);
760 0 : continue;
761 : }
762 :
763 : /** Update latency */
764 4 : clock = gst_element_get_clock (GST_ELEMENT (self));
765 4 : if (clock) {
766 4 : GstClockTime cur_time = gst_clock_get_time (clock);
767 4 : GstClockTime buf_ts = GST_BUFFER_TIMESTAMP (*buf);
768 4 : GstClockTimeDiff latency = 0;
769 :
770 4 : if ((base_time != GST_CLOCK_TIME_NONE) &&
771 4 : (cur_time != GST_CLOCK_TIME_NONE) &&
772 : (buf_ts != GST_CLOCK_TIME_NONE)) {
773 4 : GstClockTimeDiff now = GST_CLOCK_DIFF (base_time, cur_time);
774 :
775 4 : latency = GST_CLOCK_DIFF (buf_ts, (GstClockTime) now);
776 : }
777 :
778 4 : if (latency > 0) {
779 0 : ulatency = (GstClockTime) latency;
780 :
781 0 : if (GST_BUFFER_DURATION_IS_VALID (*buf)) {
782 0 : GstClockTime duration = GST_BUFFER_DURATION (*buf);
783 :
784 0 : if (duration >= ulatency) {
785 0 : ulatency = GST_CLOCK_TIME_NONE;
786 : }
787 : }
788 : }
789 4 : gst_object_unref (clock);
790 : }
791 :
792 4 : g_mutex_lock (&self->mqtt_src_mutex);
793 4 : self->latency = ulatency;
794 4 : g_mutex_unlock (&self->mqtt_src_mutex);
795 : /**
796 : * @todo If the difference between new latency and old latency,
797 : * gst_element_post_message (GST_ELEMENT_CAST (self),
798 : * gst_message_new_latency (GST_OBJECT_CAST (self)));
799 : * is needed.
800 : */
801 4 : break;
802 0 : } else if (self->err) {
803 0 : break;
804 : }
805 0 : elapsed = elapsed - DEFAULT_MQTT_SUB_TIMEOUT_MIN;
806 : }
807 :
808 4 : if (*buf == NULL) {
809 : /** @todo: Send EoS here */
810 0 : if (!self->err)
811 0 : self->err = g_error_new (self->gquark_err_tag, GST_FLOW_EOS,
812 : "%s: Timeout for receiving a message has been expired. Regarding as an error",
813 : __func__);
814 0 : goto ret_flow_err;
815 : }
816 :
817 4 : return GST_FLOW_OK;
818 :
819 2 : ret_flow_err:
820 2 : if (self->err) {
821 2 : g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
822 2 : self->err->message);
823 : }
824 2 : return GST_FLOW_ERROR;
825 : }
826 :
827 : /**
828 : * @brief An implementation of the GstBaseSrc vmethod that handles queries
829 : */
830 : static gboolean
831 59 : gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query)
832 : {
833 59 : GstQueryType type = GST_QUERY_TYPE (query);
834 59 : GstMqttSrc *self = GST_MQTT_SRC (basesrc);
835 59 : gboolean res = FALSE;
836 :
837 59 : if (self->debug)
838 57 : GST_DEBUG_OBJECT (self, "Got %s event", gst_query_type_get_name (type));
839 :
840 59 : switch (type) {
841 3 : case GST_QUERY_LATENCY:{
842 3 : GstClockTime min_latency = 0;
843 3 : GstClockTime max_latency = GST_CLOCK_TIME_NONE;
844 :
845 3 : g_mutex_lock (&self->mqtt_src_mutex);
846 3 : if (self->latency != GST_CLOCK_TIME_NONE) {
847 0 : min_latency = self->latency;
848 : }
849 3 : g_mutex_unlock (&self->mqtt_src_mutex);
850 :
851 3 : if (self->debug) {
852 3 : GST_DEBUG_OBJECT (self,
853 : "Reporting latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT,
854 : GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
855 : }
856 : /**
857 : * @brief The second argument of gst_query_set_latency should be always
858 : * TRUE.
859 : */
860 3 : gst_query_set_latency (query, TRUE, min_latency, max_latency);
861 :
862 3 : res = TRUE;
863 3 : break;
864 : }
865 56 : default:{
866 56 : res = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
867 : }
868 : }
869 :
870 59 : return res;
871 : }
872 :
873 : /**
874 : * @brief Getter for the 'debug' property.
875 : */
876 : static gboolean
877 2 : gst_mqtt_src_get_debug (GstMqttSrc * self)
878 : {
879 2 : return self->debug;
880 : }
881 :
882 : /**
883 : * @brief Setter for the 'debug' property.
884 : */
885 : static void
886 6 : gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag)
887 : {
888 6 : self->debug = flag;
889 6 : }
890 :
891 : /**
892 : * @brief Getter for the 'is-live' property.
893 : */
894 : static gboolean
895 1 : gst_mqtt_src_get_is_live (GstMqttSrc * self)
896 : {
897 1 : return self->is_live;
898 : }
899 :
900 : /**
901 : * @brief Setter for the 'is-live' property.
902 : */
903 : static void
904 6 : gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag)
905 : {
906 6 : self->is_live = flag;
907 6 : gst_base_src_set_live (GST_BASE_SRC (self), self->is_live);
908 6 : }
909 :
910 : /**
911 : * @brief Getter for the 'client-id' property.
912 : */
913 : static gchar *
914 1 : gst_mqtt_src_get_client_id (GstMqttSrc * self)
915 : {
916 1 : return self->mqtt_client_id;
917 : }
918 :
919 : /**
920 : * @brief Setter for the 'client-id' property.
921 : */
922 : static void
923 1 : gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id)
924 : {
925 1 : g_free (self->mqtt_client_id);
926 1 : self->mqtt_client_id = g_strdup (id);
927 1 : }
928 :
929 : /**
930 : * @brief Getter for the 'host' property.
931 : */
932 : static gchar *
933 1 : gst_mqtt_src_get_host_address (GstMqttSrc * self)
934 : {
935 1 : return self->mqtt_host_address;
936 : }
937 :
938 : /**
939 : * @brief Setter for the 'host' property
940 : */
941 : static void
942 1 : gst_mqtt_src_set_host_address (GstMqttSrc * self, const gchar * addr)
943 : {
944 : /**
945 : * @todo Handle the case where the addr is changed at runtime
946 : */
947 1 : g_free (self->mqtt_host_address);
948 1 : self->mqtt_host_address = g_strdup (addr);
949 1 : }
950 :
951 : /**
952 : * @brief Getter for the 'port' property.
953 : */
954 : static gchar *
955 1 : gst_mqtt_src_get_host_port (GstMqttSrc * self)
956 : {
957 1 : return self->mqtt_host_port;
958 : }
959 :
960 : /**
961 : * @brief Setter for the 'port' property
962 : */
963 : static void
964 1 : gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port)
965 : {
966 1 : g_free (self->mqtt_host_port);
967 1 : self->mqtt_host_port = g_strdup (port);
968 1 : }
969 :
970 : /**
971 : * @brief Getter for the 'sub-timeout' property
972 : */
973 : static gint64
974 2 : gst_mqtt_src_get_sub_timeout (GstMqttSrc * self)
975 : {
976 2 : return self->mqtt_sub_timeout;
977 : }
978 :
979 : /**
980 : * @brief Setter for the 'sub-timeout' property
981 : */
982 : static void
983 6 : gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t)
984 : {
985 6 : self->mqtt_sub_timeout = t;
986 6 : }
987 :
988 : /**
989 : * @brief Getter for the 'sub-topic' property
990 : */
991 : static gchar *
992 1 : gst_mqtt_src_get_sub_topic (GstMqttSrc * self)
993 : {
994 1 : return self->mqtt_topic;
995 : }
996 :
997 : /**
998 : * @brief Setter for the 'sub-topic' property
999 : */
1000 : static void
1001 6 : gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic)
1002 : {
1003 6 : g_free (self->mqtt_topic);
1004 6 : self->mqtt_topic = g_strdup (topic);
1005 6 : }
1006 :
1007 : /**
1008 : * @brief Getter for the 'cleansession' property.
1009 : */
1010 : static gboolean
1011 1 : gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self)
1012 : {
1013 1 : return self->mqtt_conn_opts.cleansession;
1014 : }
1015 :
1016 : /**
1017 : * @brief Setter for the 'cleansession' property.
1018 : */
1019 : static void
1020 1 : gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self, const gboolean val)
1021 : {
1022 1 : self->mqtt_conn_opts.cleansession = val;
1023 1 : }
1024 :
1025 : /**
1026 : * @brief Getter for the 'keep-alive-interval' property
1027 : */
1028 : static gint
1029 2 : gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self)
1030 : {
1031 2 : return self->mqtt_conn_opts.keepAliveInterval;
1032 : }
1033 :
1034 : /**
1035 : * @brief Setter for the 'keep-alive-interval' property
1036 : */
1037 : static void
1038 1 : gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self, const gint num)
1039 : {
1040 1 : self->mqtt_conn_opts.keepAliveInterval = num;
1041 1 : }
1042 :
1043 : /**
1044 : * @brief Getter for the 'mqtt-qos' property
1045 : */
1046 : static gint
1047 2 : gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self)
1048 : {
1049 2 : return self->mqtt_qos;
1050 : }
1051 :
1052 : /**
1053 : * @brief Setter for the 'mqtt-qos' property
1054 : */
1055 : static void
1056 1 : gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos)
1057 : {
1058 1 : self->mqtt_qos = qos;
1059 1 : }
1060 :
1061 : /**
1062 : * @brief A callback to handle the connection lost to the broker
1063 : */
1064 : static void
1065 0 : cb_mqtt_on_connection_lost (void *context, char *cause)
1066 : {
1067 0 : GstMqttSrc *self = GST_MQTT_SRC_CAST (context);
1068 : UNUSED (cause);
1069 :
1070 0 : g_mutex_lock (&self->mqtt_src_mutex);
1071 0 : self->is_connected = FALSE;
1072 0 : self->is_subscribed = FALSE;
1073 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1074 0 : if (!self->err) {
1075 0 : self->err = g_error_new (self->gquark_err_tag, EHOSTDOWN,
1076 : "Connection to the host (broker) has been lost: %s \n"
1077 : "\t\tfor detail, please check the log message of the broker",
1078 : g_strerror (EHOSTDOWN));
1079 : }
1080 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1081 0 : }
1082 :
1083 : /**
1084 : * @brief A callback to handle the arrived message
1085 : */
1086 : static int
1087 6 : cb_mqtt_on_message_arrived (void *context, char *topic_name, int topic_len,
1088 : MQTTAsync_message * message)
1089 : {
1090 6 : const int size = message->payloadlen;
1091 6 : guint8 *data = message->payload;
1092 : GstMQTTMessageHdr *mqtt_msg_hdr;
1093 : GstMapInfo hdr_map_info;
1094 : GstMemory *received_mem;
1095 : GstMemory *hdr_mem;
1096 : GstBuffer *buffer;
1097 : GstBaseSrc *basesrc;
1098 : GstMqttSrc *self;
1099 : GstClock *clock;
1100 : gsize offset;
1101 : guint i;
1102 : UNUSED (topic_name);
1103 : UNUSED (topic_len);
1104 :
1105 6 : self = GST_MQTT_SRC_CAST (context);
1106 6 : g_mutex_lock (&self->mqtt_src_mutex);
1107 6 : if (!self->is_subscribed) {
1108 2 : g_mutex_unlock (&self->mqtt_src_mutex);
1109 :
1110 6 : return TRUE;
1111 : }
1112 4 : g_mutex_unlock (&self->mqtt_src_mutex);
1113 :
1114 4 : basesrc = GST_BASE_SRC (self);
1115 4 : clock = gst_element_get_clock (GST_ELEMENT (self));
1116 4 : received_mem = gst_memory_new_wrapped (0, data, size, 0, size, message,
1117 : (GDestroyNotify) cb_memory_wrapped_destroy);
1118 4 : if (!received_mem) {
1119 0 : if (!self->err) {
1120 0 : self->err = g_error_new (self->gquark_err_tag, ENODATA,
1121 : "%s: failed to wrap the raw data of received message in GstMemory: %s",
1122 : __func__, g_strerror (ENODATA));
1123 : }
1124 0 : return TRUE;
1125 : }
1126 :
1127 4 : mqtt_msg_hdr = _extract_mqtt_msg_hdr_from (received_mem, &hdr_mem,
1128 : &hdr_map_info);
1129 4 : if (!mqtt_msg_hdr) {
1130 0 : if (!self->err) {
1131 0 : self->err = g_error_new (self->gquark_err_tag, ENODATA,
1132 : "%s: failed to extract header information from received message: %s",
1133 : __func__, g_strerror (ENODATA));
1134 : }
1135 0 : goto ret_unref_received_mem;
1136 : }
1137 :
1138 4 : if (!self->caps) {
1139 3 : self->caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
1140 3 : gst_mqtt_src_renegotiate (basesrc);
1141 : } else {
1142 1 : GstCaps *recv_caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
1143 :
1144 1 : if (recv_caps && !gst_caps_is_equal (self->caps, recv_caps)) {
1145 1 : gst_caps_replace (&self->caps, recv_caps);
1146 1 : gst_mqtt_src_renegotiate (basesrc);
1147 : } else {
1148 0 : gst_caps_replace (&recv_caps, NULL);
1149 : }
1150 : }
1151 :
1152 4 : buffer = gst_buffer_new ();
1153 4 : offset = GST_MQTT_LEN_MSG_HDR;
1154 8 : for (i = 0; i < mqtt_msg_hdr->num_mems; ++i) {
1155 : GstMemory *each_memory;
1156 : int each_size;
1157 :
1158 4 : each_size = mqtt_msg_hdr->size_mems[i];
1159 4 : each_memory = gst_memory_share (received_mem, offset, each_size);
1160 4 : gst_buffer_append_memory (buffer, each_memory);
1161 4 : offset += each_size;
1162 : }
1163 :
1164 : /** Timestamp synchronization */
1165 4 : if (self->debug) {
1166 4 : GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
1167 :
1168 4 : if (clock) {
1169 4 : GST_DEBUG_OBJECT (self,
1170 : "A message has been arrived at %" GST_TIME_FORMAT
1171 : " and queue length is %d",
1172 : GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1173 : g_async_queue_length (self->aqueue));
1174 :
1175 4 : gst_object_unref (clock);
1176 : }
1177 : }
1178 4 : _put_timestamp_on_gst_buf (self, mqtt_msg_hdr, buffer);
1179 4 : g_async_queue_push (self->aqueue, buffer);
1180 :
1181 4 : gst_memory_unmap (hdr_mem, &hdr_map_info);
1182 4 : gst_memory_unref (hdr_mem);
1183 :
1184 4 : ret_unref_received_mem:
1185 4 : gst_memory_unref (received_mem);
1186 :
1187 4 : return TRUE;
1188 : }
1189 :
1190 : /**
1191 : * @brief A callback invoked when destroying the GstMemory which wrapped the arrived message
1192 : */
1193 : static void
1194 4 : cb_memory_wrapped_destroy (void *p)
1195 : {
1196 4 : MQTTAsync_message *msg = p;
1197 :
1198 4 : MQTTAsync_freeMessage (&msg);
1199 4 : }
1200 :
1201 : /**
1202 : * @brief A callback invoked when the connection is established
1203 : */
1204 : static void
1205 5 : cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
1206 : {
1207 5 : GstMqttSrc *self = GST_MQTT_SRC (context);
1208 5 : GstBaseSrc *basesrc = GST_BASE_SRC (self);
1209 : int ret;
1210 : UNUSED (response);
1211 :
1212 5 : g_mutex_lock (&self->mqtt_src_mutex);
1213 5 : self->is_connected = TRUE;
1214 5 : g_cond_broadcast (&self->mqtt_src_gcond);
1215 5 : g_mutex_unlock (&self->mqtt_src_mutex);
1216 :
1217 : /** GstFlowReturn is an enum type. It is possible to use int here */
1218 5 : if (gst_base_src_is_async (basesrc) &&
1219 0 : (ret = gst_base_src_start_wait (basesrc)) != GST_FLOW_OK) {
1220 0 : g_mutex_lock (&self->mqtt_src_mutex);
1221 0 : self->err = g_error_new (self->gquark_err_tag, ret,
1222 : "%s: the virtual method, start (), in the GstBaseSrc class fails with return code %d",
1223 : __func__, ret);
1224 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1225 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1226 0 : return;
1227 : }
1228 :
1229 5 : if (!_subscribe (self)) {
1230 2 : GST_ERROR_OBJECT (self, "Failed to subscribe to %s", self->mqtt_topic);
1231 : }
1232 : }
1233 :
1234 : /**
1235 : * @brief A callback invoked when it is failed to connect to the broker
1236 : */
1237 : static void
1238 0 : cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
1239 : {
1240 0 : GstMqttSrc *self = GST_MQTT_SRC (context);
1241 :
1242 0 : g_mutex_lock (&self->mqtt_src_mutex);
1243 0 : self->is_connected = FALSE;
1244 :
1245 0 : if (!self->err) {
1246 0 : self->err = g_error_new (self->gquark_err_tag, response->code,
1247 : "%s: failed to connect to the broker: %s", __func__, response->message);
1248 : }
1249 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1250 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1251 0 : }
1252 :
1253 : /**
1254 : * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_subscribe ()
1255 : */
1256 : static void
1257 3 : cb_mqtt_on_subscribe (void *context, MQTTAsync_successData * response)
1258 : {
1259 3 : GstMqttSrc *self = GST_MQTT_SRC (context);
1260 : UNUSED (response);
1261 :
1262 3 : g_mutex_lock (&self->mqtt_src_mutex);
1263 3 : self->is_subscribed = TRUE;
1264 3 : g_cond_broadcast (&self->mqtt_src_gcond);
1265 3 : g_mutex_unlock (&self->mqtt_src_mutex);
1266 3 : }
1267 :
1268 : /**
1269 : * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_subscribe ()
1270 : */
1271 : static void
1272 2 : cb_mqtt_on_subscribe_failure (void *context, MQTTAsync_failureData * response)
1273 : {
1274 2 : GstMqttSrc *self = GST_MQTT_SRC (context);
1275 :
1276 2 : g_mutex_lock (&self->mqtt_src_mutex);
1277 2 : if (!self->err) {
1278 2 : self->err = g_error_new (self->gquark_err_tag, response->code,
1279 : "%s: failed to subscribe the given topic, %s: %s", __func__,
1280 : self->mqtt_topic, response->message);
1281 : }
1282 2 : g_cond_broadcast (&self->mqtt_src_gcond);
1283 2 : g_mutex_unlock (&self->mqtt_src_mutex);
1284 2 : }
1285 :
1286 : /**
1287 : * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_unsubscribe ()
1288 : */
1289 : static void
1290 3 : cb_mqtt_on_unsubscribe (void *context, MQTTAsync_successData * response)
1291 : {
1292 3 : GstMqttSrc *self = GST_MQTT_SRC (context);
1293 : UNUSED (response);
1294 :
1295 3 : g_mutex_lock (&self->mqtt_src_mutex);
1296 3 : self->is_subscribed = FALSE;
1297 3 : g_cond_broadcast (&self->mqtt_src_gcond);
1298 3 : g_mutex_unlock (&self->mqtt_src_mutex);
1299 3 : }
1300 :
1301 : /**
1302 : * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_unsubscribe ()
1303 : */
1304 : static void
1305 0 : cb_mqtt_on_unsubscribe_failure (void *context, MQTTAsync_failureData * response)
1306 : {
1307 0 : GstMqttSrc *self = GST_MQTT_SRC (context);
1308 :
1309 0 : g_mutex_lock (&self->mqtt_src_mutex);
1310 0 : if (!self->err) {
1311 0 : self->err = g_error_new (self->gquark_err_tag, response->code,
1312 : "%s: failed to unsubscribe the given topic, %s: %s", __func__,
1313 : self->mqtt_topic, response->message);
1314 : }
1315 0 : g_cond_broadcast (&self->mqtt_src_gcond);
1316 0 : g_mutex_unlock (&self->mqtt_src_mutex);
1317 0 : }
1318 :
1319 : /**
1320 : * @brief A helper function to properly invoke MQTTAsync_subscribe ()
1321 : */
1322 : static gboolean
1323 5 : _subscribe (GstMqttSrc * self)
1324 : {
1325 5 : MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
1326 : int mqttasync_ret;
1327 :
1328 5 : opts.onSuccess = cb_mqtt_on_subscribe;
1329 5 : opts.onFailure = cb_mqtt_on_subscribe_failure;
1330 5 : opts.subscribeOptions.retainHandling = 1;
1331 :
1332 10 : mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
1333 5 : self->mqtt_topic, self->mqtt_qos, &opts);
1334 5 : if (mqttasync_ret != MQTTASYNC_SUCCESS)
1335 5 : return FALSE;
1336 3 : return TRUE;
1337 : }
1338 :
1339 : /**
1340 : * @brief A wrapper function that calls MQTTAsync_unsubscribe ()
1341 : */
1342 : static gboolean
1343 3 : _unsubscribe (GstMqttSrc * self)
1344 : {
1345 3 : MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
1346 : int mqttasync_ret;
1347 :
1348 3 : opts.onSuccess = cb_mqtt_on_unsubscribe;
1349 3 : opts.onFailure = cb_mqtt_on_unsubscribe_failure;
1350 :
1351 6 : mqttasync_ret = MQTTAsync_unsubscribe (self->mqtt_client_handle,
1352 3 : self->mqtt_topic, &opts);
1353 3 : if (mqttasync_ret != MQTTASYNC_SUCCESS)
1354 3 : return FALSE;
1355 3 : return TRUE;
1356 : }
1357 :
1358 : /**
1359 : * @brief A utility function to extract header information from a received message
1360 : */
1361 : static GstMQTTMessageHdr *
1362 4 : _extract_mqtt_msg_hdr_from (GstMemory * mem, GstMemory ** hdr_mem,
1363 : GstMapInfo * hdr_map_info)
1364 : {
1365 4 : *hdr_mem = gst_memory_share (mem, 0, GST_MQTT_LEN_MSG_HDR);
1366 4 : g_return_val_if_fail (*hdr_mem != NULL, NULL);
1367 :
1368 4 : if (!gst_memory_map (*hdr_mem, hdr_map_info, GST_MAP_READ)) {
1369 0 : gst_memory_unref (*hdr_mem);
1370 0 : return NULL;
1371 : }
1372 :
1373 4 : return (GstMQTTMessageHdr *) hdr_map_info->data;
1374 : }
1375 :
1376 : /**
1377 : * @brief A utility function to put the timestamp information
1378 : * onto a GstBuffer-typed buffer using the given packet header
1379 : */
1380 : static void
1381 4 : _put_timestamp_on_gst_buf (GstMqttSrc * self, GstMQTTMessageHdr * hdr,
1382 : GstBuffer * buf)
1383 : {
1384 4 : gint64 diff_base_epoch = hdr->base_time_epoch - self->base_time_epoch;
1385 :
1386 4 : buf->pts = GST_CLOCK_TIME_NONE;
1387 4 : buf->dts = GST_CLOCK_TIME_NONE;
1388 4 : buf->duration = GST_CLOCK_TIME_NONE;
1389 :
1390 4 : if (hdr->sent_time_epoch < self->base_time_epoch)
1391 0 : return;
1392 :
1393 4 : if (((GstClockTimeDiff) hdr->pts + diff_base_epoch) < 0)
1394 0 : return;
1395 :
1396 4 : if (hdr->pts != GST_CLOCK_TIME_NONE) {
1397 4 : buf->pts = hdr->pts + diff_base_epoch;
1398 : }
1399 :
1400 4 : if (hdr->dts != GST_CLOCK_TIME_NONE) {
1401 4 : buf->dts = hdr->dts + diff_base_epoch;
1402 : }
1403 :
1404 4 : buf->duration = hdr->duration;
1405 :
1406 4 : if (self->debug) {
1407 4 : GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
1408 : GstClock *clock;
1409 :
1410 4 : clock = gst_element_get_clock (GST_ELEMENT (self));
1411 :
1412 4 : if (clock) {
1413 4 : GST_DEBUG_OBJECT (self,
1414 : "%s diff %" GST_STIME_FORMAT " now %" GST_TIME_FORMAT " ts (%"
1415 : GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ")", self->mqtt_topic,
1416 : GST_STIME_ARGS (diff_base_epoch),
1417 : GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
1418 : GST_TIME_ARGS (hdr->pts), GST_TIME_ARGS (buf->pts));
1419 :
1420 4 : gst_object_unref (clock);
1421 : }
1422 : }
1423 : }
|