Line data Source code
1 : /* SPDX-License-Identifier: LGPL-2.1-only */
2 : /**
3 : * Copyright (C) 2023 Samsung Electronics Co., Ltd.
4 : *
5 : * @file gstdatareposink.c
6 : * @date 30 March 2023
7 : * @brief GStreamer plugin that writes data from buffers to files in the MLOps Data repository
8 : * @see https://github.com/nnstreamer/nnstreamer
9 : * @author Hyunil Park <hyunil46.park@samsung.com>
10 : * @bug No known bugs except for NYI items
11 : *
12 : * ## Example launch line
13 : * |[
14 : * gst-launch-1.0 videotestsrc ! datareposink location=filename json=video.json
15 : * gst-launch-1.0 videotestsrc ! pngenc ! datareposink location=image_%02d.png json=video.json
16 : * gst-launch-1.0 audiotestsrc samplesperbuffer=44100 ! audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! \
17 : * datareposink location=filename json=audio.json
18 : * gst-launch-1.0 datareposrc location=file.dat json=file.json tensors-sequence=2,3 start-sample-index=0 stop-sample-index=199 epochs=1 ! \
19 : * other/tensors, format=static, num_tensors=2, framerate=0/1, dimensions=1:1:784:1.1:1:10:1, types=float32.float32 ! \
20 : * datareposink location=hyunil.dat json=file.json
21 : * ]|
22 : */
23 :
24 : #ifdef HAVE_CONFIG_H
25 : #include "config.h"
26 : #endif
27 : #include <gst/gst.h>
28 : #include <gst/video/video-info.h>
29 : #include <gst/audio/audio-info.h>
30 : #include <glib/gstdio.h>
31 : #include <sys/types.h>
32 : #include <fcntl.h>
33 : #include <unistd.h>
34 : #include <nnstreamer_plugin_api.h>
35 : #include <tensor_common.h>
36 : #include <nnstreamer_util.h>
37 : #include "gstdatareposink.h"
38 :
/**
 * @brief Tensors caps: other/tensors streams in static, flexible or sparse format
 */
#define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible, sparse }")
/**
 * @brief Video caps: raw video limited to the listed packed RGB/gray formats,
 *        progressive (non-interlaced) only
 */
#define SUPPORTED_VIDEO_FORMAT \
  "{RGB, BGR, RGBx, BGRx, xRGB, xBGR, RGBA, BGRA, ARGB, ABGR, GRAY8}"
#define VIDEO_CAPS GST_VIDEO_CAPS_MAKE (SUPPORTED_VIDEO_FORMAT) "," \
  "interlace-mode = (string) progressive"
/**
 * @brief Audio caps: raw audio in the listed sample formats, interleaved layout only
 */
#define SUPPORTED_AUDIO_FORMAT \
  "{S8, U8, S16LE, S16BE, U16LE, U16BE, S32LE, S32BE, U32LE, U32BE, F32LE, F32BE, F64LE, F64BE}"
#define AUDIO_CAPS GST_AUDIO_CAPS_MAKE (SUPPORTED_AUDIO_FORMAT) "," \
  "layout = (string) interleaved"
/**
 * @brief Text caps: raw UTF-8 text
 */
#define TEXT_CAPS "text/x-raw, format = (string) utf8"
/**
 * @brief Octet caps: opaque byte stream
 */
#define OCTET_CAPS "application/octet-stream"
/**
 * @brief Image caps: encoded still images (png, jpeg, tiff, gif, bmp)
 */
#define IMAGE_CAPS \
  "image/png, width = (int) [ 16, 1000000 ], height = (int) [ 16, 1000000 ], framerate = (fraction) [ 0/1, MAX];" \
  "image/jpeg, width = (int) [ 16, 65535 ], height = (int) [ 16, 65535 ], framerate = (fraction) [ 0/1, MAX], sof-marker = (int) { 0, 1, 2, 4, 9 };" \
  "image/tiff, endianness = (int) { BIG_ENDIAN, LITTLE_ENDIAN };" \
  "image/gif;" \
  "image/bmp"

/* Always-present sink pad accepting any of the caps families defined above. */
static GstStaticPadTemplate sinktemplate =
GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, GST_PAD_ALWAYS,
    GST_STATIC_CAPS (TENSOR_CAPS ";" VIDEO_CAPS ";" AUDIO_CAPS ";" IMAGE_CAPS
        ";" TEXT_CAPS ";" OCTET_CAPS));
79 :
/**
 * @brief datareposink properties.
 */
enum
{
  PROP_0,                       /* placeholder so that real property ids start at 1 */
  PROP_LOCATION,                /* "location": path (or printf-style pattern for images) of the data file */
  PROP_JSON                     /* "json": path of the JSON meta file */
};

/* Debug category used by every GST_* logging macro in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_data_repo_sink_debug);
#define GST_CAT_DEFAULT gst_data_repo_sink_debug
/* Extra code run once at type registration: sets up the debug category. */
#define _do_init \
  GST_DEBUG_CATEGORY_INIT (gst_data_repo_sink_debug, "datareposink", 0, "datareposink element");
/* Lets the rest of the file refer to the parent class simply as parent_class. */
#define gst_data_repo_sink_parent_class parent_class
/* Registers GstDataRepoSink as a GObject type derived from GstBaseSink. */
G_DEFINE_TYPE_WITH_CODE (GstDataRepoSink, gst_data_repo_sink,
    GST_TYPE_BASE_SINK, _do_init);
97 :
/*
 * Forward declarations of the GObject / GstElement / GstBaseSink virtual
 * methods that gst_data_repo_sink_class_init() installs on the class.
 */
static void gst_data_repo_sink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_data_repo_sink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_data_repo_sink_finalize (GObject * object);
static gboolean gst_data_repo_sink_stop (GstBaseSink * basesink);
static GstStateChangeReturn gst_data_repo_sink_change_state (GstElement *
    element, GstStateChange transition);
static GstFlowReturn gst_data_repo_sink_render (GstBaseSink * bsink,
    GstBuffer * buffer);
static GstCaps *gst_data_repo_sink_get_caps (GstBaseSink * bsink,
    GstCaps * filter);
static gboolean gst_data_repo_sink_set_caps (GstBaseSink * bsink,
    GstCaps * caps);
static gboolean gst_data_repo_sink_event (GstBaseSink * bsink,
    GstEvent * event);
static gboolean gst_data_repo_sink_query (GstBaseSink * sink, GstQuery * query);
115 :
116 : /**
117 : * @brief Initialize datareposink class.
118 : */
119 : static void
120 22 : gst_data_repo_sink_class_init (GstDataRepoSinkClass * klass)
121 : {
122 : GObjectClass *gobject_class;
123 : GstElementClass *gstelement_class;
124 : GstBaseSinkClass *gstbasesink_class;
125 :
126 22 : gobject_class = G_OBJECT_CLASS (klass);
127 22 : gstelement_class = GST_ELEMENT_CLASS (klass);
128 22 : gstbasesink_class = GST_BASE_SINK_CLASS (klass);
129 :
130 22 : gobject_class->set_property = gst_data_repo_sink_set_property;
131 22 : gobject_class->get_property = gst_data_repo_sink_get_property;
132 22 : gobject_class->finalize = gst_data_repo_sink_finalize;
133 :
134 22 : g_object_class_install_property (gobject_class, PROP_LOCATION,
135 : g_param_spec_string ("location", "File Location",
136 : "Location to write files to MLOps Data Repository. "
137 : "if the files are images, use placeholder in indexes for filename"
138 : "(e.g., filenmae%04d.png).",
139 : NULL,
140 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
141 : GST_PARAM_MUTABLE_READY));
142 :
143 22 : g_object_class_install_property (gobject_class, PROP_JSON,
144 : g_param_spec_string ("json", "JSON file path",
145 : "JSON file path to write the meta information of a sample", NULL,
146 : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
147 : GST_PARAM_MUTABLE_READY));
148 :
149 22 : gst_element_class_set_static_metadata (gstelement_class,
150 : "NNStreamer MLOps Data Repository Sink",
151 : "Sink/File",
152 : "Write files to MLOps Data Repository", "Samsung Electronics Co., Ltd.");
153 :
154 22 : gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
155 :
156 22 : gstelement_class->change_state =
157 22 : GST_DEBUG_FUNCPTR (gst_data_repo_sink_change_state);
158 22 : gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_data_repo_sink_render);
159 22 : gstbasesink_class->get_caps = GST_DEBUG_FUNCPTR (gst_data_repo_sink_get_caps);
160 22 : gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_data_repo_sink_set_caps);
161 22 : gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_data_repo_sink_event);
162 22 : gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_data_repo_sink_query);
163 22 : gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_data_repo_sink_stop);
164 :
165 : if (sizeof (off_t) < 8) {
166 : GST_LOG ("No large file support, sizeof (off_t) = %" G_GSIZE_FORMAT "!",
167 : sizeof (off_t));
168 : }
169 22 : }
170 :
171 : /**
172 : * @brief Initialize datareposink.
173 : */
174 : static void
175 26 : gst_data_repo_sink_init (GstDataRepoSink * sink)
176 : {
177 26 : sink->filename = NULL;
178 26 : sink->fd = 0;
179 26 : sink->fd_offset = 0;
180 26 : sink->data_type = GST_DATA_REPO_DATA_UNKNOWN;
181 26 : sink->is_static_tensors = FALSE;
182 26 : sink->fixed_caps = NULL;
183 26 : sink->json_object = NULL;
184 26 : sink->total_samples = 0;
185 26 : sink->cumulative_tensors = 0;
186 26 : sink->json_object = json_object_new ();
187 26 : sink->sample_offset_array = json_array_new ();
188 26 : sink->tensor_size_array = json_array_new ();
189 26 : sink->tensor_count_array = json_array_new ();
190 26 : }
191 :
192 : /**
193 : * @brief finalize datareposink.
194 : */
195 : static void
196 24 : gst_data_repo_sink_finalize (GObject * object)
197 : {
198 24 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (object);
199 :
200 24 : g_free (sink->filename);
201 24 : g_free (sink->json_filename);
202 :
203 24 : if (sink->fd) {
204 0 : g_close (sink->fd, NULL);
205 0 : sink->fd = 0;
206 : }
207 :
208 : /* Check for gst-inspect log */
209 24 : if (sink->fixed_caps)
210 22 : gst_caps_unref (sink->fixed_caps);
211 :
212 24 : if (sink->sample_offset_array)
213 14 : json_array_unref (sink->sample_offset_array);
214 24 : if (sink->tensor_size_array)
215 14 : json_array_unref (sink->tensor_size_array);
216 24 : if (sink->tensor_count_array)
217 14 : json_array_unref (sink->tensor_count_array);
218 24 : if (sink->json_object) {
219 2 : json_object_unref (sink->json_object);
220 2 : sink->json_object = NULL;
221 : }
222 24 : G_OBJECT_CLASS (parent_class)->finalize (object);
223 24 : }
224 :
225 : /**
226 : * @brief Setter for datareposink properties.
227 : */
228 : static void
229 52 : gst_data_repo_sink_set_property (GObject * object, guint prop_id,
230 : const GValue * value, GParamSpec * pspec)
231 : {
232 52 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (object);
233 :
234 52 : switch (prop_id) {
235 26 : case PROP_LOCATION:
236 26 : sink->filename = g_value_dup_string (value);
237 26 : GST_INFO_OBJECT (sink, "filename: %s", sink->filename);
238 26 : break;
239 26 : case PROP_JSON:
240 26 : sink->json_filename = g_value_dup_string (value);
241 26 : GST_INFO_OBJECT (sink, "JSON filename: %s", sink->json_filename);
242 26 : break;
243 0 : default:
244 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
245 0 : break;
246 : }
247 52 : }
248 :
249 : /**
250 : * @brief Getter datareposink properties.
251 : */
252 : static void
253 2 : gst_data_repo_sink_get_property (GObject * object, guint prop_id,
254 : GValue * value, GParamSpec * pspec)
255 : {
256 : GstDataRepoSink *sink;
257 :
258 2 : sink = GST_DATA_REPO_SINK (object);
259 :
260 2 : switch (prop_id) {
261 1 : case PROP_LOCATION:
262 1 : g_value_set_string (value, sink->filename);
263 1 : break;
264 1 : case PROP_JSON:
265 1 : g_value_set_string (value, sink->json_filename);
266 1 : break;
267 0 : default:
268 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
269 0 : break;
270 : }
271 2 : }
272 :
273 : /**
274 : * @brief Function to write others media type (tensors(fixed), video, audio, octet and text)
275 : */
276 : static GstFlowReturn
277 34 : gst_data_repo_sink_write_others (GstDataRepoSink * sink, GstBuffer * buffer)
278 : {
279 34 : gsize write_size = 0;
280 : GstMapInfo info;
281 34 : GstFlowReturn ret = GST_FLOW_OK;
282 :
283 68 : g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
284 34 : g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
285 34 : g_return_val_if_fail (sink->fd != 0, GST_FLOW_ERROR);
286 :
287 34 : if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
288 0 : GST_ERROR_OBJECT (sink, "Failed to map the incoming buffer.");
289 0 : return GST_FLOW_ERROR;
290 : }
291 :
292 34 : GST_OBJECT_LOCK (sink);
293 34 : sink->sample_size = info.size;
294 :
295 34 : GST_LOG_OBJECT (sink,
296 : "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
297 : (long long) info.size, sink->fd_offset, (long long) sink->fd_offset);
298 :
299 34 : write_size = write (sink->fd, info.data, info.size);
300 :
301 34 : if (write_size != info.size) {
302 0 : GST_ERROR_OBJECT (sink, "Could not write data to file");
303 0 : ret = GST_FLOW_ERROR;
304 : } else {
305 34 : sink->fd_offset += write_size;
306 34 : sink->total_samples++;
307 : }
308 :
309 34 : GST_OBJECT_UNLOCK (sink);
310 34 : gst_buffer_unmap (buffer, &info);
311 :
312 34 : return ret;
313 : }
314 :
315 : /**
316 : * @brief Function to write flexible tensors or sparse tensors
317 : */
318 : static GstFlowReturn
319 161 : gst_data_repo_sink_write_flexible_or_sparse_tensors (GstDataRepoSink * sink,
320 : GstBuffer * buffer)
321 : {
322 : guint num_tensors, i;
323 161 : gsize write_size = 0, total_write = 0, tensor_size;
324 : GstMapInfo info;
325 161 : GstMemory *mem = NULL;
326 : GstTensorMetaInfo meta;
327 :
328 322 : g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
329 161 : g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
330 161 : g_return_val_if_fail (sink->fd != 0, GST_FLOW_ERROR);
331 161 : g_return_val_if_fail (sink->json_object != NULL, GST_FLOW_ERROR);
332 161 : g_return_val_if_fail (sink->sample_offset_array != NULL, GST_FLOW_ERROR);
333 161 : g_return_val_if_fail (sink->tensor_size_array != NULL, GST_FLOW_ERROR);
334 161 : g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR);
335 :
336 161 : GST_OBJECT_LOCK (sink);
337 :
338 161 : num_tensors = gst_tensor_buffer_get_count (buffer);
339 161 : GST_INFO_OBJECT (sink, "num_tensors: %u", num_tensors);
340 :
341 350 : for (i = 0; i < num_tensors; i++) {
342 191 : mem = gst_tensor_buffer_get_nth_memory (buffer, i);
343 191 : if (!gst_memory_map (mem, &info, GST_MAP_READ)) {
344 0 : GST_ERROR_OBJECT (sink, "Failed to map memory");
345 0 : goto mem_map_error;
346 : }
347 :
348 191 : if (!gst_tensor_meta_info_parse_header (&meta, info.data)) {
349 2 : GST_ERROR_OBJECT (sink,
350 : "Invalid format of tensors, the format is static.");
351 2 : goto error;
352 : }
353 189 : tensor_size = info.size;
354 :
355 189 : GST_LOG_OBJECT (sink, "tensor[%u] size: %zd", i, tensor_size);
356 189 : GST_LOG_OBJECT (sink,
357 : "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
358 : (long long) tensor_size, sink->fd_offset + total_write,
359 : (long long) sink->fd_offset + total_write);
360 :
361 189 : write_size = write (sink->fd, info.data, tensor_size);
362 189 : if (write_size != tensor_size) {
363 0 : GST_ERROR_OBJECT (sink, "Could not write data to file");
364 0 : goto error;
365 : }
366 :
367 189 : json_array_add_int_element (sink->tensor_size_array, tensor_size);
368 189 : total_write += write_size;
369 :
370 189 : gst_memory_unmap (mem, &info);
371 189 : gst_memory_unref (mem);
372 : }
373 :
374 159 : json_array_add_int_element (sink->sample_offset_array, sink->fd_offset);
375 159 : sink->fd_offset += total_write;
376 :
377 159 : GST_LOG_OBJECT (sink, "cumulative_tensors: %u", sink->cumulative_tensors);
378 159 : json_array_add_int_element (sink->tensor_count_array,
379 159 : sink->cumulative_tensors);
380 159 : sink->cumulative_tensors += num_tensors;
381 :
382 159 : sink->total_samples++;
383 :
384 159 : GST_OBJECT_UNLOCK (sink);
385 :
386 159 : return GST_FLOW_OK;
387 :
388 2 : error:
389 2 : gst_memory_unmap (mem, &info);
390 2 : mem_map_error:
391 2 : gst_memory_unref (mem);
392 2 : GST_OBJECT_UNLOCK (sink);
393 :
394 2 : return GST_FLOW_ERROR;
395 : }
396 :
397 : /**
398 : * @brief Get image filename
399 : */
400 : static gchar *
401 20 : gst_data_repo_sink_get_image_filename (GstDataRepoSink * sink)
402 : {
403 20 : gchar *filename = NULL;
404 :
405 20 : g_return_val_if_fail (sink != NULL, NULL);
406 20 : g_return_val_if_fail (sink->data_type == GST_DATA_REPO_DATA_IMAGE, NULL);
407 20 : g_return_val_if_fail (sink->filename != NULL, NULL);
408 :
409 : #ifdef __GNUC__
410 : #pragma GCC diagnostic push
411 : #pragma GCC diagnostic ignored "-Wformat-nonliteral"
412 : #endif
413 : /* let's set value by property */
414 20 : filename = g_strdup_printf (sink->filename, sink->total_samples);
415 : #ifdef __GNUC__
416 : #pragma GCC diagnostic pop
417 : #endif
418 :
419 20 : return filename;
420 : }
421 :
422 : /**
423 : * @brief Function to read multi image files
424 : */
425 : static GstFlowReturn
426 20 : gst_data_repo_sink_write_multi_images (GstDataRepoSink * sink,
427 : GstBuffer * buffer)
428 : {
429 20 : g_autofree gchar *filename = NULL;
430 20 : GstFlowReturn ret = GST_FLOW_OK;
431 20 : GError *error = NULL;
432 : GstMapInfo info;
433 :
434 20 : g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
435 20 : g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
436 :
437 20 : if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
438 0 : GST_ERROR_OBJECT (sink, "Failed to map the incoming buffer.");
439 0 : return GST_FLOW_ERROR;
440 : }
441 :
442 20 : filename = gst_data_repo_sink_get_image_filename (sink);
443 :
444 20 : GST_OBJECT_LOCK (sink);
445 20 : sink->sample_size = info.size;
446 :
447 20 : GST_DEBUG_OBJECT (sink, "Writing to file \"%s\", size(%zd)", filename,
448 : info.size);
449 :
450 20 : if (!g_file_set_contents (filename, (char *) info.data, info.size, &error)) {
451 0 : GST_ERROR_OBJECT (sink, "Could not write data to file: %s",
452 : error ? error->message : "unknown error");
453 0 : g_clear_error (&error);
454 0 : ret = GST_FLOW_ERROR;
455 : } else {
456 20 : sink->total_samples++;
457 : }
458 :
459 20 : GST_OBJECT_UNLOCK (sink);
460 20 : gst_buffer_unmap (buffer, &info);
461 :
462 20 : return ret;
463 : }
464 :
465 : /**
466 : * @brief Called when a buffer should be presented or output.
467 : */
468 : static GstFlowReturn
469 215 : gst_data_repo_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
470 : {
471 215 : GstDataRepoSink *sink = GST_DATA_REPO_SINK_CAST (bsink);
472 :
473 215 : switch (sink->data_type) {
474 24 : case GST_DATA_REPO_DATA_VIDEO:
475 : case GST_DATA_REPO_DATA_AUDIO:
476 : case GST_DATA_REPO_DATA_TEXT:
477 : case GST_DATA_REPO_DATA_OCTET:
478 24 : return gst_data_repo_sink_write_others (sink, buffer);
479 171 : case GST_DATA_REPO_DATA_TENSOR:
480 : {
481 171 : sink->is_static_tensors =
482 171 : gst_tensor_pad_caps_is_static (GST_BASE_SINK_PAD (sink));
483 171 : if (!sink->is_static_tensors)
484 161 : return gst_data_repo_sink_write_flexible_or_sparse_tensors (sink,
485 : buffer);
486 10 : return gst_data_repo_sink_write_others (sink, buffer);
487 : }
488 20 : case GST_DATA_REPO_DATA_IMAGE:
489 20 : return gst_data_repo_sink_write_multi_images (sink, buffer);
490 0 : default:
491 0 : return GST_FLOW_ERROR;
492 : }
493 : }
494 :
495 : /**
496 : * @brief Get caps of datareposink.
497 : */
498 : static GstCaps *
499 544 : gst_data_repo_sink_get_caps (GstBaseSink * bsink, GstCaps * filter)
500 : {
501 544 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (bsink);
502 544 : GstCaps *caps = NULL;
503 :
504 544 : GST_OBJECT_LOCK (sink);
505 544 : caps = gst_pad_get_pad_template_caps (GST_BASE_SINK_PAD (bsink));
506 544 : GST_OBJECT_UNLOCK (sink);
507 :
508 544 : GST_INFO_OBJECT (sink, "Got caps %" GST_PTR_FORMAT, caps);
509 :
510 544 : if (caps && filter) {
511 : GstCaps *intersection =
512 18 : gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
513 :
514 18 : gst_caps_unref (caps);
515 18 : caps = intersection;
516 : }
517 :
518 544 : GST_DEBUG_OBJECT (sink, "result get caps: %" GST_PTR_FORMAT, caps);
519 :
520 544 : return caps;
521 : }
522 :
523 : /**
524 : * @brief Set caps of datareposink.
525 : */
526 : static gboolean
527 22 : gst_data_repo_sink_set_caps (GstBaseSink * bsink, GstCaps * caps)
528 : {
529 : GstDataRepoSink *sink;
530 :
531 22 : sink = GST_DATA_REPO_SINK (bsink);
532 22 : GST_INFO_OBJECT (sink, "set caps %" GST_PTR_FORMAT, caps);
533 :
534 22 : sink->data_type = gst_data_repo_get_data_type_from_caps (caps);
535 22 : sink->fixed_caps = gst_caps_copy (caps);
536 :
537 22 : GST_DEBUG_OBJECT (sink, "data type: %d", sink->data_type);
538 22 : return (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN);
539 : }
540 :
541 : /**
542 : * @brief Receive Event on datareposink.
543 : */
544 : static gboolean
545 275 : gst_data_repo_sink_event (GstBaseSink * bsink, GstEvent * event)
546 : {
547 : GstDataRepoSink *sink;
548 275 : sink = GST_DATA_REPO_SINK (bsink);
549 :
550 275 : GST_INFO_OBJECT (sink, "got event (%s)",
551 : gst_event_type_get_name (GST_EVENT_TYPE (event)));
552 :
553 275 : switch (GST_EVENT_TYPE (event)) {
554 21 : case GST_EVENT_EOS:
555 21 : GST_INFO_OBJECT (sink, "get GST_EVENT_EOS event..state is %d",
556 : GST_STATE (sink));
557 21 : break;
558 0 : case GST_EVENT_FLUSH_START:
559 0 : GST_INFO_OBJECT (sink, "get GST_EVENT_FLUSH_START event");
560 0 : break;
561 0 : case GST_EVENT_FLUSH_STOP:
562 0 : GST_INFO_OBJECT (sink, "get GST_EVENT_FLUSH_STOP event");
563 0 : break;
564 254 : default:
565 254 : break;
566 : }
567 :
568 275 : return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
569 : }
570 :
571 : /**
572 : * @brief Perform a GstQuery on datareposink.
573 : */
574 : static gboolean
575 559 : gst_data_repo_sink_query (GstBaseSink * bsink, GstQuery * query)
576 : {
577 : gboolean ret;
578 :
579 559 : switch (GST_QUERY_TYPE (query)) {
580 0 : case GST_QUERY_SEEKING:{
581 : GstFormat fmt;
582 :
583 : /* we don't supporting seeking */
584 0 : gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
585 0 : gst_query_set_seeking (query, fmt, FALSE, 0, -1);
586 0 : ret = TRUE;
587 0 : break;
588 : }
589 559 : default:
590 559 : ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
591 559 : break;
592 : }
593 :
594 559 : return ret;
595 : }
596 :
597 : /**
598 : * @brief Function to open file
599 : */
600 : static gboolean
601 21 : gst_data_repo_sink_open_file (GstDataRepoSink * sink)
602 : {
603 21 : gchar *filename = NULL;
604 21 : int flags = O_CREAT | O_WRONLY;
605 :
606 21 : g_return_val_if_fail (sink != NULL, FALSE);
607 21 : g_return_val_if_fail (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN, FALSE);
608 :
609 21 : if (sink->filename == NULL || sink->filename[0] == '\0')
610 0 : goto no_filename;
611 :
612 : /* for image, g_file_set_contents() is used in the write function */
613 21 : if (sink->data_type == GST_DATA_REPO_DATA_IMAGE) {
614 4 : return TRUE;
615 : }
616 :
617 : /* need to get filename by media type */
618 17 : filename = g_strdup (sink->filename);
619 :
620 17 : GST_INFO_OBJECT (sink, "opening file %s", filename);
621 :
622 : /** How about support file mode property ?
623 : flags |= O_APPEND ("ab") */
624 :
625 17 : flags |= O_TRUNC; /* "wb" */
626 :
627 : /* open the file */
628 17 : sink->fd = g_open (filename, flags, 0644);
629 :
630 17 : if (sink->fd < 0)
631 0 : goto open_failed;
632 :
633 17 : g_free (filename);
634 :
635 17 : return TRUE;
636 :
637 : /* ERRORS */
638 0 : no_filename:
639 : {
640 0 : GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND,
641 : (("No file name specified for writing.")), (NULL));
642 0 : goto error_exit;
643 : }
644 0 : open_failed:
645 : {
646 0 : switch (errno) {
647 0 : case ENOENT:
648 0 : GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND, (NULL),
649 : ("No such file \"%s\"", sink->filename));
650 0 : break;
651 0 : default:
652 0 : GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ,
653 : (("Could not open file \"%s\" for reading."), sink->filename),
654 : GST_ERROR_SYSTEM);
655 0 : break;
656 : }
657 0 : goto error_exit;
658 : }
659 :
660 0 : error_exit:
661 0 : g_free (filename);
662 :
663 0 : return FALSE;
664 : }
665 :
666 : /**
667 : * @brief Stop datareposink
668 : */
669 : static gboolean
670 22 : gst_data_repo_sink_stop (GstBaseSink * basesink)
671 : {
672 : GstDataRepoSink *sink;
673 :
674 22 : sink = GST_DATA_REPO_SINK_CAST (basesink);
675 :
676 : /* close the file */
677 22 : g_close (sink->fd, NULL);
678 22 : sink->fd = 0;
679 :
680 22 : return TRUE;
681 : }
682 :
683 : /**
684 : * @brief Write json to file
685 : */
686 : static gboolean
687 22 : __write_json (JsonObject * object, const gchar * filename)
688 : {
689 : JsonNode *root;
690 : JsonGenerator *generator;
691 22 : gboolean ret = TRUE;
692 :
693 22 : g_return_val_if_fail (object != NULL, FALSE);
694 22 : g_return_val_if_fail (filename != NULL, FALSE);
695 :
696 : /* Make it the root node */
697 22 : root = json_node_init_object (json_node_alloc (), object);
698 22 : generator = json_generator_new ();
699 22 : json_generator_set_root (generator, root);
700 22 : json_generator_set_pretty (generator, TRUE);
701 22 : ret = json_generator_to_file (generator, filename, NULL);
702 22 : if (!ret) {
703 0 : GST_ERROR ("Failed to write JSON to file %s", filename);
704 : }
705 :
706 : /* Release everything */
707 22 : g_object_unref (generator);
708 22 : json_node_free (root);
709 :
710 22 : return ret;
711 : }
712 :
713 : /**
714 : * @brief write the meta information to a JSON file
715 : */
716 : static gboolean
717 22 : gst_data_repo_sink_write_json_meta_file (GstDataRepoSink * sink)
718 : {
719 22 : gchar *caps_str = NULL;
720 22 : gboolean ret = TRUE;
721 :
722 22 : g_return_val_if_fail (sink != NULL, FALSE);
723 22 : g_return_val_if_fail (sink->json_filename != NULL, FALSE);
724 22 : g_return_val_if_fail (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN, FALSE);
725 22 : g_return_val_if_fail (sink->fixed_caps != NULL, FALSE);
726 22 : g_return_val_if_fail (sink->json_object != NULL, FALSE);
727 22 : g_return_val_if_fail (sink->sample_offset_array != NULL, FALSE);
728 22 : g_return_val_if_fail (sink->tensor_size_array != NULL, FALSE);
729 22 : g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR);
730 :
731 22 : caps_str = gst_caps_to_string (sink->fixed_caps);
732 22 : GST_DEBUG_OBJECT (sink, "caps string: %s", caps_str);
733 :
734 22 : json_object_set_string_member (sink->json_object, "gst_caps", caps_str);
735 :
736 22 : json_object_set_int_member (sink->json_object, "total_samples",
737 22 : sink->total_samples);
738 :
739 22 : if (sink->data_type == GST_DATA_REPO_DATA_TENSOR && !sink->is_static_tensors) {
740 10 : json_object_set_array_member (sink->json_object, "sample_offset",
741 : sink->sample_offset_array);
742 10 : json_object_set_array_member (sink->json_object, "tensor_size",
743 : sink->tensor_size_array);
744 10 : json_object_set_array_member (sink->json_object, "tensor_count",
745 : sink->tensor_count_array);
746 :
747 10 : sink->sample_offset_array = NULL;
748 10 : sink->tensor_size_array = NULL;
749 10 : sink->tensor_count_array = NULL;
750 : } else {
751 12 : json_object_set_int_member (sink->json_object, "sample_size",
752 12 : sink->sample_size);
753 : }
754 22 : ret = __write_json (sink->json_object, sink->json_filename);
755 22 : if (!ret) {
756 0 : GST_ERROR_OBJECT (sink, "Failed to write json meta file: %s",
757 : sink->json_filename);
758 : }
759 :
760 22 : json_object_unref (sink->json_object);
761 22 : g_free (caps_str);
762 22 : sink->json_object = NULL;
763 :
764 22 : return ret;
765 : }
766 :
767 : /**
768 : * @brief Change state of datareposink.
769 : */
770 : static GstStateChangeReturn
771 138 : gst_data_repo_sink_change_state (GstElement * element,
772 : GstStateChange transition)
773 : {
774 138 : GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
775 138 : GstDataRepoSink *sink = GST_DATA_REPO_SINK (element);
776 :
777 138 : switch (transition) {
778 26 : case GST_STATE_CHANGE_NULL_TO_READY:
779 26 : GST_INFO_OBJECT (sink, "NULL_TO_READY");
780 26 : if (sink->filename == NULL || sink->json_filename == NULL) {
781 2 : GST_ERROR_OBJECT (sink, "Set filenmae and json");
782 2 : goto state_change_failed;
783 : }
784 24 : break;
785 :
786 24 : case GST_STATE_CHANGE_READY_TO_PAUSED:
787 24 : GST_INFO_OBJECT (sink, "READY_TO_PAUSED");
788 24 : break;
789 :
790 21 : case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
791 21 : GST_INFO_OBJECT (sink, "PAUSED_TO_PLAYING");
792 :
793 21 : if (!gst_data_repo_sink_open_file (sink))
794 0 : goto state_change_failed;
795 21 : break;
796 :
797 67 : default:
798 67 : break;
799 : }
800 :
801 136 : ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
802 :
803 136 : switch (transition) {
804 21 : case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
805 21 : GST_INFO_OBJECT (sink, "PLAYING_TO_PAUSED");
806 21 : break;
807 :
808 22 : case GST_STATE_CHANGE_PAUSED_TO_READY:
809 22 : GST_INFO_OBJECT (sink, "PAUSED_TO_READY");
810 22 : break;
811 :
812 22 : case GST_STATE_CHANGE_READY_TO_NULL:
813 22 : GST_INFO_OBJECT (sink, "READY_TO_NULL");
814 22 : if (!gst_data_repo_sink_write_json_meta_file (sink))
815 0 : goto state_change_failed;
816 22 : break;
817 :
818 71 : default:
819 71 : break;
820 : }
821 136 : return ret;
822 :
823 2 : state_change_failed:
824 2 : GST_ERROR_OBJECT (sink, "state change failed");
825 :
826 2 : return GST_STATE_CHANGE_FAILURE;
827 : }
|