Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2022 Samsung Electronics Co., Ltd.
4 : *
5 : * @file edge_src.c
6 : * @date 02 Aug 2022
7 : * @brief Subscribe and push incoming data to the GStreamer pipeline
8 : * @author Yechan Choi <yechan9.choi@samsung.com>
9 : * @see http://github.com/nnstreamer/nnstreamer
10 : * @bug No known bugs
11 : *
12 : */
13 : #ifdef HAVE_CONFIG_H
14 : #include <config.h>
15 : #endif
16 :
17 : #include "edge_src.h"
18 :
/* Debug category used by the GST_*_OBJECT logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_edgesrc_debug);
#define GST_CAT_DEFAULT gst_edgesrc_debug

/**
 * @brief the capabilities of the outputs
 * @details A single always-present "src" pad that accepts any caps.
 */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);

/**
 * @brief edgesrc properties
 * @details IDs passed to g_object_class_install_property() and dispatched in
 *          the set/get property handlers of this file.
 */
enum
{
  PROP_0,
  PROP_HOST,                    /* "host": deprecated, has no effect */
  PROP_PORT,                    /* "port": deprecated, has no effect */
  PROP_DEST_HOST,               /* "dest-host" */
  PROP_DEST_PORT,               /* "dest-port" */
  PROP_CONNECT_TYPE,            /* "connect-type" */
  PROP_TOPIC,                   /* "topic" */
  PROP_CUSTOM_LIB,              /* "custom-lib" */

  PROP_LAST
};

/* Registers the GstEdgeSrc type as a subclass of GstBaseSrc. */
#define gst_edgesrc_parent_class parent_class
G_DEFINE_TYPE (GstEdgeSrc, gst_edgesrc, GST_TYPE_BASE_SRC);
47 :
/* GObject virtual methods */
static void gst_edgesrc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_edgesrc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_edgesrc_class_finalize (GObject * object);

/* GstBaseSrc virtual methods */
static gboolean gst_edgesrc_start (GstBaseSrc * basesrc);
static gboolean gst_edgesrc_stop (GstBaseSrc * basesrc);
static GstFlowReturn gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset,
    guint size, GstBuffer ** out_buf);

/* Accessors for the 'dest-host' property */
static gchar *gst_edgesrc_get_dest_host (GstEdgeSrc * self);
static void gst_edgesrc_set_dest_host (GstEdgeSrc * self,
    const gchar * dest_host);

/* Accessors for the 'dest-port' property */
static guint16 gst_edgesrc_get_dest_port (GstEdgeSrc * self);
static void gst_edgesrc_set_dest_port (GstEdgeSrc * self,
    const guint16 dest_port);

/* Accessors for the 'connect-type' property */
static nns_edge_connect_type_e gst_edgesrc_get_connect_type (GstEdgeSrc * self);
static void gst_edgesrc_set_connect_type (GstEdgeSrc * self,
    const nns_edge_connect_type_e connect_type);

/* GstElement virtual method */
static GstStateChangeReturn gst_edgesrc_change_state (GstElement * element,
    GstStateChange transition);
72 :
73 : /**
74 : * @brief initialize the class
75 : */
76 : static void
77 30 : gst_edgesrc_class_init (GstEdgeSrcClass * klass)
78 : {
79 30 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
80 30 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
81 30 : GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
82 :
83 30 : gobject_class->set_property = gst_edgesrc_set_property;
84 30 : gobject_class->get_property = gst_edgesrc_get_property;
85 30 : gobject_class->finalize = gst_edgesrc_class_finalize;
86 :
87 30 : g_object_class_install_property (gobject_class, PROP_HOST,
88 : g_param_spec_string ("host", "Host",
89 : "A self host address (DEPRECATED, has no effect).", DEFAULT_HOST,
90 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
91 30 : g_object_class_install_property (gobject_class, PROP_PORT,
92 : g_param_spec_uint ("port", "Port",
93 : "A self port number (DEPRECATED, has no effect).",
94 : 0, 65535, DEFAULT_PORT,
95 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
96 30 : g_object_class_install_property (gobject_class, PROP_DEST_HOST,
97 : g_param_spec_string ("dest-host", "Destination Host",
98 : "A host address of edgesink to receive the packets from edgesink",
99 : DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100 30 : g_object_class_install_property (gobject_class, PROP_DEST_PORT,
101 : g_param_spec_uint ("dest-port", "Destination Port",
102 : "A port of edgesink to receive the packets from edgesink", 0, 65535,
103 : DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
104 30 : g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
105 : g_param_spec_enum ("connect-type", "Connect Type",
106 : "The connections type between edgesink and edgesrc.",
107 : GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
108 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
109 30 : g_object_class_install_property (gobject_class, PROP_TOPIC,
110 : g_param_spec_string ("topic", "Topic",
111 : "The main topic of the host and option if necessary. "
112 : "(topic)/(optional topic for main topic).", "",
113 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
114 30 : g_object_class_install_property (gobject_class, PROP_CUSTOM_LIB,
115 : g_param_spec_string ("custom-lib", "Custom connection lib path",
116 : "User defined custom connection lib path.",
117 : "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
118 :
119 30 : gst_element_class_add_pad_template (gstelement_class,
120 : gst_static_pad_template_get (&srctemplate));
121 :
122 30 : gst_element_class_set_static_metadata (gstelement_class,
123 : "EdgeSrc", "Source/Edge",
124 : "Subscribe and push incoming streams", "Samsung Electronics Co., Ltd.");
125 :
126 30 : gstbasesrc_class->start = gst_edgesrc_start;
127 30 : gstbasesrc_class->stop = gst_edgesrc_stop;
128 30 : gstbasesrc_class->create = gst_edgesrc_create;
129 30 : gstelement_class->change_state = gst_edgesrc_change_state;
130 :
131 30 : GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
132 : GST_EDGE_ELEM_NAME_SRC, 0, "Edge src");
133 30 : }
134 :
135 : /**
136 : * @brief initialize edgesrc element
137 : */
138 : static void
139 15 : gst_edgesrc_init (GstEdgeSrc * self)
140 : {
141 15 : GstBaseSrc *basesrc = GST_BASE_SRC (self);
142 :
143 15 : gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
144 15 : gst_base_src_set_async (basesrc, FALSE);
145 :
146 15 : self->dest_host = g_strdup (DEFAULT_HOST);
147 15 : self->dest_port = DEFAULT_PORT;
148 15 : self->topic = NULL;
149 15 : self->msg_queue = g_async_queue_new ();
150 15 : self->connect_type = DEFAULT_CONNECT_TYPE;
151 15 : self->playing = FALSE;
152 15 : self->custom_lib = NULL;
153 15 : }
154 :
155 : /**
156 : * @brief set property
157 : */
158 : static void
159 33 : gst_edgesrc_set_property (GObject * object, guint prop_id, const GValue * value,
160 : GParamSpec * pspec)
161 : {
162 33 : GstEdgeSrc *self = GST_EDGESRC (object);
163 :
164 33 : switch (prop_id) {
165 1 : case PROP_HOST:
166 1 : nns_logw ("host property is deprecated");
167 1 : break;
168 4 : case PROP_PORT:
169 4 : nns_logw ("port property is deprecated");
170 4 : break;
171 4 : case PROP_DEST_HOST:
172 4 : gst_edgesrc_set_dest_host (self, g_value_get_string (value));
173 4 : break;
174 11 : case PROP_DEST_PORT:
175 11 : gst_edgesrc_set_dest_port (self, g_value_get_uint (value));
176 11 : break;
177 7 : case PROP_CONNECT_TYPE:
178 7 : gst_edgesrc_set_connect_type (self, g_value_get_enum (value));
179 7 : break;
180 4 : case PROP_TOPIC:
181 4 : if (!g_value_get_string (value)) {
182 0 : nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
183 0 : break;
184 : }
185 4 : g_free (self->topic);
186 4 : self->topic = g_value_dup_string (value);
187 4 : break;
188 2 : case PROP_CUSTOM_LIB:
189 2 : g_free (self->custom_lib);
190 2 : self->custom_lib = g_value_dup_string (value);
191 2 : break;
192 0 : default:
193 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
194 0 : break;
195 : }
196 33 : }
197 :
198 : /**
199 : * @brief get property
200 : */
201 : static void
202 4 : gst_edgesrc_get_property (GObject * object, guint prop_id, GValue * value,
203 : GParamSpec * pspec)
204 : {
205 4 : GstEdgeSrc *self = GST_EDGESRC (object);
206 :
207 4 : switch (prop_id) {
208 0 : case PROP_HOST:
209 0 : nns_logw ("host property is deprecated");
210 0 : break;
211 0 : case PROP_PORT:
212 0 : nns_logw ("port property is deprecated");
213 0 : break;
214 1 : case PROP_DEST_HOST:
215 1 : g_value_set_string (value, gst_edgesrc_get_dest_host (self));
216 1 : break;
217 1 : case PROP_DEST_PORT:
218 1 : g_value_set_uint (value, gst_edgesrc_get_dest_port (self));
219 1 : break;
220 1 : case PROP_CONNECT_TYPE:
221 1 : g_value_set_enum (value, gst_edgesrc_get_connect_type (self));
222 1 : break;
223 1 : case PROP_TOPIC:
224 1 : g_value_set_string (value, self->topic);
225 1 : break;
226 0 : case PROP_CUSTOM_LIB:
227 0 : g_value_set_string (value, self->custom_lib);
228 0 : break;
229 0 : default:
230 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
231 0 : break;
232 : }
233 4 : }
234 :
235 : /**
236 : * @brief finalize the object
237 : */
238 : static void
239 11 : gst_edgesrc_class_finalize (GObject * object)
240 : {
241 11 : GstEdgeSrc *self = GST_EDGESRC (object);
242 : nns_edge_data_h data_h;
243 :
244 11 : self->playing = FALSE;
245 11 : g_free (self->dest_host);
246 11 : self->dest_host = NULL;
247 :
248 11 : g_free (self->topic);
249 11 : self->topic = NULL;
250 :
251 11 : g_free (self->custom_lib);
252 11 : self->custom_lib = NULL;
253 :
254 11 : if (self->msg_queue) {
255 11 : while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
256 0 : nns_edge_data_destroy (data_h);
257 : }
258 11 : g_async_queue_unref (self->msg_queue);
259 11 : self->msg_queue = NULL;
260 : }
261 :
262 11 : if (self->edge_h) {
263 10 : nns_edge_release_handle (self->edge_h);
264 10 : self->edge_h = NULL;
265 : }
266 11 : G_OBJECT_CLASS (parent_class)->finalize (object);
267 11 : }
268 :
269 : /**
270 : * @brief Change state of edgesrc.
271 : */
272 : static GstStateChangeReturn
273 72 : gst_edgesrc_change_state (GstElement * element, GstStateChange transition)
274 : {
275 72 : GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
276 72 : GstEdgeSrc *self = GST_EDGESRC (element);
277 :
278 72 : switch (transition) {
279 11 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
280 11 : GST_INFO_OBJECT (self, "State changed from PAUSED to PLAYING.");
281 11 : self->playing = TRUE;
282 11 : break;
283 61 : default:
284 61 : break;
285 : }
286 :
287 72 : ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
288 :
289 72 : switch (transition) {
290 10 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
291 10 : GST_INFO_OBJECT (self, "State changed from PLAYING to PAUSED.");
292 10 : self->playing = FALSE;
293 10 : break;
294 62 : default:
295 62 : break;
296 : }
297 :
298 72 : return ret;
299 : }
300 :
301 : /**
302 : * @brief nnstreamer-edge event callback.
303 : */
304 : static int
305 147 : _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
306 : {
307 : nns_edge_event_e event_type;
308 147 : int ret = NNS_EDGE_ERROR_NONE;
309 :
310 147 : GstEdgeSrc *self = GST_EDGESRC (user_data);
311 :
312 147 : if (0 != nns_edge_event_get_type (event_h, &event_type)) {
313 0 : nns_loge ("Failed to get event type!");
314 147 : return NNS_EDGE_ERROR_UNKNOWN;
315 : }
316 :
317 147 : switch (event_type) {
318 139 : case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
319 : {
320 : nns_edge_data_h data;
321 :
322 139 : nns_edge_event_parse_new_data (event_h, &data);
323 139 : g_async_queue_push (self->msg_queue, data);
324 139 : break;
325 : }
326 0 : case NNS_EDGE_EVENT_CONNECTION_CLOSED:
327 : {
328 0 : self->playing = FALSE;
329 0 : break;
330 : }
331 8 : default:
332 8 : break;
333 : }
334 :
335 147 : return ret;
336 : }
337 :
338 : /**
339 : * @brief start edgesrc, called when state changed null to ready
340 : */
341 : static gboolean
342 14 : gst_edgesrc_start (GstBaseSrc * basesrc)
343 : {
344 14 : GstEdgeSrc *self = GST_EDGESRC (basesrc);
345 :
346 : int ret;
347 14 : char *port = NULL;
348 :
349 14 : if (NNS_EDGE_CONNECT_TYPE_CUSTOM != self->connect_type) {
350 11 : ret = nns_edge_create_handle (NULL, self->connect_type,
351 : NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
352 : } else {
353 3 : if (!self->custom_lib) {
354 1 : nns_loge ("Failed to create custom handle. custom-lib path is not set.");
355 1 : return FALSE;
356 : }
357 2 : ret = nns_edge_custom_create_handle (NULL, self->custom_lib,
358 : NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
359 : }
360 :
361 13 : if (NNS_EDGE_ERROR_NONE != ret) {
362 1 : nns_loge ("Failed to get nnstreamer edge handle.");
363 :
364 1 : if (self->edge_h) {
365 1 : nns_edge_release_handle (self->edge_h);
366 1 : self->edge_h = NULL;
367 : }
368 :
369 1 : return FALSE;
370 : }
371 :
372 12 : if (self->dest_host)
373 12 : nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
374 12 : if (self->dest_port > 0) {
375 12 : port = g_strdup_printf ("%u", self->dest_port);
376 12 : nns_edge_set_info (self->edge_h, "DEST_PORT", port);
377 12 : g_free (port);
378 : }
379 12 : if (self->topic)
380 3 : nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
381 :
382 12 : nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
383 :
384 12 : if (0 != nns_edge_start (self->edge_h)) {
385 0 : nns_loge
386 : ("Failed to start NNStreamer-edge. Please check server IP and port");
387 0 : return FALSE;
388 : }
389 :
390 12 : if (0 != nns_edge_connect (self->edge_h, self->dest_host, self->dest_port)) {
391 1 : nns_loge ("Failed to connect to edge server!");
392 1 : return FALSE;
393 : }
394 11 : self->playing = TRUE;
395 :
396 11 : return TRUE;
397 : }
398 :
399 : /**
400 : * @brief Stop edgesrc, called when state changed ready to null
401 : */
402 : static gboolean
403 10 : gst_edgesrc_stop (GstBaseSrc * basesrc)
404 : {
405 10 : GstEdgeSrc *self = GST_EDGESRC (basesrc);
406 : int ret;
407 :
408 10 : self->playing = FALSE;
409 10 : ret = nns_edge_stop (self->edge_h);
410 :
411 10 : if (NNS_EDGE_ERROR_NONE != ret) {
412 0 : nns_loge ("Failed to stop edgesrc. error code(%d)", ret);
413 0 : return FALSE;
414 : }
415 :
416 10 : return TRUE;
417 : }
418 :
419 : /**
420 : * @brief Create a buffer containing the subscribed data
421 : */
422 : static GstFlowReturn
423 94 : gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
424 : GstBuffer ** out_buf)
425 : {
426 94 : GstEdgeSrc *self = GST_EDGESRC (basesrc);
427 94 : nns_edge_data_h data_h = NULL;
428 94 : GstBuffer *buffer = NULL;
429 : GstMemory *mem;
430 94 : GstCaps *caps = NULL;
431 : GstStructure *structure;
432 : GstTensorsConfig config;
433 : GstTensorInfo *_info;
434 94 : gboolean is_tensor = FALSE;
435 : guint i, num_data, max_mems;
436 : int ret;
437 :
438 : UNUSED (offset);
439 : UNUSED (size);
440 94 : gst_tensors_config_init (&config);
441 :
442 195 : while (self->playing && !data_h) {
443 102 : data_h = g_async_queue_timeout_pop (self->msg_queue, G_USEC_PER_SEC);
444 : }
445 :
446 93 : if (!data_h) {
447 1 : nns_loge ("Failed to get message from the edgesrc message queue.");
448 93 : return GST_FLOW_ERROR;
449 : }
450 :
451 92 : ret = nns_edge_data_get_count (data_h, &num_data);
452 92 : if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
453 0 : nns_loge ("Failed to get the number of memories of the edge data.");
454 0 : goto done;
455 : }
456 :
457 : /* Check current caps and max memory. */
458 92 : caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (basesrc));
459 92 : if (caps) {
460 0 : structure = gst_caps_get_structure (caps, 0);
461 0 : is_tensor = gst_structure_is_tensor_stream (structure);
462 :
463 0 : if (is_tensor)
464 0 : gst_tensors_config_from_structure (&config, structure);
465 :
466 0 : gst_caps_unref (caps);
467 : }
468 :
469 92 : max_mems = is_tensor ? NNS_TENSOR_SIZE_LIMIT : gst_buffer_get_max_memory ();
470 92 : if (num_data > max_mems) {
471 0 : nns_loge
472 : ("Cannot create new buffer. The edge-data has %u memories, but allowed memories is %u.",
473 : num_data, max_mems);
474 0 : goto done;
475 : }
476 :
477 92 : buffer = gst_buffer_new ();
478 184 : for (i = 0; i < num_data; i++) {
479 92 : void *data = NULL;
480 92 : nns_size_t data_len = 0;
481 : gpointer new_data;
482 :
483 92 : nns_edge_data_get (data_h, i, &data, &data_len);
484 92 : new_data = _g_memdup (data, data_len);
485 92 : mem = gst_memory_new_wrapped (0, new_data, data_len, 0, data_len,
486 : new_data, g_free);
487 :
488 92 : if (is_tensor) {
489 0 : _info = gst_tensors_info_get_nth_info (&config.info, i);
490 0 : gst_tensor_buffer_append_memory (buffer, mem, _info);
491 : } else {
492 92 : gst_buffer_append_memory (buffer, mem);
493 : }
494 : }
495 :
496 92 : done:
497 92 : if (data_h)
498 92 : nns_edge_data_destroy (data_h);
499 :
500 92 : gst_tensors_config_free (&config);
501 :
502 92 : if (buffer == NULL) {
503 0 : nns_loge ("Failed to get buffer to push to the edgesrc.");
504 0 : return GST_FLOW_ERROR;
505 : }
506 :
507 92 : *out_buf = buffer;
508 :
509 92 : return GST_FLOW_OK;
510 : }
511 :
512 : /**
513 : * @brief getter for the 'host' property.
514 : */
515 : static gchar *
516 1 : gst_edgesrc_get_dest_host (GstEdgeSrc * self)
517 : {
518 1 : return self->dest_host;
519 : }
520 :
521 : /**
522 : * @brief setter for the 'host' property.
523 : */
524 : static void
525 4 : gst_edgesrc_set_dest_host (GstEdgeSrc * self, const gchar * dest_host)
526 : {
527 4 : g_free (self->dest_host);
528 4 : self->dest_host = g_strdup (dest_host);
529 4 : }
530 :
531 : /**
532 : * @brief getter for the 'port' property.
533 : */
534 : static guint16
535 1 : gst_edgesrc_get_dest_port (GstEdgeSrc * self)
536 : {
537 1 : return self->dest_port;
538 : }
539 :
540 : /**
541 : * @brief setter for the 'port' property.
542 : */
543 : static void
544 11 : gst_edgesrc_set_dest_port (GstEdgeSrc * self, const guint16 dest_port)
545 : {
546 11 : self->dest_port = dest_port;
547 11 : }
548 :
549 : /**
550 : * @brief getter for the 'connect_type' property.
551 : */
552 : static nns_edge_connect_type_e
553 1 : gst_edgesrc_get_connect_type (GstEdgeSrc * self)
554 : {
555 1 : return self->connect_type;
556 : }
557 :
558 : /**
559 : * @brief setter for the 'connect_type' property.
560 : */
561 : static void
562 7 : gst_edgesrc_set_connect_type (GstEdgeSrc * self,
563 : const nns_edge_connect_type_e connect_type)
564 : {
565 7 : self->connect_type = connect_type;
566 7 : }
|