FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDecOutput {
76  uint8_t *dst_finished;
77  unsigned nb_dst;
78 } SchDecOutput;
79 
80 typedef struct SchDec {
81  const AVClass *class;
82 
84 
86  unsigned nb_outputs;
87 
89  // Queue for receiving input packets, one stream.
91 
92  // Queue for sending post-flush end timestamps back to the source
95 
96  // temporary storage used by sch_dec_send()
98 } SchDec;
99 
100 typedef struct SchSyncQueue {
104 
105  unsigned *enc_idx;
106  unsigned nb_enc_idx;
107 } SchSyncQueue;
108 
109 typedef struct SchEnc {
110  const AVClass *class;
111 
114  uint8_t *dst_finished;
115  unsigned nb_dst;
116 
117  // [0] - index of the sync queue in Scheduler.sq_enc,
118  // [1] - index of this encoder in the sq
119  int sq_idx[2];
120 
121  /* Opening encoders is somewhat nontrivial due to their interaction with
122  * sync queues, which are (among other things) responsible for maintaining
123  * constant audio frame size, when it is required by the encoder.
124  *
125  * Opening the encoder requires stream parameters, obtained from the first
126  * frame. However, that frame cannot be properly chunked by the sync queue
127  * without knowing the required frame size, which is only available after
128  * opening the encoder.
129  *
130  * This apparent circular dependency is resolved in the following way:
131  * - the caller creating the encoder gives us a callback which opens the
132  * encoder and returns the required frame size (if any)
133  * - when the first frame is sent to the encoder, the sending thread
134  * - calls this callback, opening the encoder
135  * - passes the returned frame size to the sync queue
136  */
137  int (*open_cb)(void *opaque, const AVFrame *frame);
138  int opened;
139 
141  // Queue for receiving input frames, one stream.
143  // tq_send() to queue returned EOF
145 
146  // temporary storage used by sch_enc_send()
148 } SchEnc;
149 
150 typedef struct SchDemuxStream {
152  uint8_t *dst_finished;
153  unsigned nb_dst;
155 
156 typedef struct SchDemux {
157  const AVClass *class;
158 
160  unsigned nb_streams;
161 
164 
165  // temporary storage used by sch_demux_send()
167 
168  // protected by schedule_lock
170 } SchDemux;
171 
172 typedef struct PreMuxQueue {
173  /**
174  * Queue for buffering the packets before the muxer task can be started.
175  */
177  /**
178  * Maximum number of packets in fifo.
179  */
181  /*
182  * The size of the AVPackets' buffers in queue.
183  * Updated when a packet is either pushed or pulled from the queue.
184  */
185  size_t data_size;
186  /* Threshold after which max_packets will be in effect */
188 } PreMuxQueue;
189 
190 typedef struct SchMuxStream {
192 
193  unsigned *sub_heartbeat_dst;
195 
197 
198  // an EOF was generated while flushing the pre-mux queue
199  int init_eof;
200 
201  ////////////////////////////////////////////////////////////
202  // The following are protected by Scheduler.schedule_lock //
203 
204  /* dts+duration of the last packet sent to this stream
205  in AV_TIME_BASE_Q */
207  // this stream no longer accepts input
209  ////////////////////////////////////////////////////////////
210 } SchMuxStream;
211 
212 typedef struct SchMux {
213  const AVClass *class;
214 
216  unsigned nb_streams;
218 
219  int (*init)(void *arg);
220 
222  /**
223  * Set to 1 after starting the muxer task and flushing the
224  * pre-muxing queues.
225  * Set either before any tasks have started, or with
226  * Scheduler.mux_ready_lock held.
227  */
230  unsigned queue_size;
231 
233 } SchMux;
234 
235 typedef struct SchFilterIn {
239 } SchFilterIn;
240 
241 typedef struct SchFilterOut {
243 } SchFilterOut;
244 
245 typedef struct SchFilterGraph {
246  const AVClass *class;
247 
249  unsigned nb_inputs;
252 
254  unsigned nb_outputs;
255 
257  // input queue, nb_inputs+1 streams
258  // last stream is control
261 
262  // protected by schedule_lock
263  unsigned best_input;
266 
271 };
272 
273 struct Scheduler {
274  const AVClass *class;
275 
277  unsigned nb_demux;
278 
280  unsigned nb_mux;
281 
282  unsigned nb_mux_ready;
284 
285  unsigned nb_mux_done;
286  unsigned task_failed;
289 
290 
292  unsigned nb_dec;
293 
295  unsigned nb_enc;
296 
298  unsigned nb_sq_enc;
299 
301  unsigned nb_filters;
302 
304  int sdp_auto;
305 
308 
310 
312 };
313 
314 /**
315  * Wait until this task is allowed to proceed.
316  *
317  * @retval 0 the caller should proceed
318  * @retval 1 the caller should terminate
319  */
320 static int waiter_wait(Scheduler *sch, SchWaiter *w)
321 {
322  int terminate;
323 
324  if (!atomic_load(&w->choked))
325  return 0;
326 
327  pthread_mutex_lock(&w->lock);
328 
329  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
330  pthread_cond_wait(&w->cond, &w->lock);
331 
332  terminate = atomic_load(&sch->terminate);
333 
334  pthread_mutex_unlock(&w->lock);
335 
336  return terminate;
337 }
338 
339 static void waiter_set(SchWaiter *w, int choked)
340 {
341  pthread_mutex_lock(&w->lock);
342 
343  atomic_store(&w->choked, choked);
344  pthread_cond_signal(&w->cond);
345 
346  pthread_mutex_unlock(&w->lock);
347 }
348 
349 static int waiter_init(SchWaiter *w)
350 {
351  int ret;
352 
353  atomic_init(&w->choked, 0);
354 
355  ret = pthread_mutex_init(&w->lock, NULL);
356  if (ret)
357  return AVERROR(ret);
358 
359  ret = pthread_cond_init(&w->cond, NULL);
360  if (ret)
361  return AVERROR(ret);
362 
363  return 0;
364 }
365 
366 static void waiter_uninit(SchWaiter *w)
367 {
368  pthread_mutex_destroy(&w->lock);
369  pthread_cond_destroy(&w->cond);
370 }
371 
372 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
373  enum QueueType type)
374 {
375  ThreadQueue *tq;
376 
377  if (queue_size <= 0) {
378  if (type == QUEUE_FRAMES)
379  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
380  else
382  }
383 
384  if (type == QUEUE_FRAMES) {
385  // This queue length is used in the decoder code to ensure that
386  // there are enough entries in fixed-size frame pools to account
387  // for frames held in queues inside the ffmpeg utility. If this
388  // can ever dynamically change then the corresponding decode
389  // code needs to be updated as well.
391  }
392 
393  tq = tq_alloc(nb_streams, queue_size,
395  if (!tq)
396  return AVERROR(ENOMEM);
397 
398  *ptq = tq;
399  return 0;
400 }
401 
402 static void *task_wrapper(void *arg);
403 
404 static int task_start(SchTask *task)
405 {
406  int ret;
407 
408  if (!task->parent)
409  return 0;
410 
411  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
412 
413  av_assert0(!task->thread_running);
414 
415  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
416  if (ret) {
417  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
418  strerror(ret));
419  return AVERROR(ret);
420  }
421 
422  task->thread_running = 1;
423  return 0;
424 }
425 
426 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
427  SchThreadFunc func, void *func_arg)
428 {
429  task->parent = sch;
430 
431  task->node.type = type;
432  task->node.idx = idx;
433 
434  task->func = func;
435  task->func_arg = func_arg;
436 }
437 
438 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
439 {
440  int64_t min_dts = INT64_MAX;
441 
442  for (unsigned i = 0; i < sch->nb_mux; i++) {
443  const SchMux *mux = &sch->mux[i];
444 
445  for (unsigned j = 0; j < mux->nb_streams; j++) {
446  const SchMuxStream *ms = &mux->streams[j];
447 
448  if (ms->source_finished && !count_finished)
449  continue;
450  if (ms->last_dts == AV_NOPTS_VALUE)
451  return AV_NOPTS_VALUE;
452 
453  min_dts = FFMIN(min_dts, ms->last_dts);
454  }
455  }
456 
457  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
458 }
459 
461 {
462  SchFilterGraph *fg = &sch->filters[idx];
463 
465  memset(&fg->task, 0, sizeof(fg->task));
466 
467  tq_free(&fg->queue);
468 
469  av_freep(&fg->inputs);
470  fg->nb_inputs = 0;
471  av_freep(&fg->outputs);
472  fg->nb_outputs = 0;
473 
474  fg->task_exited = 1;
475 }
476 
477 void sch_free(Scheduler **psch)
478 {
479  Scheduler *sch = *psch;
480 
481  if (!sch)
482  return;
483 
484  sch_stop(sch, NULL);
485 
486  for (unsigned i = 0; i < sch->nb_demux; i++) {
487  SchDemux *d = &sch->demux[i];
488 
489  for (unsigned j = 0; j < d->nb_streams; j++) {
490  SchDemuxStream *ds = &d->streams[j];
491  av_freep(&ds->dst);
492  av_freep(&ds->dst_finished);
493  }
494  av_freep(&d->streams);
495 
497 
498  waiter_uninit(&d->waiter);
499  }
500  av_freep(&sch->demux);
501 
502  for (unsigned i = 0; i < sch->nb_mux; i++) {
503  SchMux *mux = &sch->mux[i];
504 
505  for (unsigned j = 0; j < mux->nb_streams; j++) {
506  SchMuxStream *ms = &mux->streams[j];
507 
508  if (ms->pre_mux_queue.fifo) {
509  AVPacket *pkt;
510  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
513  }
514 
516  }
517  av_freep(&mux->streams);
518 
520 
521  tq_free(&mux->queue);
522  }
523  av_freep(&sch->mux);
524 
525  for (unsigned i = 0; i < sch->nb_dec; i++) {
526  SchDec *dec = &sch->dec[i];
527 
528  tq_free(&dec->queue);
529 
531 
532  for (unsigned j = 0; j < dec->nb_outputs; j++) {
533  SchDecOutput *o = &dec->outputs[j];
534 
535  av_freep(&o->dst);
536  av_freep(&o->dst_finished);
537  }
538 
539  av_freep(&dec->outputs);
540 
541  av_frame_free(&dec->send_frame);
542  }
543  av_freep(&sch->dec);
544 
545  for (unsigned i = 0; i < sch->nb_enc; i++) {
546  SchEnc *enc = &sch->enc[i];
547 
548  tq_free(&enc->queue);
549 
550  av_packet_free(&enc->send_pkt);
551 
552  av_freep(&enc->dst);
553  av_freep(&enc->dst_finished);
554  }
555  av_freep(&sch->enc);
556 
557  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
558  SchSyncQueue *sq = &sch->sq_enc[i];
559  sq_free(&sq->sq);
560  av_frame_free(&sq->frame);
562  av_freep(&sq->enc_idx);
563  }
564  av_freep(&sch->sq_enc);
565 
566  for (unsigned i = 0; i < sch->nb_filters; i++) {
567  SchFilterGraph *fg = &sch->filters[i];
568 
569  tq_free(&fg->queue);
570 
571  av_freep(&fg->inputs);
572  av_freep(&fg->outputs);
573 
574  waiter_uninit(&fg->waiter);
575  }
576  av_freep(&sch->filters);
577 
578  av_freep(&sch->sdp_filename);
579 
581 
583 
586 
587  av_freep(psch);
588 }
589 
590 static const AVClass scheduler_class = {
591  .class_name = "Scheduler",
592  .version = LIBAVUTIL_VERSION_INT,
593 };
594 
596 {
597  Scheduler *sch;
598  int ret;
599 
600  sch = av_mallocz(sizeof(*sch));
601  if (!sch)
602  return NULL;
603 
604  sch->class = &scheduler_class;
605  sch->sdp_auto = 1;
606 
608  if (ret)
609  goto fail;
610 
612  if (ret)
613  goto fail;
614 
616  if (ret)
617  goto fail;
618 
620  if (ret)
621  goto fail;
622 
623  return sch;
624 fail:
625  sch_free(&sch);
626  return NULL;
627 }
628 
629 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
630 {
631  av_freep(&sch->sdp_filename);
632  sch->sdp_filename = av_strdup(sdp_filename);
633  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
634 }
635 
636 static const AVClass sch_mux_class = {
637  .class_name = "SchMux",
638  .version = LIBAVUTIL_VERSION_INT,
639  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
640 };
641 
642 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
643  void *arg, int sdp_auto, unsigned thread_queue_size)
644 {
645  const unsigned idx = sch->nb_mux;
646 
647  SchMux *mux;
648  int ret;
649 
650  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
651  if (ret < 0)
652  return ret;
653 
654  mux = &sch->mux[idx];
655  mux->class = &sch_mux_class;
656  mux->init = init;
657  mux->queue_size = thread_queue_size;
658 
659  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
660 
661  sch->sdp_auto &= sdp_auto;
662 
663  return idx;
664 }
665 
666 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
667 {
668  SchMux *mux;
669  SchMuxStream *ms;
670  unsigned stream_idx;
671  int ret;
672 
673  av_assert0(mux_idx < sch->nb_mux);
674  mux = &sch->mux[mux_idx];
675 
676  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
677  if (ret < 0)
678  return ret;
679  stream_idx = mux->nb_streams - 1;
680 
681  ms = &mux->streams[stream_idx];
682 
683  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
684  if (!ms->pre_mux_queue.fifo)
685  return AVERROR(ENOMEM);
686 
687  ms->last_dts = AV_NOPTS_VALUE;
688 
689  return stream_idx;
690 }
691 
692 static const AVClass sch_demux_class = {
693  .class_name = "SchDemux",
694  .version = LIBAVUTIL_VERSION_INT,
695  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
696 };
697 
699 {
700  const unsigned idx = sch->nb_demux;
701 
702  SchDemux *d;
703  int ret;
704 
705  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
706  if (ret < 0)
707  return ret;
708 
709  d = &sch->demux[idx];
710 
711  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
712 
713  d->class = &sch_demux_class;
714  d->send_pkt = av_packet_alloc();
715  if (!d->send_pkt)
716  return AVERROR(ENOMEM);
717 
718  ret = waiter_init(&d->waiter);
719  if (ret < 0)
720  return ret;
721 
722  return idx;
723 }
724 
725 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
726 {
727  SchDemux *d;
728  int ret;
729 
730  av_assert0(demux_idx < sch->nb_demux);
731  d = &sch->demux[demux_idx];
732 
733  ret = GROW_ARRAY(d->streams, d->nb_streams);
734  return ret < 0 ? ret : d->nb_streams - 1;
735 }
736 
737 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
738 {
739  SchDec *dec;
740  int ret;
741 
742  av_assert0(dec_idx < sch->nb_dec);
743  dec = &sch->dec[dec_idx];
744 
745  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
746  if (ret < 0)
747  return ret;
748 
749  return dec->nb_outputs - 1;
750 }
751 
752 static const AVClass sch_dec_class = {
753  .class_name = "SchDec",
754  .version = LIBAVUTIL_VERSION_INT,
755  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
756 };
757 
758 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
759 {
760  const unsigned idx = sch->nb_dec;
761 
762  SchDec *dec;
763  int ret;
764 
765  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
766  if (ret < 0)
767  return ret;
768 
769  dec = &sch->dec[idx];
770 
771  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
772 
773  dec->class = &sch_dec_class;
774  dec->send_frame = av_frame_alloc();
775  if (!dec->send_frame)
776  return AVERROR(ENOMEM);
777 
778  ret = sch_add_dec_output(sch, idx);
779  if (ret < 0)
780  return ret;
781 
782  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
783  if (ret < 0)
784  return ret;
785 
786  if (send_end_ts) {
788  if (ret < 0)
789  return ret;
790  }
791 
792  return idx;
793 }
794 
795 static const AVClass sch_enc_class = {
796  .class_name = "SchEnc",
797  .version = LIBAVUTIL_VERSION_INT,
798  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
799 };
800 
802  int (*open_cb)(void *opaque, const AVFrame *frame))
803 {
804  const unsigned idx = sch->nb_enc;
805 
806  SchEnc *enc;
807  int ret;
808 
809  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
810  if (ret < 0)
811  return ret;
812 
813  enc = &sch->enc[idx];
814 
815  enc->class = &sch_enc_class;
816  enc->open_cb = open_cb;
817  enc->sq_idx[0] = -1;
818  enc->sq_idx[1] = -1;
819 
820  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
821 
822  enc->send_pkt = av_packet_alloc();
823  if (!enc->send_pkt)
824  return AVERROR(ENOMEM);
825 
826  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
827  if (ret < 0)
828  return ret;
829 
830  return idx;
831 }
832 
833 static const AVClass sch_fg_class = {
834  .class_name = "SchFilterGraph",
835  .version = LIBAVUTIL_VERSION_INT,
836  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
837 };
838 
839 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
840  SchThreadFunc func, void *ctx)
841 {
842  const unsigned idx = sch->nb_filters;
843 
844  SchFilterGraph *fg;
845  int ret;
846 
847  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
848  if (ret < 0)
849  return ret;
850  fg = &sch->filters[idx];
851 
852  fg->class = &sch_fg_class;
853 
854  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
855 
856  if (nb_inputs) {
857  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
858  if (!fg->inputs)
859  return AVERROR(ENOMEM);
860  fg->nb_inputs = nb_inputs;
861  }
862 
863  if (nb_outputs) {
864  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
865  if (!fg->outputs)
866  return AVERROR(ENOMEM);
867  fg->nb_outputs = nb_outputs;
868  }
869 
870  ret = waiter_init(&fg->waiter);
871  if (ret < 0)
872  return ret;
873 
874  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
875  if (ret < 0)
876  return ret;
877 
878  return idx;
879 }
880 
881 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
882 {
883  SchSyncQueue *sq;
884  int ret;
885 
886  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
887  if (ret < 0)
888  return ret;
889  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
890 
891  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
892  if (!sq->sq)
893  return AVERROR(ENOMEM);
894 
895  sq->frame = av_frame_alloc();
896  if (!sq->frame)
897  return AVERROR(ENOMEM);
898 
899  ret = pthread_mutex_init(&sq->lock, NULL);
900  if (ret)
901  return AVERROR(ret);
902 
903  return sq - sch->sq_enc;
904 }
905 
906 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
907  int limiting, uint64_t max_frames)
908 {
909  SchSyncQueue *sq;
910  SchEnc *enc;
911  int ret;
912 
913  av_assert0(sq_idx < sch->nb_sq_enc);
914  sq = &sch->sq_enc[sq_idx];
915 
916  av_assert0(enc_idx < sch->nb_enc);
917  enc = &sch->enc[enc_idx];
918 
919  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
920  if (ret < 0)
921  return ret;
922  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
923 
924  ret = sq_add_stream(sq->sq, limiting);
925  if (ret < 0)
926  return ret;
927 
928  enc->sq_idx[0] = sq_idx;
929  enc->sq_idx[1] = ret;
930 
931  if (max_frames != INT64_MAX)
932  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
933 
934  return 0;
935 }
936 
938 {
939  int ret;
940 
941  switch (src.type) {
942  case SCH_NODE_TYPE_DEMUX: {
943  SchDemuxStream *ds;
944 
945  av_assert0(src.idx < sch->nb_demux &&
946  src.idx_stream < sch->demux[src.idx].nb_streams);
947  ds = &sch->demux[src.idx].streams[src.idx_stream];
948 
949  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
950  if (ret < 0)
951  return ret;
952 
953  ds->dst[ds->nb_dst - 1] = dst;
954 
955  // demuxed packets go to decoding or streamcopy
956  switch (dst.type) {
957  case SCH_NODE_TYPE_DEC: {
958  SchDec *dec;
959 
960  av_assert0(dst.idx < sch->nb_dec);
961  dec = &sch->dec[dst.idx];
962 
963  av_assert0(!dec->src.type);
964  dec->src = src;
965  break;
966  }
967  case SCH_NODE_TYPE_MUX: {
968  SchMuxStream *ms;
969 
970  av_assert0(dst.idx < sch->nb_mux &&
971  dst.idx_stream < sch->mux[dst.idx].nb_streams);
972  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
973 
974  av_assert0(!ms->src.type);
975  ms->src = src;
976 
977  break;
978  }
979  default: av_assert0(0);
980  }
981 
982  break;
983  }
984  case SCH_NODE_TYPE_DEC: {
985  SchDec *dec;
986  SchDecOutput *o;
987 
988  av_assert0(src.idx < sch->nb_dec);
989  dec = &sch->dec[src.idx];
990 
991  av_assert0(src.idx_stream < dec->nb_outputs);
992  o = &dec->outputs[src.idx_stream];
993 
994  ret = GROW_ARRAY(o->dst, o->nb_dst);
995  if (ret < 0)
996  return ret;
997 
998  o->dst[o->nb_dst - 1] = dst;
999 
1000  // decoded frames go to filters or encoding
1001  switch (dst.type) {
1002  case SCH_NODE_TYPE_FILTER_IN: {
1003  SchFilterIn *fi;
1004 
1005  av_assert0(dst.idx < sch->nb_filters &&
1006  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1007  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1008 
1009  av_assert0(!fi->src.type);
1010  fi->src = src;
1011  break;
1012  }
1013  case SCH_NODE_TYPE_ENC: {
1014  SchEnc *enc;
1015 
1016  av_assert0(dst.idx < sch->nb_enc);
1017  enc = &sch->enc[dst.idx];
1018 
1019  av_assert0(!enc->src.type);
1020  enc->src = src;
1021  break;
1022  }
1023  default: av_assert0(0);
1024  }
1025 
1026  break;
1027  }
1028  case SCH_NODE_TYPE_FILTER_OUT: {
1029  SchFilterOut *fo;
1030 
1031  av_assert0(src.idx < sch->nb_filters &&
1032  src.idx_stream < sch->filters[src.idx].nb_outputs);
1033  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1034 
1035  av_assert0(!fo->dst.type);
1036  fo->dst = dst;
1037 
1038  // filtered frames go to encoding or another filtergraph
1039  switch (dst.type) {
1040  case SCH_NODE_TYPE_ENC: {
1041  SchEnc *enc;
1042 
1043  av_assert0(dst.idx < sch->nb_enc);
1044  enc = &sch->enc[dst.idx];
1045 
1046  av_assert0(!enc->src.type);
1047  enc->src = src;
1048  break;
1049  }
1050  case SCH_NODE_TYPE_FILTER_IN: {
1051  SchFilterIn *fi;
1052 
1053  av_assert0(dst.idx < sch->nb_filters &&
1054  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1055  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1056 
1057  av_assert0(!fi->src.type);
1058  fi->src = src;
1059  break;
1060  }
1061  default: av_assert0(0);
1062  }
1063 
1064 
1065  break;
1066  }
1067  case SCH_NODE_TYPE_ENC: {
1068  SchEnc *enc;
1069 
1070  av_assert0(src.idx < sch->nb_enc);
1071  enc = &sch->enc[src.idx];
1072 
1073  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1074  if (ret < 0)
1075  return ret;
1076 
1077  enc->dst[enc->nb_dst - 1] = dst;
1078 
1079  // encoding packets go to muxing or decoding
1080  switch (dst.type) {
1081  case SCH_NODE_TYPE_MUX: {
1082  SchMuxStream *ms;
1083 
1084  av_assert0(dst.idx < sch->nb_mux &&
1085  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1086  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1087 
1088  av_assert0(!ms->src.type);
1089  ms->src = src;
1090 
1091  break;
1092  }
1093  case SCH_NODE_TYPE_DEC: {
1094  SchDec *dec;
1095 
1096  av_assert0(dst.idx < sch->nb_dec);
1097  dec = &sch->dec[dst.idx];
1098 
1099  av_assert0(!dec->src.type);
1100  dec->src = src;
1101 
1102  break;
1103  }
1104  default: av_assert0(0);
1105  }
1106 
1107  break;
1108  }
1109  default: av_assert0(0);
1110  }
1111 
1112  return 0;
1113 }
1114 
1115 static int mux_task_start(SchMux *mux)
1116 {
1117  int ret = 0;
1118 
1119  ret = task_start(&mux->task);
1120  if (ret < 0)
1121  return ret;
1122 
1123  /* flush the pre-muxing queues */
1124  while (1) {
1125  int min_stream = -1;
1126  Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1127 
1128  AVPacket *pkt;
1129 
1130  // find the stream with the earliest dts or EOF in pre-muxing queue
1131  for (unsigned i = 0; i < mux->nb_streams; i++) {
1132  SchMuxStream *ms = &mux->streams[i];
1133 
1134  if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1135  continue;
1136 
1137  if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1138  min_stream = i;
1139  break;
1140  }
1141 
1142  if (min_ts.ts == AV_NOPTS_VALUE ||
1143  av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1144  min_stream = i;
1145  min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1146  }
1147  }
1148 
1149  if (min_stream >= 0) {
1150  SchMuxStream *ms = &mux->streams[min_stream];
1151 
1152  ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1153  av_assert0(ret >= 0);
1154 
1155  if (pkt) {
1156  if (!ms->init_eof)
1157  ret = tq_send(mux->queue, min_stream, pkt);
1158  av_packet_free(&pkt);
1159  if (ret == AVERROR_EOF)
1160  ms->init_eof = 1;
1161  else if (ret < 0)
1162  return ret;
1163  } else
1164  tq_send_finish(mux->queue, min_stream);
1165 
1166  continue;
1167  }
1168 
1169  break;
1170  }
1171 
1172  atomic_store(&mux->mux_started, 1);
1173 
1174  return 0;
1175 }
1176 
1177 int print_sdp(const char *filename);
1178 
1179 static int mux_init(Scheduler *sch, SchMux *mux)
1180 {
1181  int ret;
1182 
1183  ret = mux->init(mux->task.func_arg);
1184  if (ret < 0)
1185  return ret;
1186 
1187  sch->nb_mux_ready++;
1188 
1189  if (sch->sdp_filename || sch->sdp_auto) {
1190  if (sch->nb_mux_ready < sch->nb_mux)
1191  return 0;
1192 
1193  ret = print_sdp(sch->sdp_filename);
1194  if (ret < 0) {
1195  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1196  return ret;
1197  }
1198 
1199  /* SDP is written only after all the muxers are ready, so now we
1200  * start ALL the threads */
1201  for (unsigned i = 0; i < sch->nb_mux; i++) {
1202  ret = mux_task_start(&sch->mux[i]);
1203  if (ret < 0)
1204  return ret;
1205  }
1206  } else {
1207  ret = mux_task_start(mux);
1208  if (ret < 0)
1209  return ret;
1210  }
1211 
1212  return 0;
1213 }
1214 
1215 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1216  size_t data_threshold, int max_packets)
1217 {
1218  SchMux *mux;
1219  SchMuxStream *ms;
1220 
1221  av_assert0(mux_idx < sch->nb_mux);
1222  mux = &sch->mux[mux_idx];
1223 
1224  av_assert0(stream_idx < mux->nb_streams);
1225  ms = &mux->streams[stream_idx];
1226 
1227  ms->pre_mux_queue.max_packets = max_packets;
1228  ms->pre_mux_queue.data_threshold = data_threshold;
1229 }
1230 
1231 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1232 {
1233  SchMux *mux;
1234  int ret = 0;
1235 
1236  av_assert0(mux_idx < sch->nb_mux);
1237  mux = &sch->mux[mux_idx];
1238 
1239  av_assert0(stream_idx < mux->nb_streams);
1240 
1242 
1243  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1244 
1245  // this may be called during initialization - do not start
1246  // threads before sch_start() is called
1247  if (++mux->nb_streams_ready == mux->nb_streams &&
1248  sch->state >= SCH_STATE_STARTED)
1249  ret = mux_init(sch, mux);
1250 
1252 
1253  return ret;
1254 }
1255 
1256 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1257  unsigned dec_idx)
1258 {
1259  SchMux *mux;
1260  SchMuxStream *ms;
1261  int ret = 0;
1262 
1263  av_assert0(mux_idx < sch->nb_mux);
1264  mux = &sch->mux[mux_idx];
1265 
1266  av_assert0(stream_idx < mux->nb_streams);
1267  ms = &mux->streams[stream_idx];
1268 
1270  if (ret < 0)
1271  return ret;
1272 
1273  av_assert0(dec_idx < sch->nb_dec);
1274  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1275 
1276  if (!mux->sub_heartbeat_pkt) {
1278  if (!mux->sub_heartbeat_pkt)
1279  return AVERROR(ENOMEM);
1280  }
1281 
1282  return 0;
1283 }
1284 
1285 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src);
1286 
1287 // Unchoke any filter graphs that are downstream of this node, to prevent it
1288 // from getting stuck trying to push data to a full queue
1290 {
1291  SchFilterGraph *fg;
1292  SchDec *dec;
1293  SchEnc *enc;
1294  switch (dst->type) {
1295  case SCH_NODE_TYPE_DEC:
1296  dec = &sch->dec[dst->idx];
1297  for (int i = 0; i < dec->nb_outputs; i++)
1298  unchoke_downstream(sch, dec->outputs[i].dst);
1299  break;
1300  case SCH_NODE_TYPE_ENC:
1301  enc = &sch->enc[dst->idx];
1302  for (int i = 0; i < enc->nb_dst; i++)
1303  unchoke_downstream(sch, &enc->dst[i]);
1304  break;
1305  case SCH_NODE_TYPE_MUX:
1306  // muxers are never choked
1307  break;
1309  fg = &sch->filters[dst->idx];
1310  if (fg->best_input == fg->nb_inputs) {
1311  fg->waiter.choked_next = 0;
1312  } else {
1313  // ensure that this filter graph is not stuck waiting for
1314  // input from a different upstream demuxer
1315  unchoke_for_stream(sch, fg->inputs[fg->best_input].src);
1316  }
1317  break;
1318  default:
1319  av_unreachable("Invalid destination node type?");
1320  break;
1321  }
1322 }
1323 
1325 {
1326  while (1) {
1327  SchFilterGraph *fg;
1328  SchDemux *demux;
1329  switch (src.type) {
1330  case SCH_NODE_TYPE_DEMUX:
1331  // fed directly by a demuxer (i.e. not through a filtergraph)
1332  demux = &sch->demux[src.idx];
1333  if (demux->waiter.choked_next == 0)
1334  return; // prevent infinite loop
1335  demux->waiter.choked_next = 0;
1336  for (int i = 0; i < demux->nb_streams; i++)
1337  unchoke_downstream(sch, demux->streams[i].dst);
1338  return;
1339  case SCH_NODE_TYPE_DEC:
1340  src = sch->dec[src.idx].src;
1341  continue;
1342  case SCH_NODE_TYPE_ENC:
1343  src = sch->enc[src.idx].src;
1344  continue;
1346  fg = &sch->filters[src.idx];
1347  // the filtergraph contains internal sources and
1348  // requested to be scheduled directly
1349  if (fg->best_input == fg->nb_inputs) {
1350  fg->waiter.choked_next = 0;
1351  return;
1352  }
1353  src = fg->inputs[fg->best_input].src;
1354  continue;
1355  default:
1356  av_unreachable("Invalid source node type?");
1357  return;
1358  }
1359  }
1360 }
1361 
1362 static void choke_demux(const Scheduler *sch, int demux_id, int choked)
1363 {
1364  av_assert1(demux_id < sch->nb_demux);
1365  SchDemux *demux = &sch->demux[demux_id];
1366 
1367  for (int i = 0; i < demux->nb_streams; i++) {
1368  SchedulerNode *dst = demux->streams[i].dst;
1369  SchFilterGraph *fg;
1370 
1371  switch (dst->type) {
1372  case SCH_NODE_TYPE_DEC:
1373  tq_choke(sch->dec[dst->idx].queue, choked);
1374  break;
1375  case SCH_NODE_TYPE_ENC:
1376  tq_choke(sch->enc[dst->idx].queue, choked);
1377  break;
1378  case SCH_NODE_TYPE_MUX:
1379  break;
1381  fg = &sch->filters[dst->idx];
1382  if (fg->nb_inputs == 1)
1383  tq_choke(fg->queue, choked);
1384  break;
1385  default:
1386  av_unreachable("Invalid destination node type?");
1387  break;
1388  }
1389  }
1390 }
1391 
1393 {
1394  int64_t dts;
1395  int have_unchoked = 0;
1396 
1397  // on termination request all waiters are choked,
1398  // we are not to unchoke them
1399  if (atomic_load(&sch->terminate))
1400  return;
1401 
1402  dts = trailing_dts(sch, 0);
1403 
1404  atomic_store(&sch->last_dts, dts);
1405 
1406  // initialize our internal state
1407  for (unsigned type = 0; type < 2; type++)
1408  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1409  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1410  w->choked_prev = atomic_load(&w->choked);
1411  w->choked_next = 1;
1412  }
1413 
1414  // figure out the sources that are allowed to proceed
1415  for (unsigned i = 0; i < sch->nb_mux; i++) {
1416  SchMux *mux = &sch->mux[i];
1417 
1418  for (unsigned j = 0; j < mux->nb_streams; j++) {
1419  SchMuxStream *ms = &mux->streams[j];
1420 
1421  // unblock sources for output streams that are not finished
1422  // and not too far ahead of the trailing stream
1423  if (ms->source_finished)
1424  continue;
1425  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1426  continue;
1427  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1428  continue;
1429 
1430  // resolve the source to unchoke
1431  unchoke_for_stream(sch, ms->src);
1432  have_unchoked = 1;
1433  }
1434  }
1435 
1436  // also unchoke any sources feeding into closed filter graph inputs, so
1437  // that they can observe the downstream EOF
1438  for (unsigned i = 0; i < sch->nb_filters; i++) {
1439  SchFilterGraph *fg = &sch->filters[i];
1440 
1441  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1442  SchFilterIn *fi = &fg->inputs[j];
1443  if (fi->receive_finished && !fi->send_finished)
1444  unchoke_for_stream(sch, fi->src);
1445  }
1446  }
1447 
1448  // make sure to unchoke at least one source, if still available
1449  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1450  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1451  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1452  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1453  if (!exited) {
1454  w->choked_next = 0;
1455  have_unchoked = 1;
1456  break;
1457  }
1458  }
1459 
1460  for (unsigned type = 0; type < 2; type++) {
1461  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1462  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1463  if (w->choked_prev != w->choked_next) {
1464  waiter_set(w, w->choked_next);
1465  if (!type)
1466  choke_demux(sch, i, w->choked_next);
1467  }
1468  }
1469  }
1470 
1471 }
1472 
1473 enum {
1477 };
1478 
1479 // Finds the filtergraph or muxer upstream of a scheduler node
1481 {
1482  while (1) {
1483  switch (src.type) {
1484  case SCH_NODE_TYPE_DEMUX:
1486  return src;
1487  case SCH_NODE_TYPE_DEC:
1488  src = sch->dec[src.idx].src;
1489  continue;
1490  case SCH_NODE_TYPE_ENC:
1491  src = sch->enc[src.idx].src;
1492  continue;
1493  default:
1494  av_unreachable("Invalid source node type?");
1495  return (SchedulerNode) {0};
1496  }
1497  }
1498 }
1499 
1500 static int
1502  uint8_t *filters_visited, SchedulerNode *filters_stack)
1503 {
1504  unsigned nb_filters_stack = 0;
1505 
1506  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1507 
1508  while (1) {
1509  const SchFilterGraph *fg = &sch->filters[src.idx];
1510 
1511  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1512 
1513  // descend into every input, depth first
1514  if (src.idx_stream < fg->nb_inputs) {
1515  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1516  SchedulerNode node = src_filtergraph(sch, fi->src);
1517 
1518  // connected to demuxer, no cycles possible
1519  if (node.type == SCH_NODE_TYPE_DEMUX)
1520  continue;
1521 
1522  // otherwise connected to another filtergraph
1524 
1525  // found a cycle
1526  if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
1527  return AVERROR(EINVAL);
1528 
1529  // place current position on stack and descend
1530  av_assert0(nb_filters_stack < sch->nb_filters);
1531  filters_stack[nb_filters_stack++] = src;
1532  src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
1533  continue;
1534  }
1535 
1536  filters_visited[src.idx] = CYCLE_NODE_DONE;
1537 
1538  // previous search finished,
1539  if (nb_filters_stack) {
1540  src = filters_stack[--nb_filters_stack];
1541  continue;
1542  }
1543  return 0;
1544  }
1545 }
1546 
1547 static int check_acyclic(Scheduler *sch)
1548 {
1549  uint8_t *filters_visited = NULL;
1550  SchedulerNode *filters_stack = NULL;
1551 
1552  int ret = 0;
1553 
1554  if (!sch->nb_filters)
1555  return 0;
1556 
1557  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1558  if (!filters_visited)
1559  return AVERROR(ENOMEM);
1560 
1561  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1562  if (!filters_stack) {
1563  ret = AVERROR(ENOMEM);
1564  goto fail;
1565  }
1566 
1567  // trace the transcoding graph upstream from every filtegraph
1568  for (unsigned i = 0; i < sch->nb_filters; i++) {
1569  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1570  filters_visited, filters_stack);
1571  if (ret < 0) {
1572  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1573  goto fail;
1574  }
1575  }
1576 
1577 fail:
1578  av_freep(&filters_visited);
1579  av_freep(&filters_stack);
1580  return ret;
1581 }
1582 
1583 static int start_prepare(Scheduler *sch)
1584 {
1585  int ret;
1586 
1587  for (unsigned i = 0; i < sch->nb_demux; i++) {
1588  SchDemux *d = &sch->demux[i];
1589 
1590  for (unsigned j = 0; j < d->nb_streams; j++) {
1591  SchDemuxStream *ds = &d->streams[j];
1592 
1593  if (!ds->nb_dst) {
1594  av_log(d, AV_LOG_ERROR,
1595  "Demuxer stream %u not connected to any sink\n", j);
1596  return AVERROR(EINVAL);
1597  }
1598 
1599  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1600  if (!ds->dst_finished)
1601  return AVERROR(ENOMEM);
1602  }
1603  }
1604 
1605  for (unsigned i = 0; i < sch->nb_dec; i++) {
1606  SchDec *dec = &sch->dec[i];
1607 
1608  if (!dec->src.type) {
1609  av_log(dec, AV_LOG_ERROR,
1610  "Decoder not connected to a source\n");
1611  return AVERROR(EINVAL);
1612  }
1613 
1614  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1615  SchDecOutput *o = &dec->outputs[j];
1616 
1617  if (!o->nb_dst) {
1618  av_log(dec, AV_LOG_ERROR,
1619  "Decoder output %u not connected to any sink\n", j);
1620  return AVERROR(EINVAL);
1621  }
1622 
1623  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1624  if (!o->dst_finished)
1625  return AVERROR(ENOMEM);
1626  }
1627  }
1628 
1629  for (unsigned i = 0; i < sch->nb_enc; i++) {
1630  SchEnc *enc = &sch->enc[i];
1631 
1632  if (!enc->src.type) {
1633  av_log(enc, AV_LOG_ERROR,
1634  "Encoder not connected to a source\n");
1635  return AVERROR(EINVAL);
1636  }
1637  if (!enc->nb_dst) {
1638  av_log(enc, AV_LOG_ERROR,
1639  "Encoder not connected to any sink\n");
1640  return AVERROR(EINVAL);
1641  }
1642 
1643  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1644  if (!enc->dst_finished)
1645  return AVERROR(ENOMEM);
1646  }
1647 
1648  for (unsigned i = 0; i < sch->nb_mux; i++) {
1649  SchMux *mux = &sch->mux[i];
1650 
1651  for (unsigned j = 0; j < mux->nb_streams; j++) {
1652  SchMuxStream *ms = &mux->streams[j];
1653 
1654  if (!ms->src.type) {
1655  av_log(mux, AV_LOG_ERROR,
1656  "Muxer stream #%u not connected to a source\n", j);
1657  return AVERROR(EINVAL);
1658  }
1659  }
1660 
1661  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1662  QUEUE_PACKETS);
1663  if (ret < 0)
1664  return ret;
1665  }
1666 
1667  for (unsigned i = 0; i < sch->nb_filters; i++) {
1668  SchFilterGraph *fg = &sch->filters[i];
1669 
1670  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1671  SchFilterIn *fi = &fg->inputs[j];
1672 
1673  if (!fi->src.type) {
1674  av_log(fg, AV_LOG_ERROR,
1675  "Filtergraph input %u not connected to a source\n", j);
1676  return AVERROR(EINVAL);
1677  }
1678  }
1679 
1680  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1681  SchFilterOut *fo = &fg->outputs[j];
1682 
1683  if (!fo->dst.type) {
1684  av_log(fg, AV_LOG_ERROR,
1685  "Filtergraph %u output %u not connected to a sink\n", i, j);
1686  return AVERROR(EINVAL);
1687  }
1688  }
1689  }
1690 
1691  // Check that the transcoding graph has no cycles.
1692  ret = check_acyclic(sch);
1693  if (ret < 0)
1694  return ret;
1695 
1696  return 0;
1697 }
1698 
1700 {
1701  int ret;
1702 
1703  ret = start_prepare(sch);
1704  if (ret < 0)
1705  return ret;
1706 
1708  sch->state = SCH_STATE_STARTED;
1709 
1710  for (unsigned i = 0; i < sch->nb_mux; i++) {
1711  SchMux *mux = &sch->mux[i];
1712 
1713  if (mux->nb_streams_ready == mux->nb_streams) {
1714  ret = mux_init(sch, mux);
1715  if (ret < 0)
1716  goto fail;
1717  }
1718  }
1719 
1720  for (unsigned i = 0; i < sch->nb_enc; i++) {
1721  SchEnc *enc = &sch->enc[i];
1722 
1723  ret = task_start(&enc->task);
1724  if (ret < 0)
1725  goto fail;
1726  }
1727 
1728  for (unsigned i = 0; i < sch->nb_filters; i++) {
1729  SchFilterGraph *fg = &sch->filters[i];
1730 
1731  ret = task_start(&fg->task);
1732  if (ret < 0)
1733  goto fail;
1734  }
1735 
1736  for (unsigned i = 0; i < sch->nb_dec; i++) {
1737  SchDec *dec = &sch->dec[i];
1738 
1739  ret = task_start(&dec->task);
1740  if (ret < 0)
1741  goto fail;
1742  }
1743 
1744  for (unsigned i = 0; i < sch->nb_demux; i++) {
1745  SchDemux *d = &sch->demux[i];
1746 
1747  if (!d->nb_streams)
1748  continue;
1749 
1750  ret = task_start(&d->task);
1751  if (ret < 0)
1752  goto fail;
1753  }
1754 
1758 
1759  return 0;
1760 fail:
1761  sch_stop(sch, NULL);
1762  return ret;
1763 }
1764 
1765 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1766 {
1767  int ret;
1768 
1769  // convert delay to absolute timestamp
1770  timeout_us += av_gettime();
1771 
1773 
1774  if (sch->nb_mux_done < sch->nb_mux) {
1775  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1776  .tv_nsec = (timeout_us % 1000000) * 1000 };
1777  pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1778  }
1779 
1780  // abort transcoding if any task failed
1781  ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1782 
1784 
1785  *transcode_ts = atomic_load(&sch->last_dts);
1786 
1787  return ret;
1788 }
1789 
1790 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1791 {
1792  int ret;
1793 
1794  ret = enc->open_cb(enc->task.func_arg, frame);
1795  if (ret < 0)
1796  return ret;
1797 
1798  // ret>0 signals audio frame size, which means sync queue must
1799  // have been enabled during encoder creation
1800  if (ret > 0) {
1801  SchSyncQueue *sq;
1802 
1803  av_assert0(enc->sq_idx[0] >= 0);
1804  sq = &sch->sq_enc[enc->sq_idx[0]];
1805 
1806  pthread_mutex_lock(&sq->lock);
1807 
1808  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1809 
1810  pthread_mutex_unlock(&sq->lock);
1811  }
1812 
1813  return 0;
1814 }
1815 
1817 {
1818  int ret;
1819 
1820  if (!frame) {
1821  tq_send_finish(enc->queue, 0);
1822  return 0;
1823  }
1824 
1825  if (enc->in_finished)
1826  return AVERROR_EOF;
1827 
1828  ret = tq_send(enc->queue, 0, frame);
1829  if (ret < 0)
1830  enc->in_finished = 1;
1831 
1832  return ret;
1833 }
1834 
1835 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1836 {
1837  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1838  int ret = 0;
1839 
1840  // inform the scheduling code that no more input will arrive along this path;
1841  // this is necessary because the sync queue may not send an EOF downstream
1842  // until other streams finish
1843  // TODO: consider a cleaner way of passing this information through
1844  // the pipeline
1845  if (!frame) {
1846  for (unsigned i = 0; i < enc->nb_dst; i++) {
1847  SchMux *mux;
1848  SchMuxStream *ms;
1849 
1850  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1851  continue;
1852 
1853  mux = &sch->mux[enc->dst[i].idx];
1854  ms = &mux->streams[enc->dst[i].idx_stream];
1855 
1857 
1858  ms->source_finished = 1;
1860 
1862  }
1863  }
1864 
1865  pthread_mutex_lock(&sq->lock);
1866 
1867  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1868  if (ret < 0)
1869  goto finish;
1870 
1871  while (1) {
1872  SchEnc *enc;
1873 
1874  // TODO: the SQ API should be extended to allow returning EOF
1875  // for individual streams
1876  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1877  if (ret < 0) {
1878  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1879  break;
1880  }
1881 
1882  enc = &sch->enc[sq->enc_idx[ret]];
1883  ret = send_to_enc_thread(sch, enc, sq->frame);
1884  if (ret < 0) {
1885  av_frame_unref(sq->frame);
1886  if (ret != AVERROR_EOF)
1887  break;
1888 
1889  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1890  continue;
1891  }
1892  }
1893 
1894  if (ret < 0) {
1895  // close all encoders fed from this sync queue
1896  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1897  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1898 
1899  // if the sync queue error is EOF and closing the encoder
1900  // produces a more serious error, make sure to pick the latter
1901  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1902  }
1903  }
1904 
1905 finish:
1906  pthread_mutex_unlock(&sq->lock);
1907 
1908  return ret;
1909 }
1910 
1911 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1912 {
1913  if (enc->open_cb && frame && !enc->opened) {
1914  int ret = enc_open(sch, enc, frame);
1915  if (ret < 0)
1916  return ret;
1917  enc->opened = 1;
1918 
1919  // discard empty frames that only carry encoder init parameters
1920  if (!frame->buf[0]) {
1922  return 0;
1923  }
1924  }
1925 
1926  return (enc->sq_idx[0] >= 0) ?
1927  send_to_enc_sq (sch, enc, frame) :
1928  send_to_enc_thread(sch, enc, frame);
1929 }
1930 
1932 {
1933  PreMuxQueue *q = &ms->pre_mux_queue;
1934  AVPacket *tmp_pkt = NULL;
1935  int ret;
1936 
1937  if (!av_fifo_can_write(q->fifo)) {
1938  size_t packets = av_fifo_can_read(q->fifo);
1939  size_t pkt_size = pkt ? pkt->size : 0;
1940  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1941  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1942  size_t new_size = FFMIN(2 * packets, max_packets);
1943 
1944  if (new_size <= packets) {
1945  av_log(mux, AV_LOG_ERROR,
1946  "Too many packets buffered for output stream.\n");
1947  return AVERROR_BUFFER_TOO_SMALL;
1948  }
1949  ret = av_fifo_grow2(q->fifo, new_size - packets);
1950  if (ret < 0)
1951  return ret;
1952  }
1953 
1954  if (pkt) {
1955  tmp_pkt = av_packet_alloc();
1956  if (!tmp_pkt)
1957  return AVERROR(ENOMEM);
1958 
1959  av_packet_move_ref(tmp_pkt, pkt);
1960  q->data_size += tmp_pkt->size;
1961  }
1962  av_fifo_write(q->fifo, &tmp_pkt, 1);
1963 
1964  return 0;
1965 }
1966 
1967 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1968  AVPacket *pkt)
1969 {
1970  SchMuxStream *ms = &mux->streams[stream_idx];
1971  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1974 
1975  // queue the packet if the muxer cannot be started yet
1976  if (!atomic_load(&mux->mux_started)) {
1977  int queued = 0;
1978 
1979  // the muxer could have started between the above atomic check and
1980  // locking the mutex, then this block falls through to normal send path
1982 
1983  if (!atomic_load(&mux->mux_started)) {
1984  int ret = mux_queue_packet(mux, ms, pkt);
1985  queued = ret < 0 ? ret : 1;
1986  }
1987 
1989 
1990  if (queued < 0)
1991  return queued;
1992  else if (queued)
1993  goto update_schedule;
1994  }
1995 
1996  if (pkt) {
1997  int ret;
1998 
1999  if (ms->init_eof)
2000  return AVERROR_EOF;
2001 
2002  ret = tq_send(mux->queue, stream_idx, pkt);
2003  if (ret < 0)
2004  return ret;
2005  } else
2006  tq_send_finish(mux->queue, stream_idx);
2007 
2008 update_schedule:
2009  // TODO: use atomics to check whether this changes trailing dts
2010  // to avoid locking unnecessarily
2011  if (dts != AV_NOPTS_VALUE || !pkt) {
2013 
2014  if (pkt) ms->last_dts = dts;
2015  else ms->source_finished = 1;
2016 
2018 
2020  }
2021 
2022  return 0;
2023 }
2024 
2025 static int
2027  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
2028 {
2029  int ret;
2030 
2031  if (*dst_finished)
2032  return AVERROR_EOF;
2033 
2034  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
2037  pkt = NULL;
2038  }
2039 
2040  if (!pkt)
2041  goto finish;
2042 
2043  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2044  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2045  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2046  if (ret == AVERROR_EOF)
2047  goto finish;
2048 
2049  return ret;
2050 
2051 finish:
2052  if (dst.type == SCH_NODE_TYPE_MUX)
2053  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2054  else
2055  tq_send_finish(sch->dec[dst.idx].queue, 0);
2056 
2057  *dst_finished = 1;
2058  return AVERROR_EOF;
2059 }
2060 
2062  AVPacket *pkt, unsigned flags)
2063 {
2064  unsigned nb_done = 0;
2065 
2066  for (unsigned i = 0; i < ds->nb_dst; i++) {
2067  AVPacket *to_send = pkt;
2068  uint8_t *finished = &ds->dst_finished[i];
2069 
2070  int ret;
2071 
2072  // sending a packet consumes it, so make a temporary reference if needed
2073  if (pkt && i < ds->nb_dst - 1) {
2074  to_send = d->send_pkt;
2075 
2076  ret = av_packet_ref(to_send, pkt);
2077  if (ret < 0)
2078  return ret;
2079  }
2080 
2081  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
2082  if (to_send)
2083  av_packet_unref(to_send);
2084  if (ret == AVERROR_EOF)
2085  nb_done++;
2086  else if (ret < 0)
2087  return ret;
2088  }
2089 
2090  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
2091 }
2092 
2094 {
2095  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
2096 
2097  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
2098 
2099  for (unsigned i = 0; i < d->nb_streams; i++) {
2100  SchDemuxStream *ds = &d->streams[i];
2101 
2102  for (unsigned j = 0; j < ds->nb_dst; j++) {
2103  const SchedulerNode *dst = &ds->dst[j];
2104  SchDec *dec;
2105  int ret;
2106 
2107  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2108  continue;
2109 
2110  dec = &sch->dec[dst->idx];
2111 
2112  ret = tq_send(dec->queue, 0, pkt);
2113  if (ret < 0)
2114  return ret;
2115 
2116  if (dec->queue_end_ts) {
2117  Timestamp ts;
2119  if (ret < 0)
2120  return ret;
2121 
2122  if (max_end_ts.ts == AV_NOPTS_VALUE ||
2123  (ts.ts != AV_NOPTS_VALUE &&
2124  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2125  max_end_ts = ts;
2126 
2127  }
2128  }
2129  }
2130 
2131  pkt->pts = max_end_ts.ts;
2132  pkt->time_base = max_end_ts.tb;
2133 
2134  return 0;
2135 }
2136 
2137 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2138  unsigned flags)
2139 {
2140  SchDemux *d;
2141  int terminate;
2142 
2143  av_assert0(demux_idx < sch->nb_demux);
2144  d = &sch->demux[demux_idx];
2145 
2146  terminate = waiter_wait(sch, &d->waiter);
2147  if (terminate)
2148  return AVERROR_EXIT;
2149 
2150  // flush the downstreams after seek
2151  if (pkt->stream_index == -1)
2152  return demux_flush(sch, d, pkt);
2153 
2155 
2156  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2157 }
2158 
2159 static int demux_done(Scheduler *sch, unsigned demux_idx)
2160 {
2161  SchDemux *d = &sch->demux[demux_idx];
2162  int ret = 0;
2163 
2164  for (unsigned i = 0; i < d->nb_streams; i++) {
2165  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2166  if (err != AVERROR_EOF)
2167  ret = err_merge(ret, err);
2168  }
2169 
2171 
2172  d->task_exited = 1;
2173 
2175 
2177 
2178  return ret;
2179 }
2180 
2181 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2182 {
2183  SchMux *mux;
2184  int ret, stream_idx;
2185 
2186  av_assert0(mux_idx < sch->nb_mux);
2187  mux = &sch->mux[mux_idx];
2188 
2189  ret = tq_receive(mux->queue, &stream_idx, pkt);
2190  pkt->stream_index = stream_idx;
2191  return ret;
2192 }
2193 
2194 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2195 {
2196  SchMux *mux;
2197 
2198  av_assert0(mux_idx < sch->nb_mux);
2199  mux = &sch->mux[mux_idx];
2200 
2201  av_assert0(stream_idx < mux->nb_streams);
2202  tq_receive_finish(mux->queue, stream_idx);
2203 
2205  mux->streams[stream_idx].source_finished = 1;
2206 
2208 
2210 }
2211 
2212 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2213  const AVPacket *pkt)
2214 {
2215  SchMux *mux;
2216  SchMuxStream *ms;
2217 
2218  av_assert0(mux_idx < sch->nb_mux);
2219  mux = &sch->mux[mux_idx];
2220 
2221  av_assert0(stream_idx < mux->nb_streams);
2222  ms = &mux->streams[stream_idx];
2223 
2224  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2225  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2226  int ret;
2227 
2229  if (ret < 0)
2230  return ret;
2231 
2232  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2233  }
2234 
2235  return 0;
2236 }
2237 
2238 static int mux_done(Scheduler *sch, unsigned mux_idx)
2239 {
2240  SchMux *mux = &sch->mux[mux_idx];
2241 
2243 
2244  for (unsigned i = 0; i < mux->nb_streams; i++) {
2245  tq_receive_finish(mux->queue, i);
2246  mux->streams[i].source_finished = 1;
2247  }
2248 
2250 
2252 
2254 
2255  av_assert0(sch->nb_mux_done < sch->nb_mux);
2256  sch->nb_mux_done++;
2257 
2259 
2261 
2262  return 0;
2263 }
2264 
2265 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2266 {
2267  SchDec *dec;
2268  int ret, dummy;
2269 
2270  av_assert0(dec_idx < sch->nb_dec);
2271  dec = &sch->dec[dec_idx];
2272 
2273  // the decoder should have given us post-flush end timestamp in pkt
2274  if (dec->expect_end_ts) {
2275  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2277  if (ret < 0)
2278  return ret;
2279 
2280  dec->expect_end_ts = 0;
2281  }
2282 
2283  ret = tq_receive(dec->queue, &dummy, pkt);
2284  av_assert0(dummy <= 0);
2285 
2286  // got a flush packet, on the next call to this function the decoder
2287  // will give us post-flush end timestamp
2288  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2289  dec->expect_end_ts = 1;
2290 
2291  return ret;
2292 }
2293 
2295  unsigned in_idx, AVFrame *frame)
2296 {
2297  if (frame)
2298  return tq_send(fg->queue, in_idx, frame);
2299 
2300  if (!fg->inputs[in_idx].send_finished) {
2301  fg->inputs[in_idx].send_finished = 1;
2302  tq_send_finish(fg->queue, in_idx);
2303 
2304  // close the control stream when all actual inputs are done
2305  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2306  tq_send_finish(fg->queue, fg->nb_inputs);
2307  }
2308  return 0;
2309 }
2310 
2312  uint8_t *dst_finished, AVFrame *frame)
2313 {
2314  int ret;
2315 
2316  if (*dst_finished)
2317  return AVERROR_EOF;
2318 
2319  if (!frame)
2320  goto finish;
2321 
2322  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2323  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2324  send_to_enc(sch, &sch->enc[dst.idx], frame);
2325  if (ret == AVERROR_EOF)
2326  goto finish;
2327 
2328  return ret;
2329 
2330 finish:
2331  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2332  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2333  else
2334  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2335 
2336  *dst_finished = 1;
2337 
2338  return AVERROR_EOF;
2339 }
2340 
2341 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2342  unsigned out_idx, AVFrame *frame)
2343 {
2344  SchDec *dec;
2345  SchDecOutput *o;
2346  int ret;
2347  unsigned nb_done = 0;
2348 
2349  av_assert0(dec_idx < sch->nb_dec);
2350  dec = &sch->dec[dec_idx];
2351 
2352  av_assert0(out_idx < dec->nb_outputs);
2353  o = &dec->outputs[out_idx];
2354 
2355  for (unsigned i = 0; i < o->nb_dst; i++) {
2356  uint8_t *finished = &o->dst_finished[i];
2357  AVFrame *to_send = frame;
2358 
2359  // sending a frame consumes it, so make a temporary reference if needed
2360  if (i < o->nb_dst - 1) {
2361  to_send = dec->send_frame;
2362 
2363  // frame may sometimes contain props only,
2364  // e.g. to signal EOF timestamp
2365  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2366  av_frame_copy_props(to_send, frame);
2367  if (ret < 0)
2368  return ret;
2369  }
2370 
2371  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2372  if (ret < 0) {
2373  av_frame_unref(to_send);
2374  if (ret == AVERROR_EOF) {
2375  nb_done++;
2376  continue;
2377  }
2378  return ret;
2379  }
2380  }
2381 
2382  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2383 }
2384 
2385 static int dec_done(Scheduler *sch, unsigned dec_idx)
2386 {
2387  SchDec *dec = &sch->dec[dec_idx];
2388  int ret = 0;
2389 
2390  tq_receive_finish(dec->queue, 0);
2391 
2392  // make sure our source does not get stuck waiting for end timestamps
2393  // that will never arrive
2394  if (dec->queue_end_ts)
2396 
2397  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2398  SchDecOutput *o = &dec->outputs[i];
2399 
2400  for (unsigned j = 0; j < o->nb_dst; j++) {
2401  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2402  if (err < 0 && err != AVERROR_EOF)
2403  ret = err_merge(ret, err);
2404  }
2405  }
2406 
2407  return ret;
2408 }
2409 
2410 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2411 {
2412  SchEnc *enc;
2413  int ret, dummy;
2414 
2415  av_assert0(enc_idx < sch->nb_enc);
2416  enc = &sch->enc[enc_idx];
2417 
2418  ret = tq_receive(enc->queue, &dummy, frame);
2419  av_assert0(dummy <= 0);
2420 
2421  return ret;
2422 }
2423 
2425  uint8_t *dst_finished, AVPacket *pkt)
2426 {
2427  int ret;
2428 
2429  if (*dst_finished)
2430  return AVERROR_EOF;
2431 
2432  if (!pkt)
2433  goto finish;
2434 
2435  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2436  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2437  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2438  if (ret == AVERROR_EOF)
2439  goto finish;
2440 
2441  return ret;
2442 
2443 finish:
2444  if (dst.type == SCH_NODE_TYPE_MUX)
2445  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2446  else
2447  tq_send_finish(sch->dec[dst.idx].queue, 0);
2448 
2449  *dst_finished = 1;
2450 
2451  return AVERROR_EOF;
2452 }
2453 
2454 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2455 {
2456  SchEnc *enc;
2457  int ret;
2458 
2459  av_assert0(enc_idx < sch->nb_enc);
2460  enc = &sch->enc[enc_idx];
2461 
2462  for (unsigned i = 0; i < enc->nb_dst; i++) {
2463  uint8_t *finished = &enc->dst_finished[i];
2464  AVPacket *to_send = pkt;
2465 
2466  // sending a packet consumes it, so make a temporary reference if needed
2467  if (i < enc->nb_dst - 1) {
2468  to_send = enc->send_pkt;
2469 
2470  ret = av_packet_ref(to_send, pkt);
2471  if (ret < 0)
2472  return ret;
2473  }
2474 
2475  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2476  if (ret < 0) {
2477  av_packet_unref(to_send);
2478  if (ret == AVERROR_EOF)
2479  continue;
2480  return ret;
2481  }
2482  }
2483 
2484  return 0;
2485 }
2486 
2487 static int enc_done(Scheduler *sch, unsigned enc_idx)
2488 {
2489  SchEnc *enc = &sch->enc[enc_idx];
2490  int ret = 0;
2491 
2492  tq_receive_finish(enc->queue, 0);
2493 
2494  for (unsigned i = 0; i < enc->nb_dst; i++) {
2495  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2496  if (err < 0 && err != AVERROR_EOF)
2497  ret = err_merge(ret, err);
2498  }
2499 
2500  return ret;
2501 }
2502 
2503 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2504  unsigned *in_idx, AVFrame *frame)
2505 {
2506  SchFilterGraph *fg;
2507 
2508  av_assert0(fg_idx < sch->nb_filters);
2509  fg = &sch->filters[fg_idx];
2510 
2511  av_assert0(*in_idx <= fg->nb_inputs);
2512 
2513  // update scheduling to account for desired input stream, if it changed
2514  //
2515  // this check needs no locking because only the filtering thread
2516  // updates this value
2517  if (*in_idx != fg->best_input) {
2519 
2520  fg->best_input = *in_idx;
2522 
2524  }
2525 
2526  if (*in_idx == fg->nb_inputs) {
2527  int terminate = waiter_wait(sch, &fg->waiter);
2528  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2529  }
2530 
2531  while (1) {
2532  int ret, idx;
2533 
2534  ret = tq_receive(fg->queue, &idx, frame);
2535  if (idx < 0)
2536  return AVERROR_EOF;
2537  else if (ret >= 0) {
2538  *in_idx = idx;
2539  return 0;
2540  }
2541 
2542  // disregard EOFs for specific streams - they should always be
2543  // preceded by an EOF frame
2544  }
2545 }
2546 
2547 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2548 {
2549  SchFilterGraph *fg;
2550  SchFilterIn *fi;
2551 
2552  av_assert0(fg_idx < sch->nb_filters);
2553  fg = &sch->filters[fg_idx];
2554 
2555  av_assert0(in_idx < fg->nb_inputs);
2556  fi = &fg->inputs[in_idx];
2557 
2559 
2560  if (!fi->receive_finished) {
2561  fi->receive_finished = 1;
2562  tq_receive_finish(fg->queue, in_idx);
2563 
2564  // close the control stream when all actual inputs are done
2565  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2566  tq_receive_finish(fg->queue, fg->nb_inputs);
2567 
2569  }
2570 
2572 }
2573 
2574 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2575 {
2576  SchFilterGraph *fg;
2578  int ret;
2579 
2580  av_assert0(fg_idx < sch->nb_filters);
2581  fg = &sch->filters[fg_idx];
2582 
2583  av_assert0(out_idx < fg->nb_outputs);
2584  dst = fg->outputs[out_idx].dst;
2585 
2586  if (dst.type == SCH_NODE_TYPE_ENC) {
2587  ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
2588  if (ret == AVERROR_EOF)
2589  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2590  } else {
2591  ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2592  if (ret == AVERROR_EOF)
2593  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2594  }
2595  return ret;
2596 }
2597 
2598 static int filter_done(Scheduler *sch, unsigned fg_idx)
2599 {
2600  SchFilterGraph *fg = &sch->filters[fg_idx];
2601  int ret = 0;
2602 
2603  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2604  tq_receive_finish(fg->queue, i);
2605 
2606  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2607  SchedulerNode dst = fg->outputs[i].dst;
2608  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2609  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2610  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2611 
2612  if (err < 0 && err != AVERROR_EOF)
2613  ret = err_merge(ret, err);
2614  }
2615 
2617 
2618  fg->task_exited = 1;
2619 
2621 
2623 
2624  return ret;
2625 }
2626 
2627 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2628 {
2629  SchFilterGraph *fg;
2630 
2631  av_assert0(fg_idx < sch->nb_filters);
2632  fg = &sch->filters[fg_idx];
2633 
2634  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2635 }
2636 
2637 void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
2638 {
2639  SchFilterGraph *fg;
2640  av_assert0(fg_idx < sch->nb_filters);
2641  fg = &sch->filters[fg_idx];
2642 
2644  fg->best_input = fg->nb_inputs;
2647 }
2648 
2649 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2650 {
2651  switch (node.type) {
2652  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2653  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2654  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2655  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2656  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2657  default: av_assert0(0);
2658  }
2659 }
2660 
2661 static void *task_wrapper(void *arg)
2662 {
2663  SchTask *task = arg;
2664  Scheduler *sch = task->parent;
2665  int ret;
2666  int err = 0;
2667 
2668  ret = task->func(task->func_arg);
2669  if (ret < 0)
2670  av_log(task->func_arg, AV_LOG_ERROR,
2671  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2672 
2673  err = task_cleanup(sch, task->node);
2674  ret = err_merge(ret, err);
2675 
2676  // EOF is considered normal termination
2677  if (ret == AVERROR_EOF)
2678  ret = 0;
2679  if (ret < 0) {
2681  sch->task_failed = 1;
2684  }
2685 
2687  "Terminating thread with return code %d (%s)\n", ret,
2688  ret < 0 ? av_err2str(ret) : "success");
2689 
2690  return (void*)(intptr_t)ret;
2691 }
2692 
2693 static int task_stop(Scheduler *sch, SchTask *task)
2694 {
2695  int ret;
2696  void *thread_ret;
2697 
2698  if (!task->parent)
2699  return 0;
2700 
2701  if (!task->thread_running)
2702  return task_cleanup(sch, task->node);
2703 
2704  ret = pthread_join(task->thread, &thread_ret);
2705  av_assert0(ret == 0);
2706 
2707  task->thread_running = 0;
2708 
2709  return (intptr_t)thread_ret;
2710 }
2711 
2712 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2713 {
2714  int ret = 0, err;
2715 
2716  if (sch->state != SCH_STATE_STARTED)
2717  return 0;
2718 
2719  atomic_store(&sch->terminate, 1);
2720 
2721  for (unsigned type = 0; type < 2; type++)
2722  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2723  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2724  waiter_set(w, 1);
2725  if (type)
2726  choke_demux(sch, i, 0); // unfreeze to allow draining
2727  }
2728 
2729  for (unsigned i = 0; i < sch->nb_demux; i++) {
2730  SchDemux *d = &sch->demux[i];
2731 
2732  err = task_stop(sch, &d->task);
2733  ret = err_merge(ret, err);
2734  }
2735 
2736  for (unsigned i = 0; i < sch->nb_dec; i++) {
2737  SchDec *dec = &sch->dec[i];
2738 
2739  err = task_stop(sch, &dec->task);
2740  ret = err_merge(ret, err);
2741  }
2742 
2743  for (unsigned i = 0; i < sch->nb_filters; i++) {
2744  SchFilterGraph *fg = &sch->filters[i];
2745 
2746  err = task_stop(sch, &fg->task);
2747  ret = err_merge(ret, err);
2748  }
2749 
2750  for (unsigned i = 0; i < sch->nb_enc; i++) {
2751  SchEnc *enc = &sch->enc[i];
2752 
2753  err = task_stop(sch, &enc->task);
2754  ret = err_merge(ret, err);
2755  }
2756 
2757  for (unsigned i = 0; i < sch->nb_mux; i++) {
2758  SchMux *mux = &sch->mux[i];
2759 
2760  err = task_stop(sch, &mux->task);
2761  ret = err_merge(ret, err);
2762  }
2763 
2764  if (finish_ts)
2765  *finish_ts = trailing_dts(sch, 1);
2766 
2767  sch->state = SCH_STATE_STOPPED;
2768 
2769  return ret;
2770 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:297
flags
const SwsFlags flags[]
Definition: swscale.c:61
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:66
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:432
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1115
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:76
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:349
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
Scheduler::finish_lock
pthread_mutex_t finish_lock
Definition: ffmpeg_sched.c:287
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2574
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:88
THREAD_QUEUE_PACKETS
@ THREAD_QUEUE_PACKETS
Definition: thread_queue.h:26
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:294
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:285
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:267
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2194
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:101
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:137
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:2093
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:298
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:232
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:628
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:147
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:698
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:185
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:263
int64_t
long long int64_t
Definition: coverity.c:34
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1475
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2238
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:295
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
Definition: frame.c:64
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
av_fifo_peek
int av_fifo_peek(const AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1501
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:427
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2649
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:588
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:217
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:160
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1967
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:282
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, enum ThreadQueueType type)
Allocate a queue for sending data between threads.
Definition: thread_queue.c:72
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2487
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:226
unchoke_downstream
static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
Definition: ffmpeg_sched.c:1289
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:606
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:212
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:274
SchFilterOut
Definition: ffmpeg_sched.c:241
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:268
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:176
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:206
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:262
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:311
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:166
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1231
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1816
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2693
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:238
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2712
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:119
fail
#define fail()
Definition: checkasm.h:207
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:242
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:85
SchEnc
Definition: ffmpeg_sched.c:109
dummy
int dummy
Definition: motion.c:64
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:90
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:666
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:213
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:250
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:270
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:586
AVERROR_BUFFER_TOO_SMALL
#define AVERROR_BUFFER_TOO_SMALL
Buffer too small.
Definition: error.h:53
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:292
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:839
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:52
SchFilterGraph
Definition: ffmpeg_sched.c:245
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:60
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:210
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:477
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:306
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:215
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:190
SchDecOutput
Definition: ffmpeg_sched.c:74
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:642
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:339
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:254
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:94
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1790
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:595
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2311
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:426
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:194
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:113
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:725
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:191
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:41
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2341
ctx
AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:340
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:208
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2598
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:246
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2410
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:629
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:110
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:2061
arg
const char * arg
Definition: jacosubdec.c:65
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:196
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:598
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:228
Scheduler::finish_cond
pthread_cond_t finish_cond
Definition: ffmpeg_sched.c:288
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:276
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:571
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:54
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:366
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:76
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:303
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1835
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:137
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:599
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:249
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:279
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1392
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:243
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:801
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:105
av_unreachable
#define av_unreachable(msg)
Asserts that are used as compiler optimization hints depending upon ASSERT_LEVEL and NBDEBUG.
Definition: avassert.h:108
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:269
choke_demux
static void choke_demux(const Scheduler *sch, int demux_id, int choked)
Definition: ffmpeg_sched.c:1362
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2385
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:259
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:157
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:114
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:758
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1474
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations send normally to other types.
Definition: ffmpeg_sched.h:340
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:833
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:440
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:489
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2454
error.h
Scheduler
Definition: ffmpeg_sched.c:273
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:216
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:103
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:193
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:81
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:640
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:144
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:162
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:153
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:251
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:117
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:550
AVPacket::size
int size
Definition: packet.h:589
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:106
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:264
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:278
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:87
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:180
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:256
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
sch_filter_choke_inputs
void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
Called by filtergraph tasks to choke all filter inputs, preventing them from receiving more frames un...
Definition: ffmpeg_sched.c:2637
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:333
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:187
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:671
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:247
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:752
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:248
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:309
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:587
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2424
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1476
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:636
SchFilterIn
Definition: ffmpeg_sched.c:235
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2503
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:280
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:63
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:138
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:590
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:277
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:395
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:152
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:169
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:404
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:300
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:581
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2159
packet.h
sch_remove_filtergraph
void sch_remove_filtergraph(Scheduler *sch, int idx)
Definition: ffmpeg_sched.c:460
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:263
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:32
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:283
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:307
SchDec
Definition: ffmpeg_sched.c:80
av_assert1
#define av_assert1(cond)
assert() equivalent, that does not lie in speed critical code.
Definition: avassert.h:57
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:257
QueueType
QueueType
Definition: ffmpeg_sched.c:47
THREAD_QUEUE_FRAMES
@ THREAD_QUEUE_FRAMES
Definition: thread_queue.h:25
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:86
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:496
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:438
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:253
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:795
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1179
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2294
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:197
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:151
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:937
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1911
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2627
SchDemuxStream
Definition: ffmpeg_sched.c:150
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2265
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:81
frame
these buffered frames must be flushed immediately if a new input produces new the filter must not call request_frame to get more It must just process the frame or queue it The task of requesting more frames is left to the filter s request_frame method or the application If a filter has several the filter must be ready for frames arriving randomly on any input any filter with several inputs will most likely require some kind of queuing mechanism It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced request_frame For filters that do not use the this method is called when a frame is wanted on an output For a it should directly call filter_frame on the corresponding output For a if there are queued frames already one of these frames should be pushed If the filter should request a frame on one of its repeatedly until at least one frame has been pushed Return or at least make progress towards producing a frame
Definition: filter_design.txt:265
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:199
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1931
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:219
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:692
ThreadQueue
Definition: thread_queue.c:40
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2661
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:221
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:90
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2137
SchDemux
Definition: ffmpeg_sched.c:156
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:291
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:93
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:2026
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:83
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:590
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:536
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:229
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:163
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:320
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:102
Scheduler::task_failed
unsigned task_failed
Definition: ffmpeg_sched.c:286
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:906
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1583
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1256
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2547
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g.
Definition: ffmpeg_sched.c:737
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:112
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1765
AVPacket
This structure stores compressed data.
Definition: packet.h:565
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
src_filtergraph
static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1480
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:100
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:230
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:97
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:372
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1699
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2212
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:75
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:142
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:654
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:140
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:236
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1215
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1547
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:159
PreMuxQueue
Definition: ffmpeg_sched.c:172
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:304
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:237
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:260
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:632
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1324
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:600
tq_choke
void tq_choke(ThreadQueue *tq, int choked)
Prevent further reads from the thread queue until it is unchoked.
Definition: thread_queue.c:258
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2181
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add an pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:881
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:115
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:227
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:301