XRootD
Loading...
Searching...
No Matches
XrdClPostMaster.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClPoller.hh"
29#include "XrdCl/XrdClChannel.hh"
31#include "XrdCl/XrdClLog.hh"
33
35
36namespace XrdCl
37{
38 struct ConnErrJob : public Job
39 {
41 std::function<void( const URL&, const XRootDStatus& )> handler) : url( url ),
42 status( status ),
44 {
45 }
46
47 void Run( void *arg )
48 {
49 handler( url, status );
50 delete this;
51 }
52
55 std::function<void( const URL&, const XRootDStatus& )> handler;
56 };
57
59 {
60 PostMasterImpl() : pPoller( 0 ), pInitialized( false ), pRunning( false )
61 {
62 Env *env = DefaultEnv::GetEnv();
63 int workerThreads = DefaultWorkerThreads;
64 env->GetInt( "WorkerThreads", workerThreads );
65
67 pJobManager = new JobManager(workerThreads);
68 }
69
71 {
72 delete pPoller;
73 delete pTaskManager;
74 delete pJobManager;
75 }
76
77 typedef std::map<std::string, Channel*> ChannelMap;
78
86
88 std::unique_ptr<Job> pOnConnJob;
89 std::function<void( const URL&, const XRootDStatus& )> pOnConnErrCB;
90
92 };
93
94 //----------------------------------------------------------------------------
95 // Constructor
96 //----------------------------------------------------------------------------
98 {
99 }
100
101 //----------------------------------------------------------------------------
102 // Destructor
103 //----------------------------------------------------------------------------
107
108 //----------------------------------------------------------------------------
109 // Initializer
110 //----------------------------------------------------------------------------
112 {
113 Env *env = DefaultEnv::GetEnv();
114 std::string pollerPref = DefaultPollerPreference;
115 env->GetString( "PollerPreference", pollerPref );
116
117 pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
118
119 if( !pImpl->pPoller )
120 return false;
121
122 bool st = pImpl->pPoller->Initialize();
123
124 if( !st )
125 {
126 delete pImpl->pPoller;
127 return false;
128 }
129
130 pImpl->pJobManager->Initialize();
131 pImpl->pInitialized = true;
132 return true;
133 }
134
135 //----------------------------------------------------------------------------
136 // Finalizer
137 //----------------------------------------------------------------------------
139 {
140 //--------------------------------------------------------------------------
141 // Clean up the channels
142 //--------------------------------------------------------------------------
143 if( !pImpl->pInitialized )
144 return true;
145
146 pImpl->pInitialized = false;
147 pImpl->pJobManager->Finalize();
148 PostMasterImpl::ChannelMap::iterator it;
149
150 for( it = pImpl->pChannelMap.begin(); it != pImpl->pChannelMap.end(); ++it )
151 delete it->second;
152
153 pImpl->pChannelMap.clear();
154 return pImpl->pPoller->Finalize();
155 }
156
157 //----------------------------------------------------------------------------
158 // Start the post master
159 //----------------------------------------------------------------------------
161 {
162 if( !pImpl->pInitialized )
163 return false;
164
165 if( !pImpl->pPoller->Start() )
166 return false;
167
168 if( !pImpl->pTaskManager->Start() )
169 {
170 pImpl->pPoller->Stop();
171 return false;
172 }
173
174 if( !pImpl->pJobManager->Start() )
175 {
176 pImpl->pPoller->Stop();
177 pImpl->pTaskManager->Stop();
178 return false;
179 }
180
181 pImpl->pRunning = true;
182 return true;
183 }
184
185 //----------------------------------------------------------------------------
186 // Stop the postmaster
187 //----------------------------------------------------------------------------
189 {
190 if( !pImpl->pInitialized || !pImpl->pRunning )
191 return true;
192
193 if( !pImpl->pJobManager->Stop() )
194 return false;
195 if( !pImpl->pPoller->Stop() )
196 return false;
197 if( !pImpl->pTaskManager->Stop() )
198 return false;
199 pImpl->pRunning = false;
200 return true;
201 }
202
203 //----------------------------------------------------------------------------
204 // Reinitialize after fork
205 //----------------------------------------------------------------------------
207 {
208 return true;
209 }
210
211 //----------------------------------------------------------------------------
212 // Send the message asynchronously
213 //----------------------------------------------------------------------------
215 Message *msg,
216 MsgHandler *handler,
217 bool stateful,
218 time_t expires )
219 {
220 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
221 Channel *channel = GetChannel( url );
222
223 if( !channel )
225
226 return channel->Send( msg, handler, stateful, expires );
227 }
228
230 Message *msg,
231 MsgHandler *inHandler )
232 {
234 VirtualRedirector *redirector = registry.Get( url );
235 if( !redirector )
236 return Status( stError, errInvalidOp );
237 return redirector->HandleRequest( msg, inHandler );
238 }
239
240 //----------------------------------------------------------------------------
241 // Query the transport handler
242 //----------------------------------------------------------------------------
244 uint16_t query,
245 AnyObject &result )
246 {
247 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
248 Channel *channel = 0;
249 {
250 XrdSysMutexHelper scopedLock2( pImpl->pChannelMapMutex );
251 PostMasterImpl::ChannelMap::iterator it =
252 pImpl->pChannelMap.find( url.GetChannelId() );
253 if( it == pImpl->pChannelMap.end() )
254 return Status( stError, errInvalidOp );
255 channel = it->second;
256 }
257
258 if( !channel )
259 return Status( stError, errNotSupported );
260
261 return channel->QueryTransport( query, result );
262 }
263
264 //----------------------------------------------------------------------------
265 // Register channel event handler
266 //----------------------------------------------------------------------------
268 ChannelEventHandler *handler )
269 {
270 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
271 Channel *channel = GetChannel( url );
272
273 if( !channel )
274 return Status( stError, errNotSupported );
275
276 channel->RegisterEventHandler( handler );
277 return Status();
278 }
279
280 //----------------------------------------------------------------------------
281 // Remove a channel event handler
282 //----------------------------------------------------------------------------
284 ChannelEventHandler *handler )
285 {
286 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
287 Channel *channel = GetChannel( url );
288
289 if( !channel )
290 return Status( stError, errNotSupported );
291
292 channel->RemoveEventHandler( handler );
293 return Status();
294 }
295
296 //------------------------------------------------------------------------
297 // Get the task manager object user by the post master
298 //------------------------------------------------------------------------
300 {
301 return pImpl->pTaskManager;
302 }
303
304 //------------------------------------------------------------------------
305 // Get the job manager object user by the post master
306 //------------------------------------------------------------------------
308 {
309 return pImpl->pJobManager;
310 }
311
312 //------------------------------------------------------------------------
313 // Shut down a channel
314 //------------------------------------------------------------------------
316 {
317 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
318 PostMasterImpl::ChannelMap::iterator it =
319 pImpl->pChannelMap.find( url.GetChannelId() );
320
321 if( it == pImpl->pChannelMap.end() )
322 return Status( stError, errInvalidOp );
323
324 it->second->ForceDisconnect();
325 delete it->second;
326 pImpl->pChannelMap.erase( it );
327
328 return Status();
329 }
330
332 {
333 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
334 PostMasterImpl::ChannelMap::iterator it =
335 pImpl->pChannelMap.find( url.GetChannelId() );
336
337 if( it == pImpl->pChannelMap.end() )
338 return Status( stError, errInvalidOp );
339
340 it->second->ForceReconnect();
341 return Status();
342 }
343
344 //------------------------------------------------------------------------
345 // Get the number of connected data streams
346 //------------------------------------------------------------------------
347 uint16_t PostMaster::NbConnectedStrm( const URL &url )
348 {
349 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
350 Channel *channel = GetChannel( url );
351 if( !channel ) return 0;
352 return channel->NbConnectedStrm();
353 }
354
355 //------------------------------------------------------------------------
357 //------------------------------------------------------------------------
359 std::shared_ptr<Job> onConnJob )
360 {
361 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
362 Channel *channel = GetChannel( url );
363 if( !channel ) return;
364 channel->SetOnDataConnectHandler( onConnJob );
365 }
366
367 //------------------------------------------------------------------------
369 //------------------------------------------------------------------------
370 void PostMaster::SetOnConnectHandler( std::unique_ptr<Job> onConnJob )
371 {
372 XrdSysMutexHelper lck( pImpl->pMtx );
373 pImpl->pOnConnJob = std::move( onConnJob );
374 }
375
376 //------------------------------------------------------------------------
377 // Set the global connection error handler
378 //------------------------------------------------------------------------
379 void PostMaster::SetConnectionErrorHandler( std::function<void( const URL&, const XRootDStatus& )> handler )
380 {
381 XrdSysMutexHelper lck( pImpl->pMtx );
382 pImpl->pOnConnErrCB = std::move( handler );
383 }
384
385 //------------------------------------------------------------------------
386 // Notify the global on-connect handler
387 //------------------------------------------------------------------------
389 {
390 XrdSysMutexHelper lck( pImpl->pMtx );
391 if( pImpl->pOnConnJob )
392 {
393 URL *ptr = new URL( url );
394 pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
395 }
396 }
397
398 //------------------------------------------------------------------------
399 // Notify the global error connection handler
400 //------------------------------------------------------------------------
401 void PostMaster::NotifyConnErrHandler( const URL &url, const XRootDStatus &status )
402 {
403 XrdSysMutexHelper lck( pImpl->pMtx );
404 if( pImpl->pOnConnErrCB )
405 {
406 ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
407 pImpl->pJobManager->QueueJob( job, nullptr );
408 }
409 }
410
411 //----------------------------------------------------------------------------
413 //----------------------------------------------------------------------------
414 void PostMaster::CollapseRedirect( const URL &alias, const URL &url )
415 {
416 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
417
418 //--------------------------------------------------------------------------
419 // Get the passive channel
420 //--------------------------------------------------------------------------
421 PostMasterImpl::ChannelMap::iterator it =
422 pImpl->pChannelMap.find( alias.GetChannelId() );
423 Channel *passive = 0;
424 if( it != pImpl->pChannelMap.end() )
425 passive = it->second;
426 //--------------------------------------------------------------------------
427 // If the channel does not exist there's nothing to do
428 //--------------------------------------------------------------------------
429 else return;
430
431 //--------------------------------------------------------------------------
432 // Check if this URL is eligible for collapsing
433 //--------------------------------------------------------------------------
434 if( !passive->CanCollapse( url ) ) return;
435
436 //--------------------------------------------------------------------------
437 // Create the active channel
438 //--------------------------------------------------------------------------
440 TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
441
442 if( !trHandler )
443 {
444 Log *log = DefaultEnv::GetLog();
445 log->Error( PostMasterMsg, "Unable to get transport handler for %s "
446 "protocol", url.GetProtocol().c_str() );
447 return;
448 }
449
450 Log *log = DefaultEnv::GetLog();
451 log->Info( PostMasterMsg, "Label channel %s with alias %s.",
452 url.GetHostId().c_str(), alias.GetHostId().c_str() );
453
454 Channel *active = new Channel( alias, pImpl->pPoller, trHandler,
455 pImpl->pTaskManager, pImpl->pJobManager, url );
456 pImpl->pChannelMap[alias.GetChannelId()] = active;
457
458 //--------------------------------------------------------------------------
459 // The passive channel will be deallocated by TTL
460 //--------------------------------------------------------------------------
461 }
462
463 //------------------------------------------------------------------------
464 // Decrement file object instance count bound to this channel
465 //------------------------------------------------------------------------
467 {
468 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
469 Channel *channel = GetChannel( url );
470
471 if( !channel ) return;
472
473 return channel->DecFileInstCnt();
474 }
475
476 //------------------------------------------------------------------------
477 //true if underlying threads are running, false otherwise
478 //------------------------------------------------------------------------
480 {
481 return pImpl->pRunning;
482 }
483
484 //----------------------------------------------------------------------------
485 // Get the channel
486 //----------------------------------------------------------------------------
487 Channel *PostMaster::GetChannel( const URL &url )
488 {
489 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
490 Channel *channel = 0;
491 PostMasterImpl::ChannelMap::iterator it = pImpl->pChannelMap.find( url.GetChannelId() );
492
493 if( it == pImpl->pChannelMap.end() )
494 {
496 TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
497
498 if( !trHandler )
499 {
500 Log *log = DefaultEnv::GetLog();
501 log->Error( PostMasterMsg, "Unable to get transport handler for %s "
502 "protocol", url.GetProtocol().c_str() );
503 return 0;
504 }
505
506 channel = new Channel( url, pImpl->pPoller, trHandler, pImpl->pTaskManager,
507 pImpl->pJobManager );
508 pImpl->pChannelMap[url.GetChannelId()] = channel;
509 }
510 else
511 channel = it->second;
512 return channel;
513 }
514}
A communication channel between the client and the server.
uint16_t NbConnectedStrm()
Get the number of connected data streams.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A synchronized queue.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
The message representation used throughout the system.
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global connection error handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
PostMaster()
Constructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object user by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition XrdClURL.hh:31
std::string GetChannelId() const
Definition XrdClURL.cc:494
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:113
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
const char *const DefaultPollerPreference
const uint16_t errNotSupported
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
std::map< std::string, Channel * > ChannelMap
std::unique_ptr< Job > pOnConnJob
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
Procedure execution status.