XRootD
Loading...
Searching...
No Matches
XrdClHttpFilePlugIn.cc
Go to the documentation of this file.
1
6
7#include <unistd.h>
8
9#include <cassert>
10
14#include "XrdCl/XrdClLog.hh"
15#include "XrdCl/XrdClStatus.hh"
16
17#include "XrdOuc/XrdOucCRC.hh"
18
19namespace {
20
21int MakePosixOpenFlags(XrdCl::OpenFlags::Flags flags) {
22 int posix_flags = 0;
23 if (flags & XrdCl::OpenFlags::New) {
24 posix_flags |= O_CREAT | O_EXCL;
25 }
26 if (flags & XrdCl::OpenFlags::Delete) {
27 posix_flags |= O_CREAT | O_TRUNC;
28 }
29 if (flags & XrdCl::OpenFlags::Read) {
30 posix_flags |= O_RDONLY;
31 }
32 if (flags & XrdCl::OpenFlags::Write) {
33 posix_flags |= O_WRONLY;
34 }
35 if (flags & XrdCl::OpenFlags::Update) {
36 posix_flags |= O_RDWR;
37 }
38 return posix_flags;
39}
40
41} // namespace
42
43namespace XrdCl {
44
45Davix::Context *root_davix_context_ = NULL;
46Davix::DavPosix *root_davix_client_file_ = NULL;
47
49 : davix_fd_(nullptr),
50 curr_offset(0),
51 is_open_(false),
52 filesize(0),
53 url_(),
54 properties_(),
55 logger_(DefaultEnv::GetLog()) {
56 SetUpLogging(logger_);
57 logger_->Debug(kLogXrdClHttp, "HttpFilePlugin constructed.");
58
59 std::string origin = getenv("XRDXROOTD_PROXY")? getenv("XRDXROOTD_PROXY") : "";
60 if ( origin.empty() || origin.find("=") == 0) {
61 davix_context_ = new Davix::Context();
62 davix_client_ = new Davix::DavPosix(davix_context_);
63 }
64 else {
65 if (root_davix_context_ == NULL) {
66 root_davix_context_ = new Davix::Context();
67 if (getenv("DAVIX_LOAD_GRID_MODULE_IN_XRD"))
68 root_davix_context_->loadModule("grid");
69 root_davix_client_file_ = new Davix::DavPosix(root_davix_context_);
70 }
71 davix_context_ = root_davix_context_;
72 davix_client_ = root_davix_client_file_;
73 }
74
75}
76
78 if (root_davix_context_ == NULL) {
79 delete davix_client_;
80 delete davix_context_;
81 }
82}
83
84XRootDStatus HttpFilePlugIn::Open(const std::string &url,
85 OpenFlags::Flags flags, Access::Mode /*mode*/,
86 ResponseHandler *handler, uint16_t timeout) {
87 if (is_open_) {
88 logger_->Error(kLogXrdClHttp, "URL %s already open", url.c_str());
90 }
91
92 if (XrdCl::URL(url).GetProtocol().find("https") == 0)
93 isChannelEncrypted = true;
94 else
95 isChannelEncrypted = false;
96
97 avoid_pread_ = false;
98 if (getenv(HTTP_FILE_PLUG_IN_AVOIDRANGE_ENV) != NULL)
99 avoid_pread_ = true;
100 else {
102 auto search = CGIs.find(HTTP_FILE_PLUG_IN_AVOIDRANGE_CGI);
103 if (search != CGIs.end())
104 avoid_pread_ = true;
105 }
106
107 Davix::RequestParams params;
108 if (timeout != 0) {
109 struct timespec ts = {timeout, 0};
110 params.setOperationTimeout(&ts);
111 }
112
114 auto full_path = XrdCl::URL(url).GetLocation();
115 auto pos = full_path.find_last_of('/');
116 auto base_dir =
117 pos != std::string::npos ? full_path.substr(0, pos) : full_path;
118 auto mkdir_status =
119 Posix::MkDir(*davix_client_, base_dir, XrdCl::MkDirFlags::MakePath,
120 XrdCl::Access::None, timeout);
121 if (mkdir_status.IsError()) {
122 logger_->Error(kLogXrdClHttp,
123 "Could not create parent directories when opening: %s",
124 url.c_str());
125 return mkdir_status;
126 }
127 }
128
129 if (((flags & OpenFlags::Write) || (flags & OpenFlags::Update)) &&
130 (flags & OpenFlags::Delete)) {
131 auto stat_info = new StatInfo();
132 auto status = Posix::Stat(*davix_client_, url, timeout, stat_info);
133 if (status.IsOK()) {
134 auto unlink_status = Posix::Unlink(*davix_client_, url, timeout);
135 if (unlink_status.IsError()) {
136 logger_->Error(
138 "Could not delete existing destination file: %s. Error: %s",
139 url.c_str(), unlink_status.GetErrorMessage().c_str());
140 return unlink_status;
141 }
142 }
143 delete stat_info;
144 }
145 else if (flags & OpenFlags::Read) {
146 auto stat_info = new StatInfo();
147 auto status = Posix::Stat(*davix_client_, url, timeout, stat_info);
148 if (status.IsOK()) {
149 filesize = stat_info->GetSize();
150 }
151 delete stat_info;
152 }
153
154 auto posix_open_flags = MakePosixOpenFlags(flags);
155
156 logger_->Debug(kLogXrdClHttp,
157 "Open: URL: %s, XRootD flags: %d, POSIX flags: %d",
158 url.c_str(), flags, posix_open_flags);
159
160 // res == std::pair<fd, XRootDStatus>
161 auto res = Posix::Open(*davix_client_, url, posix_open_flags, timeout);
162 if (!res.first) {
163 logger_->Error(kLogXrdClHttp, "Could not open: %s, error: %s", url.c_str(),
164 res.second.ToStr().c_str());
165 return res.second;
166 }
167
168 davix_fd_ = res.first;
169
170 logger_->Debug(kLogXrdClHttp, "Opened: %s", url.c_str());
171
172 is_open_ = true;
173 url_ = url;
174
175 auto status = new XRootDStatus();
176 handler->HandleResponse(status, nullptr);
177
178 return XRootDStatus();
179}
180
182 uint16_t /*timeout*/) {
183 if (!is_open_) {
184 logger_->Error(kLogXrdClHttp,
185 "Cannot close. URL hasn't been previously opened");
187 }
188
189 logger_->Debug(kLogXrdClHttp, "Closing davix fd: %p", (void*)davix_fd_);
190
191 auto status = Posix::Close(*davix_client_, davix_fd_);
192 if (status.IsError()) {
193 logger_->Error(kLogXrdClHttp, "Could not close davix fd: %p, error: %s",
194 (void*)davix_fd_, status.ToStr().c_str());
195 return status;
196 }
197
198 is_open_ = false;
199 url_.clear();
200
201 handler->HandleResponse(new XRootDStatus(), nullptr);
202
203 return XRootDStatus();
204}
205
207 uint16_t timeout) {
208 if (!is_open_) {
209 logger_->Error(kLogXrdClHttp,
210 "Cannot stat. URL hasn't been previously opened");
212 }
213
214 auto stat_info = new StatInfo();
215 auto status = Posix::Stat(*davix_client_, url_, timeout, stat_info);
216 // A file that is_open_ = true should not retune 400/3011. the only time this
217 // happen is a newly created file. Davix doesn't issue a http PUT so this file
218 // won't show up for Stat(). Here we fake a response.
219 if (status.IsError() && status.code == 400 && status.errNo == 3011) {
220 std::ostringstream data;
221 data << 140737018595560 << " " << filesize << " " << 33261 << " " << time(NULL);
222 stat_info->ParseServerResponse(data.str().c_str());
223 }
224 else if (status.IsError()) {
225 logger_->Error(kLogXrdClHttp, "Stat failed: %s", status.ToStr().c_str());
226 return status;
227 }
228
229 logger_->Debug(kLogXrdClHttp, "Stat-ed URL: %s", url_.c_str());
230
231 auto obj = new AnyObject();
232 obj->Set(stat_info);
233
234 handler->HandleResponse(new XRootDStatus(), obj);
235
236 return XRootDStatus();
237}
238
239XRootDStatus HttpFilePlugIn::Read(uint64_t offset, uint32_t size, void *buffer,
240 ResponseHandler *handler,
241 uint16_t /*timeout*/) {
242 if (!is_open_) {
243 logger_->Error(kLogXrdClHttp,
244 "Cannot read. URL hasn't previously been opened");
246 }
247
248 // DavPosix::pread will return -1 if the pread goes beyond the file size
249 size = (offset + size > filesize)? filesize - offset : size;
250 std::pair<int, XRootDStatus> res;
251 if (! avoid_pread_) {
252 res = Posix::PRead(*davix_client_, davix_fd_, buffer, size, offset);
253 }
254 else {
255 offset_locker.lock();
256 if (offset == curr_offset) {
257 res = Posix::Read(*davix_client_, davix_fd_, buffer, size);
258 }
259 else {
260 res = Posix::PRead(*davix_client_, davix_fd_, buffer, size, offset);
261 }
262 }
263
264 if (res.second.IsError()) {
265 logger_->Error(kLogXrdClHttp, "Could not read URL: %s, error: %s",
266 url_.c_str(), res.second.ToStr().c_str());
267 if (avoid_pread_) offset_locker.unlock();
268 return res.second;
269 }
270
271 int num_bytes_read = res.first;
272 curr_offset = offset + num_bytes_read;
273 if (avoid_pread_) offset_locker.unlock();
274
275 logger_->Debug(kLogXrdClHttp, "Read %d bytes, at offset %llu, from URL: %s",
276 num_bytes_read, (unsigned long long) offset, url_.c_str());
277
278 auto status = new XRootDStatus();
279 auto chunk_info = new ChunkInfo(offset, num_bytes_read, buffer);
280 auto obj = new AnyObject();
281 obj->Set(chunk_info);
282 handler->HandleResponse(status, obj);
283
284 return XRootDStatus();
285}
286
288 private:
289 XrdCl::ResponseHandler *realHandler;
290 bool isChannelEncrypted;
291 public:
292 // constructor
294 bool isHttps) : realHandler(a), isChannelEncrypted(isHttps) {}
295
296 // Response Handler
298 XrdCl::AnyObject *rdresp) {
299
300 if( !status->IsOK() )
301 {
302 realHandler->HandleResponse( status, rdresp );
303 delete this;
304 return;
305 }
306
307 //using namespace XrdCl;
308
309 ChunkInfo *chunk = 0;
310 rdresp->Get(chunk);
311
312 std::vector<uint32_t> cksums;
313 if( isChannelEncrypted )
314 {
315 size_t nbpages = chunk->length / XrdSys::PageSize;
316 if( chunk->length % XrdSys::PageSize )
317 ++nbpages;
318 cksums.reserve( nbpages );
319
320 size_t size = chunk->length;
321 char *buffer = reinterpret_cast<char*>( chunk->buffer );
322
323 for( size_t pg = 0; pg < nbpages; ++pg )
324 {
325 size_t pgsize = XrdSys::PageSize;
326 if( pgsize > size ) pgsize = size;
327 uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
328 cksums.push_back( crcval );
329 buffer += pgsize;
330 size -= pgsize;
331 }
332 }
333
334 PageInfo *pages = new PageInfo(chunk->offset, chunk->length, chunk->buffer, std::move(cksums));
335 delete rdresp;
336 AnyObject *response = new AnyObject();
337 response->Set( pages );
338 realHandler->HandleResponse( status, response );
339
340 delete this;
341 }
342};
343
344XRootDStatus HttpFilePlugIn::PgRead(uint64_t offset, uint32_t size, void *buffer,
345 ResponseHandler *handler,
346 uint16_t timeout) {
347 ResponseHandler *substitHandler = new PgReadSubstitutionHandler( handler, isChannelEncrypted );
348 XRootDStatus st = Read(offset, size, buffer, substitHandler, timeout);
349 return st;
350}
351
352XRootDStatus HttpFilePlugIn::Write(uint64_t offset, uint32_t size,
353 const void *buffer, ResponseHandler *handler,
354 uint16_t timeout) {
355 if (!is_open_) {
356 logger_->Error(kLogXrdClHttp,
357 "Cannot write. URL hasn't previously been opened");
359 }
360
361 // res == std::pair<int, XRootDStatus>
362 auto res =
363 Posix::PWrite(*davix_client_, davix_fd_, offset, size, buffer, timeout);
364 if (res.second.IsError()) {
365 logger_->Error(kLogXrdClHttp, "Could not write URL: %s, error: %s",
366 url_.c_str(), res.second.ToStr().c_str());
367 return res.second;
368 }
369 else
370 filesize += res.first;
371
372 logger_->Debug(kLogXrdClHttp, "Wrote %d bytes, at offset %llu, to URL: %s",
373 res.first, (unsigned long long) offset, url_.c_str());
374
375 handler->HandleResponse(new XRootDStatus(), nullptr);
376
377 return XRootDStatus();
378}
379
380//------------------------------------------------------------------------
382//------------------------------------------------------------------------
384 uint32_t size,
385 const void *buffer,
386 std::vector<uint32_t> &cksums,
387 ResponseHandler *handler,
388 uint16_t timeout )
389{ (void)cksums;
390 return Write(offset, size, buffer, handler, timeout);
391}
392
394 (void)handler;
395 (void)timeout;
396
397 logger_->Debug(kLogXrdClHttp, "Sync is a no-op for HTTP.");
398
399 return XRootDStatus();
400}
401
402
404 ResponseHandler *handler,
405 uint16_t /*timeout*/) {
406 if (!is_open_) {
407 logger_->Error(kLogXrdClHttp,
408 "Cannot read. URL hasn't previously been opened");
410 }
411
412 const auto num_chunks = chunks.size();
413 std::vector<Davix::DavIOVecInput> input_vector(num_chunks);
414 std::vector<Davix::DavIOVecOuput> output_vector(num_chunks);
415
416 for (size_t i = 0; i < num_chunks; ++i) {
417 input_vector[i].diov_offset = chunks[i].offset;
418 input_vector[i].diov_size = chunks[i].length;
419 input_vector[i].diov_buffer = chunks[i].buffer;
420 }
421
422 // res == std::pair<int, XRootDStatus>
423 auto res = Posix::PReadVec(*davix_client_, davix_fd_, chunks, buffer);
424 if (res.second.IsError()) {
425 logger_->Error(kLogXrdClHttp, "Could not vectorRead URL: %s, error: %s",
426 url_.c_str(), res.second.ToStr().c_str());
427 return res.second;
428 }
429
430 int num_bytes_read = res.first;
431
432 logger_->Debug(kLogXrdClHttp, "VecRead %d bytes, from URL: %s",
433 num_bytes_read, url_.c_str());
434
435 char *output = static_cast<char *>(buffer);
436 for (size_t i = 0; i < num_chunks; ++i) {
437 std::memcpy(output + input_vector[i].diov_offset,
438 output_vector[i].diov_buffer, output_vector[i].diov_size);
439 }
440
441 auto status = new XRootDStatus();
442 auto read_info = new VectorReadInfo();
443 read_info->SetSize(num_bytes_read);
444 read_info->GetChunks() = chunks;
445 auto obj = new AnyObject();
446 obj->Set(read_info);
447 handler->HandleResponse(status, obj);
448
449 return XRootDStatus();
450}
451
452bool HttpFilePlugIn::IsOpen() const { return is_open_; }
453
454bool HttpFilePlugIn::SetProperty(const std::string &name,
455 const std::string &value) {
456 properties_[name] = value;
457 return true;
458}
459
460bool HttpFilePlugIn::GetProperty(const std::string &name,
461 std::string &value) const {
462 const auto p = properties_.find(name);
463 if (p == std::end(properties_)) {
464 return false;
465 }
466
467 value = p->second;
468 return true;
469}
470
471} // namespace XrdCl
static std::string ts()
timestamp output for logging messages
Definition XrdCephOss.cc:53
#define HTTP_FILE_PLUG_IN_AVOIDRANGE_ENV
#define HTTP_FILE_PLUG_IN_AVOIDRANGE_CGI
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
virtual XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus PgWrite(uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Sync(ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout) override
virtual XRootDStatus VectorRead(const ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout) override
virtual ~HttpFilePlugIn() noexcept
virtual XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout) override
virtual bool GetProperty(const std::string &name, std::string &value) const override
virtual XRootDStatus Close(ResponseHandler *handler, uint16_t timeout) override
virtual bool SetProperty(const std::string &name, const std::string &value) override
virtual bool IsOpen() const override
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *rdresp)
PgReadSubstitutionHandler(XrdCl::ResponseHandler *a, bool isHttps)
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Object stat info.
URL representation.
Definition XrdClURL.hh:31
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition XrdClURL.cc:344
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
std::pair< int, XrdCl::XRootDStatus > PReadVec(Davix::DavPosix &davix_client, DAVIX_FD *fd, const XrdCl::ChunkList &chunks, void *buffer)
std::pair< int, XrdCl::XRootDStatus > PWrite(Davix::DavPosix &davix_client, DAVIX_FD *fd, uint64_t offset, uint32_t size, const void *buffer, uint16_t timeout)
std::pair< int, XRootDStatus > PRead(Davix::DavPosix &davix_client, DAVIX_FD *fd, void *buffer, uint32_t size, uint64_t offset)
XRootDStatus Unlink(Davix::DavPosix &davix_client, const std::string &url, uint16_t timeout)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
MkDirImpl< false > MkDir
const uint16_t stError
An error occurred that could potentially be retried.
StatImpl< false > Stat(Ctx< File > file, Arg< bool > force, uint16_t timeout=0)
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
const uint16_t errInvalidOp
Davix::Context * root_davix_context_
std::vector< ChunkInfo > ChunkList
List of chunks.
Davix::DavPosix * root_davix_client_file_
void SetUpLogging(Log *logger)
static const uint64_t kLogXrdClHttp
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
static const int PageSize
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
@ MakePath
create the entire directory tree if it doesn't exist
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.