ref: 5076a214bf64da3c52f49e242003d91a17940308
parent: 8ba26cf58ea5df0dbd29149ed04794fff0c8bf87
author: 9ferno <[email protected]>
date: Tue Oct 18 18:25:25 EDT 2022
added a separate writer process
--- a/9p.c
+++ b/9p.c
@@ -856,7 +856,7 @@
a = 0;
for(i = 0; i<Nprocs; i++){
if(worker[i].w.f == nil){
- if(worker[i].pid > 0){
+ if(worker[i].pid > 0 && worker[i].pid != getpid()){
// rwakeup(&buf.isempty); TODO why can't I get this to work?
postnote(PNPROC, worker[i].pid, "interrupt");
worker[i].pid = 0;
@@ -888,7 +888,7 @@
void
shutdown(void)
{
- int i, a, b;
+ int i, a;
/* User *u, *v; */
char srvfilename[Namelen];
@@ -903,9 +903,9 @@
// rwakeupall(&buf.isempty);
for(i = 0; i< 10; i++){
a = stopworkers();
- b = stopwriters();
- sleep((a>b?a:b)*1000);
+ sleep(a*1000);
}
+ stopwriter();
if(a > 1){ /* 1 for this running process */
dprint("%d processes still running\n", a);
for(i = 0; i<Nprocs; i++){
--- a/all.h
+++ b/all.h
@@ -78,13 +78,15 @@
Iobuf* getbufchk(u64 blkno, u16 len, u8 readonly, int tag, u64 qpath);
void iobufinit(void);
void putbuf(Iobuf *p);
+void putbuffree(Iobuf *p);
void settag(Iobuf *p, int tag, u64 qpath);
void showbuf(Iobuf *p);
/* writer functions */
-void initwriters(u8 nws);
+void initwriter(void);
void putwrite(Iobuf *b);
-int stopwriters(void);
+Iobuf *rmwrite(Iobuf *b);
+void stopwriter(void);
/* routines to manipulate the contents */
Iobuf* allocblocks(u16 len, int tag, u64 qpath);
--- a/iobuf.c
+++ b/iobuf.c
@@ -105,6 +105,8 @@
wlock(p);
if(chatty9p > 4)
dprint(" after wlock() blkno %llud\n", blkno);
+ if(p->dirty)
+ panic("p->dirty with p->len != len\n");
free(p->xiobuf);
p->xiobuf = emalloc9p(len*Rawblocksize);
p->len = len;
@@ -126,6 +128,13 @@
wlock(p);
if(chatty9p > 4)
dprint(" after wlock() blkno %llud\n", blkno);
+ /* as we lock from top down, this should ensure that the
+ lower blocks in the hierarchy are removed from the write
+ queue only after the top blocks have been removed or erased.
+ */
+ if(p->dirty)
+ rmwrite(p); /* remove the pending write as it will be
+ written by putbuf() anyway */
}
decref(p);
return p;
@@ -140,9 +149,14 @@
Ncollisions is a soft limit.
*/
if(ncollisions >= Ncollisions){
+Another:
do{
p = s->back;
if(p->ref == 0 && p->dirty == 0 && canwlock(p)){
+ if(p->dirty){
+ wunlock(p);
+ goto Another;
+ }
if(p->len != len){
free(p->xiobuf);
p->xiobuf = emalloc9p(len*Rawblocksize);
@@ -301,6 +315,50 @@
}else if(srcbno == config.root.srcbno){
for(i=0; i<Nbkp; i++)
bkp(srcbno, buf, config.root.dest[i], Qproot0+i*3);
+ }
+ }
+}
+
+/* Release the lock on a buffer without queueing it for the writer.
+ Only caller is freeblockbuf(): freed blocks do not need to be
+ written to the disk, hence dirty is cleared and putwrite() is avoided.
+ */
+void
+putbuffree(Iobuf *p)
+{
+ if(p == nil){ /* sanity checks: each failure is reported through panic() */
+ panic("putbuffree p == nil called by %#p\n", getcallerpc(&p));
+ dprint("%s\n", errstring[Ephase]); /* NOTE(review): only reached if panic() returns -- confirm */
+ return;
+ }
+ if(p->io == nil){ /* buffer has no data area attached */
+ showbuf(p);
+ panic("putbuffree p->io == nil by %#p\n", getcallerpc(&p));
+ dprint("%s\n", errstring[Ephase]);
+ return;
+ }
+ if(p->len == 0){ /* a zero length buffer is treated as invalid */
+ showbuf(p);
+ panic("putbuffree p->len == 0 by %#p\n", getcallerpc(&p));
+ dprint("%s\n", errstring[Ephase]);
+ return;
+ }
+
+ if(chatty9p > 4) /* trace output only at this verbosity */
+ dprint("putbuffree p->blkno %llud\n", p->blkno);
+
+ if(p->readers){ /* presumably held for reading -- confirm against the lock's readers field */
+ chkrunlock(p);
+ if(chatty9p > 4)
+ dprint(" .. runlock()'ed\n");
+ }else{
+ if(canwlock(p)){ /* getting the write lock here means the caller did not hold it */
+ panic("putbuffree: buffer not locked %llud\n", p->blkno);
+ }
+ p->dirty = 0; /* contents are not going to disk: do not leave the buffer marked dirty */
+ if(chkwunlock(p) == 0){ /* a 0 return from chkwunlock() is treated as fatal */
+ showbuf(p);
+ panic("putbuffree chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
}
}
}
--- a/mafs.c
+++ b/mafs.c
@@ -42,7 +42,7 @@
static char *nets[8];
int doream, stdio, netc;
char buf[Namelen];
- int pid, ctl, nwriters;
+ int pid, ctl;
progname = "mafs";
procname = "init";
@@ -55,7 +55,6 @@
rfork(RFNAMEG|RFNOTEG|RFREND);
nbuckets = Nbuckets;
- nwriters = Nwriters;
sfd = -1;
doream = stdio = netc = 0;
@@ -70,7 +69,6 @@
case 'D': chatty9p++; break;
case 'f': devfile = ARGF(); break;
case 'h': nbuckets = atoi(EARGF(usage())); break;
- case 'w': nwriters = atoi(EARGF(usage())); break;
case 'r':
doream = 1;
/* fall through */
@@ -106,7 +104,7 @@
formatinit();
tlocks = emalloc9p(NTLOCK * sizeof *tlocks);
- initwriters(nwriters);
+ initwriter();
iobufinit();
/*
--- a/sub.c
+++ b/sub.c
@@ -124,7 +124,7 @@
/* clear the buf to avoid leaks on reuse */
memset(buf->io, 0, buf->len*Rawblocksize);
bfree(&frees, buf->blkno, buf->len);
- putbuf(buf);
+ putbuffree(buf);
}
/* add the block to the extents used to manage free blocks */
--- a/writer.c
+++ b/writer.c
@@ -5,45 +5,46 @@
Npendingwrites = 32,
};
+/* Only 1 writer, to maintain the sequence of writes.
+ Otherwise, another writer process could write a directory entry
+ before the directory content has been written.
+ That would create a mess to recover from after a crash.
+ */
+
/* below is from nemo's Pg 252 */
typedef struct Dirties Dirties;
-typedef struct Writer Writer;
struct Dirties
{
QLock lck;
Iobuf *head, *tail;
- u32 n;
+ s32 n;
Rendez isempty;
-} drts;
+} drts = {0};
-struct Writer
-{
- u32 pid;
- u64 blkno;
-}; /* keeps track of running procs to flush */
+u8 stopwrites = 0;
-Writer *writers; /* array of writer proc pids */
-u16 nwriters;
-
static Iobuf *
pluck(Iobuf *b)
{
- if(b->prevdirty != nil && b->nextdirty != nil){
- /* somewhere in the middle */
- b->nextdirty->prevdirty = b->prevdirty;
- b->prevdirty->nextdirty = b->nextdirty;
- b->prevdirty = b->nextdirty = nil;
- goto Done;
- }else if(b->prevdirty == nil && b->nextdirty == nil){
- drts.head = drts.tail = nil;
+ if(b == nil)
+ return nil;
+ else if(b->prevdirty == nil && b->nextdirty == nil){
/* only one */
+ drts.head = drts.tail = nil;
goto Done;
}else if(b->prevdirty == nil){
/* first in the linked list */
drts.head = b->nextdirty;
b->nextdirty = nil;
+ drts.head->prevdirty = nil;
goto Done;
+ }else if(b->prevdirty != nil && b->nextdirty != nil){
+ /* somewhere in the middle */
+ b->nextdirty->prevdirty = b->prevdirty;
+ b->prevdirty->nextdirty = b->nextdirty;
+ b->prevdirty = b->nextdirty = nil;
+ goto Done;
}else if(b->nextdirty == nil){
/* last in the linked list */
drts.tail = b->prevdirty;
@@ -51,9 +52,12 @@
b->prevdirty = nil;
goto Done;
}
- panic("pluck: blkno %llud should not be here\n", b->blkno);
+ panic("pluck should not be here\n");
+ return nil; // too late, b was written already
Done:
drts.n--;
+ if(drts.n < 0)
+ panic("drts.n < 0\n");
return b;
}
@@ -61,10 +65,12 @@
Iobuf *
rmwrite(Iobuf *b)
{
+ Iobuf *p;
+
qlock(&drts.lck);
- pluck(b);
+ p = pluck(b);
qunlock(&drts.lck);
- return b;
+ return p;
}
Iobuf *
@@ -73,23 +79,18 @@
Iobuf *b;
qlock(&drts.lck);
-Again:
if(drts.n == 0){
+ if(stopwrites){
+ qunlock(&drts.lck);
+ return nil;
+ }
rsleep(&drts.isempty);
- if(shuttingdown)
+ if(drts.n == 0 && stopwrites){
+ qunlock(&drts.lck);
return nil;
- }
- for(b=drts.head; b->nextdirty != nil; b=b->nextdirty){
- if(canwlock(b)){
- pluck(b);
- break;
- }else{
- dprint("getwrite: could not lock block %llud\n", b->blkno);
- continue;
}
}
- if(b == nil)
- goto Again; /* only when the putbuf() is still adding to dirties */
+ b = pluck(drts.head);
qunlock(&drts.lck);
return b;
}
@@ -100,29 +101,30 @@
void
putwrite(Iobuf *b)
{
+ if(chatty9p > 4)
+ dprint("putwrite p->blkno %llud\n", b->blkno);
qlock(&drts.lck);
if(drts.n > Npendingwrites){
+ if(drts.n < 0)
+ panic("putwrite drts.n < 0\n");
stats();
}
b->dirty = 1;
if(drts.head == nil){
drts.head = drts.tail = b;
- rwakeup(&drts.isempty);
}else{
drts.tail->nextdirty = b;
b->prevdirty = drts.tail;
drts.tail = b;
}
- drts.n++;
+ if(drts.head == drts.tail)
+ rwakeup(&drts.isempty);
if(chkwunlock(b) == 0){
showbuf(b);
- panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
+ panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
}
- if(chatty9p > 4)
- dprint(" .. wunlock()'ed\n");
+ drts.n++;
qunlock(&drts.lck);
- if(chatty9p > 1)
- stats();
}
void
@@ -130,9 +132,11 @@
{
u64 n;
- if(canwlock(p)){
- panic("putbuf: buffer not locked %llud\n", p->blkno);
- }
+ if(chatty9p > 4)
+ dprint("dowrite p->blkno %llud\n", p->blkno);
+ wlock(p);
+ if(chatty9p > 4)
+ dprint("dowrite p->blkno %llud locked\n", p->blkno);
p->io->dirty = 1;
if((n = devwrite(p->blkno, p->io, p->len)) != p->len*Rawblocksize){
dprint("%s\n", errstring[Esystem]);
@@ -142,90 +146,74 @@
p->io->dirty = 0;
devwritedirtyclear(p->blkno);
p->dirty = 0;
+ n = p->blkno;
if(chkwunlock(p) == 0){
showbuf(p);
- panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
+ panic("dowrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
}
if(chatty9p > 4)
- dprint(" .. wunlock()'ed\n");
+ dprint("dowrite %llud wunlock()'ed\n", n);
}
void
-startwriter(int id)
+initwriter(void)
{
- char name[16];
Iobuf *b;
+ char name[Namelen];
- switch(rfork(RFPROC|RFMEM|RFFDG)){
+ // set the locks used by the Rendezes
+ drts.isempty.l = &drts.lck;
+
+ switch(rfork(RFPROC|RFMEM)){
case -1:
panic("can't fork");
case 0:
- if(chatty9p > 1)
- dprint("child %d pid: %d\n", id, getpid());
+ if(chatty9p > 4)
+ dprint("writer started\n");
break;
default:
return;
}
- writers[id].pid = id;
procname = name;
- snprint(name, 16, "mafs writer %d", id);
+ snprint(name, Namelen, "%s writer", service);
procsetname(name);
- while((b=getwrite()) != nil)
- dowrite(b);
+ while(stopwrites == 0 || drts.n > 0){
+ b=getwrite();
+ if(b != nil)
+ dowrite(b);
+ }
+ if(chatty9p > 4)
dprint("%s process exited\n", name);
_exits(nil);
}
void
-initwriters(u8 nws)
+stopwriter(void)
{
- u8 i;
+ u64 n;
- nwriters = nws;
- // release all locks, set everything to null values
- memset(&drts, 0, sizeof(drts));
- writers = malloc(sizeof(Writer)*nwriters);
-
- // set the locks used by the Rendezes
- drts.isempty.l = &drts.lck;
-
- for(i = 0; i < nwriters; i++)
- startwriter(i);
+ stopwrites = 1;
+ do{
+ qlock(&drts.lck);
+ if(chatty9p > 4)
+ dprint("stopwriter drts.n %d\n", drts.n);
+ if(drts.n == 0)
+ rwakeup(&drts.isempty);
+ n = drts.n;
+ qunlock(&drts.lck);
+ if(n == 0)
+ return;
+ else
+ sleep(1000);
+ }while(n > 0);
}
-int
-stopwriters(void)
-{
- int i, a;
-
- a = 0;
- for(i = 0; i<nwriters; i++){
- if(writers[i].blkno == 0){
- // rwakeup(&buf.isempty); TODO why can't I get this to work?
- postnote(PNPROC, writers[i].pid, "interrupt");
- writers[i].pid = 0;
- }else
- a++;
- }
- dprint("%d processes still writing, %llud writes still pending\n", a, drts.n);
- return a;
-}
-
static void
stats(void)
{
- int n, w, inv, i;
-
- n = w = inv = 0;
- for(i = 0; i<Nprocs; i++){
- if(writers[i].pid == 0)
- inv++;
- else if(writers[i].blkno == 0)
- n++;
- else if(writers[i].blkno > 0)
- w++;
- }
- dprint("Nprocs %d inv %d idle %d working %d dirties nworks %d hd %p tl %p\n",
- nwriters, inv, n, w, drts.n, drts.head, drts.tail);
+ dprint("dirties nwrites %d hd %llud tl %llud\n",
+ drts.n,
+ drts.head == nil ? 0 : drts.head->blkno,
+ drts.tail == nil ? 0 : drts.tail->blkno);
}