20 #define GST_CAT_DEFAULT gst_edgesrc_debug
25 static GstStaticPadTemplate
srctemplate = GST_STATIC_PAD_TEMPLATE (
"src",
26 GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
45 #define gst_edgesrc_parent_class parent_class
49 const GValue * value, GParamSpec * pspec);
51 GValue * value, GParamSpec * pspec);
57 guint size, GstBuffer ** out_buf);
61 const gchar * dest_host);
65 const guint16 dest_port);
69 const nns_edge_connect_type_e connect_type);
71 GstStateChange transition);
79 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
80 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
81 GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
87 g_object_class_install_property (gobject_class,
PROP_HOST,
88 g_param_spec_string (
"host",
"Host",
89 "A self host address (DEPRECATED, has no effect).",
DEFAULT_HOST,
90 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
91 g_object_class_install_property (gobject_class,
PROP_PORT,
92 g_param_spec_uint (
"port",
"Port",
93 "A self port number (DEPRECATED, has no effect).",
95 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
97 g_param_spec_string (
"dest-host",
"Destination Host",
98 "A host address of edgesink to receive the packets from edgesink",
99 DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
101 g_param_spec_uint (
"dest-port",
"Destination Port",
102 "A port of edgesink to receive the packets from edgesink", 0, 65535,
103 DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105 g_param_spec_enum (
"connect-type",
"Connect Type",
106 "The connections type between edgesink and edgesrc.",
108 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
109 g_object_class_install_property (gobject_class,
PROP_TOPIC,
110 g_param_spec_string (
"topic",
"Topic",
111 "The main topic of the host and option if necessary. "
112 "(topic)/(optional topic for main topic).",
"",
113 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
115 g_param_spec_string (
"custom-lib",
"Custom connection lib path",
116 "User defined custom connection lib path.",
117 "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
119 gst_element_class_add_pad_template (gstelement_class,
122 gst_element_class_set_static_metadata (gstelement_class,
123 "EdgeSrc",
"Source/Edge",
124 "Subscribe and push incoming streams",
"Samsung Electronics Co., Ltd.");
141 GstBaseSrc *basesrc = GST_BASE_SRC (
self);
143 gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
144 gst_base_src_set_async (basesrc,
FALSE);
149 self->msg_queue = g_async_queue_new ();
151 self->playing =
FALSE;
152 self->custom_lib = NULL;
166 nns_logw (
"host property is deprecated");
169 nns_logw (
"port property is deprecated");
181 if (!g_value_get_string (value)) {
182 nns_logw (
"topic property cannot be NULL. Query-hybrid is disabled.");
186 self->topic = g_value_dup_string (value);
189 g_free (self->custom_lib);
190 self->custom_lib = g_value_dup_string (value);
193 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
209 nns_logw (
"host property is deprecated");
212 nns_logw (
"port property is deprecated");
230 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
242 nns_edge_data_h data_h;
244 self->playing =
FALSE;
246 self->dest_host = NULL;
251 g_free (self->custom_lib);
252 self->custom_lib = NULL;
254 if (self->msg_queue) {
255 while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
256 nns_edge_data_destroy (data_h);
258 g_async_queue_unref (self->msg_queue);
259 self->msg_queue = NULL;
263 nns_edge_release_handle (self->edge_h);
266 G_OBJECT_CLASS (parent_class)->finalize (
object);
272 static GstStateChangeReturn
275 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
278 switch (transition) {
279 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
280 GST_INFO_OBJECT (
self,
"State changed from PAUSED to PLAYING.");
281 self->playing =
TRUE;
287 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
289 switch (transition) {
290 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
291 GST_INFO_OBJECT (
self,
"State changed from PLAYING to PAUSED.");
292 self->playing =
FALSE;
307 nns_edge_event_e event_type;
308 int ret = NNS_EDGE_ERROR_NONE;
312 if (0 != nns_edge_event_get_type (event_h, &event_type)) {
313 nns_loge (
"Failed to get event type!");
314 return NNS_EDGE_ERROR_UNKNOWN;
317 switch (event_type) {
318 case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
320 nns_edge_data_h
data;
322 nns_edge_event_parse_new_data (event_h, &
data);
323 g_async_queue_push (self->msg_queue,
data);
326 case NNS_EDGE_EVENT_CONNECTION_CLOSED:
328 self->playing =
FALSE;
349 if (NNS_EDGE_CONNECT_TYPE_CUSTOM != self->connect_type) {
350 ret = nns_edge_create_handle (NULL, self->connect_type,
351 NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
353 if (!self->custom_lib) {
354 nns_loge (
"Failed to create custom handle. custom-lib path is not set.");
357 ret = nns_edge_custom_create_handle (NULL, self->custom_lib,
358 NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
361 if (NNS_EDGE_ERROR_NONE != ret) {
362 nns_loge (
"Failed to get nnstreamer edge handle.");
365 nns_edge_release_handle (self->edge_h);
373 nns_edge_set_info (self->edge_h,
"DEST_HOST", self->dest_host);
374 if (self->dest_port > 0) {
375 port = g_strdup_printf (
"%u", self->dest_port);
376 nns_edge_set_info (self->edge_h,
"DEST_PORT", port);
380 nns_edge_set_info (self->edge_h,
"TOPIC", self->topic);
384 if (0 != nns_edge_start (self->edge_h)) {
386 (
"Failed to start NNStreamer-edge. Please check server IP and port");
390 if (0 != nns_edge_connect (self->edge_h, self->dest_host, self->dest_port)) {
391 nns_loge (
"Failed to connect to edge server!");
394 self->playing =
TRUE;
408 self->playing =
FALSE;
409 ret = nns_edge_stop (self->edge_h);
411 if (NNS_EDGE_ERROR_NONE != ret) {
412 nns_loge (
"Failed to stop edgesrc. error code(%d)", ret);
424 GstBuffer ** out_buf)
427 nns_edge_data_h data_h = NULL;
428 GstBuffer *buffer = NULL;
430 GstCaps *caps = NULL;
431 GstStructure *structure;
434 gboolean is_tensor =
FALSE;
435 guint i, num_data, max_mems;
442 while (self->playing && !data_h) {
443 data_h = g_async_queue_timeout_pop (self->msg_queue, G_USEC_PER_SEC);
447 nns_loge (
"Failed to get message from the edgesrc message queue.");
448 return GST_FLOW_ERROR;
451 ret = nns_edge_data_get_count (data_h, &num_data);
452 if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
453 nns_loge (
"Failed to get the number of memories of the edge data.");
458 caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (basesrc));
460 structure = gst_caps_get_structure (caps, 0);
466 gst_caps_unref (caps);
470 if (num_data > max_mems) {
472 (
"Cannot create new buffer. The edge-data has %u memories, but allowed memories is %u.",
477 buffer = gst_buffer_new ();
478 for (i = 0; i < num_data; i++) {
480 nns_size_t data_len = 0;
483 nns_edge_data_get (data_h, i, &
data, &data_len);
485 mem = gst_memory_new_wrapped (0, new_data, data_len, 0, data_len,
492 gst_buffer_append_memory (buffer, mem);
498 nns_edge_data_destroy (data_h);
502 if (buffer == NULL) {
503 nns_loge (
"Failed to get buffer to push to the edgesrc.");
504 return GST_FLOW_ERROR;
518 return self->dest_host;
528 self->dest_host = g_strdup (dest_host);
537 return self->dest_port;
546 self->dest_port = dest_port;
552 static nns_edge_connect_type_e
555 return self->connect_type;
563 const nns_edge_connect_type_e connect_type)
565 self->connect_type = connect_type;