ref: eda51ffbd5c5697bf0578eaff6874133a6d480d8
dir: /writer.c/
#include "all.h" /* Only 1 writer to maintain the sequence of writes. Else, the directory entry could get written before the directory content by another write process. This creates a mess to recover on a crash. */ /* below is from nemo's Pg 252 */ typedef struct Dirties Dirties; typedef struct Wbuf Wbuf; struct Dirties { QLock lck; /* controls access to this queue */ Wbuf *head, *tail; /* linked list of dirty blocks yet to be written to the disk */ s32 n; /* number of dirty blocks in this linked list */ Rendez isfull; /* write throttling */ Rendez isempty; /* writer does not have to keep polling to find work */ } drts = {0}; struct Wbuf { u64 blkno; /* block number on the disk, primary key */ u16 len; Wbuf *prev, *next; /* writer queue */ Iobuf *iobuf; /* pointer to the used Iobuf in the buffer cache */ union{ u8 *payload; /* "real" contents */ Content *io; /* cast'able to contents */ }; }; u64 npendingwrites; /* write throttling */ u8 stopwrites = 0; static void stats(void); static Wbuf * pluck(Wbuf *b) { if(b == nil) return nil; else if(b->prev == nil && b->next == nil){ /* only one */ drts.head = drts.tail = nil; goto Done; }else if(b->prev == nil){ /* first in the linked list */ drts.head = b->next; b->next = nil; drts.head->prev = nil; goto Done; }else if(b->prev != nil && b->next != nil){ /* somewhere in the middle */ b->next->prev = b->prev; b->prev->next = b->next; b->prev = b->next = nil; goto Done; }else if(b->next == nil){ /* last in the linked list */ drts.tail = b->prev; b->prev->next = nil; b->prev = nil; goto Done; } panic("pluck should not be here\n"); return nil; // too late, b was written already Done: drts.n--; if(drts.n < 0) panic("drts.n < 0\n"); return b; } /* the Iobuf should be wlock()'ed at entry and until it is placed in the writer queue. It is unlocked after it is placed in writer queue to avoid another process putting it in the writer queue before us. This ensures that the write order is maintained and a newer write being overwritten by older write. */ void putwrite(Iobuf *b) { Wbuf *w; if(chatty9p > 4){ dprint("putwrite start p->blkno %llud\n", b->blkno); stats(); } w = emalloc9p(sizeof(Wbuf)); w->blkno = b->blkno; w->len = b->len; w->payload = allocmemunits(b->len); memcpy(w->payload, b->xiobuf, b->len*Unit); incref(&b->dirties); w->iobuf = b; qlock(&drts.lck); if(drts.n == npendingwrites) rsleep(&drts.isfull); if(chkwunlock(b) == 0){ showbuf(b); panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b)); } if(drts.head == nil){ drts.head = drts.tail = w; }else{ drts.tail->next = w; w->prev = drts.tail; drts.tail = w; } drts.n++; if(drts.n == 1) rwakeup(&drts.isempty); qunlock(&drts.lck); if(chatty9p > 4 && b!=nil){ dprint("putwrite done b->blkno %llud\n", b->blkno); stats(); } } /* dirties is decremented without a wlock() on the buffer in dowrite(). Using a wlock() in dowrite() deadlocks with putwrite(). getbuf() guarantees that even a free'ed block cannot be stolen until the dirties == 0. This avoids dirty blocks being stolen by other block numbers. incref(dirties) only happens with a wlock() in putwrite(). */ void dowrite(void) { Wbuf *b; u64 n; qlock(&drts.lck); if(drts.n == 0){ if(stopwrites){ qunlock(&drts.lck); return; } rsleep(&drts.isempty); if(drts.n == 0 && stopwrites){ qunlock(&drts.lck); return; } } // dprint("dowrite: drts.n %llud\n", drts.n); if(drts.head == nil){ qunlock(&drts.lck); return; } b = pluck(drts.head); if(drts.n == npendingwrites-1) rwakeup(&drts.isfull); qunlock(&drts.lck); if(chatty9p > 4 && b!=nil) dprint("getwrite b->blkno %llud\n", b->blkno); if((n = devwrite(b->blkno, b->payload, b->len)) != b->len*Unit){ dprint("%s\n", errstring[Esystem]); panic("error writing block %llud: %llud bytes: %r\n", b->blkno, n); } if(chatty9p > 4) dprint("dowrite %llud wunlock()'ed\n", b->blkno); decref(&b->iobuf->dirties); freememunits(b->payload, b->len); free(b); if(chatty9p > 4 && b!=nil) stats(); } void initwriter(void) { char name[Namelen]; // set the locks used by the Rendezes drts.isempty.l = &drts.lck; drts.isfull.l = &drts.lck; switch(rfork(RFPROC|RFMEM)){ case -1: panic("can't fork"); case 0: if(chatty9p > 4) dprint("writer started\n"); break; default: return; } snprint(name, Namelen, "%s writer", service); procsetname(name); while(stopwrites == 0 || drts.n > 0){ dowrite(); } if(chatty9p > 4) dprint("%s process exited\n", name); exits(nil); } void stopwriter(void) { u64 n; stopwrites = 1; do{ qlock(&drts.lck); if(chatty9p > 4) dprint("stopwriter drts.n %d\n", drts.n); if(drts.n == 0) rwakeup(&drts.isempty); n = drts.n; qunlock(&drts.lck); if(n == 0) return; else sleep(1000); }while(n > 0); } static void stats(void) { dprint("dirties nwrites %d hd %llud tl %llud\n", drts.n, drts.head == nil ? 0 : drts.head->blkno, drts.tail == nil ? 0 : drts.tail->blkno); } u64 pendingwrites(void) { u64 n; qlock(&drts.lck); n = drts.n; if(chatty9p>4) stats(); qunlock(&drts.lck); return n; } void sync(void) { while(drts.n > 0) sleep(1000); }