blob: 46d306fa5a7d8f1f424c3bd49d3543677459405e [file] [log] [blame]
fei.dengb9a1a572023-09-13 01:33:57 +00001/*
2 * Copyright (C) 2021 Amlogic Corporation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#include <sys/time.h>
17#include <stdarg.h>
18#include <sys/types.h>
19#include <limits.h>
20#include <time.h>
21#include <sys/socket.h>
22#include <errno.h>
23#include <string.h>
24
25#include <unistd.h>
26#include "Times.h"
27#include "Poll.h"
28#include "Logger.h"
29
30#define TAG "Poll"
31
32namespace Tls {
33
34#define INCREASE_ACTIVE_FDS 8
35
36Poll::Poll(bool controllable)
37 : mControllable(controllable)
38{
39 mFds = NULL;
40 mFdsCnt = 0;
41 mFdsMaxCnt = 0;
42
43 mWaiting.store(0);
44 mControlPending.store(0);
45 mFlushing.store(0);
46
47 //init control sockets
48 int control_sock[2];
49
50 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) {
51 mControlReadFd = -1;
52 mControlWriteFd = -1;
53 } else {
54 mControlReadFd = control_sock[0];
55 mControlWriteFd = control_sock[1];
56 }
57 addFd(mControlReadFd);
58 setFdReadable(mControlReadFd,true);
59}
60
61Poll::~Poll()
62{
63 if (mControlWriteFd >= 0) {
64 close (mControlWriteFd);
65 mControlWriteFd = -1;
66 }
67
68 if (mControlReadFd >= 0) {
69 close (mControlReadFd);
70 mControlReadFd = -1;
71 }
72
73 if (mFds) {
74 free(mFds);
75 mFds = NULL;
76 }
77}
78
79int Poll::addFd(int fd)
80{
81 Tls::Mutex::Autolock _l(mMutex);
82 struct pollfd *pfd = findFd(fd);
83 if (pfd) {
84 return 0;
85 }
86
87 if ((mFdsCnt + 1) > mFdsMaxCnt) {
88 mFdsMaxCnt += INCREASE_ACTIVE_FDS;
89 mFds = (struct pollfd *)realloc(mFds,mFdsMaxCnt*sizeof(struct pollfd));
90 if (!mFds) {
91 ERROR(NO_CAT,"NO memory");
92 return -1;
93 }
94 }
95 mFds[mFdsCnt].fd = fd;
96 mFds[mFdsCnt].events = POLLERR | POLLNVAL | POLLHUP;
97 mFds[mFdsCnt].revents = 0;
98 mFdsCnt++;
99
100 //DEBUG("mFds:%p,maxcnt:%d,cnt:%d",mFds,mFdsMaxCnt,mFdsCnt);
101 return 0;
102}
103
104int Poll::removeFd(int fd)
105{
106 Tls::Mutex::Autolock _l(mMutex);
107 for (int i = 0; i < mFdsCnt; i++) {
108 struct pollfd *pfd = &mFds[i];
109 if (pfd->fd == fd) {
110 memmove(mFds+i*sizeof(struct pollfd),mFds+(i+1)*sizeof(struct pollfd), 1);
111 mFdsCnt--;
112 return 0;
113 }
114 }
115 return 0;
116}
117
118int Poll::setFdReadable(int fd, bool readable)
119{
120 Tls::Mutex::Autolock _l(mMutex);
121 struct pollfd * pfd = findFd(fd);
122 if (!pfd) {
123 return -1;
124 }
125 if (readable) {
126 pfd->events |= POLLIN | POLLPRI | POLLRDNORM;
127 } else {
128 pfd->events &= ~POLLIN;
129 }
130
131 return 0;
132}
133
134int Poll::setFdWritable(int fd, bool writable)
135{
136 Tls::Mutex::Autolock _l(mMutex);
137 struct pollfd * pfd = findFd(fd);
138 if (!pfd) {
139 return -1;
140 }
141 if (writable) {
142 pfd->events |= POLLOUT;
143 } else {
144 pfd->events &= ~POLLOUT;
145 }
146
147 return 0;
148}
149
150int Poll::wait(int64_t timeoutMs /*millisecond*/)
151{
152 int oldwaiting;
153 int activecnt = 0;
154
155 oldwaiting = mWaiting.load();
156
157 mWaiting.fetch_add(1);
158
159 if (oldwaiting > 0) { //had other thread waiting
160 goto tag_already_waiting;
161 }
162
163 if (mFlushing.load()) {
164 goto tag_flushing;
165 }
166
167 do {
168 int64_t t = -1; //nanosecond
169 if (timeoutMs >= 0) {
170 t = timeoutMs;
171 }
172 //DEBUG("waiting");
173 activecnt = poll(mFds, mFdsCnt, t);
174 //DEBUG("waiting end");
175 if (mFlushing.load()) {
176 goto tag_flushing;
177 }
178 } while(0);
179
180tag_success:
181 mWaiting.fetch_sub(1);
182 return activecnt;
183tag_already_waiting:
184 mWaiting.fetch_sub(1);
185 errno = EPERM;
186 return -1;
187tag_flushing:
188 mWaiting.fetch_sub(1);
189 errno = EBUSY;
190 return -1;
191}
192
193void Poll::setFlushing(bool flushing)
194{
195 /* update the new state first */
196 if (flushing) {
197 mFlushing.store(1);
198 } else {
199 mFlushing.store(0);
200 }
201
202 if (mFlushing.load() && mControllable && mWaiting.load() > 0) {
203 /* we are flushing, controllable and waiting, wake up the waiter. When we
204 * stop the flushing operation we don't clear the wakeup fd here, this will
205 * happen in the _wait() thread. */
206 raiseWakeup();
207 }
208}
209
210bool Poll::isReadable(int fd)
211{
212 for (int i = 0; i < mFdsCnt; i++) {
213 if (mFds[i].fd == fd && ((mFds[i].revents & (POLLIN|POLLRDNORM)) != 0)) {
214 return true;
215 }
216 }
217 return false;
218}
219
220bool Poll::isWritable(int fd)
221{
222 for (int i = 0; i < mFdsCnt; i++) {
223 if (mFds[i].fd == fd && ((mFds[i].revents & POLLOUT) != 0)) {
224 return true;
225 }
226 }
227 return false;
228}
229
230struct pollfd * Poll::findFd(int fd)
231{
232 for (int i = 0; i < mFdsCnt; i++) {
233 struct pollfd *pfd = &mFds[i];
234 if (pfd->fd == fd) {
235 return pfd;
236 }
237 }
238 return NULL;
239}
240
241bool Poll::wakeEvent()
242{
243 ssize_t num_written;
244 while ((num_written = write (mControlWriteFd, "W", 1)) != 1) {
245 if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
246 ERROR(NO_CAT,"failed to wake event: %s", strerror (errno));
247 return false;
248 }
249 }
250 return true;
251}
252
253bool Poll::releaseEvent()
254{
255 char buf[1] = { '\0' };
256 ssize_t num_read;
257 while ((num_read = read (mControlReadFd, buf, 1)) != 1) {
258 if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
259 ERROR(NO_CAT,"failed to release event: %s", strerror (errno));
260 return false;
261 }
262 }
263 return true;
264}
265bool Poll::raiseWakeup()
266{
267 bool result = true;
268
269 /* makes testing control_pending and WAKE_EVENT() atomic. */
270 Tls::Mutex::Autolock _l(mMutex);
271 //DEBUG("mControlPending:%d",mControlPending.load());
272 if (mControlPending.load() == 0) {
273 /* raise when nothing pending */
274 //GST_LOG ("%p: raise", set);
275 result = wakeEvent();
276 }
277
278 if (result) {
279 mControlPending.fetch_add(1);
280 }
281
282 return result;
283}
284
285bool Poll::releaseWakeup()
286{
287 bool result = false;
288
289 /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
290 Tls::Mutex::Autolock _l(mMutex);
291
292 if (mControlPending.load() > 0) {
293 /* release, only if this was the last pending. */
294 if (mControlPending.load() == 1) {
295 //GST_LOG ("%p: release", set);
296 result = releaseEvent();
297 } else {
298 result = true;
299 }
300
301 if (result) {
302 mControlPending.fetch_sub(1);
303 }
304 } else {
305 errno = EWOULDBLOCK;
306 }
307
308 return result;
309}
310
311bool Poll::releaseAllWakeup()
312{
313 bool result = false;
314
315 /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
316 Tls::Mutex::Autolock _l(mMutex);
317
318 if (mControlPending.load() > 0) {
319 //GST_LOG ("%p: release", set);
320 result = releaseEvent();
321 if (result) {
322 mControlPending.store(0);
323 }
324 } else {
325 errno = EWOULDBLOCK;
326 }
327
328 return result;
329}
330
331}