XRootD
Loading...
Searching...
No Matches
XrdClPostMaster.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClPoller.hh"
29#include "XrdCl/XrdClChannel.hh"
31#include "XrdCl/XrdClLog.hh"
33
35
36namespace XrdCl
37{
38 struct ConnErrJob : public Job
39 {
41 std::function<void( const URL&, const XRootDStatus& )> handler) : url( url ),
42 status( status ),
44 {
45 }
46
47 void Run( void *arg )
48 {
49 handler( url, status );
50 delete this;
51 }
52
55 std::function<void( const URL&, const XRootDStatus& )> handler;
56 };
57
59 {
60 PostMasterImpl() : pPoller( 0 ), pInitialized( false ), pRunning( false )
61 {
62 Env *env = DefaultEnv::GetEnv();
63 int workerThreads = DefaultWorkerThreads;
64 env->GetInt( "WorkerThreads", workerThreads );
65
67 pJobManager = new JobManager(workerThreads);
68 }
69
71 {
72 delete pPoller;
73 delete pTaskManager;
74 delete pJobManager;
75 }
76
77 typedef std::map<std::string, Channel*> ChannelMap;
78 typedef std::map<const URL*, Channel*> CollapsedMap;
79
84
85 // lock MapMutex if accessing maps while holding pDisconnectLock Read
86 // if MapMutex is required, acquire after pDisconnetLock.
88
92
94 std::unique_ptr<Job> pOnConnJob;
95 std::function<void( const URL&, const XRootDStatus& )> pOnConnErrCB;
96
97 // take DisconnectLock: Read while using Channel* from map.
98 // Write if destroying Channel
99 // if MapMutex is required, acquire DisconnectLock first.
101 };
102
103 //----------------------------------------------------------------------------
104 // Constructor
105 //----------------------------------------------------------------------------
107 {
108 }
109
110 //----------------------------------------------------------------------------
111 // Destructor
112 //----------------------------------------------------------------------------
116
117 //----------------------------------------------------------------------------
118 // Initializer
119 //----------------------------------------------------------------------------
121 {
122 Env *env = DefaultEnv::GetEnv();
123 std::string pollerPref = DefaultPollerPreference;
124 env->GetString( "PollerPreference", pollerPref );
125
126 pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
127
128 if( !pImpl->pPoller )
129 return false;
130
131 bool st = pImpl->pPoller->Initialize();
132
133 if( !st )
134 {
135 delete pImpl->pPoller;
136 return false;
137 }
138
139 pImpl->pJobManager->Initialize();
140 pImpl->pInitialized = true;
141 return true;
142 }
143
144 //----------------------------------------------------------------------------
145 // Finalizer
146 //----------------------------------------------------------------------------
148 {
149 //--------------------------------------------------------------------------
150 // Clean up the channels
151 //--------------------------------------------------------------------------
152 if( !pImpl->pInitialized )
153 return true;
154
155 pImpl->pInitialized = false;
156 pImpl->pJobManager->Finalize();
157 PostMasterImpl::ChannelMap::iterator it;
158
159 for( it = pImpl->pChannelMap.begin(); it != pImpl->pChannelMap.end(); ++it )
160 delete it->second;
161
162 pImpl->pChannelMap.clear();
163 return pImpl->pPoller->Finalize();
164 }
165
166 //----------------------------------------------------------------------------
167 // Start the post master
168 //----------------------------------------------------------------------------
170 {
171 if( !pImpl->pInitialized )
172 return false;
173
174 if( !pImpl->pPoller->Start() )
175 return false;
176
177 if( !pImpl->pTaskManager->Start() )
178 {
179 pImpl->pPoller->Stop();
180 return false;
181 }
182
183 if( !pImpl->pJobManager->Start() )
184 {
185 pImpl->pPoller->Stop();
186 pImpl->pTaskManager->Stop();
187 return false;
188 }
189
190 pImpl->pRunning = true;
191 return true;
192 }
193
194 //----------------------------------------------------------------------------
195 // Stop the postmaster
196 //----------------------------------------------------------------------------
198 {
199 if( !pImpl->pInitialized || !pImpl->pRunning )
200 return true;
201
202 if( !pImpl->pJobManager->Stop() )
203 return false;
204 if( !pImpl->pPoller->Stop() )
205 return false;
206 if( !pImpl->pTaskManager->Stop() )
207 return false;
208 pImpl->pRunning = false;
209 return true;
210 }
211
212 //----------------------------------------------------------------------------
213 // Reinitialize after fork
214 //----------------------------------------------------------------------------
216 {
217 return true;
218 }
219
220 //----------------------------------------------------------------------------
221 // Send the message asynchronously
222 //----------------------------------------------------------------------------
224 Message *msg,
225 MsgHandler *handler,
226 bool stateful,
227 time_t expires )
228 {
229 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
230 Channel *channel = GetChannel( url );
231
232 if( !channel )
234
235 return channel->Send( msg, handler, stateful, expires );
236 }
237
239 Message *msg,
240 MsgHandler *inHandler )
241 {
243 VirtualRedirector *redirector = registry.Get( url );
244 if( !redirector )
245 return Status( stError, errInvalidOp );
246 return redirector->HandleRequest( msg, inHandler );
247 }
248
249 //----------------------------------------------------------------------------
250 // Query the transport handler
251 //----------------------------------------------------------------------------
253 uint16_t query,
254 AnyObject &result )
255 {
256 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
257 Channel *channel = 0;
258 {
259 XrdSysMutexHelper scopedLock2( pImpl->pChannelMapMutex );
260 PostMasterImpl::ChannelMap::iterator it =
261 pImpl->pChannelMap.find( url.GetChannelId() );
262 if( it == pImpl->pChannelMap.end() )
263 return Status( stError, errInvalidOp );
264 channel = it->second;
265 }
266
267 if( !channel )
268 return Status( stError, errNotSupported );
269
270 return channel->QueryTransport( query, result );
271 }
272
273 //----------------------------------------------------------------------------
274 // Register channel event handler
275 //----------------------------------------------------------------------------
277 ChannelEventHandler *handler )
278 {
279 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
280 Channel *channel = GetChannel( url );
281
282 if( !channel )
283 return Status( stError, errNotSupported );
284
285 channel->RegisterEventHandler( handler );
286 return Status();
287 }
288
289 //----------------------------------------------------------------------------
290 // Remove a channel event handler
291 //----------------------------------------------------------------------------
293 ChannelEventHandler *handler )
294 {
295 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
296 Channel *channel = GetChannel( url );
297
298 if( !channel )
299 return Status( stError, errNotSupported );
300
301 channel->RemoveEventHandler( handler );
302 return Status();
303 }
304
305 //------------------------------------------------------------------------
306 // Get the task manager object user by the post master
307 //------------------------------------------------------------------------
309 {
310 return pImpl->pTaskManager;
311 }
312
313 //------------------------------------------------------------------------
314 // Get the job manager object user by the post master
315 //------------------------------------------------------------------------
317 {
318 return pImpl->pJobManager;
319 }
320
321 //------------------------------------------------------------------------
322 // Shut down a channel
323 //------------------------------------------------------------------------
325 {
326 return ForceDisconnect(url, false);
327 }
328
329 //------------------------------------------------------------------------
330 // Shut down a channel
331 //------------------------------------------------------------------------
332 Status PostMaster::ForceDisconnect( const URL &url, bool hush )
333 {
334 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
335 {
336 //--------------------------------------------------------------------
337 // See if this is called by channel replaced by collapse, reaching TTL
338 //--------------------------------------------------------------------
339 PostMasterImpl::CollapsedMap::iterator it =
340 pImpl->pCollapsedMap.find( &url );
341 if( it != pImpl->pCollapsedMap.end() )
342 {
343 Channel *passive = it->second;
344 passive->ForceDisconnect( hush );
345 delete passive;
346 pImpl->pCollapsedMap.erase( it );
347 return Status();
348 }
349 }
350
351 PostMasterImpl::ChannelMap::iterator it =
352 pImpl->pChannelMap.find( url.GetChannelId() );
353
354 if( it == pImpl->pChannelMap.end() )
355 return Status( stError, errInvalidOp );
356
357 it->second->ForceDisconnect( hush );
358 delete it->second;
359 pImpl->pChannelMap.erase( it );
360
361 return Status();
362 }
363
365 {
366 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
367 PostMasterImpl::ChannelMap::iterator it =
368 pImpl->pChannelMap.find( url.GetChannelId() );
369
370 if( it == pImpl->pChannelMap.end() )
371 return Status( stError, errInvalidOp );
372
373 it->second->ForceReconnect();
374 return Status();
375 }
376
377 //------------------------------------------------------------------------
378 // Get the number of connected data streams
379 //------------------------------------------------------------------------
380 uint16_t PostMaster::NbConnectedStrm( const URL &url )
381 {
382 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
383 Channel *channel = GetChannel( url );
384 if( !channel ) return 0;
385 return channel->NbConnectedStrm();
386 }
387
388 //------------------------------------------------------------------------
390 //------------------------------------------------------------------------
392 std::shared_ptr<Job> onConnJob )
393 {
394 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
395 Channel *channel = GetChannel( url );
396 if( !channel ) return;
397 channel->SetOnDataConnectHandler( onConnJob );
398 }
399
400 //------------------------------------------------------------------------
402 //------------------------------------------------------------------------
403 void PostMaster::SetOnConnectHandler( std::unique_ptr<Job> onConnJob )
404 {
405 XrdSysMutexHelper lck( pImpl->pMtx );
406 pImpl->pOnConnJob = std::move( onConnJob );
407 }
408
409 //------------------------------------------------------------------------
410 // Set the global connection error handler
411 //------------------------------------------------------------------------
412 void PostMaster::SetConnectionErrorHandler( std::function<void( const URL&, const XRootDStatus& )> handler )
413 {
414 XrdSysMutexHelper lck( pImpl->pMtx );
415 pImpl->pOnConnErrCB = std::move( handler );
416 }
417
418 //------------------------------------------------------------------------
419 // Notify the global on-connect handler
420 //------------------------------------------------------------------------
422 {
423 XrdSysMutexHelper lck( pImpl->pMtx );
424 if( pImpl->pOnConnJob )
425 {
426 URL *ptr = new URL( url );
427 pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
428 }
429 }
430
431 //------------------------------------------------------------------------
432 // Notify the global error connection handler
433 //------------------------------------------------------------------------
434 void PostMaster::NotifyConnErrHandler( const URL &url, const XRootDStatus &status )
435 {
436 XrdSysMutexHelper lck( pImpl->pMtx );
437 if( pImpl->pOnConnErrCB )
438 {
439 ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
440 pImpl->pJobManager->QueueJob( job, nullptr );
441 }
442 }
443
444 //----------------------------------------------------------------------------
446 //----------------------------------------------------------------------------
447 void PostMaster::CollapseRedirect( const URL &alias, const URL &url )
448 {
449 XrdSysRWLockHelper scopedDiscLock( pImpl->pDisconnectLock );
450 XrdSysMutexHelper scopedMapLock( pImpl->pChannelMapMutex );
451
452 //--------------------------------------------------------------------------
453 // Get the passive channel
454 //--------------------------------------------------------------------------
455 PostMasterImpl::ChannelMap::iterator it =
456 pImpl->pChannelMap.find( alias.GetChannelId() );
457 Channel *passive = 0;
458 if( it != pImpl->pChannelMap.end() )
459 passive = it->second;
460 //--------------------------------------------------------------------------
461 // If the channel does not exist there's nothing to do
462 //--------------------------------------------------------------------------
463 else return;
464
465 //--------------------------------------------------------------------------
466 // Check if this URL is eligible for collapsing
467 //--------------------------------------------------------------------------
468 if( !passive->CanCollapse( url ) ) return;
469
470 //--------------------------------------------------------------------------
471 // Create the active channel
472 //--------------------------------------------------------------------------
474 TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
475
476 if( !trHandler )
477 {
478 Log *log = DefaultEnv::GetLog();
479 log->Error( PostMasterMsg, "Unable to get transport handler for %s "
480 "protocol", url.GetProtocol().c_str() );
481 return;
482 }
483
484 Log *log = DefaultEnv::GetLog();
485 log->Info( PostMasterMsg, "Label channel %s with alias %s.",
486 url.GetHostId().c_str(), alias.GetHostId().c_str() );
487
488 Channel *active = new Channel( alias, pImpl->pPoller, trHandler,
489 pImpl->pTaskManager, pImpl->pJobManager, url );
490 pImpl->pChannelMap[alias.GetChannelId()] = active;
491 pImpl->pCollapsedMap[&passive->GetURL()] = passive;
492
493 //--------------------------------------------------------------------------
494 // The passive channel will be deallocated by TTL
495 //--------------------------------------------------------------------------
496 }
497
498 //------------------------------------------------------------------------
499 // Decrement file object instance count bound to this channel
500 //------------------------------------------------------------------------
502 {
503 XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
504 Channel *channel = GetChannel( url );
505
506 if( !channel ) return;
507
508 return channel->DecFileInstCnt();
509 }
510
511 //------------------------------------------------------------------------
512 //true if underlying threads are running, false otherwise
513 //------------------------------------------------------------------------
515 {
516 return pImpl->pRunning;
517 }
518
519 //----------------------------------------------------------------------------
520 // Get the channel
521 //----------------------------------------------------------------------------
522 Channel *PostMaster::GetChannel( const URL &url )
523 {
524 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
525 Channel *channel = 0;
526 PostMasterImpl::ChannelMap::iterator it = pImpl->pChannelMap.find( url.GetChannelId() );
527
528 if( it == pImpl->pChannelMap.end() )
529 {
531 TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
532
533 if( !trHandler )
534 {
535 Log *log = DefaultEnv::GetLog();
536 log->Error( PostMasterMsg, "Unable to get transport handler for %s "
537 "protocol", url.GetProtocol().c_str() );
538 return 0;
539 }
540
541 channel = new Channel( url, pImpl->pPoller, trHandler, pImpl->pTaskManager,
542 pImpl->pJobManager );
543 pImpl->pChannelMap[url.GetChannelId()] = channel;
544 }
545 else
546 channel = it->second;
547 return channel;
548 }
549}
A communication channel between the client and the server.
const URL & GetURL() const
Get the URL.
uint16_t NbConnectedStrm()
Get the number of connected data streams.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
Status ForceDisconnect()
Force disconnect of all streams.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A synchronized queue.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
The message representation used throughout the system.
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global connection error handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
PostMaster()
Constructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object user by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition XrdClURL.hh:31
std::string GetChannelId() const
Definition XrdClURL.cc:512
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
const char *const DefaultPollerPreference
const uint16_t errNotSupported
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
std::map< std::string, Channel * > ChannelMap
std::unique_ptr< Job > pOnConnJob
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
std::map< const URL *, Channel * > CollapsedMap
Procedure execution status.