Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2022 Samsung Electronics Co., Ltd.
4 : *
5 : * @file edge_sink.c
6 : * @date 01 Aug 2022
7 : * @brief Publish incoming streams
8 : * @author Yechan Choi <yechan9.choi@samsung.com>
9 : * @see http://github.com/nnstreamer/nnstreamer
10 : * @bug No known bugs
11 : *
12 : */
13 : #ifdef HAVE_CONFIG_H
14 : #include <config.h>
15 : #endif
16 :
17 : #include "edge_sink.h"
18 :
19 : GST_DEBUG_CATEGORY_STATIC (gst_edgesink_debug);
20 : #define GST_CAT_DEFAULT gst_edgesink_debug
21 :
22 : /**
23 : * @brief the capabilities of the inputs.
24 : */
25 : static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
26 : GST_PAD_SINK,
27 : GST_PAD_ALWAYS,
28 : GST_STATIC_CAPS_ANY);
29 :
/**
 * @brief Property IDs of edgesink (names match the properties installed in class_init).
 */
enum
{
  PROP_0,

  PROP_HOST,                    /* "host" */
  PROP_PORT,                    /* "port" */
  PROP_DEST_HOST,               /* "dest-host" */
  PROP_DEST_PORT,               /* "dest-port" */
  PROP_CONNECT_TYPE,            /* "connect-type" */
  PROP_TOPIC,                   /* "topic" */
  PROP_WAIT_CONNECTION,         /* "wait-connection" */
  PROP_CONNECTION_TIMEOUT,      /* "connection-timeout" */
  PROP_CUSTOM_LIB,              /* "custom-lib" */

  PROP_LAST
};

/** @brief Default values advertised for the "dest-host" / "dest-port" properties. */
#define DEFAULT_MQTT_HOST "127.0.0.1"
#define DEFAULT_MQTT_PORT 1883

/** @brief Register the GstEdgeSink type, derived from GstBaseSink. */
#define gst_edgesink_parent_class parent_class
G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
54 :
55 : static void gst_edgesink_set_property (GObject * object,
56 : guint prop_id, const GValue * value, GParamSpec * pspec);
57 :
58 : static void gst_edgesink_get_property (GObject * object,
59 : guint prop_id, GValue * value, GParamSpec * pspec);
60 :
61 : static void gst_edgesink_finalize (GObject * object);
62 :
63 : static gboolean gst_edgesink_start (GstBaseSink * basesink);
64 : static gboolean gst_edgesink_stop (GstBaseSink * basesink);
65 : static GstFlowReturn gst_edgesink_render (GstBaseSink * basesink,
66 : GstBuffer * buffer);
67 : static gboolean gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps);
68 :
69 : static gchar *gst_edgesink_get_host (GstEdgeSink * self);
70 : static void gst_edgesink_set_host (GstEdgeSink * self, const gchar * host);
71 :
72 : static guint16 gst_edgesink_get_port (GstEdgeSink * self);
73 : static void gst_edgesink_set_port (GstEdgeSink * self, const guint16 port);
74 :
75 : static nns_edge_connect_type_e gst_edgesink_get_connect_type (GstEdgeSink *
76 : self);
77 : static void gst_edgesink_set_connect_type (GstEdgeSink * self,
78 : const nns_edge_connect_type_e connect_type);
79 :
80 : /**
81 : * @brief initialize the class
82 : */
83 : static void
84 22 : gst_edgesink_class_init (GstEdgeSinkClass * klass)
85 : {
86 22 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
87 22 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
88 22 : GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
89 :
90 22 : gobject_class->set_property = gst_edgesink_set_property;
91 22 : gobject_class->get_property = gst_edgesink_get_property;
92 22 : gobject_class->finalize = gst_edgesink_finalize;
93 :
94 22 : g_object_class_install_property (gobject_class, PROP_HOST,
95 : g_param_spec_string ("host", "Host",
96 : "A self host address to accept connection from edgesrc", DEFAULT_HOST,
97 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98 22 : g_object_class_install_property (gobject_class, PROP_PORT,
99 : g_param_spec_uint ("port", "Port",
100 : "A self port address to accept connection from edgesrc. "
101 : "If the port is set to 0 then the available port is allocated. ",
102 : 0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
103 22 : g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
104 : g_param_spec_enum ("connect-type", "Connect Type",
105 : "The connections type between edgesink and edgesrc.",
106 : GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
107 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
108 22 : g_object_class_install_property (gobject_class, PROP_DEST_HOST,
109 : g_param_spec_string ("dest-host", "Destination Host",
110 : "The destination hostname of the broker", DEFAULT_MQTT_HOST,
111 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
112 22 : g_object_class_install_property (gobject_class, PROP_DEST_PORT,
113 : g_param_spec_uint ("dest-port", "Destination Port",
114 : "The destination port of the broker", 0,
115 : 65535, DEFAULT_MQTT_PORT,
116 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
117 22 : g_object_class_install_property (gobject_class, PROP_TOPIC,
118 : g_param_spec_string ("topic", "Topic",
119 : "The main topic of the host and option if necessary. "
120 : "(topic)/(optional topic for main topic).", "",
121 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
122 22 : g_object_class_install_property (gobject_class, PROP_WAIT_CONNECTION,
123 : g_param_spec_boolean ("wait-connection", "Wait connection to edgesrc",
124 : "Wait until edgesink is connected to edgesrc. "
125 : "In case of false(default), the buffers entering the edgesink are dropped.",
126 : FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127 22 : g_object_class_install_property (gobject_class, PROP_CONNECTION_TIMEOUT,
128 : g_param_spec_uint64 ("connection-timeout",
129 : "Timeout for waiting a connection",
130 : "The timeout (in milliseconds) for waiting a connection to receiver. "
131 : "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
132 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133 22 : g_object_class_install_property (gobject_class, PROP_CUSTOM_LIB,
134 : g_param_spec_string ("custom-lib", "Custom connection lib path",
135 : "User defined custom connection lib path.",
136 : "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
137 :
138 22 : gst_element_class_add_pad_template (gstelement_class,
139 : gst_static_pad_template_get (&sinktemplate));
140 :
141 22 : gst_element_class_set_static_metadata (gstelement_class,
142 : "EdgeSink", "Sink/Edge",
143 : "Publish incoming streams", "Samsung Electronics Co., Ltd.");
144 :
145 22 : gstbasesink_class->start = gst_edgesink_start;
146 22 : gstbasesink_class->stop = gst_edgesink_stop;
147 22 : gstbasesink_class->render = gst_edgesink_render;
148 22 : gstbasesink_class->set_caps = gst_edgesink_set_caps;
149 :
150 22 : GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
151 : GST_EDGE_ELEM_NAME_SINK, 0, "Edge sink");
152 22 : }
153 :
154 : /**
155 : * @brief initialize the new element
156 : */
157 : static void
158 7 : gst_edgesink_init (GstEdgeSink * self)
159 : {
160 7 : self->host = g_strdup (DEFAULT_HOST);
161 7 : self->port = DEFAULT_PORT;
162 7 : self->dest_host = g_strdup (DEFAULT_HOST);
163 7 : self->dest_port = DEFAULT_PORT;
164 7 : self->topic = NULL;
165 7 : self->connect_type = DEFAULT_CONNECT_TYPE;
166 7 : self->wait_connection = FALSE;
167 7 : self->connection_timeout = 0;
168 7 : self->custom_lib = NULL;
169 7 : }
170 :
171 : /**
172 : * @brief set property
173 : */
174 : static void
175 18 : gst_edgesink_set_property (GObject * object, guint prop_id,
176 : const GValue * value, GParamSpec * pspec)
177 : {
178 18 : GstEdgeSink *self = GST_EDGESINK (object);
179 :
180 18 : switch (prop_id) {
181 2 : case PROP_HOST:
182 2 : gst_edgesink_set_host (self, g_value_get_string (value));
183 2 : break;
184 7 : case PROP_PORT:
185 7 : gst_edgesink_set_port (self, g_value_get_uint (value));
186 7 : break;
187 1 : case PROP_DEST_HOST:
188 1 : if (!g_value_get_string (value)) {
189 0 : nns_logw ("dest host property cannot be NULL");
190 0 : break;
191 : }
192 1 : g_free (self->dest_host);
193 1 : self->dest_host = g_value_dup_string (value);
194 1 : break;
195 1 : case PROP_DEST_PORT:
196 1 : self->dest_port = g_value_get_uint (value);
197 1 : break;
198 4 : case PROP_CONNECT_TYPE:
199 4 : gst_edgesink_set_connect_type (self, g_value_get_enum (value));
200 4 : break;
201 1 : case PROP_TOPIC:
202 1 : if (!g_value_get_string (value)) {
203 0 : nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
204 0 : break;
205 : }
206 1 : g_free (self->topic);
207 1 : self->topic = g_value_dup_string (value);
208 1 : break;
209 0 : case PROP_WAIT_CONNECTION:
210 0 : self->wait_connection = g_value_get_boolean (value);
211 0 : break;
212 0 : case PROP_CONNECTION_TIMEOUT:
213 0 : self->connection_timeout = g_value_get_uint64 (value);
214 0 : break;
215 2 : case PROP_CUSTOM_LIB:
216 2 : g_free (self->custom_lib);
217 2 : self->custom_lib = g_value_dup_string (value);
218 2 : break;
219 0 : default:
220 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
221 0 : break;
222 : }
223 18 : }
224 :
225 : /**
226 : * @brief get property
227 : */
228 : static void
229 16 : gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
230 : GParamSpec * pspec)
231 : {
232 16 : GstEdgeSink *self = GST_EDGESINK (object);
233 :
234 16 : switch (prop_id) {
235 2 : case PROP_HOST:
236 2 : g_value_set_string (value, gst_edgesink_get_host (self));
237 2 : break;
238 3 : case PROP_PORT:
239 3 : g_value_set_uint (value, gst_edgesink_get_port (self));
240 3 : break;
241 2 : case PROP_DEST_HOST:
242 2 : g_value_set_string (value, self->dest_host);
243 2 : break;
244 2 : case PROP_DEST_PORT:
245 2 : g_value_set_uint (value, self->dest_port);
246 2 : break;
247 2 : case PROP_CONNECT_TYPE:
248 2 : g_value_set_enum (value, gst_edgesink_get_connect_type (self));
249 2 : break;
250 2 : case PROP_TOPIC:
251 2 : g_value_set_string (value, self->topic);
252 2 : break;
253 1 : case PROP_WAIT_CONNECTION:
254 1 : g_value_set_boolean (value, self->wait_connection);
255 1 : break;
256 1 : case PROP_CONNECTION_TIMEOUT:
257 1 : g_value_set_uint64 (value, self->connection_timeout);
258 1 : break;
259 1 : case PROP_CUSTOM_LIB:
260 1 : g_value_set_string (value, self->custom_lib);
261 1 : break;
262 0 : default:
263 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
264 0 : break;
265 : }
266 16 : }
267 :
268 : /**
269 : * @brief finalize the object
270 : */
271 : static void
272 6 : gst_edgesink_finalize (GObject * object)
273 : {
274 6 : GstEdgeSink *self = GST_EDGESINK (object);
275 :
276 6 : g_free (self->host);
277 6 : self->host = NULL;
278 :
279 6 : g_free (self->dest_host);
280 6 : self->dest_host = NULL;
281 :
282 6 : g_free (self->topic);
283 6 : self->topic = NULL;
284 :
285 6 : g_free (self->custom_lib);
286 6 : self->custom_lib = NULL;
287 :
288 6 : if (self->edge_h) {
289 2 : nns_edge_release_handle (self->edge_h);
290 2 : self->edge_h = NULL;
291 : }
292 :
293 6 : G_OBJECT_CLASS (parent_class)->finalize (object);
294 6 : }
295 :
296 : /**
297 : * @brief start processing of edgesink
298 : */
299 : static gboolean
300 5 : gst_edgesink_start (GstBaseSink * basesink)
301 : {
302 5 : GstEdgeSink *self = GST_EDGESINK (basesink);
303 :
304 : int ret;
305 5 : char *port = NULL;
306 :
307 5 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM != self->connect_type) {
308 2 : ret = nns_edge_create_handle (NULL, self->connect_type,
309 : NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
310 : } else {
311 3 : if (!self->custom_lib) {
312 1 : nns_loge ("Failed to start edgesink. Custom library is not set.");
313 1 : return FALSE;
314 : }
315 2 : ret = nns_edge_custom_create_handle (NULL, self->custom_lib,
316 : NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
317 : }
318 :
319 4 : if (NNS_EDGE_ERROR_NONE != ret) {
320 1 : nns_loge ("Failed to get nnstreamer edge handle.");
321 :
322 1 : if (self->edge_h) {
323 1 : nns_edge_release_handle (self->edge_h);
324 1 : self->edge_h = NULL;
325 : }
326 :
327 1 : return FALSE;
328 : }
329 :
330 3 : if (self->host)
331 3 : nns_edge_set_info (self->edge_h, "HOST", self->host);
332 3 : if (self->port > 0) {
333 1 : port = g_strdup_printf ("%u", self->port);
334 1 : nns_edge_set_info (self->edge_h, "PORT", port);
335 1 : g_free (port);
336 : }
337 3 : if (self->dest_host)
338 3 : nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
339 3 : if (self->dest_port > 0) {
340 3 : port = g_strdup_printf ("%u", self->dest_port);
341 3 : nns_edge_set_info (self->edge_h, "DEST_PORT", port);
342 3 : g_free (port);
343 : }
344 3 : if (self->topic)
345 0 : nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
346 :
347 3 : if (0 != nns_edge_start (self->edge_h)) {
348 1 : nns_loge
349 : ("Failed to start NNStreamer-edge. Please check server IP and port");
350 1 : return FALSE;
351 : }
352 :
353 2 : if (self->wait_connection) {
354 0 : guint64 remaining = self->connection_timeout;
355 0 : if (0 == remaining)
356 0 : remaining = G_MAXUINT64;
357 :
358 0 : while (remaining >= 10 &&
359 0 : NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
360 0 : if (!self->wait_connection) {
361 0 : nns_logi
362 : ("Waiting for connection to edgesrc was canceled by the user.");
363 0 : return FALSE;
364 : }
365 0 : g_usleep (10000);
366 0 : remaining -= 10;
367 : }
368 :
369 0 : if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
370 0 : nns_loge ("Failed to connect to edgesrc within timeout: %ju ms",
371 : self->connection_timeout);
372 0 : return FALSE;
373 : }
374 : }
375 :
376 2 : return TRUE;
377 : }
378 :
379 : /**
380 : * @brief Stop processing of edgesink
381 : */
382 : static gboolean
383 1 : gst_edgesink_stop (GstBaseSink * basesink)
384 : {
385 1 : GstEdgeSink *self = GST_EDGESINK (basesink);
386 : int ret;
387 :
388 1 : ret = nns_edge_stop (self->edge_h);
389 1 : if (NNS_EDGE_ERROR_NONE != ret) {
390 0 : nns_loge ("Failed to stop edge. error code(%d)", ret);
391 0 : return FALSE;
392 : }
393 :
394 1 : return TRUE;
395 : }
396 :
397 : /**
398 : * @brief render buffer, send buffer
399 : */
400 : static GstFlowReturn
401 13 : gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buffer)
402 : {
403 13 : GstEdgeSink *self = GST_EDGESINK (basesink);
404 : GstCaps *caps;
405 : GstStructure *structure;
406 : gboolean is_tensor;
407 : nns_edge_data_h data_h;
408 : guint i, num_mems;
409 : int ret;
410 : GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
411 : GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
412 :
413 13 : ret = nns_edge_data_create (&data_h);
414 13 : if (ret != NNS_EDGE_ERROR_NONE) {
415 0 : nns_loge ("Failed to create data handle in edgesink");
416 13 : return GST_FLOW_ERROR;
417 : }
418 :
419 13 : caps = gst_pad_get_current_caps (GST_BASE_SINK_PAD (basesink));
420 13 : structure = gst_caps_get_structure (caps, 0);
421 13 : is_tensor = gst_structure_is_tensor_stream (structure);
422 13 : gst_caps_unref (caps);
423 :
424 13 : if (is_tensor)
425 13 : num_mems = gst_tensor_buffer_get_count (buffer);
426 : else
427 0 : num_mems = gst_buffer_n_memory (buffer);
428 :
429 26 : for (i = 0; i < num_mems; i++) {
430 13 : if (is_tensor)
431 13 : mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
432 : else
433 0 : mem[i] = gst_buffer_get_memory (buffer, i);
434 :
435 13 : if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
436 0 : nns_loge ("Cannot map the %uth memory in gst-buffer.", i);
437 0 : gst_memory_unref (mem[i]);
438 0 : num_mems = i;
439 0 : goto done;
440 : }
441 :
442 13 : ret = nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
443 13 : if (ret != NNS_EDGE_ERROR_NONE) {
444 0 : nns_loge ("Failed to append %u-th memory into edge data.", i);
445 0 : num_mems = i + 1;
446 0 : goto done;
447 : }
448 : }
449 :
450 13 : ret = nns_edge_send (self->edge_h, data_h);
451 13 : if (ret != NNS_EDGE_ERROR_NONE)
452 12 : nns_loge ("Failed to send edge data, connection lost or internal error.");
453 :
454 1 : done:
455 13 : if (data_h)
456 13 : nns_edge_data_destroy (data_h);
457 :
458 26 : for (i = 0; i < num_mems; i++) {
459 13 : gst_memory_unmap (mem[i], &map[i]);
460 13 : gst_memory_unref (mem[i]);
461 : }
462 :
463 13 : return GST_FLOW_OK;
464 : }
465 :
466 : /**
467 : * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
468 : */
469 : static gboolean
470 2 : gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps)
471 : {
472 2 : GstEdgeSink *sink = GST_EDGESINK (basesink);
473 : gchar *caps_str, *prev_caps_str, *new_caps_str;
474 : int set_rst;
475 :
476 2 : caps_str = gst_caps_to_string (caps);
477 :
478 2 : nns_edge_get_info (sink->edge_h, "CAPS", &prev_caps_str);
479 2 : if (!prev_caps_str) {
480 0 : prev_caps_str = g_strdup ("");
481 : }
482 : new_caps_str =
483 2 : g_strdup_printf ("%s@edge_sink_caps@%s", prev_caps_str, caps_str);
484 2 : set_rst = nns_edge_set_info (sink->edge_h, "CAPS", new_caps_str);
485 :
486 2 : g_free (prev_caps_str);
487 2 : g_free (new_caps_str);
488 2 : g_free (caps_str);
489 :
490 2 : return set_rst == NNS_EDGE_ERROR_NONE;
491 : }
492 :
493 : /**
494 : * @brief getter for the 'host' property.
495 : */
496 : static gchar *
497 2 : gst_edgesink_get_host (GstEdgeSink * self)
498 : {
499 2 : return self->host;
500 : }
501 :
502 : /**
503 : * @brief setter for the 'host' property.
504 : */
505 : static void
506 2 : gst_edgesink_set_host (GstEdgeSink * self, const gchar * host)
507 : {
508 2 : if (self->host)
509 2 : g_free (self->host);
510 2 : self->host = g_strdup (host);
511 2 : }
512 :
513 : /**
514 : * @brief getter for the 'port' property.
515 : */
516 : static guint16
517 3 : gst_edgesink_get_port (GstEdgeSink * self)
518 : {
519 3 : return self->port;
520 : }
521 :
522 : /**
523 : * @brief setter for the 'port' property.
524 : */
525 : static void
526 7 : gst_edgesink_set_port (GstEdgeSink * self, const guint16 port)
527 : {
528 7 : self->port = port;
529 7 : }
530 :
531 : /**
532 : * @brief getter for the 'connect_type' property.
533 : */
534 : static nns_edge_connect_type_e
535 2 : gst_edgesink_get_connect_type (GstEdgeSink * self)
536 : {
537 2 : return self->connect_type;
538 : }
539 :
540 : /**
541 : * @brief setter for the 'connect_type' property.
542 : */
543 : static void
544 4 : gst_edgesink_set_connect_type (GstEdgeSink * self,
545 : const nns_edge_connect_type_e connect_type)
546 : {
547 4 : self->connect_type = connect_type;
548 4 : }
|