libassa 3.5.1
Loading...
Searching...
No Matches
Reactor.cpp
Go to the documentation of this file.
1// -*- c++ -*-
2//------------------------------------------------------------------------------
3// Reactor.cpp
4//------------------------------------------------------------------------------
5// Copyright (C) 1997-2002,2005-2007 Vladislav Grinchenko
6//
7// This library is free software; you can redistribute it and/or
8// modify it under the terms of the GNU Library General Public
9// License as published by the Free Software Foundation; either
10// version 2 of the License, or (at your option) any later version.
11//-----------------------------------------------------------------------------
12// Created: 05/25/1999
13//-----------------------------------------------------------------------------
14#include <iostream>
15#include <sstream>
16#include <string>
17
18#include "assa/Reactor.h"
19#include "assa/Logger.h"
20
21using namespace ASSA;
22
24Reactor () :
25 m_fd_setsize (1024),
26 m_maxfd_plus1 (0),
27 m_active (true)
28{
29 trace_with_mask("Reactor::Reactor",REACTTRACE);
30
34#if defined(WIN32)
36
37#else // POSIX
38 struct rlimit rlim;
39 rlim.rlim_max = 0;
40
41 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
42 m_fd_setsize = rlim.rlim_cur;
43 }
44#endif
45
48#if defined (WIN32)
50 WSAStartup (MAKEWORD (2, 2), &data);
51#endif
52}
53
56{
57 trace_with_mask("Reactor::~Reactor",REACTTRACE);
58
59 m_readSet.clear ();
60 m_writeSet.clear ();
61 m_exceptSet.clear ();
62 deactivate ();
63}
64
68 const TimeVal& timeout_,
69 const std::string& name_)
70{
71 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
73
76
77 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
78 timeout_.sec(),timeout_.msec()));
79 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
80 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
81
83
84 DL((REACT,"---Modified Timer Queue----\n"));
85 m_tqueue.dump();
86 DL((REACT,"---------------------------\n"));
87
88 return (tid);
89}
90
91bool
94{
95 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
96
97 std::ostringstream msg;
99
100 if (isReadEvent (et_))
101 {
102 if (!m_waitSet.m_rset.setFd (fd_))
103 {
104 DL((ASSAERR,"readset: fd %d out of range\n", fd_));
105 return (false);
106 }
107 m_readSet[fd_] = eh_;
108 msg << "READ_EVENT";
109 }
110
111 if (isWriteEvent (et_))
112 {
113 if (!m_waitSet.m_wset.setFd (fd_))
114 {
115 DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
116 return (false);
117 }
118 m_writeSet[fd_] = eh_;
119 msg << " WRITE_EVENT";
120 }
121
122 if (isExceptEvent (et_))
123 {
124 if (!m_waitSet.m_eset.setFd (fd_))
125 {
126 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
127 return (false);
128 }
130 msg << " EXCEPT_EVENT";
131 }
132 msg << std::ends;
133
134 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
135 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
136
137#if !defined (WIN32)
138 if (m_maxfd_plus1 < fd_+1) {
139 m_maxfd_plus1 = fd_+1;
140 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
141 }
142#endif
143
144 DL((REACT,"Modified waitSet:\n"));
145 m_waitSet.dump ();
146
147 return (true);
148}
149
150bool
153{
154 trace_with_mask("Reactor::removeTimer",REACTTRACE);
155 bool ret;
156
157 if ((ret = m_tqueue.remove (tid_))) {
158 DL((REACT,"---Modified Timer Queue----\n"));
159 m_tqueue.dump();
160 DL((REACT,"---------------------------\n"));
161 }
162 else {
163 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
164 }
165 return (ret);
166}
167
171bool
174{
175 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
176
177 bool ret = false;
178 handler_t fd;
180
181 if (eh_ == NULL) {
182 return false;
183 }
184
185 if (isTimeoutEvent (event_)) {
187 ret = true;
188 }
189
190 if (isReadEvent (event_)) {
191 iter = m_readSet.begin ();
192 while (iter != m_readSet.end ()) {
193 if ((*iter).second == eh_) {
194 fd = (*iter).first;
195 m_readSet.erase (iter);
197 ret = true;
198 break;
199 }
200 iter++;
201 }
202 }
203
204 if (isWriteEvent (event_)) {
205 iter = m_writeSet.begin ();
206 while (iter != m_writeSet.end ()) {
207 if ((*iter).second == eh_) {
208 fd = (*iter).first;
209 m_writeSet.erase (iter);
211 ret = true;
212 break;
213 }
214 iter++;
215 }
216 }
217
218 if (isExceptEvent (event_)) {
219 iter = m_exceptSet.begin ();
220 while (iter != m_exceptSet.end ()) {
221 if ((*iter).second == eh_) {
222 fd = (*iter).first;
223 m_exceptSet.erase (iter);
225 ret = true;
226 break;
227 }
228 iter++;
229 }
230 }
231
232 if (ret == true) {
233 DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
234 eh_->handle_close (fd);
235 }
236
237 adjust_maxfdp1 (fd);
238
239 DL((REACT,"Modifies waitSet:\n"));
240 m_waitSet.dump ();
241
242 return (ret);
243}
244
245bool
248{
249 trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
250
251 bool ret = false;
254
256
257 DL((REACT,"Removing handler for fd=%d\n",fd_));
258
263 if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
264 {
265 ehp = (*iter).second;
266 m_readSet.erase (iter);
269 if (m_readSet.size () > 0) {
270 iter = m_readSet.end ();
271 iter--;
272 }
273 ret = true;
274 }
275
276 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
277 {
278 ehp = (*iter).second;
279 m_writeSet.erase (iter);
282 if (m_writeSet.size () > 0) {
283 iter = m_writeSet.end ();
284 iter--;
285 }
286 ret = true;
287 }
288
289 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
290 {
291 ehp = (*iter).second;
292 m_exceptSet.erase (iter);
295 if (m_exceptSet.size () > 0) {
296 iter = m_exceptSet.end ();
297 iter--;
298 }
299 ret = true;
300 }
301
302 if (ret == true && ehp != NULL) {
303 DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
304 ehp->handle_close (fd_);
305 }
306
308
309 DL((REACT,"Modifies waitSet:\n"));
310 m_waitSet.dump ();
311
312 return (ret);
313}
314
315bool
317checkFDs (void)
318{
319 trace_with_mask("Reactor::checkFDs",REACTTRACE);
320
321 bool num_removed = false;
322 FdSet mask;
323 timeval poll = { 0, 0 };
324
325 for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
326 if ( m_readSet[fd] != NULL ) {
327 mask.setFd (fd);
328 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
329 removeIOHandler (fd);
330 num_removed = true;
331 DL((REACT,"Detected BAD FD: %d\n", fd ));
332 }
333 mask.clear (fd);
334 }
335 }
336 return (num_removed);
337}
338
339bool
341handleError (void)
342{
343 trace_with_mask("Reactor::handleError",REACTTRACE);
344
347 if ( !m_active ) {
348 DL((REACT,"Received cmd to stop Reactor\n"));
349 return (false);
350 }
351
352 /*---
353 TODO: If select(2) returns before time expires, with
354 a descriptor ready or with EINTR, timeval is not
355 going to be updated with number of seconds remaining.
356 This is true for all systems except Linux, which will
357 do so. Therefore, to restart correctly in case of
358 EINTR, we ought to take time measurement before and
359 after select, and try to select() for remaining time.
360
361 For now, we restart with the initial timing value.
362 ---*/
363 /*---
364 BSD kernel never restarts select(2). SVR4 will restart if
365 the SA_RESTART flag is specified when the signal handler
366 for the signal delivered is installed. This means taht for
367 portability, we must handle signal interrupts.
368 ---*/
369
370 if ( errno == EINTR ) {
371 EL((REACT,"EINTR: interrupted select(2)\n"));
372 /*
373 If I was sitting in select(2) and received SIGTERM,
374 the signal handler would have set m_active to 'false',
375 and this function would have returned 'false' as above.
376 For any other non-critical signals (USR1,...),
377 we retry select.
378 */
379 return (true);
380 }
381 /*
382 EBADF - bad file number. One of the file descriptors does
383 not reference an open file to open(), close(), ioctl().
384 This can happen if user closed fd and forgot to remove
385 handler from Reactor.
386 */
387 if ( errno == EBADF ) {
388 DL((REACT,"EBADF: bad file descriptor\n"));
389 return (checkFDs ());
390 }
391 /*
392 Any other error from select
393 */
394#if defined (WIN32)
395 DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
396#else
397 EL((ASSAERR,"select(3) error\n"));
398#endif
399 return (false);
400}
401
402int
404isAnyReady (void)
405{
406 trace_with_mask("Reactor::isAnyReady",REACTTRACE);
407
408 int n = m_readySet.m_rset.numSet () +
411
412 if ( n > 0 ) {
413 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
414 m_readySet.dump ();
415 }
416 return (n);
417}
418
419void
422{
423 trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
424
425 TimeVal now;
426 TimeVal tv;
427
428 if (m_tqueue.isEmpty () ) {
430 goto done;
431 }
433 tv = m_tqueue.top ();
434
435 if (tv < now) {
436 /*---
437 It took too long to get here (fraction of a millisecond),
438 and top timer had already expired. In this case,
439 perform non-blocking select in order to drain the timer queue.
440 ---*/
441 *howlong_ = 0;
442 }
443 else {
444 DL((REACT,"--------- Timer Queue ----------\n"));
445 m_tqueue.dump();
446 DL((REACT,"--------------------------------\n"));
447
448 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
449 *howlong_ = tv - now;
450 }
451 else {
452 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
453 }
454 }
455
456 done:
457 if (howlong_ != NULL) {
458 DL((REACT,"delay (%f)\n", double (*howlong_) ));
459 }
460 else {
461 DL((REACT,"delay (forever)\n"));
462 }
463}
464
468void
470waitForEvents (void)
471{
472 while ( m_active ) {
474 }
475}
476
493void
496{
497 trace_with_mask("Reactor::waitForEvents",REACTTRACE);
498
500 DL((REACT,"======================================\n"));
501
502 /*--- Expire all stale Timers ---*/
504
505 /* Test to see if Reactor has been deactivated as a result
506 * of processing done by any TimerHandlers.
507 */
508 if (!m_active) {
509 return;
510 }
511
512 int nReady;
514 TimeVal* dlp = &delay;
515
516 /*---
517 In case if not all data have been processed by the EventHandler,
518 and EventHandler stated so in its callback's return value
519 to dispatcher (), it will be called again. This way
520 underlying file/socket stream can efficiently utilize its
521 buffering mechaninsm.
522 ---*/
523 if ((nReady = isAnyReady ())) {
524 DL((REACT,"isAnyReady returned: %d\n",nReady));
526 return;
527 }
528
529 DL((REACT,"=== m_waitSet ===\n"));
530 m_waitSet.dump ();
531
532 do {
533 m_readySet.reset ();
534 DL ((REACT,"m_readySet after reset():\n"));
535 m_readySet.dump ();
536
538 DL ((REACT,"m_readySet after assign:\n"));
539 m_readySet.dump ();
540
542
547 dlp);
548 DL((REACT,"::select() returned: %d\n",nReady));
549
550 m_readySet.sync ();
551 DL ((REACT,"m_readySet after select:\n"));
552 m_readySet.dump ();
553
554 }
555 while (nReady < 0 && handleError ());
556
558}
559
566void
569{
570 trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
571
572 int ret = 0;
573 handler_t fd;
575 std::string eh_id;
576
577 Fd2Eh_Map_Iter iter = fdSet_.begin ();
578
579 while (iter != fdSet_.end ())
580 {
581 fd = (*iter).first;
582 ehp = (*iter).second;
583
584 if (mask_.isSet (fd) && ehp != NULL)
585 {
586 eh_id = ehp->get_id ();
587 DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
588 eh_id.c_str (), fd));
589
590 ret = (ehp->*callback_) (fd); /* Fire up a callback */
591
592 if (ret == -1) {
593 removeIOHandler (fd);
594 }
595 else if (ret > 0) {
596 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
597 ret, fd, eh_id.c_str ()));
598 //return; <-- would starve other connections
599 }
600 else {
601 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
602 eh_id.c_str (), fd));
603 mask_.clear (fd);
604 }
611 iter = fdSet_.begin ();
612 }
613 else {
614 iter++;
615 }
616 }
617}
618
624bool
626dispatch (int ready_)
627{
628 trace_with_mask("Reactor::dispatch", REACTTRACE);
629
631
632 if ( ready_ < 0 )
633 {
634#if !defined (WIN32)
635 EL((ASSAERR,"::select(3) error\n"));
636#endif
637 return (false);
638 }
639 if ( ready_ == 0 ) {
640 return (true);
641 }
642
643 DL((REACT,"Dispatching %d FDs.\n",ready_));
644 DL((REACT,"m_readySet:\n"));
645 m_readySet.dump ();
646
647 /*--- Writes first ---*/
651
652 /*--- Exceptions next ---*/
656
657 /*--- Finally, the Reads ---*/
659 m_readSet,
661
662 return (true);
663}
664
665void
667stopReactor (void)
668{
669 trace_with_mask("Reactor::stopReactor", REACTTRACE);
670
671 m_active = false;
672
675
676 while (m_readSet.size () > 0) {
677 iter = m_readSet.begin ();
678 ehp = (*iter).second;
680 }
681
682 while (m_writeSet.size () > 0) {
683 iter = m_writeSet.begin ();
684 ehp = (*iter).second;
686 }
687
688 while (m_exceptSet.size () > 0) {
689 iter = m_exceptSet.begin ();
690 ehp = (*iter).second;
692 }
693}
694
699void
702{
703#if !defined (WIN32) /* POSIX */
704
705 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
706
707 if (m_maxfd_plus1 == fd_ + 1)
708 {
710 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
711 }
712#endif
713}
#define Assure_return(exp_)
Test condition and return bool from a function if assertion fails.
Definition Assure.h:64
An abstraction to message logging facility.
#define EL(X)
A macro for writing error message to the Logger.
Definition Logger.h:285
#define DL(X)
A macro for writing debug message to the Logger.
Definition Logger.h:273
#define trace_with_mask(s, m)
trace_with_mask() is used to trace function call chain in C++ program.
Definition Logger.h:437
int handler_t
Definition Logger_Impl.h:82
An implementation of Reactor pattern.
A wrapper class to provide AutoPtr with reference semantics.
Definition AutoPtr.h:32
EventHandler class.
virtual int handle_write(int fd)
Write handler callback.
virtual int handle_except(int fd)
Exception handler callback.
virtual int handle_read(int fd)
Read event callback.
Class FdSet.
Definition FdSet.h:52
bool setFd(handler_t fd_)
Set flag (ON) for the argument fd.
Definition FdSet.cpp:20
bool clear(handler_t fd_)
Clear flag (OFF) for the argument fd.
Definition FdSet.cpp:39
int numSet()
Determine how many bits are set (ON) in the set.
Definition FdSet.h:126
FdSet m_rset
Read fds set.
Definition MaskSet.h:28
void sync()
Resync internals after select() call.
Definition MaskSet.h:52
FdSet m_eset
Exception fds set.
Definition MaskSet.h:34
int max_fd()
Return maximum value of the file descriptor in the Set.
Definition MaskSet.h:71
void reset()
Clear all bits in all sets.
Definition MaskSet.h:62
void dump()
Write current state of MaskSet object to log file.
Definition MaskSet.h:80
FdSet m_wset
Write fds set.
Definition MaskSet.h:31
void calculateTimeout(TimeVal *&howlong_, TimeVal *maxwait_)
Calculate closest timeout.
Definition Reactor.cpp:421
bool registerIOHandler(EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
Register I/O Event handler with Reactor.
Definition Reactor.cpp:93
TimerQueue m_tqueue
The queue of Timers.
Definition Reactor.h:227
MaskSet m_waitSet
Handlers to wait for event on.
Definition Reactor.h:221
Fd2Eh_Map_Type m_writeSet
Event handlers awaiting on WRITE_EVENT.
Definition Reactor.h:215
Reactor()
Constructor.
Definition Reactor.cpp:24
Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter
Definition Reactor.h:155
MaskSet m_readySet
Handlers that are ready for processing.
Definition Reactor.h:224
void waitForEvents(void)
Main waiting loop that blocks indefinitely processing events.
Definition Reactor.cpp:470
handler_t m_maxfd_plus1
Max file descriptor number (in all sets) plus 1.
Definition Reactor.h:206
void dispatchHandler(FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
Call handler's callback and, if callback returns negative value, remove it from the Reactor.
Definition Reactor.cpp:568
int m_fd_setsize
Max number of open files per process.
Definition Reactor.h:200
bool handleError(void)
Handle error in select(2) loop appropriately.
Definition Reactor.cpp:341
std::map< u_int, EventHandler * > Fd2Eh_Map_Type
no cloning
Definition Reactor.h:154
void deactivate(void)
Deactivate Reactor.
Definition Reactor.h:234
bool dispatch(int minimum_)
Notify all EventHandlers registered on respecful events occured.
Definition Reactor.cpp:626
Fd2Eh_Map_Type m_readSet
Event handlers awaiting on READ_EVENT.
Definition Reactor.h:212
bool m_active
Flag that indicates whether Reactor is active or had been stopped.
Definition Reactor.h:209
void stopReactor(void)
Stop Reactor's activity.
Definition Reactor.cpp:667
TimerId registerTimerHandler(EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
Register Timer Event handler with Reactor.
Definition Reactor.cpp:67
bool checkFDs(void)
Check mask for bad file descriptors.
Definition Reactor.cpp:317
int isAnyReady(void)
Return number of file descriptors ready accross all sets.
Definition Reactor.cpp:404
bool removeHandler(EventHandler *eh_, EventType et_=ALL_EVENTS)
Remove Event handler from reactor for either all I/O events or timeout event or both.
Definition Reactor.cpp:173
~Reactor()
Destructor.
Definition Reactor.cpp:55
bool removeTimerHandler(TimerId id_)
Remove Timer event from the queue.
Definition Reactor.cpp:152
void adjust_maxfdp1(handler_t fd_)
Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).
Definition Reactor.cpp:701
bool removeIOHandler(handler_t fd_)
Remove IO Event handler from reactor.
Definition Reactor.cpp:247
Fd2Eh_Map_Type m_exceptSet
Event handlers awaiting on EXCEPT_EVENT.
Definition Reactor.h:218
static TimeVal zeroTime()
Static that returns zero timeval: {0,0}.
Definition TimeVal.h:157
static TimeVal gettimeofday()
Shields off underlying OS differences in getting current time.
Definition TimeVal.cpp:44
int expire(const TimeVal &tv_)
Traverse the queue, triggering all timers that are past argument timeval.
void dump(void)
Dump Queue information to the log file.
int remove(EventHandler *eh_)
Cancel all timers for the EventHandler eh_.
bool isEmpty()
Is queue empty?
Definition TimerQueue.h:110
TimerId insert(EventHandler *eh_, const TimeVal &tv_, const TimeVal &delta_, const std::string &name_)
Add timer (EventHandler object) to the queue to be dispatch at the time specified.
TimeVal & top(void)
Return expiration time of the top element in the queue.
Definition TimerQueue.h:117
EventType
EventType defines events types that Reactor understands.
unsigned long TimerId
Timer Id is used in handle_timeout() calls.
bool isReadEvent(EventType e_)
bool isExceptEvent(EventType e_)
@ REACT
Class Reactor/PrioriyQueue messages
Definition LogMask.h:39
@ ASSAERR
ASSA and system errors
Definition LogMask.h:34
@ REACTTRACE
Extended Reactor/PrioriyQueue messages
Definition LogMask.h:40
bool isSignalEvent(EventType e_)
bool is_valid_handler(handler_t socket_)
Detect socket() error in a portable way.
bool isTimeoutEvent(EventType e_)
int(EventHandler::* EH_IO_Callback)(int)
A type for the pointer to I/O-related callback member function of class EventHandler.
bool isWriteEvent(EventType e_)