blob: 3bab65c3c5437f96c8aeba29102bb809869e95d1 [file] [log] [blame]
fei.dengb9a1a572023-09-13 01:33:57 +00001/*
2 * Copyright (C) 2021 Amlogic Corporation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#include <sys/time.h>
17#include <stdarg.h>
18#include <sys/types.h>
19#include <limits.h>
20#include <time.h>
21#include <sys/socket.h>
22#include <errno.h>
23#include <string.h>
24
25#include <unistd.h>
26#include "Times.h"
27#include "Poll.h"
28#include "Logger.h"
29
30#define TAG "Poll"
31
32namespace Tls {
33
34#define INCREASE_ACTIVE_FDS 8
35
36Poll::Poll(bool controllable)
37 : mControllable(controllable)
38{
39 mFds = NULL;
40 mFdsCnt = 0;
41 mFdsMaxCnt = 0;
42
43 mWaiting.store(0);
44 mControlPending.store(0);
45 mFlushing.store(0);
46
47 //init control sockets
48 int control_sock[2];
49
50 if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) {
51 mControlReadFd = -1;
52 mControlWriteFd = -1;
53 } else {
54 mControlReadFd = control_sock[0];
55 mControlWriteFd = control_sock[1];
56 }
57 addFd(mControlReadFd);
58 setFdReadable(mControlReadFd,true);
59}
60
61Poll::~Poll()
62{
63 if (mControlWriteFd >= 0) {
64 close (mControlWriteFd);
65 mControlWriteFd = -1;
66 }
67
68 if (mControlReadFd >= 0) {
69 close (mControlReadFd);
70 mControlReadFd = -1;
71 }
72
73 if (mFds) {
74 free(mFds);
75 mFds = NULL;
76 }
77}
78
79int Poll::addFd(int fd)
80{
81 Tls::Mutex::Autolock _l(mMutex);
82 struct pollfd *pfd = findFd(fd);
83 if (pfd) {
84 return 0;
85 }
86
87 if ((mFdsCnt + 1) > mFdsMaxCnt) {
88 mFdsMaxCnt += INCREASE_ACTIVE_FDS;
89 mFds = (struct pollfd *)realloc(mFds,mFdsMaxCnt*sizeof(struct pollfd));
90 if (!mFds) {
91 ERROR(NO_CAT,"NO memory");
92 return -1;
93 }
94 }
95 mFds[mFdsCnt].fd = fd;
96 mFds[mFdsCnt].events = POLLERR | POLLNVAL | POLLHUP;
97 mFds[mFdsCnt].revents = 0;
98 mFdsCnt++;
99
fei.dengdd910ef2024-06-07 10:25:30 +0800100 // DEBUG(NO_CAT,"fd:%d,maxcnt:%d,cnt:%d",fd,mFdsMaxCnt,mFdsCnt);
101 // for (int i = 0; i < mFdsCnt; i++) {
102 // DEBUG(NO_CAT,"mFds[%d]:%d",i,mFds[i]);
103 // }
fei.dengb9a1a572023-09-13 01:33:57 +0000104 return 0;
105}
106
107int Poll::removeFd(int fd)
108{
109 Tls::Mutex::Autolock _l(mMutex);
110 for (int i = 0; i < mFdsCnt; i++) {
111 struct pollfd *pfd = &mFds[i];
112 if (pfd->fd == fd) {
fei.dengdd910ef2024-06-07 10:25:30 +0800113 if (i == (mFdsCnt - 1)) {
114 mFds[i].fd = -1;
115 mFds[i].events = 0;
116 mFds[i].revents = 0;
117 } else {
118 for (int j = i; j+1 < mFdsCnt; j++) {
119 mFds[j].fd = mFds[j+1].fd;
120 mFds[j].events = mFds[j+1].events;
121 mFds[j].revents = mFds[j+1].revents;
122 }
123 }
124 mFdsCnt -= 1;
fei.dengb9a1a572023-09-13 01:33:57 +0000125 }
126 }
fei.dengdd910ef2024-06-07 10:25:30 +0800127 // DEBUG(NO_CAT,"fd:%d,maxcnt:%d,cnt:%d",fd,mFdsMaxCnt,mFdsCnt);
128 // for (int i = 0; i < mFdsCnt; i++) {
129 // DEBUG(NO_CAT,"mFds[%d]:%d",i,mFds[i]);
130 // }
fei.dengb9a1a572023-09-13 01:33:57 +0000131 return 0;
132}
133
134int Poll::setFdReadable(int fd, bool readable)
135{
136 Tls::Mutex::Autolock _l(mMutex);
137 struct pollfd * pfd = findFd(fd);
138 if (!pfd) {
139 return -1;
140 }
141 if (readable) {
142 pfd->events |= POLLIN | POLLPRI | POLLRDNORM;
143 } else {
144 pfd->events &= ~POLLIN;
145 }
146
147 return 0;
148}
149
150int Poll::setFdWritable(int fd, bool writable)
151{
152 Tls::Mutex::Autolock _l(mMutex);
153 struct pollfd * pfd = findFd(fd);
154 if (!pfd) {
155 return -1;
156 }
157 if (writable) {
158 pfd->events |= POLLOUT;
159 } else {
160 pfd->events &= ~POLLOUT;
161 }
162
163 return 0;
164}
165
166int Poll::wait(int64_t timeoutMs /*millisecond*/)
167{
168 int oldwaiting;
169 int activecnt = 0;
170
171 oldwaiting = mWaiting.load();
172
173 mWaiting.fetch_add(1);
174
175 if (oldwaiting > 0) { //had other thread waiting
176 goto tag_already_waiting;
177 }
178
179 if (mFlushing.load()) {
180 goto tag_flushing;
181 }
182
183 do {
184 int64_t t = -1; //nanosecond
185 if (timeoutMs >= 0) {
186 t = timeoutMs;
187 }
fei.dengb9a1a572023-09-13 01:33:57 +0000188 activecnt = poll(mFds, mFdsCnt, t);
189 //DEBUG("waiting end");
190 if (mFlushing.load()) {
fei.dengdd910ef2024-06-07 10:25:30 +0800191 releaseWakeup();
fei.dengb9a1a572023-09-13 01:33:57 +0000192 goto tag_flushing;
193 }
194 } while(0);
195
196tag_success:
197 mWaiting.fetch_sub(1);
198 return activecnt;
199tag_already_waiting:
200 mWaiting.fetch_sub(1);
201 errno = EPERM;
202 return -1;
203tag_flushing:
204 mWaiting.fetch_sub(1);
205 errno = EBUSY;
206 return -1;
207}
208
209void Poll::setFlushing(bool flushing)
210{
211 /* update the new state first */
212 if (flushing) {
213 mFlushing.store(1);
214 } else {
215 mFlushing.store(0);
216 }
217
218 if (mFlushing.load() && mControllable && mWaiting.load() > 0) {
219 /* we are flushing, controllable and waiting, wake up the waiter. When we
220 * stop the flushing operation we don't clear the wakeup fd here, this will
221 * happen in the _wait() thread. */
222 raiseWakeup();
223 }
224}
225
226bool Poll::isReadable(int fd)
227{
228 for (int i = 0; i < mFdsCnt; i++) {
229 if (mFds[i].fd == fd && ((mFds[i].revents & (POLLIN|POLLRDNORM)) != 0)) {
230 return true;
231 }
232 }
233 return false;
234}
235
236bool Poll::isWritable(int fd)
237{
238 for (int i = 0; i < mFdsCnt; i++) {
239 if (mFds[i].fd == fd && ((mFds[i].revents & POLLOUT) != 0)) {
240 return true;
241 }
242 }
243 return false;
244}
245
246struct pollfd * Poll::findFd(int fd)
247{
248 for (int i = 0; i < mFdsCnt; i++) {
249 struct pollfd *pfd = &mFds[i];
250 if (pfd->fd == fd) {
251 return pfd;
252 }
253 }
254 return NULL;
255}
256
257bool Poll::wakeEvent()
258{
259 ssize_t num_written;
260 while ((num_written = write (mControlWriteFd, "W", 1)) != 1) {
261 if (num_written == -1 && errno != EAGAIN && errno != EINTR) {
262 ERROR(NO_CAT,"failed to wake event: %s", strerror (errno));
263 return false;
264 }
265 }
266 return true;
267}
268
269bool Poll::releaseEvent()
270{
271 char buf[1] = { '\0' };
272 ssize_t num_read;
273 while ((num_read = read (mControlReadFd, buf, 1)) != 1) {
274 if (num_read == -1 && errno != EAGAIN && errno != EINTR) {
275 ERROR(NO_CAT,"failed to release event: %s", strerror (errno));
276 return false;
277 }
278 }
279 return true;
280}
281bool Poll::raiseWakeup()
282{
283 bool result = true;
284
285 /* makes testing control_pending and WAKE_EVENT() atomic. */
286 Tls::Mutex::Autolock _l(mMutex);
287 //DEBUG("mControlPending:%d",mControlPending.load());
288 if (mControlPending.load() == 0) {
289 /* raise when nothing pending */
290 //GST_LOG ("%p: raise", set);
291 result = wakeEvent();
292 }
293
294 if (result) {
295 mControlPending.fetch_add(1);
296 }
297
298 return result;
299}
300
301bool Poll::releaseWakeup()
302{
303 bool result = false;
304
305 /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
306 Tls::Mutex::Autolock _l(mMutex);
307
308 if (mControlPending.load() > 0) {
309 /* release, only if this was the last pending. */
310 if (mControlPending.load() == 1) {
311 //GST_LOG ("%p: release", set);
312 result = releaseEvent();
313 } else {
314 result = true;
315 }
316
317 if (result) {
318 mControlPending.fetch_sub(1);
319 }
320 } else {
321 errno = EWOULDBLOCK;
322 }
323
324 return result;
325}
326
327bool Poll::releaseAllWakeup()
328{
329 bool result = false;
330
331 /* makes testing/modifying control_pending and RELEASE_EVENT() atomic. */
332 Tls::Mutex::Autolock _l(mMutex);
333
334 if (mControlPending.load() > 0) {
335 //GST_LOG ("%p: release", set);
336 result = releaseEvent();
337 if (result) {
338 mControlPending.store(0);
339 }
340 } else {
341 errno = EWOULDBLOCK;
342 }
343
344 return result;
345}
346
347}