UnbufferedFile improvements v5

Message ID 4384BEDC.2050301@o2.pl
State New
Headers

Commit Message

Artur Skawina Nov. 23, 2005, 7:11 p.m. UTC
  Ralf Müller wrote:
> On Dienstag 22 November 2005 20:29, Artur Skawina wrote:
>> 4th, hopefully final version.
> 
> Looks good - at a first glance. I'm not sure I can test it before 
> weekend, but I will try. Had a died power supply on my vdr yesterday 
> and have to deal with this first ;)

Unfortunately while getting the read side cache management working, i managed to 
break it on the write side completely. And the fdatasyncs somehow covered this 
up in testing.
After fixing that issue, the code now works as advertised: the cache does not 
grow while recording, does not grow when playing back, both together are fine 
too. Cutting uses ~20..30M and seems to be IO bound (~10M/s NFS<>disk, ~6M/s 
NFS<>NFS, couldn't test disk<>disk on large enough files). Playing back one file 
while another one is being cut works, jumping around is maybe a little slower 
than on an idle vdr, but i guess that's to be expected when we use up all the 
disk b/w for the copying. Note that playback-while-cutting is probably very 
dependent on hw setup - using a fast disk and/or NFS it might work, but for 
slower media some kind of b/w limit on the cutting process could be necessary.

Final patch version below, with just the write fix mentioned above, some 
cosmetic changes and more comments. Tested on real data this time (that was 
before the cleanups/commenting, but i hope i didn't break it this time :)

artur
  

Patch

--- vdr-1.3.36.org/config.c	2005-09-09 17:08:59.000000000 +0200
+++ vdr-1.3.36/config.c	2005-11-21 22:17:55.000000000 +0100
@@ -453,6 +453,7 @@  bool cSetup::Parse(const char *Name, con
   else if (!strcasecmp(Name, "UseSmallFont"))        UseSmallFont       = atoi(Value);
   else if (!strcasecmp(Name, "MaxVideoFileSize"))    MaxVideoFileSize   = atoi(Value);
   else if (!strcasecmp(Name, "SplitEditedFiles"))    SplitEditedFiles   = atoi(Value);
+  else if (!strcasecmp(Name, "WriteStrategy"))       WriteStrategy      = atoi(Value);
   else if (!strcasecmp(Name, "MinEventTimeout"))     MinEventTimeout    = atoi(Value);
   else if (!strcasecmp(Name, "MinUserInactivity"))   MinUserInactivity  = atoi(Value);
   else if (!strcasecmp(Name, "MultiSpeedMode"))      MultiSpeedMode     = atoi(Value);
@@ -518,6 +519,7 @@  bool cSetup::Save(void)
   Store("UseSmallFont",       UseSmallFont);
   Store("MaxVideoFileSize",   MaxVideoFileSize);
   Store("SplitEditedFiles",   SplitEditedFiles);
+  Store("WriteStrategy",      WriteStrategy);
   Store("MinEventTimeout",    MinEventTimeout);
   Store("MinUserInactivity",  MinUserInactivity);
   Store("MultiSpeedMode",     MultiSpeedMode);
--- vdr-1.3.36.org/config.h	2005-11-04 16:55:05.000000000 +0100
+++ vdr-1.3.36/config.h	2005-11-21 21:57:52.000000000 +0100
@@ -249,6 +249,7 @@  public:
   int UseSmallFont;
   int MaxVideoFileSize;
   int SplitEditedFiles;
+  int WriteStrategy;
   int MinEventTimeout, MinUserInactivity;
   int MultiSpeedMode;
   int ShowReplayMode;
--- vdr-1.3.36.org/cutter.c	2005-10-31 13:26:44.000000000 +0100
+++ vdr-1.3.36/cutter.c	2005-11-23 15:54:44.000000000 +0100
@@ -66,6 +66,7 @@  void cCuttingThread::Action(void)
      toFile = toFileName->Open();
      if (!fromFile || !toFile)
         return;
+     fromFile->setreadahead(MEGABYTE(10));
      int Index = Mark->position;
      Mark = fromMarks.Next(Mark);
      int FileSize = 0;
@@ -90,6 +91,7 @@  void cCuttingThread::Action(void)
            if (fromIndex->Get(Index++, &FileNumber, &FileOffset, &PictureType, &Length)) {
               if (FileNumber != CurrentFileNumber) {
                  fromFile = fromFileName->SetOffset(FileNumber, FileOffset);
+                 fromFile->setreadahead(MEGABYTE(10));
                  CurrentFileNumber = FileNumber;
                  }
               if (fromFile) {
--- vdr-1.3.36.org/menu.c	2005-11-05 18:29:22.000000000 +0100
+++ vdr-1.3.36/menu.c	2005-11-21 22:14:38.000000000 +0100
@@ -2270,6 +2270,7 @@  cMenuSetupRecord::cMenuSetupRecord(void)
   Add(new cMenuEditIntItem( tr("Setup.Recording$Instant rec. time (min)"),   &data.InstantRecordTime, 1, MAXINSTANTRECTIME));
   Add(new cMenuEditIntItem( tr("Setup.Recording$Max. video file size (MB)"), &data.MaxVideoFileSize, MINVIDEOFILESIZE, MAXVIDEOFILESIZE));
   Add(new cMenuEditBoolItem(tr("Setup.Recording$Split edited files"),        &data.SplitEditedFiles));
+  Add(new cMenuEditIntItem( tr("Setup.Recording$Write strategy"),            &data.WriteStrategy, 0, 2));
 }
 
 // --- cMenuSetupReplay ------------------------------------------------------
--- vdr-1.3.36.org/tools.c	2005-11-04 17:33:18.000000000 +0100
+++ vdr-1.3.36/tools.c	2005-11-23 18:28:25.000000000 +0100
@@ -7,6 +7,7 @@ 
  * $Id: tools.c 1.103 2005/11/04 16:33:18 kls Exp $
  */
 
+#include "config.h"
 #include "tools.h"
 #include <ctype.h>
 #include <dirent.h>
@@ -851,8 +852,7 @@  bool cSafeFile::Close(void)
 
 // --- cUnbufferedFile -------------------------------------------------------
 
-#define READ_AHEAD MEGABYTE(2)
-#define WRITE_BUFFER MEGABYTE(10)
+#define WRITE_BUFFER KILOBYTE(800)
 
 cUnbufferedFile::cUnbufferedFile(void)
 {
@@ -868,23 +868,28 @@  int cUnbufferedFile::Open(const char *Fi
 {
   Close();
   fd = open(FileName, Flags, Mode);
-  begin = end = ahead = -1;
+  curpos = 0;
+  begin = lastpos = ahead = 0;
+  readahead = 128*1024;
+  pendingreadahead = 0;
   written = 0;
+  totwritten = 0;
+  if (fd >= 0) {
+     // we really mean POSIX_FADV_SEQUENTIAL, but we do our own readahead
+     // so turn off the kernel one.
+     if (Setup.WriteStrategy!=1)
+        posix_fadvise(fd, 0, 0, POSIX_FADV_RANDOM);
+     }
   return fd;
 }
 
 int cUnbufferedFile::Close(void)
 {
   if (fd >= 0) {
-     if (ahead > end)
-        end = ahead;
-     if (begin >= 0 && end > begin) {
-        //dsyslog("close buffer: %d (flush: %d bytes, %ld-%ld)", fd, written, begin, end);
-        if (written)
-           fdatasync(fd);
-        posix_fadvise(fd, begin, end - begin, POSIX_FADV_DONTNEED);
-        }
-     begin = end = ahead = -1;
+     if (totwritten)    // if we wrote anything make sure the data has hit the disk before
+        fdatasync(fd);  // calling fadvise, as this is our last chance to un-cache it.
+     posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
+     begin = lastpos = ahead = 0;
      written = 0;
      }
   int OldFd = fd;
@@ -894,40 +899,100 @@  int cUnbufferedFile::Close(void)
 
 off_t cUnbufferedFile::Seek(off_t Offset, int Whence)
 {
-  if (fd >= 0)
-     return lseek(fd, Offset, Whence);
-  return -1;
-}
-
+  //dsyslog("Seek: fd: %d offs: %ld whence: %d diff: %ld", fd, (long)Offset, Whence, Offset-curpos);
+  if (Whence == SEEK_SET && Offset == curpos)
+     return curpos;
+  
+  curpos = lseek(fd, Offset, Whence);
+  return curpos;
+}
+
+// When replaying and going eg FF->PLAY the position jumps back 2..8M
+// hence we might not want to drop that data at once. 
+// Ignoring this for now to avoid making this even more complex,
+// but we could at least try to handle the common cases.
+// (PLAY->FF->PLAY, small jumps, moving editing marks etc)
+
+#define KREADAHEAD MEGABYTE(4) // amount of kernel/fs prefetch that could
+                               // happen in addition to our own.
+#define FADVGRAN   4096        // AKA fadvise-chunk-size; PAGE_SIZE or
+                               // getpagesize(2) would also work.
+                               
 ssize_t cUnbufferedFile::Read(void *Data, size_t Size)
 {
   if (fd >= 0) {
-     off_t pos = lseek(fd, 0, SEEK_CUR);
-     // jump forward - adjust end position
-     if (pos > end)
-        end = pos;
-     // after adjusting end - don't clear more than previously requested
-     if (end > ahead)
-        end = ahead;
-     // jump backward - drop read ahead of previous run
-     if (pos < begin)
-        end = ahead;
-     if (begin >= 0 && end > begin)
-        posix_fadvise(fd, begin - KILOBYTE(200), end - begin + KILOBYTE(200), POSIX_FADV_DONTNEED);//XXX macros/parameters???
-     begin = pos;
+     off_t jumped = curpos-lastpos; // nonzero means we're not at the last offset
+     if (jumped) {                  // ie some kind of jump happened.
+        pendingreadahead += ahead-lastpos+KREADAHEAD;
+        // jumped forward? - treat as if we did read all the way to current pos.
+        if (jumped >= 0) {
+           lastpos = curpos;
+           // but clamp at ahead so we don't clear more than previously requested.
+           // (would be mostly harmless anyway, unless we got more than one reader of this file)
+           if (lastpos > (ahead+KREADAHEAD))
+              lastpos = ahead+KREADAHEAD;
+        }
+        // jumped backward? - drop both last read _and_ read-ahead
+        else
+	   if (curpos < begin)
+              lastpos = ahead+KREADAHEAD;
+           // jumped backward, but still inside prev read window? - pretend we read less.
+           else /* if (curpos >= begin) */
+              lastpos = curpos;
+        }
+        
      ssize_t bytesRead = safe_read(fd, Data, Size);
+     
+     // Now drop all data accesed during _previous_ Read().
+     // fadvise() internally has page granularity; it drops only whole pages
+     // from the range we specify (since it cannot know if we will be accessing
+     // the rest). This is why we drop only the previously read data, and do it
+     // _after_ the current read() call, while rounding up the window to make
+     // sure that even not PAGE_SIZE-aligned data gets freed.
+     // Try to merge the fadvise calls a bit in order to reduce overhead.
+     if (Setup.WriteStrategy!=1 && begin >= 0 && lastpos > begin)
+        if (jumped || (size_t)(lastpos-begin) > readahead) {
+           //dsyslog("taildrop:    %ld..%ld size %ld", begin, lastpos, lastpos-begin);
+           posix_fadvise(fd, begin-(FADVGRAN-1), lastpos-begin+(FADVGRAN-1)*2, POSIX_FADV_DONTNEED);
+           begin = curpos;
+           }
+        
      if (bytesRead > 0) {
-        pos += bytesRead;
-        end = pos;
-        // this seems to trigger a non blocking read - this
-        // may or may not have been finished when we will be called next time.
-        // If it is not finished we can't release the not yet filled buffers.
-        // So this is commented out till we find a better solution.
-        //posix_fadvise(fd, pos, READ_AHEAD, POSIX_FADV_WILLNEED);
-        ahead = pos + READ_AHEAD;
+        curpos += bytesRead;
+        //dsyslog("jump: %06ld ra: %06ld size: %ld", jumped, (long)readahead, (long)Size);
+
+        // no jump? (allow small forward jump still inside readahead window).
+        if (Setup.WriteStrategy!=1 && jumped>=0 && jumped<=(off_t)readahead) {
+           if ( readahead < Size*32 ) { // automagically tune readahead size.
+              readahead = Size*32;
+              //dsyslog("Readahead for fd: %d increased to %ld", fd, (long)readahead);
+              }
+           // Trigger the readahead IO, but only if we're used at least
+           // half of the previously requested area. This avoids calling
+           // fadvise() after every read() call.
+           if (ahead<(off_t)(curpos+readahead/2)) {
+              posix_fadvise(fd, curpos, readahead, POSIX_FADV_WILLNEED);
+              ahead = curpos + readahead;
+              }
+           }
+        else {
+           // jumped - we really don't want any readahead now. otherwise
+           // eg fast-rewind gets in trouble.
+           ahead = curpos;
+
+           // Every now and then, flush all cached data from this file; mostly
+           // to get rid of nonflushed readahead coming from _previous_ jumps
+           // (if the readahead I/O hasn't finished by the time we called
+           // fadvice() to undo it, the data could still be cached).
+           // The accounting does not have to be 100% accurate, as long as
+           // this triggers after _some_ jumps we should be ok.
+           if (Setup.WriteStrategy!=1 && pendingreadahead>MEGABYTE(30)) {
+              pendingreadahead = 0;
+              posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
+              }
+           }
         }
-     else
-        end = pos;
+     lastpos = curpos;
      return bytesRead;
      }
   return -1;
@@ -936,25 +1001,33 @@  ssize_t cUnbufferedFile::Read(void *Data
 ssize_t cUnbufferedFile::Write(const void *Data, size_t Size)
 {
   if (fd >=0) {
-     off_t pos = lseek(fd, 0, SEEK_CUR);
      ssize_t bytesWritten = safe_write(fd, Data, Size);
-     if (bytesWritten >= 0) {
+     if (bytesWritten > 0) {
+        begin = min(begin, curpos);
+        curpos += bytesWritten;
         written += bytesWritten;
-        if (begin >= 0) {
-           if (pos < begin)
-              begin = pos;
-           }
-        else
-           begin = pos;
-        if (pos + bytesWritten > end)
-           end = pos + bytesWritten;
+        lastpos = max(lastpos, curpos);
         if (written > WRITE_BUFFER) {
-           //dsyslog("flush buffer: %d (%d bytes, %ld-%ld)", fd, written, begin, end);
-           fdatasync(fd);
-           if (begin >= 0 && end > begin)
-              posix_fadvise(fd, begin, end - begin, POSIX_FADV_DONTNEED);
-           begin = end = -1;
+           //dsyslog("flush buffer: %d (%d bytes, %ld-%ld)", fd, written, begin, lastpos);
+           if (Setup.WriteStrategy!=1 && lastpos>begin) {
+              off_t headdrop = max(begin, WRITE_BUFFER*2L);
+              posix_fadvise(fd, begin-headdrop, lastpos-begin+headdrop, POSIX_FADV_DONTNEED);
+              }
+           begin = lastpos = max(0L, curpos-4095);
+           totwritten += written;
            written = 0;
+           // The above fadvise() works when writing slowly (recording), but could
+           // leave cached data around when writing at a high rate (cutting).
+           // Also, it seems in some setups, the above does not trigger any I/O and
+           // the fdatasync() call below has to do all the work (reiserfs with some
+           // kind of write gathering enabled).
+           // We add 'readahead' to the threshold in an attempt to increase cutting
+           // speed; it's a tradeoff -- speed vs RAM-used.
+           if (Setup.WriteStrategy!=1 && totwritten>(MEGABYTE(20)+readahead)) {
+              totwritten = 0;
+              fdatasync(fd);
+              posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
+              }
            }
         }
      return bytesWritten;
--- vdr-1.3.36.org/tools.h	2005-11-05 11:54:39.000000000 +0100
+++ vdr-1.3.36/tools.h	2005-11-23 16:21:19.000000000 +0100
@@ -205,10 +205,14 @@  public:
 class cUnbufferedFile {
 private:
   int fd;
+  off_t curpos;
   off_t begin;
-  off_t end;
+  off_t lastpos;
   off_t ahead;
-  ssize_t written;
+  size_t pendingreadahead;
+  size_t readahead;
+  size_t written;
+  size_t totwritten;
 public:
   cUnbufferedFile(void);
   ~cUnbufferedFile();
@@ -218,6 +222,7 @@  public:
   ssize_t Read(void *Data, size_t Size);
   ssize_t Write(const void *Data, size_t Size);
   static cUnbufferedFile *Create(const char *FileName, int Flags, mode_t Mode = DEFFILEMODE);
+  void setreadahead(size_t ra) { readahead = ra; };
   };
 
 class cLockFile {