LCOV - code coverage report
Current view: top level - nnstreamer-2.4.2/gst/mqtt - mqttsrc.c (source / functions) Coverage Total Hit
Test: nnstreamer 2.4.2-0 nnstreamer/nnstreamer.git#5d55fc62547faa02e861af5ef93cc1c89800934a Lines: 83.0 % 612 508
Test Date: 2024-09-25 09:08:39 Functions: 94.1 % 51 48

            Line data    Source code
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
       4              :  */
       5              : /**
       6              :  * @file    mqttsrc.c
       7              :  * @date    08 Mar 2021
       8              :  * @brief   Subscribe a MQTT topic and push incoming data to the GStreamer pipeline
       9              :  * @see     https://github.com/nnstreamer/nnstreamer
      10              :  * @author  Wook Song <wook16.song@samsung.com>
      11              :  * @bug     No known bugs except for NYI items
      12              :  */
      13              : 
      14              : #ifdef HAVE_CONFIG_H
      15              : #include <config.h>
      16              : #endif
      17              : 
      18              : #ifdef G_OS_WIN32
      19              : #include <process.h>
      20              : #else
      21              : #include <sys/types.h>
      22              : #include <unistd.h>
      23              : #endif
      24              : 
      25              : #include <gst/base/gstbasesrc.h>
      26              : #include <MQTTAsync.h>
      27              : #include <nnstreamer_util.h>
      28              : 
      29              : #include "mqttsrc.h"
      30              : 
      31              : static GstStaticPadTemplate src_pad_template = GST_STATIC_PAD_TEMPLATE ("src",
      32              :     GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
      33              : 
      34              : #define gst_mqtt_src_parent_class parent_class
      35          240 : G_DEFINE_TYPE (GstMqttSrc, gst_mqtt_src, GST_TYPE_BASE_SRC);
      36              : 
      37              : GST_DEBUG_CATEGORY_STATIC (gst_mqtt_src_debug);
      38              : #define GST_CAT_DEFAULT gst_mqtt_src_debug
      39              : 
      40              : enum
      41              : {
      42              :   PROP_0,
      43              : 
      44              :   PROP_DEBUG,
      45              :   PROP_IS_LIVE,
      46              :   PROP_MQTT_CLIENT_ID,
      47              :   PROP_MQTT_HOST_ADDRESS,
      48              :   PROP_MQTT_HOST_PORT,
      49              :   PROP_MQTT_SUB_TOPIC,
      50              :   PROP_MQTT_SUB_TIMEOUT,
      51              :   PROP_MQTT_OPT_CLEANSESSION,
      52              :   PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
      53              :   PROP_MQTT_QOS,
      54              : 
      55              :   PROP_LAST
      56              : };
      57              : 
      58              : enum
      59              : {
      60              :   DEFAULT_DEBUG = FALSE,
      61              :   DEFAULT_IS_LIVE = TRUE,
      62              :   DEFAULT_MQTT_OPT_CLEANSESSION = TRUE,
      63              :   DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60,    /* 1 minute */
      64              :   DEFAULT_MQTT_SUB_TIMEOUT = 10000000,  /* 10 seconds */
      65              :   DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000,       /* 1 seconds */
      66              :   DEFAULT_MQTT_QOS = 2,         /* Once and one only */
      67              : };
      68              : 
      69              : static guint8 src_client_id = 0;
      70              : static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
      71              : static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
      72              : static const gchar TAG_ERR_MQTTSRC[] = "ERROR: MQTTSrc";
      73              : static const gchar DEFAULT_MQTT_CLIENT_ID[] =
      74              :     "$HOSTNAME_$PID_^[0-9][0-9]?$|^255$";
      75              : static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_src%u";
      76              : 
      77              : /** Function prototype declarations */
      78              : static void
      79              : gst_mqtt_src_set_property (GObject * object, guint prop_id,
      80              :     const GValue * value, GParamSpec * pspec);
      81              : static void
      82              : gst_mqtt_src_get_property (GObject * object, guint prop_id,
      83              :     GValue * value, GParamSpec * pspec);
      84              : static void gst_mqtt_src_class_finalize (GObject * object);
      85              : 
      86              : static GstStateChangeReturn
      87              : gst_mqtt_src_change_state (GstElement * element, GstStateChange transition);
      88              : 
      89              : static gboolean gst_mqtt_src_start (GstBaseSrc * basesrc);
      90              : static gboolean gst_mqtt_src_stop (GstBaseSrc * basesrc);
      91              : static GstCaps *gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
      92              : static gboolean gst_mqtt_src_renegotiate (GstBaseSrc * basesrc);
      93              : 
      94              : static void
      95              : gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
      96              :     GstClockTime * start, GstClockTime * end);
      97              : static gboolean gst_mqtt_src_is_seekable (GstBaseSrc * basesrc);
      98              : static GstFlowReturn
      99              : gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
     100              :     GstBuffer ** buf);
     101              : static gboolean gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query);
     102              : 
     103              : static gboolean gst_mqtt_src_get_debug (GstMqttSrc * self);
     104              : static void gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag);
     105              : static gboolean gst_mqtt_src_get_is_live (GstMqttSrc * self);
     106              : static void gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag);
     107              : static gchar *gst_mqtt_src_get_client_id (GstMqttSrc * self);
     108              : static void gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id);
     109              : static gchar *gst_mqtt_src_get_host_address (GstMqttSrc * self);
     110              : static void gst_mqtt_src_set_host_address (GstMqttSrc * self,
     111              :     const gchar * addr);
     112              : static gchar *gst_mqtt_src_get_host_port (GstMqttSrc * self);
     113              : static void gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port);
     114              : static gint64 gst_mqtt_src_get_sub_timeout (GstMqttSrc * self);
     115              : static void gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t);
     116              : static gchar *gst_mqtt_src_get_sub_topic (GstMqttSrc * self);
     117              : static void gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic);
     118              : static gboolean gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self);
     119              : static void gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self,
     120              :     const gboolean val);
     121              : static gint gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self);
     122              : static void gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self,
     123              :     const gint num);
     124              : static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self);
     125              : static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos);
     126              : 
     127              : static void cb_mqtt_on_connection_lost (void *context, char *cause);
     128              : static int cb_mqtt_on_message_arrived (void *context, char *topic_name,
     129              :     int topic_len, MQTTAsync_message * message);
     130              : static void cb_mqtt_on_connect (void *context,
     131              :     MQTTAsync_successData * response);
     132              : static void cb_mqtt_on_connect_failure (void *context,
     133              :     MQTTAsync_failureData * response);
     134              : static void cb_mqtt_on_subscribe (void *context,
     135              :     MQTTAsync_successData * response);
     136              : static void cb_mqtt_on_subscribe_failure (void *context,
     137              :     MQTTAsync_failureData * response);
     138              : static void cb_mqtt_on_unsubscribe (void *context,
     139              :     MQTTAsync_successData * response);
     140              : static void cb_mqtt_on_unsubscribe_failure (void *context,
     141              :     MQTTAsync_failureData * response);
     142              : 
     143              : static void cb_memory_wrapped_destroy (void *p);
     144              : 
     145              : static GstMQTTMessageHdr *_extract_mqtt_msg_hdr_from (GstMemory * mem,
     146              :     GstMemory ** hdr_mem, GstMapInfo * hdr_map_info);
     147              : static void _put_timestamp_on_gst_buf (GstMqttSrc * self,
     148              :     GstMQTTMessageHdr * hdr, GstBuffer * buf);
     149              : static gboolean _subscribe (GstMqttSrc * self);
     150              : static gboolean _unsubscribe (GstMqttSrc * self);
     151              : 
     152              : /**
     153              :  * @brief A utility function to check whether the timestamp marked by _put_timestamp_on_gst_buf () is valid or not
     154              :  */
     155              : static inline gboolean
     156            4 : _is_gst_buffer_timestamp_valid (GstBuffer * buf)
     157              : {
     158            4 :   if (!GST_BUFFER_PTS_IS_VALID (buf) && !GST_BUFFER_DTS_IS_VALID (buf) &&
     159            0 :       !GST_BUFFER_DURATION_IS_VALID (buf))
     160            0 :     return FALSE;
     161            4 :   return TRUE;
     162              : }
     163              : 
     164              : /** Function definitions */
     165              : /**
     166              :  * @brief Initialize GstMqttSrc object
     167              :  */
     168              : static void
     169            7 : gst_mqtt_src_init (GstMqttSrc * self)
     170              : {
     171            7 :   MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
     172            7 :   MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
     173            7 :   GstBaseSrc *basesrc = GST_BASE_SRC (self);
     174              : 
     175            7 :   self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSRC);
     176              : 
     177            7 :   gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
     178            7 :   gst_base_src_set_async (basesrc, FALSE);
     179              : 
     180              :   /** init mqttsrc properties */
     181            7 :   self->mqtt_client_handle = NULL;
     182            7 :   self->debug = DEFAULT_DEBUG;
     183            7 :   self->is_live = DEFAULT_IS_LIVE;
     184            7 :   self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
     185            7 :   self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
     186            7 :   self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
     187            7 :   self->mqtt_topic = NULL;
     188            7 :   self->mqtt_sub_timeout = (gint64) DEFAULT_MQTT_SUB_TIMEOUT;
     189            7 :   self->mqtt_conn_opts = conn_opts;
     190            7 :   self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
     191            7 :   self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
     192            7 :   self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
     193            7 :   self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
     194            7 :   self->mqtt_conn_opts.context = self;
     195            7 :   self->mqtt_respn_opts = respn_opts;
     196            7 :   self->mqtt_respn_opts.onSuccess = NULL;
     197            7 :   self->mqtt_respn_opts.onFailure = NULL;
     198            7 :   self->mqtt_respn_opts.context = self;
     199            7 :   self->mqtt_qos = DEFAULT_MQTT_QOS;
     200              : 
     201              :   /** init private member variables */
     202            7 :   self->err = NULL;
     203            7 :   self->aqueue = g_async_queue_new ();
     204            7 :   g_cond_init (&self->mqtt_src_gcond);
     205            7 :   g_mutex_init (&self->mqtt_src_mutex);
     206            7 :   g_mutex_lock (&self->mqtt_src_mutex);
     207            7 :   self->is_connected = FALSE;
     208            7 :   self->is_subscribed = FALSE;
     209            7 :   self->latency = GST_CLOCK_TIME_NONE;
     210            7 :   g_mutex_unlock (&self->mqtt_src_mutex);
     211            7 :   self->base_time_epoch = GST_CLOCK_TIME_NONE;
     212            7 :   self->caps = NULL;
     213            7 :   self->num_dumped = 0;
     214              : 
     215            7 :   gst_base_src_set_live (basesrc, self->is_live);
     216            7 : }
     217              : 
     218              : /**
     219              :  * @brief Initialize GstMqttSrcClass object
     220              :  */
     221              : static void
     222           21 : gst_mqtt_src_class_init (GstMqttSrcClass * klass)
     223              : {
     224           21 :   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
     225           21 :   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
     226           21 :   GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
     227              : 
     228           21 :   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SRC, 0,
     229              :       "MQTT src");
     230              : 
     231           21 :   gobject_class->set_property = gst_mqtt_src_set_property;
     232           21 :   gobject_class->get_property = gst_mqtt_src_get_property;
     233           21 :   gobject_class->finalize = gst_mqtt_src_class_finalize;
     234              : 
     235           21 :   g_object_class_install_property (gobject_class, PROP_DEBUG,
     236              :       g_param_spec_boolean ("debug", "Debug",
     237              :           "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
     238              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     239              : 
     240           21 :   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
     241              :       g_param_spec_boolean ("is-live", "Is Live",
     242              :           "Synchronize the incoming buffers' timestamp with the current running time",
     243              :           DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     244              : 
     245           21 :   g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
     246              :       g_param_spec_string ("client-id", "Client ID",
     247              :           "The client identifier passed to the server (broker)",
     248              :           DEFAULT_MQTT_CLIENT_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     249              : 
     250           21 :   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
     251              :       g_param_spec_string ("host", "Host", "Host (broker) to connect to",
     252              :           DEFAULT_MQTT_HOST_ADDRESS,
     253              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     254              : 
     255           21 :   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
     256              :       g_param_spec_string ("port", "Port",
     257              :           "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
     258              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     259              : 
     260           21 :   g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TIMEOUT,
     261              :       g_param_spec_int64 ("sub-timeout", "Timeout for receiving a message",
     262              :           "The timeout (in microseconds) for receiving a message from subscribed topic",
     263              :           1000000, G_MAXINT64, DEFAULT_MQTT_SUB_TIMEOUT,
     264              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     265              : 
     266           21 :   g_object_class_install_property (gobject_class, PROP_MQTT_SUB_TOPIC,
     267              :       g_param_spec_string ("sub-topic", "Topic to Subscribe (mandatory)",
     268              :           "The topic's name to subscribe", NULL,
     269              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     270              : 
     271           21 :   g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
     272              :       g_param_spec_boolean ("cleansession", "Cleansession",
     273              :           "When it is TRUE, the state information is discarded at connect and disconnect.",
     274              :           DEFAULT_MQTT_OPT_CLEANSESSION,
     275              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     276              : 
     277           21 :   g_object_class_install_property (gobject_class,
     278              :       PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
     279              :       g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
     280              :           "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
     281              :           1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
     282              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     283              : 
     284           21 :   g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
     285              :       g_param_spec_int ("mqtt-qos", "mqtt QoS level",
     286              :           "The QoS level of MQTT.\n"
     287              :           "\t\t\t  0: At most once\n"
     288              :           "\t\t\t  1: At least once\n"
     289              :           "\t\t\t  2: Exactly once\n"
     290              :           "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
     291              :           0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     292              : 
     293           21 :   gstelement_class->change_state =
     294           21 :       GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
     295              : 
     296           21 :   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_src_start);
     297           21 :   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_src_stop);
     298           21 :   gstbasesrc_class->get_caps = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_caps);
     299           21 :   gstbasesrc_class->get_times = GST_DEBUG_FUNCPTR (gst_mqtt_src_get_times);
     300           21 :   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_mqtt_src_is_seekable);
     301           21 :   gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_mqtt_src_create);
     302           21 :   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_src_query);
     303              : 
     304           21 :   gst_element_class_set_static_metadata (gstelement_class,
     305              :       "MQTT source", "Source/MQTT",
     306              :       "Subscribe a MQTT topic and push incoming data to the GStreamer pipeline",
     307              :       "Wook Song <wook16.song@samsung.com>");
     308           21 :   gst_element_class_add_static_pad_template (gstelement_class,
     309              :       &src_pad_template);
     310           21 : }
     311              : 
     312              : /**
     313              :  * @brief The setter for the mqttsrc's properties
     314              :  */
     315              : static void
     316           30 : gst_mqtt_src_set_property (GObject * object, guint prop_id,
     317              :     const GValue * value, GParamSpec * pspec)
     318              : {
     319           30 :   GstMqttSrc *self = GST_MQTT_SRC (object);
     320              : 
     321           30 :   switch (prop_id) {
     322            6 :     case PROP_DEBUG:
     323            6 :       gst_mqtt_src_set_debug (self, g_value_get_boolean (value));
     324            6 :       break;
     325            6 :     case PROP_IS_LIVE:
     326            6 :       gst_mqtt_src_set_is_live (self, g_value_get_boolean (value));
     327            6 :       break;
     328            1 :     case PROP_MQTT_CLIENT_ID:
     329            1 :       gst_mqtt_src_set_client_id (self, g_value_get_string (value));
     330            1 :       break;
     331            1 :     case PROP_MQTT_HOST_ADDRESS:
     332            1 :       gst_mqtt_src_set_host_address (self, g_value_get_string (value));
     333            1 :       break;
     334            1 :     case PROP_MQTT_HOST_PORT:
     335            1 :       gst_mqtt_src_set_host_port (self, g_value_get_string (value));
     336            1 :       break;
     337            6 :     case PROP_MQTT_SUB_TIMEOUT:
     338            6 :       gst_mqtt_src_set_sub_timeout (self, g_value_get_int64 (value));
     339            6 :       break;
     340            6 :     case PROP_MQTT_SUB_TOPIC:
     341            6 :       gst_mqtt_src_set_sub_topic (self, g_value_get_string (value));
     342            6 :       break;
     343            1 :     case PROP_MQTT_OPT_CLEANSESSION:
     344            1 :       gst_mqtt_src_set_opt_cleansession (self, g_value_get_boolean (value));
     345            1 :       break;
     346            1 :     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
     347            1 :       gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value));
     348            1 :       break;
     349            1 :     case PROP_MQTT_QOS:
     350            1 :       gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value));
     351            1 :       break;
     352            0 :     default:
     353            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     354            0 :       break;
     355              :   }
     356           30 : }
     357              : 
     358              : /**
     359              :  * @brief The getter for the mqttsrc's properties
     360              :  */
     361              : static void
     362           14 : gst_mqtt_src_get_property (GObject * object, guint prop_id,
     363              :     GValue * value, GParamSpec * pspec)
     364              : {
     365           14 :   GstMqttSrc *self = GST_MQTT_SRC (object);
     366              : 
     367           14 :   switch (prop_id) {
     368            2 :     case PROP_DEBUG:
     369            2 :       g_value_set_boolean (value, gst_mqtt_src_get_debug (self));
     370            2 :       break;
     371            1 :     case PROP_IS_LIVE:
     372            1 :       g_value_set_boolean (value, gst_mqtt_src_get_is_live (self));
     373            1 :       break;
     374            1 :     case PROP_MQTT_CLIENT_ID:
     375            1 :       g_value_set_string (value, gst_mqtt_src_get_client_id (self));
     376            1 :       break;
     377            1 :     case PROP_MQTT_HOST_ADDRESS:
     378            1 :       g_value_set_string (value, gst_mqtt_src_get_host_address (self));
     379            1 :       break;
     380            1 :     case PROP_MQTT_HOST_PORT:
     381            1 :       g_value_set_string (value, gst_mqtt_src_get_host_port (self));
     382            1 :       break;
     383            2 :     case PROP_MQTT_SUB_TIMEOUT:
     384            2 :       g_value_set_int64 (value, gst_mqtt_src_get_sub_timeout (self));
     385            2 :       break;
     386            1 :     case PROP_MQTT_SUB_TOPIC:
     387            1 :       g_value_set_string (value, gst_mqtt_src_get_sub_topic (self));
     388            1 :       break;
     389            1 :     case PROP_MQTT_OPT_CLEANSESSION:
     390            1 :       g_value_set_boolean (value, gst_mqtt_src_get_opt_cleansession (self));
     391            1 :       break;
     392            2 :     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
     393            2 :       g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self));
     394            2 :       break;
     395            2 :     case PROP_MQTT_QOS:
     396            2 :       g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self));
     397            2 :       break;
     398            0 :     default:
     399            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     400            0 :       break;
     401              :   }
     402           14 : }
     403              : 
     404              : /**
     405              :  * @brief Finalize GstMqttSrcClass object
     406              :  */
     407              : static void
     408            7 : gst_mqtt_src_class_finalize (GObject * object)
     409              : {
     410            7 :   GstMqttSrc *self = GST_MQTT_SRC (object);
     411              :   GstBuffer *remained;
     412              : 
     413            7 :   if (self->mqtt_client_handle) {
     414            0 :     MQTTAsync_destroy (&self->mqtt_client_handle);
     415            0 :     self->mqtt_client_handle = NULL;
     416              :   }
     417              : 
     418            7 :   g_free (self->mqtt_client_id);
     419            7 :   g_free (self->mqtt_host_address);
     420            7 :   g_free (self->mqtt_host_port);
     421            7 :   g_free (self->mqtt_topic);
     422            7 :   gst_caps_replace (&self->caps, NULL);
     423              : 
     424            7 :   if (self->err)
     425            2 :     g_error_free (self->err);
     426              : 
     427            7 :   while ((remained = g_async_queue_try_pop (self->aqueue))) {
     428            0 :     gst_buffer_unref (remained);
     429              :   }
     430            7 :   g_clear_pointer (&self->aqueue, g_async_queue_unref);
     431              : 
     432            7 :   g_mutex_clear (&self->mqtt_src_mutex);
     433            7 :   G_OBJECT_CLASS (parent_class)->finalize (object);
     434            7 : }
     435              : 
     436              : /**
     437              :  * @brief Handle mqttsrc's state change
     438              :  */
     439              : static GstStateChangeReturn
     440           32 : gst_mqtt_src_change_state (GstElement * element, GstStateChange transition)
     441              : {
     442           32 :   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
     443           32 :   GstMqttSrc *self = GST_MQTT_SRC (element);
     444           32 :   gboolean no_preroll = FALSE;
     445              :   GstClock *elem_clock;
     446              :   GstClockTime base_time;
     447              :   GstClockTime cur_time;
     448              :   GstClockTimeDiff diff;
     449              : 
     450           32 :   switch (transition) {
     451            5 :     case GST_STATE_CHANGE_NULL_TO_READY:
     452            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
     453            5 :       if (self->err) {
     454            0 :         g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
     455            0 :             self->err->message);
     456            0 :         return GST_STATE_CHANGE_FAILURE;
     457              :       }
     458            5 :       break;
     459            5 :     case GST_STATE_CHANGE_READY_TO_PAUSED:
     460            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
     461              :       /* Regardless of the 'is-live''s value, prerolling is not supported */
     462            5 :       no_preroll = TRUE;
     463            5 :       break;
     464            5 :     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
     465            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
     466            5 :       self->base_time_epoch = GST_CLOCK_TIME_NONE;
     467            5 :       elem_clock = gst_element_get_clock (element);
     468            5 :       if (!elem_clock)
     469            0 :         break;
     470            5 :       base_time = gst_element_get_base_time (element);
     471            5 :       cur_time = gst_clock_get_time (elem_clock);
     472            5 :       gst_object_unref (elem_clock);
     473            5 :       diff = GST_CLOCK_DIFF (base_time, cur_time);
     474            5 :       self->base_time_epoch =
     475            5 :           g_get_real_time () * GST_US_TO_NS_MULTIPLIER - diff;
     476              : 
     477              :       /** This handles the case when the state is changed to PLAYING again */
     478            5 :       if (GST_BASE_SRC_IS_STARTED (GST_BASE_SRC (self)) &&
     479            5 :           (self->is_connected == FALSE)) {
     480            0 :         int conn = MQTTAsync_reconnect (self->mqtt_client_handle);
     481              : 
     482            0 :         if (conn != MQTTASYNC_SUCCESS) {
     483            0 :           GST_ERROR_OBJECT (self, "Failed to re-subscribe to %s",
     484              :               self->mqtt_topic);
     485              : 
     486            0 :           return GST_STATE_CHANGE_FAILURE;
     487              :         }
     488              :       }
     489            5 :       break;
     490           17 :     default:
     491           17 :       break;
     492              :   }
     493              : 
     494           32 :   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
     495              : 
     496           32 :   switch (transition) {
     497            5 :     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
     498            5 :       if (self->is_subscribed && !_unsubscribe (self)) {
     499            0 :         GST_ERROR_OBJECT (self, "Cannot unsubscribe to %s", self->mqtt_topic);
     500              :       }
     501            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
     502            5 :       break;
     503            5 :     case GST_STATE_CHANGE_PAUSED_TO_READY:
     504            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
     505            5 :       break;
     506            5 :     case GST_STATE_CHANGE_READY_TO_NULL:
     507            5 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
     508              :     default:
     509           22 :       break;
     510              :   }
     511              : 
     512           32 :   if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS)
     513            0 :     ret = GST_STATE_CHANGE_NO_PREROLL;
     514              : 
     515           32 :   return ret;
     516              : }
     517              : 
     518              : /**
     519              :  * @brief Start mqttsrc, called when state changed null to ready
     520              :  */
     521              : static gboolean
     522            5 : gst_mqtt_src_start (GstBaseSrc * basesrc)
     523              : {
     524            5 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     525            5 :   gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
     526              :       self->mqtt_host_port);
     527              :   int ret;
     528              :   gint64 end_time;
     529              : 
     530            5 :   if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
     531            5 :     g_free (self->mqtt_client_id);
     532            5 :     self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
     533            5 :         g_get_host_name (), getpid (), src_client_id++);
     534              :   }
     535              : 
     536              :   /**
     537              :    * @todo Support other persistence mechanisms
     538              :    *    MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
     539              :    *    MQTTCLIENT_PERSISTENCE_DEFAULT: The default file system-based
     540              :    *                                    persistence mechanism
     541              :    *    MQTTCLIENT_PERSISTENCE_USER: An application-specific persistence
     542              :    *                                 mechanism
     543              :    */
     544           10 :   ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
     545            5 :       self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
     546            5 :   g_free (haddr);
     547            5 :   if (ret != MQTTASYNC_SUCCESS)
     548            0 :     return FALSE;
     549              : 
     550            5 :   MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
     551              :       cb_mqtt_on_connection_lost, cb_mqtt_on_message_arrived, NULL);
     552              : 
     553            5 :   ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
     554            5 :   if (ret != MQTTASYNC_SUCCESS)
     555            0 :     goto error;
     556              : 
     557              :   /* Waiting for the connection */
     558            5 :   end_time = g_get_monotonic_time () +
     559              :       DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
     560            5 :   g_mutex_lock (&self->mqtt_src_mutex);
     561            5 :   while (!self->is_connected) {
     562            0 :     if (!g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex,
     563              :             end_time)) {
     564            0 :       g_mutex_unlock (&self->mqtt_src_mutex);
     565            0 :       g_critical ("Failed to connect to MQTT broker from mqttsrc."
     566              :           "Please check broker is running status or broker host address.");
     567            0 :       goto error;
     568              :     }
     569              :   }
     570            5 :   g_mutex_unlock (&self->mqtt_src_mutex);
     571            5 :   return TRUE;
     572              : 
     573            0 : error:
     574            0 :   MQTTAsync_destroy (&self->mqtt_client_handle);
     575            0 :   self->mqtt_client_handle = NULL;
     576            0 :   return FALSE;
     577              : }
     578              : 
     579              : /**
     580              :  * @brief Stop mqttsrc, called when state changed ready to null
     581              :  */
     582              : static gboolean
     583            5 : gst_mqtt_src_stop (GstBaseSrc * basesrc)
     584              : {
     585            5 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     586              : 
     587              :   /* todo */
     588            5 :   MQTTAsync_disconnect (self->mqtt_client_handle, NULL);
     589            5 :   g_mutex_lock (&self->mqtt_src_mutex);
     590            5 :   self->is_connected = FALSE;
     591            5 :   g_mutex_unlock (&self->mqtt_src_mutex);
     592            5 :   MQTTAsync_destroy (&self->mqtt_client_handle);
     593            5 :   self->mqtt_client_handle = NULL;
     594            5 :   return TRUE;
     595              : }
     596              : 
     597              : /**
     598              :  * @brief Get caps of subclass
     599              :  */
     600              : static GstCaps *
     601           51 : gst_mqtt_src_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
     602              : {
     603           51 :   GstPad *pad = GST_BASE_SRC_PAD (basesrc);
     604           51 :   GstCaps *cur_caps = gst_pad_get_current_caps (pad);
     605           51 :   GstCaps *caps = gst_caps_new_any ();
     606              :   UNUSED (filter);
     607              : 
     608           51 :   if (cur_caps) {
     609              :     GstCaps *intersection =
     610            1 :         gst_caps_intersect_full (cur_caps, caps, GST_CAPS_INTERSECT_FIRST);
     611              : 
     612            1 :     gst_caps_unref (cur_caps);
     613            1 :     gst_caps_unref (caps);
     614            1 :     caps = intersection;
     615              :   }
     616              : 
     617           51 :   return caps;
     618              : }
     619              : 
     620              : /**
     621              :  * @brief Do negotiation procedure again if it needed
     622              :  */
     623              : static gboolean
     624            4 : gst_mqtt_src_renegotiate (GstBaseSrc * basesrc)
     625              : {
     626            4 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     627            4 :   GstCaps *caps = NULL;
     628            4 :   GstCaps *peercaps = NULL;
     629              :   GstCaps *thiscaps;
     630            4 :   gboolean result = FALSE;
     631            4 :   GstCaps *fixed_caps = NULL;
     632              : 
     633            4 :   if (self->caps == NULL || gst_caps_is_any (self->caps))
     634            0 :     goto no_nego_needed;
     635              : 
     636            4 :   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
     637            4 :   if (thiscaps && gst_caps_is_equal (self->caps, thiscaps)) {
     638            0 :     gst_caps_unref (thiscaps);
     639            0 :     goto no_nego_needed;
     640              :   }
     641              : 
     642            4 :   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), self->caps);
     643            4 :   if (peercaps && !gst_caps_is_empty (peercaps)) {
     644            3 :     caps = gst_caps_ref (peercaps);
     645            3 :     if (peercaps != self->caps)
     646            3 :       gst_caps_unref (peercaps);
     647              :   } else {
     648            1 :     caps = gst_caps_ref (self->caps);
     649              :   }
     650              : 
     651            4 :   if (caps && !gst_caps_is_empty (caps)) {
     652            4 :     if (gst_caps_is_any (caps)) {
     653            0 :       result = TRUE;
     654              :     } else {
     655            4 :       fixed_caps = gst_caps_fixate (caps);
     656            4 :       if (fixed_caps && gst_caps_is_fixed (fixed_caps)) {
     657            4 :         result = gst_base_src_set_caps (basesrc, fixed_caps);
     658            4 :         if (peercaps == self->caps)
     659            0 :           gst_caps_unref (fixed_caps);
     660              :       }
     661              :     }
     662            4 :     gst_caps_unref (caps);
     663              :   } else {
     664            0 :     result = FALSE;
     665            0 :     if (caps && gst_caps_is_empty (caps))
     666            0 :       gst_caps_unref (caps);
     667              :   }
     668              : 
     669            4 :   if (thiscaps)
     670            4 :     gst_caps_unref (thiscaps);
     671              : 
     672            4 :   return result;
     673              : 
     674            0 : no_nego_needed:
     675              :   {
     676            0 :     GST_DEBUG_OBJECT (self, "no negotiation needed");
     677              : 
     678            0 :     return TRUE;
     679              :   }
     680              : }
     681              : 
     682              : /**
     683              :  * @brief Return the time information of the given buffer
     684              :  */
     685              : static void
     686            4 : gst_mqtt_src_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
     687              :     GstClockTime * start, GstClockTime * end)
     688              : {
     689              :   GstClockTime sync_ts;
     690              :   GstClockTime duration;
     691              :   UNUSED (basesrc);
     692              : 
     693            4 :   sync_ts = GST_BUFFER_DTS (buffer);
     694            4 :   duration = GST_BUFFER_DURATION (buffer);
     695              : 
     696            4 :   if (!GST_CLOCK_TIME_IS_VALID (sync_ts))
     697            0 :     sync_ts = GST_BUFFER_PTS (buffer);
     698              : 
     699            4 :   if (GST_CLOCK_TIME_IS_VALID (sync_ts)) {
     700            4 :     *start = sync_ts;
     701            4 :     if (GST_CLOCK_TIME_IS_VALID (duration)) {
     702            4 :       *end = sync_ts + duration;
     703              :     }
     704              :   }
     705            4 : }
     706              : 
     707              : /**
     708              :  * @brief Check if source supports seeking
     709              :  * @note Seeking is not supported since this element handles live subscription data.
     710              :  */
     711              : static gboolean
     712            5 : gst_mqtt_src_is_seekable (GstBaseSrc * basesrc)
     713              : {
     714              :   UNUSED (basesrc);
     715            5 :   return FALSE;
     716              : }
     717              : 
     718              : /**
     719              :  * @brief Create a buffer containing the subscribed data
     720              :  */
     721              : static GstFlowReturn
     722            6 : gst_mqtt_src_create (GstBaseSrc * basesrc, guint64 offset, guint size,
     723              :     GstBuffer ** buf)
     724              : {
     725            6 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     726            6 :   gint64 elapsed = self->mqtt_sub_timeout;
     727              :   UNUSED (offset);
     728              :   UNUSED (size);
     729              : 
     730            6 :   g_mutex_lock (&self->mqtt_src_mutex);
     731            6 :   while ((!self->is_connected) || (!self->is_subscribed)) {
     732            2 :     gint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
     733              : 
     734            2 :     g_cond_wait_until (&self->mqtt_src_gcond, &self->mqtt_src_mutex, end_time);
     735            2 :     if (self->err) {
     736            2 :       g_mutex_unlock (&self->mqtt_src_mutex);
     737            2 :       goto ret_flow_err;
     738              :     }
     739              :   }
     740            4 :   g_mutex_unlock (&self->mqtt_src_mutex);
     741              : 
     742            4 :   while (elapsed > 0) {
     743              :     /** @todo DEFAULT_MQTT_SUB_TIMEOUT_MIN is too long */
     744            4 :     *buf = g_async_queue_timeout_pop (self->aqueue,
     745              :         DEFAULT_MQTT_SUB_TIMEOUT_MIN);
     746            4 :     if (*buf) {
     747            4 :       GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
     748            4 :       GstClockTime ulatency = GST_CLOCK_TIME_NONE;
     749              :       GstClock *clock;
     750              : 
     751              :       /** This buffer is coming from the past. Drop it. */
     752            4 :       if (!_is_gst_buffer_timestamp_valid (*buf)) {
     753            0 :         if (self->debug) {
     754            0 :           GST_DEBUG_OBJECT (self,
     755              :               "%s: Dumped the received buffer! (total: %" G_GUINT64_FORMAT ")",
     756              :               self->mqtt_topic, ++self->num_dumped);
     757              :         }
     758            0 :         elapsed = self->mqtt_sub_timeout;
     759            0 :         gst_buffer_unref (*buf);
     760            0 :         continue;
     761              :       }
     762              : 
     763              :       /** Update latency */
     764            4 :       clock = gst_element_get_clock (GST_ELEMENT (self));
     765            4 :       if (clock) {
     766            4 :         GstClockTime cur_time = gst_clock_get_time (clock);
     767            4 :         GstClockTime buf_ts = GST_BUFFER_TIMESTAMP (*buf);
     768            4 :         GstClockTimeDiff latency = 0;
     769              : 
     770            4 :         if ((base_time != GST_CLOCK_TIME_NONE) &&
     771            4 :             (cur_time != GST_CLOCK_TIME_NONE) &&
     772              :             (buf_ts != GST_CLOCK_TIME_NONE)) {
     773            4 :           GstClockTimeDiff now = GST_CLOCK_DIFF (base_time, cur_time);
     774              : 
     775            4 :           latency = GST_CLOCK_DIFF (buf_ts, (GstClockTime) now);
     776              :         }
     777              : 
     778            4 :         if (latency > 0) {
     779            0 :           ulatency = (GstClockTime) latency;
     780              : 
     781            0 :           if (GST_BUFFER_DURATION_IS_VALID (*buf)) {
     782            0 :             GstClockTime duration = GST_BUFFER_DURATION (*buf);
     783              : 
     784            0 :             if (duration >= ulatency) {
     785            0 :               ulatency = GST_CLOCK_TIME_NONE;
     786              :             }
     787              :           }
     788              :         }
     789            4 :         gst_object_unref (clock);
     790              :       }
     791              : 
     792            4 :       g_mutex_lock (&self->mqtt_src_mutex);
     793            4 :       self->latency = ulatency;
     794            4 :       g_mutex_unlock (&self->mqtt_src_mutex);
     795              :       /**
     796              :        * @todo If the difference between new latency and old latency,
     797              :        *      gst_element_post_message (GST_ELEMENT_CAST (self),
     798              :        *          gst_message_new_latency (GST_OBJECT_CAST (self)));
     799              :        *      is needed.
     800              :        */
     801            4 :       break;
     802            0 :     } else if (self->err) {
     803            0 :       break;
     804              :     }
     805            0 :     elapsed = elapsed - DEFAULT_MQTT_SUB_TIMEOUT_MIN;
     806              :   }
     807              : 
     808            4 :   if (*buf == NULL) {
     809              :     /** @todo: Send EoS here */
     810            0 :     if (!self->err)
     811            0 :       self->err = g_error_new (self->gquark_err_tag, GST_FLOW_EOS,
     812              :           "%s: Timeout for receiving a message has been expired. Regarding as an error",
     813              :           __func__);
     814            0 :     goto ret_flow_err;
     815              :   }
     816              : 
     817            4 :   return GST_FLOW_OK;
     818              : 
     819            2 : ret_flow_err:
     820            2 :   if (self->err) {
     821            2 :     g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
     822            2 :         self->err->message);
     823              :   }
     824            2 :   return GST_FLOW_ERROR;
     825              : }
     826              : 
     827              : /**
     828              :  * @brief An implementation of the GstBaseSrc vmethod that handles queries
     829              :  */
     830              : static gboolean
     831           59 : gst_mqtt_src_query (GstBaseSrc * basesrc, GstQuery * query)
     832              : {
     833           59 :   GstQueryType type = GST_QUERY_TYPE (query);
     834           59 :   GstMqttSrc *self = GST_MQTT_SRC (basesrc);
     835           59 :   gboolean res = FALSE;
     836              : 
     837           59 :   if (self->debug)
     838           57 :     GST_DEBUG_OBJECT (self, "Got %s event", gst_query_type_get_name (type));
     839              : 
     840           59 :   switch (type) {
     841            3 :     case GST_QUERY_LATENCY:{
     842            3 :       GstClockTime min_latency = 0;
     843            3 :       GstClockTime max_latency = GST_CLOCK_TIME_NONE;
     844              : 
     845            3 :       g_mutex_lock (&self->mqtt_src_mutex);
     846            3 :       if (self->latency != GST_CLOCK_TIME_NONE) {
     847            0 :         min_latency = self->latency;
     848              :       }
     849            3 :       g_mutex_unlock (&self->mqtt_src_mutex);
     850              : 
     851            3 :       if (self->debug) {
     852            3 :         GST_DEBUG_OBJECT (self,
     853              :             "Reporting latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT,
     854              :             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
     855              :       }
     856              :       /**
     857              :        * @brief The second argument of gst_query_set_latency should be always
     858              :        *        TRUE.
     859              :        */
     860            3 :       gst_query_set_latency (query, TRUE, min_latency, max_latency);
     861              : 
     862            3 :       res = TRUE;
     863            3 :       break;
     864              :     }
     865           56 :     default:{
     866           56 :       res = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
     867              :     }
     868              :   }
     869              : 
     870           59 :   return res;
     871              : }
     872              : 
     873              : /**
     874              :  * @brief Getter for the 'debug' property.
     875              :  */
     876              : static gboolean
     877            2 : gst_mqtt_src_get_debug (GstMqttSrc * self)
     878              : {
     879            2 :   return self->debug;
     880              : }
     881              : 
     882              : /**
     883              :  * @brief Setter for the 'debug' property.
     884              :  */
     885              : static void
     886            6 : gst_mqtt_src_set_debug (GstMqttSrc * self, const gboolean flag)
     887              : {
     888            6 :   self->debug = flag;
     889            6 : }
     890              : 
     891              : /**
     892              :  * @brief Getter for the 'is-live' property.
     893              :  */
     894              : static gboolean
     895            1 : gst_mqtt_src_get_is_live (GstMqttSrc * self)
     896              : {
     897            1 :   return self->is_live;
     898              : }
     899              : 
     900              : /**
     901              :  * @brief Setter for the 'is-live' property.
     902              :  */
     903              : static void
     904            6 : gst_mqtt_src_set_is_live (GstMqttSrc * self, const gboolean flag)
     905              : {
     906            6 :   self->is_live = flag;
     907            6 :   gst_base_src_set_live (GST_BASE_SRC (self), self->is_live);
     908            6 : }
     909              : 
     910              : /**
     911              :  * @brief Getter for the 'client-id' property.
     912              :  */
     913              : static gchar *
     914            1 : gst_mqtt_src_get_client_id (GstMqttSrc * self)
     915              : {
     916            1 :   return self->mqtt_client_id;
     917              : }
     918              : 
     919              : /**
     920              :  * @brief Setter for the 'client-id' property.
     921              :  */
     922              : static void
     923            1 : gst_mqtt_src_set_client_id (GstMqttSrc * self, const gchar * id)
     924              : {
     925            1 :   g_free (self->mqtt_client_id);
     926            1 :   self->mqtt_client_id = g_strdup (id);
     927            1 : }
     928              : 
     929              : /**
     930              :  * @brief Getter for the 'host' property.
     931              :  */
     932              : static gchar *
     933            1 : gst_mqtt_src_get_host_address (GstMqttSrc * self)
     934              : {
     935            1 :   return self->mqtt_host_address;
     936              : }
     937              : 
     938              : /**
     939              :  * @brief Setter for the 'host' property
     940              :  */
     941              : static void
     942            1 : gst_mqtt_src_set_host_address (GstMqttSrc * self, const gchar * addr)
     943              : {
     944              :   /**
     945              :    * @todo Handle the case where the addr is changed at runtime
     946              :    */
     947            1 :   g_free (self->mqtt_host_address);
     948            1 :   self->mqtt_host_address = g_strdup (addr);
     949            1 : }
     950              : 
     951              : /**
     952              :  * @brief Getter for the 'port' property.
     953              :  */
     954              : static gchar *
     955            1 : gst_mqtt_src_get_host_port (GstMqttSrc * self)
     956              : {
     957            1 :   return self->mqtt_host_port;
     958              : }
     959              : 
     960              : /**
     961              :  * @brief Setter for the 'port' property
     962              :  */
     963              : static void
     964            1 : gst_mqtt_src_set_host_port (GstMqttSrc * self, const gchar * port)
     965              : {
     966            1 :   g_free (self->mqtt_host_port);
     967            1 :   self->mqtt_host_port = g_strdup (port);
     968            1 : }
     969              : 
     970              : /**
     971              :  * @brief Getter for the 'sub-timeout' property
     972              :  */
     973              : static gint64
     974            2 : gst_mqtt_src_get_sub_timeout (GstMqttSrc * self)
     975              : {
     976            2 :   return self->mqtt_sub_timeout;
     977              : }
     978              : 
     979              : /**
     980              :  * @brief Setter for the 'sub-timeout' property
     981              :  */
     982              : static void
     983            6 : gst_mqtt_src_set_sub_timeout (GstMqttSrc * self, const gint64 t)
     984              : {
     985            6 :   self->mqtt_sub_timeout = t;
     986            6 : }
     987              : 
     988              : /**
     989              :  * @brief Getter for the 'sub-topic' property
     990              :  */
     991              : static gchar *
     992            1 : gst_mqtt_src_get_sub_topic (GstMqttSrc * self)
     993              : {
     994            1 :   return self->mqtt_topic;
     995              : }
     996              : 
     997              : /**
     998              :  * @brief Setter for the 'sub-topic' property
     999              :  */
    1000              : static void
    1001            6 : gst_mqtt_src_set_sub_topic (GstMqttSrc * self, const gchar * topic)
    1002              : {
    1003            6 :   g_free (self->mqtt_topic);
    1004            6 :   self->mqtt_topic = g_strdup (topic);
    1005            6 : }
    1006              : 
    1007              : /**
    1008              :  * @brief Getter for the 'cleansession' property.
    1009              :  */
    1010              : static gboolean
    1011            1 : gst_mqtt_src_get_opt_cleansession (GstMqttSrc * self)
    1012              : {
    1013            1 :   return self->mqtt_conn_opts.cleansession;
    1014              : }
    1015              : 
    1016              : /**
    1017              :  * @brief Setter for the 'cleansession' property.
    1018              :  */
    1019              : static void
    1020            1 : gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self, const gboolean val)
    1021              : {
    1022            1 :   self->mqtt_conn_opts.cleansession = val;
    1023            1 : }
    1024              : 
    1025              : /**
    1026              :  * @brief Getter for the 'keep-alive-interval' property
    1027              :  */
    1028              : static gint
    1029            2 : gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self)
    1030              : {
    1031            2 :   return self->mqtt_conn_opts.keepAliveInterval;
    1032              : }
    1033              : 
    1034              : /**
    1035              :  * @brief Setter for the 'keep-alive-interval' property
    1036              :  */
    1037              : static void
    1038            1 : gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self, const gint num)
    1039              : {
    1040            1 :   self->mqtt_conn_opts.keepAliveInterval = num;
    1041            1 : }
    1042              : 
    1043              : /**
    1044              :  * @brief Getter for the 'mqtt-qos' property
    1045              :  */
    1046              : static gint
    1047            2 : gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self)
    1048              : {
    1049            2 :   return self->mqtt_qos;
    1050              : }
    1051              : 
    1052              : /**
    1053              :  * @brief Setter for the 'mqtt-qos' property
    1054              :  */
    1055              : static void
    1056            1 : gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos)
    1057              : {
    1058            1 :   self->mqtt_qos = qos;
    1059            1 : }
    1060              : 
    1061              : /**
    1062              :   * @brief A callback to handle the connection lost to the broker
    1063              :   */
    1064              : static void
    1065            0 : cb_mqtt_on_connection_lost (void *context, char *cause)
    1066              : {
    1067            0 :   GstMqttSrc *self = GST_MQTT_SRC_CAST (context);
    1068              :   UNUSED (cause);
    1069              : 
    1070            0 :   g_mutex_lock (&self->mqtt_src_mutex);
    1071            0 :   self->is_connected = FALSE;
    1072            0 :   self->is_subscribed = FALSE;
    1073            0 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1074            0 :   if (!self->err) {
    1075            0 :     self->err = g_error_new (self->gquark_err_tag, EHOSTDOWN,
    1076              :         "Connection to the host (broker) has been lost: %s \n"
    1077              :         "\t\tfor detail, please check the log message of the broker",
    1078              :         g_strerror (EHOSTDOWN));
    1079              :   }
    1080            0 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1081            0 : }
    1082              : 
    1083              : /**
    1084              :   * @brief A callback to handle the arrived message
    1085              :   */
    1086              : static int
    1087            6 : cb_mqtt_on_message_arrived (void *context, char *topic_name, int topic_len,
    1088              :     MQTTAsync_message * message)
    1089              : {
    1090            6 :   const int size = message->payloadlen;
    1091            6 :   guint8 *data = message->payload;
    1092              :   GstMQTTMessageHdr *mqtt_msg_hdr;
    1093              :   GstMapInfo hdr_map_info;
    1094              :   GstMemory *received_mem;
    1095              :   GstMemory *hdr_mem;
    1096              :   GstBuffer *buffer;
    1097              :   GstBaseSrc *basesrc;
    1098              :   GstMqttSrc *self;
    1099              :   GstClock *clock;
    1100              :   gsize offset;
    1101              :   guint i;
    1102              :   UNUSED (topic_name);
    1103              :   UNUSED (topic_len);
    1104              : 
    1105            6 :   self = GST_MQTT_SRC_CAST (context);
    1106            6 :   g_mutex_lock (&self->mqtt_src_mutex);
    1107            6 :   if (!self->is_subscribed) {
    1108            2 :     g_mutex_unlock (&self->mqtt_src_mutex);
    1109              : 
    1110            6 :     return TRUE;
    1111              :   }
    1112            4 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1113              : 
    1114            4 :   basesrc = GST_BASE_SRC (self);
    1115            4 :   clock = gst_element_get_clock (GST_ELEMENT (self));
    1116            4 :   received_mem = gst_memory_new_wrapped (0, data, size, 0, size, message,
    1117              :       (GDestroyNotify) cb_memory_wrapped_destroy);
    1118            4 :   if (!received_mem) {
    1119            0 :     if (!self->err) {
    1120            0 :       self->err = g_error_new (self->gquark_err_tag, ENODATA,
    1121              :           "%s: failed to wrap the raw data of received message in GstMemory: %s",
    1122              :           __func__, g_strerror (ENODATA));
    1123              :     }
    1124            0 :     return TRUE;
    1125              :   }
    1126              : 
    1127            4 :   mqtt_msg_hdr = _extract_mqtt_msg_hdr_from (received_mem, &hdr_mem,
    1128              :       &hdr_map_info);
    1129            4 :   if (!mqtt_msg_hdr) {
    1130            0 :     if (!self->err) {
    1131            0 :       self->err = g_error_new (self->gquark_err_tag, ENODATA,
    1132              :           "%s: failed to extract header information from received message: %s",
    1133              :           __func__, g_strerror (ENODATA));
    1134              :     }
    1135            0 :     goto ret_unref_received_mem;
    1136              :   }
    1137              : 
    1138            4 :   if (!self->caps) {
    1139            3 :     self->caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
    1140            3 :     gst_mqtt_src_renegotiate (basesrc);
    1141              :   } else {
    1142            1 :     GstCaps *recv_caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
    1143              : 
    1144            1 :     if (recv_caps && !gst_caps_is_equal (self->caps, recv_caps)) {
    1145            1 :       gst_caps_replace (&self->caps, recv_caps);
    1146            1 :       gst_mqtt_src_renegotiate (basesrc);
    1147              :     } else {
    1148            0 :       gst_caps_replace (&recv_caps, NULL);
    1149              :     }
    1150              :   }
    1151              : 
    1152            4 :   buffer = gst_buffer_new ();
    1153            4 :   offset = GST_MQTT_LEN_MSG_HDR;
    1154            8 :   for (i = 0; i < mqtt_msg_hdr->num_mems; ++i) {
    1155              :     GstMemory *each_memory;
    1156              :     int each_size;
    1157              : 
    1158            4 :     each_size = mqtt_msg_hdr->size_mems[i];
    1159            4 :     each_memory = gst_memory_share (received_mem, offset, each_size);
    1160            4 :     gst_buffer_append_memory (buffer, each_memory);
    1161            4 :     offset += each_size;
    1162              :   }
    1163              : 
    1164              :   /** Timestamp synchronization */
    1165            4 :   if (self->debug) {
    1166            4 :     GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
    1167              : 
    1168            4 :     if (clock) {
    1169            4 :       GST_DEBUG_OBJECT (self,
    1170              :           "A message has been arrived at %" GST_TIME_FORMAT
    1171              :           " and queue length is %d",
    1172              :           GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
    1173              :           g_async_queue_length (self->aqueue));
    1174              : 
    1175            4 :       gst_object_unref (clock);
    1176              :     }
    1177              :   }
    1178            4 :   _put_timestamp_on_gst_buf (self, mqtt_msg_hdr, buffer);
    1179            4 :   g_async_queue_push (self->aqueue, buffer);
    1180              : 
    1181            4 :   gst_memory_unmap (hdr_mem, &hdr_map_info);
    1182            4 :   gst_memory_unref (hdr_mem);
    1183              : 
    1184            4 : ret_unref_received_mem:
    1185            4 :   gst_memory_unref (received_mem);
    1186              : 
    1187            4 :   return TRUE;
    1188              : }
    1189              : 
    1190              : /**
    1191              :   * @brief A callback invoked when destroying the GstMemory which wrapped the arrived message
    1192              :   */
    1193              : static void
    1194            4 : cb_memory_wrapped_destroy (void *p)
    1195              : {
    1196            4 :   MQTTAsync_message *msg = p;
    1197              : 
    1198            4 :   MQTTAsync_freeMessage (&msg);
    1199            4 : }
    1200              : 
    1201              : /**
    1202              :   * @brief A callback invoked when the connection is established
    1203              :   */
    1204              : static void
    1205            5 : cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
    1206              : {
    1207            5 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1208            5 :   GstBaseSrc *basesrc = GST_BASE_SRC (self);
    1209              :   int ret;
    1210              :   UNUSED (response);
    1211              : 
    1212            5 :   g_mutex_lock (&self->mqtt_src_mutex);
    1213            5 :   self->is_connected = TRUE;
    1214            5 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1215            5 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1216              : 
    1217              :   /** GstFlowReturn is an enum type. It is possible to use int here */
    1218            5 :   if (gst_base_src_is_async (basesrc) &&
    1219            0 :       (ret = gst_base_src_start_wait (basesrc)) != GST_FLOW_OK) {
    1220            0 :     g_mutex_lock (&self->mqtt_src_mutex);
    1221            0 :     self->err = g_error_new (self->gquark_err_tag, ret,
    1222              :         "%s: the virtual method, start (), in the GstBaseSrc class fails with return code %d",
    1223              :         __func__, ret);
    1224            0 :     g_cond_broadcast (&self->mqtt_src_gcond);
    1225            0 :     g_mutex_unlock (&self->mqtt_src_mutex);
    1226            0 :     return;
    1227              :   }
    1228              : 
    1229            5 :   if (!_subscribe (self)) {
    1230            2 :     GST_ERROR_OBJECT (self, "Failed to subscribe to %s", self->mqtt_topic);
    1231              :   }
    1232              : }
    1233              : 
    1234              : /**
    1235              :   * @brief A callback invoked when it is failed to connect to the broker
    1236              :   */
    1237              : static void
    1238            0 : cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
    1239              : {
    1240            0 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1241              : 
    1242            0 :   g_mutex_lock (&self->mqtt_src_mutex);
    1243            0 :   self->is_connected = FALSE;
    1244              : 
    1245            0 :   if (!self->err) {
    1246            0 :     self->err = g_error_new (self->gquark_err_tag, response->code,
    1247              :         "%s: failed to connect to the broker: %s", __func__, response->message);
    1248              :   }
    1249            0 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1250            0 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1251            0 : }
    1252              : 
    1253              : /**
    1254              :  * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_subscribe ()
    1255              :  */
    1256              : static void
    1257            3 : cb_mqtt_on_subscribe (void *context, MQTTAsync_successData * response)
    1258              : {
    1259            3 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1260              :   UNUSED (response);
    1261              : 
    1262            3 :   g_mutex_lock (&self->mqtt_src_mutex);
    1263            3 :   self->is_subscribed = TRUE;
    1264            3 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1265            3 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1266            3 : }
    1267              : 
    1268              : /**
    1269              :  * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_subscribe ()
    1270              :  */
    1271              : static void
    1272            2 : cb_mqtt_on_subscribe_failure (void *context, MQTTAsync_failureData * response)
    1273              : {
    1274            2 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1275              : 
    1276            2 :   g_mutex_lock (&self->mqtt_src_mutex);
    1277            2 :   if (!self->err) {
    1278            2 :     self->err = g_error_new (self->gquark_err_tag, response->code,
    1279              :         "%s: failed to subscribe the given topic, %s: %s", __func__,
    1280              :         self->mqtt_topic, response->message);
    1281              :   }
    1282            2 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1283            2 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1284            2 : }
    1285              : 
    1286              : /**
    1287              :  * @brief MQTTAsync_responseOptions's onSuccess callback for MQTTAsync_unsubscribe ()
    1288              :  */
    1289              : static void
    1290            3 : cb_mqtt_on_unsubscribe (void *context, MQTTAsync_successData * response)
    1291              : {
    1292            3 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1293              :   UNUSED (response);
    1294              : 
    1295            3 :   g_mutex_lock (&self->mqtt_src_mutex);
    1296            3 :   self->is_subscribed = FALSE;
    1297            3 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1298            3 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1299            3 : }
    1300              : 
    1301              : /**
    1302              :  * @brief MQTTAsync_responseOptions's onFailure callback for MQTTAsync_unsubscribe ()
    1303              :  */
    1304              : static void
    1305            0 : cb_mqtt_on_unsubscribe_failure (void *context, MQTTAsync_failureData * response)
    1306              : {
    1307            0 :   GstMqttSrc *self = GST_MQTT_SRC (context);
    1308              : 
    1309            0 :   g_mutex_lock (&self->mqtt_src_mutex);
    1310            0 :   if (!self->err) {
    1311            0 :     self->err = g_error_new (self->gquark_err_tag, response->code,
    1312              :         "%s: failed to unsubscribe the given topic, %s: %s", __func__,
    1313              :         self->mqtt_topic, response->message);
    1314              :   }
    1315            0 :   g_cond_broadcast (&self->mqtt_src_gcond);
    1316            0 :   g_mutex_unlock (&self->mqtt_src_mutex);
    1317            0 : }
    1318              : 
    1319              : /**
    1320              :  * @brief A helper function to properly invoke MQTTAsync_subscribe ()
    1321              :  */
    1322              : static gboolean
    1323            5 : _subscribe (GstMqttSrc * self)
    1324              : {
    1325            5 :   MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
    1326              :   int mqttasync_ret;
    1327              : 
    1328            5 :   opts.onSuccess = cb_mqtt_on_subscribe;
    1329            5 :   opts.onFailure = cb_mqtt_on_subscribe_failure;
    1330            5 :   opts.subscribeOptions.retainHandling = 1;
    1331              : 
    1332           10 :   mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
    1333            5 :       self->mqtt_topic, self->mqtt_qos, &opts);
    1334            5 :   if (mqttasync_ret != MQTTASYNC_SUCCESS)
    1335            5 :     return FALSE;
    1336            3 :   return TRUE;
    1337              : }
    1338              : 
    1339              : /**
    1340              :  * @brief A wrapper function that calls MQTTAsync_unsubscribe ()
    1341              :  */
    1342              : static gboolean
    1343            3 : _unsubscribe (GstMqttSrc * self)
    1344              : {
    1345            3 :   MQTTAsync_responseOptions opts = self->mqtt_respn_opts;
    1346              :   int mqttasync_ret;
    1347              : 
    1348            3 :   opts.onSuccess = cb_mqtt_on_unsubscribe;
    1349            3 :   opts.onFailure = cb_mqtt_on_unsubscribe_failure;
    1350              : 
    1351            6 :   mqttasync_ret = MQTTAsync_unsubscribe (self->mqtt_client_handle,
    1352            3 :       self->mqtt_topic, &opts);
    1353            3 :   if (mqttasync_ret != MQTTASYNC_SUCCESS)
    1354            3 :     return FALSE;
    1355            3 :   return TRUE;
    1356              : }
    1357              : 
    1358              : /**
    1359              :  * @brief A utility function to extract header information from a received message
    1360              :  */
    1361              : static GstMQTTMessageHdr *
    1362            4 : _extract_mqtt_msg_hdr_from (GstMemory * mem, GstMemory ** hdr_mem,
    1363              :     GstMapInfo * hdr_map_info)
    1364              : {
    1365            4 :   *hdr_mem = gst_memory_share (mem, 0, GST_MQTT_LEN_MSG_HDR);
    1366            4 :   g_return_val_if_fail (*hdr_mem != NULL, NULL);
    1367              : 
    1368            4 :   if (!gst_memory_map (*hdr_mem, hdr_map_info, GST_MAP_READ)) {
    1369            0 :     gst_memory_unref (*hdr_mem);
    1370            0 :     return NULL;
    1371              :   }
    1372              : 
    1373            4 :   return (GstMQTTMessageHdr *) hdr_map_info->data;
    1374              : }
    1375              : 
    1376              : /**
    1377              :   * @brief A utility function to put the timestamp information
    1378              :   *        onto a GstBuffer-typed buffer using the given packet header
    1379              :   */
    1380              : static void
    1381            4 : _put_timestamp_on_gst_buf (GstMqttSrc * self, GstMQTTMessageHdr * hdr,
    1382              :     GstBuffer * buf)
    1383              : {
    1384            4 :   gint64 diff_base_epoch = hdr->base_time_epoch - self->base_time_epoch;
    1385              : 
    1386            4 :   buf->pts = GST_CLOCK_TIME_NONE;
    1387            4 :   buf->dts = GST_CLOCK_TIME_NONE;
    1388            4 :   buf->duration = GST_CLOCK_TIME_NONE;
    1389              : 
    1390            4 :   if (hdr->sent_time_epoch < self->base_time_epoch)
    1391            0 :     return;
    1392              : 
    1393            4 :   if (((GstClockTimeDiff) hdr->pts + diff_base_epoch) < 0)
    1394            0 :     return;
    1395              : 
    1396            4 :   if (hdr->pts != GST_CLOCK_TIME_NONE) {
    1397            4 :     buf->pts = hdr->pts + diff_base_epoch;
    1398              :   }
    1399              : 
    1400            4 :   if (hdr->dts != GST_CLOCK_TIME_NONE) {
    1401            4 :     buf->dts = hdr->dts + diff_base_epoch;
    1402              :   }
    1403              : 
    1404            4 :   buf->duration = hdr->duration;
    1405              : 
    1406            4 :   if (self->debug) {
    1407            4 :     GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
    1408              :     GstClock *clock;
    1409              : 
    1410            4 :     clock = gst_element_get_clock (GST_ELEMENT (self));
    1411              : 
    1412            4 :     if (clock) {
    1413            4 :       GST_DEBUG_OBJECT (self,
    1414              :           "%s diff %" GST_STIME_FORMAT " now %" GST_TIME_FORMAT " ts (%"
    1415              :           GST_TIME_FORMAT " -> %" GST_TIME_FORMAT ")", self->mqtt_topic,
    1416              :           GST_STIME_ARGS (diff_base_epoch),
    1417              :           GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
    1418              :           GST_TIME_ARGS (hdr->pts), GST_TIME_ARGS (buf->pts));
    1419              : 
    1420            4 :       gst_object_unref (clock);
    1421              :     }
    1422              :   }
    1423              : }
        

Generated by: LCOV version 2.0-1