LCOV - code coverage report
Current view: top level - nnstreamer-2.4.2/gst/mqtt - mqttsink.c (source / functions) Coverage Total Hit
Test: nnstreamer 2.4.2-0 nnstreamer/nnstreamer.git#5d55fc62547faa02e861af5ef93cc1c89800934a Lines: 80.9 % 612 495
Test Date: 2024-09-25 09:08:39 Functions: 90.6 % 53 48

            Line data    Source code
       1              : /* SPDX-License-Identifier: LGPL-2.1-only */
       2              : /**
       3              :  * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
       4              :  */
       5              : /**
       6              :  * @file    mqttsink.c
       7              :  * @date    01 Apr 2021
       8              :  * @brief   Publish incoming data streams as a MQTT topic
       9              :  * @see     https://github.com/nnstreamer/nnstreamer
      10              :  * @author  Wook Song <wook16.song@samsung.com>
      11              :  * @bug     No known bugs except for NYI items
      12              :  */
      13              : 
      14              : #ifdef HAVE_CONFIG_H
      15              : #include <config.h>
      16              : #endif
      17              : 
      18              : #include <stdlib.h>
      19              : #include <string.h>
      20              : 
      21              : #ifdef G_OS_WIN32
      22              : #include <process.h>
      23              : #else
      24              : #include <sys/types.h>
      25              : #include <unistd.h>
      26              : #endif
      27              : 
      28              : #include <gst/base/gstbasesink.h>
      29              : #include <MQTTAsync.h>
      30              : #include <nnstreamer_util.h>
      31              : 
      32              : #include "mqttsink.h"
      33              : #include "ntputil.h"
      34              : /** Always-present "sink" pad template; GST_STATIC_CAPS_ANY puts no restriction on the caps. */
      35              : static GstStaticPadTemplate sink_pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
      36              :     GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
      37              : /** Let the rest of the file refer to the parent-class pointer as parent_class (used by finalize and change_state). */
      38              : #define gst_mqtt_sink_parent_class parent_class
      39          230 : G_DEFINE_TYPE (GstMqttSink, gst_mqtt_sink, GST_TYPE_BASE_SINK);
      40              : /** Debug category of this element; it is initialized in gst_mqtt_sink_class_init. */
      41              : GST_DEBUG_CATEGORY_STATIC (gst_mqtt_sink_debug);
      42              : #define GST_CAT_DEFAULT gst_mqtt_sink_debug
      43              : /** Property IDs installed in gst_mqtt_sink_class_init and dispatched by the set/get property handlers. */
      44              : enum
      45              : {
      46              :   PROP_0,       /* 0 is never installed as a property ID */
      47              : 
      48              :   PROP_DEBUG,
      49              :   PROP_MQTT_CLIENT_ID,
      50              :   PROP_MQTT_HOST_ADDRESS,
      51              :   PROP_MQTT_HOST_PORT,
      52              :   PROP_MQTT_PUB_TOPIC,
      53              :   PROP_MQTT_PUB_WAIT_TIMEOUT,
      54              :   PROP_MQTT_OPT_CLEANSESSION,
      55              :   PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
      56              :   PROP_NUM_BUFFERS,
      57              :   PROP_MAX_MSG_BUF_SIZE,
      58              :   PROP_MQTT_QOS,
      59              :   PROP_MQTT_NTP_SYNC,
      60              :   PROP_MQTT_NTP_SRVS,
      61              : 
      62              :   PROP_LAST
      63              : };
      64              : /** Default property values (applied in gst_mqtt_sink_init and used as GParamSpec defaults) and internal limits. */
      65              : enum
      66              : {
      67              :   DEFAULT_DEBUG = FALSE,
      68              :   DEFAULT_NUM_BUFFERS = -1,     /* -1 = no limit, per the num-buffers property description */
      69              :   DEFAULT_QOS = TRUE,   /* passed to gst_base_sink_set_qos_enabled (); distinct from DEFAULT_MQTT_QOS */
      70              :   DEFAULT_SYNC = FALSE, /* passed to gst_base_sink_set_sync () */
      71              :   DEFAULT_MQTT_OPT_CLEANSESSION = TRUE,
      72              :   DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60,    /* 1 minute */
      73              :   DEFAULT_MQTT_DISCONNECT_TIMEOUT = G_TIME_SPAN_SECOND * 3,     /* 3 secs */
      74              :   DEFAULT_MQTT_PUB_WAIT_TIMEOUT = 1,    /* 1 secs */
      75              :   DEFAULT_MAX_MSG_BUF_SIZE = 0, /* Buffer size is not fixed */
      76              :   DEFAULT_MQTT_QOS = 0,         /* fire and forget */
      77              :   DEFAULT_MQTT_NTP_SYNC = FALSE,
      78              :   MAX_LEN_PROP_NTP_SRVS = 4096, /* presumably an upper bound on the ntp-srvs string length; usage is not visible here - TODO confirm */
      79              : };
      80              : /** File-scope state and default strings; the *_FORMAT strings are printf-style templates. */
      81              : static guint8 sink_client_id = 0;       /* presumably a per-process counter feeding the "sink%u" suffix; usage is not visible here - TODO confirm */
      82              : static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
      83              : static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
      84              : static const gchar TAG_ERR_MQTTSINK[] = "ERROR: MQTTSink";      /* turned into the error-domain quark in gst_mqtt_sink_init */
      85              : static const gchar DEFAULT_MQTT_CLIENT_ID[] = "$HOST_$PID_^[0-9][0-9]?$|^255$"; /* initial client-id value; NOTE(review): looks like a descriptive placeholder, confirm where it is replaced */
      86              : static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_sink%u";
      87              : static const gchar DEFAULT_MQTT_PUB_TOPIC[] = "$client-id/topic";       /* initial pub-topic value set in gst_mqtt_sink_init */
      88              : static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
      89              : static const gchar DEFAULT_MQTT_NTP_SERVERS[] = "pool.ntp.org:123";     /* HOST_NAME:PORT pair, see the ntp-srvs property */
      90              : 
      91              : /** Forward declarations of the file-local functions: GObject methods first */
      92              : static void
      93              : gst_mqtt_sink_set_property (GObject * object, guint prop_id,
      94              :     const GValue * value, GParamSpec * pspec);
      95              : static void
      96              : gst_mqtt_sink_get_property (GObject * object, guint prop_id,
      97              :     GValue * value, GParamSpec * pspec);
      98              : static void gst_mqtt_sink_class_finalize (GObject * object);
      99              : /* GstElement virtual method */
     100              : static GstStateChangeReturn
     101              : gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition);
     102              : /* GstBaseSink virtual methods (all assigned in gst_mqtt_sink_class_init) */
     103              : static gboolean gst_mqtt_sink_start (GstBaseSink * basesink);
     104              : static gboolean gst_mqtt_sink_stop (GstBaseSink * basesink);
     105              : static gboolean gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query);
     106              : static GstFlowReturn
     107              : gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * buffer);
     108              : static GstFlowReturn
     109              : gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list);
     110              : static gboolean gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event);
     111              : static gboolean gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps);
     112              : /* Per-property accessors called from gst_mqtt_sink_get_property / gst_mqtt_sink_set_property */
     113              : static gboolean gst_mqtt_sink_get_debug (GstMqttSink * self);
     114              : static void gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag);
     115              : static gchar *gst_mqtt_sink_get_client_id (GstMqttSink * self);
     116              : static void gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id);
     117              : static gchar *gst_mqtt_sink_get_host_address (GstMqttSink * self);
     118              : static void gst_mqtt_sink_set_host_address (GstMqttSink * self,
     119              :     const gchar * addr);
     120              : static gchar *gst_mqtt_sink_get_host_port (GstMqttSink * self);
     121              : static void gst_mqtt_sink_set_host_port (GstMqttSink * self,
     122              :     const gchar * port);
     123              : static gchar *gst_mqtt_sink_get_pub_topic (GstMqttSink * self);
     124              : static void gst_mqtt_sink_set_pub_topic (GstMqttSink * self,
     125              :     const gchar * topic);
     126              : static gulong gst_mqtt_sink_get_pub_wait_timeout (GstMqttSink * self);
     127              : static void gst_mqtt_sink_set_pub_wait_timeout (GstMqttSink * self,
     128              :     const gulong to);
     129              : static gboolean gst_mqtt_sink_get_opt_cleansession (GstMqttSink * self);
     130              : static void gst_mqtt_sink_set_opt_cleansession (GstMqttSink * self,
     131              :     const gboolean val);
     132              : static gint gst_mqtt_sink_get_opt_keep_alive_interval (GstMqttSink * self);
     133              : static void gst_mqtt_sink_set_opt_keep_alive_interval (GstMqttSink * self,
     134              :     const gint num);
     135              : 
     136              : static gsize gst_mqtt_sink_get_max_msg_buf_size (GstMqttSink * self);
     137              : static void gst_mqtt_sink_set_max_msg_buf_size (GstMqttSink * self,
     138              :     const gsize size);
     139              : static gint gst_mqtt_sink_get_num_buffers (GstMqttSink * self);
     140              : static void gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num);
     141              : static gint gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self);
     142              : static void gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos);
     143              : static gboolean gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self);
     144              : static void gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self,
     145              :     const gboolean flag);
     146              : static gchar *gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self);
     147              : static void gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self,
     148              :     const gchar * pairs);
     149              : /* Callbacks with MQTTAsync signatures; the connect and send pairs are registered in gst_mqtt_sink_init */
     150              : static void cb_mqtt_on_connect (void *context,
     151              :     MQTTAsync_successData * response);
     152              : static void cb_mqtt_on_connect_failure (void *context,
     153              :     MQTTAsync_failureData * response);
     154              : static void cb_mqtt_on_disconnect (void *context,
     155              :     MQTTAsync_successData * response);
     156              : static void cb_mqtt_on_disconnect_failure (void *context,
     157              :     MQTTAsync_failureData * response);
     158              : static void cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token);
     159              : static void cb_mqtt_on_connection_lost (void *context, char *cause);
     160              : static int cb_mqtt_on_message_arrived (void *context, char *topicName,
     161              :     int topicLen, MQTTAsync_message * message);
     162              : static void cb_mqtt_on_send_success (void *context,
     163              :     MQTTAsync_successData * response);
     164              : static void cb_mqtt_on_send_failure (void *context,
     165              :     MQTTAsync_failureData * response);
     166              : 
     167              : /**
     168              :  * @brief Initialize GstMqttSink object
                      :  *
                      :  * Copies the MQTTAsync connect/response option initializers into the
                      :  * instance, registers the connect and send callbacks with the instance as
                      :  * their context, resets the private state, and loads the default value of
                      :  * every property. The string properties are stored as g_strdup () copies,
                      :  * which gst_mqtt_sink_class_finalize releases.
                      :  *
                      :  * @param self the instance being initialized
     169              :  */
     170              : static void
     171           10 : gst_mqtt_sink_init (GstMqttSink * self)
     172              : {
     173           10 :   GstBaseSink *basesink = GST_BASE_SINK (self);
     174           10 :   MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
     175           10 :   MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
     176              : 
     177              :   /** init MQTT related variables */
     178           10 :   self->mqtt_client_handle = NULL;
     179           10 :   self->mqtt_conn_opts = conn_opts;
     180           10 :   self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
     181           10 :   self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
     182           10 :   self->mqtt_conn_opts.context = self;  /* handed back to the callbacks as their context argument */
     183           10 :   self->mqtt_respn_opts = respn_opts;
     184           10 :   self->mqtt_respn_opts.onSuccess = cb_mqtt_on_send_success;
     185           10 :   self->mqtt_respn_opts.onFailure = cb_mqtt_on_send_failure;
     186           10 :   self->mqtt_respn_opts.context = self;
     187              : 
     188              :   /** init private variables */
     189           10 :   self->mqtt_sink_state = SINK_INITIALIZING;
     190           10 :   self->err = NULL;
     191           10 :   self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSINK);
     192           10 :   g_mutex_init (&self->mqtt_sink_mutex);
     193           10 :   g_cond_init (&self->mqtt_sink_gcond);
     194           10 :   self->mqtt_msg_buf = NULL;
     195           10 :   self->mqtt_msg_buf_size = 0;
     196           10 :   memset (&self->mqtt_msg_hdr, 0x0, sizeof (self->mqtt_msg_hdr));
     197           10 :   self->base_time_epoch = GST_CLOCK_TIME_NONE;
     198           10 :   self->in_caps = NULL;
     199              : 
     200              :   /** init mqttsink properties */
     201           10 :   self->debug = DEFAULT_DEBUG;
     202           10 :   self->num_buffers = DEFAULT_NUM_BUFFERS;
     203           10 :   self->max_msg_buf_size = DEFAULT_MAX_MSG_BUF_SIZE;
     204           10 :   self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
     205           10 :   self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
     206           10 :   self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
     207           10 :   self->mqtt_topic = g_strdup (DEFAULT_MQTT_PUB_TOPIC);
     208           10 :   self->mqtt_pub_wait_timeout = DEFAULT_MQTT_PUB_WAIT_TIMEOUT;
     209           10 :   self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
     210           10 :   self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
     211           10 :   self->mqtt_qos = DEFAULT_MQTT_QOS;
     212           10 :   self->mqtt_ntp_sync = DEFAULT_MQTT_NTP_SYNC;
     213           10 :   self->mqtt_ntp_srvs = g_strdup (DEFAULT_MQTT_NTP_SERVERS);
     214           10 :   self->mqtt_ntp_hnames = NULL;
     215           10 :   self->mqtt_ntp_ports = NULL;
     216           10 :   self->mqtt_ntp_num_srvs = 0;
     217           10 :   self->get_epoch_func = default_mqtt_get_unix_epoch;   /* replaced by ntputil_get_epoch on PAUSED_TO_PLAYING when ntp-sync is set */
     218           10 :   self->is_connected = FALSE;
     219              : 
     220              :   /** init basesink properties */
     221           10 :   gst_base_sink_set_qos_enabled (basesink, DEFAULT_QOS);
     222           10 :   gst_base_sink_set_sync (basesink, DEFAULT_SYNC);
     223           10 : }
     224              : 
     225              : /**
     226              :  * @brief Initialize GstMqttSinkClass object
                      :  *
                      :  * Sets up the debug category, hooks the GObject property/finalize
                      :  * handlers, installs the thirteen element properties, assigns the
                      :  * GstElement and GstBaseSink virtual methods, and registers the element
                      :  * metadata and the static sink pad template.
                      :  *
                      :  * The "client-id" and "pub-topic" param specs declare NULL as their default,
                      :  * whereas gst_mqtt_sink_init stores placeholder strings in the instance.
                      :  *
                      :  * @param klass the class structure being initialized
     227              :  */
     228              : static void
     229           22 : gst_mqtt_sink_class_init (GstMqttSinkClass * klass)
     230              : {
     231           22 :   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
     232           22 :   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
     233           22 :   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
     234              : 
     235           22 :   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SINK, 0,
     236              :       "MQTT sink");
     237              : 
     238           22 :   gobject_class->set_property = gst_mqtt_sink_set_property;
     239           22 :   gobject_class->get_property = gst_mqtt_sink_get_property;
     240           22 :   gobject_class->finalize = gst_mqtt_sink_class_finalize;
     241              : 
     242           22 :   g_object_class_install_property (gobject_class, PROP_DEBUG,
     243              :       g_param_spec_boolean ("debug", "Debug",
     244              :           "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
     245              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     246              : 
     247           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
     248              :       g_param_spec_string ("client-id", "Client ID",
     249              :           "The client identifier passed to the server (broker).", NULL,
     250              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     251              : 
     252           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
     253              :       g_param_spec_string ("host", "Host", "Host (broker) to connect to",
     254              :           DEFAULT_MQTT_HOST_ADDRESS,
     255              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     256              : 
     257           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
     258              :       g_param_spec_string ("port", "Port",
     259              :           "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
     260              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     261              : 
     262           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SYNC,
     263              :       g_param_spec_boolean ("ntp-sync", "NTP Synchronization",
     264              :           "Synchronize received streams to the NTP clock",
     265              :           DEFAULT_MQTT_NTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     266              : 
     267           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SRVS,
     268              :       g_param_spec_string ("ntp-srvs", "NTP Server Host Name and Port Pairs",
     269              :           "NTP Servers' HOST_NAME:PORT pairs to use (valid only if ntp-sync is true)\n"
     270              :           "\t\t\tUse ',' to separate each pair if there are more pairs than one",
     271              :           DEFAULT_MQTT_NTP_SERVERS,
     272              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     273              : 
     274           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_PUB_TOPIC,
     275              :       g_param_spec_string ("pub-topic", "Topic to Publish",
     276              :           "The topic's name to publish", NULL,
     277              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     278              : 
     279           22 :   g_object_class_install_property (gobject_class,
     280              :       PROP_MQTT_PUB_WAIT_TIMEOUT,
     281              :       g_param_spec_ulong ("pub-wait-timeout", "Timeout for Publish a message",
     282              :           "Timeout for execution of the main thread with completed publication of a message",
     283              :           1UL, G_MAXULONG, DEFAULT_MQTT_PUB_WAIT_TIMEOUT,
     284              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     285              : 
     286           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
     287              :       g_param_spec_boolean ("cleansession", "Cleansession",
     288              :           "When it is TRUE, the state information is discarded at connect and disconnect.",
     289              :           DEFAULT_MQTT_OPT_CLEANSESSION,
     290              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     291              : 
     292           22 :   g_object_class_install_property (gobject_class,
     293              :       PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
     294              :       g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
     295              :           "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
     296              :           1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
     297              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     298              : 
     299           22 :   g_object_class_install_property (gobject_class, PROP_MAX_MSG_BUF_SIZE,
     300              :       g_param_spec_ulong ("max-buffer-size",
     301              :           "The maximum size of a message buffer",
     302              :           "The maximum size in bytes of a message buffer (0 = dynamic buffer size)",
     303              :           0, G_MAXULONG, DEFAULT_MAX_MSG_BUF_SIZE,
     304              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     305              : 
     306           22 :   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
     307              :       g_param_spec_int ("num-buffers", "Num Buffers",
     308              :           "Number of (remaining) buffers to accept until sending EOS event (-1 = no limit)",
     309              :           -1, G_MAXINT32, DEFAULT_NUM_BUFFERS,
     310              :           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     311              : 
     312           22 :   g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
     313              :       g_param_spec_int ("mqtt-qos", "mqtt QoS level",
     314              :           "The QoS level of MQTT.\n"
     315              :           "\t\t\t  0: At most once\n"
     316              :           "\t\t\t  1: At least once\n"
     317              :           "\t\t\t  2: Exactly once\n"
     318              :           "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
     319              :           0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
     320              : 
     321           22 :   gstelement_class->change_state = gst_mqtt_sink_change_state;  /* NOTE(review): not wrapped in GST_DEBUG_FUNCPTR, unlike the GstBaseSink vmethods below */
     322              : 
     323           22 :   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_sink_start);
     324           22 :   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_sink_stop);
     325           22 :   gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_sink_query);
     326           22 :   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_mqtt_sink_render);
     327           22 :   gstbasesink_class->render_list =
     328           22 :       GST_DEBUG_FUNCPTR (gst_mqtt_sink_render_list);
     329           22 :   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_mqtt_sink_event);
     330           22 :   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_mqtt_sink_set_caps);
     331              : 
     332           22 :   gst_element_class_set_static_metadata (gstelement_class,
     333              :       "MQTT sink", "Sink/MQTT",
     334              :       "Publish incoming data streams as a MQTT topic",
     335              :       "Wook Song <wook16.song@samsung.com>");
     336           22 :   gst_element_class_add_static_pad_template (gstelement_class,
     337              :       &sink_pad_template);
     338           22 : }
     339              : 
     340              : /**
     341              :  * @brief The setter for the mqttsink's properties
                      :  *
                      :  * Extracts the new value from the GValue with the accessor matching the
                      :  * property's type and forwards it to the corresponding
                      :  * gst_mqtt_sink_set_* helper. An unknown ID only emits the standard
                      :  * invalid-property warning.
                      :  *
                      :  * @param object the GstMqttSink instance
                      :  * @param prop_id one of the PROP_* IDs installed in gst_mqtt_sink_class_init
                      :  * @param value the new value for the property
                      :  * @param pspec the property's param spec, used only for the warning
     342              :  */
     343              : static void
     344           26 : gst_mqtt_sink_set_property (GObject * object, guint prop_id,
     345              :     const GValue * value, GParamSpec * pspec)
     346              : {
     347           26 :   GstMqttSink *self = GST_MQTT_SINK (object);
     348              : 
     349           26 :   switch (prop_id) {
     350            7 :     case PROP_DEBUG:
     351            7 :       gst_mqtt_sink_set_debug (self, g_value_get_boolean (value));
     352            7 :       break;
     353            1 :     case PROP_MQTT_CLIENT_ID:
     354            1 :       gst_mqtt_sink_set_client_id (self, g_value_get_string (value));
     355            1 :       break;
     356            2 :     case PROP_MQTT_HOST_ADDRESS:
     357            2 :       gst_mqtt_sink_set_host_address (self, g_value_get_string (value));
     358            2 :       break;
     359            2 :     case PROP_MQTT_HOST_PORT:
     360            2 :       gst_mqtt_sink_set_host_port (self, g_value_get_string (value));
     361            2 :       break;
     362            3 :     case PROP_MQTT_PUB_TOPIC:
     363            3 :       gst_mqtt_sink_set_pub_topic (self, g_value_get_string (value));
     364            3 :       break;
     365            1 :     case PROP_MQTT_PUB_WAIT_TIMEOUT:
     366            1 :       gst_mqtt_sink_set_pub_wait_timeout (self, g_value_get_ulong (value));
     367            1 :       break;
     368            1 :     case PROP_MQTT_OPT_CLEANSESSION:
     369            1 :       gst_mqtt_sink_set_opt_cleansession (self, g_value_get_boolean (value));
     370            1 :       break;
     371            1 :     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
     372            1 :       gst_mqtt_sink_set_opt_keep_alive_interval (self, g_value_get_int (value));
     373            1 :       break;
     374            1 :     case PROP_MAX_MSG_BUF_SIZE:
     375            1 :       gst_mqtt_sink_set_max_msg_buf_size (self, g_value_get_ulong (value));
     376            1 :       break;
     377            3 :     case PROP_NUM_BUFFERS:
     378            3 :       gst_mqtt_sink_set_num_buffers (self, g_value_get_int (value));
     379            3 :       break;
     380            1 :     case PROP_MQTT_QOS:
     381            1 :       gst_mqtt_sink_set_mqtt_qos (self, g_value_get_int (value));
     382            1 :       break;
     383            2 :     case PROP_MQTT_NTP_SYNC:
     384            2 :       gst_mqtt_sink_set_mqtt_ntp_sync (self, g_value_get_boolean (value));
     385            2 :       break;
     386            1 :     case PROP_MQTT_NTP_SRVS:
     387            1 :       gst_mqtt_sink_set_mqtt_ntp_srvs (self, g_value_get_string (value));
     388            1 :       break;
     389            0 :     default:
     390            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     391            0 :       break;
     392              :   }
     393           26 : }
     394              : 
     395              : /**
     396              :  * @brief The getter for the mqttsink's properties
                      :  *
                      :  * Reads the current value through the corresponding gst_mqtt_sink_get_*
                      :  * helper and stores it in the GValue with the setter matching the
                      :  * property's type. An unknown ID only emits the standard invalid-property
                      :  * warning.
                      :  *
                      :  * NOTE(review): g_value_set_string () stores its own copy of the string, so
                      :  * the gchar * getters presumably return the instance's own strings; if any
                      :  * of them returns a fresh allocation it would leak here - confirm against
                      :  * the getter definitions.
                      :  *
                      :  * @param object the GstMqttSink instance
                      :  * @param prop_id one of the PROP_* IDs installed in gst_mqtt_sink_class_init
                      :  * @param value the GValue that receives the property's current value
                      :  * @param pspec the property's param spec, used only for the warning
     397              :  */
     398              : static void
     399           18 : gst_mqtt_sink_get_property (GObject * object, guint prop_id,
     400              :     GValue * value, GParamSpec * pspec)
     401              : {
     402           18 :   GstMqttSink *self = GST_MQTT_SINK (object);
     403              : 
     404           18 :   switch (prop_id) {
     405            2 :     case PROP_DEBUG:
     406            2 :       g_value_set_boolean (value, gst_mqtt_sink_get_debug (self));
     407            2 :       break;
     408            1 :     case PROP_MQTT_CLIENT_ID:
     409            1 :       g_value_set_string (value, gst_mqtt_sink_get_client_id (self));
     410            1 :       break;
     411            1 :     case PROP_MQTT_HOST_ADDRESS:
     412            1 :       g_value_set_string (value, gst_mqtt_sink_get_host_address (self));
     413            1 :       break;
     414            1 :     case PROP_MQTT_HOST_PORT:
     415            1 :       g_value_set_string (value, gst_mqtt_sink_get_host_port (self));
     416            1 :       break;
     417            1 :     case PROP_MQTT_PUB_TOPIC:
     418            1 :       g_value_set_string (value, gst_mqtt_sink_get_pub_topic (self));
     419            1 :       break;
     420            2 :     case PROP_MQTT_PUB_WAIT_TIMEOUT:
     421            2 :       g_value_set_ulong (value, gst_mqtt_sink_get_pub_wait_timeout (self));
     422            2 :       break;
     423            1 :     case PROP_MQTT_OPT_CLEANSESSION:
     424            1 :       g_value_set_boolean (value, gst_mqtt_sink_get_opt_cleansession (self));
     425            1 :       break;
     426            2 :     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
     427            2 :       g_value_set_int (value, gst_mqtt_sink_get_opt_keep_alive_interval (self));
     428            2 :       break;
     429            1 :     case PROP_MAX_MSG_BUF_SIZE:
     430            1 :       g_value_set_ulong (value, gst_mqtt_sink_get_max_msg_buf_size (self));
     431            1 :       break;
     432            2 :     case PROP_NUM_BUFFERS:
     433            2 :       g_value_set_int (value, gst_mqtt_sink_get_num_buffers (self));
     434            2 :       break;
     435            2 :     case PROP_MQTT_QOS:
     436            2 :       g_value_set_int (value, gst_mqtt_sink_get_mqtt_qos (self));
     437            2 :       break;
     438            1 :     case PROP_MQTT_NTP_SYNC:
     439            1 :       g_value_set_boolean (value, gst_mqtt_sink_get_mqtt_ntp_sync (self));
     440            1 :       break;
     441            1 :     case PROP_MQTT_NTP_SRVS:
     442            1 :       g_value_set_string (value, gst_mqtt_sink_get_mqtt_ntp_srvs (self));
     443            1 :       break;
     444            0 :     default:
     445            0 :       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     446            0 :       break;
     447              :   }
     448           18 : }
     449              : 
     450              : /**
     451              :  * @brief Finalize a GstMqttSink instance (installed as GObjectClass::finalize)
                      :  *
                      :  * Releases everything the instance owns: the property strings, the MQTT
                      :  * client handle if one is still present, the message buffer, the cached
                      :  * input caps, the NTP server lists, any pending GError, and the mutex and
                      :  * condition variable set up in gst_mqtt_sink_init. It then chains up to
                      :  * the parent class. The line with an empty gutter below is the fix added
                      :  * by review and has no coverage data.
                      :  *
                      :  * @param object the GstMqttSink instance being destroyed
     452              :  */
     453              : static void
     454           10 : gst_mqtt_sink_class_finalize (GObject * object)
     455              : {
     456           10 :   GstMqttSink *self = GST_MQTT_SINK (object);
     457              : 
     458           10 :   g_free (self->mqtt_host_address);
     459           10 :   self->mqtt_host_address = NULL;
     460           10 :   g_free (self->mqtt_host_port);
     461           10 :   self->mqtt_host_port = NULL;
     462           10 :   if (self->mqtt_client_handle) {
     463            0 :     MQTTAsync_destroy (&self->mqtt_client_handle);
     464            0 :     self->mqtt_client_handle = NULL;
     465              :   }
     466           10 :   g_free (self->mqtt_client_id);
     467           10 :   self->mqtt_client_id = NULL;
     468           10 :   g_free (self->mqtt_msg_buf);
     469           10 :   self->mqtt_msg_buf = NULL;
     470           10 :   g_free (self->mqtt_topic);
     471           10 :   self->mqtt_topic = NULL;
     472           10 :   gst_caps_replace (&self->in_caps, NULL);
     473           10 :   g_free (self->mqtt_ntp_srvs);
     474           10 :   self->mqtt_ntp_srvs = NULL;
     475           10 :   self->mqtt_ntp_num_srvs = 0;
     476           10 :   g_strfreev (self->mqtt_ntp_hnames);
     477           10 :   self->mqtt_ntp_hnames = NULL;
     478           10 :   g_free (self->mqtt_ntp_ports);
     479           10 :   self->mqtt_ntp_ports = NULL;
     480              : 
     481           10 :   if (self->err)
     482            0 :     g_error_free (self->err);
     483           10 :   g_mutex_clear (&self->mqtt_sink_mutex);
                      :   g_cond_clear (&self->mqtt_sink_gcond);        /* pairs with g_cond_init () in gst_mqtt_sink_init */
     484           10 :   G_OBJECT_CLASS (parent_class)->finalize (object);
     485           10 : }
     486              : 
     487              : /**
     488              :  * @brief Handle mqttsink's state change
     489              :  */
     490              : static GstStateChangeReturn
     491           52 : gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition)
     492              : {
     493           52 :   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
     494           52 :   GstMqttSink *self = GST_MQTT_SINK (element);
     495              :   GstClock *elem_clock;
     496              :   GstClockTime base_time;
     497              :   GstClockTime cur_time;
     498              :   GstClockTimeDiff diff;
     499              : 
     500           52 :   switch (transition) {
     501           10 :     case GST_STATE_CHANGE_NULL_TO_READY:
     502           10 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
     503           10 :       if (self->err) {
     504            0 :         g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
     505            0 :             self->err->message);
     506            0 :         return GST_STATE_CHANGE_FAILURE;
     507              :       }
     508           10 :       break;
     509            8 :     case GST_STATE_CHANGE_READY_TO_PAUSED:
     510            8 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
     511            8 :       break;
     512            8 :     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
     513            8 :       if (self->mqtt_ntp_sync)
     514            0 :         self->get_epoch_func = ntputil_get_epoch;
     515            8 :       self->base_time_epoch = GST_CLOCK_TIME_NONE;
     516            8 :       elem_clock = gst_element_get_clock (element);
     517            8 :       if (!elem_clock)
     518            0 :         break;
     519            8 :       base_time = gst_element_get_base_time (element);
     520            8 :       cur_time = gst_clock_get_time (elem_clock);
     521            8 :       gst_object_unref (elem_clock);
     522            8 :       diff = GST_CLOCK_DIFF (base_time, cur_time);
     523            8 :       self->base_time_epoch =
     524           16 :           self->get_epoch_func (self->mqtt_ntp_num_srvs, self->mqtt_ntp_hnames,
     525            8 :           self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER - diff;
     526            8 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
     527            8 :       break;
     528           26 :     default:
     529           26 :       break;
     530              :   }
     531              : 
     532           52 :   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
     533              : 
     534           52 :   switch (transition) {
     535            8 :     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
     536            8 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
     537            8 :       break;
     538            8 :     case GST_STATE_CHANGE_PAUSED_TO_READY:
     539            8 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
     540            8 :       break;
     541            8 :     case GST_STATE_CHANGE_READY_TO_NULL:
     542            8 :       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
     543              :     default:
     544           36 :       break;
     545              :   }
     546              : 
     547           52 :   return ret;
     548              : }
     549              : 
     550              : /**
     551              :  * @brief Start mqttsink, called when state changed null to ready
     552              :  */
     553              : static gboolean
     554           10 : gst_mqtt_sink_start (GstBaseSink * basesink)
     555              : {
     556           10 :   GstMqttSink *self = GST_MQTT_SINK (basesink);
     557           10 :   gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
     558              :       self->mqtt_host_port);
     559              :   int ret;
     560              :   gint64 end_time;
     561              : 
     562           10 :   if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
     563           10 :     g_free (self->mqtt_client_id);
     564           10 :     self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
     565           10 :         g_get_host_name (), getpid (), sink_client_id++);
     566              :   }
     567              : 
     568           10 :   if (!g_strcmp0 (DEFAULT_MQTT_PUB_TOPIC, self->mqtt_topic)) {
     569            8 :     self->mqtt_topic = g_strdup_printf (DEFAULT_MQTT_PUB_TOPIC_FORMAT,
     570              :         self->mqtt_client_id);
     571              :   }
     572              : 
     573              :   /**
     574              :    * @todo Support other persistence mechanisms
     575              :    *    MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
     576              :    *    MQTTCLIENT_PERSISTENCE_DEFAULT: The default file system-based
     577              :    *                                    persistence mechanism
     578              :    *    MQTTCLIENT_PERSISTENCE_USER: An application-specific persistence
     579              :    *                                 mechanism
     580              :    */
     581           20 :   ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
     582           10 :       self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
     583           10 :   g_free (haddr);
     584           10 :   if (ret != MQTTASYNC_SUCCESS)
     585            0 :     return FALSE;
     586              : 
     587           10 :   MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
     588              :       cb_mqtt_on_connection_lost, cb_mqtt_on_message_arrived,
     589              :       cb_mqtt_on_delivery_complete);
     590              : 
     591           10 :   ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
     592           10 :   if (ret != MQTTASYNC_SUCCESS) {
     593            0 :     goto error;
     594              :   }
     595              : 
     596              :   /* Waiting for the connection */
     597           10 :   end_time = g_get_monotonic_time () +
     598              :       DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
     599           10 :   g_mutex_lock (&self->mqtt_sink_mutex);
     600           12 :   while (!self->is_connected) {
     601            4 :     if (!g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
     602              :             end_time)) {
     603            2 :       g_mutex_unlock (&self->mqtt_sink_mutex);
     604            2 :       g_critical ("Failed to connect to MQTT broker from mqttsink."
     605              :           "Please check broker is running status or broker host address.");
     606            2 :       goto error;
     607              :     }
     608              :   }
     609            8 :   g_mutex_unlock (&self->mqtt_sink_mutex);
     610              : 
     611            8 :   return TRUE;
     612              : 
     613            2 : error:
     614            2 :   MQTTAsync_destroy (&self->mqtt_client_handle);
     615            2 :   self->mqtt_client_handle = NULL;
     616            2 :   return FALSE;
     617              : }
     618              : 
     619              : /**
     620              :  * @brief Stop mqttsink, called when state changed ready to null
     621              :  */
     622              : static gboolean
     623            8 : gst_mqtt_sink_stop (GstBaseSink * basesink)
     624              : {
     625            8 :   GstMqttSink *self = GST_MQTT_SINK (basesink);
     626            8 :   MQTTAsync_disconnectOptions disconn_opts =
     627              :       MQTTAsync_disconnectOptions_initializer;
     628              : 
     629            8 :   disconn_opts.timeout = DEFAULT_MQTT_DISCONNECT_TIMEOUT;
     630            8 :   disconn_opts.onSuccess = cb_mqtt_on_disconnect;
     631            8 :   disconn_opts.onFailure = cb_mqtt_on_disconnect_failure;
     632            8 :   disconn_opts.context = self;
     633              : 
     634            8 :   g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_STOPPED);
     635            8 :   while (MQTTAsync_isConnected (self->mqtt_client_handle)) {
     636            8 :     gint64 end_time = g_get_monotonic_time () + DEFAULT_MQTT_DISCONNECT_TIMEOUT;
     637              :     mqtt_sink_state_t cur_state;
     638              : 
     639            8 :     MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts);
     640            8 :     g_mutex_lock (&self->mqtt_sink_mutex);
     641            8 :     self->is_connected = FALSE;
     642            8 :     g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
     643              :         end_time);
     644            8 :     g_mutex_unlock (&self->mqtt_sink_mutex);
     645            8 :     cur_state = g_atomic_int_get (&self->mqtt_sink_state);
     646              : 
     647            8 :     if ((cur_state == MQTT_DISCONNECTED) ||
     648            0 :         (cur_state == MQTT_DISCONNECT_FAILED) ||
     649            0 :         (cur_state == SINK_RENDER_EOS) || (cur_state == SINK_RENDER_ERROR))
     650              :       break;
     651              :   }
     652            8 :   MQTTAsync_destroy (&self->mqtt_client_handle);
     653            8 :   self->mqtt_client_handle = NULL;
     654            8 :   return TRUE;
     655              : }
     656              : 
     657              : /**
     658              :  * @brief Perform queries on the element
     659              :  */
     660              : static gboolean
     661           18 : gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query)
     662              : {
     663           18 :   gboolean ret = FALSE;
     664              : 
     665           18 :   switch (GST_QUERY_TYPE (query)) {
     666            0 :     case GST_QUERY_SEEKING:{
     667              :       GstFormat fmt;
     668              : 
     669              :       /* GST_QUERY_SEEKING is not supported */
     670            0 :       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
     671            0 :       gst_query_set_seeking (query, fmt, FALSE, 0, -1);
     672            0 :       ret = TRUE;
     673            0 :       break;
     674              :     }
     675           18 :     default:{
     676           18 :       ret = GST_BASE_SINK_CLASS (parent_class)->query (basesink, query);
     677           18 :       break;
     678              :     }
     679              :   }
     680              : 
     681           18 :   return ret;
     682              : }
     683              : 
     684              : /**
     685              :  * @brief A utility function to set the timestamp information onto the given buffer
     686              :  */
     687              : static void
     688           24 : _put_timestamp_to_msg_buf_hdr (GstMqttSink * self, GstBuffer * gst_buf,
     689              :     GstMQTTMessageHdr * hdr)
     690              : {
     691           24 :   hdr->base_time_epoch = self->base_time_epoch;
     692           48 :   hdr->sent_time_epoch = self->get_epoch_func (self->mqtt_ntp_num_srvs,
     693           24 :       self->mqtt_ntp_hnames, self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER;
     694              : 
     695           24 :   hdr->duration = GST_BUFFER_DURATION_IS_VALID (gst_buf) ?
     696           24 :       GST_BUFFER_DURATION (gst_buf) : GST_CLOCK_TIME_NONE;
     697              : 
     698           24 :   hdr->dts = GST_BUFFER_DTS_IS_VALID (gst_buf) ?
     699           24 :       GST_BUFFER_DTS (gst_buf) : GST_CLOCK_TIME_NONE;
     700              : 
     701           24 :   hdr->pts = GST_BUFFER_PTS_IS_VALID (gst_buf) ?
     702           24 :       GST_BUFFER_PTS (gst_buf) : GST_CLOCK_TIME_NONE;
     703              : 
     704           24 :   if (self->debug) {
     705           24 :     GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
     706              :     GstClock *clock;
     707              : 
     708           24 :     clock = gst_element_get_clock (GST_ELEMENT (self));
     709              : 
     710           24 :     GST_DEBUG_OBJECT (self,
     711              :         "%s now %" GST_TIME_FORMAT " ts %" GST_TIME_FORMAT " sent %"
     712              :         GST_TIME_FORMAT, self->mqtt_topic,
     713              :         GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
     714              :         GST_TIME_ARGS (hdr->pts),
     715              :         GST_TIME_ARGS (hdr->sent_time_epoch - hdr->base_time_epoch));
     716              : 
     717           24 :     gst_object_unref (clock);
     718              :   }
     719           24 : }
     720              : 
     721              : /**
     722              :  * @brief A utility function to set the message header
     723              :  */
     724              : static gboolean
     725           24 : _mqtt_set_msg_buf_hdr (GstBuffer * gst_buf, GstMQTTMessageHdr * hdr)
     726              : {
     727           24 :   gboolean ret = TRUE;
     728              :   guint i;
     729              : 
     730           24 :   hdr->num_mems = gst_buffer_n_memory (gst_buf);
     731           47 :   for (i = 0; i < hdr->num_mems; ++i) {
     732              :     GstMemory *each_mem;
     733              : 
     734           23 :     each_mem = gst_buffer_peek_memory (gst_buf, i);
     735           23 :     if (!each_mem) {
     736            0 :       memset (hdr, 0x0, sizeof (*hdr));
     737            0 :       ret = FALSE;
     738            0 :       break;
     739              :     }
     740              : 
     741           23 :     hdr->size_mems[i] = each_mem->size;
     742              :   }
     743              : 
     744           24 :   return ret;
     745              : }
     746              : 
     747              : /**
     748              :  * @brief The callback to process each buffer receiving on the sink pad
     749              :  */
     750              : static GstFlowReturn
     751           25 : gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
     752              : {
     753           25 :   const gsize in_buf_size = gst_buffer_get_size (in_buf);
     754              :   static gboolean is_static_sized_buf = FALSE;
     755           25 :   GstMqttSink *self = GST_MQTT_SINK (basesink);
     756           25 :   GstFlowReturn ret = GST_FLOW_ERROR;
     757              :   mqtt_sink_state_t cur_state;
     758              :   GstMemory *in_buf_mem;
     759              :   GstMapInfo in_buf_map;
     760              :   gint mqtt_rc;
     761              :   guint8 *msg_pub;
     762              : 
     763           25 :   while ((cur_state =
     764           25 :           g_atomic_int_get (&self->mqtt_sink_state)) != MQTT_CONNECTED) {
     765            0 :     gint64 end_time = g_get_monotonic_time ();
     766              :     mqtt_sink_state_t _state;
     767              : 
     768            0 :     end_time += (self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND);
     769            0 :     g_mutex_lock (&self->mqtt_sink_mutex);
     770            0 :     g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
     771              :         end_time);
     772            0 :     g_mutex_unlock (&self->mqtt_sink_mutex);
     773              : 
     774            0 :     _state = g_atomic_int_get (&self->mqtt_sink_state);
     775            0 :     switch (_state) {
     776            0 :       case MQTT_CONNECT_FAILURE:
     777              :       case MQTT_DISCONNECTED:
     778              :       case MQTT_CONNECTION_LOST:
     779              :       case SINK_RENDER_ERROR:
     780            0 :         ret = GST_FLOW_ERROR;
     781            0 :         break;
     782            0 :       case SINK_RENDER_EOS:
     783            0 :         ret = GST_FLOW_EOS;
     784            0 :         break;
     785            0 :       default:
     786            0 :         continue;
     787              :     }
     788            0 :     goto ret_with;
     789              :   }
     790              : 
     791           25 :   if (self->num_buffers == 0) {
     792            1 :     ret = GST_FLOW_EOS;
     793            1 :     goto ret_with;
     794              :   }
     795              : 
     796           24 :   if (self->num_buffers != -1) {
     797           20 :     self->num_buffers -= 1;
     798              :   }
     799              : 
     800           24 :   if ((!is_static_sized_buf) && (self->mqtt_msg_buf) &&
     801           18 :       (self->mqtt_msg_buf_size != 0) &&
     802           18 :       (self->mqtt_msg_buf_size < in_buf_size + GST_MQTT_LEN_MSG_HDR)) {
     803            0 :     g_free (self->mqtt_msg_buf);
     804            0 :     self->mqtt_msg_buf = NULL;
     805            0 :     self->mqtt_msg_buf_size = 0;
     806              :   }
     807              : 
     808              :   /** Allocate a message buffer */
     809           24 :   if ((!self->mqtt_msg_buf) && (self->mqtt_msg_buf_size == 0)) {
     810            6 :     if (self->max_msg_buf_size == 0) {
     811            6 :       self->mqtt_msg_buf_size = in_buf_size + GST_MQTT_LEN_MSG_HDR;
     812              :     } else {
     813            0 :       if (self->max_msg_buf_size < in_buf_size) {
     814            0 :         g_printerr ("%s: The given size for a message buffer is too small: "
     815              :             "given (%" G_GSIZE_FORMAT " bytes) vs. incoming (%" G_GSIZE_FORMAT
     816              :             " bytes)\n", TAG_ERR_MQTTSINK, self->max_msg_buf_size, in_buf_size);
     817            0 :         ret = GST_FLOW_ERROR;
     818            0 :         goto ret_with;
     819              :       }
     820            0 :       self->mqtt_msg_buf_size = self->max_msg_buf_size + GST_MQTT_LEN_MSG_HDR;
     821            0 :       is_static_sized_buf = TRUE;
     822              :     }
     823              : 
     824            6 :     self->mqtt_msg_buf = g_try_malloc0 (self->mqtt_msg_buf_size);
     825              :   }
     826              : 
     827           24 :   if (!_mqtt_set_msg_buf_hdr (in_buf, &self->mqtt_msg_hdr)) {
     828            0 :     ret = GST_FLOW_ERROR;
     829            0 :     goto ret_with;
     830              :   }
     831              : 
     832           24 :   msg_pub = self->mqtt_msg_buf;
     833           24 :   if (!msg_pub) {
     834            0 :     self->mqtt_msg_buf_size = 0;
     835            0 :     ret = GST_FLOW_ERROR;
     836            0 :     goto ret_with;
     837              :   }
     838           24 :   memcpy (msg_pub, &self->mqtt_msg_hdr, sizeof (self->mqtt_msg_hdr));
     839           24 :   _put_timestamp_to_msg_buf_hdr (self, in_buf, (GstMQTTMessageHdr *) msg_pub);
     840              : 
     841           24 :   in_buf_mem = gst_buffer_get_all_memory (in_buf);
     842           24 :   if (!in_buf_mem) {
     843            1 :     ret = GST_FLOW_ERROR;
     844            1 :     goto ret_with;
     845              :   }
     846              : 
     847           23 :   if (!gst_memory_map (in_buf_mem, &in_buf_map, GST_MAP_READ)) {
     848            0 :     ret = GST_FLOW_ERROR;
     849            0 :     goto ret_unref_in_buf_mem;
     850              :   }
     851              : 
     852           23 :   ret = GST_FLOW_OK;
     853              : 
     854           23 :   memcpy (&msg_pub[sizeof (self->mqtt_msg_hdr)], in_buf_map.data,
     855              :       in_buf_map.size);
     856           46 :   mqtt_rc = MQTTAsync_send (self->mqtt_client_handle, self->mqtt_topic,
     857           23 :       GST_MQTT_LEN_MSG_HDR + in_buf_map.size, self->mqtt_msg_buf,
     858              :       self->mqtt_qos, 1, &self->mqtt_respn_opts);
     859           23 :   if (mqtt_rc != MQTTASYNC_SUCCESS) {
     860            1 :     ret = GST_FLOW_ERROR;
     861              :   }
     862              : 
     863           23 :   gst_memory_unmap (in_buf_mem, &in_buf_map);
     864              : 
     865           23 : ret_unref_in_buf_mem:
     866           23 :   gst_memory_unref (in_buf_mem);
     867              : 
     868           25 : ret_with:
     869           25 :   return ret;
     870              : }
     871              : 
     872              : /**
     873              :  * @brief The callback to process GstBufferList (instead of a single buffer)
     874              :  *        on the sink pad
     875              :  */
     876              : static GstFlowReturn
     877            0 : gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list)
     878              : {
     879            0 :   guint num_buffers = gst_buffer_list_length (list);
     880              :   GstFlowReturn ret;
     881              :   GstBuffer *buffer;
     882              :   guint i;
     883              : 
     884            0 :   for (i = 0; i < num_buffers; ++i) {
     885            0 :     buffer = gst_buffer_list_get (list, i);
     886            0 :     ret = gst_mqtt_sink_render (basesink, buffer);
     887            0 :     if (ret != GST_FLOW_OK)
     888            0 :       break;
     889              :   }
     890              : 
     891            0 :   return ret;
     892              : }
     893              : 
     894              : /**
     895              :  * @brief Handle events arriving on the sink pad
     896              :  */
     897              : static gboolean
     898           12 : gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event)
     899              : {
     900           12 :   GstMqttSink *self = GST_MQTT_SINK (basesink);
     901           12 :   GstEventType type = GST_EVENT_TYPE (event);
     902           12 :   gboolean ret = FALSE;
     903              : 
     904           12 :   switch (type) {
     905            0 :     case GST_EVENT_EOS:
     906            0 :       g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
     907            0 :       g_mutex_lock (&self->mqtt_sink_mutex);
     908            0 :       g_cond_broadcast (&self->mqtt_sink_gcond);
     909            0 :       g_mutex_unlock (&self->mqtt_sink_mutex);
     910            0 :       break;
     911           12 :     default:
     912           12 :       break;
     913              :   }
     914              : 
     915           12 :   ret = GST_BASE_SINK_CLASS (parent_class)->event (basesink, event);
     916              : 
     917           12 :   return ret;
     918              : }
     919              : 
     920              : /**
     921              :  * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
     922              :  */
     923              : static gboolean
     924            3 : gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps)
     925              : {
     926            3 :   GstMqttSink *self = GST_MQTT_SINK (basesink);
     927              :   gboolean ret;
     928              : 
     929            3 :   ret = gst_caps_replace (&self->in_caps, caps);
     930              : 
     931            3 :   if (ret && gst_caps_is_fixed (self->in_caps)) {
     932            3 :     gchar *caps_str = gst_caps_to_string (caps);
     933              :     gsize len;
     934              : 
     935            3 :     if (caps_str == NULL) {
     936            0 :       g_critical ("Fail to convert caps to string representation");
     937            0 :       return FALSE;
     938              :     }
     939              : 
     940            3 :     len = g_strlcpy (self->mqtt_msg_hdr.gst_caps_str, caps_str,
     941              :         GST_MQTT_MAX_LEN_GST_CAPS_STR);
     942              : 
     943            3 :     if (len >= GST_MQTT_MAX_LEN_GST_CAPS_STR) {
     944            0 :       g_critical ("Fail to copy caps_str.");
     945            0 :       ret = FALSE;
     946              :     }
     947              : 
     948            3 :     g_free (caps_str);
     949              :   }
     950              : 
     951            3 :   return ret;
     952              : }
     953              : 
     954              : /**
     955              :  * @brief Getter for the 'debug' property.
     956              :  */
     957              : static gboolean
     958            2 : gst_mqtt_sink_get_debug (GstMqttSink * self)
     959              : {
     960            2 :   return self->debug;
     961              : }
     962              : 
     963              : /**
     964              :  * @brief Setter for the 'debug' property.
     965              :  */
     966              : static void
     967            7 : gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag)
     968              : {
     969            7 :   self->debug = flag;
     970            7 : }
     971              : 
     972              : /**
     973              :  * @brief Getter for the 'client-id' property.
     974              :  */
     975              : static gchar *
     976            1 : gst_mqtt_sink_get_client_id (GstMqttSink * self)
     977              : {
     978            1 :   return self->mqtt_client_id;
     979              : }
     980              : 
     981              : /**
     982              :  * @brief Setter for the 'client-id' property.
     983              :  */
     984              : static void
     985            1 : gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id)
     986              : {
     987            1 :   g_free (self->mqtt_client_id);
     988            1 :   self->mqtt_client_id = g_strdup (id);
     989            1 : }
     990              : 
     991              : /**
     992              :  * @brief Getter for the 'host' property.
     993              :  */
     994              : static gchar *
     995            1 : gst_mqtt_sink_get_host_address (GstMqttSink * self)
     996              : {
     997            1 :   return self->mqtt_host_address;
     998              : }
     999              : 
    1000              : /**
    1001              :  * @brief Setter for the 'host' property
    1002              :  */
    1003              : static void
    1004            2 : gst_mqtt_sink_set_host_address (GstMqttSink * self, const gchar * addr)
    1005              : {
    1006              :   /**
    1007              :    * @todo Handle the case where the addr is changed at runtime
    1008              :    */
    1009            2 :   g_free (self->mqtt_host_address);
    1010            2 :   self->mqtt_host_address = g_strdup (addr);
    1011            2 : }
    1012              : 
    1013              : /**
    1014              :  * @brief Getter for the 'port' property.
    1015              :  */
    1016              : static gchar *
    1017            1 : gst_mqtt_sink_get_host_port (GstMqttSink * self)
    1018              : {
    1019            1 :   return self->mqtt_host_port;
    1020              : }
    1021              : 
    1022              : /**
    1023              :  * @brief Setter for the 'port' property
    1024              :  */
    1025              : static void
    1026            2 : gst_mqtt_sink_set_host_port (GstMqttSink * self, const gchar * port)
    1027              : {
    1028            2 :   g_free (self->mqtt_host_port);
    1029            2 :   self->mqtt_host_port = g_strdup (port);
    1030            2 : }
    1031              : 
    1032              : /**
    1033              :  * @brief Getter for the 'pub-topic' property
    1034              :  */
    1035              : static gchar *
    1036            1 : gst_mqtt_sink_get_pub_topic (GstMqttSink * self)
    1037              : {
    1038            1 :   return self->mqtt_topic;
    1039              : }
    1040              : 
    1041              : /**
    1042              :  * @brief Setter for the 'pub-topic' property
    1043              :  */
    1044              : static void
    1045            3 : gst_mqtt_sink_set_pub_topic (GstMqttSink * self, const gchar * topic)
    1046              : {
    1047            3 :   g_free (self->mqtt_topic);
    1048            3 :   self->mqtt_topic = g_strdup (topic);
    1049            3 : }
    1050              : 
    1051              : /**
    1052              :  * @brief Getter for the 'cleansession' property.
    1053              :  */
    1054              : static gboolean
    1055            1 : gst_mqtt_sink_get_opt_cleansession (GstMqttSink * self)
    1056              : {
    1057            1 :   return self->mqtt_conn_opts.cleansession;
    1058              : }
    1059              : 
    1060              : /**
    1061              :  * @brief Setter for the 'cleansession' property.
    1062              :  */
    1063              : static void
    1064            1 : gst_mqtt_sink_set_opt_cleansession (GstMqttSink * self, const gboolean val)
    1065              : {
    1066            1 :   self->mqtt_conn_opts.cleansession = val;
    1067            1 : }
    1068              : 
    1069              : /**
    1070              :  * @brief Getter for the 'pub-wait-timeout' property.
    1071              :  */
    1072              : static gulong
    1073            2 : gst_mqtt_sink_get_pub_wait_timeout (GstMqttSink * self)
    1074              : {
    1075            2 :   return self->mqtt_pub_wait_timeout;
    1076              : }
    1077              : 
    1078              : /**
    1079              :  * @brief Setter for the 'pub-wait-timeout' property.
    1080              :  */
    1081              : static void
    1082            1 : gst_mqtt_sink_set_pub_wait_timeout (GstMqttSink * self, const gulong to)
    1083              : {
    1084            1 :   self->mqtt_pub_wait_timeout = to;
    1085            1 : }
    1086              : 
    1087              : /**
    1088              :  * @brief Getter for the 'keep-alive-interval' property
    1089              :  */
    1090              : static gint
    1091            2 : gst_mqtt_sink_get_opt_keep_alive_interval (GstMqttSink * self)
    1092              : {
    1093            2 :   return self->mqtt_conn_opts.keepAliveInterval;
    1094              : }
    1095              : 
    1096              : /**
    1097              :  * @brief Setter for the 'keep-alive-interval' property
    1098              :  */
    1099              : static void
    1100            1 : gst_mqtt_sink_set_opt_keep_alive_interval (GstMqttSink * self, const gint num)
    1101              : {
    1102            1 :   self->mqtt_conn_opts.keepAliveInterval = num;
    1103            1 : }
    1104              : 
    1105              : /**
    1106              :  * @brief Getter for the 'max-buffer-size' property.
    1107              :  */
    1108              : static gsize
    1109            1 : gst_mqtt_sink_get_max_msg_buf_size (GstMqttSink * self)
    1110              : {
    1111            1 :   return self->max_msg_buf_size;
    1112              : }
    1113              : 
    1114              : /**
    1115              :  * @brief Setter for the 'max-buffer-size' property.
    1116              :  */
    1117              : static void
    1118            1 : gst_mqtt_sink_set_max_msg_buf_size (GstMqttSink * self, const gsize size)
    1119              : {
    1120            1 :   self->max_msg_buf_size = size;
    1121            1 : }
    1122              : 
    1123              : /**
    1124              :  * @brief Getter for the 'num-buffers' property.
    1125              :  */
    1126              : static gint
    1127            2 : gst_mqtt_sink_get_num_buffers (GstMqttSink * self)
    1128              : {
    1129              :   gint num_buffers;
    1130              : 
    1131            2 :   num_buffers = self->num_buffers;
    1132              : 
    1133            2 :   return num_buffers;
    1134              : }
    1135              : 
    1136              : /**
    1137              :  * @brief Setter for the 'num-buffers' property
    1138              :  */
    1139              : static void
    1140            3 : gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num)
    1141              : {
    1142            3 :   self->num_buffers = num;
    1143            3 : }
    1144              : 
    1145              : /**
    1146              :  * @brief Getter for the 'mqtt-qos' property.
    1147              :  */
    1148              : static gint
    1149            2 : gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self)
    1150              : {
    1151            2 :   return self->mqtt_qos;
    1152              : }
    1153              : 
    1154              : /**
    1155              :  * @brief Setter for the 'mqtt-qos' property
    1156              :  */
    1157              : static void
    1158            1 : gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos)
    1159              : {
    1160            1 :   self->mqtt_qos = qos;
    1161            1 : }
    1162              : 
    1163              : /**
    1164              :  * @brief Getter for the 'ntp-sync' property.
    1165              :  */
    1166              : static gboolean
    1167            1 : gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self)
    1168              : {
    1169            1 :   return self->mqtt_ntp_sync;
    1170              : }
    1171              : 
    1172              : /**
    1173              :  * @brief Setter for the 'ntp-sync' property
    1174              :  */
    1175              : static void
    1176            2 : gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self, const gboolean flag)
    1177              : {
    1178            2 :   self->mqtt_ntp_sync = flag;
    1179            2 : }
    1180              : 
    1181              : /**
    1182              :  * @brief Getter for the 'ntp-srvs' property.
    1183              :  */
    1184              : static gchar *
    1185            1 : gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self)
    1186              : {
    1187            1 :   return self->mqtt_ntp_srvs;
    1188              : }
    1189              : 
    1190              : /**
    1191              :  * @brief Setter for the 'ntp-srvs' property
    1192              :  */
    1193              : static void
    1194            1 : gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self, const gchar * pairs)
    1195              : {
    1196            1 :   gchar **pair_arrs = NULL;
    1197            1 :   guint hnum = 0;
    1198              :   gchar *pair;
    1199              :   guint i, j;
    1200              : 
    1201            1 :   if (g_strcmp0 (self->mqtt_ntp_srvs, pairs) == 0)
    1202            0 :     return;
    1203              : 
    1204            1 :   g_free (self->mqtt_ntp_srvs);
    1205            1 :   self->mqtt_ntp_srvs = g_strdup (pairs);
    1206              : 
    1207            1 :   pair_arrs = g_strsplit (pairs, ",", -1);
    1208            1 :   if (pair_arrs == NULL)
    1209            0 :     return;
    1210              : 
    1211            1 :   hnum = g_strv_length (pair_arrs);
    1212            1 :   if (hnum == 0)
    1213            0 :     goto err_free_pair_arrs;
    1214              : 
    1215            1 :   g_free (self->mqtt_ntp_hnames);
    1216            1 :   self->mqtt_ntp_hnames = g_try_malloc0 ((hnum + 1) * sizeof (gchar *));
    1217            1 :   if (!self->mqtt_ntp_hnames)
    1218            0 :     goto err_free_pair_arrs;
    1219              : 
    1220            1 :   g_free (self->mqtt_ntp_ports);
    1221            1 :   self->mqtt_ntp_ports = g_try_malloc0 (hnum * sizeof (guint16));
    1222            1 :   if (!self->mqtt_ntp_ports)
    1223            0 :     goto err_free_mqtt_ntp_hnames;
    1224              : 
    1225            1 :   self->mqtt_ntp_num_srvs = hnum;
    1226            2 :   for (i = 0, j = 0; i < hnum; i++) {
    1227              :     gchar **hname_port;
    1228              :     gchar *hname;
    1229              :     gchar *eport;
    1230              :     gulong port_ul;
    1231              : 
    1232            1 :     pair = pair_arrs[i];
    1233            1 :     hname_port = g_strsplit (pair, ":", 2);
    1234            1 :     hname = hname_port[0];
    1235            1 :     port_ul = strtoul (hname_port[1], &eport, 10);
    1236            1 :     if ((port_ul == 0) || (port_ul > UINT16_MAX)) {
    1237            0 :       self->mqtt_ntp_num_srvs--;
    1238              :     } else {
    1239            1 :       self->mqtt_ntp_hnames[j] = g_strdup (hname);
    1240            1 :       self->mqtt_ntp_ports[j] = (uint16_t) port_ul;
    1241            1 :       ++j;
    1242              :     }
    1243              : 
    1244            1 :     g_strfreev (hname_port);
    1245              :   }
    1246              : 
    1247            1 :   g_strfreev (pair_arrs);
    1248            1 :   return;
    1249              : 
    1250            0 : err_free_mqtt_ntp_hnames:
    1251            0 :   g_strfreev (self->mqtt_ntp_hnames);
    1252            0 :   self->mqtt_ntp_hnames = NULL;
    1253              : 
    1254            0 : err_free_pair_arrs:
    1255            0 :   g_strfreev (pair_arrs);
    1256              : 
    1257            0 :   return;
    1258              : }
    1259              : 
    1260              : /** Callback function definitions */
    1261              : /**
    1262              :  * @brief A callback function corresponding to MQTTAsync_connectOptions's
    1263              :  *        onSuccess. This callback is invoked when the connection between
    1264              :  *        this element and the broker is properly established.
    1265              :  */
    1266              : static void
    1267            8 : cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
    1268              : {
    1269            8 :   GstMqttSink *self = (GstMqttSink *) context;
    1270              :   UNUSED (response);
    1271              : 
    1272            8 :   g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTED);
    1273            8 :   g_mutex_lock (&self->mqtt_sink_mutex);
    1274            8 :   self->is_connected = TRUE;
    1275            8 :   g_cond_broadcast (&self->mqtt_sink_gcond);
    1276            8 :   g_mutex_unlock (&self->mqtt_sink_mutex);
    1277            8 : }
    1278              : 
    1279              : /**
    1280              :  * @brief A callback function corresponding to MQTTAsync_connectOptions's
    1281              :  *        onFailure. This callback is invoked when the attempt to connect to
    1282              :  *        the broker fails.
    1283              :  */
    1284              : static void
    1285            2 : cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
    1286              : {
    1287            2 :   GstMqttSink *self = (GstMqttSink *) context;
    1288              :   UNUSED (response);
    1289              : 
    1290            2 :   g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECT_FAILURE);
    1291            2 :   g_mutex_lock (&self->mqtt_sink_mutex);
    1292            2 :   self->is_connected = FALSE;
    1293            2 :   g_cond_broadcast (&self->mqtt_sink_gcond);
    1294            2 :   g_mutex_unlock (&self->mqtt_sink_mutex);
    1295            2 : }
    1296              : 
    1297              : /**
    1298              :  * @brief A callback function corresponding to MQTTAsync_disconnectOptions's
    1299              :  *        onSuccess. Regardless of the MQTTAsync_disconnect function's result,
    1300              :  *        the pipeline should be stopped after this callback.
    1301              :  */
    1302              : static void
    1303            8 : cb_mqtt_on_disconnect (void *context, MQTTAsync_successData * response)
    1304              : {
    1305            8 :   GstMqttSink *self = (GstMqttSink *) context;
    1306              :   UNUSED (response);
    1307              : 
    1308            8 :   g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECTED);
    1309            8 :   g_mutex_lock (&self->mqtt_sink_mutex);
    1310            8 :   g_cond_broadcast (&self->mqtt_sink_gcond);
    1311            8 :   g_mutex_unlock (&self->mqtt_sink_mutex);
    1312            8 : }
    1313              : 
    1314              : /**
    1315              :  * @brief A callback function corresponding to MQTTAsync_disconnectOptions's
    1316              :  *        onFailure. Regardless of the MQTTAsync_disconnect function's result,
    1317              :  *        the pipeline should be stopped after this callback.
    1318              :  */
    1319              : static void
    1320            0 : cb_mqtt_on_disconnect_failure (void *context, MQTTAsync_failureData * response)
    1321              : {
    1322            0 :   GstMqttSink *self = (GstMqttSink *) context;
    1323              :   UNUSED (response);
    1324              : 
    1325            0 :   g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECT_FAILED);
    1326            0 :   g_mutex_lock (&self->mqtt_sink_mutex);
    1327            0 :   g_cond_broadcast (&self->mqtt_sink_gcond);
    1328            0 :   g_mutex_unlock (&self->mqtt_sink_mutex);
    1329            0 : }
    1330              : 
    1331              : /**
    1332              :  * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
    1333              :  *        This callback is activated when `mqtt-qos` is higher than 0.
    1334              :  */
    1335              : static void
    1336            0 : cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token)
    1337              : {
    1338            0 :   GstMqttSink *self = (GstMqttSink *) context;
    1339              : 
    1340            0 :   GST_DEBUG_OBJECT (self,
    1341              :       "%s: the message with token(%d) has been delivered.", self->mqtt_topic,
    1342              :       token);
    1343            0 : }
    1344              : 
    1345              : /**
    1346              :  * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
    1347              :  *        When the connection between this element and the broker is broken,
    1348              :  *        this callback will be invoked.
    1349              :  */
    1350              : static void
    1351            0 : cb_mqtt_on_connection_lost (void *context, char *cause)
    1352              : {
    1353            0 :   GstMqttSink *self = (GstMqttSink *) context;
    1354              :   UNUSED (cause);
    1355              : 
    1356            0 :   g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTION_LOST);
    1357            0 :   g_mutex_lock (&self->mqtt_sink_mutex);
    1358            0 :   self->is_connected = FALSE;
    1359            0 :   g_cond_broadcast (&self->mqtt_sink_gcond);
    1360            0 :   g_mutex_unlock (&self->mqtt_sink_mutex);
    1361            0 : }
    1362              : 
    1363              : /**
    1364              :  * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
    1365              :  *        In the case of the publisher, this callback is not used.
    1366              :  */
    1367              : static int
    1368            0 : cb_mqtt_on_message_arrived (void *context, char *topicName, int topicLen,
    1369              :     MQTTAsync_message * message)
    1370              : {
    1371              :   UNUSED (context);
    1372              :   UNUSED (topicName);
    1373              :   UNUSED (topicLen);
    1374              :   UNUSED (message);
    1375              :   /* nothing to do */
    1376            0 :   return 1;
    1377              : }
    1378              : 
    1379              : /**
    1380              :  * @brief A callback function corresponding to MQTTAsync_responseOptions's
    1381              :  *        onSuccess.
    1382              :  */
    1383              : static void
    1384           22 : cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response)
    1385              : {
    1386           22 :   GstMqttSink *self = (GstMqttSink *) context;
    1387           22 :   mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
    1388              :   UNUSED (response);
    1389              : 
    1390           22 :   if (state == SINK_RENDER_STOPPED) {
    1391            0 :     g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
    1392              : 
    1393            0 :     g_mutex_lock (&self->mqtt_sink_mutex);
    1394            0 :     g_cond_broadcast (&self->mqtt_sink_gcond);
    1395            0 :     g_mutex_unlock (&self->mqtt_sink_mutex);
    1396              :   }
    1397           22 : }
    1398              : 
    1399              : /**
    1400              :  * @brief A callback function corresponding to MQTTAsync_responseOptions's
    1401              :  *        onFailure.
    1402              :  */
    1403              : static void
    1404            1 : cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response)
    1405              : {
    1406            1 :   GstMqttSink *self = (GstMqttSink *) context;
    1407            1 :   mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
    1408              :   UNUSED (response);
    1409              : 
    1410            1 :   if (state == SINK_RENDER_STOPPED) {
    1411            0 :     g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_ERROR);
    1412              : 
    1413            0 :     g_mutex_lock (&self->mqtt_sink_mutex);
    1414            0 :     g_cond_broadcast (&self->mqtt_sink_gcond);
    1415            0 :     g_mutex_unlock (&self->mqtt_sink_mutex);
    1416              :   }
    1417              : 
    1418            1 : }
        

Generated by: LCOV version 2.0-1