ref: f7a4f9314d76309fc5ec064014ace3ec9a112a24
parent: 639debe97b02705a3285bd7db0e934ed4699e4f1
author: 9ferno <[email protected]>
date: Sat Oct 23 06:13:31 EDT 2021
import 9front proc.c and qio.c
--- a/os/pc/irq.c
+++ b/os/pc/irq.c
@@ -71,7 +71,7 @@
if(up->delaysched)
sched();
} else {
- preempted();
+ preemption(0);
}
}
return 1;
--- a/os/pc64/dat.h
+++ b/os/pc64/dat.h
@@ -188,7 +188,9 @@
u64 ticks; /* of the clock since boot time */
Proc* proc; /* current process on this processor */
- Label sched; /* scheduler wakeup */
+
+ PMach;
+
Lock alarmlock; /* access to alarm list */
void* alarm; /* alarms bound to this clock */
u32 inclockintr;
--- a/os/pc64/fns.h
+++ b/os/pc64/fns.h
@@ -162,6 +162,7 @@
void pcmunmap(int, PCMmap*);
void pmap(uintptr, u64, s64);
void poolsizeinit(void);
+void procrestore(Proc*);
void procsave(Proc*);
void procsetup(Proc*);
void punmap(uintptr, uvlong);
--- a/os/pc64/main.c
+++ b/os/pc64/main.c
@@ -285,7 +285,10 @@
Proc *p;
Osenv *o;
- p = newproc();
+ while((p = newproc()) == nil){
+/* TODO freebroken(); */
+ resrcwait("no procs for userinit");
+ }
o = p->env;
o->fgrp = newfgrp(nil);
@@ -374,7 +377,22 @@
* You might think it would be a win not to do this in that case,
* especially on VMware, but it turns out not to matter.
*/
+ /* TODO obsolete with linear memory? */
mmuflushtlb();
+}
+/*
+ * procrestore: reload per-process machine state when p is about to
+ * run again on this processor; the counterpart of procsave().
+ * Debug registers are loaded only when p has dr[7] set; FPU state
+ * is always restored through fpuprocrestore().
+ */
+void
+procrestore(Proc *p)
+{
+ if(p->dr[7] != 0){
+ /* record p's dr7 in this cpu's Mach, then load p's debug registers */
+ m->dr7 = p->dr[7];
+ putdr(p->dr);
+ }
+
+/* if(p->vmx != nil)
+ vmxprocrestore(p);
+*/
+
+ fpuprocrestore(p);
}
void
--- a/os/port/dev.c
+++ b/os/port/dev.c
@@ -50,14 +50,53 @@
}
/*
+ * (here, Devgen is the prototype; devgen is the function in dev.c.)
+ *
+ * a Devgen is expected to return the directory entry for ".."
+ * if you pass it s==DEVDOTDOT (-1). otherwise...
+ *
+ * there are two contradictory rules.
+ *
+ * (i) if c is a directory, a Devgen is expected to list its children
+ * as you iterate s.
+ *
+ * (ii) whether or not c is a directory, a Devgen is expected to list
+ * its siblings as you iterate s.
+ *
+ * devgen always returns the list of children in the root
+ * directory. thus it follows (i) when c is the root and (ii) otherwise.
+ * many other Devgens follow (i) when c is a directory and (ii) otherwise.
+ *
+ * devwalk assumes (i). it knows that devgen breaks (i)
+ * for children that are themselves directories, and explicitly catches them.
+ *
+ * devstat assumes (ii). if the Devgen in question follows (i)
+ * for this particular c, devstat will not find the necessary info.
+ * with our particular Devgen functions, this happens only for
+ * directories, so devstat makes something up, assuming
+ * c->name, c->qid, eve, DMDIR|0555.
+ *
+ * devdirread assumes (i). the callers have to make sure
+ * that the Devgen satisfies (i) for the chan being read.
+ */
+/*
* the zeroth element of the table MUST be the directory itself for ..
*/
int
-devgen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
+devgen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp)
{
if(tab == 0)
return -1;
- if(i != DEVDOTDOT){
+ if(i == DEVDOTDOT){
+ /* nothing */
+ }else if(name){
+ for(i=1; i<ntab; i++)
+ if(strcmp(tab[i].name, name) == 0)
+ break;
+ if(i==ntab)
+ return -1;
+ tab += i;
+ }else{
/* skip over the first element, that for . itself */
i++;
if(i >= ntab)
@@ -164,7 +203,11 @@
continue;
}
if(strcmp(n, "..") == 0){
- (*gen)(nc, nil, tab, ntab, DEVDOTDOT, &dir);
+ if((*gen)(nc, nil, tab, ntab, DEVDOTDOT, &dir) != 1){
+ print("devgen walk .. in dev%s %llux broken\n",
+ devtab[nc->type]->name, nc->qid.path);
+ error("broken devgen");
+ }
nc->qid = dir.qid;
goto Accept;
}
--- a/os/port/devmnt.c
+++ b/os/port/devmnt.c
@@ -73,6 +73,8 @@
int rpcattn(void*);
Chan* mntchan(void);
+#define cachedchan(c) (((c)->flag & CCACHE) != 0 && (c)->mcp != nil)
+
char Esbadstat[] = "invalid directory entry received from server";
char Enoversion[] = "version not established for mount channel";
@@ -95,7 +97,7 @@
/*
* Version is not multiplexed: message sent only once per connection.
*/
-long
+int
mntversion(Chan *c, char *version, int msize, int returnlen)
{
Fcall f;
@@ -719,7 +721,7 @@
nr = nreq;
if(type == Tread)
- r->b = bl2mem((uchar*)uba, r->b, nr);
+ nr = readblist(r->b, (uchar*)uba, nr, 0);
else if(cache)
cwrite(c, (uchar*)uba, nr, off);
--- a/os/port/nocache.c
+++ b/os/port/nocache.c
@@ -13,10 +13,11 @@
{
}
-void
+int
copen(Chan *c)
{
c->flag &= ~CCACHE;
+ return 0;
}
int
@@ -47,3 +48,14 @@
USED(off);
}
+/*
+ * ctrunc and cclunk are no-op stubs: this is the no-cache build
+ * (copen clears CCACHE on every chan), so there is no cached data
+ * to truncate or release.
+ */
+void
+ctrunc(Chan *c)
+{
+ USED(c);
+}
+
+void
+cclunk(Chan *c)
+{
+ USED(c);
+}
--- a/os/port/portclock.c
+++ b/os/port/portclock.c
@@ -17,7 +17,7 @@
ulong intrcount[MAXMACH];
ulong fcallcount[MAXMACH];
-static uvlong
+static vlong
tadd(Timers *tt, Timer *nt)
{
Timer *t, **last;
@@ -29,12 +29,10 @@
panic("timer");
break;
case Trelative:
- assert(nt->tns > 0);
+ if(nt->tns <= 0)
+ nt->tns = 1;
nt->twhen = fastticks(nil) + ns2fastticks(nt->tns);
break;
- case Tabsolute:
- nt->twhen = tod2fastticks(nt->tns);
- break;
case Tperiodic:
assert(nt->tns >= 100000); /* At least 100 µs period */
if(nt->twhen == 0){
@@ -94,15 +92,6 @@
Timers *tt;
vlong when;
- if (nt->tmode == Tabsolute){
- when = todget(nil);
- if (nt->tns <= when){
- // if (nt->tns + MS2NS(10) <= when) /* small deviations will happen */
- // print("timeradd (%lld %lld) %lld too early 0x%lux\n",
- // when, nt->tns, when - nt->tns, getcallerpc(&nt));
- nt->tns = when;
- }
- }
/* Must lock Timer struct before Timers struct */
ilock(nt);
if(tt = nt->tt){
@@ -123,9 +112,13 @@
void
timerdel(Timer *dt)
{
+ Mach *mp;
Timers *tt;
uvlong when;
+ /* avoid Tperiodic getting re-added */
+ dt->tmode = Trelative;
+
ilock(dt);
if(tt = dt->tt){
ilock(tt);
@@ -134,7 +127,16 @@
timerset(tt->head->twhen);
iunlock(tt);
}
+ if((mp = dt->tactive) == nil || mp->machno == m->machno){
+ iunlock(dt);
+ return;
+ }
iunlock(dt);
+
+ /* rare, but tf can still be active on another cpu */
+ while(dt->tactive == mp && dt->tt == nil)
+ if(up->nlocks == 0 && islo())
+ sched();
}
void
@@ -144,6 +146,7 @@
if(m->proc)
m->proc->pc = ur->pc;
+ /* accounttime(); */
kmapinval();
if(kproftick != nil)
@@ -157,7 +160,8 @@
exit(0);
}
- checkalarms();
+ if(m->machno == 0)
+ checkalarms();
if(up && up->state == Running){
if(anyready()){
@@ -168,13 +172,12 @@
}
void
-timerintr(Ureg *u, uvlong)
+timerintr(Ureg *u, Tval)
{
Timer *t;
Timers *tt;
uvlong when, now;
int callhzclock;
- static int sofar;
intrcount[m->machno]++;
callhzclock = 0;
@@ -199,6 +202,7 @@
tt->head = t->tnext;
assert(t->tt == tt);
t->tt = nil;
+ t->tactive = MACHP(m->machno);
fcallcount[m->machno]++;
iunlock(tt);
if(t->tf)
@@ -205,6 +209,7 @@
(*t->tf)(u, t);
else
callhzclock++;
+ t->tactive = nil;
ilock(tt);
if(t->tmode == Tperiodic)
tadd(tt, t);
@@ -217,8 +222,13 @@
{
Timer *t;
+ /*
+ * T->tf == nil means the HZ clock for this processor.
+ */
todinit();
t = malloc(sizeof(*t));
+ if(t == nil)
+ panic("timersinit: no memory for Timer");
t->tmode = Tperiodic;
t->tt = nil;
t->tns = 1000000000/HZ;
@@ -234,6 +244,8 @@
/* Synchronize to hztimer if ms is 0 */
nt = malloc(sizeof(Timer));
+ if(nt == nil)
+ panic("addclock0link: no memory for Timer");
if(ms == 0)
ms = 1000/HZ;
nt->tns = (vlong)ms*1000000LL;
--- a/os/port/portdat.h
+++ b/os/port/portdat.h
@@ -28,6 +28,7 @@
typedef struct Perf Perf;
typedef struct Pgrp Pgrp;
typedef struct Proc Proc;
+typedef struct PMach PMach;
typedef struct QLock QLock;
typedef struct Queue Queue;
typedef struct Ref Ref;
@@ -35,6 +36,7 @@
typedef struct Rept Rept;
typedef struct Rootdata Rootdata;
typedef struct RWlock RWlock;
+typedef struct Schedq Schedq;
typedef struct Signerkey Signerkey;
typedef struct Skeyset Skeyset;
typedef struct Talarm Talarm;
@@ -425,14 +427,16 @@
struct Timer
{
/* Public interface */
- s32 tmode; /* See above */
- s64 tns; /* meaning defined by mode */
+ int tmode; /* See above */
+ vlong tns; /* meaning defined by mode */
void (*tf)(Ureg*, Timer*);
void *ta;
/* Internal */
Lock;
+ Mach *tactive; /* The cpu that tf is active on */
Timers *tt; /* Timers queue this timer runs on */
- s64 twhen; /* ns represented in fastticks */
+ Tval tticks; /* tns converted to ticks */
+ Tval twhen; /* ns represented in fastticks */
Timer *tnext;
};
@@ -476,6 +480,14 @@
Nrq
};
+/*
+ * a run queue for one priority level: a singly linked list of
+ * Ready procs chained through Proc.rnext (see ready() and runproc()).
+ */
+struct Schedq
+{
+ Lock; /* NOTE(review): proc.c only takes lock(runq), i.e. runq[0]'s lock, for every level; confirm the other levels' locks are unused */
+ Proc* head; /* first proc on the list */
+ Proc* tail; /* last proc; ready() appends after it */
+ int n; /* number of procs on this queue */
+};
+
struct Proc
{
Label sched; /* known to l.s */
@@ -502,8 +514,9 @@
Rendez sleep; /* place for syssleep/debug */
s32 killed; /* by swiproc */
s32 kp; /* true if a kernel process */
+ Proc *palarm; /* Next alarm time */
u32 alarm; /* Time of call */
- s32 pri; /* scheduler priority */
+ s32 priority; /* scheduler priority */
u32 twhen;
Timer; /* For tsleep and real-time */
@@ -525,7 +538,7 @@
Mach* mp; /* machine this process last ran on */
Mach* wired;
int nlocks; /* number of locks held by proc */
- u32 movetime; /* next time process should switch processors */
+ /* obsoleted u32 movetime; */ /* next time process should switch processors */
u32 delaysched;
s32 preempted; /* process yielding in interrupt */
uintptr qpc; /* last call that blocked in qlock */
@@ -533,6 +546,16 @@
s32 dbgstop; /* don't run this kproc */
Edf* edf; /* if non-null, real-time proc, edf contains scheduling params */
+ /*
+ * pcycles: cycles spent in this process (updated on procswitch)
+ * when this is the current proc and we're in the kernel
+ * (procrestores outnumber procsaves by one)
+ * the number of cycles spent in the proc is pcycles + cycles()
+ * when this is not the current process or we're in user mode
+ * (procrestores and procsaves balance), it is pcycles.
+ */
+ s64 pcycles;
+
PMMU; /* TODO obsolete? machine specific mmu state */
};
@@ -761,6 +784,30 @@
WATCHEX = 4,
} type;
uintptr addr, len;
+};
+
+/*
+ * port-independent per-processor state; embedded as an unnamed
+ * member of the architecture's Mach (see os/pc64/dat.h).
+ */
+struct PMach
+{
+ Proc* readied; /* for runproc; unused in inferno (ready() does not set it) */
+ Label sched; /* scheduler wakeup: saved by setlabel() in schedinit() */
+ ulong ticks; /* of the clock since boot time */ /* NOTE(review): pc64 Mach also declares u64 ticks before embedding PMach; confirm which field m->ticks resolves to and drop the other */
+ ulong schedticks; /* next forced context switch */
+
+ int pfault;
+ int cs; /* incremented by procswitch() */
+ int syscall;
+ int load;
+ int intr;
+ int ilockdepth; /* sched() panics if this is non-zero */
+
+ int flushmmu; /* make current proc flush its mmu state */
+
+ int tlbfault;
+ int tlbpurge;
+
+ Perf perf; /* performance counters; runproc() accumulates idle time in perf.inidle */
+
+ uvlong cyclefreq; /* frequency of user readable cycle counter */
};
/* queue state bits, Qmsg, Qcoalesce, and Qkick can be set in qopen */
--- a/os/port/portfns.h
+++ b/os/port/portfns.h
@@ -7,7 +7,6 @@
int anyhigher(void);
int anyready(void);
void _assert(char*);
-Block* bl2mem(uchar*, Block*, int);
int blocklen(Block*);
int breakhit(Ureg *ur, Proc*);
void callwithureg(void(*)(Ureg*));
@@ -37,9 +36,11 @@
void cnameclose(Cname*);
Block* concatblock(Block*);
void confinit(void);
-void copen(Chan*);
+int copen(Chan*);
+void cclunk(Chan*);
Block* copyblock(Block*, int);
int cread(Chan*, uchar*, int, vlong);
+void ctrunc(Chan*);
Chan* cunique(Chan*);
Chan* createdir(Chan*, Mhead*);
void cunmount(Chan*, Chan*);
@@ -50,7 +51,7 @@
void cursoroff(void);
void cwrite(Chan*, uchar*, int, vlong);
void debugkey(Rune, char *, void(*)(), int);
-int decref(Ref*);
+s32 decref(Ref*);
Chan* devattach(int, char*);
Block* devbread(Chan*, s32, u32);
s32 devbwrite(Chan*, Block*, u32);
@@ -114,7 +115,7 @@
Block* iallocb(int);
void iallocsummary(void);
void ilock(Lock*);
-int incref(Ref*);
+s32 incref(Ref*);
void iomapinit(u32);
s32 ioreservewin(u32, u32, u32, u32, char*);
int iprint(char*, ...);
@@ -147,6 +148,7 @@
void logb(Log*, int, char*, ...);
#define pragma varargck argpos logb 3
Cmdtab* lookupcmd(Cmdbuf*, Cmdtab*, int);
+#define MS2NS(n) (((s64)(n))*1000000LL)
void machinit(void);
extern void machbreakinit(void);
extern Instr machinstr(ulong addr);
@@ -169,9 +171,10 @@
void mkqid(Qid*, vlong, ulong, int);
void modinit(void);
Chan* mntauth(Chan*, char*);
-long mntversion(Chan*, char*, int, int);
+int mntversion(Chan*, char*, int, int);
void mountfree(Mount*);
void mousetrack(int, int, int, int);
+ulong ms2tk(ulong);
uvlong ms2fastticks(ulong);
ulong msize(void*);
void mul64fract(uvlong*, uvlong, uvlong);
@@ -218,7 +221,7 @@
Block* pullupqueue(Queue*, int);
void putmhead(Mhead*);
void putstrn(char*, int);
-void qaddlist(Queue*, Block*);
+int qaddlist(Queue*, Block*);
Block* qbread(Queue*, int);
long qbwrite(Queue*, Block*);
Queue* qbypass(void (*)(void*, Block*), void*);
@@ -252,6 +255,7 @@
void randominit(void);
ulong randomread(void*, ulong);
void* realloc(void*, ulong);
+long readblist(Block*, uchar*, long, ulong);
int readnum(ulong, char*, ulong, ulong, int);
int readnum_vlong(ulong, char*, ulong, vlong, int);
int readstr(ulong, char*, ulong, char*);
@@ -285,7 +289,7 @@
void timeradd(Timer*);
void timerdel(Timer*);
void timersinit(void);
-void timerintr(Ureg*, uvlong);
+void timerintr(Ureg*, s64);
void timerset(s64);
ulong tk2ms(ulong);
#define TK2MS(x) ((x)*(1000/HZ))
@@ -314,7 +318,7 @@
void validname(char*, int);
void validstat(uchar*, int);
void validwstatname(char*);
-int wakeup(Rendez*);
+Proc* wakeup(Rendez*);
int walk(Chan**, char**, int, int, int*);
void werrstr(char*, ...);
void wlock(RWlock*);
--- a/os/port/proc.c
+++ b/os/port/proc.c
@@ -7,10 +7,21 @@
#include <interp.h>
Ref pidalloc;
+int schedgain = 30; /* units in seconds */
+int nrdy;
-typedef struct Schedq Schedq;
+ulong delayedscheds; /* statistics */
+ulong skipscheds;
+ulong preempts;
-struct
+/* bitmap of priorities of procs ready to run. When any process is
+ * queued, the bit corresponding to that priority gets set.
+ * when there are no more processes to run at a priority, then the
+ * corresponding bit gets cleared.
+ */
+static u32 occupied;
+
+static struct Procalloc
{
Lock;
Proc* arena;
@@ -17,19 +28,12 @@
Proc* free;
} procalloc;
-struct Schedq
-{
- Lock;
- Proc* head;
- Proc* tail;
-};
-
/* multiple run queues by priority, different from Brian's book
- * Per Brian's book, it is the struct Procs == struct Scheq.
+ * Per Brian's book, it is the struct Procs == struct Schedq.
* Now, inferno maintains multiple Schedq based on priority.
*/
static Schedq runq[Nrq];
-static ulong occupied;
+static ulong runvec;
int nrdy;
char *statename[] =
@@ -40,13 +44,28 @@
"Scheding",
"Running",
"Queueing",
+ "QueueingR",
+ "QueueingW",
"Wakeme",
"Broken",
"Stopped",
"Rendez",
+ "Waitrelease",
};
/*
+ * The kernel scheduler state is the label m->sched, saved by the
+ * setlabel() at the top of schedinit().
+ * When sched() switches processes, procswitch() jumps to m->sched,
+ * returning into schedinit(), which updates the outgoing process's
+ * state and then calls sched() with up == nil to pick the next
+ * process to run. Every process switch therefore goes around the
+ * loop sched() -> schedinit() -> sched().
+ *
+ * Inferno does not change process priorities dynamically, so 9front's
+ * updatecpu() is omitted; priorities change only through setpri()
+ * (newproc() starts every proc at PriNormal).
+ */
+/*
* Always splhi()'ed.
*/
void
@@ -53,7 +72,7 @@
schedinit(void) /* never returns */
{
setlabel(&m->sched);
- if(up) {
+ if(up != nil) {
/*
if((e = up->edf) && (e->flags & Admitted))
edfrecord(up);
@@ -61,6 +80,10 @@
m->proc = nil;
switch(up->state) {
case Running:
+ /*
+ * still runnable: Running -> Ready;
+ * ready() puts it at the back of its run queue
+ */
ready(up);
break;
case Moribund:
@@ -72,15 +95,21 @@
up->edf = nil;
}
*/
- /*
- * Holding locks from pexit:
- * procalloc
- */
+
+ lock(&procalloc);
+ up->mach = nil;
up->qnext = procalloc.free;
procalloc.free = up;
+ /* proc is free now, make sure unlock() wont touch it */
+ up = procalloc.Lock.p = nil;
unlock(&procalloc);
- break;
+
+ sched();
}
+ coherence();
+ /* flag to indicate that the process has saved itself
+ * for the next run
+ */
up->mach = nil;
up = nil;
}
@@ -87,26 +116,93 @@
sched();
}
+/*
+ * procswitch: give up the processor. Saves up's machine state and
+ * jumps to the scheduler label m->sched; it returns only when up is
+ * later resumed through gotolabel(&up->sched) in sched().
+ * Its callers (sched, sleep) invoke it after splhi().
+ */
+static void
+procswitch(void)
+{
+ uvlong t;
+
+ /* statistics */
+ m->cs++;
+
+ /* switching out: fold the current cycle count into pcycles */
+ cycles(&t);
+ up->pcycles += t;
+
+ procsave(up);
+
+ /* setlabel returns 0 now, and non-zero when up is resumed */
+ if(!setlabel(&up->sched))
+ gotolabel(&m->sched);
+
+ /* process wakes up here next time */
+ procrestore(up);
+
+ /* switching in: re-establish the pcycles + cycles() invariant (see Proc.pcycles) */
+ cycles(&t);
+ up->pcycles -= t;
+}
+
+/*
+ * If changing this routine, look also at sleep(). It
+ * contains a copy of the guts of sched().
+ */
void
sched(void)
{
- if(up) {
- splhi();
- procsave(up);
- if(setlabel(&up->sched)) {
- /* procrestore(up); */
- spllo();
+ Proc *p;
+
+ if(m->ilockdepth)
+ panic("cpu%d: ilockdepth %d, last lock %#p at %#p, sched called from %#p",
+ m->machno,
+ m->ilockdepth,
+ up != nil ? up->lastilock: nil,
+ (up != nil && up->lastilock != nil) ? up->lastilock->pc: 0,
+ getcallerpc(&p+2));
+ if(up != nil) {
+ /*
+ * Delay the sched until the process gives up the locks
+ * it is holding. This avoids dumb lock loops.
+ * Don't delay if the process is Moribund.
+ * It called sched to die.
+ * But do sched eventually. This avoids a missing unlock
+ * from hanging the entire kernel.
+ * But don't reschedule procs holding palloc or procalloc.
+ * Those are far too important to be holding while asleep.
+ *
+ * This test is not exact. There can still be a few instructions
+ * in the middle of taslock when a process holds a lock
+ * but Lock.p has not yet been initialized.
+ */
+ if(up->nlocks)
+ if(up->state != Moribund)
+ if(up->delaysched < 20
+/* || palloc.Lock.p == up
+ || fscache.Lock.p == up
+*/
+ || procalloc.Lock.p == up){
+ up->delaysched++;
+ delayedscheds++;
return;
}
- gotolabel(&m->sched);
+ up->delaysched = 0;
+ splhi();
+ procswitch();
+ spllo();
+ return;
}
- up = runproc();
+ /* if up == nil, it is the scheduler process after the
+ * previous process state has been saved
+ */
+ p = runproc();
+ up = p;
up->state = Running;
- up->mach = MACHP(m->machno); /* m might be a fixed address; use MACHP */
+ up->mach = MACHP(m->machno);
m->proc = up;
gotolabel(&up->sched);
}
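+/*
+ * ready(p) appends p to the tail of the run queue for its priority
+ * and sets p->state = Ready. p need not be the current process
+ * (wakeup() readies sleepers). It is simpler than 9front's ready()
+ * because inferno does not change process priorities.
+ */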
void
ready(Proc *p)
{
@@ -113,6 +209,11 @@
int s;
Schedq *rq;
+ if(p->state == Ready){
+ print("double ready %s %zud pc %p\n", p->text, p->pid, getcallerpc(&p));
+ return;
+ }
+
s = splhi();
/*
if(edfready(p)){
@@ -121,8 +222,12 @@
}
*/
- /* adding to the end of the queue of procs ready to run */
- rq = &runq[p->pri];
+/* 9front does this. Not sure what it does yet
+ if(up != p && (p->wired == nil || p->wired == MACHP(m->machno)))
+ m->readied = p; *//* group scheduling */
+
+ /* add to the end of the run queue */
+ rq = &runq[p->priority];
lock(runq);
p->rnext = 0;
if(rq->tail)
@@ -130,9 +235,9 @@
else
rq->head = p;
rq->tail = p;
-
+ rq->n++;
nrdy++;
- occupied |= 1<<p->pri;
+ occupied |= 1<<p->priority;
p->state = Ready;
unlock(runq);
splx(s);
@@ -142,43 +247,31 @@
anyready(void)
{
/* same priority only */
- return occupied & (1<<up->pri);
+ return occupied & (1<<up->priority);
}
+/*
+ * the higher the priority, the lower the number
+ * unlike 9front
+ */
int
anyhigher(void)
{
- return occupied & ((1<<up->pri)-1);
+ return occupied & ((1<<up->priority)-1);
}
-int
-preemption(int tick)
-{
- if(up != nil && up->state == Running && !up->preempted &&
- (anyhigher() || tick && anyready())){
- up->preempted = 1;
- sched();
- splhi();
- up->preempted = 0;
- return 1;
- }
- return 0;
-}
-
-/* this is from 9front, above is inferno's */
/*
* here at the end of non-clock interrupts to see if we should preempt the
* current process. Returns 1 if preempted, 0 otherwise.
+ * similar to 9front's preempted()
*/
int
-preempted(void)
+preemption(int tick)
{
- if(up != nil && up->state == Running)
- if(up->preempted == 0)
- if(anyhigher())
- if(anyready()){ /* TODO replaced the below 2 lines with this line. Have to test */
- /* if(!active.exiting){
- m->readied = nil; */ /* avoid cooperative scheduling */
+ if(up != nil && up->state == Running &&
+ up->preempted == 0 &&
+ active.exiting == 0 &&
+ (anyhigher() || tick && anyready())){
up->preempted = 1;
sched();
splhi();
@@ -191,45 +284,90 @@
Proc*
runproc(void)
{
- Proc *p, *l;
Schedq *rq, *erq;
+ Proc *p, *tp, *last;
+ u64 start, now;
+ int i;
+/* void (*pt)(Proc*, int, vlong); */
+ start = perfticks();
+ preempts++;
+
erq = runq + Nrq - 1;
loop:
+ /*
+ * find a process that last ran on this processor (affinity),
+ * or one that can be moved to this processor.
+ */
+ spllo();
+ for(i = 0;; i++){
+ /*
+ * find the highest priority target process that this
+ * processor can run given affinity constraints.
+ * on the first pass (i == 0) take only procs that have not run
+ * yet or last ran here; after idling once (i > 0) also take any
+ * unwired proc that last ran on another processor.
+ */
+ for(rq = runq; rq <= erq; rq++){
+ for(tp = rq->head; tp != nil; tp = tp->rnext){
+ if(tp->mp == nil || tp->mp == MACHP(m->machno)
+ || (tp->wired == nil && i > 0))
+ goto found;
+ }
+ }
+
+ /* waste time or halt the CPU */
+ idlehands();
+
+ /* remember how much time we're here */
+ now = perfticks();
+ m->perf.inidle += now-start;
+ start = now;
+ }
+
+found:
/* print("runproc\n");
procdump(); */
splhi();
- for(rq = runq; rq->head == 0; rq++)
- if(rq >= erq) {
- idlehands();
- spllo();
- goto loop;
- }
-
+ /*
+ * try to remove the process from a scheduling queue
+ * similar to 9front's dequeueproc()
+ */
if(!canlock(runq))
goto loop;
- /* choose first one we last ran on this processor at this level or hasn't moved recently */
- l = nil;
- for(p = rq->head; p != nil; p = p->rnext)
- if(p->mp == nil || p->mp == MACHP(m->machno) || p->movetime < MACHP(0)->ticks)
+
+ /*
+ * the queue may have changed before we locked runq,
+ * refind the target process.
+ */
+ last = nil;
+ for(p = rq->head; p != nil; p = p->rnext){
+ if(p == tp)
break;
- if(p == nil)
- p = rq->head;
- /* p->mach==0 only when process state is saved */
- if(p == 0 || p->mach) {
+ last = p;
+ }
+
+ /*
+ * p->mach==0 only when process state is saved
+ */
+ if(p == nil || p->mach != nil){
unlock(runq);
goto loop;
}
+ /* if p is the last in the run queue
+ * update run queue tail to point to the last */
if(p->rnext == nil)
- rq->tail = l;
- if(l)
- l->rnext = p->rnext;
+ rq->tail = last;
+ /* remove p from the linked list */
+ if(last != nil)
+ last->rnext = p->rnext;
else
rq->head = p->rnext;
+ /* no other procs in the run queue */
if(rq->head == nil){
rq->tail = nil;
- occupied &= ~(1<<p->pri);
+ occupied &= ~(1<<p->priority);
}
+ rq->n--;
nrdy--;
if(p->dbgstop){
p->state = Stopped;
@@ -237,35 +375,41 @@
goto loop;
}
if(p->state != Ready)
- print("runproc %s %ud %s\n", p->text, p->pid, statename[p->state]);
+ print("runproc %s %zud %s\n", p->text, p->pid, statename[p->state]);
unlock(runq);
+
p->state = Scheding;
- if(p->mp != MACHP(m->machno))
- p->movetime = MACHP(0)->ticks + HZ/10;
p->mp = MACHP(m->machno);
-/*
- if(edflock(p)){
- edfrun(p, rq == &runq[PriEdf]); // start deadline timer and do admin
+/* if(edflock(p)){
+ edfrun(p, rq == &runq[PriEdf]); *//* start deadline timer and do admin *//*
edfunlock();
}
+ pt = proctrace;
+ if(pt != nil)
+ pt(p, SRun, 0);
*/
return p;
}
int
-setpri(int pri)
+setpri(int priority)
{
int p;
/* called by up so not on run queue */
- p = up->pri;
- up->pri = pri;
+ p = up->priority;
+ up->priority = priority;
if(up->state == Running && anyhigher())
sched();
return p;
}
+/*
+ * TODO
+ * add p->wired and procwired()
+ * pid reuse
+ */
Proc*
newproc(void)
{
@@ -272,20 +416,18 @@
Proc *p;
lock(&procalloc);
- for(;;) {
- if(p = procalloc.free)
- break;
-
+ p = procalloc.free;
+ if(p == nil || (p->kstack == nil && (p->kstack = malloc(KSTACK)) == nil)){
unlock(&procalloc);
- resrcwait("no procs");
- lock(&procalloc);
+ return nil;
}
procalloc.free = p->qnext;
+ p->qnext = nil;
unlock(&procalloc);
p->type = Unknown;
p->state = Scheding;
- p->pri = PriNormal;
+ p->priority = PriNormal;
p->psstate = "New";
p->mach = 0;
p->qnext = 0;
@@ -294,7 +436,6 @@
p->killed = 0;
p->swipend = 0;
p->mp = 0;
- p->movetime = 0;
p->delaysched = 0;
p->edf = nil;
memset(&p->defenv, 0, sizeof(p->defenv));
@@ -321,8 +462,13 @@
int i;
print("procinit conf.nproc %d\n", conf.nproc);
- procalloc.free = xalloc(conf.nproc*sizeof(Proc));
- procalloc.arena = procalloc.free;
+ p = xalloc(conf.nproc*sizeof(Proc));
+ if(p == nil){
+ xsummary();
+ panic("cannot allocate %zud procs (%zudMB)", conf.nproc, conf.nproc*sizeof(Proc)/(1024*1024));
+ }
+ procalloc.arena = p;
+ procalloc.free = p;
p = procalloc.free;
for(i=0; i<conf.nproc-1; i++,p++)
@@ -332,6 +478,16 @@
/* debugkey('p', "processes", procdump, 0); */
}
+/*
+ * sleep if a condition is not true. Another process will
+ * awaken us after it sets the condition. When we awaken
+ * the condition may no longer be true.
+ *
+ * we lock both the process and the rendezvous to keep r->p
+ * and p->r synchronized.
+ * TODO
+ * 9front checks up->notepending instead of up->swipend
+ */
void
sleep(Rendez *r, int (*f)(void*), void *arg)
{
@@ -345,102 +501,112 @@
*/
s = splhi();
- lock(&up->rlock);
+ if(up->nlocks)
+ print("process %zud sleeps with %d locks held, last lock %#p locked at pc %#p, sleep called from %#p\n",
+ up->pid, up->nlocks, up->lastlock, up->lastlock->pc, getcallerpc(&r));
lock(r);
+ lock(&up->rlock);
+ if(r->p != nil){
+ print("double sleep called from %#p, %zud %zud\n", getcallerpc(&r), r->p->pid, up->pid);
+ dumpstack();
+ panic("sleep");
+ }
/*
+ * Wakeup only knows there may be something to do by testing
+ * r->p in order to get something to lock on.
+ * Flush that information out to memory in case the sleep is
+ * committed.
+ */
+ r->p = up;
+
+ /*
* if killed or condition happened, never mind
*/
if(up->killed || f(arg)){
+ r->p = nil;
+ unlock(&up->rlock);
unlock(r);
}else{
-
/*
* now we are committed to
* change state and call scheduler
*/
- if(r->p != nil) {
- print("double sleep pc=0x%zux %ud %ud r=0x%lux\n", getcallerpc(&r), r->p->pid, up->pid, r);
- dumpstack();
- panic("sleep");
- }
+/* pt = proctrace;
+ if(pt != nil)
+ pt(up, SSleep, 0); */
up->state = Wakeme;
- r->p = up;
+ up->r = r; /* for swiproc */
+ up->swipend = 0;
+ /* release exactly once, in reverse order of acquisition: rlock, then r */
+ unlock(&up->rlock);
unlock(r);
- up->swipend = 0;
- up->r = r; /* for swiproc */
- unlock(&up->rlock);
-
- sched();
- splhi(); /* sched does spllo */
-
+ procswitch();
+
+ /*
+ * wakeup() clears up->r, but a proc readied by another path
+ * (presumably swiproc) may not; clear it here so a later
+ * swiproc never sees a stale Rendez.
+ */
lock(&up->rlock);
up->r = nil;
+ unlock(&up->rlock);
}
if(up->killed || up->swipend) {
up->killed = 0;
up->swipend = 0;
- unlock(&up->rlock);
splx(s);
error(Eintr);
}
- unlock(&up->rlock);
splx(s);
}
-int
+static int
tfn(void *arg)
{
- return MACHP(0)->ticks >= up->twhen || (*up->tfn)(arg);
+ return up->trend == nil || up->tfn(arg);
}
void
-tsleep(Rendez *r, int (*fn)(void*), void *arg, int ms)
+twakeup(Ureg*, Timer *t)
{
- ulong when;
- Proc *f, **l;
+ Proc *p;
+ Rendez *trend;
- if(up == nil)
- panic("tsleep() not in process (0x%zux)", getcallerpc(&r));
+ p = t->ta;
+ trend = p->trend;
+ if(trend != nil){
+ p->trend = nil;
+ wakeup(trend);
+ }
+}
- when = MS2TK(ms)+MACHP(0)->ticks;
- lock(&talarm);
- /* take out of list if checkalarm didn't */
- if(up->trend) {
- l = &talarm.list;
- for(f = *l; f; f = f->tlink) {
- if(f == up) {
- *l = up->tlink;
- break;
- }
- l = &f->tlink;
- }
+void
+tsleep(Rendez *r, int (*fn)(void*), void *arg, s32 ms)
+{
+ if(up->tt != nil){
+ print("%s %lud: tsleep timer active: mode %d, tf %#p, pc %#p\n",
+ up->text, up->pid, up->tmode, up->tf, getcallerpc(&r));
+ timerdel(up);
}
- /* insert in increasing time order */
- l = &talarm.list;
- for(f = *l; f; f = f->tlink) {
- if(f->twhen >= when)
- break;
- l = &f->tlink;
- }
+ up->tns = MS2NS(ms);
+ up->tf = twakeup;
+ up->tmode = Trelative;
+ up->ta = up;
up->trend = r;
- up->twhen = when;
up->tfn = fn;
- up->tlink = *l;
- *l = up;
- unlock(&talarm);
+ timeradd(up);
if(waserror()){
- up->twhen = 0;
+ up->trend = nil;
+ timerdel(up);
nexterror();
}
sleep(r, tfn, arg);
- up->twhen = 0;
+ up->trend = nil;
+ timerdel(up);
poperror();
}
-int
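+/*
+ * Expects that only one process can call wakeup for any given Rendez.
+ * We hold both locks to ensure that r->p and p->r remain consistent.
+ * Richard Miller has a better solution that doesn't require both to
+ * be held simultaneously, but I'm a paranoid - presotto.
+ * Lock order is r, then p->rlock (the same order sleep() uses).
+ * NOTE(review): code that holds p->rlock and then blocks on the
+ * Rendez lock (swiproc?) can deadlock against this; verify.
+ */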
+Proc*
wakeup(Rendez *r)
{
Proc *p;
@@ -447,19 +613,29 @@
int s;
s = splhi();
+
lock(r);
p = r->p;
- if(p){
- r->p = nil;
- if(p->state != Wakeme)
+
+ if(p != nil){
+ lock(&p->rlock);
+ if(p->state != Wakeme || p->r != r){
+ iprint("%p %p %d\n", p->r, r, p->state);
panic("wakeup: state");
+ }
+ r->p = nil;
+ p->r = nil;
ready(p);
+ unlock(&p->rlock);
}
unlock(r);
+
splx(s);
- return p != nil;
+
+ return p;
}
+
void
swiproc(Proc *p, int interp)
{
@@ -510,9 +686,6 @@
closesigs(o->sigs);
}
- /* Sched must not loop for this lock */
- lock(&procalloc);
-
/*
edfstop(up);
*/
@@ -540,8 +713,8 @@
snprint(tmp, sizeof(tmp), " /%.8lux", p->r);
else
*tmp = '\0';
- print("%p:%3ud:%14s pc %.8zux %s/%s qpc %.8zux pri %d%s\n",
- p, p->pid, p->text, p->pc, s, statename[p->state], p->qpc, p->pri, tmp);
+ print("%p:%3ud:%14s pc %.8zux %s/%s qpc %.8zux priority %d%s\n",
+ p, p->pid, p->text, p->pc, s, statename[p->state], p->qpc, p->priority, tmp);
}
void
@@ -567,7 +740,10 @@
Fgrp *fg;
Egrp *eg;
- p = newproc();
+ while((p = newproc()) == nil){
+/* TODO freebroken(); */
+ resrcwait("no procs for kproc");
+ }
p->psstate = 0;
p->kp = 1;
@@ -744,6 +920,9 @@
kstrdup(&up->env->user, name);
}
+/* TODO no idea what this rptproc() does
+ * something to do with repeat of tk actions
+ */
void
rptwakeup(void *o, void *ar)
{
--- a/os/port/qio.c
+++ b/os/port/qio.c
@@ -14,8 +14,9 @@
static ulong qcopycnt;
static int debugging;
+long readblist(Block *b, uchar *p, long n, ulong o);
+/* change to if(1) to enable the checkb() calls guarded by QDEBUG */
#define QDEBUG if(0)
/*
* IO queues
@@ -75,9 +76,9 @@
{
Block *next;
- for(; b != 0; b = next){
+ for(; b != nil; b = next){
next = b->next;
- b->next = 0;
+ b->next = nil;
freeb(b);
}
}
@@ -91,40 +92,31 @@
int n;
Block *nbp;
- QDEBUG checkb(bp, "padblock 1");
+ QDEBUG checkb(bp, "padblock 0");
if(size >= 0){
if(bp->rp - bp->base >= size){
bp->rp -= size;
return bp;
}
-
- if(bp->next)
- panic("padblock 0x%zuX", getcallerpc(&bp));
n = BLEN(bp);
- padblockcnt++;
nbp = allocb(size+n);
nbp->rp += size;
nbp->wp = nbp->rp;
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
- freeb(bp);
nbp->rp -= size;
} else {
size = -size;
-
- if(bp->next)
- panic("padblock 0x%zuX", getcallerpc(&bp));
-
if(bp->lim - bp->wp >= size)
return bp;
-
n = BLEN(bp);
- padblockcnt++;
- nbp = allocb(size+n);
+ nbp = allocb(n+size);
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
- freeb(bp);
}
+ nbp->next = bp->next;
+ freeb(bp);
+ padblockcnt++;
QDEBUG checkb(nbp, "padblock 1");
return nbp;
}
@@ -138,7 +130,7 @@
int len;
len = 0;
- while(bp) {
+ while(bp != nil) {
len += BLEN(bp);
bp = bp->next;
}
@@ -154,7 +146,7 @@
int len;
len = 0;
- while(bp) {
+ while(bp != nil) {
len += BALLOC(bp);
bp = bp->next;
}
@@ -162,7 +154,7 @@
}
/*
- * copy the string of blocks into
+ * copy the string of blocks into
* a single block and free the string
*/
Block*
@@ -169,21 +161,12 @@
concatblock(Block *bp)
{
int len;
- Block *nb, *f;
- if(bp->next == 0)
+ if(bp->next == nil)
return bp;
-
- nb = allocb(blocklen(bp));
- for(f = bp; f; f = f->next) {
- len = BLEN(f);
- memmove(nb->wp, f->rp, len);
- nb->wp += len;
- }
- concatblockcnt += BLEN(nb);
- freeblist(bp);
- QDEBUG checkb(nb, "concatblock 1");
- return nb;
+ len = blocklen(bp);
+ concatblockcnt += len;
+ return pullupblock(bp, len);
}
/*
@@ -192,8 +175,8 @@
Block*
pullupblock(Block *bp, int n)
{
- int i;
Block *nbp;
+ int i;
/*
* this should almost always be true, it's
@@ -216,22 +199,26 @@
* copy bytes from the trailing blocks into the first
*/
n -= BLEN(bp);
- while(nbp = bp->next){
+ while((nbp = bp->next) != nil){
+ pullupblockcnt++;
i = BLEN(nbp);
if(i > n) {
memmove(bp->wp, nbp->rp, n);
- pullupblockcnt++;
bp->wp += n;
nbp->rp += n;
QDEBUG checkb(bp, "pullupblock 1");
return bp;
- }
- else {
+ } else {
+ /* shouldn't happen but why crash if it does */
+ if(i < 0){
+ print("pullup negative length packet, called from %#p\n",
+ getcallerpc(&bp));
+ i = 0;
+ }
memmove(bp->wp, nbp->rp, i);
- pullupblockcnt++;
bp->wp += i;
bp->next = nbp->next;
- nbp->next = 0;
+ nbp->next = nil;
freeb(nbp);
n -= i;
if(n == 0){
@@ -241,7 +228,7 @@
}
}
freeb(bp);
- return 0;
+ return nil;
}
/*
@@ -271,7 +258,10 @@
Block *nb, *startb;
QDEBUG checkb(bp, "trimblock 1");
- if(blocklen(bp) < offset+len) {
+ l = blocklen(bp);
+ if(offset == 0 && len == l)
+ return bp;
+ if(l < offset+len) {
freeblist(bp);
return nil;
}
@@ -294,7 +284,7 @@
bp->wp -= (BLEN(bp) - len);
- if(bp->next) {
+ if(bp->next != nil) {
freeblist(bp->next);
bp->next = nil;
}
@@ -313,7 +303,7 @@
QDEBUG checkb(bp, "copyblock 0");
nbp = allocb(count);
- for(; count > 0 && bp != 0; bp = bp->next){
+ for(; count > 0 && bp != nil; bp = bp->next){
l = BLEN(bp);
if(l > count)
l = count;
@@ -411,11 +401,11 @@
iunlock(q);
return nil;
}
+ QDEBUG checkb(b, "qget");
q->bfirst = b->next;
- b->next = 0;
+ b->next = nil;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
- QDEBUG checkb(b, "qget");
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->len < q->limit/2){
@@ -434,12 +424,11 @@
/*
* throw away the next 'len' bytes in the queue
- * returning the number actually discarded
*/
int
qdiscard(Queue *q, int len)
{
- Block *b;
+ Block *b, *tofree = nil;
int dowakeup, n, sofar;
ilock(q);
@@ -451,10 +440,12 @@
n = BLEN(b);
if(n <= len - sofar){
q->bfirst = b->next;
- b->next = 0;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
- freeb(b);
+
+ /* remember to free this */
+ b->next = tofree;
+ tofree = b;
} else {
n = len - sofar;
b->rp += n;
@@ -483,6 +474,9 @@
if(dowakeup)
wakeup(&q->wr);
+ if(tofree != nil)
+ freeblist(tofree);
+
return sofar;
}
@@ -492,10 +486,9 @@
int
qconsume(Queue *q, void *vp, int len)
{
- Block *b;
+ Block *b, *tofree = nil;
int n, dowakeup;
uchar *p = vp;
- Block *tofree = nil;
/* sync with qwrite */
ilock(q);
@@ -502,10 +495,10 @@
for(;;) {
b = q->bfirst;
- if(b == 0){
+ if(b == nil){
q->state |= Qstarve;
- iunlock(q);
- return -1;
+ len = -1;
+ goto out;
}
QDEBUG checkb(b, "qconsume 1");
@@ -520,10 +513,10 @@
tofree = b;
};
+ consumecnt += n;
if(n < len)
len = n;
memmove(p, b->rp, len);
- consumecnt += n;
b->rp += len;
q->dlen -= len;
@@ -530,7 +523,6 @@
/* discard the block if we're done with it */
if((q->state & Qmsg) || len == n){
q->bfirst = b->next;
- b->next = 0;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
@@ -539,6 +531,7 @@
tofree = b;
}
+out:
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->len < q->limit/2){
q->state &= ~Qflow;
@@ -560,40 +553,23 @@
int
qpass(Queue *q, Block *b)
{
- int dlen, len, dowakeup;
+ int len, dowakeup;
/* sync with qread */
dowakeup = 0;
ilock(q);
if(q->len >= q->limit){
- freeblist(b);
iunlock(q);
+ freeblist(b);
return -1;
}
if(q->state & Qclosed){
- len = blocklen(b);
- freeblist(b);
iunlock(q);
- return len;
+ freeblist(b);
+ return 0;
}
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
+ len = qaddlist(q, b);
if(q->len >= q->limit/2)
q->state |= Qflow;
@@ -613,7 +589,7 @@
int
qpassnolim(Queue *q, Block *b)
{
- int dlen, len, dowakeup;
+ int len, dowakeup;
/* sync with qread */
dowakeup = 0;
@@ -620,28 +596,12 @@
ilock(q);
if(q->state & Qclosed){
- freeblist(b);
iunlock(q);
- return BALLOC(b);
+ freeblist(b);
+ return 0;
}
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
+ len = qaddlist(q, b);
if(q->len >= q->limit/2)
q->state |= Qflow;
@@ -668,8 +628,7 @@
Block **l, *nbp;
int n;
- for(l = &bp; *l; l = &(*l)->next){
- nbp = *l;
+ for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
n = BLEN(nbp);
if((n<<2) < BALLOC(nbp)){
*l = allocb(n);
@@ -690,6 +649,10 @@
int dowakeup;
uchar *p = vp;
+ b = iallocb(len);
+ if(b == nil)
+ return 0;
+
/* sync with qread */
dowakeup = 0;
ilock(q);
@@ -700,30 +663,13 @@
iunlock(q);
+ freeb(b);
return -1;
}
+ producecnt += len;
/* save in buffer */
- /* use Qcoalesce here to save storage */
- b = q->blast;
- if((q->state & Qcoalesce)==0 || q->bfirst==nil || b->lim-b->wp < len){
- /* need a new block */
- b = iallocb(len);
- if(b == 0){
- iunlock(q);
- return 0;
- }
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- /* b->next = 0; done by iallocb() */
- q->len += BALLOC(b);
- }
memmove(b->wp, p, len);
- producecnt += len;
b->wp += len;
- q->dlen += len;
- QDEBUG checkb(b, "qproduce");
+ qaddlist(q, b);
if(q->state & Qstarve){
q->state &= ~Qstarve;
@@ -746,49 +691,13 @@
Block*
qcopy(Queue *q, int len, ulong offset)
{
- int sofar;
- int n;
- Block *b, *nb;
- uchar *p;
+ Block *b;
- nb = allocb(len);
-
+ b = allocb(len);
ilock(q);
-
- /* go to offset */
- b = q->bfirst;
- for(sofar = 0; ; sofar += n){
- if(b == nil){
- iunlock(q);
- return nb;
- }
- n = BLEN(b);
- if(sofar + n > offset){
- p = b->rp + offset - sofar;
- n -= offset - sofar;
- break;
- }
- QDEBUG checkb(b, "qcopy");
- b = b->next;
- }
-
- /* copy bytes from there */
- for(sofar = 0; sofar < len;){
- if(n > len - sofar)
- n = len - sofar;
- memmove(nb->wp, p, n);
- qcopycnt += n;
- sofar += n;
- nb->wp += n;
- b = b->next;
- if(b == nil)
- break;
- n = BLEN(b);
- p = b->rp;
- }
+ b->wp += readblist(q->bfirst, b->wp, len, offset);
iunlock(q);
-
- return nb;
+ return b;
}
/*
@@ -800,13 +709,14 @@
Queue *q;
q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
+ if(q == nil)
+ return nil;
q->limit = q->inilim = limit;
q->kick = kick;
q->arg = arg;
q->state = msg;
+
q->state |= Qstarve;
q->eof = 0;
q->noblock = 0;
@@ -821,8 +731,8 @@
Queue *q;
q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
+ if(q == nil)
+ return nil;
q->limit = 0;
q->arg = arg;
@@ -837,7 +747,7 @@
{
Queue *q = a;
- return (q->state & Qclosed) || q->bfirst != 0;
+ return (q->state & Qclosed) || q->bfirst != nil;
}
/*
@@ -869,21 +779,34 @@
}
/*
- * add a block list to a queue
+ * add a block list to a queue, return bytes added
*/
-void
+int
qaddlist(Queue *q, Block *b)
{
+ int len, dlen;
+
+ QDEBUG checkb(b, "qaddlist 1");
+
/* queue the block */
- if(q->bfirst)
+ if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
- q->len += blockalloclen(b);
- q->dlen += blocklen(b);
- while(b->next)
+
+ len = BALLOC(b);
+ dlen = BLEN(b);
+ while(b->next != nil){
b = b->next;
+ QDEBUG checkb(b, "qaddlist 2");
+
+ len += BALLOC(b);
+ dlen += BLEN(b);
+ }
q->blast = b;
+ q->len += len;
+ q->dlen += dlen;
+ return dlen;
}
/*
@@ -897,77 +820,45 @@
b = q->bfirst;
if(b == nil)
return nil;
+ QDEBUG checkb(b, "qremove");
q->bfirst = b->next;
b->next = nil;
q->dlen -= BLEN(b);
q->len -= BALLOC(b);
- QDEBUG checkb(b, "qremove");
return b;
}
/*
* copy the contents of a string of blocks into
- * memory. emptied blocks are freed. return
- * pointer to first unconsumed block.
+ * memory from an offset. blocklist kept unchanged.
+ * return number of copied bytes.
*/
-Block*
-bl2mem(uchar *p, Block *b, int n)
+long
+readblist(Block *b, uchar *p, long n, ulong o)
{
- int i;
- Block *next;
+ ulong m, r;
- for(; b != nil; b = next){
- i = BLEN(b);
- if(i > n){
- memmove(p, b->rp, n);
- b->rp += n;
- return b;
+ r = 0;
+ while(n > 0 && b != nil){
+ m = BLEN(b);
+ if(o >= m)
+ o -= m;
+ else {
+ m -= o;
+ if(n < m)
+ m = n;
+ memmove(p, b->rp + o, m);
+ p += m;
+ r += m;
+ n -= m;
+ o = 0;
}
- memmove(p, b->rp, i);
- n -= i;
- p += i;
- b->rp += i;
- next = b->next;
- freeb(b);
+ b = b->next;
}
- return nil;
+ return r;
}
/*
- * copy the contents of memory into a string of blocks.
- * return nil on error.
- */
-Block*
-mem2bl(uchar *p, int len)
-{
- int n;
- Block *b, *first, **l;
-
- first = nil;
- l = &first;
- if(waserror()){
- freeblist(first);
- nexterror();
- }
- do {
- n = len;
- if(n > Maxatomic)
- n = Maxatomic;
-
- *l = b = allocb(n);
- setmalloctag(b, getcallerpc(&p));
- memmove(b->wp, p, n);
- b->wp += n;
- p += n;
- len -= n;
- l = &b->next;
- } while(len > 0);
- poperror();
-
- return first;
-}
-
-/*
* put a block back to the front of the queue
* called with q ilocked
*/
@@ -983,6 +874,35 @@
}
/*
+ * cut off the last n bytes of *h into a separate block and
+ * return it; *h is updated to refer to the head. the smaller
+ * part is copied, so either the returned block or *h is new.
+ */
+static Block*
+splitblock(Block **h, int n)
+{
+ Block *a, *b;
+ int m;
+
+ a = *h;
+ m = BLEN(a) - n;
+ if(m < n){
+ b = allocb(m);
+ memmove(b->wp, a->rp, m);
+ b->wp += m;
+ a->rp += m;
+ *h = b;
+ return a;
+ } else {
+ b = allocb(n);
+ a->wp -= n;
+ memmove(b->wp, a->wp, n);
+ b->wp += n;
+ return b;
+ }
+}
+
+/*
* flow control, get producer going again
* called with q ilocked
*/
@@ -1001,7 +921,7 @@
/* wakeup flow controlled writers */
if(dowakeup){
- if(q->kick)
+ if(q->kick != nil)
q->kick(q->arg);
wakeup(&q->wr);
}
@@ -1013,10 +933,10 @@
Block*
qbread(Queue *q, int len)
{
- Block *b, *nb;
+ Block *b;
int n;
- qlock(&q->rlock);
+ eqlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
nexterror();
@@ -1041,24 +961,21 @@
n = BLEN(b);
/* split block if it's too big and this is not a message queue */
- nb = b;
if(n > len){
- if((q->state&Qmsg) == 0){
- n -= len;
- b = allocb(n);
- memmove(b->wp, nb->rp+len, n);
- b->wp += n;
- qputback(q, b);
- }
- nb->wp = nb->rp + len;
+ n -= len;
+ if((q->state & Qmsg) == 0)
+ qputback(q, splitblock(&b, n));
+ else
+ b->wp -= n;
}
/* restart producer */
qwakeup_iunlock(q);
- poperror();
qunlock(&q->rlock);
- return nb;
+ poperror();
+
+ return b;
}
/*
@@ -1068,10 +985,10 @@
long
qread(Queue *q, void *vp, int len)
{
- Block *b, *first, **l;
+ Block *b, *first, **last;
int m, n;
- qlock(&q->rlock);
+ eqlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
nexterror();
@@ -1093,32 +1010,29 @@
}
/* if we get here, there's at least one block in the queue */
+ last = &first;
if(q->state & Qcoalesce){
/* when coalescing, 0 length blocks just go away */
b = q->bfirst;
- if(BLEN(b) <= 0){
+ m = BLEN(b);
+ if(m <= 0){
freeb(qremove(q));
goto again;
}
/* grab the first block plus as many
- * following blocks as will completely
+ * following blocks as will partially
* fit in the read.
*/
n = 0;
- l = &first;
- m = BLEN(b);
for(;;) {
- *l = qremove(q);
- l = &b->next;
+ *last = qremove(q);
n += m;
-
- b = q->bfirst;
- if(b == nil)
+ if(n >= len || q->bfirst == nil)
break;
+ last = &b->next;
+ b = q->bfirst;
m = BLEN(b);
- if(n+m > len)
- break;
}
} else {
first = qremove(q);
@@ -1125,25 +1039,24 @@
n = BLEN(first);
}
- /* copy to user space outside of the ilock */
- iunlock(q);
- b = bl2mem(vp, first, len);
- ilock(q);
+ /* split last block if it's too big and this is not a message queue */
+ if(n > len && (q->state & Qmsg) == 0)
+ qputback(q, splitblock(last, n - len));
- /* take care of any left over partial block */
- if(b != nil){
- n -= BLEN(b);
- if(q->state & Qmsg)
- freeb(b);
- else
- qputback(q, b);
- }
-
/* restart producer */
qwakeup_iunlock(q);
- poperror();
qunlock(&q->rlock);
+ poperror();
+
+ if(waserror()){
+ freeblist(first);
+ nexterror();
+ }
+ n = readblist(first, vp, len, 0);
+ freeblist(first);
+ poperror();
+
return n;
}
@@ -1155,8 +1068,31 @@
return q->len < q->limit || (q->state & Qclosed);
}
-ulong noblockcnt;
+/*
+ * flow control, wait for queue to get below the limit
+ */
+static void
+qflow(Queue *q)
+{
+ for(;;){
+ if(q->noblock || qnotfull(q))
+ break;
+ ilock(q);
+ q->state |= Qflow;
+ iunlock(q);
+
+ eqlock(&q->wlock);
+ if(waserror()){
+ qunlock(&q->wlock);
+ nexterror();
+ }
+ sleep(&q->wr, qnotfull, q);
+ qunlock(&q->wlock);
+ poperror();
+ }
+}
+
/*
* add a block to a queue obeying flow control
*/
@@ -1163,24 +1099,20 @@
long
qbwrite(Queue *q, Block *b)
{
- int n, dowakeup;
+ int len, dowakeup;
+ Proc *p;
- n = BLEN(b);
-
- if(q->bypass){
+ if(q->bypass != nil){
+ len = blocklen(b);
(*q->bypass)(q->arg, b);
- return n;
+ return len;
}
dowakeup = 0;
- qlock(&q->wlock);
if(waserror()){
- if(b != nil)
- freeb(b);
- qunlock(&q->wlock);
+ freeblist(b);
nexterror();
}
-
ilock(q);
/* give up if the queue is closed */
@@ -1189,29 +1121,16 @@
error(q->err);
}
- /* if nonblocking, don't queue over the limit */
- if(q->len >= q->limit){
- if(q->noblock){
- iunlock(q);
- freeb(b);
- noblockcnt += n;
- qunlock(&q->wlock);
- poperror();
- return n;
- }
+ /* don't queue over the limit */
+ if(q->len >= q->limit && q->noblock){
+ iunlock(q);
+ poperror();
+ len = blocklen(b);
+ freeblist(b);
+ return len;
}
- /* queue the block */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- b->next = 0;
- q->len += BALLOC(b);
- q->dlen += n;
- QDEBUG checkb(b, "qbwrite");
- b = nil;
+ len = qaddlist(q, b);
/* make sure other end gets awakened */
if(q->state & Qstarve){
@@ -1219,41 +1138,31 @@
dowakeup = 1;
}
iunlock(q);
+ poperror();
/* get output going again */
- if(q->kick && (dowakeup || (q->state&Qkick)))
+ if(q->kick != nil && (dowakeup || (q->state&Qkick)))
q->kick(q->arg);
/* wakeup anyone consuming at the other end */
- if(dowakeup)
- wakeup(&q->rr);
+ if(dowakeup){
+ p = wakeup(&q->rr);
+ /* if we just wokeup a higher priority process, let it run */
+ if(p != nil && p->priority > up->priority)
+ sched();
+ }
+
/*
- * flow control, wait for queue to get below the limit
- * before allowing the process to continue and queue
- * more. We do this here so that postnote can only
- * interrupt us after the data has been queued. This
- * means that things like 9p flushes and ssl messages
- * will not be disrupted by software interrupts.
- *
- * Note - this is moderately dangerous since a process
- * that keeps getting interrupted and rewriting will
- * queue infinite crud.
+ * flow control, before allowing the process to continue and
+ * queue more. We do this here so that postnote can only
+ * interrupt us after the data has been queued. This means that
+ * things like 9p flushes and ssl messages will not be disrupted
+ * by software interrupts.
*/
- for(;;){
- if(q->noblock || qnotfull(q))
- break;
+ qflow(q);
- ilock(q);
- q->state |= Qflow;
- iunlock(q);
- sleep(&q->wr, qnotfull, q);
- }
- USED(b);
-
- qunlock(&q->wlock);
- poperror();
- return n;
+ return len;
}
/*
@@ -1267,8 +1176,18 @@
uchar *p = vp;
QDEBUG if(!islo())
- print("qwrite hi %zux\n", getcallerpc(&q));
+ print("qwrite hi %#p\n", getcallerpc(&q));
+ /* stop queue bloat before allocating blocks */
+ if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
+ while(waserror()){
+ if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
+ error(Egreg);
+ }
+ qflow(q);
+ poperror();
+ }
+
sofar = 0;
do {
n = len-sofar;
@@ -1276,7 +1195,6 @@
n = Maxatomic;
b = allocb(n);
- setmalloctag(b, getcallerpc(&q));
if(waserror()){
freeb(b);
nexterror();
@@ -1285,9 +1203,7 @@
poperror();
b->wp += n;
- qbwrite(q, b);
-
- sofar += n;
+ sofar += qbwrite(q, b);
} while(sofar < len && (q->state & Qmsg) == 0);
return len;
@@ -1320,15 +1236,17 @@
ilock(q);
- QDEBUG checkb(b, "qiwrite");
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- q->len += BALLOC(b);
- q->dlen += n;
+ /* we use an artificially high limit for kernel prints since anything
+ * over the limit gets dropped
+ */
+ if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
+ iunlock(q);
+ freeb(b);
+ break;
+ }
+ qaddlist(q, b);
+
if(q->state & Qstarve){
q->state &= ~Qstarve;
dowakeup = 1;
@@ -1337,7 +1255,7 @@
iunlock(q);
if(dowakeup){
- if(q->kick)
+ if(q->kick != nil)
q->kick(q->arg);
wakeup(&q->rr);
}
@@ -1375,9 +1293,9 @@
ilock(q);
q->state |= Qclosed;
q->state &= ~(Qflow|Qstarve);
- strcpy(q->err, Ehungup);
+ kstrcpy(q->err, Ehungup, ERRMAX);
bfirst = q->bfirst;
- q->bfirst = 0;
+ q->bfirst = nil;
q->len = 0;
q->dlen = 0;
q->noblock = 0;
@@ -1401,10 +1319,9 @@
/* mark it */
ilock(q);
q->state |= Qclosed;
- if(msg == 0 || *msg == 0)
- strcpy(q->err, Ehungup);
- else
- strncpy(q->err, msg, ERRMAX-1);
+ if(msg == nil || *msg == '\0')
+ msg = Ehungup;
+ kstrcpy(q->err, msg, ERRMAX);
iunlock(q);
/* wake up readers/writers */
@@ -1464,7 +1381,7 @@
int
qcanread(Queue *q)
{
- return q->bfirst!=0;
+ return q->bfirst != nil;
}
/*
@@ -1496,7 +1413,7 @@
/* mark it */
ilock(q);
bfirst = q->bfirst;
- q->bfirst = 0;
+ q->bfirst = nil;
q->len = 0;
q->dlen = 0;
iunlock(q);
@@ -1504,7 +1421,7 @@
/* free queued blocks */
freeblist(bfirst);
- /* wake up readers/writers */
+ /* wake up writers */
wakeup(&q->wr);
}
@@ -1512,18 +1429,4 @@
qfull(Queue *q)
{
return q->state & Qflow;
-}
-
-int
-qstate(Queue *q)
-{
- return q->state;
-}
-
-void
-qdump(Queue *q)
-{
- if(q)
- iprint("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
- q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
}