28 #include <gst/video/video-info.h>
29 #include <gst/audio/audio-info.h>
30 #include <glib/gstdio.h>
31 #include <sys/types.h>
/* Caps string produced by GST_TENSORS_CAP_MAKE for the static, flexible and sparse formats. */
42 #define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible, sparse }")
/* Raw video pixel formats passed to GST_VIDEO_CAPS_MAKE in VIDEO_CAPS. */
46 #define SUPPORTED_VIDEO_FORMAT \
47 "{RGB, BGR, RGBx, BGRx, xRGB, xBGR, RGBA, BGRA, ARGB, ABGR, GRAY8}"
/* Video caps for the formats above, additionally restricted to progressive interlace-mode. */
48 #define VIDEO_CAPS GST_VIDEO_CAPS_MAKE (SUPPORTED_VIDEO_FORMAT) "," \
49 "interlace-mode = (string) progressive"
/* Raw audio sample formats passed to GST_AUDIO_CAPS_MAKE in AUDIO_CAPS. */
53 #define SUPPORTED_AUDIO_FORMAT \
54 "{S8, U8, S16LE, S16BE, U16LE, U16BE, S32LE, S32BE, U32LE, U32BE, F32LE, F32BE, F64LE, F64BE}"
/* Audio caps for the formats above, additionally restricted to the interleaved layout. */
55 #define AUDIO_CAPS GST_AUDIO_CAPS_MAKE (SUPPORTED_AUDIO_FORMAT) "," \
56 "layout = (string) interleaved"
/* Text caps: text/x-raw with the utf8 format. */
60 #define TEXT_CAPS "text/x-raw, format = (string) utf8"
/* Octet-stream caps: application/octet-stream with no further fields. */
64 #define OCTET_CAPS "application/octet-stream"
69 "image/png, width = (int) [ 16, 1000000 ], height = (int) [ 16, 1000000 ], framerate = (fraction) [ 0/1, MAX];" \
70 "image/jpeg, width = (int) [ 16, 65535 ], height = (int) [ 16, 65535 ], framerate = (fraction) [ 0/1, MAX], sof-marker = (int) { 0, 1, 2, 4, 9 };" \
71 "image/tiff, endianness = (int) { BIG_ENDIAN, LITTLE_ENDIAN };" \
76 GST_STATIC_PAD_TEMPLATE (
"sink", GST_PAD_SINK, GST_PAD_ALWAYS,
91 #define GST_CAT_DEFAULT gst_data_repo_sink_debug
93 GST_DEBUG_CATEGORY_INIT (gst_data_repo_sink_debug, "datareposink", 0, "datareposink element");
94 #define gst_data_repo_sink_parent_class parent_class
99 const GValue * value, GParamSpec * pspec);
101 GValue * value, GParamSpec * pspec);
105 element, GstStateChange transition);
120 GObjectClass *gobject_class;
121 GstElementClass *gstelement_class;
122 GstBaseSinkClass *gstbasesink_class;
124 gobject_class = G_OBJECT_CLASS (klass);
125 gstelement_class = GST_ELEMENT_CLASS (klass);
126 gstbasesink_class = GST_BASE_SINK_CLASS (klass);
/* Install the string property "location" (read/write, changeable in the READY state). */
132 g_object_class_install_property (gobject_class,
PROP_LOCATION,
133 g_param_spec_string (
"location",
"File Location",
134 "Location to write files to MLOps Data Repository. "
135 "if the files are images, use placeholder in indexes for filename "
136 "(e.g., filename%04d.png).",
138 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
139 GST_PARAM_MUTABLE_READY));
141 g_object_class_install_property (gobject_class,
PROP_JSON,
142 g_param_spec_string (
"json",
"JSON file path",
143 "JSON file path to write the meta information of a sample", NULL,
144 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
145 GST_PARAM_MUTABLE_READY));
147 gst_element_class_set_static_metadata (gstelement_class,
148 "NNStreamer MLOps Data Repository Sink",
150 "Write files to MLOps Data Repository",
"Samsung Electronics Co., Ltd.");
152 gst_element_class_add_static_pad_template (gstelement_class, &
sinktemplate);
154 gstelement_class->change_state =
163 if (
sizeof (off_t) < 8) {
165 (
"64-bit file support unavailable due to system limitations, sizeof (off_t) = %"
166 G_GSIZE_FORMAT
"!",
sizeof (off_t));
203 g_close (sink->
fd, NULL);
220 G_OBJECT_CLASS (parent_class)->finalize (
object);
228 const GValue * value, GParamSpec * pspec)
234 sink->
filename = g_value_dup_string (value);
235 GST_INFO_OBJECT (sink,
"filename: %s", sink->
filename);
239 GST_INFO_OBJECT (sink,
"JSON filename: %s", sink->
json_filename);
242 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
252 GValue * value, GParamSpec * pspec)
266 G_OBJECT_WARN_INVALID_PROPERTY_ID (
object, prop_id, pspec);
277 ssize_t write_size = 0;
279 GstFlowReturn ret = GST_FLOW_OK;
281 g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
282 g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
283 g_return_val_if_fail (sink->
fd != 0, GST_FLOW_ERROR);
285 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
286 GST_ERROR_OBJECT (sink,
"Failed to map the incoming buffer.");
287 return GST_FLOW_ERROR;
290 GST_OBJECT_LOCK (sink);
293 GST_LOG_OBJECT (sink,
294 "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER
"x (%lld size)",
297 write_size = write (sink->
fd, info.data, info.size);
299 if ((write_size == -1) || (write_size != (ssize_t) info.size)) {
300 GST_ERROR_OBJECT (sink,
"Error writing data to file");
301 ret = GST_FLOW_ERROR;
307 GST_OBJECT_UNLOCK (sink);
308 gst_buffer_unmap (buffer, &info);
320 guint num_tensors, i;
321 gsize total_write = 0, tensor_size;
322 ssize_t write_size = 0;
324 GstMemory *mem = NULL;
327 g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
328 g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
329 g_return_val_if_fail (sink->
fd != 0, GST_FLOW_ERROR);
330 g_return_val_if_fail (sink->
json_object != NULL, GST_FLOW_ERROR);
335 GST_OBJECT_LOCK (sink);
338 GST_INFO_OBJECT (sink,
"num_tensors: %u", num_tensors);
340 for (i = 0; i < num_tensors; i++) {
342 if (!gst_memory_map (mem, &info, GST_MAP_READ)) {
343 GST_ERROR_OBJECT (sink,
"Failed to map memory");
348 GST_ERROR_OBJECT (sink,
349 "Invalid format of tensors, the format is static.");
352 tensor_size = info.size;
/* tensor_size is a gsize (unsigned), so print it with G_GSIZE_FORMAT rather than the signed %zd. */
354 GST_LOG_OBJECT (sink,
"tensor[%u] size: %" G_GSIZE_FORMAT, i, tensor_size);
355 GST_LOG_OBJECT (sink,
356 "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER
"x (%lld size)",
357 (
long long) tensor_size, sink->
fd_offset + total_write,
358 (
long long) sink->
fd_offset + total_write);
360 write_size = write (sink->
fd, info.data, tensor_size);
361 if ((write_size == -1) || (write_size != (ssize_t) tensor_size)) {
362 GST_ERROR_OBJECT (sink,
"Error writing data to file");
367 total_write += (gsize) write_size;
369 gst_memory_unmap (mem, &info);
370 gst_memory_unref (mem);
383 GST_OBJECT_UNLOCK (sink);
388 gst_memory_unmap (mem, &info);
390 gst_memory_unref (mem);
391 GST_OBJECT_UNLOCK (sink);
393 return GST_FLOW_ERROR;
402 gchar *filename = NULL;
404 g_return_val_if_fail (sink != NULL, NULL);
406 g_return_val_if_fail (sink->
filename != NULL, NULL);
409 #pragma GCC diagnostic push
410 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
414 #pragma GCC diagnostic pop
427 g_autofree gchar *filename = NULL;
428 GstFlowReturn ret = GST_FLOW_OK;
429 GError *error = NULL;
432 g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
433 g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
435 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
436 GST_ERROR_OBJECT (sink,
"Failed to map the incoming buffer.");
437 return GST_FLOW_ERROR;
442 GST_OBJECT_LOCK (sink);
445 GST_DEBUG_OBJECT (sink,
"Writing to file \"%s\", size(%zd)", filename,
448 if (!g_file_set_contents (filename, (
char *) info.data, info.size, &error)) {
449 GST_ERROR_OBJECT (sink,
"Could not write data to file: %s",
450 error ? error->message :
"unknown error");
451 g_clear_error (&error);
452 ret = GST_FLOW_ERROR;
457 GST_OBJECT_UNLOCK (sink);
458 gst_buffer_unmap (buffer, &info);
486 return GST_FLOW_ERROR;
497 GstCaps *caps = NULL;
499 GST_OBJECT_LOCK (sink);
502 GST_INFO_OBJECT (sink,
"Got caps %" GST_PTR_FORMAT, caps);
505 caps = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
507 caps = gst_caps_ref (caps);
510 GST_DEBUG_OBJECT (sink,
"result get caps: %" GST_PTR_FORMAT, caps);
511 GST_OBJECT_UNLOCK (sink);
522 GstStructure *structure;
525 g_return_if_fail (sink != NULL);
529 structure = gst_caps_get_structure (sink->
fixed_caps, 0);
544 GST_INFO_OBJECT (sink,
"set caps %" GST_PTR_FORMAT, caps);
556 GST_DEBUG_OBJECT (sink,
"data type: %d", sink->
data_type);
568 switch (GST_QUERY_TYPE (query)) {
569 case GST_QUERY_SEEKING:{
573 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
574 gst_query_set_seeking (query, fmt,
FALSE, 0, -1);
579 ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
592 gchar *filename = NULL;
593 int flags = O_CREAT | O_WRONLY;
595 g_return_val_if_fail (sink != NULL,
FALSE);
606 filename = g_strdup (sink->
filename);
608 GST_INFO_OBJECT (sink,
"opening file %s", filename);
611 sink->
fd = g_open (filename, flags, 0644);
622 GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND,
623 ((
"No file name specified for writing.")), (NULL));
630 GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
631 (
"No such file \"%s\"", sink->
filename));
/* The file is opened with O_CREAT | O_WRONLY, so a failure here is a write-open failure. */
634 GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
635 ((
"Could not open file \"%s\" for writing."), sink->
filename),
658 g_close (sink->
fd, NULL);
671 JsonGenerator *generator;
674 g_return_val_if_fail (
object != NULL,
FALSE);
675 g_return_val_if_fail (filename != NULL,
FALSE);
677 root = json_node_init_object (json_node_alloc (),
object);
678 generator = json_generator_new ();
679 json_generator_set_root (generator, root);
680 json_generator_set_pretty (generator,
TRUE);
681 ret = json_generator_to_file (generator, filename, NULL);
683 GST_ERROR (
"Failed to write JSON to file %s", filename);
686 g_object_unref (generator);
687 json_node_free (root);
698 gchar *caps_str = NULL;
701 g_return_val_if_fail (sink != NULL,
FALSE);
710 caps_str = gst_caps_to_string (sink->
fixed_caps);
711 GST_DEBUG_OBJECT (sink,
"caps string: %s", caps_str);
713 json_object_set_string_member (sink->
json_object,
"gst_caps", caps_str);
715 json_object_set_int_member (sink->
json_object,
"total_samples",
719 json_object_set_array_member (sink->
json_object,
"sample_offset",
721 json_object_set_array_member (sink->
json_object,
"tensor_size",
723 json_object_set_array_member (sink->
json_object,
"tensor_count",
730 json_object_set_int_member (sink->
json_object,
"sample_size",
735 GST_ERROR_OBJECT (sink,
"Failed to write json meta file: %s",
749 static GstStateChangeReturn
751 GstStateChange transition)
753 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
756 switch (transition) {
757 case GST_STATE_CHANGE_NULL_TO_READY:
758 GST_INFO_OBJECT (sink,
"NULL_TO_READY");
/* Logged just before jumping to the state-change failure path. */
760 GST_ERROR_OBJECT (sink,
"Set filename and json");
761 goto state_change_failed;
765 case GST_STATE_CHANGE_READY_TO_PAUSED:
766 GST_INFO_OBJECT (sink,
"READY_TO_PAUSED");
769 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
770 GST_INFO_OBJECT (sink,
"PAUSED_TO_PLAYING");
772 goto state_change_failed;
779 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
781 switch (transition) {
782 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
783 GST_INFO_OBJECT (sink,
"PLAYING_TO_PAUSED");
786 case GST_STATE_CHANGE_PAUSED_TO_READY:
787 GST_INFO_OBJECT (sink,
"PAUSED_TO_READY");
790 case GST_STATE_CHANGE_READY_TO_NULL:
791 GST_INFO_OBJECT (sink,
"READY_TO_NULL");
793 goto state_change_failed;
802 GST_ERROR_OBJECT (sink,
"state change failed");
804 return GST_STATE_CHANGE_FAILURE;