XRootD
Loading...
Searching...
No Matches
XrdClInQueue.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
20#include "XrdCl/XrdClInQueue.hh"
22#include "XrdCl/XrdClMessage.hh"
23#include "XrdCl/XrdClLog.hh"
26
27#include <arpa/inet.h> // for network unmarshalling stuff
28
29namespace XrdCl
30{
31 //----------------------------------------------------------------------------
32 // Filter messages
33 //----------------------------------------------------------------------------
34 bool InQueue::DiscardMessage( Message& msg, uint16_t& sid) const
35 {
36 if( msg.GetSize() < 8 )
37 return true;
38
39 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
40
41 // We only care about async responses, but those are extracted now
42 // in the SocketHandler
43 if( rsp->hdr.status == kXR_attn )
44 return true;
45 else
46 sid = ((uint16_t)rsp->hdr.streamid[1] << 8) | (uint16_t)rsp->hdr.streamid[0];
47
48 return false;
49 }
50
51 //----------------------------------------------------------------------------
52 // Add a listener that should be notified about incoming messages
53 //----------------------------------------------------------------------------
54 void InQueue::AddMessageHandler( MsgHandler *handler, bool &rmMsg )
55 {
56 uint16_t handlerSid = handler->GetSid();
57 XrdSysMutexHelper scopedLock( pMutex );
58
59 pHandlers[handlerSid] = HandlerAndExpire( handler, 0 );
60 }
61
62 //----------------------------------------------------------------------------
63 // Get a message handler interested in receiving message whose header
64 // is stored in msg
65 //----------------------------------------------------------------------------
66 MsgHandler *InQueue::GetHandlerForMessage( std::shared_ptr<Message> &msg,
67 time_t &expires,
68 uint16_t &action )
69 {
70 time_t exp = 0;
71 uint16_t act = 0;
72 uint16_t msgSid = 0;
73 MsgHandler* handler = 0;
74
75 if (DiscardMessage(*msg, msgSid))
76 {
77 return handler;
78 }
79
80 XrdSysMutexHelper scopedLock( pMutex );
81 HandlerMap::iterator it = pHandlers.find(msgSid);
82
83 if (it != pHandlers.end())
84 {
85 Log *log = DefaultEnv::GetLog();
86 handler = it->second.first;
87 act = handler->Examine( msg );
88 if( it->second.second == 0 ) {
89 it->second.second = handler->GetExpiration();
90 log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
91 (void*)handler, (long long)it->second.second );
92 }
93 exp = it->second.second;
94 log->Debug( ExDbgMsg, "[msg: %p] Assigned MsgHandler: %p.",
95 (void*)msg.get(), (void*)handler );
96
97
99 {
100 pHandlers.erase( it );
101 log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
102 (void*)handler, (void*)handler );
103 }
104 }
105
106 if( handler )
107 {
108 expires = exp;
109 action = act;
110 }
111
112 return handler;
113 }
114
115 //----------------------------------------------------------------------------
116 // Re-insert the handler without scanning the cached messages
117 //----------------------------------------------------------------------------
119 time_t expires )
120 {
121 uint16_t handlerSid = handler->GetSid();
122 XrdSysMutexHelper scopedLock( pMutex );
123 pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
124 }
125
126 //----------------------------------------------------------------------------
127 // Remove a listener
128 //----------------------------------------------------------------------------
130 {
131 uint16_t handlerSid = handler->GetSid();
132 XrdSysMutexHelper scopedLock( pMutex );
133 pHandlers.erase(handlerSid);
134 Log *log = DefaultEnv::GetLog();
135 log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
136 (void*)handler, (void*)handler );
137
138 }
139
140 //----------------------------------------------------------------------------
141 // Report an event to the handlers
142 //----------------------------------------------------------------------------
144 XRootDStatus status )
145 {
146 uint8_t action = 0;
147 XrdSysMutexHelper scopedLock( pMutex );
148 for( HandlerMap::iterator it = pHandlers.begin(); it != pHandlers.end(); )
149 {
150 action = it->second.first->OnStreamEvent( event, status );
151
152 if( action & MsgHandler::RemoveHandler )
153 {
154 auto next = it; ++next;
155 pHandlers.erase( it );
156 it = next;
157 }
158 else
159 ++it;
160 }
161 }
162
163 //----------------------------------------------------------------------------
164 // Timeout handlers
165 //----------------------------------------------------------------------------
166 void InQueue::ReportTimeout( time_t now )
167 {
168 if( !now )
169 now = ::time(0);
170
171 XrdSysMutexHelper scopedLock( pMutex );
172 HandlerMap::iterator it = pHandlers.begin();
173 while( it != pHandlers.end() )
174 {
175 if( it->second.second && it->second.second <= now )
176 {
177 uint8_t act = it->second.first->OnStreamEvent( MsgHandler::Timeout,
179 auto next = it; ++next;
180 if( act & MsgHandler::RemoveHandler )
181 pHandlers.erase( it );
182 it = next;
183 }
184 else
185 ++it;
186 }
187 }
188
189 //----------------------------------------------------------------------------
190 // Query the handler and extract the expiration time
191 //----------------------------------------------------------------------------
193 {
194 uint16_t handlerSid = handler->GetSid();
195 XrdSysMutexHelper scopedLock( pMutex );
196 HandlerMap::iterator it = pHandlers.find( handlerSid );
197 if( it != pHandlers.end() )
198 {
199 if( it->second.second == 0 )
200 {
201 it->second.second = handler->GetExpiration();
202
203 Log *log = DefaultEnv::GetLog();
204 log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
205 (void*)handler, (long long)it->second.second );
206
207 }
208 }
209 }
210
211}
kXR_char streamid[2]
Definition XProtocol.hh:914
@ kXR_attn
Definition XProtocol.hh:901
ServerResponseHeader hdr
static Log * GetLog()
Get default log.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
void AssignTimeout(MsgHandler *handler)
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
virtual uint16_t Examine(std::shared_ptr< Message > &msg)=0
virtual uint16_t GetSid() const =0
virtual time_t GetExpiration()=0
StreamEvent
Events that may have occurred to the stream.
@ Timeout
The declared timeout has occurred.
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t ExDbgMsg
Procedure execution status.