XRootD
Loading...
Searching...
No Matches
XrdOfsCksFile.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O f s C k s F i l e . c c */
4/* */
5/* (c) 2026 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <stdlib.h>
32#include <netinet/in.h>
33#include <sys/param.h>
34
35#include "XrdCks/XrdCks.hh"
36#include "XrdCks/XrdCksCalc.hh"
37#include "XrdCks/XrdCksData.hh"
39#include "XrdOuc/XrdOucEnv.hh"
40#include "XrdSfs/XrdSfsAio.hh"
41#include "XrdSys/XrdSysError.hh"
42
43/******************************************************************************/
44/* s t a t i c o b j e c t s */
45/******************************************************************************/
46
48
49namespace
50{
52 XrdCks* cksP = 0;
53}
54
55/******************************************************************************/
56/* C o n s t r u c t o r */
57/******************************************************************************/
58
59XrdOfsCksFile::XrdOfsCksFile(const char* tid, const char* path, XrdOssDF* df,
60 XrdCksCalc* cP, bool& delF)
61 : XrdOssWrapDF(*df),
62 tident(tid), fPath(strdup(path)), ossDF(df),
63 calcP(cP), altcP(0), viaDel(delF), nextOff(0),
64 ioBuff(0)
65{
66// Obtain information about the chacksum we are to use. It should have been
67// pre-screened for viability, but we check it again just to make sure and
68// to setup the proper execution path.
69//
70 int rc, sz;
71 char eBuff[128];
72
73// Set the name and get the size of the checksum
74//
75 cksName = cP->Type(sz);
76 Dirty = false;
77
78
79// Determine how we will compute the real-time checksum. It will either be
80// either combinable via a re-read computation.
81//
82 if (cP->Combinable() && (sz == (int)sizeof(uint32_t)))
83 {ProcessRTC = &XrdOfsCksFile::RTC_CB32;
84 altcP = calcP->New();
85 } else {
86 ProcessRTC = &XrdOfsCksFile::RTC_NCXX;
87 if ((rc = posix_memalign(&ioBuff, 4096, ioBlen)))
88 {snprintf(eBuff, sizeof(eBuff),
89 "get buffer for real-time %s checksum for", cksName);
90 eLog.Emsg("ckscon", rc, eBuff, path);
91 Dirty = true;
92 }
93 }
94}
95
96/******************************************************************************/
97/* D e s t r u c t o r */
98/******************************************************************************/
99
101{
102// Cleanup
103//
104 if (ossDF) delete ossDF;
105 if (calcP) calcP->Recycle();
106 if (altcP) altcP->Recycle();
107 if (fPath) free(fPath);
108 if (ioBuff) free(ioBuff);
109}
110
111/******************************************************************************/
112/* c l o s e */
113/******************************************************************************/
114
115/*
116 Function: Close the file associated with this object.
117
118 Input: None.
119
120 Output: Returns XrdOssOK upon success and -1 upon failure.
121*/
122int XrdOfsCksFile::Close(long long *retsz)
123{
124 XrdCksData cksData;
125 struct stat Stat;
126 int csSize, rc;;
127
128// Process checksum if it is valid
129//
130 cksMtx.Lock();
131 while(!Dirty) // This is not a loop but avoids deeply next if's.
132 {char eBuff[256];
133
134 // If we are here vecause of a delete, skip setting checksum
135 //
136 if (viaDel)
137 {snprintf(eBuff, sizeof(eBuff), "File not properly closed; "
138 "real-time %s checksum was not set for", cksName);
139 eLog.Emsg("ckscls", eBuff, fPath);
140 break;
141 }
142
143 // Verify that checksum was fully calculated
144 //
145
146 // Verify that all data has been written for this checksum
147 //
148 if (segMap.size())
149 {auto it = segMap.begin();
150 snprintf(eBuff, sizeof(eBuff),
151 "%lld bytes missing at offset %lld; real-time %s",
152 (long long)(it->second.segBeg - nextOff),
153 (long long)nextOff, cksName);
154 eLog.Emsg("ckcls", eBuff, "checksum was not set for", fPath);
155 break;
156 }
157
158 // Fill out the checksum information
159 //
160 memset((void*)&cksData, 0, sizeof(cksData));
161 cksData.Set(calcP->Type(csSize));
162 cksData.Length = csSize;
163 memcpy(cksData.Value, calcP->Final(), csSize);
164
165 if ((rc = ossDF->Fstat(&Stat)))
166 {eLog.Emsg("clscls", rc, "get real-time checksum mtime for", fPath);
167 break;
168 }
169
170 cksData.fmTime = static_cast<long long>(Stat.st_mtime);
171 cksData.csTime = static_cast<int>(time(0) - Stat.st_mtime);
172
173 if ((rc = cksP->Set(fPath, cksData, 1)))
174 eLog.Emsg("ckscls", rc, "set real-time checksum for", fPath);
175
176 break;
177 }
178
179// Issue close to the underlying object
180//
181 Dirty = true; // Prevent re-entry processing
182 cksMtx.UnLock();
183 return wrapDF.Close(retsz);
184}
185
186/******************************************************************************/
187/* F t r u n c a t e */
188/******************************************************************************/
189/*
190 Function: Set the length of associated file to 'flen'.
191
192 Input: flen - The new size of the file.
193
194 Output: Returns XrdOssOK upon success and -errno upon failure.
195*/
196int XrdOfsCksFile::Ftruncate(unsigned long long flen)
197{
198
199// Execute the truncate
200//
201 int rc = wrapDF.Ftruncate(flen);
202
203// We support streaming checksum only when the truncate makes the file 0 length
204//
205 if (!Dirty)
206 {if (rc < 0)
207 {eLog.Emsg("ckstrunc", rc, "continue real-time checksum for", fPath);
208 Dirty = true;
209 } else {
210 if (flen)
211 {eLog.Emsg("ckstrunc","Unable to continue real-time checksum for",
212 fPath, "; truncate arg not 0.");
213 Dirty = true;
214 } else {
215 nextOff = 0;
216 segMap.clear();
217 calcP->Init();
218 }
219 }
220 }
221
222// All done
223//
224 return rc;
225}
226
227/******************************************************************************/
228/* I n i t */
229/******************************************************************************/
230
232{
233// Record the checksum manager
234//
235 cksP = cp;
236}
237
238/******************************************************************************/
239/* O p e n */
240/******************************************************************************/
241
242int XrdOfsCksFile::Open(const char* path, int Oflag, mode_t Mode,
243 XrdOucEnv& env)
244{
245// Make sure we have a clean setup. If not return an error.
246//
247 if (Dirty) return -ENOTSUP;
248
249// We intercept open because for non-combinable checksums we need to make
250// sure the file is open in r/w mode. Since we don't support such checksums
251// yet, the code below is commented out.
252//
253/* if (!(calcp->Combinable()))
254 {Oflag &= ~O_ACCMODE;
255 Oflag |= O_RDWR;
256 }
257*/
258// Issue open and if unsuccessful, mark this as a dirty object
259//
260 int rc = wrapDF.Open(path, Oflag, Mode, env);
261 if (rc) Dirty = true;
262 return rc;
263}
264
265/******************************************************************************/
266/* p g W r i t e */
267/******************************************************************************/
268
269ssize_t XrdOfsCksFile::pgWrite(void* buffer,
270 off_t offset,
271 size_t wrlen,
272 uint32_t* csvec,
273 uint64_t opts)
274{
275 const char* eText;
276
277// We will only continue the checksum if the underlying write succeeds
278//
279 ssize_t retval = wrapDF.pgWrite(buffer, offset, wrlen, csvec, opts);
280
281// Continue the streaming checksum if at all possible
282//
283 if (!Dirty)
284 {if (retval < 0)
285 {eLog.Emsg("ckspgw", retval, "continue real-time checksum for",fPath);
286 Dirty = true;
287 } else {
288 if ((eText = (this->*ProcessRTC)(buffer, offset, wrlen)))
289 {eLog.Emsg("ckspgw","unable to continue real-time checksum for",
290 fPath, eText);
291 }
292 }
293 }
294
295// Return actual result
296//
297 return retval;
298}
299
300/******************************************************************************/
301
302int XrdOfsCksFile::pgWrite(XrdSfsAio* aioparm, uint64_t opts)
303{
304 const char* eText;
305
306// It is too complicated to do the async I/O before doing the checksum
307//
308 if (!Dirty)
309 {if ((eText = (this->*ProcessRTC)((void *)aioparm->sfsAio.aio_buf,
310 (off_t) aioparm->sfsAio.aio_offset,
311 (size_t)aioparm->sfsAio.aio_nbytes)))
312 {eLog.Emsg("cksaiopw", "Unable to continue real-time checksum for",
313 fPath, eText);
314 Dirty = true;
315 }
316 }
317
318// Now do the I/O
319//
320 return wrapDF.pgWrite(aioparm, opts);
321}
322
323/******************************************************************************/
324/* w r i t e */
325/******************************************************************************/
326
327/*
328 Function: Write `blen' bytes to the associated file, from 'buff'
329 and return the actual number of bytes written.
330
331 Input: buff - Address of the buffer from which to get the data.
332 offset - The absolute 64-bit byte offset at which to write.
333 blen - The number of bytes to write from the buffer.
334
335 Output: Returns the number of bytes written upon success and -errno o/w.
336*/
337
338ssize_t XrdOfsCksFile::Write(const void* buff, off_t offset, size_t blen)
339{
340 const char* eText;
341
342// We will only continue the checksum if the underlying write succeeds
343//
344 ssize_t retval = wrapDF.Write(buff, offset, blen);
345
346// Continue the streaming checksum if at all possible
347//
348 if (!Dirty)
349 {if (retval < 0)
350 {eLog.Emsg("cksw", retval, "continue streaming checksum for", fPath);
351 Dirty = true;
352 } else {
353 if ((eText = (this->*ProcessRTC)(buff, offset, blen)))
354 {eLog.Emsg("cksw", "Unable to continue real-time checksum for",
355 fPath, eText);
356 Dirty = true;
357 }
358 }
359 }
360
361// Return actual result
362//
363 return retval;
364}
365
366/******************************************************************************/
367
369{
370 const char* eText;
371
372// It is too complicated to do the async I/O before doing the checksum
373//
374 if (!Dirty)
375 {if ((eText = (this->*ProcessRTC)((void *)aioparm->sfsAio.aio_buf,
376 (off_t) aioparm->sfsAio.aio_offset,
377 (size_t)aioparm->sfsAio.aio_nbytes)))
378 {eLog.Emsg("cksaiopw", "Unable to continue real-time checksum for",
379 fPath, eText);
380 Dirty = true;
381 }
382 }
383
384// Now do the I/O
385//
386 return wrapDF.Write(aioparm);
387}
388
389/******************************************************************************/
390/* W r i t e V */
391/******************************************************************************/
392
393ssize_t XrdOfsCksFile::WriteV(XrdOucIOVec* writeV, int n)
394{
395
396// We do not support streaming checksums when WriteV is used
397//
398 if (!Dirty)
399 {eLog.Emsg("ckswv", "Unable to continue streaming checksum for",
400 fPath, "; WriteV() conflict.");
401 Dirty = true;
402 }
403
404// We still handle the actual write
405//
406 return wrapDF.WriteV(writeV, n);
407}
408
409/******************************************************************************/
410/* P r i v a t e M e t h o d s */
411/******************************************************************************/
412/******************************************************************************/
413/* R T C _ C B 3 2 */
414/******************************************************************************/
415
416// This method handles combinable checkums that are 32 bits in length
417//
418const char* XrdOfsCksFile::RTC_CB32(const void* inBuff, off_t inOff, int inLen)
419{
420 XrdSysMutexHelper mHelp(cksMtx);
421
422// Check where the incomming segment is adjacent to current segment
423//
424 if (inOff == nextOff)
425 {calcP->Update((const char*)inBuff, inLen);
426 nextOff = inOff + inLen;
427 auto it = segMap.begin();
428 while(it != segMap.end() && nextOff == it->second.segBeg)
429 {// Combine or update base checksum
430 calcP->Combine((const char*)&(it->second.segCks),it->second.segLen);
431 nextOff = it->second.segBeg + it->second.segLen;
432 it = segMap.erase(it);
433 }
434
435 // Verify that we end in a proper state
436 //
437 if (it != segMap.end() && nextOff > it->second.segBeg)
438 return "; I/O segments overlap";
439
440 return 0;
441 }
442
443// Verify that incomming segment is past the expectected segment
444//
445 if (inOff < nextOff) return "; ovewrite of previous data";
446
447// Compute checksum for the incomming block
448//
449 uint32_t theCS;
450 memcpy(&theCS, altcP->Calc((const char*)inBuff, inLen), sizeof(theCS));
451
452// Create new segment and try inserting it into the map
453//
454 inSeg newSeg(inOff, inLen, theCS);
455
456// Insert this element into the map
457//
458 auto it = segMap.insert(std::pair(inOff, newSeg));
459 if (it.second == false)
460 return "; duplicate write";
461
462// All done
463//
464 return 0;
465}
466
467/******************************************************************************/
468/* R T C _ N C X X */
469/******************************************************************************/
470
471// This method handles noncombinable checkums of any legth.
472//
473const char* XrdOfsCksFile::RTC_NCXX(const void* inBuff, off_t inOff, int inLen)
474{
475 XrdSysMutexHelper mHelp(cksMtx);
476
477// Check where the incomming segment is adjacent to current segment
478//
479 if (inOff == nextOff)
480 {const char* eText;
481 calcP->Update((const char*)inBuff, inLen);
482 nextOff = inOff + inLen;
483 auto it = segMap.begin();
484 while(it != segMap.end() && nextOff == it->second.segBeg)
485 {// Update base checksum for subsequent blocks
486 if ((eText = RTC_Updt(it->second.segBeg, it->second.segLen)))
487 return eText;
488 nextOff = it->second.segBeg + it->second.segLen;
489 it = segMap.erase(it);
490 }
491
492 // Verify that we end in a proper state
493 //
494 if (it != segMap.end() && nextOff > it->second.segBeg)
495 return "; I/O segments overlap";
496
497 return 0;
498 }
499
500// Verify that incomming segment is past the expectected segment
501//
502 if (inOff < nextOff) return "; ovewrite of previous data";
503
504// Create new segment and try inserting it into the map
505//
506 inSeg newSeg(inOff, inLen, 0);
507
508// Insert this element into the map
509//
510 auto it = segMap.insert(std::pair(inOff, newSeg));
511 if (it.second == false)
512 return "; duplicate write";
513
514// All done
515//
516 return 0;
517}
518
519/******************************************************************************/
520/* R T C _ U p d t */
521/******************************************************************************/
522
523const char* XrdOfsCksFile::RTC_Updt(off_t inOff, int inLen)
524{
525 ssize_t ioLen, retval;
526
527 while(inLen)
528 {ioLen = (inLen > ioBlen ? ioBlen : inLen);
529 if ((retval = ossDF->Read(ioBuff, inOff, ioLen)) != ioLen)
530 {if (retval >= 0) retval = -ENODATA;
531 return eLog.ec2text(retval);
532 }
533 calcP->Update((const char*)ioBuff, ioLen);
534 inOff += ioLen;
535 inLen -= ioLen;
536 }
537 return 0;
538}
struct stat Stat
Definition XrdCks.cc:49
#define ENODATA
XrdSysError OfsEroute(0)
#define stat(a, b)
Definition XrdPosix.hh:105
int Mode
struct myOpts opts
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
virtual bool Combinable()
Definition XrdCksCalc.hh:67
virtual void Update(const char *Buff, int BLen)=0
virtual const char * Combine(const char *Cksum, int DLen)
Definition XrdCksCalc.hh:85
virtual const char * Type(int &csSize)=0
int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) override
ssize_t Write(const void *buffer, off_t offset, size_t size) override
ssize_t WriteV(XrdOucIOVec *writeV, int wrvcnt) override
static void Init(XrdCks *cp, XrdOucEnv *ep)
XrdOfsCksFile(const char *tid, const char *path, XrdOssDF *df, XrdCksCalc *cP, bool &delFlag)
int Close(long long *retsz=0) override
ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts) override
int Ftruncate(unsigned long long flen) override
virtual ~XrdOfsCksFile()
XrdOssDF(const char *tid="", uint16_t dftype=0, int fdnum=-1)
Definition XrdOss.hh:504
XrdOssDF & wrapDF
XrdOssWrapDF(XrdOssDF &df2Wrap)
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static const char * ec2text(int ecode)
XrdSysError * eLog