XRootD
Loading...
Searching...
No Matches
XrdSendQ.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S e n d Q . c c */
4/* */
5/* (c) 2016 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstring>
33#include <unistd.h>
34#include <sys/uio.h>
35
36#include "Xrd/XrdLink.hh"
37#include "Xrd/XrdScheduler.hh"
38#include "Xrd/XrdSendQ.hh"
39
40#include "XrdSys/XrdSysError.hh"
42
43/******************************************************************************/
44/* L o c a l C l a s s e s */
45/******************************************************************************/
46
47class LinkShutdown : public XrdJob
48{
49public:
50
51virtual void DoIt() {myLink->Shutdown(true);
52 myLink->setRef(-1);
53 delete this;
54 }
55
57 : XrdJob("SendQ Shutdown"), myLink(link) {}
58
59virtual ~LinkShutdown() {}
60
61private:
62
63XrdLink *myLink;
64};
65
66
67/******************************************************************************/
68/* G l o b a l O b j e c t s */
69/******************************************************************************/
70
71namespace XrdGlobal
72{
73extern XrdSysError Log;
74extern XrdScheduler Sched;
75};
76
77using namespace XrdGlobal;
78
79/******************************************************************************/
80/* S t a t i c O b j e c t s */
81/******************************************************************************/
82
83unsigned int XrdSendQ::qWarn = 3;
84unsigned int XrdSendQ::qMax = 0xffffffff;
85bool XrdSendQ::qPerm = false;
86
87/******************************************************************************/
88/* C o n s t r u c t o r */
89/******************************************************************************/
90
92 : XrdJob("sendQ runner"),
93 mLink(lP), wMutex(mP),
94 fMsg(0), lMsg(0), delQ(0), theFD(lP.FDnum()),
95 inQ(0), qWmsg(qWarn), discards(0),
96 active(false), terminate(false) {}
97
98/******************************************************************************/
99/* D o I t */
100/******************************************************************************/
101
103{
104 mBuff *theMsg;
105 int myFD, rc;
106 bool theEnd;
107
108// Obtain the lock
109//
110 wMutex.Lock();
111
112// Before we start check if we should delete any messages
113//
114 if (delQ) {RelMsgs(delQ); delQ = 0;}
115
116// Send all queued messages (we can use a blocking send here)
117//
118 while(!terminate && (theMsg = fMsg))
119 {if (!(fMsg = fMsg->next)) lMsg = 0;
120 inQ--; myFD = theFD;
121 wMutex.UnLock();
122 rc = send(myFD, theMsg->mData, theMsg->mLen, 0);
123 free(theMsg);
124 wMutex.Lock();
125 if (rc < 0) {Scuttle(); break;}
126 }
127
128// Before we exit check if we should delete any messages
129//
130 if (delQ) {RelMsgs(delQ); delQ = 0;}
131 if ((theEnd = terminate) && fMsg) RelMsgs(fMsg);
132 active = false;
133 qWmsg = qWarn;
134
135// Release any messages that need to be released. Note that we may have been
136// deleted at this point so we cannot reference anything via "this" once we
137// unlock the mutex. We may also need to delete ourselves.
138//
139 wMutex.UnLock();
140 if (theEnd) delete this;
141}
142
143/******************************************************************************/
144/* Private: Q M s g */
145/******************************************************************************/
146
147bool XrdSendQ::QMsg(XrdSendQ::mBuff *theMsg)
148{
149// Check if we reached the max number of messages
150//
151 if (inQ >= qMax)
152 {discards++;
153 if ((discards & 0xff) == 0x01)
154 {char qBuff[80];
155 snprintf(qBuff, sizeof(qBuff),
156 "%u) reached; %hu message(s) discarded!", qMax, discards);
157 Log.Emsg("SendQ", mLink.Host(),
158 "appears to be slow; queue limit (", qBuff);
159 }
160 return false;
161 }
162
163// Add the message at the end of the queue
164//
165 theMsg->next = 0;
166 if (lMsg) lMsg->next = theMsg;
167 else fMsg = theMsg;
168 lMsg = theMsg;
169 inQ++;
170
171// If there is no active thread handling this queue, schedule one
172//
173 if (!active)
174 {Sched.Schedule((XrdJob *)this);
175 active = true;
176 }
177
178// Check if we should issue a warning.
179//
180 if (inQ >= qWmsg)
181 {char qBuff[32];
182 qWmsg += qWarn;
183 snprintf(qBuff, sizeof(qBuff), "%ud messages queued!", inQ);
184 Log.Emsg("SendQ", mLink.Host(), "appears to be slow;", qBuff);
185 } else {
186 if (inQ < qWarn && qWmsg != qWarn) qWmsg = qWarn;
187 }
188
189// All done
190//
191 return true;
192}
193
194/******************************************************************************/
195/* Private: R e l M s g s */
196/******************************************************************************/
197
198void XrdSendQ::RelMsgs(XrdSendQ::mBuff *mP)
199{
200 mBuff *freeMP;
201
202 while((freeMP = mP))
203 {mP = mP->next;
204 free(freeMP);
205 }
206}
207
208/******************************************************************************/
209/* Private: S c u t t l e */
210/******************************************************************************/
211
212void XrdSendQ::Scuttle() // qMutex must be locked!
213{
214// Simply move any outsanding messages to the deletion queue
215//
216 if (fMsg)
217 {lMsg->next = delQ;
218 delQ = fMsg;
219 fMsg = lMsg = 0;
220 inQ = 0;
221 }
222}
223
224/******************************************************************************/
225/* S e n d */
226/******************************************************************************/
227
228// Called with wMutex locked.
229
230int XrdSendQ::Send(const char *buff, int blen)
231{
232 mBuff *theMsg;
233 int bleft, bsent;
234
235// If there is an active thread handling messages then we have to queue it.
236// Otherwise try to send it. We need to hold the lock here to prevent messing
237// up the message is only part of it could be sent. This is a non-blocking call.
238//
239 if (active) bleft = blen;
240 else if ((bleft = SendNB(buff, blen)) <= 0) return (bleft ? -1 : blen);
241
242// Allocate buffer for the message
243//
244 if (!(theMsg = (mBuff *)malloc(sizeof(mBuff) + bleft)))
245 {errno = ENOMEM; return -1;}
246
247// Copy the unsent message fragment
248//
249 bsent = blen - bleft;
250 memcpy(theMsg->mData, buff+bsent, bleft);
251 theMsg->mLen = bleft;
252
253// Queue the message.
254//
255 return (QMsg(theMsg) ? blen : -1);
256}
257
258/******************************************************************************/
259
260// Called with wMutex locked.
261
262int XrdSendQ::Send(const struct iovec *iov, int iovcnt, int iotot)
263{
264 mBuff *theMsg;
265 char *body;
266 int bleft, bmore, iovX;
267
268// If there is an active thread handling messages then we have to queue it.
269// Otherwise try to send it. We need to hold the lock here to prevent messing
270// up the message is only part of it could be sent. This is a non-blocking call.
271//
272 if (active)
273 {bleft = 0;
274 for (iovX = 0; iovX < iovcnt; iovX++)
275 if ((bleft = iov[iovX].iov_len)) break;
276 if (!bleft) return iotot;
277 } else {
278 if ((bleft = SendNB(iov, iovcnt, iotot, iovX)) <= 0)
279 return (bleft ? -1 : 0);
280 }
281
282// Readjust the total amount not sent based on where we stopped in the iovec.
283//
284 bmore = bleft;
285 for (int i = iovX+1; i < iovcnt; i++) bmore += iov[i].iov_len;
286
287// Copy the unsent message (for simplicity we will copy the whole iovec stop).
288//
289 if (!(theMsg = (mBuff *)malloc(bmore+sizeof(mBuff))))
290 {errno = ENOMEM; return -1;}
291
292// Setup the message length
293//
294 theMsg->mLen = bmore;
295
296// Copy the first fragment (it cannot be zero length)
297//
298 body = theMsg->mData;
299 memcpy(body, ((char *)iov[iovX].iov_base)+(iov[iovX].iov_len-bleft), bleft);
300 body += bleft;
301
302// All remaining items
303//
304 for (int i = iovX+1; i < iovcnt; i++)
305 {if (iov[i].iov_len)
306 {memcpy(body, iov[i].iov_base, iov[i].iov_len);
307 body += iov[i].iov_len;
308 }
309 }
310
311// Queue the message.
312//
313 return (QMsg(theMsg) ? iotot : 0);
314}
315
316/******************************************************************************/
317/* S e n d N B */
318/******************************************************************************/
319
320// Called with wMutex locked.
321
322int XrdSendQ::SendNB(const char *Buff, int Blen)
323{
324#if !defined(__linux__)
325 return -1;
326#else
327 ssize_t retc = 0, bytesleft = Blen;
328
329// Write the data out
330//
331 while(bytesleft)
332 {do {retc = send(theFD, Buff, bytesleft, MSG_DONTWAIT);}
333 while(retc < 0 && errno == EINTR);
334 if (retc <= 0) break;
335 bytesleft -= retc; Buff += retc;
336 }
337
338// All done
339//
340 if (retc <= 0)
341 {if (!retc || errno == EAGAIN || retc == EWOULDBLOCK) return bytesleft;
342 Log.Emsg("SendQ", errno, "send to", mLink.ID);
343 return -1;
344 }
345 return bytesleft;
346#endif
347}
348
349/******************************************************************************/
350
351// Called with wMutex locked.
352
353int XrdSendQ::SendNB(const struct iovec *iov, int iocnt, int bytes, int &iovX)
354{
355
356#if !defined(__linux__)
357 return -1;
358#else
359 char *msgP;
360 ssize_t retc;
361 int msgL, msgF = MSG_DONTWAIT|MSG_MORE, ioLast = iocnt-1;
362
363// Write the data out. The following code only works in Linux as we use the
364// new POSIX flags deined for send() which currently is only implemented in
365// Linux. This allows us to selectively use non-blocking I/O.
366//
367 for (iovX = 0; iovX < iocnt; iovX++)
368 {msgP = (char *)iov[iovX].iov_base;
369 msgL = iov[iovX].iov_len;
370 if (iovX == ioLast) msgF &= ~MSG_MORE;
371 while(msgL)
372 {do {retc = send(theFD, msgP, msgL, msgF);}
373 while(retc < 0 && errno == EINTR);
374 if (retc <= 0)
375 {if (!retc || errno == EAGAIN || retc == EWOULDBLOCK)
376 return msgL;
377 Log.Emsg("SendQ", errno, "send to", mLink.ID);
378 return -1;
379 }
380 msgL -= retc;
381 }
382 }
383
384// All done
385//
386 return 0;
387#endif
388}
389
390/******************************************************************************/
391/* T e r m i n a t e */
392/******************************************************************************/
393
394// This must be called with wMutex locked!
395
397{
398// First step is to see if we need to schedule a shutdown prior to quiting
399//
400 if (lP) Sched.Schedule((XrdJob *)new LinkShutdown(lP));
401
402// If there is an active thread then we need to let the thread handle the
403// termination of this object. Otherwise, we can do it now.
404//
405 if (active)
406 {Scuttle();
407 terminate = true;
408 theFD =-1;
409 } else {
410 if (fMsg) {RelMsgs(fMsg); fMsg = lMsg = 0;}
411 if (delQ) {RelMsgs(delQ); delQ = 0;}
412 delete this;
413 }
414}
virtual ~LinkShutdown()
Definition XrdSendQ.cc:59
virtual void DoIt()
Definition XrdSendQ.cc:51
LinkShutdown(XrdLink *link)
Definition XrdSendQ.cc:56
void Schedule(XrdJob *jp)
XrdSendQ(XrdLink &lP, XrdSysMutex &mP)
Definition XrdSendQ.cc:91
void Terminate(XrdLink *lP=0)
Definition XrdSendQ.cc:396
int Send(const char *buff, int blen)
Definition XrdSendQ.cc:230
virtual void DoIt()
Definition XrdSendQ.cc:102
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysError Log
Definition XrdConfig.cc:113
XrdScheduler Sched
Definition XrdLinkCtl.cc:54