FFmpeg
thread_queue.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
#include <stdint.h>
#include <string.h>

#include "libavutil/avassert.h"
#include "libavutil/container_fifo.h"
#include "libavutil/error.h"
#include "libavutil/fifo.h"
#include "libavutil/frame.h"
#include "libavutil/intreadwrite.h"
#include "libavutil/mem.h"
#include "libavutil/thread.h"

#include "libavcodec/packet.h"

#include "thread_queue.h"
34 
/**
 * Per-stream state bits stored in ThreadQueue.finished[].
 * The two flags are independent and may both be set for the same stream.
 */
enum {
    /** the sending side is done with the stream */
    FINISHED_SEND = (1 << 0),
    /** the receiving side is done with the stream */
    FINISHED_RECV = (1 << 1),
};
39 
40 struct ThreadQueue {
41  int choked;
42  int *finished;
43  unsigned int nb_streams;
44 
46 
49 
52 };
53 
54 void tq_free(ThreadQueue **ptq)
55 {
56  ThreadQueue *tq = *ptq;
57 
58  if (!tq)
59  return;
60 
63 
64  av_freep(&tq->finished);
65 
68 
69  av_freep(ptq);
70 }
71 
72 ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size,
73  enum ThreadQueueType type)
74 {
75  ThreadQueue *tq;
76  int ret;
77 
78  tq = av_mallocz(sizeof(*tq));
79  if (!tq)
80  return NULL;
81 
82  ret = pthread_cond_init(&tq->cond, NULL);
83  if (ret) {
84  av_freep(&tq);
85  return NULL;
86  }
87 
89  if (ret) {
91  av_freep(&tq);
92  return NULL;
93  }
94 
95  tq->finished = av_calloc(nb_streams, sizeof(*tq->finished));
96  if (!tq->finished)
97  goto fail;
98  tq->nb_streams = nb_streams;
99 
100  tq->type = type;
101 
102  tq->fifo = (type == THREAD_QUEUE_FRAMES) ?
104  if (!tq->fifo)
105  goto fail;
106 
107  tq->fifo_stream_index = av_fifo_alloc2(queue_size, sizeof(unsigned), 0);
108  if (!tq->fifo_stream_index)
109  goto fail;
110 
111  return tq;
112 fail:
113  tq_free(&tq);
114  return NULL;
115 }
116 
117 int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
118 {
119  int *finished;
120  int ret;
121 
122  av_assert0(stream_idx < tq->nb_streams);
123  finished = &tq->finished[stream_idx];
124 
125  pthread_mutex_lock(&tq->lock);
126 
127  if (*finished & FINISHED_SEND) {
128  ret = AVERROR(EINVAL);
129  goto finish;
130  }
131 
132  while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo_stream_index))
133  pthread_cond_wait(&tq->cond, &tq->lock);
134 
135  if (*finished & FINISHED_RECV) {
136  ret = AVERROR_EOF;
137  *finished |= FINISHED_SEND;
138  } else {
139  ret = av_fifo_write(tq->fifo_stream_index, &stream_idx, 1);
140  if (ret < 0)
141  goto finish;
142 
144  if (ret < 0)
145  goto finish;
146 
148  }
149 
150 finish:
152 
153  return ret;
154 }
155 
156 static int receive_locked(ThreadQueue *tq, int *stream_idx,
157  void *data)
158 {
159  unsigned int nb_finished = 0;
160 
161  if (tq->choked)
162  return AVERROR(EAGAIN);
163 
164  while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
165  unsigned idx;
166  int ret;
167 
168  ret = av_fifo_read(tq->fifo_stream_index, &idx, 1);
169  av_assert0(ret >= 0);
170  if (tq->finished[idx] & FINISHED_RECV) {
171  (tq->type == THREAD_QUEUE_FRAMES) ?
173  continue;
174  }
175 
176  *stream_idx = idx;
177  return 0;
178  }
179 
180  for (unsigned int i = 0; i < tq->nb_streams; i++) {
181  if (!tq->finished[i])
182  continue;
183 
184  /* return EOF to the consumer at most once for each stream */
185  if (!(tq->finished[i] & FINISHED_RECV)) {
186  tq->finished[i] |= FINISHED_RECV;
187  *stream_idx = i;
188  return AVERROR_EOF;
189  }
190 
191  nb_finished++;
192  }
193 
194  return nb_finished == tq->nb_streams ? AVERROR_EOF : AVERROR(EAGAIN);
195 }
196 
197 int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
198 {
199  int ret;
200 
201  *stream_idx = -1;
202 
203  pthread_mutex_lock(&tq->lock);
204 
205  while (1) {
206  size_t can_read = av_container_fifo_can_read(tq->fifo);
207 
208  ret = receive_locked(tq, stream_idx, data);
209 
210  // signal other threads if the fifo state changed
211  if (can_read != av_container_fifo_can_read(tq->fifo))
213 
214  if (ret == AVERROR(EAGAIN)) {
215  pthread_cond_wait(&tq->cond, &tq->lock);
216  continue;
217  }
218 
219  break;
220  }
221 
223 
224  return ret;
225 }
226 
227 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
228 {
229  av_assert0(stream_idx < tq->nb_streams);
230 
231  pthread_mutex_lock(&tq->lock);
232 
233  /* mark the stream as send-finished;
234  * next time the consumer thread tries to read this stream it will get
235  * an EOF and recv-finished flag will be set */
236  tq->finished[stream_idx] |= FINISHED_SEND;
237  tq->choked = 0;
239 
241 }
242 
243 void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
244 {
245  av_assert0(stream_idx < tq->nb_streams);
246 
247  pthread_mutex_lock(&tq->lock);
248 
249  /* mark the stream as recv-finished;
250  * next time the producer thread tries to send for this stream, it will
251  * get an EOF and send-finished flag will be set */
252  tq->finished[stream_idx] |= FINISHED_RECV;
254 
256 }
257 
258 void tq_choke(ThreadQueue *tq, int choked)
259 {
260  pthread_mutex_lock(&tq->lock);
261 
262  int prev_choked = tq->choked;
263  tq->choked = choked;
264  if (choked != prev_choked)
266 
268 }
ThreadQueue::choked
int choked
Definition: thread_queue.c:41
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:433
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
av_container_fifo_write
int av_container_fifo_write(AVContainerFifo *cf, void *obj, unsigned flags)
Write the contents of obj to the FIFO.
Definition: container_fifo.c:162
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
av_container_fifo_alloc_avframe
AVContainerFifo * av_container_fifo_alloc_avframe(unsigned flags)
Allocate an AVContainerFifo instance for AVFrames.
Definition: container_fifo.c:215
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
FINISHED_RECV
@ FINISHED_RECV
Definition: thread_queue.c:37
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
ThreadQueue::lock
pthread_mutex_t lock
Definition: thread_queue.c:50
container_fifo.h
pthread_mutex_lock
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:119
ThreadQueue::cond
pthread_cond_t cond
Definition: thread_queue.c:51
data
const char data[16]
Definition: mxf.c:149
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, enum ThreadQueueType type)
Allocate a queue for sending data between threads.
Definition: thread_queue.c:72
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
fail
#define fail()
Definition: checkasm.h:204
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
ThreadQueue::type
enum ThreadQueueType type
Definition: thread_queue.c:45
type
it s the only field you need to keep assuming you have a context There is some magic you don t need to care about around this just let it vf type
Definition: writing_filters.txt:86
ThreadQueue::fifo
AVContainerFifo * fifo
Definition: thread_queue.c:47
avassert.h
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
FINISHED_SEND
@ FINISHED_SEND
Definition: thread_queue.c:36
intreadwrite.h
pthread_mutex_unlock
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:126
receive_locked
static int receive_locked(ThreadQueue *tq, int *stream_idx, void *data)
Definition: thread_queue.c:156
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:41
nb_streams
static int nb_streams
Definition: ffprobe.c:340
av_container_fifo_read
int av_container_fifo_read(AVContainerFifo *cf, void *obj, unsigned flags)
Read the next available object from the FIFO into obj.
Definition: container_fifo.c:122
AVContainerFifo
AVContainerFifo is a FIFO for "containers" - dynamically allocated reusable structs (e....
Definition: container_fifo.c:27
pthread_cond_broadcast
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:162
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:54
NULL
#define NULL
Definition: coverity.c:32
ThreadQueue::finished
int * finished
Definition: thread_queue.c:42
ThreadQueueType
ThreadQueueType
Definition: thread_queue.h:24
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:243
ThreadQueue::fifo_stream_index
AVFifo * fifo_stream_index
Definition: thread_queue.c:48
error.h
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:117
AVFifo
Definition: fifo.c:35
frame.h
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
packet.h
THREAD_QUEUE_FRAMES
@ THREAD_QUEUE_FRAMES
Definition: thread_queue.h:25
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:496
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
pthread_cond_t
Definition: os2threads.h:58
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:197
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
ret
ret
Definition: filter_design.txt:187
av_container_fifo_free
void av_container_fifo_free(AVContainerFifo **pcf)
Free a AVContainerFifo and everything in it.
Definition: container_fifo.c:101
ThreadQueue
Definition: thread_queue.c:40
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
thread_queue.h
av_container_fifo_can_read
size_t av_container_fifo_can_read(const AVContainerFifo *cf)
Definition: container_fifo.c:185
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
tq_choke
void tq_choke(ThreadQueue *tq, int choked)
Prevent further reads from the thread queue until it is unchoked.
Definition: thread_queue.c:258
av_container_fifo_alloc_avpacket
AVContainerFifo * av_container_fifo_alloc_avpacket(unsigned flags)
Allocate an AVContainerFifo instance for AVPacket.
Definition: packet.c:795
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:227
ThreadQueue::nb_streams
unsigned int nb_streams
Definition: thread_queue.c:43