FFmpeg
shared.c
Go to the documentation of this file.
1 /*
2  * Shared file cache protocol.
3  * Copyright (c) 2026 Niklas Haas
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  *
21  * Based on cache.c by Michael Niedermayer
22  */
23 
24 #include "libavutil/attributes.h"
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/crc.h"
28 #include "libavutil/error.h"
29 #include "libavutil/hash.h"
30 #include "libavutil/file_open.h"
31 #include "libavutil/mem.h"
32 #include "libavutil/opt.h"
33 #include "libavutil/time.h"
34 
35 #include "url.h"
36 
37 #include <errno.h>
38 #include <fcntl.h>
39 #include <inttypes.h>
40 #include <stdatomic.h>
41 #include <string.h>
42 #include <sys/file.h>
43 #include <sys/mman.h>
44 #include <sys/stat.h>
45 #include <unistd.h>
46 
47 /**
48  * This hash should be resistant against collision attacks, so that an
49  * attacker could not generate e.g. two different URIs that map to the same
50  * cache file. This requires at least 64 bits of collision resistance in
51  * practice (i.e. 128 bits = 16 bytes of hash size). However, we can be
52  * conservative by computing e.g. a 256 bit hash and storing it inside the
53  * file header for verification.
54  *
55  * Note that due to the way we use atomics, we should avoid zero bytes in
56  * the resulting hash; hence we tweak the input slightly to avoid this.
57  * The resulting loss in hash strength is negligible, since 32 bytes is
58  * already much more than needed.
59  */
60 #define HASH_METHOD "SHA512/256"
61 #define HASH_SIZE 32
62 
63 static int hash_uri(uint8_t hash[HASH_SIZE], const char *uri)
64 {
65  struct AVHashContext *ctx = NULL;
67  if (ret < 0)
68  return ret;
69 
72  av_hash_update(ctx, (const uint8_t *) uri, strlen(uri));
75 
76  for (int i = 0; i < HASH_SIZE; i++)
77  hash[i] = hash[i] ? hash[i] : ~hash[i]; /* prevent zero bytes */
78  return 0;
79 }
80 
81 #define HEADER_MAGIC MKTAG(u'\xFF', 'S', 'h', '$')
82 #define HEADER_VERSION 2
83 
84 enum BlockState {
85  /* Reserved block state values */
86  BLOCK_NONE = 0, ///< block is not cached
87  BLOCK_PENDING, ///< a thread is currently trying to write this block
88  BLOCK_FAILED, ///< the underlying I/O source failed to read this block
89 
90  /**
91  * All other block states represent valid cached blocks, with the value
92  * being the CRC of the block data.
93  */
94 };
95 
96 static uint16_t get_block_crc(const uint8_t *block, size_t block_size)
97 {
98  uint16_t crc = av_crc(av_crc_get_table(AV_CRC_16_ANSI), 0, block, block_size);
99  switch (crc) {
100  case BLOCK_NONE:
101  case BLOCK_FAILED:
102  case BLOCK_PENDING:
103  return ~crc; /* avoid reserved block states */
104  default:
105  return crc;
106  }
107 }
108 
109 typedef struct Block {
110  atomic_ushort state; /* enum BlockState */
111 } Block;
112 
113 typedef struct Spacemap {
117  atomic_ullong filesize; /* byte offset of true EOF, or 0 if unknown */
118  atomic_uchar hash[HASH_SIZE]; /* hash of resource URI / filename */
119  char reserved[80];
120 
122 } Spacemap;
123 
124 /* Set to value iff the current value is unset (zero) */
125 #define DEF_SET_ONCE(ctype, atype) \
126  static int set_once_##atype(atomic_##atype *const ptr, const ctype value) \
127  { \
128  ctype prev = 0; \
129  av_assert1(value != 0); \
130  if (atomic_compare_exchange_strong_explicit( \
131  ptr, &prev, value, memory_order_release, memory_order_relaxed)) \
132  return 1; \
133  else if (prev == value) \
134  return 0; \
135  else \
136  return AVERROR(EINVAL); \
137  }
138 
139 DEF_SET_ONCE(unsigned char, uchar)
140 DEF_SET_ONCE(unsigned int, uint)
141 DEF_SET_ONCE(unsigned short, ushort)
142 DEF_SET_ONCE(unsigned long long, ullong)
143 
144 typedef struct SharedContext {
145  AVClass *class;
148 
149  /* options */
150  char *cache_dir;
151  int block_shift; ///< requested shift; may disagree with actual
155  int verify;
156 
157  /* misc state */
158  int64_t pos; ///< current logical position
159  uint8_t *tmp_buf;
161  int write_err; ///< write error occurred
162 
163  /* cache file */
164  uint8_t *cache_data; ///< optional mmap of the cache file
165  char *cache_path;
166  off_t cache_size; ///< size of mapped memory region (for munmap)
167  int fd;
168 
169  /* space map */
171  char *map_path;
172  off_t map_size;
173  int mapfd;
174 
175  /* statistics */
178 } SharedContext;
179 
181 {
182  SharedContext *s = h->priv_data;
183 
184  ffurl_close(s->inner);
185  if (s->cache_data)
186  munmap(s->cache_data, s->cache_size);
187  if (s->spacemap)
188  munmap(s->spacemap, s->map_size);
189  if (s->fd != -1)
190  close(s->fd);
191  if (s->mapfd != -1)
192  close(s->mapfd);
193  av_freep(&s->cache_path);
194  av_freep(&s->map_path);
195  av_freep(&s->tmp_buf);
196 
197  av_log(h, AV_LOG_DEBUG, "Cache statistics: %"PRId64" hits, %"PRId64" misses\n",
198  s->nb_hit, s->nb_miss);
199  return 0;
200 }
201 
202 static int cache_map(URLContext *h, int64_t filesize);
203 static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE]);
204 static int spacemap_grow(URLContext *h, int64_t block);
205 
207 {
208  SharedContext *s = h->priv_data;
209  return atomic_load_explicit(&s->spacemap->filesize, memory_order_relaxed);
210 }
211 
212 static int set_filesize(URLContext *h, int64_t new_size)
213 {
214  SharedContext *s = h->priv_data;
215  int ret;
216 
217  if (!new_size)
218  return 0;
219 
220  ret = set_once_ullong(&s->spacemap->filesize, new_size);
221  if (ret < 0) {
222  av_log(h, AV_LOG_ERROR, "Cached file size mismatch, expected: "
223  "%"PRId64", got: %"PRIu64"!\n", new_size,
224  (uint64_t) atomic_load(&s->spacemap->filesize));
225  return ret;
226  } else if (ret) {
227  /* Opportunistically map the file; this also sets the correct filesize.
228  * Ignore errors as this is not critical to the cache logic. */
229  cache_map(h, new_size);
230  }
231 
232  return ret;
233 }
234 
235 static int shared_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
236 {
237  SharedContext *s = h->priv_data;
238  int ret;
239 
240  if (!s->cache_dir || !s->cache_dir[0]) {
241  av_log(h, AV_LOG_ERROR, "Missing path for shared cache! Specify a "
242  "directory using the -cache_dir option.\n");
243  return AVERROR(EINVAL);
244  }
245 
246  s->fd = s->mapfd = -1; /* Set these early for shared_close() failure path */
247 
248  /* Open underlying protocol */
249  av_strstart(arg, "shared:", &arg);
250  ret = ffurl_open_whitelist(&s->inner, arg, flags, &h->interrupt_callback,
251  options, h->protocol_whitelist, h->protocol_blacklist, h);
252 
253  if (ret < 0)
254  goto fail;
255 
256  uint8_t hash[HASH_SIZE];
257  ret = hash_uri(hash, arg);
258  if (ret < 0)
259  goto fail;
260 
261  /* 128 bits is enough for collision resistance; we already store the full
262  * hash inside the header for verification */
263  char filename[2 * 16 + 1];
264  for (int i = 0; i < FF_ARRAY_ELEMS(filename) / 2; i++)
265  sprintf(&filename[i * 2], "%02X", hash[i]);
266  s->cache_path = av_asprintf("%s/%s.cache", s->cache_dir, filename);
267  s->map_path = av_asprintf("%s/%s.spacemap", s->cache_dir, filename);
268  if (!s->cache_path || !s->map_path) {
269  ret = AVERROR(ENOMEM);
270  goto fail;
271  }
272 
273  av_log(h, AV_LOG_VERBOSE, "Opening cache file '%s' for URI: '%s'\n",
274  s->cache_path, s->inner->filename);
275 
276  s->fd = avpriv_open(s->cache_path, O_RDWR | O_CREAT, 0660);
277  s->mapfd = avpriv_open(s->map_path, O_RDWR | O_CREAT, 0660);
278  if (s->fd < 0 || s->mapfd < 0) {
279  ret = AVERROR(errno);
280  av_log(h, AV_LOG_ERROR, "Failed to open '%s': %s\n",
281  s->fd < 0 ? s->cache_path : s->map_path, av_err2str(ret));
282  goto fail;
283  }
284 
285  ret = spacemap_init(h, hash);
286  if (ret < 0)
287  goto fail;
288 
289  s->block_size = 1 << atomic_load(&s->spacemap->block_shift);
290 
292  if (!filesize) {
293  /* Filesize is not yet known, try to get it from the underlying URL */
294  filesize = ffurl_size(s->inner);
295  if (filesize < 0 && filesize != AVERROR(ENOSYS)) {
296  ret = (int) filesize;
297  goto fail;
298  } else if (filesize > 0) {
300  if (ret < 0)
301  goto fail;
302  }
303  }
304 
305  if (filesize > 0) {
306  int64_t last_pos = filesize - 1;
307  int64_t last_block = last_pos >> atomic_load(&s->spacemap->block_shift);
308  ret = spacemap_grow(h, last_block);
309  if (ret < 0)
310  goto fail;
311 
312  /* If filesize is known, we can directly mmap() the cache file */
313  ret = cache_map(h, filesize);
314  if (ret < 0) {
315  av_log(h, AV_LOG_WARNING, "Failed to map cache file: %s. Falling "
316  "back to normal read/write\n", av_err2str(ret));
317  ret = 0;
318  }
319  }
320 
321  /* Temporary buffer needed for pread/pwrite() fallback */
322  s->tmp_buf = av_malloc(s->block_size);
323  if (!s->tmp_buf) {
324  ret = AVERROR(ENOMEM);
325  goto fail;
326  }
327 
328  h->max_packet_size = s->block_size;
329  h->min_packet_size = s->block_size;
330  ret = 0;
331 
332 fail:
333  if (ret < 0)
334  shared_close(h);
335  return ret;
336 }
337 
339 {
340  SharedContext *s = h->priv_data;
341  if (s->cache_size >= filesize || filesize > SIZE_MAX)
342  return 0;
343 
344  if (s->cache_data) {
345  munmap(s->cache_data, s->cache_size);
346  s->cache_data = NULL;
347  s->cache_size = 0;
348  }
349 
350  struct stat st;
351  int ret = fstat(s->fd, &st);
352  if (ret < 0)
353  return AVERROR(errno);
354 
355  if (st.st_size != filesize) {
356  /* Ensure the file size is correct before mapping; this can happen if
357  * another process wrote the correct filesize to the header but
358  * crashed right before actually successfully resizing the file. */
359  ret = ftruncate(s->fd, filesize);
360  if (ret < 0)
361  return AVERROR(errno);
362  }
363 
364  s->cache_data = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, s->fd, 0);
365  if (s->cache_data == MAP_FAILED) {
366  s->cache_data = NULL;
367  return AVERROR(errno);
368  }
369 
370  s->cache_size = filesize;
371  return 0;
372 }
373 
374 static int spacemap_remap(URLContext *h, size_t map_size)
375 {
376  SharedContext *s = h->priv_data;
377  int ret, did_grow = 0, locked = 0;
378  if (map_size <= s->map_size)
379  return 0;
380 
381  /* Opportunistically get current filesize before attempting to lock */
382  struct stat st;
383  ret = fstat(s->mapfd, &st);
384  if (ret < 0) {
385  ret = AVERROR(errno);
386  goto fail;
387  }
388 
389  if (st.st_size >= map_size)
390  goto skip_resize;
391 
392  /* Lock the spacemap to ensure nobody else is currently resizing it */
393  ret = flock(s->mapfd, LOCK_EX);
394  if (ret < 0) {
395  ret = AVERROR(errno);
396  goto fail;
397  }
398  locked = 1;
399 
400  /* Refresh filesize after acquiring the lock */
401  ret = fstat(s->mapfd, &st);
402  if (ret < 0) {
403  ret = AVERROR(errno);
404  goto fail;
405  }
406 
407  if (st.st_size >= map_size)
408  goto skip_resize;
409 
410  ret = ftruncate(s->mapfd, map_size);
411  if (ret < 0) {
412  ret = AVERROR(errno);
413  goto fail;
414  }
415  st.st_size = map_size;
416  did_grow = 1;
417 
418 skip_resize:
419  if (s->spacemap)
420  munmap(s->spacemap, s->map_size);
421  s->map_size = st.st_size;
422  s->spacemap = mmap(NULL, s->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, s->mapfd, 0);
423  if (s->spacemap == MAP_FAILED) {
424  s->spacemap = NULL; /* for munmap check */
425  s->map_size = 0;
426  ret = AVERROR(errno);
427  goto fail;
428  }
429 
430  if (locked) {
431  flock(s->mapfd, LOCK_UN);
432  locked = 0;
433  }
434 
435  return did_grow;
436 
437 fail:
438  if (locked)
439  flock(s->mapfd, LOCK_UN);
440  av_log(h, AV_LOG_ERROR, "Failed to resize space map: %s\n", av_err2str(ret));
441  return ret;
442 }
443 
445 {
446  SharedContext *s = h->priv_data;
447  int64_t num_blocks = block + 1;
448  size_t map_bytes = sizeof(Spacemap) + num_blocks * sizeof(Block);
449 
450  /* When streaming files without known size, round up the number of blocks
451  * to the nearest multiple of the block size to reduce the rate of resizes */
452  if (!get_filesize(h)) {
453  av_assert0(s->block_size > 0);
454  map_bytes = FFALIGN(map_bytes, (int64_t) s->block_size);
455  }
456 
457  if (map_bytes < num_blocks)
458  return AVERROR(EINVAL); /* overflow */
459 
460  const off_t old_size = s->map_size;
461  int ret = spacemap_remap(h, map_bytes);
462  if (ret < 0)
463  return ret;
464 
465  /* Report new size after successful grow */
466  if (s->map_size > old_size) {
467  num_blocks = (s->map_size - sizeof(Spacemap)) / sizeof(Block);
469  "%s %zu bytes, capacity: %"PRId64" blocks = %zu MB\n",
470  ret ? "Resized spacemap to" : "Mapped spacemap with",
471  (size_t) s->map_size, num_blocks,
472  (num_blocks * (int64_t) s->block_size) >> 20);
473  }
474  return 0;
475 }
476 
477 static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE])
478 {
479  SharedContext *s = h->priv_data;
480  int ret;
481 
482  ret = spacemap_remap(h, sizeof(Spacemap));
483  if (ret < 0)
484  return ret;
485 
486  if ((ret = set_once_uint(&s->spacemap->header_magic, HEADER_MAGIC)) < 0 ||
487  (ret = set_once_ushort(&s->spacemap->version, HEADER_VERSION)) < 0)
488  {
489  av_log(h, AV_LOG_ERROR, "Shared cache spacemap header mismatch!\n");
490  av_log(h, AV_LOG_ERROR, " Expected magic: 0x%X, version: %d\n",
492  av_log(h, AV_LOG_ERROR, " Got magic: 0x%X, version: %d\n",
493  atomic_load(&s->spacemap->header_magic),
494  atomic_load(&s->spacemap->version));
495  return ret;
496  }
497 
498  ret = set_once_ushort(&s->spacemap->block_shift, s->block_shift);
499  if (ret < 0) {
500  const int shift = atomic_load(&s->spacemap->block_shift);
501  av_log(h, AV_LOG_WARNING, "Shared cache uses block shift %d, "
502  "but requested block shift is %d.\n", shift, s->block_shift);
503  if (shift < 9 || shift > 30) {
504  av_log(h, AV_LOG_ERROR, "Invalid block shift %d in cache file!\n", shift);
505  return AVERROR(EINVAL);
506  }
507  }
508 
509  for (int i = 0; i < HASH_SIZE; i++) {
510  ret = set_once_uchar(&s->spacemap->hash[i], hash[i]);
511  if (ret < 0) {
512  av_log(h, AV_LOG_ERROR, "Shared cache spacemap hash mismatch!\n");
513  av_log(h, AV_LOG_ERROR, " Expected hash: ");
514  for (int j = 0; j < 32; j++)
515  av_log(h, AV_LOG_ERROR, "%02X", hash[j]);
516  av_log(h, AV_LOG_ERROR, "\n Got hash: ");
517  for (int j = 0; j < 32; j++)
518  av_log(h, AV_LOG_ERROR, "%02X", atomic_load(&s->spacemap->hash[j]));
519  av_log(h, AV_LOG_ERROR, "\n");
520  return ret;
521  }
522  }
523 
524  if (ret) /* set_once() return 1 if this is the first time setting the value */
525  av_log(h, AV_LOG_DEBUG, "Initialized new cache spacemap.\n");
526 
527  return ret;
528 }
529 
530 static int read_cache(SharedContext *s, uint8_t *buf, size_t size, off_t offset)
531 {
532  if (s->cache_data) {
533  av_assert1(offset + size <= s->cache_size);
534  memcpy(buf, s->cache_data + offset, size);
535  return 0;
536  }
537 
538  while (size) {
539  ssize_t ret = pread(s->fd, buf, size, offset);
540  if (ret <= 0)
541  return ret ? AVERROR(errno) : AVERROR(EIO);
542  buf += ret;
543  offset += ret;
544  size -= ret;
545  }
546 
547  return 0;
548 }
549 
550 static int write_cache(SharedContext *s, const uint8_t *buf, size_t size, off_t offset)
551 {
552  if (s->cache_data) {
553  av_assert1(offset + size <= s->cache_size);
554  memcpy(s->cache_data + offset, buf, size);
555  return 0;
556  }
557 
558  while (size) {
559  ssize_t ret = pwrite(s->fd, buf, size, offset);
560  if (ret <= 0)
561  return ret ? AVERROR(errno) : AVERROR(EIO);
562  buf += ret;
563  offset += ret;
564  size -= ret;
565  }
566 
567  return 0;
568 }
569 
570 static size_t clamp_size(URLContext *h, size_t size, int64_t pos)
571 {
572  const int64_t filesize = get_filesize(h);
573  if (!filesize)
574  return size;
575  else if (pos > filesize)
576  return 0;
577  else
578  return FFMIN(filesize - pos, size);
579 }
580 
581 static int shared_read(URLContext *h, unsigned char *buf, int size)
582 {
583  SharedContext *s = h->priv_data;
584  uint8_t *tmp;
585  int ret;
586 
587  if (size <= 0)
588  return 0;
589 
590  size = clamp_size(h, size, s->pos);
591  if (size <= 0)
592  return AVERROR_EOF;
593 
594  const int shift = atomic_load_explicit(&s->spacemap->block_shift, memory_order_relaxed);
595  const int64_t block_id = s->pos >> shift;
596  const int64_t offset = s->pos & (s->block_size - 1);
597  const int64_t block_pos = block_id * s->block_size;
598  int block_size = clamp_size(h, s->block_size, block_pos);
599  ret = spacemap_grow(h, block_id);
600  if (ret < 0)
601  return ret;
602 
603  Block *const block = &s->spacemap->blocks[block_id];
604  unsigned short state = atomic_load_explicit(&block->state, memory_order_acquire);
605  int64_t pending_since = 0;
606  int verify_read = 0, is_race = 0;
607 
608 retry:
609  switch (state) {
610  default:
611  /* We always need to read the entire block to verify integrity */
612  block_size = clamp_size(h, block_size, block_pos); /* filesize may have changed */
613  if (s->cache_data) {
614  av_assert1(block_pos + block_size <= s->cache_size);
615  tmp = s->cache_data + block_pos;
616  } else {
617  tmp = s->tmp_buf;
618  ret = read_cache(s, tmp, block_size, block_pos);
619  if (ret < 0) {
620  av_log(h, AV_LOG_ERROR, "Failed to read from cache file: %s\n", av_err2str(ret));
621  return ret;
622  }
623  }
624 
625  uint16_t crc = get_block_crc(tmp, block_size);
626  if (crc != state) {
627  av_log(h, AV_LOG_ERROR, "Cache corruption detected for block 0x%"PRIx64" at "
628  "offset 0x%"PRIx64": expected CRC: 0x%04X, got: 0x%04X\n",
629  block_id, block_pos, state, crc);
630  return AVERROR(EIO);
631  }
632 
633  tmp += (ptrdiff_t) offset;
634  size = FFMIN(size, block_size - offset);
635  if (s->verify) {
636  verify_read = 1;
637  break; /* fall through to the cache miss logic */
638  }
639 
640  memcpy(buf, tmp, size);
641  s->nb_hit++;
642  s->pos += size;
643  return size;
644 
645  case BLOCK_FAILED:
646  if (!s->retry_errors)
647  return AVERROR(EIO);
649  case BLOCK_NONE:
650  if (s->read_only)
651  break; /* don't mark block as pending */
654  memory_order_acquire,
655  memory_order_acquire))
656  {
657  /* Acquired pending state, proceed to fetch the block */
659  break;
660  }
661  /* CAS failed, another thread changed the state; reload it */
662  goto retry;
663 
664  case BLOCK_PENDING:
665  /* Another thread is busy fetching this block, wait for it to finish */
666  if (!s->timeout) {
667  is_race = 1;
668  break; /* no timeout requested, immediately race to fetch block */
669  } else if (pending_since) {
671  if (new - pending_since >= s->timeout) {
672  is_race = 1;
673  break; /* timeout expired, try to fetch the block ourselves */
674  }
675  } else {
676  pending_since = av_gettime_relative();
677  }
678 
679  /* Make sure we try a few times before giving up */
680  av_usleep(s->timeout >> 4);
681  state = atomic_load_explicit(&block->state, memory_order_acquire);
682  goto retry;
683  }
684 
685  /* Cache miss, fetch this block from underlying protocol */
686  s->nb_miss++;
687 
688  const int read_only = s->read_only || s->write_err || verify_read;
689  int64_t inner_pos = read_only ? s->pos : block_pos;
690  if (s->inner_pos != inner_pos) {
691  inner_pos = ffurl_seek(s->inner, inner_pos, SEEK_SET);
692  if (inner_pos < 0) {
693  av_log(h, AV_LOG_ERROR, "Failed to seek underlying protocol: %s\n",
694  av_err2str(inner_pos));
695  if (!read_only) {
696  /* Release pending state to avoid stalling other threads. Don't
697  * mark this as failed, since the seek error may be unrelated to
698  * the block and should probably be tried again. */
700  BLOCK_NONE,
701  memory_order_relaxed,
702  memory_order_relaxed);
703  }
704  return inner_pos;
705  }
706 
707  av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", inner_pos);
708  s->inner_pos = inner_pos;
709  }
710 
711  if (read_only) {
712  /* Directly defer to the underlying protocol */
713  ret = ffurl_read(s->inner, buf, size);
714  if (ret < 0)
715  return ret;
716 
717  /* Verify the read data against the cached data if requested */
718  if (verify_read && memcmp(buf, tmp, ret)) {
719  av_log(h, AV_LOG_ERROR, "Cache verification failed for %d bytes "
720  "in block 0x%"PRIx64" at offset 0x%"PRIx64" + %"PRId64"!\n",
721  ret, block_id, block_pos, offset);
722  }
723 
724  s->pos = s->inner_pos = inner_pos + ret;
725  return ret;
726  }
727 
728  int write_back = 1;
729  if (s->cache_data && !is_race) {
730  /* Read directly into memory mapped cache file */
731  tmp = s->cache_data + block_pos;
732  write_back = 0;
733  } else if (size >= block_size && !offset) {
734  /* Read directly into output buffer if aligned and large enough */
735  tmp = buf;
736  } else {
737  /* Read into temporary buffer and copy later */
738  tmp = s->tmp_buf;
739  }
740 
741  /* Try and fetch the entire block */
742  av_assert0(inner_pos == block_pos);
743  int bytes_read = 0;
744  while (bytes_read < block_size) {
745  ret = ffurl_read(s->inner, &tmp[bytes_read], block_size - bytes_read);
746  if (!ret || ret == AVERROR_EOF)
747  break;
748  else if (ret < 0) {
749  av_log(h, AV_LOG_ERROR, "Failed to read block 0x%"PRIx64": %s\n",
750  block_id, av_err2str(ret));
751  int new_state = BLOCK_FAILED;
752  if (ret == AVERROR(EAGAIN) || ret == AVERROR_EXIT)
753  new_state = BLOCK_NONE; /* transient error, allow retries */
754 
755  /* Try to mark block as failed; ignore errors - any mismatch
756  * here will mean that either another thread already marked it
757  * as failed, or successfully cached it in the meantime */
759  new_state,
760  memory_order_relaxed,
761  memory_order_relaxed);
762  return ret;
763  }
764 
765  bytes_read += ret;
766  s->inner_pos += ret;
767  }
768 
769  if (bytes_read < block_size) {
770  /* Learned location of true EOF, update filesize */
771  ret = set_filesize(h, inner_pos + bytes_read);
772  if (ret < 0)
773  return ret;
774  }
775 
776  if (bytes_read > 0) {
777  ret = write_back ? write_cache(s, tmp, bytes_read, block_pos) : 0;
778  if (ret < 0) {
779  av_log(h, AV_LOG_ERROR, "Failed to write to cache file: %s\n",
780  av_err2str(ret));
781  s->write_err = 1;
782  /* Mark as NONE, not FAILED, since the block itself is fine -
783  * just absent from the cache. */
785  BLOCK_NONE,
786  memory_order_relaxed,
787  memory_order_relaxed);
788  } else {
789  uint16_t crc = get_block_crc(tmp, bytes_read);
790  av_log(h, AV_LOG_TRACE, "Cached %d bytes to block 0x%"PRIx64" at "
791  "offset 0x%"PRIx64", CRC 0x%04X\n", bytes_read, block_id,
792  block_pos, crc);
793  atomic_store_explicit(&block->state, crc, memory_order_release);
794  }
795  } else {
796  return AVERROR_EOF;
797  }
798 
799  size = FFMIN(bytes_read - offset, size);
800  if (size <= 0)
801  return AVERROR_EOF;
802  if (tmp != buf)
803  memcpy(buf, &tmp[offset], size);
804  s->pos += size;
805  return size;
806 }
807 
808 static int64_t shared_seek(URLContext *h, int64_t pos, int whence)
809 {
810  SharedContext *s = h->priv_data;
811  const int64_t filesize = get_filesize(h);
812  int64_t res;
813 
814  switch (whence) {
815  case AVSEEK_SIZE:
816  if (filesize)
817  return filesize;
818  res = ffurl_seek(s->inner, pos, whence);
819  if (res > 0) {
820  if (set_filesize(h, res) < 0)
821  return AVERROR(EINVAL);
822  }
823  return res;
824  case SEEK_SET:
825  break;
826  case SEEK_CUR:
827  pos += s->pos;
828  break;
829  case SEEK_END:
830  if (filesize) {
831  pos += filesize;
832  break;
833  }
834  /* Defer to underlying protocol if filesize is unknown */
835  res = ffurl_seek(s->inner, pos, whence);
836  if (res < 0)
837  return res;
838  /* Opportunistically update known filesize */
839  if (set_filesize(h, res - pos) < 0)
840  return AVERROR(EINVAL);
841  av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", res);
842  return s->pos = s->inner_pos = res;
843  default:
844  return AVERROR(EINVAL);
845  }
846 
847  if (pos < 0)
848  return AVERROR(EINVAL);
849 
850  av_log(h, AV_LOG_DEBUG, "Virtual seek to 0x%"PRIx64"\n", pos);
851  return s->pos = pos;
852 }
853 
855 {
856  SharedContext *s = h->priv_data;
857  return ffurl_get_file_handle(s->inner);
858 }
859 
861 {
862  SharedContext *s = h->priv_data;
863  int ret = ffurl_get_short_seek(s->inner);
864  return ret > 0 ? FFMAX(ret, s->block_size) : s->block_size;
865 }
866 
867 #define OFFSET(x) offsetof(SharedContext, x)
868 #define D AV_OPT_FLAG_DECODING_PARAM
869 
870 static const AVOption options[] = {
871  { "cache_dir", "Directory path for shared file cache", OFFSET(cache_dir), AV_OPT_TYPE_STRING, {.str = NULL}, .flags = D },
872  { "block_shift", "Set the base 2 logarithm of the block size", OFFSET(block_shift), AV_OPT_TYPE_INT, {.i64 = 15}, 9, 30, .flags = D },
873  { "read_only", "Don't write data to the cache, only read from it", OFFSET(read_only), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, .flags = D },
874  { "cache_verify", "Verify correctness of the cache against the source", OFFSET(verify), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, .flags = D },
875  { "cache_timeout", "Time in us to wait before re-fetching pending blocks", OFFSET(timeout), AV_OPT_TYPE_INT64, {.i64 = 10000}, 0, INT64_MAX, .flags = D },
876  { "retry_errors", "Re-request blocks even if they previously failed", OFFSET(retry_errors), AV_OPT_TYPE_BOOL, {.i64 = 1}, 0, 1, .flags = D },
877  {0},
878 };
879 
881  .class_name = "shared",
882  .item_name = av_default_item_name,
883  .option = options,
884  .version = LIBAVUTIL_VERSION_INT,
885 };
886 
888  .name = "shared",
889  .url_open2 = shared_open,
890  .url_read = shared_read,
891  .url_seek = shared_seek,
892  .url_close = shared_close,
893  .url_get_file_handle = shared_get_file_handle,
894  .url_get_short_seek = shared_get_short_seek,
895  .priv_data_size = sizeof(SharedContext),
896  .priv_data_class = &shared_context_class,
897 };
flags
const SwsFlags flags[]
Definition: swscale.c:85
ffurl_seek
static int64_t ffurl_seek(URLContext *h, int64_t pos, int whence)
Change the position that will be used by the next read/write operation on the resource accessed by h.
Definition: url.h:222
av_gettime_relative
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:216
AVHashContext::crc
uint32_t crc
Definition: hash.c:70
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
get_block_crc
static uint16_t get_block_crc(const uint8_t *block, size_t block_size)
Definition: shared.c:96
shared_close
static int shared_close(URLContext *h)
Definition: shared.c:180
set_filesize
static int set_filesize(URLContext *h, int64_t new_size)
Definition: shared.c:212
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
get_filesize
static int64_t get_filesize(URLContext *h)
Definition: shared.c:206
Spacemap::version
atomic_ushort version
Definition: shared.c:115
int64_t
long long int64_t
Definition: coverity.c:34
av_asprintf
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:115
write_cache
static int write_cache(SharedContext *s, const uint8_t *buf, size_t size, off_t offset)
Definition: shared.c:550
atomic_ushort
intptr_t atomic_ushort
Definition: stdatomic.h:54
BLOCK_PENDING
@ BLOCK_PENDING
a thread is currently trying to write this block
Definition: shared.c:87
AVOption
AVOption.
Definition: opt.h:428
AVSEEK_SIZE
#define AVSEEK_SIZE
Passing this as the "whence" parameter to a seek function causes it to return the filesize without se...
Definition: avio.h:468
atomic_compare_exchange_weak_explicit
#define atomic_compare_exchange_weak_explicit(object, expected, desired, success, failure)
Definition: stdatomic.h:129
HEADER_VERSION
#define HEADER_VERSION
Definition: shared.c:82
ff_shared_protocol
const URLProtocol ff_shared_protocol
Definition: shared.c:887
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:226
ffurl_close
int ffurl_close(URLContext *h)
Definition: avio.c:617
AVDictionary
Definition: dict.c:32
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
SharedContext::cache_path
char * cache_path
Definition: shared.c:165
SharedContext::verify
int verify
Definition: shared.c:155
hash
static uint8_t hash[HASH_SIZE]
Definition: movenc.c:58
URLProtocol
Definition: url.h:51
hash_uri
static int hash_uri(uint8_t hash[HASH_SIZE], const char *uri)
Definition: shared.c:63
D
#define D
Definition: shared.c:868
BlockState
BlockState
Definition: shared.c:84
crc.h
close
static av_cold void close(AVCodecParserContext *s)
Definition: apv_parser.c:197
read_cache
static int read_cache(SharedContext *s, uint8_t *buf, size_t size, off_t offset)
Definition: shared.c:530
BLOCK_FAILED
@ BLOCK_FAILED
the underlying I/O source failed to read this block
Definition: shared.c:88
ffurl_get_short_seek
int ffurl_get_short_seek(void *urlcontext)
Return the current short seek threshold value for this URL.
Definition: avio.c:844
SharedContext::tmp_buf
uint8_t * tmp_buf
Definition: shared.c:159
SharedContext::cache_dir
char * cache_dir
Definition: shared.c:150
locked
FFmpeg hosted at Telepoint in bulgaria ns2 avcodec org Replica Name hosted at Prometeus Cdlan in italy instead several VMs run on it Main server peering exchange and hosting They have multiple DC buildings in Sofia and FFmpeg is hosted in XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX The building is locked down and accessible only with personal key cards that are registered People who are granted access to a rack have to go through the access center with their ID to get logged and receive a one time access card that can open the service elevator and only the hall where the destination rack is All racks are locked
Definition: infra.txt:35
SharedContext::nb_miss
int64_t nb_miss
Definition: shared.c:177
shared_get_file_handle
static int shared_get_file_handle(URLContext *h)
Definition: shared.c:854
avassert.h
AV_LOG_TRACE
#define AV_LOG_TRACE
Extremely verbose debugging, useful for libav* development.
Definition: log.h:236
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:210
FF_ARRAY_ELEMS
#define FF_ARRAY_ELEMS(a)
Definition: sinewin_tablegen.c:29
SharedContext::map_size
off_t map_size
Definition: shared.c:172
Spacemap::hash
atomic_uchar hash[HASH_SIZE]
Definition: shared.c:118
SharedContext::spacemap
Spacemap * spacemap
Definition: shared.c:170
ffurl_open_whitelist
int ffurl_open_whitelist(URLContext **puc, const char *filename, int flags, const AVIOInterruptCB *int_cb, AVDictionary **options, const char *whitelist, const char *blacklist, URLContext *parent)
Create an URLContext for accessing to the resource indicated by url, and open it.
Definition: avio.c:368
spacemap_init
static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE])
Definition: shared.c:477
avpriv_open
int avpriv_open(const char *filename, int flags,...)
A wrapper for open() setting O_CLOEXEC.
Definition: file_open.c:67
DEF_SET_ONCE
#define DEF_SET_ONCE(ctype, atype)
Definition: shared.c:125
SharedContext::read_only
int read_only
Definition: shared.c:152
av_hash_alloc
int av_hash_alloc(AVHashContext **ctx, const char *name)
Allocate a hash context for the algorithm specified by name.
Definition: hash.c:114
AV_OPT_TYPE_INT64
@ AV_OPT_TYPE_INT64
Underlying C type is int64_t.
Definition: opt.h:262
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:42
shared_context_class
static const AVClass shared_context_class
Definition: shared.c:880
Spacemap::filesize
atomic_ullong filesize
Definition: shared.c:117
AV_LOG_DEBUG
#define AV_LOG_DEBUG
Stuff which is only useful for libav* developers.
Definition: log.h:231
ctx
static AVFormatContext * ctx
Definition: movenc.c:49
SharedContext::retry_errors
int retry_errors
Definition: shared.c:154
SharedContext::write_err
int write_err
write error occurred
Definition: shared.c:161
SharedContext::nb_hit
int64_t nb_hit
Definition: shared.c:176
av_usleep
int av_usleep(unsigned usec)
Sleep for a period of time.
Definition: time.c:84
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
Spacemap::header_magic
atomic_uint header_magic
Definition: shared.c:114
file_open.h
tmp
static uint8_t tmp[40]
Definition: aes_ctr.c:52
arg
const char * arg
Definition: jacosubdec.c:65
atomic_compare_exchange_strong_explicit
#define atomic_compare_exchange_strong_explicit(object, expected, desired, success, failure)
Definition: stdatomic.h:123
SharedContext::inner_pos
int64_t inner_pos
Definition: shared.c:147
AV_CRC_16_ANSI
@ AV_CRC_16_ANSI
Definition: crc.h:50
BLOCK_NONE
@ BLOCK_NONE
block is not cached
Definition: shared.c:86
fail
#define fail
Definition: test.h:478
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
SharedContext::map_path
char * map_path
Definition: shared.c:171
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:76
NULL
#define NULL
Definition: coverity.c:32
av_hash_init
void av_hash_init(AVHashContext *ctx)
Initialize or reset a hash context.
Definition: hash.c:151
SharedContext
Definition: shared.c:144
av_fallthrough
#define av_fallthrough
Definition: attributes.h:67
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:242
options
Definition: swscale.c:50
SharedContext::block_shift
int block_shift
requested shift; may disagree with actual
Definition: shared.c:151
spacemap_remap
static int spacemap_remap(URLContext *h, size_t map_size)
Definition: shared.c:374
time.h
av_hash_update
void av_hash_update(AVHashContext *ctx, const uint8_t *src, size_t len)
Update a hash context with additional data.
Definition: hash.c:172
attributes.h
atomic_load_explicit
#define atomic_load_explicit(object, order)
Definition: stdatomic.h:96
HEADER_MAGIC
#define HEADER_MAGIC
Definition: shared.c:81
av_hash_freep
void av_hash_freep(AVHashContext **ctx)
Free hash context and set hash context pointer to NULL.
Definition: hash.c:248
error.h
Block
Definition: flashsv2enc.c:70
SharedContext::cache_size
off_t cache_size
size of mapped memory region (for munmap)
Definition: shared.c:166
shift
static int shift(int a, int b)
Definition: bonk.c:261
i
#define i(width, name, range_min, range_max)
Definition: cbs_h264.c:63
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
size
int size
Definition: twinvq_data.h:10344
URLProtocol::name
const char * name
Definition: url.h:52
av_hash_final
void av_hash_final(AVHashContext *ctx, uint8_t *dst)
Finalize a hash context and compute the actual hash value.
Definition: hash.c:193
shared_get_short_seek
static int shared_get_short_seek(URLContext *h)
Definition: shared.c:860
SharedContext::mapfd
int mapfd
Definition: shared.c:173
av_crc_get_table
const AVCRC * av_crc_get_table(AVCRCId crc_id)
Get an initialized standard CRC table.
Definition: crc.c:389
offset
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf offset
Definition: writing_filters.txt:86
AVHashContext
Definition: hash.c:66
av_strstart
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
Definition: avstring.c:36
state
static struct @593 state
spacemap_grow
static int spacemap_grow(URLContext *h, int64_t block)
Definition: shared.c:444
atomic_uchar
intptr_t atomic_uchar
Definition: stdatomic.h:52
filesize
static int64_t filesize(AVIOContext *pb)
Definition: ffmpeg_mux.c:51
URLContext
Definition: url.h:35
av_malloc
#define av_malloc(s)
Definition: ops_asmgen.c:44
shared_seek
static int64_t shared_seek(URLContext *h, int64_t pos, int whence)
Definition: shared.c:808
av_assert1
#define av_assert1(cond)
assert() equivalent, that does not lie in speed critical code.
Definition: avassert.h:58
s
uint8_t s
Definition: llvidencdsp.c:39
atomic_store_explicit
#define atomic_store_explicit(object, desired, order)
Definition: stdatomic.h:90
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
url.h
atomic_ullong
intptr_t atomic_ullong
Definition: stdatomic.h:60
SharedContext::cache_data
uint8_t * cache_data
optional mmap of the cache file
Definition: shared.c:164
ret
ret
Definition: filter_design.txt:187
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:81
pos
unsigned int pos
Definition: spdifenc.c:414
options
static const AVOption options[]
Definition: shared.c:870
hash.h
SharedContext::block_size
int block_size
Definition: shared.c:160
av_crc
uint32_t av_crc(const AVCRC *ctx, uint32_t crc, const uint8_t *buffer, size_t length)
Calculate the CRC of a block.
Definition: crc.c:421
SharedContext::timeout
int64_t timeout
Definition: shared.c:153
shared_open
static int shared_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
Definition: shared.c:235
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Underlying C type is int.
Definition: opt.h:258
av_hash_get_size
int av_hash_get_size(const AVHashContext *ctx)
Definition: hash.c:109
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
HASH_METHOD
#define HASH_METHOD
This hash should be resistant against collision attacks, so that an attacker could not generate e....
Definition: shared.c:60
HASH_SIZE
#define HASH_SIZE
Definition: shared.c:61
mem.h
Block::state
atomic_ushort state
Definition: shared.c:110
cache_map
static int cache_map(URLContext *h, int64_t filesize)
Definition: shared.c:338
Spacemap
Definition: shared.c:113
SharedContext::fd
int fd
Definition: shared.c:167
FFALIGN
#define FFALIGN(x, a)
Definition: macros.h:78
SharedContext::inner
URLContext * inner
Definition: shared.c:146
AV_OPT_TYPE_BOOL
@ AV_OPT_TYPE_BOOL
Underlying C type is int.
Definition: opt.h:326
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
Spacemap::blocks
Block blocks[]
Definition: shared.c:121
SharedContext::pos
int64_t pos
current logical position
Definition: shared.c:158
OFFSET
#define OFFSET(x)
Definition: shared.c:867
Spacemap::reserved
char reserved[80]
Definition: shared.c:119
block
The exact code depends on how similar the blocks are and how related they are to the block
Definition: filter_design.txt:207
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
ffurl_size
int64_t ffurl_size(URLContext *h)
Return the filesize of the resource accessed by h, AVERROR(ENOSYS) if the operation is not supported ...
Definition: avio.c:805
h
h
Definition: vp9dsp_template.c:2070
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
avstring.h
shared_read
static int shared_read(URLContext *h, unsigned char *buf, int size)
Definition: shared.c:581
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Underlying C type is a uint8_t* that is either NULL or points to a C string allocated with the av_mal...
Definition: opt.h:275
clamp_size
static size_t clamp_size(URLContext *h, size_t size, int64_t pos)
Definition: shared.c:570
Spacemap::block_shift
atomic_ushort block_shift
Definition: shared.c:116
ffurl_get_file_handle
int ffurl_get_file_handle(URLContext *h)
Return the file descriptor associated with this URL.
Definition: avio.c:820
ffurl_read
static int ffurl_read(URLContext *h, uint8_t *buf, int size)
Read up to size bytes from the resource accessed by h, and store the read bytes in buf.
Definition: url.h:181