XRootD
Loading...
Searching...
No Matches
XrdFrcReqAgent.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r c R e q A g e n t . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstdlib>
33#include <cstring>
34#include <strings.h>
35#include <unistd.h>
36#include <fcntl.h>
37#include <sys/types.h>
38#include <sys/stat.h>
39
41#include "XrdFrc/XrdFrcTrace.hh"
42#include "XrdFrc/XrdFrcUtils.hh"
43#include "XrdNet/XrdNetMsg.hh"
44#include "XrdOuc/XrdOucUtils.hh"
47
48using namespace XrdFrc;
49
50/******************************************************************************/
51/* S t a t i c V a r i a b l e s */
52/******************************************************************************/
53
// Path handed to the XrdNetMsg sender in Ping() and probed there with stat().
// Start() sets it once, on first use, to "<aPath>xfrd.udp" (strdup'd copy).
54char *XrdFrcReqAgent::c2sFN = 0;
55
56/******************************************************************************/
57/* C o n s t r u c t o r */
58/******************************************************************************/
59
60XrdFrcReqAgent::XrdFrcReqAgent(const char *Me, int qVal)
61 : Persona(Me),myName(""),theQ(qVal)
62{
63// Set default ping message
64//
65 switch(qVal)
66 {case XrdFrcRequest::getQ: pingMsg = "!<\n"; break;
67 case XrdFrcRequest::migQ: pingMsg = "!&\n"; break;
68 case XrdFrcRequest::stgQ: pingMsg = "!+\n"; break;
69 case XrdFrcRequest::putQ: pingMsg = "!>\n"; break;
70 default: pingMsg = "!\n" ; break;
71 }
72}
73
74/******************************************************************************/
75/* Public: A d d */
76/******************************************************************************/
77
79{
80
81// Complete the request including verifying the priority
82//
83 if (Request.Prty > XrdFrcRequest::maxPrty)
85 else if (Request.Prty < 0)Request.Prty = 0;
86
87// Add time and instance name
88//
89 Request.addTOD = time(0);
90 if (myName) strlcpy(Request.iName, myName, sizeof(Request.iName));
91
92// Now add it to the queue
93//
94 rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
95
96// Now wake the boss
97//
98 Ping();
99}
100
101/******************************************************************************/
102/* Public: D e l */
103/******************************************************************************/
104
106{
107 int i;
108
109// Remove all pending requests for this id
110//
111 for (i = 0; i <= XrdFrcRequest::maxPrty; i++) rQueue[i]->Can(&Request);
112}
113
114/******************************************************************************/
115/* Public: L i s t */
116/******************************************************************************/
117
119{
120 char myLfn[8192];
121 int i, Offs, n = 0;
122
123// List entries in each priority queue
124//
125 for (i = 0; i <= XrdFrcRequest::maxPrty; i++)
126 {Offs = 0;
127 while(rQueue[i]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
128 {std::cout <<myLfn <<std::endl; n++;}
129 }
130// All done
131//
132 return n;
133}
134
135/******************************************************************************/
136
137int XrdFrcReqAgent::List(XrdFrcRequest::Item *Items, int Num, int Prty)
138{
139 char myLfn[8192];
140 int Offs, n = 0;
141
142// List entries in each priority queue
143//
144 if (Prty <= XrdFrcRequest::maxPrty)
145 {Offs = 0;
146 while(rQueue[Prty]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
147 {std::cout <<myLfn <<std::endl; n++;}
148 }
149
150// All done
151//
152 return n;
153}
154
155/******************************************************************************/
156/* Public: N e x t L F N */
157/******************************************************************************/
158
159int XrdFrcReqAgent::NextLFN(char *Buff, int Bsz, int Prty, int &Offs)
160{
161 static XrdFrcRequest::Item Items[1] = {XrdFrcRequest::getLFN};
162
163// Return entry, if it exists
164//
165 return rQueue[Prty]->List(Buff, Bsz, Offs, Items, 1) != 0;
166}
167
168/******************************************************************************/
169/* P i n g */
170/******************************************************************************/
171
172void XrdFrcReqAgent::Ping(const char *Msg)
173{
174 static XrdNetMsg udpMsg(&Say, c2sFN);
175 static int udpOK = 0;
176 struct stat buf;
177
178// Send given message or default message based on our persona
179//
180 if (udpOK || !stat(c2sFN, &buf))
181 {udpMsg.Send(Msg ? Msg : pingMsg); udpOK = 1;}
182}
183
184/******************************************************************************/
185/* S t a r t */
186/******************************************************************************/
187
188int XrdFrcReqAgent::Start(char *aPath, int aMode)
189{
190 XrdFrcRequest Request;
191 const char *myClid;
192 char buff[2048], *qPath;
193 int i;
194
195// Initialize the udp path for pings, if we have not done so
196//
197 if (!c2sFN)
198 {sprintf(buff, "%sxfrd.udp", aPath);
199 c2sFN = strdup(buff);
200 }
201
202// Get the instance name
203//
204 myName = XrdOucUtils::InstName(1);
205
206// Generate the queue directory path
207//
208 if (!(qPath = XrdFrcUtils::makeQDir(aPath, aMode))) return 0;
209
210// Initialize the registration entry and register ourselves
211//
212 if ((myClid = getenv("XRDCMSCLUSTERID")))
213 {int Uid = static_cast<int>(geteuid());
214 int Gid = static_cast<int>(getegid());
215 memset(&Request, 0, sizeof(Request));
216 strlcpy(Request.LFN, myClid, sizeof(Request.LFN));
217 sprintf(Request.User,"%d %d", Uid, Gid);
218 sprintf(Request.ID, "%d", static_cast<int>(getpid()));
219 strlcpy(Request.iName, myName, sizeof(Request.iName));
220 Request.addTOD = time(0);
222 Request.OPc = '@';
223 }
224
225// Initialize the request queues if all went well
226//
227 for (i = 0; i <= XrdFrcRequest::maxPrty; i++)
228 {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
229 rQueue[i] = new XrdFrcReqFile(buff, 1);
230 if (!rQueue[i]->Init()) return 0;
231 if (myClid) rQueue[i]->Add(&Request);
232 }
233
234// All done
235//
236 if (myClid) Ping();
237 free(qPath);
238 return 1;
239}
XrdOucPup XrdCmsParser::Pup & Say
#define stat(a, b)
Definition XrdPosix.hh:101
size_t strlcpy(char *dst, const char *src, size_t sz)
void Ping(const char *Msg=0)
void Add(XrdFrcRequest &Request)
XrdFrcReqAgent(const char *Me, int qVal)
int Start(char *aPath, int aMode)
void Del(XrdFrcRequest &Request)
int List(XrdFrcRequest::Item *Items, int Num)
int NextLFN(char *Buff, int Bsz, int Prty, int &Offs)
char * List(char *Buff, int bsz, int &Offs, XrdFrcRequest::Item *ITList=0, int ITNum=0)
void Add(XrdFrcRequest *rP)
static const int stgQ
static const int getQ
char LFN[3072]
static const int migQ
static const int putQ
static const int Register
signed char Prty
static const int maxPrty
long long addTOD
static char * makeQDir(const char *Path, int Mode)
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition XrdNetMsg.cc:70
static const char * InstName(int TranOpt=0)