64 env->
GetInt(
"WorkerThreads", workerThreads );
124 env->
GetString(
"PollerPreference", pollerPref );
128 if( !pImpl->pPoller )
131 bool st = pImpl->pPoller->Initialize();
135 delete pImpl->pPoller;
139 pImpl->pJobManager->Initialize();
140 pImpl->pInitialized =
true;
152 if( !pImpl->pInitialized )
155 pImpl->pInitialized =
false;
156 pImpl->pJobManager->Finalize();
157 PostMasterImpl::ChannelMap::iterator it;
159 for( it = pImpl->pChannelMap.begin(); it != pImpl->pChannelMap.end(); ++it )
162 pImpl->pChannelMap.clear();
163 return pImpl->pPoller->Finalize();
171 if( !pImpl->pInitialized )
174 if( !pImpl->pPoller->Start() )
177 if( !pImpl->pTaskManager->Start() )
179 pImpl->pPoller->Stop();
183 if( !pImpl->pJobManager->Start() )
185 pImpl->pPoller->Stop();
186 pImpl->pTaskManager->Stop();
190 pImpl->pRunning =
true;
199 if( !pImpl->pInitialized || !pImpl->pRunning )
202 if( !pImpl->pJobManager->Stop() )
204 if( !pImpl->pPoller->Stop() )
206 if( !pImpl->pTaskManager->Stop() )
208 pImpl->pRunning =
false;
230 Channel *channel = GetChannel( url );
235 return channel->
Send( msg, handler, stateful, expires );
260 PostMasterImpl::ChannelMap::iterator it =
262 if( it == pImpl->pChannelMap.end() )
264 channel = it->second;
280 Channel *channel = GetChannel( url );
296 Channel *channel = GetChannel( url );
310 return pImpl->pTaskManager;
318 return pImpl->pJobManager;
339 PostMasterImpl::CollapsedMap::iterator it =
340 pImpl->pCollapsedMap.find( &url );
341 if( it != pImpl->pCollapsedMap.end() )
346 pImpl->pCollapsedMap.erase( it );
351 PostMasterImpl::ChannelMap::iterator it =
354 if( it == pImpl->pChannelMap.end() )
357 it->second->ForceDisconnect( hush );
359 pImpl->pChannelMap.erase( it );
367 PostMasterImpl::ChannelMap::iterator it =
370 if( it == pImpl->pChannelMap.end() )
373 it->second->ForceReconnect();
383 Channel *channel = GetChannel( url );
384 if( !channel )
return 0;
392 std::shared_ptr<Job> onConnJob )
395 Channel *channel = GetChannel( url );
396 if( !channel )
return;
406 pImpl->pOnConnJob = std::move( onConnJob );
415 pImpl->pOnConnErrCB = std::move( handler );
424 if( pImpl->pOnConnJob )
426 URL *ptr =
new URL( url );
427 pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
437 if( pImpl->pOnConnErrCB )
440 pImpl->pJobManager->QueueJob( job,
nullptr );
455 PostMasterImpl::ChannelMap::iterator it =
458 if( it != pImpl->pChannelMap.end() )
459 passive = it->second;
489 pImpl->pTaskManager, pImpl->pJobManager, url );
491 pImpl->pCollapsedMap[&passive->
GetURL()] = passive;
504 Channel *channel = GetChannel( url );
506 if( !channel )
return;
516 return pImpl->pRunning;
522 Channel *PostMaster::GetChannel(
const URL &url )
526 PostMasterImpl::ChannelMap::iterator it = pImpl->pChannelMap.find( url.
GetChannelId() );
528 if( it == pImpl->pChannelMap.end() )
541 channel =
new Channel( url, pImpl->pPoller, trHandler, pImpl->pTaskManager,
542 pImpl->pJobManager );
546 channel = it->second;
A communication channel between the client and the server.
const URL & GetURL() const
Get the URL.
uint16_t NbConnectedStrm()
Get the number of connected data streams.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
Status ForceDisconnect()
Force disconnect of all streams.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Info(uint64_t topic, const char *format,...)
Print an info.
The message representation used throughout the system.
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global connection error handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object user by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
std::string GetChannelId() const
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
const std::string & GetProtocol() const
Get the protocol.
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
const char *const DefaultPollerPreference
const uint16_t errNotSupported
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
TaskManager * pTaskManager
std::map< std::string, Channel * > ChannelMap
std::unique_ptr< Job > pOnConnJob
XrdSysRWLock pDisconnectLock
XrdSysMutex pChannelMapMutex
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
std::map< const URL *, Channel * > CollapsedMap
CollapsedMap pCollapsedMap
Procedure execution status.