Doxygen Book
tensor_query_server.c
Go to the documentation of this file.
1 /* SPDX-License-Identifier: LGPL-2.1-only */
13 #ifdef HAVE_CONFIG_H
14 #include "config.h"
15 #endif
16 
17 #include "tensor_query_server.h"
18 #include <tensor_typedef.h>
19 #include <tensor_common.h>
20 
/* Mutex for the tensor-query server table; taken around every access to _qs_table below. */
24 G_LOCK_DEFINE_STATIC (query_server_table);
25 
/*
 * Table for query server data. Keys are server ids packed with
 * GUINT_TO_POINTER; the table is created in init_queryserver() with
 * _release_server_data as the value destroy function.
 */
29 static GHashTable *_qs_table = NULL;
30 
/* Set-up and tear-down of _qs_table, declared with the compiler's constructor/destructor attributes. */
31 static void init_queryserver (void) __attribute__((constructor));
32 static void fini_queryserver (void) __attribute__((destructor));
33 
37 static void
38 _release_server_data (gpointer data)
39 {
41 
42  if (!_data)
43  return;
44 
45  g_mutex_lock (&_data->lock);
46  if (_data->edge_h) {
47  nns_edge_release_handle (_data->edge_h);
48  _data->edge_h = NULL;
49  }
50  g_mutex_unlock (&_data->lock);
51 
52  g_mutex_clear (&_data->lock);
53  g_cond_clear (&_data->cond);
54 
55  g_free (_data);
56 }
57 
61 static GstTensorQueryServer *
63 {
65 
66  G_LOCK (query_server_table);
67  data = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
68  G_UNLOCK (query_server_table);
69 
70  return data;
71 }
72 
76 gboolean
78 {
80  gboolean ret;
81 
83 
84  if (NULL != data) {
85  return TRUE;
86  }
87 
88  data = g_try_new0 (GstTensorQueryServer, 1);
89  if (NULL == data) {
90  nns_loge ("Failed to allocate memory for tensor query server data.");
91  return FALSE;
92  }
93 
94  g_mutex_init (&data->lock);
95  g_cond_init (&data->cond);
96  data->id = id;
97  data->configured = FALSE;
98 
99  G_LOCK (query_server_table);
100  ret = g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
101  if (!ret) {
102  _release_server_data (data);
103  nns_loge ("Failed to add tensor query server data into the table.");
104  }
105  G_UNLOCK (query_server_table);
106 
107  return ret;
108 }
109 
113 gboolean
115  nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo * edge_info)
116 {
118  gchar *port_str, *id_str;
119  gboolean prepared = FALSE;
120  gint ret;
121 
123  if (NULL == data) {
124  return FALSE;
125  }
126 
127  g_mutex_lock (&data->lock);
128  if (data->edge_h == NULL) {
129  id_str = g_strdup_printf ("%u", id);
130 
131  ret = nns_edge_create_handle (id_str, connect_type,
132  NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
133  g_free (id_str);
134 
135  if (NNS_EDGE_ERROR_NONE != ret) {
136  GST_ERROR ("Failed to get nnstreamer edge handle.");
137  goto done;
138  }
139  }
140 
141  if (edge_info) {
142  if (edge_info->host) {
143  nns_edge_set_info (data->edge_h, "HOST", edge_info->host);
144  }
145  if (edge_info->port > 0) {
146  port_str = g_strdup_printf ("%u", edge_info->port);
147  nns_edge_set_info (data->edge_h, "PORT", port_str);
148  g_free (port_str);
149  }
150  if (edge_info->dest_host) {
151  nns_edge_set_info (data->edge_h, "DEST_HOST", edge_info->dest_host);
152  }
153  if (edge_info->dest_port > 0) {
154  port_str = g_strdup_printf ("%u", edge_info->dest_port);
155  nns_edge_set_info (data->edge_h, "DEST_PORT", port_str);
156  g_free (port_str);
157  }
158  if (edge_info->topic) {
159  nns_edge_set_info (data->edge_h, "TOPIC", edge_info->topic);
160  }
161 
162  nns_edge_set_event_callback (data->edge_h, edge_info->cb, edge_info->pdata);
163 
164  ret = nns_edge_start (data->edge_h);
165  if (NNS_EDGE_ERROR_NONE != ret) {
166  nns_loge
167  ("Failed to start NNStreamer-edge. Please check server IP and port.");
168  goto done;
169  }
170  }
171 
172  prepared = TRUE;
173 
174 done:
175  g_mutex_unlock (&data->lock);
176  return prepared;
177 }
178 
182 gboolean
183 gst_tensor_query_server_send_buffer (const guint id, GstBuffer * buffer)
184 {
186  GstMetaQuery *meta_query;
187  nns_edge_data_h data_h;
188  guint i, num_tensors = 0;
189  gint ret = NNS_EDGE_ERROR_NONE;
190  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
191  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
192  gchar *val;
193  gboolean sent = FALSE;
194 
196 
197  if (NULL == data) {
198  nns_loge ("Failed to send buffer, server handle is null.");
199  return FALSE;
200  }
201 
202  meta_query = gst_buffer_get_meta_query (buffer);
203  if (!meta_query) {
204  nns_loge ("Failed to send buffer, cannot get tensor query meta.");
205  return FALSE;
206  }
207 
208  ret = nns_edge_data_create (&data_h);
209  if (ret != NNS_EDGE_ERROR_NONE) {
210  nns_loge ("Failed to create edge data handle in query server.");
211  return FALSE;
212  }
213 
214  num_tensors = gst_tensor_buffer_get_count (buffer);
215  for (i = 0; i < num_tensors; i++) {
216  mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
217 
218  if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
219  ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
220  gst_memory_unref (mem[i]);
221  num_tensors = i;
222  goto done;
223  }
224 
225  nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
226  }
227 
228  val = g_strdup_printf ("%lld", (long long) meta_query->client_id);
229  nns_edge_data_set_info (data_h, "client_id", val);
230  g_free (val);
231 
232  g_mutex_lock (&data->lock);
233  ret = nns_edge_send (data->edge_h, data_h);
234  g_mutex_unlock (&data->lock);
235 
236  if (ret != NNS_EDGE_ERROR_NONE) {
237  nns_loge ("Failed to send edge data handle in query server.");
238  goto done;
239  }
240 
241  sent = TRUE;
242 
243 done:
244  for (i = 0; i < num_tensors; i++) {
245  gst_memory_unmap (mem[i], &map[i]);
246  gst_memory_unref (mem[i]);
247  }
248 
249  nns_edge_data_destroy (data_h);
250 
251  return sent;
252 }
253 
257 void
259 {
261 
263 
264  if (NULL == data) {
265  return;
266  }
267 
268  g_mutex_lock (&data->lock);
269  if (data->edge_h) {
270  nns_edge_release_handle (data->edge_h);
271  data->edge_h = NULL;
272  }
273  g_mutex_unlock (&data->lock);
274 }
275 
279 void
281 {
282  G_LOCK (query_server_table);
283  if (g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id)))
284  g_hash_table_remove (_qs_table, GUINT_TO_POINTER (id));
285  G_UNLOCK (query_server_table);
286 }
287 
291 gboolean
293 {
294  gint64 end_time;
296 
298 
299  if (NULL == data) {
300  return FALSE;
301  }
302 
303  end_time = g_get_monotonic_time () +
304  DEFAULT_QUERY_INFO_TIMEOUT * G_TIME_SPAN_SECOND;
305  g_mutex_lock (&data->lock);
306  while (!data->configured) {
307  if (!g_cond_wait_until (&data->cond, &data->lock, end_time)) {
308  g_mutex_unlock (&data->lock);
309  ml_loge ("Failed to get server sink info.");
310  return FALSE;
311  }
312  }
313  g_mutex_unlock (&data->lock);
314 
315  return TRUE;
316 }
317 
321 void
323 {
325 
327 
328  if (NULL == data) {
329  return;
330  }
331 
332  g_mutex_lock (&data->lock);
333  data->configured = TRUE;
334  g_cond_broadcast (&data->cond);
335  g_mutex_unlock (&data->lock);
336 }
337 
341 void
342 gst_tensor_query_server_set_caps (const guint id, const gchar * caps_str)
343 {
345  gchar *prev_caps_str, *new_caps_str;
346 
348 
349  if (NULL == data) {
350  return;
351  }
352 
353  g_mutex_lock (&data->lock);
354 
355  prev_caps_str = new_caps_str = NULL;
356  nns_edge_get_info (data->edge_h, "CAPS", &prev_caps_str);
357  if (!prev_caps_str)
358  prev_caps_str = g_strdup ("");
359  new_caps_str = g_strdup_printf ("%s%s", prev_caps_str, caps_str);
360  nns_edge_set_info (data->edge_h, "CAPS", new_caps_str);
361 
362  g_free (prev_caps_str);
363  g_free (new_caps_str);
364 
365  g_mutex_unlock (&data->lock);
366 }
367 
371 static void
372 init_queryserver (void)
373 {
374  G_LOCK (query_server_table);
375  g_assert (NULL == _qs_table);
376  _qs_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
377  _release_server_data);
378  G_UNLOCK (query_server_table);
379 }
380 
384 static void
386 {
387  G_LOCK (query_server_table);
388  g_assert (_qs_table);
389  g_hash_table_destroy (_qs_table);
390  _qs_table = NULL;
391  G_UNLOCK (query_server_table);
392 }
gst_tensor_query_server_add_data
gboolean gst_tensor_query_server_add_data(const guint id)
Add nnstreamer edge server handle into hash table.
Definition: tensor_query_server.c:77
GstTensorQueryEdgeInfo::host
gchar * host
Definition: tensor_query_server.h:30
GstTensorQueryServer::cond
GCond cond
Definition: tensor_query_server.h:49
g_assert
g_assert(sizeof(DTYPE_UNSIGNED)==sizeof(DTYPE_SIGNED))
data
svtc_1 data
Definition: gsttensor_if.c:826
NNS_TENSOR_SIZE_LIMIT
#define NNS_TENSOR_SIZE_LIMIT
The number of tensors NNStreamer supports is 256. The max memories of gst-buffer is 16 (See NNS_TENSO...
Definition: tensor_typedef.h:42
init_queryserver
static void init_queryserver(void)
Initialize the query server.
Definition: tensor_query_server.c:31
GstTensorQueryEdgeInfo::cb
nns_edge_event_cb cb
Definition: tensor_query_server.h:37
DEFAULT_QUERY_INFO_TIMEOUT
#define DEFAULT_QUERY_INFO_TIMEOUT
Definition: tensor_query_server.h:23
FALSE
return FALSE
Definition: gsttensor_transform.c:596
GstTensorQueryEdgeInfo
Internal data structure for nns-edge info to prepare edge connection.
Definition: tensor_query_server.h:28
G_LOCK_DEFINE_STATIC
G_LOCK_DEFINE_STATIC(query_server_table)
mutex for tensor-query server table.
GstMetaQuery::client_id
query_client_id_t client_id
Definition: tensor_meta.h:30
GstTensorQueryServer::lock
GMutex lock
Definition: tensor_query_server.h:48
GstMetaQuery
GstMetaQuery meta structure.
Definition: tensor_meta.h:26
g_free
g_free(self->option[(opnum) - 1])
opnum: \
tensor_query_server.h
GStreamer plugin to handle meta_query for server elements.
__attribute__
__attribute__((__format__(__printf__, 1, 2)))
overwrites the error message buffer with the new message.
Definition: nnstreamer_log.c:97
fini_queryserver
static void fini_queryserver(void)
Destruct the query server.
Definition: tensor_query_server.c:385
GstTensorQueryEdgeInfo::dest_host
gchar * dest_host
Definition: tensor_query_server.h:32
gst_tensor_query_server_set_configured
void gst_tensor_query_server_set_configured(const guint id)
set query server sink configured.
Definition: tensor_query_server.c:322
gst_buffer_get_meta_query
#define gst_buffer_get_meta_query(b)
Definition: tensor_meta.h:44
ml_loge
#define ml_loge
Definition: nnstreamer_log.h:78
TRUE
return TRUE
Definition: gsttensor_if.c:879
nns_loge
#define nns_loge
Definition: nnstreamer_log.h:142
tensor_typedef.h
Common header file for NNStreamer, the GStreamer plugin for neural networks.
gst_tensor_query_server_remove_data
void gst_tensor_query_server_remove_data(const guint id)
Remove GstTensorQueryServer.
Definition: tensor_query_server.c:280
GstTensorQueryEdgeInfo::port
guint16 port
Definition: tensor_query_server.h:31
tensor_common.h
Common header file for NNStreamer, the GStreamer plugin for neural networks.
_qs_table
static GHashTable * _qs_table
Table for query server data.
Definition: tensor_query_server.c:29
gst_tensor_query_server_wait_sink
gboolean gst_tensor_query_server_wait_sink(const guint id)
Wait until the sink is configured and get server info handle.
Definition: tensor_query_server.c:292
GstTensorQueryEdgeInfo::dest_port
guint16 dest_port
Definition: tensor_query_server.h:33
gst_tensor_query_server_release_edge_handle
void gst_tensor_query_server_release_edge_handle(const guint id)
Release nnstreamer edge handle of query server.
Definition: tensor_query_server.c:258
gst_tensor_buffer_get_nth_memory
GstMemory * gst_tensor_buffer_get_nth_memory(GstBuffer *buffer, const guint index)
Get the nth GstMemory from given buffer.
Definition: nnstreamer_plugin_api_impl.c:1608
gst_tensor_buffer_get_count
guint gst_tensor_buffer_get_count(GstBuffer *buffer)
Get the number of tensors in the buffer.
Definition: nnstreamer_plugin_api_impl.c:1835
GstTensorQueryServer::edge_h
nns_edge_h edge_h
Definition: tensor_query_server.h:51
GstTensorQueryEdgeInfo::pdata
void * pdata
Definition: tensor_query_server.h:38
gst_tensor_query_server_prepare
gboolean gst_tensor_query_server_prepare(const guint id, nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo *edge_info)
Prepare edge connection and its handle.
Definition: tensor_query_server.c:114
gst_tensor_query_server_get_handle
static GstTensorQueryServer * gst_tensor_query_server_get_handle(const guint id)
Get nnstreamer edge server handle.
Definition: tensor_query_server.c:62
gst_tensor_query_server_send_buffer
gboolean gst_tensor_query_server_send_buffer(const guint id, GstBuffer *buffer)
Send buffer to connected edge device.
Definition: tensor_query_server.c:183
GstTensorQueryServer
GstTensorQueryServer internal info data structure.
Definition: tensor_query_server.h:44
GstTensorQueryEdgeInfo::topic
gchar * topic
Definition: tensor_query_server.h:34
gst_tensor_query_server_set_caps
void gst_tensor_query_server_set_caps(const guint id, const gchar *caps_str)
set query server caps.
Definition: tensor_query_server.c:342
GST_ERROR
GST_ERROR("Failed to register nnstreamer plugin : tensor_" # name)
type)) { \