/**
 * @brief Debug category used by default in this file; it names the category
 *        that GST_DEBUG_CATEGORY_INIT registers as "tensor_query_serversrc".
 */
24 #define GST_CAT_DEFAULT gst_tensor_query_serversrc_debug
/**
 * @brief Default port number.
 * NOTE(review): presumably the default of the "port" property; the param
 * spec's default argument is not visible in this excerpt - confirm.
 */
26 #define DEFAULT_PORT_SRC 3000
/**
 * @brief Default live-source setting.
 * NOTE(review): presumably the default of the "is-live" property - confirm.
 */
27 #define DEFAULT_IS_LIVE TRUE
/**
 * @brief Default MQTT broker address (IPv4 loopback).
 * NOTE(review): presumably the default of the "dest-host" property - confirm.
 */
28 #define DEFAULT_MQTT_HOST "127.0.0.1"
/**
 * @brief Default MQTT broker port.
 * NOTE(review): presumably the default of the "dest-port" property - confirm.
 */
29 #define DEFAULT_MQTT_PORT 1883
/**
 * @brief Default timeout for popping data.
 * NOTE(review): presumably passed to g_async_queue_timeout_pop() on the
 * message queue, which would make the unit microseconds (100000U = 100 ms);
 * the call's timeout argument is not visible in this excerpt - confirm.
 */
30 #define DEFAULT_DATA_POP_TIMEOUT 100000U
35 static GstStaticPadTemplate
srctemplate = GST_STATIC_PAD_TEMPLATE (
"src",
/**
 * @brief Rewrites the identifier gst_tensor_query_serversrc_parent_class to
 *        parent_class, the name this file uses when chaining up to the parent
 *        class (e.g. in finalize and change_state).
 * NOTE(review): the type-definition macro that declares the parent class
 * pointer is not visible in this excerpt - confirm it follows this define.
 */
57 #define gst_tensor_query_serversrc_parent_class parent_class
62 * element, GstStateChange transition);
64 guint prop_id,
const GValue * value, GParamSpec * pspec);
66 guint prop_id, GValue * value, GParamSpec * pspec);
79 GObjectClass *gobject_class;
80 GstElementClass *gstelement_class;
81 GstBaseSrcClass *gstbasesrc_class;
82 GstPushSrcClass *gstpushsrc_class;
84 gstpushsrc_class = (GstPushSrcClass *) klass;
85 gstbasesrc_class = (GstBaseSrcClass *) gstpushsrc_class;
86 gstelement_class = (GstElementClass *) gstbasesrc_class;
87 gobject_class = (GObjectClass *) gstelement_class;
95 g_object_class_install_property (gobject_class,
PROP_HOST,
96 g_param_spec_string (
"host",
"Host",
"The hostname to listen as",
97 DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
98 g_object_class_install_property (gobject_class,
PROP_PORT,
99 g_param_spec_uint (
"port",
"Port",
100 "The port to listen to (0=random available port)", 0,
103 g_param_spec_string (
"dest-host",
"Destination Host",
105 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
107 g_param_spec_uint (
"dest-port",
"Destination Port",
108 "The destination port to connect to (0=random available port)", 0,
110 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
112 g_param_spec_enum (
"connect-type",
"Connect Type",
"The connection type.",
114 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115 g_object_class_install_property (gobject_class,
PROP_TIMEOUT,
116 g_param_spec_uint (
"timeout",
"Timeout",
117 "The timeout as seconds to maintain connection", 0, 3600,
119 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
120 g_object_class_install_property (gobject_class,
PROP_TOPIC,
121 g_param_spec_string (
"topic",
"Topic",
122 "The main topic of the host and option if necessary. "
123 "(topic)/(optional topic for main topic).",
"",
124 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
125 g_object_class_install_property (gobject_class,
PROP_ID,
126 g_param_spec_uint (
"id",
"ID",
"ID for distinguishing query servers.", 0,
128 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
129 g_object_class_install_property (gobject_class,
PROP_IS_LIVE,
130 g_param_spec_boolean (
"is-live",
"Is Live",
131 "Synchronize the incoming buffers' timestamp with the current running time",
134 gst_element_class_add_pad_template (gstelement_class,
137 gst_element_class_set_static_metadata (gstelement_class,
138 "TensorQueryServerSrc",
"Source/Tensor/Query",
139 "Receive tensor data as a server over the network",
140 "Samsung Electronics Co., Ltd.");
142 GST_DEBUG_CATEGORY_INIT (gst_tensor_query_serversrc_debug,
143 "tensor_query_serversrc", 0,
"Tensor Query Server Source");
164 gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
166 gst_base_src_set_do_timestamp (GST_BASE_SRC (src),
TRUE);
178 nns_edge_data_h data_h;
187 while ((data_h = g_async_queue_try_pop (src->
msg_queue))) {
188 nns_edge_data_destroy (data_h);
192 G_OBJECT_CLASS (parent_class)->finalize (
object);
201 nns_edge_event_e event_type;
202 int ret = NNS_EDGE_ERROR_NONE;
205 ret = nns_edge_event_get_type (event_h, &event_type);
206 if (NNS_EDGE_ERROR_NONE != ret) {
207 nns_loge (
"Failed to get event type!");
211 switch (event_type) {
212 case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
214 nns_edge_data_h
data;
216 ret = nns_edge_event_parse_new_data (event_h, &
data);
217 if (NNS_EDGE_ERROR_NONE != ret) {
218 nns_loge (
"Failed to parse new data received from new data event");
237 gboolean ret =
FALSE;
243 nns_loge (
"Failed to get server information from query server.");
253 nns_edge_connect_type_e connect_type)
264 edge_info.
pdata = src;
274 static GstStateChangeReturn
276 GstStateChange transition)
279 GstBaseSrc *bsrc = GST_BASE_SRC (element);
280 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
283 switch (transition) {
284 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
286 nns_loge (
"Failed to change state from PAUSED to PLAYING.");
287 return GST_STATE_CHANGE_FAILURE;
290 caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (bsrc), NULL);
292 gst_caps_unref (caps);
296 case GST_STATE_CHANGE_READY_TO_PAUSED:
298 nns_loge (
"Failed to change state from READY to PAUSED.");
299 return GST_STATE_CHANGE_FAILURE;
306 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
307 if (ret == GST_STATE_CHANGE_FAILURE) {
308 nns_loge (
"Failed to change state");
312 switch (transition) {
313 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
317 case GST_STATE_CHANGE_PAUSED_TO_READY:
332 const GValue * value, GParamSpec * pspec)
338 if (!g_value_get_string (value)) {
339 nns_logw (
"host property cannot be NULL");
343 serversrc->
host = g_value_dup_string (value);
346 serversrc->
port = g_value_get_uint (value);
349 if (!g_value_get_string (value)) {
350 nns_logw (
"host property cannot be NULL");
354 serversrc->
dest_host = g_value_dup_string (value);
357 serversrc->
dest_port = g_value_get_uint (value);
363 serversrc->
timeout = g_value_get_uint (value);
366 if (!g_value_get_string (value)) {
367 nns_logw (
"topic property cannot be NULL. Query-hybrid is disabled.");
371 serversrc->
topic = g_value_dup_string (value);
374 serversrc->
src_id = g_value_get_uint (value);
377 gst_base_src_set_live (GST_BASE_SRC (serversrc),
378 g_value_get_boolean (value));
381 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
391 GValue * value, GParamSpec * pspec)
400 g_value_set_uint (value, serversrc->
port);
406 g_value_set_uint (value, serversrc->
dest_port);
412 g_value_set_uint (value, serversrc->
timeout);
418 g_value_set_uint (value, serversrc->
src_id);
421 g_value_set_boolean (value,
422 gst_base_src_is_live (GST_BASE_SRC (serversrc)));
425 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
436 nns_edge_data_h data_h = NULL;
437 GstBuffer *buffer = NULL;
442 while (src->
playing && !data_h) {
443 data_h = g_async_queue_timeout_pop (src->
msg_queue,
448 nns_loge (
"Failed to get message from the server message queue.");
452 ret = nns_edge_data_get_count (data_h, &num_data);
453 if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
454 nns_loge (
"Failed to get the number of memories of the edge data.");
458 buffer = gst_buffer_new ();
459 for (i = 0; i < num_data; i++) {
461 nns_size_t data_len = 0;
464 nns_edge_data_get (data_h, i, &
data, &data_len);
467 gst_buffer_append_memory (buffer,
468 gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
476 ret = nns_edge_data_get_info (data_h,
"client_id", &val);
477 if (NNS_EDGE_ERROR_NONE != ret) {
478 gst_buffer_unref (buffer);
481 meta_query->
client_id = g_ascii_strtoll (val, NULL, 10);
487 nns_edge_data_destroy (data_h);
498 GstBaseSrc *bsrc = GST_BASE_SRC (psrc);
499 GstStateChangeReturn sret;
500 GstState state = GST_STATE_NULL;
503 GstCaps *caps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (bsrc), NULL);
504 if (gst_caps_is_fixed (caps)) {
505 gst_base_src_set_caps (bsrc, caps);
510 gst_caps_unref (caps);
515 if (*outbuf == NULL) {
516 sret = gst_element_get_state (GST_ELEMENT (psrc), &state, NULL, 0);
517 if (sret != GST_STATE_CHANGE_SUCCESS || state != GST_STATE_PLAYING) {
518 nns_logw (
"Failed to get buffer for query server, not in PLAYING state.");
519 return GST_FLOW_FLUSHING;
522 nns_loge (
"Failed to get buffer to push to the tensor query serversrc.");
523 return GST_FLOW_ERROR;
536 gchar *caps_str, *new_caps_str;
538 caps_str = gst_caps_to_string (caps);
540 new_caps_str = g_strdup_printf (
"@query_server_src_caps@%s", caps_str);