Doxygen Book
tensor_query_client.c
Go to the documentation of this file.
1 /* SPDX-License-Identifier: LGPL-2.1-only */
13 #ifdef HAVE_CONFIG_H
14 #include <config.h>
15 #endif
16 
17 #include "nnstreamer_util.h"
18 #include "tensor_query_client.h"
19 #include <gio/gio.h>
20 #include <glib.h>
21 #include <string.h>
22 #include "tensor_query_common.h"
23 
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 
31 #ifndef DBG
32 #define DBG (!self->silent)
33 #endif
34 
38 enum
39 {
50 };
51 
52 #define TCP_HIGHEST_PORT 65535
53 #define TCP_DEFAULT_HOST "localhost"
54 #define TCP_DEFAULT_SRV_SRC_PORT 3000
55 #define TCP_DEFAULT_CLIENT_SRC_PORT 3001
56 #define DEFAULT_CLIENT_TIMEOUT 0
57 #define DEFAULT_SILENT TRUE
58 #define DEFAULT_MAX_REQUEST 2
59 
60 GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_client_debug);
61 #define GST_CAT_DEFAULT gst_tensor_query_client_debug
62 
66 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
67  GST_PAD_SINK,
68  GST_PAD_ALWAYS,
69  GST_STATIC_CAPS_ANY);
70 
74 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
75  GST_PAD_SRC,
76  GST_PAD_ALWAYS,
77  GST_STATIC_CAPS_ANY);
78 
79 #define gst_tensor_query_client_parent_class parent_class
80 G_DEFINE_TYPE (GstTensorQueryClient, gst_tensor_query_client, GST_TYPE_ELEMENT);
81 
82 static void gst_tensor_query_client_finalize (GObject * object);
83 static void gst_tensor_query_client_set_property (GObject * object,
84  guint prop_id, const GValue * value, GParamSpec * pspec);
85 static void gst_tensor_query_client_get_property (GObject * object,
86  guint prop_id, GValue * value, GParamSpec * pspec);
87 
88 static gboolean gst_tensor_query_client_sink_event (GstPad * pad,
89  GstObject * parent, GstEvent * event);
90 static gboolean gst_tensor_query_client_sink_query (GstPad * pad,
91  GstObject * parent, GstQuery * query);
92 static GstFlowReturn gst_tensor_query_client_chain (GstPad * pad,
93  GstObject * parent, GstBuffer * buf);
95  GstPad * pad, GstCaps * filter);
96 
100 static void
102 {
103  GObjectClass *gobject_class;
104  GstElementClass *gstelement_class;
105 
106  gobject_class = (GObjectClass *) klass;
107  gstelement_class = (GstElementClass *) klass;
108 
109  gobject_class->set_property = gst_tensor_query_client_set_property;
110  gobject_class->get_property = gst_tensor_query_client_get_property;
111  gobject_class->finalize = gst_tensor_query_client_finalize;
112 
114  g_object_class_install_property (gobject_class, PROP_HOST,
115  g_param_spec_string ("host", "Host",
116  "A host address to receive the packets from query server",
117  TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
118  g_object_class_install_property (gobject_class, PROP_PORT,
119  g_param_spec_uint ("port", "Port",
120  "A port number to receive the packets from query server", 0,
122  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
123  g_object_class_install_property (gobject_class, PROP_DEST_HOST,
124  g_param_spec_string ("dest-host", "Destination Host",
125  "A tenor query server host to send the packets",
126  TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127  g_object_class_install_property (gobject_class, PROP_DEST_PORT,
128  g_param_spec_uint ("dest-port", "Destination Port",
129  "The port of tensor query server to send the packets", 0,
131  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132  g_object_class_install_property (gobject_class, PROP_SILENT,
133  g_param_spec_boolean ("silent", "Silent", "Produce verbose output",
134  DEFAULT_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135  g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
136  g_param_spec_enum ("connect-type", "Connect Type",
137  "The connections type between client and server.",
139  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140  g_object_class_install_property (gobject_class, PROP_TOPIC,
141  g_param_spec_string ("topic", "Topic",
142  "The main topic of the host.",
143  "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144 
145  g_object_class_install_property (gobject_class, PROP_TIMEOUT,
146  g_param_spec_uint ("timeout", "timeout value",
147  "A timeout value (in ms) to wait message from query server after sending buffer to server. 0 means no wait.",
148  0, G_MAXUINT, DEFAULT_CLIENT_TIMEOUT,
149  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150  g_object_class_install_property (gobject_class, PROP_MAX_REQUEST,
151  g_param_spec_uint ("max-request", "Maximum number of request",
152  "Sets the maximum number of buffers to request to the query server. "
153  "If the processing speed of query server is slower than the query client, the input buffer is dropped. "
154  "Two buffers are requested by default, and 0 means that all buffers are sent to query server without drop. ",
155  0, G_MAXUINT, DEFAULT_MAX_REQUEST,
156  G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
157  gst_element_class_add_pad_template (gstelement_class,
158  gst_static_pad_template_get (&sinktemplate));
159  gst_element_class_add_pad_template (gstelement_class,
160  gst_static_pad_template_get (&srctemplate));
161 
162  gst_element_class_set_static_metadata (gstelement_class,
163  "TensorQueryClient", "Filter/Tensor/Query",
164  "Handle querying tensor data through the network",
165  "Samsung Electronics Co., Ltd.");
166 
167  GST_DEBUG_CATEGORY_INIT (gst_tensor_query_client_debug, "tensor_query_client",
168  0, "Tensor Query Client");
169 }
170 
174 static void
176 {
178  self->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
179  gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
180  gst_pad_set_event_function (self->sinkpad,
181  GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_event));
182  gst_pad_set_query_function (self->sinkpad,
183  GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_query));
184  gst_pad_set_chain_function (self->sinkpad,
185  GST_DEBUG_FUNCPTR (gst_tensor_query_client_chain));
186 
188  self->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
189  gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
190 
191  /* init properties */
192  self->silent = DEFAULT_SILENT;
193  self->connect_type = DEFAULT_CONNECT_TYPE;
194  self->host = g_strdup (TCP_DEFAULT_HOST);
195  self->port = TCP_DEFAULT_CLIENT_SRC_PORT;
196  self->dest_host = g_strdup (TCP_DEFAULT_HOST);
197  self->dest_port = TCP_DEFAULT_SRV_SRC_PORT;
198  self->topic = NULL;
199  self->in_caps_str = NULL;
200  self->timeout = DEFAULT_CLIENT_TIMEOUT;
201  self->edge_h = NULL;
202  self->msg_queue = g_async_queue_new ();
203  self->max_request = DEFAULT_MAX_REQUEST;
204  self->requested_num = 0;
205  self->is_tensor = FALSE;
206  gst_tensors_config_init (&self->config);
207 }
208 
212 static void
214 {
216  nns_edge_data_h data_h;
217 
218  g_free (self->host);
219  self->host = NULL;
220  g_free (self->dest_host);
221  self->dest_host = NULL;
222  g_free (self->topic);
223  self->topic = NULL;
224  g_free (self->in_caps_str);
225  self->in_caps_str = NULL;
226 
227  while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
228  nns_edge_data_destroy (data_h);
229  }
230 
231  if (self->msg_queue) {
232  g_async_queue_unref (self->msg_queue);
233  self->msg_queue = NULL;
234  }
235 
236  if (self->edge_h) {
237  nns_edge_release_handle (self->edge_h);
238  self->edge_h = NULL;
239  }
240 
241  gst_tensors_config_free (&self->config);
242 
243  G_OBJECT_CLASS (parent_class)->finalize (object);
244 }
245 
249 static void
250 gst_tensor_query_client_set_property (GObject * object, guint prop_id,
251  const GValue * value, GParamSpec * pspec)
252 {
254 
256  switch (prop_id) {
257  case PROP_HOST:
258  if (!g_value_get_string (value)) {
259  nns_logw ("Sink host property cannot be NULL");
260  break;
261  }
262  g_free (self->host);
263  self->host = g_value_dup_string (value);
264  break;
265  case PROP_PORT:
266  self->port = g_value_get_uint (value);
267  break;
268  case PROP_DEST_HOST:
269  if (!g_value_get_string (value)) {
270  nns_logw ("Sink host property cannot be NULL");
271  break;
272  }
273  g_free (self->dest_host);
274  self->dest_host = g_value_dup_string (value);
275  break;
276  case PROP_DEST_PORT:
277  self->dest_port = g_value_get_uint (value);
278  break;
279  case PROP_CONNECT_TYPE:
280  self->connect_type = g_value_get_enum (value);
281  break;
282  case PROP_TOPIC:
283  if (!g_value_get_string (value)) {
284  nns_logw ("Topic property cannot be NULL. Query-hybrid is disabled.");
285  break;
286  }
287  g_free (self->topic);
288  self->topic = g_value_dup_string (value);
289  break;
290  case PROP_TIMEOUT:
291  self->timeout = g_value_get_uint (value);
292  break;
293  case PROP_SILENT:
294  self->silent = g_value_get_boolean (value);
295  break;
296  case PROP_MAX_REQUEST:
297  self->max_request = g_value_get_uint (value);
298  break;
299  default:
300  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
301  break;
302  }
303 }
304 
308 static void
309 gst_tensor_query_client_get_property (GObject * object, guint prop_id,
310  GValue * value, GParamSpec * pspec)
311 {
313 
314  switch (prop_id) {
315  case PROP_HOST:
316  g_value_set_string (value, self->host);
317  break;
318  case PROP_PORT:
319  g_value_set_uint (value, self->port);
320  break;
321  case PROP_DEST_HOST:
322  g_value_set_string (value, self->dest_host);
323  break;
324  case PROP_DEST_PORT:
325  g_value_set_uint (value, self->dest_port);
326  break;
327  case PROP_CONNECT_TYPE:
328  g_value_set_enum (value, self->connect_type);
329  break;
330  case PROP_TOPIC:
331  g_value_set_string (value, self->topic);
332  break;
333  case PROP_TIMEOUT:
334  g_value_set_uint (value, self->timeout);
335  break;
336  case PROP_SILENT:
337  g_value_set_boolean (value, self->silent);
338  break;
339  case PROP_MAX_REQUEST:
340  g_value_set_uint (value, self->max_request);
341  break;
342  default:
343  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
344  break;
345  }
346 }
347 
351 static gboolean
353  const gchar * caps_str)
354 {
355  GstCaps *curr_caps, *out_caps;
356  gboolean ret = FALSE;
357  out_caps = gst_caps_from_string (caps_str);
358  silent_debug_caps (self, out_caps, "set out-caps");
359 
360  /* Update src pad caps if it is different. */
361  curr_caps = gst_pad_get_current_caps (self->srcpad);
362  if (curr_caps == NULL || !gst_caps_is_equal (curr_caps, out_caps)) {
363  if (gst_caps_is_fixed (out_caps)) {
364  ret = gst_pad_set_caps (self->srcpad, out_caps);
365 
370  if (ret) {
371  GstStructure *s = gst_caps_get_structure (out_caps, 0);
372 
373  self->is_tensor = gst_structure_is_tensor_stream (s);
374  if (self->is_tensor) {
375  gst_tensors_config_free (&self->config);
376  gst_tensors_config_from_structure (&self->config, s);
377  }
378  }
379  } else {
380  nns_loge ("out-caps from tensor_query_serversink is not fixed. "
381  "Failed to update client src caps, out-caps: %s", caps_str);
382  }
383  } else {
385  ret = TRUE;
386  }
387 
388  if (curr_caps)
389  gst_caps_unref (curr_caps);
390 
391  gst_caps_unref (out_caps);
392 
393  return ret;
394 }
395 
399 static gchar *
400 _nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
401 {
402  gchar **strv;
403  gint num, i;
404  gchar *find_key = NULL;
405  gchar *ret_str = NULL;
406 
407  if (!caps_str)
408  return NULL;
409 
410  strv = g_strsplit (caps_str, "@", -1);
411  num = g_strv_length (strv);
412 
413  find_key =
414  is_src ==
415  TRUE ? g_strdup ("query_server_src_caps") :
416  g_strdup ("query_server_sink_caps");
417 
418  for (i = 1; i < num; i += 2) {
419  if (0 == g_strcmp0 (find_key, strv[i])) {
420  ret_str = g_strdup (strv[i + 1]);
421  break;
422  }
423  }
424 
425  g_free (find_key);
426  g_strfreev (strv);
427 
428  return ret_str;
429 }
430 
434 static int
435 _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
436 {
437  nns_edge_event_e event_type;
438  int ret = NNS_EDGE_ERROR_NONE;
439  GstTensorQueryClient *self = (GstTensorQueryClient *) user_data;
440 
441  if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) {
442  nns_loge ("Failed to get event type!");
443  return NNS_EDGE_ERROR_NOT_SUPPORTED;
444  }
445 
446  switch (event_type) {
447  case NNS_EDGE_EVENT_CAPABILITY:
448  {
449  GstCaps *server_caps, *client_caps;
450  GstStructure *server_st, *client_st;
451  gboolean result = FALSE;
452  gchar *ret_str, *caps_str;
453 
454  nns_edge_event_parse_capability (event_h, &caps_str);
455  ret_str = _nns_edge_parse_caps (caps_str, TRUE);
456  nns_logd ("Received server-src caps: %s", GST_STR_NULL (ret_str));
457  client_caps = gst_caps_from_string ((gchar *) self->in_caps_str);
458  server_caps = gst_caps_from_string (ret_str);
459  g_free (ret_str);
460 
462  gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
463  NULL);
464  gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
465  NULL);
466 
467  server_st = gst_caps_get_structure (server_caps, 0);
468  client_st = gst_caps_get_structure (client_caps, 0);
469 
470  if (gst_structure_is_tensor_stream (server_st)) {
471  GstTensorsConfig server_config, client_config;
472 
473  gst_tensors_config_from_structure (&server_config, server_st);
474  gst_tensors_config_from_structure (&client_config, client_st);
475 
476  result = gst_tensors_config_is_equal (&server_config, &client_config);
477 
478  gst_tensors_config_free (&server_config);
479  gst_tensors_config_free (&client_config);
480  }
481 
482  if (result || gst_caps_can_intersect (client_caps, server_caps)) {
484  ret_str = _nns_edge_parse_caps (caps_str, FALSE);
485  nns_logd ("Received server-sink caps: %s", GST_STR_NULL (ret_str));
486  if (!gst_tensor_query_client_update_caps (self, ret_str)) {
487  nns_loge ("Failed to update client source caps.");
488  ret = NNS_EDGE_ERROR_UNKNOWN;
489  }
490  g_free (ret_str);
491  } else {
492  /* respond deny with src caps string */
493  nns_loge ("Query caps is not acceptable!");
494  ret = NNS_EDGE_ERROR_UNKNOWN;
495  }
496 
497  gst_caps_unref (server_caps);
498  gst_caps_unref (client_caps);
499  g_free (caps_str);
500  break;
501  }
502  case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
503  {
504  nns_edge_data_h data;
505 
506  nns_edge_event_parse_new_data (event_h, &data);
507  g_async_queue_push (self->msg_queue, data);
508  break;
509  }
510  default:
511  break;
512  }
513 
514  return ret;
515 }
516 
520 static gboolean
522 {
523  gboolean started = FALSE;
524  gchar *prev_caps = NULL;
525  int ret;
526 
527  /* Already created, compare caps string. */
528  if (self->edge_h) {
529  ret = nns_edge_get_info (self->edge_h, "CAPS", &prev_caps);
530 
531  if (ret != NNS_EDGE_ERROR_NONE || !prev_caps ||
532  !g_str_equal (prev_caps, self->in_caps_str)) {
533  /* Capability is changed, close old handle. */
534  nns_edge_release_handle (self->edge_h);
535  self->edge_h = NULL;
536  } else {
537  return TRUE;
538  }
539  }
540 
541  ret = nns_edge_create_handle ("TEMP_ID", self->connect_type,
542  NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &self->edge_h);
543  if (ret != NNS_EDGE_ERROR_NONE)
544  return FALSE;
545 
546  nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
547 
548  if (self->topic)
549  nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
550  if (self->host)
551  nns_edge_set_info (self->edge_h, "HOST", self->host);
552  if (self->port > 0) {
553  gchar *port = g_strdup_printf ("%u", self->port);
554  nns_edge_set_info (self->edge_h, "PORT", port);
555  g_free (port);
556  }
557  nns_edge_set_info (self->edge_h, "CAPS", self->in_caps_str);
558 
559  ret = nns_edge_start (self->edge_h);
560  if (ret != NNS_EDGE_ERROR_NONE) {
561  nns_loge
562  ("Failed to start NNStreamer-edge. Please check server IP and port.");
563  goto done;
564  }
565 
566  ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
567  if (ret != NNS_EDGE_ERROR_NONE) {
568  nns_loge ("Failed to connect to edge server!");
569  goto done;
570  }
571 
572  started = TRUE;
573 
574 done:
575  if (!started) {
576  nns_edge_release_handle (self->edge_h);
577  self->edge_h = NULL;
578  }
579 
580  return started;
581 }
582 
586 static gboolean
588  GstObject * parent, GstEvent * event)
589 {
591 
592  GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
593  GST_EVENT_TYPE_NAME (event), event);
594 
595  switch (GST_EVENT_TYPE (event)) {
596  case GST_EVENT_CAPS:
597  {
598  GstCaps *caps;
599  gboolean ret;
600 
601  gst_event_parse_caps (event, &caps);
602  g_free (self->in_caps_str);
603  self->in_caps_str = gst_caps_to_string (caps);
604 
606  if (!ret)
607  nns_loge ("Failed to create edge handle, cannot start query client.");
608 
609  gst_event_unref (event);
610  return ret;
611  }
612  default:
613  break;
614  }
615 
616  return gst_pad_event_default (pad, parent, event);
617 }
618 
622 static gboolean
624  GstObject * parent, GstQuery * query)
625 {
627 
628  GST_DEBUG_OBJECT (self, "Received %s query: %" GST_PTR_FORMAT,
629  GST_QUERY_TYPE_NAME (query), query);
630 
631  switch (GST_QUERY_TYPE (query)) {
632  case GST_QUERY_CAPS:
633  {
634  GstCaps *caps;
635  GstCaps *filter;
636 
637  gst_query_parse_caps (query, &filter);
638  caps = gst_tensor_query_client_query_caps (self, pad, filter);
639 
640  gst_query_set_caps_result (query, caps);
641  gst_caps_unref (caps);
642  return TRUE;
643  }
644  case GST_QUERY_ACCEPT_CAPS:
645  {
646  GstCaps *caps;
647  GstCaps *template_caps;
648  gboolean res = FALSE;
649 
650  gst_query_parse_accept_caps (query, &caps);
651  silent_debug_caps (self, caps, "accept-caps");
652 
653  if (gst_caps_is_fixed (caps)) {
654  template_caps = gst_pad_get_pad_template_caps (pad);
655 
656  res = gst_caps_can_intersect (template_caps, caps);
657  gst_caps_unref (template_caps);
658  }
659 
660  gst_query_set_accept_caps_result (query, res);
661  return TRUE;
662  }
663  default:
664  break;
665  }
666 
667  return gst_pad_query_default (pad, parent, query);
668 }
669 
673 static GstFlowReturn
675  GstObject * parent, GstBuffer * buf)
676 {
678  GstBuffer *out_buf = NULL;
679  GstFlowReturn res = GST_FLOW_OK;
680  nns_edge_data_h data_h = NULL;
681  guint i, num_tensors = 0, num_data = 0;
682  int ret = NNS_EDGE_ERROR_NONE;
683  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
684  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
685  gchar *val;
686  UNUSED (pad);
687 
688  if (self->max_request > 0 && self->requested_num > self->max_request) {
689  nns_logi
690  ("The processing speed of the query server is too slow. Drop the input buffer.");
691  goto try_pop;
692  }
693 
694  ret = nns_edge_data_create (&data_h);
695  if (ret != NNS_EDGE_ERROR_NONE) {
696  nns_loge ("Failed to create data handle in client chain.");
697  goto try_pop;
698  }
699 
700  num_tensors = gst_tensor_buffer_get_count (buf);
701  for (i = 0; i < num_tensors; i++) {
702  mem[i] = gst_tensor_buffer_get_nth_memory (buf, i);
703  if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
704  ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
705  gst_memory_unref (mem[i]);
706  num_tensors = i;
707  goto try_pop;
708  }
709  nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
710  }
711 
712  nns_edge_get_info (self->edge_h, "client_id", &val);
713  nns_edge_data_set_info (data_h, "client_id", val);
714  g_free (val);
715 
716  ret = nns_edge_send (self->edge_h, data_h);
717  if (ret == NNS_EDGE_ERROR_NONE) {
718  self->requested_num++;
719  } else {
720  nns_loge ("Failed to publish to server node.");
721  }
722 
723 try_pop:
724  if (data_h)
725  nns_edge_data_destroy (data_h);
726 
727  data_h = g_async_queue_timeout_pop (self->msg_queue,
728  self->timeout * G_TIME_SPAN_MILLISECOND);
729  if (data_h) {
730  if (self->requested_num > 0)
731  self->requested_num--;
732  ret = nns_edge_data_get_count (data_h, &num_data);
733 
734  if (ret == NNS_EDGE_ERROR_NONE && num_data > 0) {
735  GstMemory *new_mem;
736  GstTensorInfo *_info;
737 
738  out_buf = gst_buffer_new ();
739 
740  for (i = 0; i < num_data; i++) {
741  void *data = NULL;
742  nns_size_t data_len;
743  gpointer new_data;
744 
745  nns_edge_data_get (data_h, i, &data, &data_len);
746  new_data = _g_memdup (data, data_len);
747 
748  new_mem = gst_memory_new_wrapped (0, new_data, data_len, 0, data_len,
749  new_data, g_free);
750 
751  if (self->is_tensor) {
752  _info = gst_tensors_info_get_nth_info (&self->config.info, i);
753  gst_tensor_buffer_append_memory (out_buf, new_mem, _info);
754  } else {
755  gst_buffer_append_memory (out_buf, new_mem);
756  }
757  }
758 
759  /* metadata from incoming buffer */
760  gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
761 
762  res = gst_pad_push (self->srcpad, out_buf);
763  } else {
764  nns_loge ("Failed to get the number of memories of the edge data.");
765  res = GST_FLOW_ERROR;
766  }
767 
768  nns_edge_data_destroy (data_h);
769  }
770 
771  for (i = 0; i < num_tensors; i++) {
772  gst_memory_unmap (mem[i], &map[i]);
773  gst_memory_unref (mem[i]);
774  }
775 
776  gst_buffer_unref (buf);
777  return res;
778 }
779 
783 static GstCaps *
785  GstCaps * filter)
786 {
787  GstCaps *caps;
788 
789  caps = gst_pad_get_current_caps (pad);
790  if (!caps) {
792  caps = gst_pad_get_pad_template_caps (pad);
793  }
794 
795  silent_debug_caps (self, caps, "caps");
796  silent_debug_caps (self, filter, "filter");
797 
798  if (filter) {
799  GstCaps *intersection;
800  intersection =
801  gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
802 
803  gst_caps_unref (caps);
804  caps = intersection;
805  }
806 
807  silent_debug_caps (self, caps, "result");
808  return caps;
809 }
PROP_TIMEOUT
@ PROP_TIMEOUT
Definition: tensor_query_client.c:47
gst_tensors_config_is_equal
gboolean gst_tensors_config_is_equal(const GstTensorsConfig *c1, const GstTensorsConfig *c2)
Compare tensor config info (for other/tensors)
Definition: nnstreamer_plugin_api_util_impl.c:881
gst_tensor_query_client_update_caps
static gboolean gst_tensor_query_client_update_caps(GstTensorQueryClient *self, const gchar *caps_str)
Update src pad caps from tensors config.
Definition: tensor_query_client.c:352
_nns_edge_event_cb
static int _nns_edge_event_cb(nns_edge_event_h event_h, void *user_data)
nnstreamer-edge event callback.
Definition: tensor_query_client.c:435
PROP_DEST_HOST
@ PROP_DEST_HOST
Definition: tensor_query_client.c:43
TCP_DEFAULT_CLIENT_SRC_PORT
#define TCP_DEFAULT_CLIENT_SRC_PORT
Definition: tensor_query_client.c:55
data
svtc_1 data
Definition: gsttensor_if.c:844
GstTensorInfo
Internal data structure for tensor info.
Definition: tensor_typedef.h:261
NNS_TENSOR_SIZE_LIMIT
#define NNS_TENSOR_SIZE_LIMIT
The number of tensors NNStreamer supports is 256. The max memories of gst-buffer is 16 (See NNS_TENSO...
Definition: tensor_typedef.h:42
gst_tensor_query_client_get_property
static void gst_tensor_query_client_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
get property
Definition: tensor_query_client.c:309
FALSE
return FALSE
Definition: gsttensor_transform.c:590
result
case tensor_data_s gboolean * result
Definition: gsttensor_if.c:839
GST_TENSOR_QUERY_CLIENT
#define GST_TENSOR_QUERY_CLIENT(obj)
Definition: tensor_query_client.h:25
gst_tensor_query_client_set_property
static void gst_tensor_query_client_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
set property
Definition: tensor_query_client.c:250
nns_logd
#define nns_logd
Definition: nnstreamer_log.h:143
gst_tensor_query_client_sink_event
static gboolean gst_tensor_query_client_sink_event(GstPad *pad, GstObject *parent, GstEvent *event)
This function handles sink event.
Definition: tensor_query_client.c:587
TCP_DEFAULT_HOST
#define TCP_DEFAULT_HOST
Definition: tensor_query_client.c:53
nns_logi
#define nns_logi
Definition: nnstreamer_log.h:140
GST_TYPE_QUERY_CONNECT_TYPE
#define GST_TYPE_QUERY_CONNECT_TYPE
Definition: tensor_query_common.h:35
g_free
g_free(self->option[(opnum) - 1])
opnum: \
_GstTensorQueryClient
GstTensorQueryClient data structure.
Definition: tensor_query_client.h:41
g_value_set_string
g_value_set_string(value, self->option[opnum - 1])
opnum: \
PROP_PORT
@ PROP_PORT
Definition: tensor_query_client.c:42
_g_memdup
#define _g_memdup(data, size)
g_memdup() function replaced by g_memdup2() in glib version >= 2.68
Definition: nnstreamer_util.h:31
gst_tensors_config_free
void gst_tensors_config_free(GstTensorsConfig *config)
Free allocated data in tensors config structure.
Definition: nnstreamer_plugin_api_util_impl.c:845
tensor_query_client.h
GStreamer plugin to handle tensor query client.
GstTensorsConfig
Internal data structure for configured tensors info (for other/tensors).
Definition: tensor_typedef.h:284
gst_structure_is_tensor_stream
G_BEGIN_DECLS gboolean gst_structure_is_tensor_stream(const GstStructure *structure)
Check given mimetype is tensor stream.
Definition: nnstreamer_plugin_api_impl.c:984
silent_debug_caps
#define silent_debug_caps(self, caps, msg)
Macro for capability debug message.
Definition: tensor_common.h:285
PROP_CONNECT_TYPE
@ PROP_CONNECT_TYPE
Definition: tensor_query_client.c:45
ml_loge
#define ml_loge
Definition: nnstreamer_log.h:78
TRUE
return TRUE
Definition: gsttensor_if.c:897
PROP_TOPIC
@ PROP_TOPIC
Definition: tensor_query_client.c:46
UNUSED
#define UNUSED(expr)
Definition: mqttcommon.h:19
DEFAULT_SILENT
#define DEFAULT_SILENT
Definition: tensor_query_client.c:57
nns_loge
#define nns_loge
Definition: nnstreamer_log.h:142
DEFAULT_CONNECT_TYPE
#define DEFAULT_CONNECT_TYPE
Definition: edge_common.h:27
nnstreamer_util.h
Optional NNStreamer utility functions for sub-plugin writers and users.
_nns_edge_parse_caps
static gchar * _nns_edge_parse_caps(gchar *caps_str, gboolean is_src)
Parse caps from received event data.
Definition: tensor_query_client.c:400
TCP_DEFAULT_SRV_SRC_PORT
#define TCP_DEFAULT_SRV_SRC_PORT
Definition: tensor_query_client.c:54
PROP_SILENT
@ PROP_SILENT
Definition: tensor_query_client.c:48
gst_tensors_info_get_nth_info
GstTensorInfo * gst_tensors_info_get_nth_info(GstTensorsInfo *info, guint index)
Get the pointer of nth tensor information.
Definition: nnstreamer_plugin_api_util_impl.c:296
PROP_DEST_PORT
@ PROP_DEST_PORT
Definition: tensor_query_client.c:44
TCP_HIGHEST_PORT
#define TCP_HIGHEST_PORT
Definition: tensor_query_client.c:52
gst_tensor_query_client_chain
static GstFlowReturn gst_tensor_query_client_chain(GstPad *pad, GstObject *parent, GstBuffer *buf)
Chain function, this function does the actual processing.
Definition: tensor_query_client.c:674
gst_tensor_query_client_class_init
static void gst_tensor_query_client_class_init(GstTensorQueryClientClass *klass)
initialize the class
Definition: tensor_query_client.c:101
DEFAULT_MAX_REQUEST
#define DEFAULT_MAX_REQUEST
Definition: tensor_query_client.c:58
G_DEFINE_TYPE
G_DEFINE_TYPE(GstTensorQueryClient, gst_tensor_query_client, GST_TYPE_ELEMENT)
DEFAULT_CLIENT_TIMEOUT
#define DEFAULT_CLIENT_TIMEOUT
Definition: tensor_query_client.c:56
gst_tensor_query_client_query_caps
static GstCaps * gst_tensor_query_client_query_caps(GstTensorQueryClient *self, GstPad *pad, GstCaps *filter)
Get pad caps for caps negotiation.
Definition: tensor_query_client.c:784
nns_logw
#define nns_logw
Definition: nnstreamer_log.h:141
sinktemplate
static GstStaticPadTemplate sinktemplate
the capabilities of the inputs.
Definition: tensor_query_client.c:66
_GstTensorQueryClientClass
GstTensorQueryClientClass data structure.
Definition: tensor_query_client.h:71
gst_tensor_buffer_get_nth_memory
GstMemory * gst_tensor_buffer_get_nth_memory(GstBuffer *buffer, const guint index)
Get the nth GstMemory from given buffer.
Definition: nnstreamer_plugin_api_impl.c:1586
GST_DEBUG_CATEGORY_STATIC
GST_DEBUG_CATEGORY_STATIC(gst_tensor_query_client_debug)
gst_tensors_config_init
void gst_tensors_config_init(GstTensorsConfig *config)
Initialize the tensors config info structure (for other/tensors)
Definition: nnstreamer_plugin_api_util_impl.c:830
PROP_0
@ PROP_0
Definition: tensor_query_client.c:40
gst_tensor_buffer_get_count
guint gst_tensor_buffer_get_count(GstBuffer *buffer)
Get the number of tensors in the buffer.
Definition: nnstreamer_plugin_api_impl.c:1813
srctemplate
static GstStaticPadTemplate srctemplate
the capabilities of the outputs.
Definition: tensor_query_client.c:74
gst_tensor_query_client_init
static void gst_tensor_query_client_init(GstTensorQueryClient *self)
initialize the new element
Definition: tensor_query_client.c:175
gst_tensor_query_client_create_edge_handle
static gboolean gst_tensor_query_client_create_edge_handle(GstTensorQueryClient *self)
Internal function to create edge handle.
Definition: tensor_query_client.c:521
gst_tensor_query_client_finalize
static void gst_tensor_query_client_finalize(GObject *object)
finalize the object
Definition: tensor_query_client.c:213
gst_tensors_config_from_structure
gboolean gst_tensors_config_from_structure(GstTensorsConfig *config, const GstStructure *structure)
Parse structure and set tensors config (for other/tensors)
Definition: nnstreamer_plugin_api_impl.c:1413
PROP_HOST
@ PROP_HOST
Definition: tensor_query_client.c:41
PROP_MAX_REQUEST
@ PROP_MAX_REQUEST
Definition: tensor_query_client.c:49
gst_tensor_buffer_append_memory
gboolean gst_tensor_buffer_append_memory(GstBuffer *buffer, GstMemory *memory, const GstTensorInfo *info)
Append memory to given buffer.
Definition: nnstreamer_plugin_api_impl.c:1666
gst_tensor_query_client_sink_query
static gboolean gst_tensor_query_client_sink_query(GstPad *pad, GstObject *parent, GstQuery *query)
This function handles sink pad query.
Definition: tensor_query_client.c:623
tensor_query_common.h
Utility functions for tensor query.