libzypp 17.34.1
eventdispatcher_glib.cc
Go to the documentation of this file.
1#include "timer.h"
4
8#include <zypp-core/zyppng/base/UnixSignalSource>
9
10namespace zyppng {
11
12static int inline readMask () {
13 return ( G_IO_IN | G_IO_HUP );
14}
15
16static int inline writeMask () {
17 return ( G_IO_OUT );
18}
19
20static int inline excpMask () {
21 return ( G_IO_PRI );
22}
23
24static int inline evModeToMask ( int mode ) {
25 int cond = 0;
26 if ( mode & AbstractEventSource::Read ) {
27 cond = readMask() | G_IO_ERR;
28 }
29 if ( mode & AbstractEventSource::Write ) {
30 cond = cond | writeMask() | G_IO_ERR;
31 }
32 if ( mode & AbstractEventSource::Exception ) {
33 cond = cond | excpMask() | G_IO_ERR;
34 }
35 return cond;
36}
37
38static int inline gioConditionToEventTypes ( const GIOCondition rEvents, const int requestedEvs ) {
39 int ev = 0;
40 if ( ( rEvents & requestedEvs ) != 0 ) {
41 if ( ( rEvents & readMask() ) && ( requestedEvs & readMask() ) )
43 if ( ( rEvents & writeMask() ) && ( requestedEvs & writeMask() ) )
45 if ( ( rEvents & excpMask()) && ( requestedEvs & excpMask() ) )
47 if ( ( rEvents & G_IO_ERR) && ( requestedEvs & G_IO_ERR ) )
49 }
50 return ev;
51}
52
53static GSourceFuncs abstractEventSourceFuncs = {
57 nullptr,
58 nullptr,
59 nullptr
60};
61
63 GAbstractEventSource *src = nullptr;
64 src = reinterpret_cast<GAbstractEventSource *>(g_source_new(&abstractEventSourceFuncs, sizeof(GAbstractEventSource)));
65 (void) new (&src->pollfds) std::vector<GUnixPollFD>();
66
67 src->eventSource = nullptr;
68 src->_ev = ev;
69 return src;
70}
71
73{
74 for ( GUnixPollFD &fd : src->pollfds ) {
75 if ( fd.tag )
76 g_source_remove_unix_fd( &src->source, fd.tag );
77 }
78
79 src->pollfds.clear();
80 src->pollfds.std::vector< GUnixPollFD >::~vector();
81 g_source_destroy( &src->source );
82 g_source_unref( &src->source );
83}
84
85gboolean GAbstractEventSource::prepare(GSource *, gint *timeout)
86{
87 //we can not yet determine if the GSource is ready, polling FDs also have no
88 //timeout, so lets continue
89 if ( timeout )
90 *timeout = -1;
91 return false;
92}
93
94//here we need to figure out which FDs are pending
95gboolean GAbstractEventSource::check( GSource *source )
96{
97 GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
98
99 //check for pending and remove orphaned entries
100 bool hasPending = false;
101
102 for ( auto fdIt = src->pollfds.begin(); fdIt != src->pollfds.end(); ) {
103 if ( fdIt->tag == nullptr ) {
104 //this pollfd was removed, clear it from the list
105 //for now keep the object in the sources list if the pollfd list gets empty, if it does not register new events until
106 //next check it is removed for good
107 fdIt = src->pollfds.erase( fdIt );
108 } else {
109 GIOCondition pendEvents = g_source_query_unix_fd( source, fdIt->tag );
110 if ( pendEvents & G_IO_NVAL ){
111 //that poll is broken, do we need to do more????
112 fdIt = src->pollfds.erase( fdIt );
113 } else {
114 hasPending = hasPending || ( pendEvents & fdIt->reqEvents );
115 fdIt++;
116 }
117 }
118 }
119
120 //if the pollfds are empty trigger dispatch so this source can be removed
121 return hasPending || src->pollfds.empty();
122}
123
124//Trigger all event sources that have been activated
125gboolean GAbstractEventSource::dispatch(GSource *source, GSourceFunc, gpointer)
126{
127 GAbstractEventSource *src = reinterpret_cast<GAbstractEventSource*>( source );
128
129 if ( !src )
130 return G_SOURCE_REMOVE;
131
132 //sources are only removed here so we do not accidentially mess with the pollfd iterator in the next loop
133 //were we trigger all ready FDs
134 if ( src->pollfds.empty() ) {
135 auto it = std::find( src->_ev->_eventSources.begin(), src->_ev->_eventSources.end(), src );
136
137 if ( it != src->_ev->_eventSources.end() ) {
139 src->_ev->_eventSources.erase( it );
140 return G_SOURCE_REMOVE;
141 }
142 }
143
144 for ( const GUnixPollFD &pollfd : src->pollfds ) {
145 //do not trigger orphaned ones
146 if ( pollfd.tag != nullptr ) {
147 GIOCondition pendEvents = g_source_query_unix_fd( source, pollfd.tag );
148
149 if ( (pendEvents & pollfd.reqEvents ) != 0 ) {
150 int ev = gioConditionToEventTypes( pendEvents, pollfd.reqEvents );
151 // we require all event objects to be used in shared_ptr form, by doing this we make sure that the object is not destroyed
152 // while we still use it. However this WILL throw in case of using the EventSource outside of shared_ptr bounds
153 auto eventSourceLocked = src->eventSource->shared_this<AbstractEventSource>();
154 eventSourceLocked->onFdReady( pollfd.pollfd, ev );
155 }
156 }
157 }
158
159 return G_SOURCE_CONTINUE;
160}
161
162static GSourceFuncs glibTimerSourceFuncs = {
166 nullptr,
167 nullptr,
168 nullptr
169};
170
171//check when this timer expires and set the correct timeout
172gboolean GLibTimerSource::prepare(GSource *src, gint *timeout)
173{
174 GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
175 if ( !source )
176 return false; //not ready for dispatch
177
178 if ( !source->_t )
179 return false;
180
181 uint64_t nextTimeout = source->_t->remaining();
182 if ( timeout ) {
183 //this would be a really looong timeout, but be safe
184 if ( nextTimeout > G_MAXINT )
185 *timeout = G_MAXINT;
186 else
187 *timeout = static_cast<gint>( nextTimeout );
188 }
189 return ( nextTimeout == 0 );
190}
191
192//this is essentially the same as prepare
193gboolean GLibTimerSource::check(GSource *source)
194{
195 return prepare( source, nullptr );
196}
197
198//emit the expired timers, restart timers that are no single shots
199gboolean GLibTimerSource::dispatch(GSource *src, GSourceFunc, gpointer)
200{
201 GLibTimerSource *source = reinterpret_cast<GLibTimerSource *>( src );
202 if ( !source )
203 return true;
204
205 if ( source->_t == nullptr )
206 return true;
207 //this will emit the expired signal and reset the timer
208 //or stop it in case its a single shot timer
209 source->_t->shared_this<Timer>()->expire();
210 return true;
211}
212
214{
215 GLibTimerSource *src = nullptr;
216 src = reinterpret_cast<GLibTimerSource *>(g_source_new(&glibTimerSourceFuncs, sizeof(GLibTimerSource)));
217 src->_t = nullptr;
218 return src;
219}
220
222{
223 g_source_destroy( &src->source );
224 g_source_unref( &src->source );
225}
226
230static gboolean eventLoopIdleFunc ( gpointer user_data )
231{
232 auto dPtr = reinterpret_cast<EventDispatcherPrivate *>( user_data );
233 if ( dPtr ) {
234 if( dPtr->runIdleTasks() ) {
235 return G_SOURCE_CONTINUE;
236 }
237
238 g_source_unref ( dPtr->_idleSource );
239 dPtr->_idleSource = nullptr;
240 }
241 return G_SOURCE_REMOVE;
242}
243
245{
246 source = g_child_watch_source_new( pid );
247}
248
250 : tag( other.tag )
251 , source( other.source )
252 , callback( std::move( other.callback ) )
253{
254 other.source = nullptr;
255}
256
258{
259 if ( source ) {
260 g_source_destroy( source );
261 g_source_unref( source );
262 }
263}
264
266{
267 tag = other.tag;
268 source = other.source;
269 callback = std::move( other.callback );
270 other.source = nullptr;
271 return *this;
272}
273
275{
276 _myThreadId = std::this_thread::get_id();
277
278 //if we get a context specified ( usually when created for main thread ) we use it
279 //otherwise we create our own
280 if ( ctx ) {
281 _ctx = ctx;
282 g_main_context_ref ( _ctx );
283 } else {
284 _ctx = g_main_context_new();
285 }
286 // Enable this again once we switch to a full async API that requires a eventloop before calling any zypp functions
287 // g_main_context_push_thread_default( _ctx );
288}
289
291{
292 std::for_each ( _runningTimers.begin(), _runningTimers.end(), []( GLibTimerSource *src ){
293 GLibTimerSource::destruct( src );
294 });
295 std::for_each ( _eventSources.begin(), _eventSources.end(), []( GAbstractEventSource *src ){
296 GAbstractEventSource::destruct( src );
297 });
298 _runningTimers.clear();
299 _eventSources.clear();
300
301 if ( _idleSource ) {
302 g_source_destroy( _idleSource );
303 g_source_unref ( _idleSource );
304 }
305
306 //g_main_context_pop_thread_default( _ctx );
307 g_main_context_unref( _ctx );
308}
309
311{
312 //run all user defined idle functions
313 //if they return true, they are executed again in the next idle run
314 decltype ( _idleFuncs ) runQueue;
315 runQueue.swap( _idleFuncs );
316
317 while ( runQueue.size() ) {
318 EventDispatcher::IdleFunction fun( std::move( runQueue.front() ) );
319 runQueue.pop();
320 if ( fun() )
321 _idleFuncs.push( std::move(fun) );
322 }
323
324 //keep this as the last thing to call after all user code was executed
325 if ( _unrefLater.size() )
326 _unrefLater.clear();
327
328 return _idleFuncs.size() || _unrefLater.size();
329}
330
332{
333 if ( !_idleSource ) {
334 _idleSource = g_idle_source_new ();
335 g_source_set_callback ( _idleSource, eventLoopIdleFunc, this, nullptr );
336 g_source_attach ( _idleSource, _ctx );
337 }
338}
339
340std::shared_ptr<EventDispatcher> EventDispatcherPrivate::create()
341{
342 return std::shared_ptr<EventDispatcher>( new EventDispatcher() );
343}
344
345void EventDispatcherPrivate::waitPidCallback( GPid pid, gint status, gpointer user_data )
346{
347 EventDispatcherPrivate *that = reinterpret_cast<EventDispatcherPrivate *>( user_data );
348
349 try {
350 auto data = std::move( that->_waitPIDs.at(pid) );
351 that->_waitPIDs.erase( pid );
352
353 if ( data.callback )
354 data.callback( pid, status );
355
356 g_spawn_close_pid( pid );
357
358 // no need to take care of releasing the GSource, the event loop took care of that
359
360 } catch ( const std::out_of_range &e ) {
361 return;
362 }
363}
364
366
368 : Base ( * new EventDispatcherPrivate( reinterpret_cast<GMainContext*>(ctx), *this ) )
369{
370}
371
375
377{
378 Z_D();
379 if ( notifier.eventDispatcher().lock().get() != this )
380 ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to update event source") );
381
382 AbstractEventSource *notifyPtr = &notifier;
383
384 GAbstractEventSource *evSrc = nullptr;
385 auto &evSrcList = d->_eventSources;
386 auto itToEvSrc = std::find_if( evSrcList.begin(), evSrcList.end(), [ notifyPtr ]( const auto elem ){ return elem->eventSource == notifyPtr; } );
387 if ( itToEvSrc == evSrcList.end() ) {
388
389 evSrc = GAbstractEventSource::create( d );
390 evSrc->eventSource = notifyPtr;
391 evSrcList.push_back( evSrc );
392
393 g_source_attach( &evSrc->source, d->_ctx );
394
395 } else
396 evSrc = (*itToEvSrc);
397
398 int cond = evModeToMask( mode );
399 auto it = std::find_if( evSrc->pollfds.begin(), evSrc->pollfds.end(), [fd]( const auto &currPollFd ) {
400 return currPollFd.pollfd == fd;
401 });
402
403 if ( it != evSrc->pollfds.end() ) {
404 //found
405 it->reqEvents = static_cast<GIOCondition>( cond );
406 g_source_modify_unix_fd( &evSrc->source, it->tag, static_cast<GIOCondition>(cond) );
407 } else {
408 evSrc->pollfds.push_back(
410 static_cast<GIOCondition>(cond),
411 fd,
412 g_source_add_unix_fd( &evSrc->source, fd, static_cast<GIOCondition>(cond) )
413 }
414 );
415 }
416}
417
419{
420 Z_D();
421
422 AbstractEventSource *ptr = &notifier;
423
424 if ( notifier.eventDispatcher().lock().get() != this )
425 ZYPP_THROW( zypp::Exception("Invalid event dispatcher used to remove event source") );
426
427 auto &evList = d->_eventSources;
428 auto it = std::find_if( evList.begin(), evList.end(), [ ptr ]( const auto elem ){ return elem->eventSource == ptr; } );
429
430 if ( it == evList.end() )
431 return;
432
433 auto &fdList = (*it)->pollfds;
434
435 if ( fd == -1 ) {
436 //we clear out all unix_fd watches but do not destroy the source just yet. We currently might
437 //be in the dispatch() function of that AbstractEventSource, make sure not to break the iterator
438 //for the fd's
439 for ( auto &pFD : fdList ) {
440 if ( pFD.tag )
441 g_source_remove_unix_fd( &(*it)->source, pFD.tag );
442 pFD.pollfd = -1;
443 pFD.tag = nullptr; //mark as orphaned, do not delete the element here this might break dipatching
444 }
445 } else {
446 auto fdIt = std::find_if( fdList.begin(), fdList.end(), [ fd ]( const auto &pFd ){ return pFd.pollfd == fd; } );
447 if ( fdIt != fdList.end() ) {
448 if ( fdIt->tag )
449 g_source_remove_unix_fd( &(*it)->source, (*fdIt).tag );
450 //also do not remove here, mark as orphaned only to not break iterating in dispatch()
451 fdIt->tag = nullptr;
452 fdIt->pollfd = -1;
453 }
454 }
455}
456
458{
459 Z_D();
460 //make sure timer is not double registered
461 for ( const GLibTimerSource *t : d->_runningTimers ) {
462 if ( t->_t == &timer )
463 return;
464 }
465
467 newSrc->_t = &timer;
468 d->_runningTimers.push_back( newSrc );
469
470 g_source_attach( &newSrc->source, d->_ctx );
471}
472
474{
475 Z_D();
476 auto it = std::find_if( d->_runningTimers.begin(), d->_runningTimers.end(), [ &timer ]( const GLibTimerSource *src ){
477 return src->_t == &timer;
478 });
479
480 if ( it != d->_runningTimers.end() ) {
481 GLibTimerSource *src = *it;
482 d->_runningTimers.erase( it );
484 }
485}
486
488{
489 return d_func()->_ctx;
490}
491
492bool EventDispatcher::waitForFdEvent( const int fd, int events , int &revents , int &timeout )
493{
494 GPollFD pollFd;
495 pollFd.fd = fd;
496 pollFd.events = evModeToMask(events);
497
498 bool eventTriggered = false;
499 zypp::AutoDispose<GTimer *> timer( g_timer_new(), &g_timer_destroy );
500 while ( !eventTriggered ) {
501 g_timer_start( *timer );
502 const int res = g_poll( &pollFd, 1, timeout );
503 switch ( res ) {
504 case 0: //timeout
505 timeout = 0;
506 return false;
507 case -1: { // interrupt
508 // if timeout is -1 we wait until eternity
509 if ( timeout == -1 )
510 continue;
511
512 timeout -= g_timer_elapsed( *timer, nullptr );
513 if ( timeout < 0 ) timeout = 0;
514 if ( timeout <= 0 )
515 return false;
516
517 if ( errno == EINTR )
518 continue;
519
520 ERR << "g_poll error: " << strerror(errno) << std::endl;
521 return false;
522 }
523 case 1:
524 eventTriggered = true;
525 break;
526 }
527 }
528
529 revents = gioConditionToEventTypes( (GIOCondition)pollFd.revents, evModeToMask(events) );
530 return true;
531}
532
533void EventDispatcher::trackChildProcess( int pid, std::function<void (int, int)> callback )
534{
535 Z_D();
536 GlibWaitPIDData data ( pid );
537 data.callback = std::move(callback);
538
539 g_source_set_callback ( data.source, (GSourceFunc) &EventDispatcherPrivate::waitPidCallback , d_ptr.get(), nullptr );
540 data.tag = g_source_attach ( data.source, d->_ctx );
541 d->_waitPIDs.insert( std::make_pair( pid, std::move(data) ) );
542}
543
545{
546 Z_D();
547 try {
548 d->_waitPIDs.erase( pid );
549 } catch ( const std::out_of_range &e ) {
550 return false;
551 }
552 return true;
553}
554
556{
557 Z_D();
558 // lazy init
559 UnixSignalSourceRef r;
560 if ( d->_signalSource.expired ()) {
561 d->_signalSource = r = UnixSignalSource::create();
562 } else {
563 r = d->_signalSource.lock ();
564 }
565 return r;
566}
567
569{
570 return g_main_context_iteration( d_func()->_ctx, false );
571}
572
574{
575 auto d = instance()->d_func();
576 d->_idleFuncs.push( std::move(callback) );
577 d->enableIdleSource();
578}
579
580void EventDispatcher::unrefLaterImpl( std::shared_ptr<void> &&ptr )
581{
582 Z_D();
583 d->_unrefLater.push_back( std::move(ptr) );
584 d->enableIdleSource();
585}
586
588{
589 d_func()->_unrefLater.clear();
590}
591
593{
594 return d_func()->_runningTimers.size();
595}
596
597std::shared_ptr<EventDispatcher> EventDispatcher::instance()
598{
600}
601
602void EventDispatcher::setThreadDispatcher(const std::shared_ptr<EventDispatcher> &disp)
603{
605}
606
607}
struct _GPollFD GPollFD
Definition ZYppImpl.h:26
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition AutoDispose.h:95
Base class for Exception.
Definition Exception.h:147
std::weak_ptr< EventDispatcher > eventDispatcher() const
virtual void onFdReady(int fd, int events)=0
std::shared_ptr< T > shared_this() const
Definition base.h:113
std::unique_ptr< BasePrivate > d_ptr
Definition base.h:174
std::queue< EventDispatcher::IdleFunction > _idleFuncs
std::vector< std::shared_ptr< void > > _unrefLater
static void waitPidCallback(GPid pid, gint status, gpointer user_data)
std::unordered_map< int, GlibWaitPIDData > _waitPIDs
std::vector< GAbstractEventSource * > _eventSources
std::vector< GLibTimerSource * > _runningTimers
EventDispatcherPrivate(GMainContext *ctx, EventDispatcher &p)
static std::shared_ptr< EventDispatcher > create()
UnixSignalSourceRef unixSignalSource()
virtual void registerTimer(Timer &timer)
void unrefLaterImpl(std::shared_ptr< void > &&ptr)
void * nativeDispatcherHandle() const
Returns the native dispatcher handle if the used implementation supports it.
std::function< bool()> IdleFunction
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
virtual void removeTimer(Timer &timer)
void trackChildProcess(int pid, std::function< void(int, int)> callback)
virtual void updateEventSource(AbstractEventSource &notifier, int fd, int mode)
virtual void removeEventSource(AbstractEventSource &notifier, int fd=-1)
static void setThreadDispatcher(const std::shared_ptr< EventDispatcher > &disp)
static std::shared_ptr< EventDispatcher > instance()
void invokeOnIdleImpl(IdleFunction &&callback)
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
static UnixSignalSourceRef create()
static gboolean eventLoopIdleFunc(gpointer user_data)
Called when the event loop is idle, here we run cleanup tasks and call later() callbacks of the user.
static GSourceFuncs glibTimerSourceFuncs
static int gioConditionToEventTypes(const GIOCondition rEvents, const int requestedEvs)
static int evModeToMask(int mode)
static int writeMask()
static GSourceFuncs abstractEventSourceFuncs
static int readMask()
static int excpMask()
DlContextRefType _ctx
Definition rpmmd.cc:66
static gboolean check(GSource *source)
std::vector< GUnixPollFD > pollfds
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
static GAbstractEventSource * create(EventDispatcherPrivate *ev)
static void destruct(GAbstractEventSource *src)
static gboolean prepare(GSource *, gint *timeout)
static void destruct(GLibTimerSource *src)
static gboolean prepare(GSource *src, gint *timeout)
static gboolean check(GSource *source)
static GLibTimerSource * create()
static gboolean dispatch(GSource *source, GSourceFunc, gpointer)
GlibWaitPIDData & operator=(GlibWaitPIDData &&other) noexcept
EventDispatcher::WaitPidCallback callback
std::shared_ptr< EventDispatcher > dispatcher()
static ThreadData & current()
Definition threaddata.cc:16
void setDispatcher(const std::shared_ptr< EventDispatcher > &disp)
Definition threaddata.cc:41
#define ZYPP_THROW(EXCPT)
Drops a logline and throws the Exception.
Definition Exception.h:429
#define ERR
Definition Logger.h:100
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:91
#define Z_D()
Definition zyppglobal.h:104