XRootD
XrdCl::AsyncMsgReader Class Reference

Utility class that encapsulates the logic for reading a response message.

#include <XrdClAsyncMsgReader.hh>


Public Member Functions

 AsyncMsgReader (TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
 
virtual ~AsyncMsgReader ()
 Destructor.
 
XRootDStatus Read ()
 Read out the response from the socket.
 
void Reset ()
 Reset the state of the object, making it ready to read out the next message.
 

Detailed Description

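Utility class that encapsulates the logic for reading a response message.

Read() is implemented as a staged loop (ReadStart, ReadHeader, ReadAttn, ReadMsgBody, ReadRawData, ReadMore, ReadDone). When the transport or the raw handler reports suRetry, Read() returns with the current stage preserved, so a later call resumes where the previous one stopped. Once a message is complete it is handed to the stream through Stream::OnIncoming().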

Definition at line 36 of file XrdClAsyncMsgReader.hh.
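
The resumable, staged reading described above can be illustrated with a small self-contained sketch. It does not use any XrdCl types: FakeSocket, StagedReader, Outcome, Drive, and the 4-digit length header are all hypothetical names and formats invented for this example, and the real class has more stages than the two shown here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-ins; none of these names are part of XrdCl.
enum class Outcome { Done, Retry };

const std::size_t kHeaderSize = 4; // invented format: 4 ASCII digits = body length

// Fake non-blocking socket: delivers at most `budget` bytes, then "would block".
struct FakeSocket
{
  std::string wire;       // every byte the peer will send
  std::size_t pos    = 0; // bytes consumed so far
  std::size_t budget = 0; // bytes readable in the current readiness event

  void Read( std::string &out, std::size_t want )
  {
    std::size_t n = std::min( want, std::min( budget, wire.size() - pos ) );
    out.append( wire, pos, n );
    pos    += n;
    budget -= n;
  }
};

class StagedReader
{
  public:
    explicit StagedReader( FakeSocket &s ) : sock( s ) {}

    // Advance as far as the available bytes allow; the stage survives a Retry.
    Outcome Read()
    {
      while( true )
      {
        switch( stage )
        {
          case Stage::Start:
            header.clear();
            body.clear();
            stage = Stage::Header;
            continue;

          case Stage::Header:
            sock.Read( header, kHeaderSize - header.size() );
            if( header.size() < kHeaderSize ) return Outcome::Retry;
            bodySize = std::stoul( header );
            stage = Stage::Body;
            continue;

          case Stage::Body:
            sock.Read( body, bodySize - body.size() );
            if( body.size() < bodySize ) return Outcome::Retry;
            stage = Stage::Done;
            continue;

          case Stage::Done:
            return Outcome::Done;
        }
      }
    }

    void Reset() { stage = Stage::Start; }
    const std::string &Body() const { return body; }

  private:
    enum class Stage { Start, Header, Body, Done };

    FakeSocket  &sock;
    Stage        stage = Stage::Start;
    std::string  header;
    std::string  body;
    std::size_t  bodySize = 0;
};

// Simulate readiness events, each granting `perEvent` bytes.
// Returns the number of Read() calls made.
int Drive( StagedReader &reader, FakeSocket &sock, std::size_t perEvent )
{
  int calls = 0;
  Outcome out = Outcome::Retry;
  while( out == Outcome::Retry )
  {
    sock.budget = perEvent;
    out = reader.Read();
    ++calls;
    // Stop if the peer has nothing more to send but the message is incomplete.
    if( out == Outcome::Retry && sock.pos == sock.wire.size() ) break;
  }
  return calls;
}
```

With a wire content of "0005hello" and three bytes per event, the header is split across the first two calls and the body across the second and third, which is the situation the preserved stage is meant to handle.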

Constructor & Destructor Documentation

◆ AsyncMsgReader()

XrdCl::AsyncMsgReader::AsyncMsgReader ( TransportHandler & xrdTransport,
Socket & socket,
const std::string & strmname,
Stream & strm,
uint16_t substrmnb )
inline

Constructor

Parameters
xrdTransport: the (xrootd) transport layer
socket: the socket with the message to be read out
strmname: stream name
strm: the stream encapsulating the connection
substrmnb: the substream number

Definition at line 48 of file XrdClAsyncMsgReader.hh.

  : readstage( ReadStart ),
    xrdTransport( xrdTransport ),
    socket( socket ),
    strmname( strmname ),
    strm( strm ),
    substrmnb( substrmnb ),
    inmsgsize( 0 ),
    inhandler( nullptr )
{
}

◆ ~AsyncMsgReader()

virtual XrdCl::AsyncMsgReader::~AsyncMsgReader ( )
inline virtual

Destructor.

Definition at line 66 of file XrdClAsyncMsgReader.hh.

{ }

Member Function Documentation

◆ Read()

XRootDStatus XrdCl::AsyncMsgReader::Read ( )
inline

Read out the response from the socket.

Definition at line 82 of file XrdClAsyncMsgReader.hh.

{
  Log *log = DefaultEnv::GetLog();

  while( true )
  {
    switch( readstage )
    {
      //------------------------------------------------------------------
      // There is no incoming message currently being processed so we
      // create a new one
      //------------------------------------------------------------------
      case ReadStart:
      {
        inmsg = std::make_shared<Message>();
        //----------------------------------------------------------------
        // The next step is to read the header
        //----------------------------------------------------------------
        readstage = ReadHeader;
        continue;
      }
      //------------------------------------------------------------------
      // We need to read the header
      //------------------------------------------------------------------
      case ReadHeader:
      {
        XRootDStatus st = xrdTransport.GetHeader( *inmsg, &socket );
        if( !st.IsOK() || st.code == suRetry )
          return st;

        log->Dump( AsyncSockMsg, "[%s] Received message header for %p size: %d",
                   strmname.c_str(), (void*)inmsg.get(), inmsg->GetCursor() );

        ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
        if( rsp->hdr.status == kXR_attn )
        {
          log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
                     "of message %p", strmname.c_str(), (void*)inmsg.get() );
          inmsg->ReAllocate( 16 ); // header (8 bytes) + action code (8 bytes)
          readstage = ReadAttn;
          continue;
        }

        inmsgsize = inmsg->GetCursor();
        inhandler = strm.InstallIncHandler( inmsg, substrmnb );

        if( inhandler )
        {
          log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
                     "of message %p", strmname.c_str(), (void*)inmsg.get() );
          //--------------------------------------------------------------
          // The next step is to read raw data
          //--------------------------------------------------------------
          readstage = ReadRawData;
          continue;
        }

        //----------------------------------------------------------------
        // The next step is to read the message body
        //----------------------------------------------------------------
        readstage = ReadMsgBody;
        continue;
      }
      //------------------------------------------------------------------
      // Before proceeding we need to figure out the attn action code
      //------------------------------------------------------------------
      case ReadAttn:
      {
        XRootDStatus st = ReadAttnActnum();
        if( !st.IsOK() || st.code == suRetry )
          return st;

        //----------------------------------------------------------------
        // There is an embedded response, overwrite the message with that
        //----------------------------------------------------------------
        if( HasEmbeddedRsp() )
        {
          inmsg->Free();
          readstage = ReadHeader;
          continue;
        }

        //----------------------------------------------------------------
        // Readout the rest of the body
        //----------------------------------------------------------------
        inmsgsize = inmsg->GetCursor();
        readstage = ReadMsgBody;
        continue;
      }
      //------------------------------------------------------------------
      // kXR_status is special as it can have both body and raw data,
      // handle it separately
      //------------------------------------------------------------------
      case ReadMore:
      {
        XRootDStatus st = xrdTransport.GetMore( *inmsg, &socket );
        if( !st.IsOK() || st.code == suRetry )
          return st;
        inmsgsize = inmsg->GetCursor();

        //----------------------------------------------------------------
        // The next step is to finalize the read
        //----------------------------------------------------------------
        readstage = ReadDone;
        continue;
      }
      //------------------------------------------------------------------
      // We need to call a raw message handler to get the data from the
      // socket
      //------------------------------------------------------------------
      case ReadRawData:
      {
        uint32_t bytesRead = 0;
        XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
        if( !st.IsOK() )
          return st;
        inmsgsize += bytesRead;
        if( st.code == suRetry )
          return st;
        //----------------------------------------------------------------
        // The next step is to finalize the read
        //----------------------------------------------------------------
        readstage = ReadDone;
        continue;
      }
      //------------------------------------------------------------------
      // No raw handler, so we read the message to the buffer
      //------------------------------------------------------------------
      case ReadMsgBody:
      {
        XRootDStatus st = xrdTransport.GetBody( *inmsg, &socket );
        if( !st.IsOK() || st.code == suRetry )
          return st;
        inmsgsize = inmsg->GetCursor();

        //----------------------------------------------------------------
        // kXR_status response needs special handling as it can have
        // either (body + raw data) or (body + additional body data)
        //----------------------------------------------------------------
        if( IsStatusRsp() )
        {
          uint16_t action = strm.InspectStatusRsp( substrmnb,
                                                   inhandler );

          if( action & MsgHandler::Corrupted )
            return XRootDStatus( stError, errCorruptedHeader );

          if( action & MsgHandler::Raw )
          {
            //------------------------------------------------------------
            // The next step is to read the raw data
            //------------------------------------------------------------
            readstage = ReadRawData;
            continue;
          }

          if( action & MsgHandler::More )
          {
            //------------------------------------------------------------
            // The next step is to read the additional data in the message
            // body
            //------------------------------------------------------------
            readstage = ReadMore;
            continue;
          }
        }

        //----------------------------------------------------------------
        // The next step is to finalize the read
        //----------------------------------------------------------------
        readstage = ReadDone;
        continue;
      }

      case ReadDone:
      {
        //----------------------------------------------------------------
        // Report the incoming message
        //----------------------------------------------------------------
        log->Dump( AsyncSockMsg, "[%s] Received message %p of %d bytes",
                   strmname.c_str(), (void*)inmsg.get(), inmsgsize );

        strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
      }
    }
    // just in case
    break;
  }

  //----------------------------------------------------------------------
  // We are done
  //----------------------------------------------------------------------
  return XRootDStatus();
}

References XrdCl::AsyncSockMsg, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, XrdCl::Log::Dump(), XrdCl::errCorruptedHeader, XrdCl::TransportHandler::GetBody(), XrdCl::TransportHandler::GetHeader(), XrdCl::DefaultEnv::GetLog(), XrdCl::TransportHandler::GetMore(), ServerResponse::hdr, XrdCl::Stream::InspectStatusRsp(), XrdCl::Stream::InstallIncHandler(), XrdCl::Status::IsOK(), kXR_attn, XrdCl::MsgHandler::More, XrdCl::Stream::OnIncoming(), XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::ReadMessageBody(), ServerResponseHeader::status, XrdCl::stError, and XrdCl::suRetry.
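
In the ReadMsgBody case, the action flags returned by InspectStatusRsp() are tested in a fixed order: Corrupted first, then Raw, then More. The sketch below isolates that precedence so it can be checked on its own. The flag values, the NextStage enum, and the AfterStatusBody function are assumptions made for this example; the real XrdCl::MsgHandler enumerators may use different bit positions.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical flag values chosen for this sketch only.
enum Action : uint16_t
{
  Raw       = 0x1,
  Corrupted = 0x2,
  More      = 0x4
};

// Hypothetical summary of what the reader does next.
enum class NextStage { Error, ReadRawData, ReadMore, ReadDone };

// Mirror the order of the checks made after the body of a kXR_status
// response has been read: a corrupted header wins over everything else,
// raw data wins over additional body data, and no flag means the
// message is complete.
NextStage AfterStatusBody( uint16_t action )
{
  if( action & Corrupted ) return NextStage::Error;
  if( action & Raw )       return NextStage::ReadRawData;
  if( action & More )      return NextStage::ReadMore;
  return NextStage::ReadDone;
}
```

Because the flags form a bit mask, more than one can be set at once; the ordering of the tests decides which path is taken in that case.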


◆ Reset()

void XrdCl::AsyncMsgReader::Reset ( )
inline

Reset the state of the object, making it ready to read out the next message.

Definition at line 71 of file XrdClAsyncMsgReader.hh.

{
  readstage = ReadStart;
  inmsg.reset();
  inmsgsize = 0;
  inhandler = nullptr;
}
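
One plausible way for a caller to combine Read() and Reset() is sketched below: an error status aborts, suRetry means "wait for more data and call Read() again", and any other successful status means the message was delivered and the reader can be reset. This is an assumption about usage, not the library's actual socket handler. Status, ScriptedReader, Result, OnReadable, and all numeric constants are hypothetical stand-ins invented for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-ins for XrdCl constants; the numeric values are invented.
const uint16_t stOK    = 0;
const uint16_t stError = 1;
const uint16_t suDone  = 0;
const uint16_t suRetry = 1;

// Minimal status object with the two members the listing relies on.
struct Status
{
  Status( uint16_t s = stOK, uint16_t c = suDone ) : status( s ), code( c ) {}
  bool IsOK() const { return status == stOK; }

  uint16_t status;
  uint16_t code;
};

// Scripted reader: each Read() call returns the next prepared status.
struct ScriptedReader
{
  std::deque<Status> script;
  int resets = 0;

  Status Read()
  {
    Status st = script.front();
    script.pop_front();
    return st;
  }

  void Reset() { ++resets; }
};

enum class Result { Fault, WaitForData, MessageDone };

// Handle one readiness event on the socket.
Result OnReadable( ScriptedReader &reader )
{
  Status st = reader.Read();
  if( !st.IsOK() )
    return Result::Fault;        // error: do not touch the reader state here
  if( st.code == suRetry )
    return Result::WaitForData;  // partial read: keep the state for the next call
  reader.Reset();                // message complete: prepare for the next one
  return Result::MessageDone;
}
```

The point of the sketch is that Reset() is only called after a completed message; calling it after suRetry would discard the partially read message.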

The documentation for this class was generated from the following file: