FFmpeg
shared.c
Go to the documentation of this file.
1 /*
2  * Shared file cache protocol.
3  * Copyright (c) 2026 Niklas Haas
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  *
21  * Based on cache.c by Michael Niedermayer
22  */
23 
24 #include "libavutil/attributes.h"
25 #include "libavutil/avassert.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/crc.h"
28 #include "libavutil/hash.h"
29 #include "libavutil/file_open.h"
30 #include "libavutil/mem.h"
31 #include "libavutil/opt.h"
32 #include "libavutil/time.h"
33 
34 #include "url.h"
35 
36 #include <errno.h>
37 #include <fcntl.h>
38 #include <stdatomic.h>
39 #include <sys/file.h>
40 #include <sys/mman.h>
41 #include <sys/stat.h>
42 #include <unistd.h>
43 
44 /**
45  * This hash should be resistant against collision attacks, so that an
46  * attacker could not generate e.g. two different URIs that map to the same
47  * cache file. This requires at least 64 bits of collision resistance in
48  * practice (i.e. 128 bits = 16 bytes of hash size). However, we can be
49  * conservative by computing e.g. a 256 bit hash and storing it inside the
50  * file header for verification.
51  *
52  * Note that due to the way we use atomics, we should avoid zero bytes in
53  * the resulting hash; hence we tweak the input slightly to avoid this.
54  * The resulting loss in hash strength is negligible, since 32 bytes is
55  * already much more than needed.
56  */
/* HASH_METHOD is presumably the av_hash algorithm name passed to
 * av_hash_alloc() on a line absent from this listing; HASH_SIZE is the
 * resulting digest length in bytes (SHA-512/256 yields 32 bytes). */
57 #define HASH_METHOD "SHA512/256"
58 #define HASH_SIZE 32
59 
/**
 * Fingerprint a URI into HASH_SIZE bytes, none of which is zero.
 *
 * Zero bytes of the digest are replaced by 0xFF (~0), since a zero byte
 * would read as "unset" to the set-once atomics that store the hash in the
 * spacemap header.
 *
 * @param hash output buffer of HASH_SIZE bytes
 * @param uri  NUL-terminated URI (shared_open() passes it with the
 *             "shared:" prefix already stripped)
 * @return 0 on success, or a negative error code
 *
 * NOTE(review): several lines of this function are absent from this listing
 * (the declaration of `ret` and, presumably, the allocation, initialisation,
 * finalisation and freeing of `ctx`).
 */
60 static int hash_uri(uint8_t hash[HASH_SIZE], const char *uri)
61 {
62  struct AVHashContext *ctx = NULL;
64  if (ret < 0)
65  return ret;
66 
69  av_hash_update(ctx, (const uint8_t *) uri, strlen(uri));
72 
73  for (int i = 0; i < HASH_SIZE; i++)
74  hash[i] = hash[i] ? hash[i] : ~hash[i]; /* prevent zero bytes */
75  return 0;
76 }
77 
/* Spacemap header identification: spacemap_init() stores these in a new
 * spacemap and rejects an existing one whose magic or version differ. */
78 #define HEADER_MAGIC MKTAG(u'\xFF', 'S', 'h', '$')
79 #define HEADER_VERSION 2
80 
/**
 * State word of a single cache block, as stored in Block.state.
 *
 * Only the three values below are reserved. Any other value marks a block
 * whose data is present in the cache file, and equals the (adjusted) CRC of
 * that data as produced by get_block_crc().
 */
enum BlockState {
    BLOCK_NONE    = 0, ///< block is not cached
    BLOCK_PENDING = 1, ///< a thread is currently trying to write this block
    BLOCK_FAILED  = 2, ///< the underlying I/O source failed to read this block
};
92 
93 static uint16_t get_block_crc(const uint8_t *block, size_t block_size)
94 {
95  uint16_t crc = av_crc(av_crc_get_table(AV_CRC_16_ANSI), 0, block, block_size);
96  switch (crc) {
97  case BLOCK_NONE:
98  case BLOCK_FAILED:
99  case BLOCK_PENDING:
100  return ~crc; /* avoid reserved block states */
101  default:
102  return crc;
103  }
104 }
105 
/* One spacemap entry per cache block. `state` holds a reserved BlockState
 * value or, for a block present in the cache, the value computed by
 * get_block_crc() over its data. */
106 typedef struct Block {
107  atomic_ushort state; /* enum BlockState */
108 } Block;
109 
/**
 * Header of the space map file. Every user of the same cache entry mmap()s
 * this file MAP_SHARED (see spacemap_remap()). The header fields visible
 * here are written only with set-once atomic operations.
 *
 * NOTE(review): the declarations of header_magic, version, block_shift and
 * the trailing per-block array `blocks` are absent from this listing,
 * although all of them are referenced elsewhere in this file. `reserved` is
 * presumably padding kept for future header fields.
 */
110 typedef struct Spacemap {
114  atomic_ullong filesize; /* byte offset of true EOF, or 0 if unknown */
115  atomic_uchar hash[HASH_SIZE]; /* hash of resource URI / filename */
116  char reserved[80];
117 
119 } Spacemap;
120 
121 /* Set to value iff the current value is unset (zero) */
122 #define DEF_SET_ONCE(ctype, atype) \
123  static int set_once_##atype(atomic_##atype *const ptr, const ctype value) \
124  { \
125  ctype prev = 0; \
126  av_assert1(value != 0); \
127  if (atomic_compare_exchange_strong_explicit( \
128  ptr, &prev, value, memory_order_release, memory_order_relaxed)) \
129  return 1; \
130  else if (prev == value) \
131  return 0; \
132  else \
133  return AVERROR(EINVAL); \
134  }
135 
136 DEF_SET_ONCE(unsigned char, uchar)
137 DEF_SET_ONCE(unsigned int, uint)
138 DEF_SET_ONCE(unsigned short, ushort)
139 DEF_SET_ONCE(unsigned long long, ullong)
140 
/**
 * Private state of one shared: protocol instance (one opened URL).
 *
 * NOTE(review): several members used elsewhere in this file (inner,
 * inner_pos, read_only, retry_errors, timeout, block_size, spacemap, nb_hit,
 * nb_miss) are declared on lines absent from this listing.
 */
141 typedef struct SharedContext {
142  AVClass *class;
145 
146  /* options */
147  char *cache_dir;
148  int block_shift; ///< requested shift; may disagree with actual
152  int verify; /* "cache_verify" option */
153 
154  /* misc state */
155  int64_t pos; ///< current logical position
156  uint8_t *tmp_buf; /* block-sized bounce buffer; only allocated when the cache file is not mmap()ed at open */
158  int write_err; ///< write error occurred
159 
160  /* cache file */
161  uint8_t *cache_data; ///< optional mmap of the cache file
162  char *cache_path;
163  off_t cache_size; ///< size of mapped memory region (for munmap)
164  int fd; /* cache file descriptor, -1 if not open */
165 
166  /* space map */
168  char *map_path;
169  off_t map_size; /* size of the mmap()ed spacemap region */
170  int mapfd; /* spacemap file descriptor, -1 if not open */
171 
172  /* statistics */
175 } SharedContext;
176 
/*
 * Body of shared_close() (its signature line is absent from this listing):
 * release everything shared_open() acquired. The mappings and descriptors
 * are released only if present (fd/mapfd are -1 when unopened), so
 * shared_open() also uses this to clean up a partially opened context.
 * NOTE(review): this assumes ffurl_close() accepts a NULL s->inner; confirm.
 */
178 {
179  SharedContext *s = h->priv_data;
180 
181  ffurl_close(s->inner);
182  if (s->cache_data)
183  munmap(s->cache_data, s->cache_size);
184  if (s->spacemap)
185  munmap(s->spacemap, s->map_size);
186  if (s->fd != -1)
187  close(s->fd);
188  if (s->mapfd != -1)
189  close(s->mapfd);
190  av_freep(&s->cache_path);
191  av_freep(&s->map_path);
192  av_freep(&s->tmp_buf);
193 
194  av_log(h, AV_LOG_DEBUG, "Cache statistics: %"PRId64" hits, %"PRId64" misses\n",
195  s->nb_hit, s->nb_miss);
196  return 0;
197 }
198 
/* Helpers defined further below: set_filesize() needs cache_map(), and
 * shared_open() needs all three. */
199 static int cache_map(URLContext *h, int64_t filesize);
200 static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE]);
201 static int spacemap_grow(URLContext *h, int64_t block);
202 
/* Body of get_filesize() (its signature line is absent from this listing):
 * return the resource size recorded in the shared spacemap, or 0 while it
 * is still unknown. The load uses relaxed ordering. */
204 {
205  SharedContext *s = h->priv_data;
206  return atomic_load_explicit(&s->spacemap->filesize, memory_order_relaxed);
207 }
208 
209 static int set_filesize(URLContext *h, int64_t new_size)
210 {
211  SharedContext *s = h->priv_data;
212  int ret;
213 
214  if (!new_size)
215  return 0;
216 
217  ret = set_once_ullong(&s->spacemap->filesize, new_size);
218  if (ret < 0) {
219  av_log(h, AV_LOG_ERROR, "Cached file size mismatch, expected: "
220  "%"PRId64", got: %"PRIu64"!\n", new_size,
221  (uint64_t) atomic_load(&s->spacemap->filesize));
222  return ret;
223  } else if (ret) {
224  /* Opportunistically map the file; this also sets the correct filesize.
225  * Ignore errors as this is not critical to the cache logic. */
226  cache_map(h, new_size);
227  }
228 
229  return ret;
230 }
231 
/**
 * Open the shared: protocol.
 *
 * Steps:
 *  - open the underlying URL (the argument with any "shared:" prefix
 *    stripped);
 *  - derive the cache file names from the first 16 bytes of its hash (as 32
 *    uppercase hex digits), i.e. "<cache_dir>/<hex>.cache" and
 *    "<cache_dir>/<hex>.spacemap", and open or create both;
 *  - validate or initialise the spacemap header;
 *  - if the resource size is known, mmap() the cache file.
 *
 * On failure, everything acquired so far is released via shared_close().
 *
 * @return 0 on success (but see the NOTE before `fail:`), or a negative
 *         AVERROR on failure
 */
232 static int shared_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
233 {
234  SharedContext *s = h->priv_data;
235  int ret;
236 
237  if (!s->cache_dir || !s->cache_dir[0]) {
238  av_log(h, AV_LOG_ERROR, "Missing path for shared cache! Specify a "
239  "directory using the -cache_dir option.\n");
240  return AVERROR(EINVAL);
241  }
242 
243  s->fd = s->mapfd = -1; /* Set these early for shared_close() failure path */
244 
245  /* Open underlying protocol */
246  av_strstart(arg, "shared:", &arg);
247  ret = ffurl_open_whitelist(&s->inner, arg, flags, &h->interrupt_callback,
248  options, h->protocol_whitelist, h->protocol_blacklist, h);
249 
250  if (ret < 0)
251  goto fail;
252 
253  uint8_t hash[HASH_SIZE];
254  ret = hash_uri(hash, arg);
255  if (ret < 0)
256  goto fail;
257 
258  /* 128 bits is enough for collision resistance; we already store the full
259  * hash inside the header for verification */
260  char filename[2 * 16 + 1];
261  for (int i = 0; i < FF_ARRAY_ELEMS(filename) / 2; i++)
262  sprintf(&filename[i * 2], "%02X", hash[i]);
263  s->cache_path = av_asprintf("%s/%s.cache", s->cache_dir, filename);
264  s->map_path = av_asprintf("%s/%s.spacemap", s->cache_dir, filename);
265  if (!s->cache_path || !s->map_path) {
266  ret = AVERROR(ENOMEM);
267  goto fail;
268  }
269 
270  av_log(h, AV_LOG_VERBOSE, "Opening cache file '%s' for URI: '%s'\n",
271  s->cache_path, s->inner->filename);
272 
273  s->fd = avpriv_open(s->cache_path, O_RDWR | O_CREAT, 0660);
274  s->mapfd = avpriv_open(s->map_path, O_RDWR | O_CREAT, 0660);
/* NOTE(review): errno is inspected only after both opens. When only the
 * first avpriv_open() fails, the second call may have modified errno, so
 * the reported error can be inaccurate. */
275  if (s->fd < 0 || s->mapfd < 0) {
276  ret = AVERROR(errno);
277  av_log(h, AV_LOG_ERROR, "Failed to open '%s': %s\n",
278  s->fd < 0 ? s->cache_path : s->map_path, av_err2str(ret));
279  goto fail;
280  }
281 
/* spacemap_init() returns 1 when this call initialised a new spacemap */
282  ret = spacemap_init(h, hash);
283  if (ret < 0)
284  goto fail;
285 
/* Use the shift stored in the spacemap, which may differ from the
 * requested s->block_shift for a pre-existing cache */
286  s->block_size = 1 << atomic_load(&s->spacemap->block_shift);
287 
289  if (!filesize) {
290  /* Filesize is not yet known, try to get it from the underlying URL */
291  filesize = ffurl_size(s->inner);
292  if (filesize < 0 && filesize != AVERROR(ENOSYS)) {
293  ret = (int) filesize;
294  goto fail;
295  } else if (filesize > 0)
297  }
298 
299  if (filesize > 0) {
300  int64_t last_pos = filesize - 1;
301  int64_t last_block = last_pos >> atomic_load(&s->spacemap->block_shift);
302  ret = spacemap_grow(h, last_block);
303  if (ret < 0)
304  goto fail;
305 
306  /* If filesize is known, we can directly mmap() the cache file */
307  ret = cache_map(h, filesize);
308  if (ret < 0) {
309  av_log(h, AV_LOG_WARNING, "Failed to map cache file: %s. Falling "
310  "back to normal read/write\n", av_err2str(ret));
311  ret = 0;
312  }
313  }
314 
315  if (!s->cache_data) {
316  /* Temporary buffer needed for pread/pwrite() fallback */
317  s->tmp_buf = av_malloc(s->block_size);
318  if (!s->tmp_buf) {
319  ret = AVERROR(ENOMEM);
320  goto fail;
321  }
322  }
323 
324  h->max_packet_size = s->block_size;
325  h->min_packet_size = s->block_size;
326 
/* NOTE(review): on success `ret` is never reset to 0. spacemap_init() can
 * return 1 (new spacemap). When the filesize stays unknown, no later
 * statement visible in this listing overwrites `ret`, so this function then
 * returns 1. If the generic open path treats any nonzero url_open2() result
 * as an error (confirm against ffurl_connect()), opening a fresh cache for
 * a stream of unknown size fails. Consider `ret = 0;` before `fail:`. */
327 fail:
328  if (ret < 0)
329  shared_close(h);
330  return ret;
331 }
332 
/*
 * Body of cache_map() (its signature line is absent from this listing; it is
 * declared above as `static int cache_map(URLContext *h, int64_t filesize)`).
 *
 * mmap() the whole cache file read/write, MAP_SHARED, once the final
 * resource size is known. The file is first resized to exactly `filesize`.
 * Returns 0 without doing anything if the current mapping already covers
 * `filesize`, or if `filesize` does not fit in size_t.
 *
 * NOTE(review): an existing mapping is released before the new mmap() is
 * attempted, so a failure here leaves s->cache_data NULL.
 */
334 {
335  SharedContext *s = h->priv_data;
336  if (s->cache_size >= filesize || filesize > SIZE_MAX)
337  return 0;
338 
339  if (s->cache_data) {
340  munmap(s->cache_data, s->cache_size);
341  s->cache_data = NULL;
342  s->cache_size = 0;
343  }
344 
345  struct stat st;
346  int ret = fstat(s->fd, &st);
347  if (ret < 0)
348  return AVERROR(errno);
349 
350  if (st.st_size != filesize) {
351  /* Resize the file to exactly filesize before mapping. It is normally
352  * shorter (freshly created, or only partially written via pwrite()),
353  * and can also disagree if another process recorded the filesize in the
354  * header but crashed before resizing the file. */
354  ret = ftruncate(s->fd, filesize);
355  if (ret < 0)
356  return AVERROR(errno);
357  }
358 
359  s->cache_data = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, s->fd, 0);
360  if (s->cache_data == MAP_FAILED) {
361  s->cache_data = NULL;
362  return AVERROR(errno);
363  }
364 
365  s->cache_size = filesize;
366  return 0;
367 }
368 
369 static int spacemap_remap(URLContext *h, size_t map_size)
370 {
371  SharedContext *s = h->priv_data;
372  struct flock fl = { .l_type = F_WRLCK };
373  int ret, did_grow = 0;
374  if (map_size <= s->map_size)
375  return 0;
376 
377  /* Opportunistically get current filesize before attempting to lock */
378  struct stat st;
379  ret = fstat(s->mapfd, &st);
380  if (ret < 0) {
381  ret = AVERROR(errno);
382  goto fail;
383  }
384 
385  if (st.st_size >= map_size)
386  goto skip_resize;
387 
388  /* Lock the spacemap to ensure nobody else is currently resizing it */
389  ret = fcntl(s->mapfd, F_SETLKW, &fl);
390  if (ret < 0) {
391  ret = AVERROR(errno);
392  goto fail;
393  }
394  fl.l_type = F_UNLCK;
395 
396  /* Refresh filesize after acquiring the lock */
397  ret = fstat(s->mapfd, &st);
398  if (ret < 0) {
399  ret = AVERROR(errno);
400  goto fail;
401  }
402 
403  if (st.st_size >= map_size)
404  goto skip_resize;
405 
406  ret = ftruncate(s->mapfd, map_size);
407  if (ret < 0) {
408  ret = AVERROR(errno);
409  goto fail;
410  }
411  st.st_size = map_size;
412  did_grow = 1;
413 
414 skip_resize:
415  if (s->spacemap)
416  munmap(s->spacemap, s->map_size);
417  s->map_size = st.st_size;
418  s->spacemap = mmap(NULL, s->map_size, PROT_READ | PROT_WRITE, MAP_SHARED, s->mapfd, 0);
419  if (s->spacemap == MAP_FAILED) {
420  s->spacemap = NULL; /* for munmap check */
421  s->map_size = 0;
422  ret = AVERROR(errno);
423  goto fail;
424  }
425 
426  /* fl.l_type is set to F_UNLCK only after successful lock */
427  if (fl.l_type == F_UNLCK)
428  fcntl(s->mapfd, F_SETLK, &fl);
429 
430  return did_grow;
431 
432 fail:
433  if (fl.l_type == F_UNLCK)
434  fcntl(s->mapfd, F_SETLK, &fl);
435  av_log(h, AV_LOG_ERROR, "Failed to resize space map: %s\n", av_err2str(ret));
436  return ret;
437 }
438 
/*
 * Body of spacemap_grow() (its signature line is absent from this listing;
 * it is declared above as `static int spacemap_grow(URLContext *h,
 * int64_t block)`).
 *
 * Make sure the mapped spacemap has an entry for block index `block`,
 * growing and/or remapping it via spacemap_remap(). Returns 0 on success or
 * a negative AVERROR.
 */
440 {
441  SharedContext *s = h->priv_data;
442  int64_t num_blocks = block + 1;
443  size_t map_bytes = sizeof(Spacemap) + num_blocks * sizeof(Block);
444 
445  /* When streaming files without known size, round the map size in bytes
446  * up to a multiple of the block size to reduce the rate of resizes */
447  if (!get_filesize(h)) {
448  av_assert0(s->block_size > 0);
449  map_bytes = FFALIGN(map_bytes, (int64_t) s->block_size);
450  }
451 
/* NOTE(review): this only catches some wrap-arounds of the size computation
 * above; it is not a complete overflow check. */
452  if (map_bytes < num_blocks)
453  return AVERROR(EINVAL); /* overflow */
454 
455  const off_t old_size = s->map_size;
456  int ret = spacemap_remap(h, map_bytes);
457  if (ret < 0)
458  return ret;
459 
460  /* Report new size after successful grow */
/* NOTE(review): the last "%zu" argument below is an int64_t expression
 * (num_blocks * (int64_t) s->block_size) >> 20. This mismatches size_t on
 * 32-bit targets; PRId64 or a (size_t) cast would be correct. The
 * av_log(...) opening line is absent from this listing. */
461  if (s->map_size > old_size) {
462  num_blocks = (s->map_size - sizeof(Spacemap)) / sizeof(Block);
464  "%s %zu bytes, capacity: %"PRId64" blocks = %zu MB\n",
465  ret ? "Resized spacemap to" : "Mapped spacemap with",
466  (size_t) s->map_size, num_blocks,
467  (num_blocks * (int64_t) s->block_size) >> 20);
468  }
469  return 0;
470 }
471 
/**
 * Map the spacemap header and validate or initialise it.
 *
 * Every header field is written with a set-once operation. The first user of
 * a new (zero-filled by ftruncate()) spacemap therefore initialises it, and
 * later users only verify it:
 *  - a magic/version mismatch or a URI hash mismatch is an error;
 *  - a block_shift mismatch only warns, and the stored shift, which must lie
 *    in [9, 30], is what shared_open() then uses.
 *
 * @param hash HASH_SIZE-byte fingerprint of the source URI
 * @return 1 if this call stored the last hash byte (i.e. initialised a new
 *         spacemap), 0 if an existing one was validated, or a negative
 *         AVERROR
 */
472 static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE])
473 {
474  SharedContext *s = h->priv_data;
475  int ret;
476 
477  ret = spacemap_remap(h, sizeof(Spacemap));
478  if (ret < 0)
479  return ret;
480 
/* NOTE(review): the argument line of the "Expected magic" log call is absent
 * from this listing. */
481  if ((ret = set_once_uint(&s->spacemap->header_magic, HEADER_MAGIC)) < 0 ||
482  (ret = set_once_ushort(&s->spacemap->version, HEADER_VERSION)) < 0)
483  {
484  av_log(h, AV_LOG_ERROR, "Shared cache spacemap header mismatch!\n");
485  av_log(h, AV_LOG_ERROR, " Expected magic: 0x%X, version: %d\n",
487  av_log(h, AV_LOG_ERROR, " Got magic: 0x%X, version: %d\n",
488  atomic_load(&s->spacemap->header_magic),
489  atomic_load(&s->spacemap->version));
490  return ret;
491  }
492 
493  ret = set_once_ushort(&s->spacemap->block_shift, s->block_shift);
494  if (ret < 0) {
495  const int shift = atomic_load(&s->spacemap->block_shift);
496  av_log(h, AV_LOG_WARNING, "Shared cache uses block shift %d, "
497  "but requested block shift is %d.\n", shift, s->block_shift);
498  if (shift < 9 || shift > 30) {
499  av_log(h, AV_LOG_ERROR, "Invalid block shift %d in cache file!\n", shift);
500  return AVERROR(EINVAL);
501  }
502  }
503 
/* NOTE(review): the hash dump loops below hard-code 32 where HASH_SIZE is
 * meant; they would go out of sync if HASH_SIZE changed. */
504  for (int i = 0; i < HASH_SIZE; i++) {
505  ret = set_once_uchar(&s->spacemap->hash[i], hash[i]);
506  if (ret < 0) {
507  av_log(h, AV_LOG_ERROR, "Shared cache spacemap hash mismatch!\n");
508  av_log(h, AV_LOG_ERROR, " Expected hash: ");
509  for (int j = 0; j < 32; j++)
510  av_log(h, AV_LOG_ERROR, "%02X", hash[j]);
511  av_log(h, AV_LOG_ERROR, "\n Got hash: ");
512  for (int j = 0; j < 32; j++)
513  av_log(h, AV_LOG_ERROR, "%02X", atomic_load(&s->spacemap->hash[j]));
514  av_log(h, AV_LOG_ERROR, "\n");
515  return ret;
516  }
517  }
518 
519  if (ret) /* set_once_uchar() returned 1 for the last hash byte: this call initialised the spacemap */
520  av_log(h, AV_LOG_DEBUG, "Initialized new cache spacemap.\n");
521 
522  return ret;
523 }
524 
525 static int read_cache(SharedContext *s, uint8_t *buf, size_t size, off_t offset)
526 {
527  if (s->cache_data) {
528  av_assert1(offset + size <= s->cache_size);
529  memcpy(buf, s->cache_data + offset, size);
530  return 0;
531  }
532 
533  while (size) {
534  ssize_t ret = pread(s->fd, buf, size, offset);
535  if (ret <= 0)
536  return ret ? AVERROR(errno) : AVERROR(EIO);
537  buf += ret;
538  offset += ret;
539  size -= ret;
540  }
541 
542  return 0;
543 }
544 
545 static int write_cache(SharedContext *s, const uint8_t *buf, size_t size, off_t offset)
546 {
547  if (s->cache_data) {
548  av_assert1(offset + size <= s->cache_size);
549  memcpy(s->cache_data + offset, buf, size);
550  return 0;
551  }
552 
553  while (size) {
554  ssize_t ret = pwrite(s->fd, buf, size, offset);
555  if (ret <= 0)
556  return ret ? AVERROR(errno) : AVERROR(EIO);
557  buf += ret;
558  offset += ret;
559  size -= ret;
560  }
561 
562  return 0;
563 }
564 
565 static size_t clamp_size(URLContext *h, size_t size, int64_t pos)
566 {
567  const int64_t filesize = get_filesize(h);
568  if (!filesize)
569  return size;
570  else if (pos > filesize)
571  return 0;
572  else
573  return FFMIN(filesize - pos, size);
574 }
575 
/**
 * Read up to `size` bytes at the current logical position s->pos.
 *
 * Cache hits and cache fills return data from a single block only. The
 * block's state in the shared spacemap selects the path:
 *  - CRC value (cache hit): the whole block is read from the cache file and
 *    its CRC checked before the requested range is copied out. With
 *    cache_verify, the data is instead compared against a direct read from
 *    the underlying protocol.
 *  - BLOCK_NONE, or BLOCK_FAILED when retry_errors is set (miss): unless
 *    read_only, the block is claimed by switching it to BLOCK_PENDING. It is
 *    then fetched in full from the underlying protocol, stored in the cache
 *    file, and published by storing its CRC.
 *  - BLOCK_PENDING: if cache_timeout is 0, the block is fetched right away.
 *    Otherwise the state is polled every cache_timeout/16 us until it
 *    changes or cache_timeout expires, after which we fetch it ourselves.
 * In read_only mode, after a cache write error, or when verifying, the miss
 * path reads straight from the underlying protocol and does not update the
 * cache.
 *
 * @return number of bytes read, 0 if size <= 0, AVERROR_EOF at end of file,
 *         or a negative AVERROR
 *
 * NOTE(review): a few lines of this function are absent from this listing
 * (e.g. the CAS calls on block->state, the line after `case BLOCK_FAILED`,
 * and the computation of `new`).
 */
576 static int shared_read(URLContext *h, unsigned char *buf, int size)
577 {
578  SharedContext *s = h->priv_data;
579  uint8_t *tmp;
580  int ret;
581 
582  if (size <= 0)
583  return 0;
584 
585  size = clamp_size(h, size, s->pos);
586  if (size <= 0)
587  return AVERROR_EOF;
588 
589  const int shift = atomic_load_explicit(&s->spacemap->block_shift, memory_order_relaxed);
590  const int64_t block_id = s->pos >> shift;
591  const int64_t offset = s->pos & (s->block_size - 1);
592  const int64_t block_pos = block_id * s->block_size;
593  int block_size = clamp_size(h, s->block_size, block_pos);
/* Growing may remap the spacemap, so `block` is only taken afterwards */
594  ret = spacemap_grow(h, block_id);
595  if (ret < 0)
596  return ret;
597 
598  Block *const block = &s->spacemap->blocks[block_id];
599  unsigned short state = atomic_load_explicit(&block->state, memory_order_acquire);
600  int64_t pending_since = 0;
601  int verify_read = 0;
602 
603 retry:
604  switch (state) {
605  default:
606  /* We always need to read the entire block to verify integrity */
607  block_size = clamp_size(h, block_size, block_pos); /* filesize may have changed */
608  if (s->cache_data) {
609  av_assert1(block_pos + block_size <= s->cache_size);
610  tmp = s->cache_data + block_pos;
611  } else {
612  tmp = s->tmp_buf;
613  ret = read_cache(s, tmp, block_size, block_pos);
614  if (ret < 0) {
615  av_log(h, AV_LOG_ERROR, "Failed to read from cache file: %s\n", av_err2str(ret));
616  return ret;
617  }
618  }
619 
620  uint16_t crc = get_block_crc(tmp, block_size);
621  if (crc != state) {
622  av_log(h, AV_LOG_ERROR, "Cache corruption detected for block 0x%"PRIx64" at "
623  "offset 0x%"PRIx64": expected CRC: 0x%04X, got: 0x%04X\n",
624  block_id, block_pos, state, crc);
625  return AVERROR(EIO);
626  }
627 
628  tmp += (ptrdiff_t) offset;
629  size = FFMIN(size, block_size - offset);
630  if (s->verify) {
631  verify_read = 1;
632  break; /* fall through to the cache miss logic */
633  }
634 
635  memcpy(buf, tmp, size);
636  s->nb_hit++;
637  s->pos += size;
638  return size;
639 
640  case BLOCK_FAILED:
641  if (!s->retry_errors)
642  return AVERROR(EIO);
644  case BLOCK_NONE:
645  if (s->read_only)
646  break; /* don't mark block as pending */
649  memory_order_acquire,
650  memory_order_acquire))
651  {
652  /* Acquired pending state, proceed to fetch the block */
654  break;
655  }
656  /* CAS failed, another thread changed the state; reload it */
657  goto retry;
658 
659  case BLOCK_PENDING:
660  /* Another thread is busy fetching this block, wait for it to finish */
661  if (!s->timeout) {
662  break; /* no timeout requested, immediately race to fetch block */
663  } else if (pending_since) {
665  if (new - pending_since >= s->timeout)
666  break; /* timeout expired, try to fetch the block ourselves */
667  } else {
668  pending_since = av_gettime_relative();
669  }
670 
671  /* Make sure we try a few times before giving up */
672  av_usleep(s->timeout >> 4);
673  state = atomic_load_explicit(&block->state, memory_order_acquire);
674  goto retry;
675  }
676 
677  /* Cache miss, fetch this block from underlying protocol */
678  s->nb_miss++;
679 
/* In the non-caching path the underlying protocol is read at the exact
 * logical position; otherwise the whole block is fetched from its start */
680  const int read_only = s->read_only || s->write_err || verify_read;
681  int64_t inner_pos = read_only ? s->pos : block_pos;
682  if (s->inner_pos != inner_pos) {
683  inner_pos = ffurl_seek(s->inner, inner_pos, SEEK_SET);
684  if (inner_pos < 0) {
685  av_log(h, AV_LOG_ERROR, "Failed to seek underlying protocol: %s\n",
686  av_err2str(inner_pos));
687  if (!read_only) {
688  /* Release pending state to avoid stalling other threads. Don't
689  * mark this as failed, since the seek error may be unrelated to
690  * the block and should probably be tried again. */
692  BLOCK_NONE,
693  memory_order_relaxed,
694  memory_order_relaxed);
695  }
696  return inner_pos;
697  }
698 
699  av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", inner_pos);
700  s->inner_pos = inner_pos;
701  }
702 
703  if (read_only) {
704  /* Directly defer to the underlying protocol */
705  ret = ffurl_read(s->inner, buf, size);
706  if (ret < 0)
707  return ret;
708 
709  /* Verify the read data against the cached data if requested */
710  if (verify_read && memcmp(buf, tmp, ret)) {
711  av_log(h, AV_LOG_ERROR, "Cache verification failed for %d bytes "
712  "in block 0x%"PRIx64" at offset 0x%"PRIx64" + %"PRId64"!\n",
713  ret, block_id, block_pos, offset);
714  }
715 
716  s->pos = s->inner_pos = inner_pos + ret;
717  return ret;
718  }
719 
720  int write_back = 1;
721  if (s->cache_data) {
722  /* Read directly into memory mapped cache file */
723  tmp = s->cache_data + block_pos;
724  write_back = 0;
725  } else if (size >= block_size && !offset) {
726  /* Read directly into output buffer if aligned and large enough */
727  tmp = buf;
728  } else {
729  /* Read into temporary buffer and copy later */
730  tmp = s->tmp_buf;
731  }
732 
733  /* Try and fetch the entire block */
734  av_assert0(inner_pos == block_pos);
735  int bytes_read = 0;
736  while (bytes_read < block_size) {
737  ret = ffurl_read(s->inner, &tmp[bytes_read], block_size - bytes_read);
738  if (!ret || ret == AVERROR_EOF)
739  break;
740  else if (ret < 0) {
741  av_log(h, AV_LOG_ERROR, "Failed to read block 0x%"PRIx64": %s\n",
742  block_id, av_err2str(ret));
743  /* Try to mark block as failed; ignore errors - any mismatch
744  * here will mean that either another thread already marked it
745  * as failed, or successfully cached it in the meantime */
747  BLOCK_FAILED,
748  memory_order_relaxed,
749  memory_order_relaxed);
750  return ret;
751  }
752 
753  bytes_read += ret;
754  s->inner_pos += ret;
755  }
756 
/* NOTE(review): two early returns below leave block->state unchanged: the
 * set_filesize() failure, and AVERROR_EOF when nothing was read. The state
 * can be the BLOCK_PENDING value this call installed, so other readers then
 * wait up to cache_timeout before refetching. The seek-error path above
 * releases the state back to BLOCK_NONE instead. */
757  if (bytes_read < block_size) {
758  /* Learned location of true EOF, update filesize */
759  ret = set_filesize(h, inner_pos + bytes_read);
760  if (ret < 0)
761  return ret;
762  }
763 
764  if (bytes_read > 0) {
765  ret = write_back ? write_cache(s, tmp, bytes_read, block_pos) : 0;
766  if (ret < 0) {
767  av_log(h, AV_LOG_ERROR, "Failed to write to cache file: %s\n",
768  av_err2str(ret));
769  s->write_err = 1;
770  /* Mark as NONE, not FAILED, since the block itself is fine -
771  * just absent from the cache. */
773  BLOCK_NONE,
774  memory_order_relaxed,
775  memory_order_relaxed);
776  } else {
777  uint16_t crc = get_block_crc(tmp, bytes_read);
778  av_log(h, AV_LOG_TRACE, "Cached %d bytes to block 0x%"PRIx64" at "
779  "offset 0x%"PRIx64", CRC 0x%04X\n", bytes_read, block_id,
780  block_pos, crc);
781  atomic_store_explicit(&block->state, crc, memory_order_release);
782  }
783  } else {
784  return AVERROR_EOF;
785  }
786 
/* NOTE(review): if the filesize was unknown when this read started,
 * clamp_size() did not limit `size`. If s->pos then lies past the true EOF
 * inside this final block, offset >= bytes_read, `size` becomes <= 0, and
 * the av_assert0() below aborts. shared_seek() allows such positions.
 * Returning AVERROR_EOF in that case looks like the intended behaviour. */
787  size = FFMIN(bytes_read - offset, size);
788  av_assert0(size > 0);
789  if (tmp != buf)
790  memcpy(buf, &tmp[offset], size);
791  s->pos += size;
792  return size;
793 }
794 
795 static int64_t shared_seek(URLContext *h, int64_t pos, int whence)
796 {
797  SharedContext *s = h->priv_data;
798  const int64_t filesize = get_filesize(h);
799  int64_t res;
800 
801  switch (whence) {
802  case AVSEEK_SIZE:
803  if (filesize)
804  return filesize;
805  res = ffurl_seek(s->inner, pos, whence);
806  if (res > 0)
807  set_filesize(h, res);
808  return res;
809  case SEEK_SET:
810  break;
811  case SEEK_CUR:
812  pos += s->pos;
813  break;
814  case SEEK_END:
815  if (filesize) {
816  pos += filesize;
817  break;
818  }
819  /* Defer to underlying protocol if filesize is unknown */
820  res = ffurl_seek(s->inner, pos, whence);
821  if (res < 0)
822  return res;
823  set_filesize(h, res - pos); /* Opportunistically update known filesize */
824  av_log(h, AV_LOG_DEBUG, "Inner seek to 0x%"PRIx64"\n", res);
825  return s->pos = s->inner_pos = res;
826  default:
827  return AVERROR(EINVAL);
828  }
829 
830  if (pos < 0)
831  return AVERROR(EINVAL);
832 
833  av_log(h, AV_LOG_DEBUG, "Virtual seek to 0x%"PRIx64"\n", pos);
834  return s->pos = pos;
835 }
836 
/* Body of shared_get_file_handle() (its signature line is absent from this
 * listing): report the file handle of the underlying protocol, not that of
 * the cache file. */
838 {
839  SharedContext *s = h->priv_data;
840  return ffurl_get_file_handle(s->inner);
841 }
842 
/* Body of shared_get_short_seek() (its signature line is absent from this
 * listing): forward errors from the underlying protocol's short-seek query.
 * Otherwise report its threshold, but never less than one cache block,
 * presumably because seeks within a block are served from the same block
 * fetch. */
844 {
845  SharedContext *s = h->priv_data;
846  int ret = ffurl_get_short_seek(s->inner);
847  if (ret < 0)
848  return ret;
849  return FFMAX(ret, s->block_size);
850 }
851 
/* Field offset / flag shorthands for the option table below. */
852 #define OFFSET(x) offsetof(SharedContext, x)
853 #define D AV_OPT_FLAG_DECODING_PARAM
854 
/*
 * User options of the shared: protocol.
 *  - block_shift: the block size is 1 << block_shift bytes (default 15,
 *    i.e. 32 KiB). An existing cache keeps the shift it was created with;
 *    spacemap_init() only warns about a mismatch.
 *  - cache_timeout: 0 means blocks that another reader is fetching are not
 *    waited for but fetched immediately.
 */
855 static const AVOption options[] = {
856  { "cache_dir", "Directory path for shared file cache", OFFSET(cache_dir), AV_OPT_TYPE_STRING, {.str = NULL}, .flags = D },
857  { "block_shift", "Set the base 2 logarithm of the block size", OFFSET(block_shift), AV_OPT_TYPE_INT, {.i64 = 15}, 9, 30, .flags = D },
858  { "read_only", "Don't write data to the cache, only read from it", OFFSET(read_only), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, .flags = D },
859  { "cache_verify", "Verify correctness of the cache against the source", OFFSET(verify), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, .flags = D },
860  { "cache_timeout", "Time in us to wait before re-fetching pending blocks", OFFSET(timeout), AV_OPT_TYPE_INT64, {.i64 = 0}, 0, INT64_MAX, .flags = D },
861  { "retry_errors", "Re-request blocks even if they previously failed", OFFSET(retry_errors), AV_OPT_TYPE_BOOL, {.i64 = 1}, 0, 1, .flags = D },
862  {0},
863 };
864 
/* Body of shared_context_class (its declaration line is absent from this
 * listing): the AVClass that exposes the options table above. */
866  .class_name = "shared",
867  .item_name = av_default_item_name,
868  .option = options,
869  .version = LIBAVUTIL_VERSION_INT,
870 };
871 
/* Body of ff_shared_protocol (its declaration line is absent from this
 * listing): registers the "shared" URL protocol and its callbacks. */
873  .name = "shared",
874  .url_open2 = shared_open,
875  .url_read = shared_read,
876  .url_seek = shared_seek,
877  .url_close = shared_close,
878  .url_get_file_handle = shared_get_file_handle,
879  .url_get_short_seek = shared_get_short_seek,
880  .priv_data_size = sizeof(SharedContext),
881  .priv_data_class = &shared_context_class,
882 };
flags
const SwsFlags flags[]
Definition: swscale.c:85
ffurl_seek
static int64_t ffurl_seek(URLContext *h, int64_t pos, int whence)
Change the position that will be used by the next read/write operation on the resource accessed by h.
Definition: url.h:222
av_gettime_relative
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:216
AVHashContext::crc
uint32_t crc
Definition: hash.c:70
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
get_block_crc
static uint16_t get_block_crc(const uint8_t *block, size_t block_size)
Definition: shared.c:93
shared_close
static int shared_close(URLContext *h)
Definition: shared.c:177
set_filesize
static int set_filesize(URLContext *h, int64_t new_size)
Definition: shared.c:209
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
get_filesize
static int64_t get_filesize(URLContext *h)
Definition: shared.c:203
Spacemap::version
atomic_ushort version
Definition: shared.c:112
int64_t
long long int64_t
Definition: coverity.c:34
av_asprintf
char * av_asprintf(const char *fmt,...)
Definition: avstring.c:115
state
static struct @583 state
write_cache
static int write_cache(SharedContext *s, const uint8_t *buf, size_t size, off_t offset)
Definition: shared.c:545
atomic_ushort
intptr_t atomic_ushort
Definition: stdatomic.h:54
BLOCK_PENDING
@ BLOCK_PENDING
a thread is currently trying to write this block
Definition: shared.c:84
AVOption
AVOption.
Definition: opt.h:429
AVSEEK_SIZE
#define AVSEEK_SIZE
Passing this as the "whence" parameter to a seek function causes it to return the filesize without se...
Definition: avio.h:468
atomic_compare_exchange_weak_explicit
#define atomic_compare_exchange_weak_explicit(object, expected, desired, success, failure)
Definition: stdatomic.h:129
HEADER_VERSION
#define HEADER_VERSION
Definition: shared.c:79
ff_shared_protocol
const URLProtocol ff_shared_protocol
Definition: shared.c:872
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:226
ffurl_close
int ffurl_close(URLContext *h)
Definition: avio.c:617
AVDictionary
Definition: dict.c:32
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
SharedContext::cache_path
char * cache_path
Definition: shared.c:162
SharedContext::verify
int verify
Definition: shared.c:152
hash
static uint8_t hash[HASH_SIZE]
Definition: movenc.c:58
URLProtocol
Definition: url.h:51
hash_uri
static int hash_uri(uint8_t hash[HASH_SIZE], const char *uri)
Definition: shared.c:60
D
#define D
Definition: shared.c:853
BlockState
BlockState
Definition: shared.c:81
crc.h
close
static av_cold void close(AVCodecParserContext *s)
Definition: apv_parser.c:197
read_cache
static int read_cache(SharedContext *s, uint8_t *buf, size_t size, off_t offset)
Definition: shared.c:525
BLOCK_FAILED
@ BLOCK_FAILED
the underlying I/O source failed to read this block
Definition: shared.c:85
ffurl_get_short_seek
int ffurl_get_short_seek(void *urlcontext)
Return the current short seek threshold value for this URL.
Definition: avio.c:844
SharedContext::tmp_buf
uint8_t * tmp_buf
Definition: shared.c:156
SharedContext::cache_dir
char * cache_dir
Definition: shared.c:147
SharedContext::nb_miss
int64_t nb_miss
Definition: shared.c:174
shared_get_file_handle
static int shared_get_file_handle(URLContext *h)
Definition: shared.c:837
avassert.h
AV_LOG_TRACE
#define AV_LOG_TRACE
Extremely verbose debugging, useful for libav* development.
Definition: log.h:236
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:210
FF_ARRAY_ELEMS
#define FF_ARRAY_ELEMS(a)
Definition: sinewin_tablegen.c:29
SharedContext::map_size
off_t map_size
Definition: shared.c:169
Spacemap::hash
atomic_uchar hash[HASH_SIZE]
Definition: shared.c:115
SharedContext::spacemap
Spacemap * spacemap
Definition: shared.c:167
ffurl_open_whitelist
int ffurl_open_whitelist(URLContext **puc, const char *filename, int flags, const AVIOInterruptCB *int_cb, AVDictionary **options, const char *whitelist, const char *blacklist, URLContext *parent)
Create an URLContext for accessing to the resource indicated by url, and open it.
Definition: avio.c:368
spacemap_init
static int spacemap_init(URLContext *h, const uint8_t hash[HASH_SIZE])
Definition: shared.c:472
s
#define s(width, name)
Definition: cbs_vp9.c:198
avpriv_open
int avpriv_open(const char *filename, int flags,...)
A wrapper for open() setting O_CLOEXEC.
Definition: file_open.c:67
DEF_SET_ONCE
#define DEF_SET_ONCE(ctype, atype)
Definition: shared.c:122
SharedContext::read_only
int read_only
Definition: shared.c:149
av_hash_alloc
int av_hash_alloc(AVHashContext **ctx, const char *name)
Allocate a hash context for the algorithm specified by name.
Definition: hash.c:114
AV_OPT_TYPE_INT64
@ AV_OPT_TYPE_INT64
Underlying C type is int64_t.
Definition: opt.h:263
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:42
shared_context_class
static const AVClass shared_context_class
Definition: shared.c:865
Spacemap::filesize
atomic_ullong filesize
Definition: shared.c:114
AV_LOG_DEBUG
#define AV_LOG_DEBUG
Stuff which is only useful for libav* developers.
Definition: log.h:231
ctx
static AVFormatContext * ctx
Definition: movenc.c:49
SharedContext::retry_errors
int retry_errors
Definition: shared.c:151
SharedContext::write_err
int write_err
write error occurred
Definition: shared.c:158
SharedContext::nb_hit
int64_t nb_hit
Definition: shared.c:173
av_usleep
int av_usleep(unsigned usec)
Sleep for a period of time.
Definition: time.c:84
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
Spacemap::header_magic
atomic_uint header_magic
Definition: shared.c:111
file_open.h
tmp
static uint8_t tmp[40]
Definition: aes_ctr.c:52
arg
const char * arg
Definition: jacosubdec.c:65
atomic_compare_exchange_strong_explicit
#define atomic_compare_exchange_strong_explicit(object, expected, desired, success, failure)
Definition: stdatomic.h:123
SharedContext::inner_pos
int64_t inner_pos
Definition: shared.c:144
AV_CRC_16_ANSI
@ AV_CRC_16_ANSI
Definition: crc.h:50
BLOCK_NONE
@ BLOCK_NONE
block is not cached
Definition: shared.c:83
fail
#define fail
Definition: test.h:478
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
SharedContext::map_path
char * map_path
Definition: shared.c:168
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:76
NULL
#define NULL
Definition: coverity.c:32
av_hash_init
void av_hash_init(AVHashContext *ctx)
Initialize or reset a hash context.
Definition: hash.c:151
SharedContext
Definition: shared.c:141
av_fallthrough
#define av_fallthrough
Definition: attributes.h:67
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:242
options
Definition: swscale.c:50
SharedContext::block_shift
int block_shift
requested shift; may disagree with actual
Definition: shared.c:148
spacemap_remap
static int spacemap_remap(URLContext *h, size_t map_size)
Definition: shared.c:369
time.h
av_hash_update
void av_hash_update(AVHashContext *ctx, const uint8_t *src, size_t len)
Update a hash context with additional data.
Definition: hash.c:172
attributes.h
atomic_load_explicit
#define atomic_load_explicit(object, order)
Definition: stdatomic.h:96
HEADER_MAGIC
#define HEADER_MAGIC
Definition: shared.c:78
av_hash_freep
void av_hash_freep(AVHashContext **ctx)
Free hash context and set hash context pointer to NULL.
Definition: hash.c:248
Block
Definition: flashsv2enc.c:70
SharedContext::cache_size
off_t cache_size
size of mapped memory region (for munmap)
Definition: shared.c:163
shift
static int shift(int a, int b)
Definition: bonk.c:261
i
#define i(width, name, range_min, range_max)
Definition: cbs_h264.c:63
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
size
int size
Definition: twinvq_data.h:10344
URLProtocol::name
const char * name
Definition: url.h:52
av_hash_final
void av_hash_final(AVHashContext *ctx, uint8_t *dst)
Finalize a hash context and compute the actual hash value.
Definition: hash.c:193
shared_get_short_seek
static int shared_get_short_seek(URLContext *h)
Definition: shared.c:843
SharedContext::mapfd
int mapfd
Definition: shared.c:170
av_crc_get_table
const AVCRC * av_crc_get_table(AVCRCId crc_id)
Get an initialized standard CRC table.
Definition: crc.c:389
offset
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf offset
Definition: writing_filters.txt:86
AVHashContext
Definition: hash.c:66
av_strstart
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
Definition: avstring.c:36
spacemap_grow
static int spacemap_grow(URLContext *h, int64_t block)
Definition: shared.c:439
atomic_uchar
intptr_t atomic_uchar
Definition: stdatomic.h:52
filesize
static int64_t filesize(AVIOContext *pb)
Definition: ffmpeg_mux.c:51
URLContext
Definition: url.h:35
av_malloc
#define av_malloc(s)
Definition: ops_asmgen.c:44
shared_seek
static int64_t shared_seek(URLContext *h, int64_t pos, int whence)
Definition: shared.c:795
av_assert1
#define av_assert1(cond)
assert() equivalent, that does not lie in speed critical code.
Definition: avassert.h:58
atomic_store_explicit
#define atomic_store_explicit(object, desired, order)
Definition: stdatomic.h:90
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
url.h
atomic_ullong
intptr_t atomic_ullong
Definition: stdatomic.h:60
SharedContext::cache_data
uint8_t * cache_data
optional mmap of the cache file
Definition: shared.c:161
ret
ret
Definition: filter_design.txt:187
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:81
pos
unsigned int pos
Definition: spdifenc.c:414
options
static const AVOption options[]
Definition: shared.c:855
hash.h
SharedContext::block_size
int block_size
Definition: shared.c:157
av_crc
uint32_t av_crc(const AVCRC *ctx, uint32_t crc, const uint8_t *buffer, size_t length)
Calculate the CRC of a block.
Definition: crc.c:421
SharedContext::timeout
int64_t timeout
Definition: shared.c:150
shared_open
static int shared_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
Definition: shared.c:232
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Underlying C type is int.
Definition: opt.h:259
av_hash_get_size
int av_hash_get_size(const AVHashContext *ctx)
Definition: hash.c:109
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
HASH_METHOD
#define HASH_METHOD
This hash should be resistant against collision attacks, so that an attacker could not generate e....
Definition: shared.c:57
HASH_SIZE
#define HASH_SIZE
Definition: shared.c:58
mem.h
Block::state
atomic_ushort state
Definition: shared.c:107
cache_map
static int cache_map(URLContext *h, int64_t filesize)
Definition: shared.c:333
Spacemap
Definition: shared.c:110
SharedContext::fd
int fd
Definition: shared.c:164
FFALIGN
#define FFALIGN(x, a)
Definition: macros.h:78
SharedContext::inner
URLContext * inner
Definition: shared.c:143
AV_OPT_TYPE_BOOL
@ AV_OPT_TYPE_BOOL
Underlying C type is int.
Definition: opt.h:327
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
Spacemap::blocks
Block blocks[]
Definition: shared.c:118
SharedContext::pos
int64_t pos
current logical position
Definition: shared.c:155
OFFSET
#define OFFSET(x)
Definition: shared.c:852
Spacemap::reserved
char reserved[80]
Definition: shared.c:116
block
The exact code depends on how similar the blocks are and how related they are to the block
Definition: filter_design.txt:207
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
ffurl_size
int64_t ffurl_size(URLContext *h)
Return the filesize of the resource accessed by h, AVERROR(ENOSYS) if the operation is not supported ...
Definition: avio.c:805
h
h
Definition: vp9dsp_template.c:2070
avstring.h
shared_read
static int shared_read(URLContext *h, unsigned char *buf, int size)
Definition: shared.c:576
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Underlying C type is a uint8_t* that is either NULL or points to a C string allocated with the av_mal...
Definition: opt.h:276
clamp_size
static size_t clamp_size(URLContext *h, size_t size, int64_t pos)
Definition: shared.c:565
Spacemap::block_shift
atomic_ushort block_shift
Definition: shared.c:113
ffurl_get_file_handle
int ffurl_get_file_handle(URLContext *h)
Return the file descriptor associated with this URL.
Definition: avio.c:820
ffurl_read
static int ffurl_read(URLContext *h, uint8_t *buf, int size)
Read up to size bytes from the resource accessed by h, and store the read bytes in buf.
Definition: url.h:181