/* Default debug category for this file: GStreamer's GST_DEBUG/GST_LOG-style
 * macros log to GST_CAT_DEFAULT when no category is passed explicitly.
 * gst_edgesink_debug itself is declared outside this excerpt. */
20 #define GST_CAT_DEFAULT gst_edgesink_debug
25 static GstStaticPadTemplate
sinktemplate = GST_STATIC_PAD_TEMPLATE (
"sink",
/* Fallback MQTT endpoint: the loopback address and 1883, the IANA-registered
 * port for unencrypted MQTT.
 * NOTE(review): presumably the defaults of the "dest-host"/"dest-port"
 * properties; the lines that use them are not visible here -- confirm. */
49 #define DEFAULT_MQTT_HOST "127.0.0.1"
50 #define DEFAULT_MQTT_PORT 1883
/* Lets the rest of the file spell the parent-class pointer as plain
 * `parent_class` (used when chaining up to the parent's finalize).
 * NOTE(review): gst_edgesink_parent_class is presumably the variable emitted
 * by G_DEFINE_TYPE; that line is not visible here -- confirm. */
52 #define gst_edgesink_parent_class parent_class
56 guint prop_id,
const GValue * value, GParamSpec * pspec);
59 guint prop_id, GValue * value, GParamSpec * pspec);
78 const nns_edge_connect_type_e connect_type);
86 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
87 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
88 GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
94 g_object_class_install_property (gobject_class,
PROP_HOST,
95 g_param_spec_string (
"host",
"Host",
96 "A self host address to accept connection from edgesrc",
DEFAULT_HOST,
97 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98 g_object_class_install_property (gobject_class,
PROP_PORT,
99 g_param_spec_uint (
"port",
"Port",
100 "A self port address to accept connection from edgesrc. "
101 "If the port is set to 0 then the available port is allocated. ",
102 0, 65535,
DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
104 g_param_spec_enum (
"connect-type",
"Connect Type",
105 "The connections type between edgesink and edgesrc.",
107 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
109 g_param_spec_string (
"dest-host",
"Destination Host",
111 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
113 g_param_spec_uint (
"dest-port",
"Destination Port",
114 "The destination port of the broker", 0,
116 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
117 g_object_class_install_property (gobject_class,
PROP_TOPIC,
118 g_param_spec_string (
"topic",
"Topic",
119 "The main topic of the host and option if necessary. "
120 "(topic)/(optional topic for main topic).",
"",
121 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
123 g_param_spec_boolean (
"wait-connection",
"Wait connection to edgesrc",
124 "Wait until edgesink is connected to edgesrc. "
125 "In case of false(default), the buffers entering the edgesink are dropped.",
126 FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128 g_param_spec_uint64 (
"connection-timeout",
129 "Timeout for waiting a connection",
130 "The timeout (in milliseconds) for waiting a connection to receiver. "
131 "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
132 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134 g_param_spec_string (
"custom-lib",
"Custom connection lib path",
135 "User defined custom connection lib path.",
136 "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
138 gst_element_class_add_pad_template (gstelement_class,
141 gst_element_class_set_static_metadata (gstelement_class,
142 "EdgeSink",
"Sink/Edge",
143 "Publish incoming streams",
"Samsung Electronics Co., Ltd.");
166 self->wait_connection =
FALSE;
167 self->connection_timeout = 0;
168 self->custom_lib = NULL;
169 self->is_connected =
FALSE;
170 g_mutex_init (&self->lock);
171 g_cond_init (&self->cond);
179 const GValue * value, GParamSpec * pspec)
191 if (!g_value_get_string (value)) {
192 nns_logw (
"dest host property cannot be NULL");
196 self->dest_host = g_value_dup_string (value);
199 self->dest_port = g_value_get_uint (value);
205 if (!g_value_get_string (value)) {
206 nns_logw (
"topic property cannot be NULL. Query-hybrid is disabled.");
210 self->topic = g_value_dup_string (value);
213 self->wait_connection = g_value_get_boolean (value);
216 self->connection_timeout = g_value_get_uint64 (value);
219 g_free (self->custom_lib);
220 self->custom_lib = g_value_dup_string (value);
223 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
248 g_value_set_uint (value, self->dest_port);
257 g_value_set_boolean (value, self->wait_connection);
260 g_value_set_uint64 (value, self->connection_timeout);
266 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
283 self->dest_host = NULL;
288 g_free (self->custom_lib);
289 self->custom_lib = NULL;
290 g_mutex_clear (&self->lock);
291 g_cond_clear (&self->cond);
294 nns_edge_release_handle (self->edge_h);
298 G_OBJECT_CLASS (parent_class)->finalize (
object);
308 nns_edge_event_e event_type;
309 int ret = NNS_EDGE_ERROR_NONE;
312 ret = nns_edge_event_get_type (event_h, &event_type);
313 if (NNS_EDGE_ERROR_NONE != ret) {
314 nns_loge (
"Failed to get event type!");
318 switch (event_type) {
319 case NNS_EDGE_EVENT_CONNECTION_COMPLETED:
321 g_mutex_lock (&self->lock);
322 self->is_connected =
TRUE;
323 g_cond_broadcast (&self->cond);
324 g_mutex_unlock (&self->lock);
345 if (NNS_EDGE_CONNECT_TYPE_CUSTOM != self->connect_type) {
346 ret = nns_edge_create_handle (NULL, self->connect_type,
347 NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
349 if (!self->custom_lib) {
350 nns_loge (
"Failed to start edgesink. Custom library is not set.");
353 ret = nns_edge_custom_create_handle (NULL, self->custom_lib,
354 NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
357 if (NNS_EDGE_ERROR_NONE != ret) {
358 nns_loge (
"Failed to get nnstreamer edge handle.");
361 nns_edge_release_handle (self->edge_h);
369 nns_edge_set_info (self->edge_h,
"HOST", self->host);
370 if (self->port > 0) {
371 port = g_strdup_printf (
"%u", self->port);
372 nns_edge_set_info (self->edge_h,
"PORT", port);
376 nns_edge_set_info (self->edge_h,
"DEST_HOST", self->dest_host);
377 if (self->dest_port > 0) {
378 port = g_strdup_printf (
"%u", self->dest_port);
379 nns_edge_set_info (self->edge_h,
"DEST_PORT", port);
383 nns_edge_set_info (self->edge_h,
"TOPIC", self->topic);
387 if (0 != nns_edge_start (self->edge_h)) {
389 (
"Failed to start NNStreamer-edge. Please check server IP and port");
409 end_time = G_MAXINT64;
411 end_time = g_get_monotonic_time ()
415 g_mutex_lock (&sink->
lock);
417 if (!g_cond_wait_until (&sink->
cond, &sink->
lock, end_time)) {
418 nns_loge (
"Failed to wait connection.");
423 g_mutex_unlock (&sink->
lock);
437 ret = nns_edge_stop (self->edge_h);
438 if (NNS_EDGE_ERROR_NONE != ret) {
439 nns_loge (
"Failed to stop edge. error code(%d)", ret);
454 GstStructure *structure;
456 nns_edge_data_h data_h;
463 nns_loge (
"Failed to send buffer.");
464 return GST_FLOW_ERROR;
467 ret = nns_edge_data_create (&data_h);
468 if (ret != NNS_EDGE_ERROR_NONE) {
469 nns_loge (
"Failed to create data handle in edgesink");
470 return GST_FLOW_ERROR;
473 caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (basesink));
474 structure = gst_caps_get_structure (caps, 0);
476 gst_caps_unref (caps);
481 num_mems = gst_buffer_n_memory (buffer);
483 for (i = 0; i < num_mems; i++) {
487 mem[i] = gst_buffer_get_memory (buffer, i);
489 if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
490 nns_loge (
"Cannot map the %uth memory in gst-buffer.", i);
491 gst_memory_unref (mem[i]);
496 ret = nns_edge_data_add (data_h, map[i].
data, map[i].size, NULL);
497 if (ret != NNS_EDGE_ERROR_NONE) {
498 nns_loge (
"Failed to append %u-th memory into edge data.", i);
504 ret = nns_edge_send (self->edge_h, data_h);
505 if (ret != NNS_EDGE_ERROR_NONE)
506 nns_loge (
"Failed to send edge data, connection lost or internal error.");
510 nns_edge_data_destroy (data_h);
512 for (i = 0; i < num_mems; i++) {
513 gst_memory_unmap (mem[i], &map[i]);
514 gst_memory_unref (mem[i]);
527 gchar *caps_str, *prev_caps_str, *new_caps_str;
530 caps_str = gst_caps_to_string (caps);
532 nns_edge_get_info (sink->
edge_h,
"CAPS", &prev_caps_str);
533 if (!prev_caps_str) {
534 prev_caps_str = g_strdup (
"");
537 g_strdup_printf (
"%s@edge_sink_caps@%s", prev_caps_str, caps_str);
538 set_rst = nns_edge_set_info (sink->
edge_h,
"CAPS", new_caps_str);
544 return set_rst == NNS_EDGE_ERROR_NONE;
564 self->host = g_strdup (host);
588 static nns_edge_connect_type_e
591 return self->connect_type;
599 const nns_edge_connect_type_e connect_type)
601 self->connect_type = connect_type;