/* True when verbose output is enabled, i.e. the `silent` flag of the
 * `self` object in the caller's scope is off. */
32 #define DBG (!self->silent)
/* Upper bound for a TCP port number. */
52 #define TCP_HIGHEST_PORT 65535
/* Default values; presumably consumed as property defaults of this element
 * (the consuming lines are not all visible here) -- confirm. */
53 #define TCP_DEFAULT_HOST "localhost"
54 #define TCP_DEFAULT_SRV_SRC_PORT 3000
55 #define TCP_DEFAULT_CLIENT_SRC_PORT 3001
56 #define DEFAULT_CLIENT_TIMEOUT 0
57 #define DEFAULT_SILENT TRUE
58 #define DEFAULT_MAX_REQUEST 2
/* Debug category picked up by the GST_* logging macros in this file; the
 * category itself is set up with GST_DEBUG_CATEGORY_INIT during class init. */
61 #define GST_CAT_DEFAULT gst_tensor_query_client_debug
/* Static pad template for the pad named "sink". The remaining initializer
 * arguments are on lines not shown in this excerpt. */
66 static GstStaticPadTemplate
sinktemplate = GST_STATIC_PAD_TEMPLATE (
"sink",
/* Static pad template for the pad named "src". The remaining initializer
 * arguments are on lines not shown in this excerpt. */
74 static GstStaticPadTemplate
srctemplate = GST_STATIC_PAD_TEMPLATE (
"src",
/* Makes the long name gst_tensor_query_client_parent_class expand to the
 * short name `parent_class`, which is what this file uses when chaining up
 * to the parent's finalize. */
79 #define gst_tensor_query_client_parent_class parent_class
/* Tails of forward declarations. The function names sit on lines not shown
 * here; judging by the parameter lists these are presumably the property
 * setter/getter, the sink pad event/query/chain handlers and a caps query
 * helper -- confirm against the full file. */
84 guint prop_id,
const GValue * value, GParamSpec * pspec);
86 guint prop_id, GValue * value, GParamSpec * pspec);
89 GstObject * parent, GstEvent * event);
91 GstObject * parent, GstQuery * query);
93 GstObject * parent, GstBuffer * buf);
95 GstPad * pad, GstCaps * filter);
/* Class-initialisation fragment (function header not visible here): views
 * `klass` as both GObjectClass and GstElementClass, installs the element
 * properties, adds the pad templates, sets the element metadata and
 * initialises the debug category. Several original lines are missing from
 * this excerpt, so some calls appear without all of their arguments. */
103 GObjectClass *gobject_class;
104 GstElementClass *gstelement_class;
106 gobject_class = (GObjectClass *) klass;
107 gstelement_class = (GstElementClass *) klass;
/* "host" property (string). */
114 g_object_class_install_property (gobject_class,
PROP_HOST,
115 g_param_spec_string (
"host",
"Host",
116 "A host address to receive the packets from query server",
/* "port" property (unsigned int, minimum 0). */
118 g_object_class_install_property (gobject_class,
PROP_PORT,
119 g_param_spec_uint (
"port",
"Port",
120 "A port number to receive the packets from query server", 0,
122 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "dest-host" param spec (string); the enclosing install call is on a line
 * not shown. NOTE(review): the description says "tenor" -- looks like a
 * typo for "tensor". */
124 g_param_spec_string (
"dest-host",
"Destination Host",
125 "A tenor query server host to send the packets",
/* "dest-port" param spec (unsigned int, minimum 0). */
128 g_param_spec_uint (
"dest-port",
"Destination Port",
129 "The port of tensor query server to send the packets", 0,
131 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "silent" property (boolean). NOTE(review): the description reads
 * "Produce verbose output" for a flag named silent -- confirm the wording. */
132 g_object_class_install_property (gobject_class,
PROP_SILENT,
133 g_param_spec_boolean (
"silent",
"Silent",
"Produce verbose output",
/* "connect-type" param spec (enum). */
136 g_param_spec_enum (
"connect-type",
"Connect Type",
137 "The connections type between client and server.",
139 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "topic" property (string); its default value is the empty string. */
140 g_object_class_install_property (gobject_class,
PROP_TOPIC,
141 g_param_spec_string (
"topic",
"Topic",
142 "The main topic of the host.",
143 "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "timeout" property (unsigned int), expressed in milliseconds according
 * to its description; 0 is described as "no wait". */
145 g_object_class_install_property (gobject_class,
PROP_TIMEOUT,
146 g_param_spec_uint (
"timeout",
"timeout value",
147 "A timeout value (in ms) to wait message from query server after sending buffer to server. 0 means no wait.",
149 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "max-request" param spec (unsigned int); per its description, 0 disables
 * dropping of input buffers. */
151 g_param_spec_uint (
"max-request",
"Maximum number of request",
152 "Sets the maximum number of buffers to request to the query server. "
153 "If the processing speed of query server is slower than the query client, the input buffer is dropped. "
154 "Two buffers are requested by default, and 0 means that all buffers are sent to query server without drop. ",
156 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* Two pad templates are added to the element class; their arguments are on
 * lines not shown (presumably the sink and src templates -- confirm). */
157 gst_element_class_add_pad_template (gstelement_class,
159 gst_element_class_add_pad_template (gstelement_class,
/* Element metadata: long name, classification, description, author. */
162 gst_element_class_set_static_metadata (gstelement_class,
163 "TensorQueryClient",
"Filter/Tensor/Query",
164 "Handle querying tensor data through the network",
165 "Samsung Electronics Co., Ltd.");
/* Registers the debug category referenced by GST_CAT_DEFAULT. */
167 GST_DEBUG_CATEGORY_INIT (gst_tensor_query_client_debug,
"tensor_query_client",
168 0,
"Tensor Query Client");
/* Instance-initialisation fragment (function header not visible here).
 * Creates the sink pad from `sinktemplate`, adds it to the element and
 * installs event, query and chain functions on it (the handler arguments
 * are on lines not shown). */
178 self->sinkpad = gst_pad_new_from_static_template (&
sinktemplate,
"sink");
179 gst_element_add_pad (GST_ELEMENT (
self), self->sinkpad);
180 gst_pad_set_event_function (self->sinkpad,
182 gst_pad_set_query_function (self->sinkpad,
184 gst_pad_set_chain_function (self->sinkpad,
/* Creates the src pad from `srctemplate` and adds it to the element. */
188 self->srcpad = gst_pad_new_from_static_template (&
srctemplate,
"src");
189 gst_element_add_pad (GST_ELEMENT (
self), self->srcpad);
/* Initial state: no input caps string yet, an empty async queue for data
 * handles, no outstanding requests, and is_tensor cleared. */
199 self->in_caps_str = NULL;
202 self->msg_queue = g_async_queue_new ();
204 self->requested_num = 0;
205 self->is_tensor =
FALSE;
/* Finalize fragment (function header not visible here): clears owned
 * strings, drains and releases the message queue, releases the edge handle
 * and finally chains up to the parent class. */
216 nns_edge_data_h data_h;
/* Presumably preceded by a free of the old value on a line not shown --
 * confirm. */
221 self->dest_host = NULL;
224 g_free (self->in_caps_str);
225 self->in_caps_str = NULL;
/* Destroy every data handle still waiting in the queue.
 * NOTE(review): this loop uses msg_queue before the NULL check below --
 * confirm msg_queue can never be NULL at this point. */
227 while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
228 nns_edge_data_destroy (data_h);
231 if (self->msg_queue) {
232 g_async_queue_unref (self->msg_queue);
233 self->msg_queue = NULL;
237 nns_edge_release_handle (self->edge_h);
/* Chain up to the parent class finalize. */
243 G_OBJECT_CLASS (parent_class)->finalize (
object);
/* Property setter fragment (function header, switch and case labels are on
 * lines not shown). Each branch copies the incoming GValue into the
 * matching field of `self`.
 * NOTE(review): for the string fields, confirm the previous value is freed
 * before each g_value_dup_string assignment (not visible in this excerpt). */
251 const GValue * value, GParamSpec * pspec)
/* host: a NULL string is warned about. */
258 if (!g_value_get_string (value)) {
259 nns_logw (
"Sink host property cannot be NULL");
263 self->host = g_value_dup_string (value);
266 self->port = g_value_get_uint (value);
/* dest_host: a NULL string is warned about.
 * NOTE(review): the warning text is the same "Sink host" message as the
 * host branch above -- possibly a copy-paste; confirm. */
269 if (!g_value_get_string (value)) {
270 nns_logw (
"Sink host property cannot be NULL");
274 self->dest_host = g_value_dup_string (value);
277 self->dest_port = g_value_get_uint (value);
280 self->connect_type = g_value_get_enum (value);
/* topic: a NULL string is warned about. */
283 if (!g_value_get_string (value)) {
284 nns_logw (
"Topic property cannot be NULL. Query-hybrid is disabled.");
288 self->topic = g_value_dup_string (value);
291 self->timeout = g_value_get_uint (value);
294 self->silent = g_value_get_boolean (value);
297 self->max_request = g_value_get_uint (value);
/* Unknown property id: emit the standard GObject warning. */
300 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
/* Property getter fragment (function header, switch and case labels are on
 * lines not shown). The visible branches copy the numeric, enum and boolean
 * fields of `self` into `value`; the string properties are presumably
 * handled on lines missing from this excerpt -- confirm. */
310 GValue * value, GParamSpec * pspec)
319 g_value_set_uint (value, self->port);
325 g_value_set_uint (value, self->dest_port);
328 g_value_set_enum (value, self->connect_type);
334 g_value_set_uint (value, self->timeout);
337 g_value_set_boolean (value, self->silent);
340 g_value_set_uint (value, self->max_request);
/* Unknown property id: emit the standard GObject warning. */
343 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
/* Src-caps update fragment (function name not visible here): parses
 * `caps_str` into caps and, when they differ from the src pad's current
 * caps, tries to set them on the src pad. `ret` starts as FALSE and takes
 * the result of gst_pad_set_caps. */
353 const gchar * caps_str)
355 GstCaps *curr_caps, *out_caps;
356 gboolean ret =
FALSE;
357 out_caps = gst_caps_from_string (caps_str);
361 curr_caps = gst_pad_get_current_caps (self->srcpad);
/* Only act when the pad has no caps yet or the caps actually changed. */
362 if (curr_caps == NULL || !gst_caps_is_equal (curr_caps, out_caps)) {
/* Caps are only set on the pad when they are fixed. */
363 if (gst_caps_is_fixed (out_caps)) {
364 ret = gst_pad_set_caps (self->srcpad, out_caps);
/* The first structure of out_caps is inspected; what is done with it and
 * with is_tensor is on lines not shown. */
371 GstStructure *s = gst_caps_get_structure (out_caps, 0);
374 if (self->is_tensor) {
380 nns_loge (
"out-caps from tensor_query_serversink is not fixed. "
381 "Failed to update client src caps, out-caps: %s", caps_str);
/* curr_caps may be NULL (see the check above); presumably this unref is
 * guarded by a NULL check on a line not shown -- confirm. */
389 gst_caps_unref (curr_caps);
391 gst_caps_unref (out_caps);
/* Caps-string parsing fragment (function header not visible here): splits
 * `caps_str` on "@" and looks up the value that follows a chosen key. */
404 gchar *find_key = NULL;
405 gchar *ret_str = NULL;
410 strv = g_strsplit (caps_str,
"@", -1);
411 num = g_strv_length (strv);
/* Chooses the key to search for: the src-caps key or the sink-caps key.
 * The left-hand side of the comparison is on a line not shown. */
415 TRUE ? g_strdup (
"query_server_src_caps") :
416 g_strdup (
"query_server_sink_caps");
/* Tokens at odd indices are treated as keys and the token after each key
 * as its value; the value of the matching key is duplicated into ret_str.
 * If the key is the last token, strv[i + 1] is the array's NULL terminator
 * and g_strdup then yields NULL. */
418 for (i = 1; i < num; i += 2) {
419 if (0 == g_strcmp0 (find_key, strv[i])) {
420 ret_str = g_strdup (strv[i + 1]);
/* Edge event callback fragment (function header not visible here): reads
 * the event type from `event_h` and dispatches on it. `ret` starts as
 * NNS_EDGE_ERROR_NONE and is set to an error code on the failure paths. */
437 nns_edge_event_e event_type;
438 int ret = NNS_EDGE_ERROR_NONE;
441 if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) {
442 nns_loge (
"Failed to get event type!");
443 return NNS_EDGE_ERROR_NOT_SUPPORTED;
446 switch (event_type) {
/* Capability event: compare the caps advertised by the server with the
 * caps received on this element's sink pad (in_caps_str). */
447 case NNS_EDGE_EVENT_CAPABILITY:
449 GstCaps *server_caps, *client_caps;
450 GstStructure *server_st, *client_st;
452 gchar *ret_str, *caps_str;
454 nns_edge_event_parse_capability (event_h, &caps_str);
/* ret_str is assigned on a line not shown, presumably extracted from
 * caps_str -- confirm. */
456 nns_logd (
"Received server-src caps: %s", GST_STR_NULL (ret_str));
457 client_caps = gst_caps_from_string ((gchar *) self->in_caps_str);
458 server_caps = gst_caps_from_string (ret_str);
/* Both caps get framerate 0/1 set before the comparison; the remaining
 * arguments and any enclosing condition are on lines not shown. */
462 gst_caps_set_simple (server_caps,
"framerate", GST_TYPE_FRACTION, 0, 1,
464 gst_caps_set_simple (client_caps,
"framerate", GST_TYPE_FRACTION, 0, 1,
467 server_st = gst_caps_get_structure (server_caps, 0);
468 client_st = gst_caps_get_structure (client_caps, 0);
/* Accept when `result` (computed on lines not shown) is set or when the
 * two caps can intersect. */
482 if (
result || gst_caps_can_intersect (client_caps, server_caps)) {
/* NOTE(review): this message says "server-sink" while the earlier log of
 * ret_str says "server-src"; ret_str may be reassigned on a line not
 * shown -- confirm. */
485 nns_logd (
"Received server-sink caps: %s", GST_STR_NULL (ret_str));
487 nns_loge (
"Failed to update client source caps.");
488 ret = NNS_EDGE_ERROR_UNKNOWN;
493 nns_loge (
"Query caps is not acceptable!");
494 ret = NNS_EDGE_ERROR_UNKNOWN;
497 gst_caps_unref (server_caps);
498 gst_caps_unref (client_caps);
/* New-data event: take the data handle out of the event and push it onto
 * msg_queue. */
502 case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
504 nns_edge_data_h
data;
506 nns_edge_event_parse_new_data (event_h, &
data);
507 g_async_queue_push (self->msg_queue,
data);
/* Edge-handle setup fragment (function header not visible here): compares
 * the "CAPS" info of the existing handle with the current input caps
 * string and, when they differ or cannot be read, releases the handle and
 * creates, configures, starts and connects a new one. */
523 gboolean started =
FALSE;
524 gchar *prev_caps = NULL;
529 ret = nns_edge_get_info (self->edge_h,
"CAPS", &prev_caps);
/* Rebuild when the previous caps are unavailable or differ from the
 * current input caps string. */
531 if (ret != NNS_EDGE_ERROR_NONE || !prev_caps ||
532 !g_str_equal (prev_caps, self->in_caps_str)) {
534 nns_edge_release_handle (self->edge_h);
/* Create a query-client node with the configured connect type. */
541 ret = nns_edge_create_handle (
"TEMP_ID", self->connect_type,
542 NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &self->edge_h);
543 if (ret != NNS_EDGE_ERROR_NONE)
/* Pass topic, host, optional port and caps to the handle as string info. */
549 nns_edge_set_info (self->edge_h,
"TOPIC", self->topic);
551 nns_edge_set_info (self->edge_h,
"HOST", self->host);
/* The port is only set when non-zero; it is formatted as a decimal
 * string. Presumably the temporary string is freed on a line not shown --
 * confirm. */
552 if (self->port > 0) {
553 gchar *port = g_strdup_printf (
"%u", self->port);
554 nns_edge_set_info (self->edge_h,
"PORT", port);
557 nns_edge_set_info (self->edge_h,
"CAPS", self->in_caps_str);
559 ret = nns_edge_start (self->edge_h);
560 if (ret != NNS_EDGE_ERROR_NONE) {
562 (
"Failed to start NNStreamer-edge. Please check server IP and port.");
/* Connect to the server at dest_host:dest_port. */
566 ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
567 if (ret != NNS_EDGE_ERROR_NONE) {
568 nns_loge (
"Failed to connect to edge server!");
/* Releases the handle; presumably on a failure path whose condition is on
 * lines not shown -- confirm. */
576 nns_edge_release_handle (self->edge_h);
/* Sink pad event handler fragment (function name and case labels are on
 * lines not shown). Logs the incoming event, remembers the caps carried by
 * a caps event as a string in in_caps_str, and forwards events to the
 * default pad handler. */
588 GstObject * parent, GstEvent * event)
592 GST_DEBUG_OBJECT (
self,
"Received %s event: %" GST_PTR_FORMAT,
593 GST_EVENT_TYPE_NAME (event), event);
595 switch (GST_EVENT_TYPE (event)) {
/* Replace the stored input caps string with the caps from this event. */
601 gst_event_parse_caps (event, &caps);
602 g_free (self->in_caps_str);
603 self->in_caps_str = gst_caps_to_string (caps);
/* Error path: the condition is on a line not shown; judging by the
 * message it follows a failed edge-handle creation -- confirm. */
607 nns_loge (
"Failed to create edge handle, cannot start query client.");
609 gst_event_unref (event);
/* Hand the event to the default pad event handler. */
616 return gst_pad_event_default (pad, parent, event);
/* Sink pad query handler fragment (function name and some case labels are
 * on lines not shown). Logs the query, answers caps and accept-caps
 * queries, and forwards to the default pad query handler. */
624 GstObject * parent, GstQuery * query)
628 GST_DEBUG_OBJECT (
self,
"Received %s query: %" GST_PTR_FORMAT,
629 GST_QUERY_TYPE_NAME (query), query);
631 switch (GST_QUERY_TYPE (query)) {
/* Caps query: `caps` is computed on a line not shown (presumably from the
 * pad and the parsed filter -- confirm), stored as the result, then
 * unreffed. */
637 gst_query_parse_caps (query, &filter);
640 gst_query_set_caps_result (query, caps);
641 gst_caps_unref (caps);
/* Accept-caps query: only fixed caps are tested, by checking whether they
 * can intersect the pad's template caps; `res` is FALSE unless that test
 * sets it. */
644 case GST_QUERY_ACCEPT_CAPS:
647 GstCaps *template_caps;
648 gboolean res =
FALSE;
650 gst_query_parse_accept_caps (query, &caps);
653 if (gst_caps_is_fixed (caps)) {
654 template_caps = gst_pad_get_pad_template_caps (pad);
656 res = gst_caps_can_intersect (template_caps, caps);
657 gst_caps_unref (template_caps);
660 gst_query_set_accept_caps_result (query, res);
/* Hand the query to the default pad query handler. */
667 return gst_pad_query_default (pad, parent, query);
/* Chain function fragment (function name not visible here): wraps the
 * memories of the input buffer in an edge data handle, sends it to the
 * server, then waits for a reply on msg_queue and pushes the reply
 * downstream as a new buffer. The input buffer is unreffed at the end. */
675 GstObject * parent, GstBuffer * buf)
678 GstBuffer *out_buf = NULL;
679 GstFlowReturn res = GST_FLOW_OK;
680 nns_edge_data_h data_h = NULL;
681 guint i, num_tensors = 0, num_data = 0;
682 int ret = NNS_EDGE_ERROR_NONE;
/* Throttling: applies only when max_request is non-zero.
 * NOTE(review): with the strict `>` here and the increment after a
 * successful send below, up to max_request + 1 requests can be outstanding
 * before input is dropped -- confirm this is intended. */
688 if (self->max_request > 0 && self->requested_num > self->max_request) {
690 (
"The processing speed of the query server is too slow. Drop the input buffer.");
694 ret = nns_edge_data_create (&data_h);
695 if (ret != NNS_EDGE_ERROR_NONE) {
696 nns_loge (
"Failed to create data handle in client chain.");
/* Map each input memory for reading and add its bytes to the data handle.
 * num_tensors and mem[] are filled in on lines not shown.
 * NOTE(review): this error uses ml_loge while the rest of the excerpt uses
 * nns_loge -- confirm that is deliberate. */
701 for (i = 0; i < num_tensors; i++) {
703 if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
704 ml_loge (
"Cannot map the %uth memory in gst-buffer.", i);
705 gst_memory_unref (mem[i]);
709 nns_edge_data_add (data_h, map[i].
data, map[i].size, NULL);
/* Copy the handle's "client_id" info onto the outgoing data. */
712 nns_edge_get_info (self->edge_h,
"client_id", &val);
713 nns_edge_data_set_info (data_h,
"client_id", val);
/* Send; a successful send counts as one outstanding request. */
716 ret = nns_edge_send (self->edge_h, data_h);
717 if (ret == NNS_EDGE_ERROR_NONE) {
718 self->requested_num++;
720 nns_loge (
"Failed to publish to server node.");
725 nns_edge_data_destroy (data_h);
/* Wait for a reply. g_async_queue_timeout_pop takes microseconds, so the
 * millisecond `timeout` is scaled by G_TIME_SPAN_MILLISECOND. */
727 data_h = g_async_queue_timeout_pop (self->msg_queue,
728 self->timeout * G_TIME_SPAN_MILLISECOND);
/* The pop returns NULL on timeout; presumably the code below is guarded by
 * a NULL check on a line not shown -- confirm. */
730 if (self->requested_num > 0)
731 self->requested_num--;
732 ret = nns_edge_data_get_count (data_h, &num_data);
734 if (ret == NNS_EDGE_ERROR_NONE && num_data > 0) {
/* Build the output buffer with one GstMemory per received data chunk.
 * new_data is produced on a line not shown (presumably a copy of `data`
 * -- confirm). */
738 out_buf = gst_buffer_new ();
740 for (i = 0; i < num_data; i++) {
745 nns_edge_data_get (data_h, i, &
data, &data_len);
748 new_mem = gst_memory_new_wrapped (0, new_data, data_len, 0, data_len,
751 if (self->is_tensor) {
755 gst_buffer_append_memory (out_buf, new_mem);
/* Carry the input buffer's metadata over to the output buffer, then push
 * it on the src pad. */
760 gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
762 res = gst_pad_push (self->srcpad, out_buf);
764 nns_loge (
"Failed to get the number of memories of the edge data.");
765 res = GST_FLOW_ERROR;
768 nns_edge_data_destroy (data_h);
/* Cleanup: unmap and unref every input memory, then drop the input buffer. */
771 for (i = 0; i < num_tensors; i++) {
772 gst_memory_unmap (mem[i], &map[i]);
773 gst_memory_unref (mem[i]);
776 gst_buffer_unref (buf);
/* Caps query helper fragment (function header not visible here): takes the
 * pad's current caps or its template caps (the condition choosing between
 * them is on a line not shown; presumably the template is the fallback --
 * confirm) and narrows them by `filter`. */
789 caps = gst_pad_get_current_caps (pad);
792 caps = gst_pad_get_pad_template_caps (pad);
/* Intersect with the filter, keeping the filter's ordering
 * (GST_CAPS_INTERSECT_FIRST), and release the unfiltered caps. */
799 GstCaps *intersection;
801 gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
803 gst_caps_unref (caps);