Package SimPy :: Module Lib
[hide private]
[frames] | no frames]

Source Code for Module SimPy.Lib

  1  """ 
  2  This file contains Process, SimEvent, the resources Resource, Level and Store  
  3  as well as their dependencies Buffer, Queue, FIFO and PriorityQ. 
  4  """ 
  5  # $Revision: 304 $ $Date: 2009-04-02 06:01:15 +0200 (Do, 02 Apr 2009) $ kgm 
  6  # SimPy version: 2.0 
  7  import inspect 
  8  import new 
  9  import sys 
 10  import types 
 11   
 12  from SimPy.Lister import Lister 
 13  from SimPy.Recording import Monitor, Tally 
 14   
 15  # Required for backward compatibility 
 16  import SimPy.Globals as Globals 
 17   
 18   
class Process(Lister):
    """Superclass of classes which may use generator functions.

    A Process holds the reference to one generator (its process execution
    method, PEM) plus the scheduling state kept for it: next activation
    time, interrupt / preemption flags and per-resource priorities.
    """

    def __init__(self, name='a_process', sim=None):
        """Create a process that is not yet scheduled.

        name -- label of the process
        sim  -- simulation object the process belongs to; ``Globals.sim``
                is used when no (or a false) value is given
        """
        if not sim:
            sim = Globals.sim  # Use global simulation object if sim is None
        self.sim = sim
        # the reference to this Process instance's single process
        # (== generator)
        self._nextpoint = None
        self.name = name
        self._nextTime = None  # next activation time
        self._remainService = 0
        self._preempted = 0
        self._priority = {}
        self._getpriority = {}
        self._putpriority = {}
        self._terminated = False
        self._inInterrupt = False
        # which events process waited / queued for occurred
        self.eventsFired = []
        # tracing is done only for simulation objects that carry a 'trace'
        self._doTracing = hasattr(sim, 'trace')

    def active(self):
        """Return True if an activation time is set and the process is not
        in the interrupted state."""
        return self._nextTime is not None and not self._inInterrupt

    def passive(self):
        """Return True if no activation time is set and the process has not
        terminated."""
        return self._nextTime is None and not self._terminated

    def terminated(self):
        """Return the terminated flag of this process."""
        return self._terminated

    def interrupted(self):
        """Return True if the process is in the interrupted state and has
        not terminated."""
        return self._inInterrupt and not self._terminated

    def queuing(self, resource):
        """Return True if this process is in ``resource.waitQ``."""
        return self in resource.waitQ

    def cancel(self, victim):
        """Application function to cancel all event notices for this Process
        instance;(should be all event notices for the _generator_)."""
        self.sim._e._unpost(whom=victim)

    def start(self, pem=None, at='undefined', delay='undefined',
              prior=False):
        """Activates PEM of this Process.
        p.start(p.pemname([args])[,{at = t | delay = period}][, prior = False]) or
        p.start([p.ACTIONS()][,{at = t | delay = period}][, prior = False]) (ACTIONS
                parameter optional)

        Raises FatalSimerror if there is no generator to activate, if the
        simulation has no event list yet, or if pem is not a generator.
        """
        if pem is None:
            try:
                pem = self.ACTIONS()
            except AttributeError:
                raise FatalSimerror(
                    'Fatal SimPy error: no generator function to activate')
        if self.sim._e is None:
            raise FatalSimerror(
                'Fatal SimPy error: simulation is not initialized'
                '(call initialize() first)')
        if not isinstance(pem, types.GeneratorType):
            raise FatalSimerror(
                'Fatal SimPy error: activating function which' +
                ' is not a generator (contains no \'yield\')')
        # NOTE(review): 'not self._nextTime' is also true for an activation
        # time of 0 -- confirm that this is intended.
        if not self._terminated and not self._nextTime:
            # store generator reference in object; needed for reactivation
            self._nextpoint = pem
            if at == 'undefined':
                at = self.sim._t
            if delay == 'undefined':
                zeit = max(self.sim._t, at)
            else:
                zeit = max(self.sim._t, self.sim._t + delay)
            if self._doTracing:
                self.sim.trace.recordActivate(who=self, when=zeit,
                                              prior=prior)
            self.sim._e._post(what=self, at=zeit, prior=prior)

    def _hold(self, a):
        """Post an event notice for ``a[1]`` after the delay ``a[0][2]``
        (absolute value; 0 if the command carries no delay) and clear the
        interrupt state of this process."""
        if len(a[0]) == 3:
            delay = abs(a[0][2])
        else:
            delay = 0
        who = a[1]
        self.interruptLeft = delay
        self._inInterrupt = False
        self.interruptCause = None
        self.sim._e._post(what=who, at=self.sim._t + delay)

    def _passivate(self, a):
        """Clear the activation time of the process ``a[0][1]``."""
        a[0][1]._nextTime = None

    def interrupt(self, victim):
        """Application function to interrupt active processes.

        Returns the time left until the victim's next activation, or None
        if the victim is not active.
        """
        # can't interrupt terminated / passive / interrupted process
        if not victim.active():
            return None
        if self._doTracing:
            # suppress the trace comment while the victim is reactivated
            save = self.sim.trace._comment
            self.sim.trace._comment = None
        victim.interruptCause = self  # self causes interrupt
        left = victim._nextTime - self.sim._t
        victim.interruptLeft = left  # time left in current 'hold'
        victim._inInterrupt = True
        self.sim.reactivate(victim)
        if self._doTracing:
            self.sim.trace._comment = save
            self.sim.trace.recordInterrupt(self, victim)
        return left

    def interruptReset(self):
        """
        Application function for an interrupt victim to get out of
        'interrupted' state.
        """
        self._inInterrupt = False

    def acquired(self, res):
        """Multi - functional test for reneging for 'request' and 'get':
        (1)If res of type Resource:
            Tests whether resource res was acquired when process reactivated.
            If yes, the parallel wakeup process is killed.
            If not, process is removed from res.waitQ (reneging).
        (2)If res of type Store:
            Tests whether item(s) gotten from Store res.
            If yes, the parallel wakeup process is killed.
            If no, process is removed from res.getQ
        (3)If res of type Level:
            Tests whether units gotten from Level res.
            If yes, the parallel wakeup process is killed.
            If no, process is removed from res.getQ.

        Returns the test result; None for any other type of res.
        """
        if isinstance(res, Resource):
            test = self in res.activeQ
            if test:
                self.cancel(self._holder)
            else:
                res.waitQ.remove(self)
                if res.monitored:
                    res.waitMon.observe(len(res.waitQ), t=self.sim.now())
            return test
        elif isinstance(res, Store):
            test = len(self.got)
            if test:
                self.cancel(self._holder)
            else:
                res.getQ.remove(self)
                if res.monitored:
                    res.getQMon.observe(len(res.getQ), t=self.sim.now())
            return test
        elif isinstance(res, Level):
            test = not (self.got is None)
            if test:
                self.cancel(self._holder)
            else:
                res.getQ.remove(self)
                if res.monitored:
                    res.getQMon.observe(len(res.getQ), t=self.sim.now())
            return test

    def stored(self, buffer):
        """Test for reneging for 'yield put . . .' compound statement (Level and
        Store). Returns True if not reneged.
        If self not in buffer.putQ, kill wakeup process, else take self out of
        buffer.putQ (reneged)"""
        test = self in buffer.putQ
        if test:  # reneged
            buffer.putQ.remove(self)
            if buffer.monitored:
                buffer.putQMon.observe(len(buffer.putQ), t=self.sim.now())
        else:
            self.cancel(self._holder)
        return not test
192 193
class SimEvent(Lister):
    """Supports one-shot signalling between processes. All processes waiting for an event to occur
    get activated when its occurrence is signalled. From the processes queuing for an event, only
    the first gets activated.
    """

    def __init__(self, name='a_SimEvent', sim=None):
        """Create an event that has not occurred and has no waiting or
        queuing processes.

        name -- label of the event
        sim  -- simulation object the event belongs to; ``Globals.sim`` is
                used when no (or a false) value is given
        """
        if not sim:
            sim = Globals.sim  # Use global simulation if sim is None
        self.sim = sim
        self.name = name
        self.waits = []   # entries [process, events waited for]
        self.queues = []  # entries [process, events queued for]
        self.occurred = False
        self.signalparam = None
        self._doTracing = hasattr(sim, 'trace')

    def signal(self, param=None):
        """Produces a signal to self;
        Fires this event (makes it occur).
        Reactivates ALL processes waiting for this event. (Cleanup waits lists
        of other events if wait was for an event - group (OR).)
        Reactivates the first process for which event(s) it is queuing for
        have fired. (Cleanup queues of other events if wait was for an event - group (OR).)
        """
        self.signalparam = param
        if self._doTracing:
            self.sim.trace.recordSignal(self)
        if not self.waits and not self.queues:
            # nobody is waiting or queuing: remember the occurrence
            self.occurred = True
            return
        # reactivate all waiting processes
        for p in self.waits:
            p[0].eventsFired.append(self)
            self.sim.reactivate(p[0], prior=True)
            # delete waits entries for this process in other events
            for ev in p[1]:
                if ev != self:
                    if ev.occurred:
                        p[0].eventsFired.append(ev)
                    for iev in ev.waits:
                        if iev[0] == p[0]:
                            ev.waits.remove(iev)
                            break
        self.waits = []
        if self.queues:
            entry = self.queues.pop(0)
            proc = entry[0]
            proc.eventsFired.append(self)
            # delete queues entries for this process in other events, so
            # that a later signal of those events does not reactivate it
            for ev in entry[1]:
                if ev != self:
                    for iev in ev.queues:
                        if iev[0] == proc:
                            ev.queues.remove(iev)
                            break
            self.sim.reactivate(proc)

    def _consumeOrEnlist(self, par, pending):
        """Shared body of _wait and _queue.

        If this event has occurred, consume it and post an event notice for
        the calling process at the current time; otherwise append the
        process to the list ``pending`` and passivate it.
        """
        proc = par[0][1]  # the process issuing the yield command
        proc.eventsFired = []
        if not self.occurred:
            pending.append([proc, [self]])
            proc._nextTime = None  # passivate calling process
        else:
            proc.eventsFired.append(self)
            self.occurred = False
            self.sim._e._post(proc, at=self.sim._t, prior=1)

    def _consumeOrEnlistAny(self, par, listName):
        """Shared body of _waitOR and _queueOR.

        ``listName`` is the name ('waits' or 'queues') of the list of each
        event in which the process is entered when none has occurred.
        """
        proc = par[0][1]
        evlist = par[0][2]
        proc.eventsFired = []
        anyoccur = False
        for ev in evlist:
            if ev.occurred:
                anyoccur = True
                proc.eventsFired.append(ev)
                ev.occurred = False
        if anyoccur:  # at least one event has fired; continue process
            self.sim._e._post(proc, at=self.sim._t, prior=1)
        else:  # no event in list has fired, enter process in all lists
            proc.eventsFired = []
            proc._nextTime = None  # passivate calling process
            for ev in evlist:
                getattr(ev, listName).append([proc, evlist])

    def _wait(self, par):
        """Consumes a signal if it has occurred, otherwise process 'proc'
        waits for this event.
        """
        self._consumeOrEnlist(par, self.waits)

    def _waitOR(self, par):
        """Handles waiting for an OR of events in a tuple / list.
        """
        self._consumeOrEnlistAny(par, 'waits')

    def _queue(self, par):
        """Consumes a signal if it has occurred, otherwise process 'proc'
        queues for this event.
        """
        self._consumeOrEnlist(par, self.queues)

    def _queueOR(self, par):
        """Handles queueing for an OR of events in a tuple / list.
        """
        self._consumeOrEnlistAny(par, 'queues')
314 315
class Queue(list):
    """Base class of the queues: a list that remembers the resource /
    buffer it belongs to and an optional Monitor / Tally."""

    def __init__(self, res, moni):
        """res -- the resource / buffer this queue belongs to
        moni -- the Monitor / Tally observing the queue length, or None
        """
        self.resource = res
        self.moni = moni
        # True if a type of Monitor / Tally attached
        self.monit = moni is not None

    def enter(self, obj):
        """Does nothing in this base class."""
        pass

    def leave(self):
        """Does nothing in this base class."""
        pass

    def takeout(self, obj):
        """Remove obj from the queue and, if monitored, record the new
        queue length at the current time of the monitor's simulation."""
        self.remove(obj)
        if not self.monit:
            return
        self.moni.observe(len(self), t=self.moni.sim.now())
335
class FIFO(Queue):
    """Queue served in order of arrival."""

    def __init__(self, res, moni):
        """See Queue.__init__ for the parameters."""
        Queue.__init__(self, res, moni)

    def _noteLength(self):
        """If monitored, record the current queue length."""
        if self.monit:
            self.moni.observe(len(self), t=self.moni.sim.now())

    def enter(self, obj):
        """Append obj at the end of the queue."""
        self.append(obj)
        self._noteLength()

    def enterGet(self, obj):
        """Same as enter."""
        self.enter(obj)

    def enterPut(self, obj):
        """Same as enter."""
        self.enter(obj)

    def leave(self):
        """Remove the first entry of the queue and return it."""
        first = self.pop(0)
        self._noteLength()
        return first
356
class PriorityQ(FIFO):
    """Queue is always ordered according to priority.
    Higher value of priority attribute == higher priority.
    """

    def __init__(self, res, moni):
        """See Queue.__init__ for the parameters."""
        FIFO.__init__(self, res, moni)

    def _enterOrdered(self, obj, prioAttr):
        """Insert obj according to its priority for this queue's resource.

        prioAttr names the dictionary attribute ('_priority',
        '_getpriority' or '_putpriority') which maps the resource / buffer
        to the priority. obj goes behind all entries whose priority is
        greater than or equal to its own.
        """
        ix = self.resource
        prio = getattr(obj, prioAttr)[ix]
        if not len(self) or getattr(self[-1], prioAttr)[ix] >= prio:
            # empty queue, or nothing with lower priority is queued
            self.append(obj)
        else:
            # the last entry has lower priority, so the scan stops in range
            z = 0
            while getattr(self[z], prioAttr)[ix] >= prio:
                z += 1
            self.insert(z, obj)
        if self.monit:
            self.moni.observe(len(self), t=self.moni.sim.now())

    def enter(self, obj):
        """Handles request queue for Resource"""
        self._enterOrdered(obj, '_priority')

    def enterGet(self, obj):
        """Handles getQ in Buffer"""
        self._enterOrdered(obj, '_getpriority')

    def enterPut(self, obj):
        """Handles putQ in Buffer"""
        self._enterOrdered(obj, '_putpriority')
413
class Resource(Lister):
    """Models shared, limited capacity resources with queuing;
    FIFO is default queuing discipline.
    """

    def __init__(self, capacity=1, name='a_resource', unitName='units',
                 qType=FIFO, preemptable=0, monitored=False,
                 monitorType=Monitor, sim=None):
        """
        monitorType={Monitor(default) | Tally}
        """
        if not sim:
            sim = Globals.sim  # Use global simulation if sim is None
        self.sim = sim
        self.name = name          # resource name
        self.capacity = capacity  # resource units in this resource
        self.unitName = unitName  # type name of resource units
        self.n = capacity         # uncommitted resource units
        self.monitored = monitored

        activeMonitor = None
        waitMonitor = None
        if self.monitored:  # Monitor waitQ, activeQ
            self.actMon = monitorType(
                name='Active Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time', sim=self.sim)
            activeMonitor = self.actMon
            self.waitMon = monitorType(
                name='Wait Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time', sim=self.sim)
            waitMonitor = self.waitMon
        self.waitQ = qType(self, waitMonitor)
        self.preemptable = preemptable
        self.activeQ = qType(self, activeMonitor)
        self.priority_default = 0

    def _request(self, arg):
        """Process request event for this resource"""
        requester = arg[1]
        command = arg[0]
        if len(command) == 4:  # yield request, self, resource, priority
            requester._priority[self] = command[3]
        else:                  # yield request, self, resource
            requester._priority[self] = self.priority_default

        if self.preemptable and self.n == 0:  # No free resource
            # test for preemption condition
            lowest = self.activeQ[-1]
            if requester._priority[self] > lowest._priority[self]:
                # Keep track of preempt level
                lowest._preempted += 1
                # suspend lowest priority process being served;
                # record remaining service time and cancel its event
                # notices at first preempt only
                if lowest._preempted == 1:
                    lowest._remainService = lowest._nextTime - self.sim._t
                    Process(sim=self.sim).cancel(lowest)
                # move it from activeQ to the front of waitQ
                self.activeQ.remove(lowest)
                self.waitQ.insert(0, lowest)
                # if self is monitored, update waitQ monitor
                if self.monitored:
                    self.waitMon.observe(len(self.waitQ), self.sim.now())
                # passivate re - queued process
                lowest._nextTime = None
                # assign resource unit to preemptor
                self.activeQ.enter(requester)
                # post event notice for preempting process
                self.sim._e._post(requester, at=self.sim._t, prior=1)
            else:
                self.waitQ.enter(requester)
                # passivate queuing process
                requester._nextTime = None
            return

        # treat non - preemption case
        if self.n == 0:
            self.waitQ.enter(requester)
            # passivate queuing process
            requester._nextTime = None
            return
        self.n -= 1
        self.activeQ.enter(requester)
        self.sim._e._post(requester, at=self.sim._t, prior=1)

    def _release(self, arg):
        """Process release request for this resource"""
        releaser = arg[1]
        self.n += 1
        self.activeQ.remove(releaser)
        if self.monitored:
            self.actMon.observe(len(self.activeQ), t=self.sim.now())
        # reactivate first waiting requestor if any; assign Resource to it
        if self.waitQ:
            nextUser = self.waitQ.leave()
            self.n -= 1  # assign 1 resource unit to object
            self.activeQ.enter(nextUser)
            if self.preemptable and nextUser._preempted:
                # object had been preempted: keep track of preempt level
                nextUser._preempted -= 1
                # reactivate object delay = remaining service time
                # but only, if all other preempts are over
                if nextUser._preempted == 0:
                    self.sim.reactivate(nextUser,
                                        delay=nextUser._remainService,
                                        prior=1)
            else:
                # not preempted (or resource not preemptable):
                # reactivate right away
                self.sim.reactivate(nextUser, delay=0, prior=1)
        self.sim._e._post(releaser, at=self.sim._t, prior=1)
529
class Buffer(Lister):
    """Abstract class for buffers
    Blocks a process when a put would cause buffer overflow or a get would cause
    buffer underflow.
    Default queuing discipline for blocked processes is FIFO."""

    priorityDefault = 0

    def __init__(self, name=None, capacity='unbounded', unitName='units',
                 putQType=FIFO, getQType=FIFO,
                 monitored=False, monitorType=Monitor, initialBuffered=None,
                 sim=None):
        """Store the parameters and create the put and get queues.

        capacity 'unbounded' is replaced by sys.maxint. If monitored is
        true, monitors of type monitorType are created for the put queue,
        the get queue and the buffer content, and the initial queue
        lengths are recorded.
        """
        if not sim:
            sim = Globals.sim  # Use global simulation if sim is None
        self.sim = sim
        if capacity == 'unbounded':
            capacity = sys.maxint
        self.capacity = capacity
        self.name = name
        self.putQType = putQType
        self.getQType = getQType
        self.monitored = monitored
        self.initialBuffered = initialBuffered
        self.unitName = unitName
        if self.monitored:
            ## monitor for Producer processes' queue
            self.putQMon = monitorType(
                name='Producer Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time', sim=self.sim)
            ## monitor for Consumer processes' queue
            self.getQMon = monitorType(
                name='Consumer Queue Monitor %s' % self.name,
                ylab='nr in queue', tlab='time', sim=self.sim)
            ## monitor for nr items in buffer
            self.bufferMon = monitorType(
                name='Buffer Monitor %s' % self.name,
                ylab='nr in buffer', tlab='time', sim=self.sim)
        else:
            self.putQMon = None
            self.getQMon = None
            self.bufferMon = None
        self.putQ = self.putQType(res=self, moni=self.putQMon)
        self.getQ = self.getQType(res=self, moni=self.getQMon)
        if self.monitored:
            self.putQMon.observe(y=len(self.putQ), t=self.sim.now())
            self.getQMon.observe(y=len(self.getQ), t=self.sim.now())
        self._putpriority = {}
        self._getpriority = {}

    def _put(self, arg=None):
        """Placeholder for the put handler; does nothing here.

        Accepts the same argument as the overriding handlers of the
        subclasses.
        """
        pass

    def _get(self, arg=None):
        """Placeholder for the get handler; does nothing here.

        Accepts the same argument as the overriding handlers of the
        subclasses.
        """
        pass
580
class Level(Buffer):
    """Models buffers for processes putting / getting un-distinguishable items.
    """

    def getamount(self):
        """Return the number of units currently buffered."""
        return self.nrBuffered

    def gettheBuffer(self):
        """Return the number of units currently buffered."""
        return self.nrBuffered

    theBuffer = property(gettheBuffer)

    def __init__(self, **pars):
        """Accepts the keyword parameters of Buffer.__init__.

        Raises FatalSimerror if capacity is not a non-negative int / float
        or if initialBuffered is neither None nor an int / float between 0
        and capacity.
        """
        Buffer.__init__(self, **pars)
        if self.name is None:
            self.name = 'a_level'  ## default name

        if type(self.capacity) not in (float, int) or self.capacity < 0:
            # report the offending capacity (not initialBuffered); the
            # 1-tuple keeps the formatting safe if the value is a tuple
            raise FatalSimerror(
                'Level: capacity parameter not a positive number: %s'
                % (self.capacity,))

        if type(self.initialBuffered) in (float, int):
            if self.initialBuffered > self.capacity:
                raise FatalSimerror('initialBuffered exceeds capacity')
            if self.initialBuffered >= 0:
                ## nr items initially in buffer
                ## buffer is just a counter (int type)
                self.nrBuffered = self.initialBuffered
            else:
                raise FatalSimerror(
                    'initialBuffered param of Level negative: %s'
                    % (self.initialBuffered,))
        elif self.initialBuffered is None:
            self.initialBuffered = 0
            self.nrBuffered = 0
        else:
            raise FatalSimerror(
                'Level: wrong type of initialBuffered (parameter=%s)'
                % (self.initialBuffered,))
        if self.monitored:
            self.bufferMon.observe(y=self.amount, t=self.sim.now())

    amount = property(getamount)

    def _put(self, arg):
        """Handles put requests for Level instances"""
        obj = arg[1]
        whichSim = self.sim
        if len(arg[0]) == 5:    # yield put, self, buff, whattoput, priority
            obj._putpriority[self] = arg[0][4]
            whatToPut = arg[0][3]
        elif len(arg[0]) == 4:  # yield put, self, buff, whattoput
            obj._putpriority[self] = Buffer.priorityDefault  # default
            whatToPut = arg[0][3]
        else:                   # yield put, self, buff
            obj._putpriority[self] = Buffer.priorityDefault  # default
            whatToPut = 1
        if type(whatToPut) not in (int, float):
            raise FatalSimerror('Level: put parameter not a number')
        if not whatToPut >= 0.0:
            raise FatalSimerror('Level: put parameter not positive number')
        whatToPutNr = whatToPut
        if whatToPutNr + self.amount > self.capacity:
            obj._nextTime = None          # passivate put requestor
            obj._whatToPut = whatToPutNr
            self.putQ.enterPut(obj)       # and queue, with size of put
        else:
            self.nrBuffered += whatToPutNr
            if self.monitored:
                self.bufferMon.observe(y=self.amount, t=self.sim.now())
            # service any getters waiting
            # service in queue - order; do not serve second in queue before
            # first has been served
            while len(self.getQ) and self.amount > 0:
                proc = self.getQ[0]
                if proc._nrToGet <= self.amount:
                    proc.got = proc._nrToGet
                    self.nrBuffered -= proc.got
                    if self.monitored:
                        self.bufferMon.observe(y=self.amount,
                                               t=self.sim.now())
                    # get requestor's record out of queue
                    self.getQ.takeout(proc)
                    # continue a blocked get requestor
                    whichSim._e._post(proc, at=whichSim._t)
                else:
                    break
            # continue the put requestor
            whichSim._e._post(obj, at=whichSim._t, prior=1)

    def _get(self, arg):
        """Handles get requests for Level instances"""
        obj = arg[1]
        obj.got = None
        if len(arg[0]) == 5:    # yield get, self, buff, whattoget, priority
            obj._getpriority[self] = arg[0][4]
            nrToGet = arg[0][3]
        elif len(arg[0]) == 4:  # yield get, self, buff, whattoget
            obj._getpriority[self] = Buffer.priorityDefault  # default
            nrToGet = arg[0][3]
        else:                   # yield get, self, buff
            obj._getpriority[self] = Buffer.priorityDefault
            nrToGet = 1
        if type(nrToGet) not in (float, int):
            raise FatalSimerror(
                'Level: get parameter not a number: %s' % (nrToGet,))
        if nrToGet < 0:
            raise FatalSimerror(
                'Level: get parameter not positive number: %s' % (nrToGet,))
        if self.amount < nrToGet:
            obj._nrToGet = nrToGet
            self.getQ.enterGet(obj)
            # passivate queuing process
            obj._nextTime = None
        else:
            obj.got = nrToGet
            self.nrBuffered -= nrToGet
            if self.monitored:
                self.bufferMon.observe(y=self.amount, t=self.sim.now())
            self.sim._e._post(obj, at=self.sim._t, prior=1)
            # reactivate any put requestors for which space is now available
            # service in queue - order; do not serve second in queue before
            # first has been served
            while len(self.putQ):  # test for queued producers
                proc = self.putQ[0]
                if proc._whatToPut + self.amount <= self.capacity:
                    self.nrBuffered += proc._whatToPut
                    if self.monitored:
                        self.bufferMon.observe(y=self.amount,
                                               t=self.sim.now())
                    self.putQ.takeout(proc)  # requestor's record out of queue
                    # continue a blocked put requestor
                    self.sim._e._post(proc, at=self.sim._t)
                else:
                    break
711
class Store(Buffer):
    """Models buffers for processes coupled by putting / getting distinguishable
    items.
    Blocks a process when a put would cause buffer overflow or a get would cause
    buffer underflow.
    Default queuing discipline for blocked processes is priority FIFO.
    """

    def getnrBuffered(self):
        """Return the number of items currently in the buffer."""
        return len(self.theBuffer)

    nrBuffered = property(getnrBuffered)

    def getbuffered(self):
        """Return the list of buffered items (the list itself, no copy)."""
        return self.theBuffer

    buffered = property(getbuffered)

    def __init__(self, **pars):
        """Accepts the keyword parameters of Buffer.__init__.

        Raises FatalSimerror if capacity is not an int > 0, or if
        initialBuffered is neither None nor a list of at most capacity
        items.
        """
        Buffer.__init__(self, **pars)
        self.theBuffer = []
        if self.name is None:
            self.name = 'a_store'  ## default name
        if type(self.capacity) != type(1) or self.capacity <= 0:
            # report the offending capacity (not initialBuffered); the
            # 1-tuple keeps the formatting safe if the value is a tuple
            raise FatalSimerror(
                'Store: capacity parameter not a positive integer > 0: %s'
                % (self.capacity,))
        if type(self.initialBuffered) == type([]):
            if len(self.initialBuffered) > self.capacity:
                raise FatalSimerror('initialBuffered exceeds capacity')
            else:
                ## buffer == list of objects
                self.theBuffer[:] = self.initialBuffered
        elif self.initialBuffered is None:
            self.theBuffer = []
        else:
            raise FatalSimerror('Store: initialBuffered not a list')
        if self.monitored:
            self.bufferMon.observe(y=self.nrBuffered, t=self.sim.now())
        self._sort = None

    def addSort(self, sortFunc):
        """Adds buffer sorting to this instance of Store. It maintains
        theBuffer sorted by the sortAttr attribute of the objects in the
        buffer.
        The user - provided 'sortFunc' must look like this:

        def mySort(self, par):
            tmplist = [(x.sortAttr, x) for x in par]
            tmplist.sort()
            return [x for (key, x) in tmplist]

        """
        # bind sortFunc to this instance so that it receives it as 'self'
        self._sort = types.MethodType(sortFunc, self)
        self.theBuffer = self._sort(self.theBuffer)

    def _put(self, arg):
        """Handles put requests for Store instances"""
        obj = arg[1]
        whichSim = self.sim
        if len(arg[0]) == 5:    # yield put, self, buff, whattoput, priority
            obj._putpriority[self] = arg[0][4]
            whatToPut = arg[0][3]
        elif len(arg[0]) == 4:  # yield put, self, buff, whattoput
            obj._putpriority[self] = Buffer.priorityDefault  # default
            whatToPut = arg[0][3]
        else:                   # error, whattoput missing
            raise FatalSimerror('Item to put missing in yield put stmt')
        if type(whatToPut) != type([]):
            raise FatalSimerror('put parameter is not a list')
        whatToPutNr = len(whatToPut)
        if whatToPutNr + self.nrBuffered > self.capacity:
            obj._nextTime = None       # passivate put requestor
            obj._whatToPut = whatToPut
            self.putQ.enterPut(obj)    # and queue, with items to put
        else:
            self.theBuffer.extend(whatToPut)
            if not (self._sort is None):
                self.theBuffer = self._sort(self.theBuffer)
            if self.monitored:
                self.bufferMon.observe(y=self.nrBuffered, t=whichSim.now())

            # service any waiting getters
            # service in queue order: do not serve second in queue before
            # first has been served
            while self.nrBuffered > 0 and len(self.getQ):
                proc = self.getQ[0]
                if inspect.isfunction(proc._nrToGet):
                    # predicate parameter
                    movCand = proc._nrToGet(self.theBuffer)
                    if movCand:
                        proc.got = movCand[:]
                        for i in movCand:
                            self.theBuffer.remove(i)
                        self.getQ.takeout(proc)
                        if self.monitored:
                            self.bufferMon.observe(
                                y=self.nrBuffered, t=whichSim._t)
                        # continue a blocked get requestor
                        whichSim._e._post(what=proc, at=whichSim._t)
                    else:
                        break
                else:  # numerical parameter
                    if proc._nrToGet <= self.nrBuffered:
                        nrToGet = proc._nrToGet
                        proc.got = []
                        proc.got[:] = self.theBuffer[0:nrToGet]
                        self.theBuffer[:] = self.theBuffer[nrToGet:]
                        if self.monitored:
                            self.bufferMon.observe(
                                y=self.nrBuffered, t=whichSim._t)
                        # take this get requestor's record out of queue:
                        self.getQ.takeout(proc)
                        # continue a blocked get requestor
                        whichSim._e._post(what=proc, at=whichSim._t)
                    else:
                        break

            # continue the put requestor
            whichSim._e._post(what=obj, at=whichSim._t, prior=1)

    def _servePutQ(self, whichSim):
        """Reactivate put requestors for which space is now available.

        Serves in queue order: stops at the first requestor whose items do
        not fit, so that the second in queue is not served before the
        first.
        """
        while len(self.putQ):
            proc = self.putQ[0]
            if len(proc._whatToPut) + self.nrBuffered <= self.capacity:
                for i in proc._whatToPut:
                    self.theBuffer.append(i)  # move items to buffer
                if not (self._sort is None):
                    self.theBuffer = self._sort(self.theBuffer)
                if self.monitored:
                    self.bufferMon.observe(
                        y=self.nrBuffered, t=whichSim.now())
                self.putQ.takeout(proc)  # dequeue requestor's record
                # continue a blocked put requestor
                whichSim._e._post(proc, at=whichSim._t)
            else:
                break

    def _get(self, arg):
        """Handles get requests"""
        filtfunc = None
        obj = arg[1]
        whichSim = obj.sim
        obj.got = []  # the list of items retrieved by 'get'
        if len(arg[0]) == 5:  # yield get, self, buff, whattoget, priority
            obj._getpriority[self] = arg[0][4]
        else:                 # no priority given
            obj._getpriority[self] = Buffer.priorityDefault
        if len(arg[0]) in (4, 5):  # yield get, self, buff, whattoget[, prio]
            if inspect.isfunction(arg[0][3]):
                filtfunc = arg[0][3]
            else:
                nrToGet = arg[0][3]
        else:                      # yield get, self, buff
            nrToGet = 1

        if not filtfunc:  # number specifies nr items to get
            if nrToGet < 0:
                raise FatalSimerror(
                    'Store: get parameter not positive number: %s' % nrToGet)
            if self.nrBuffered < nrToGet:
                obj._nrToGet = nrToGet
                self.getQ.enterGet(obj)
                # passivate / block queuing 'get' process
                obj._nextTime = None
            else:
                # move items from buffer to requesting process
                for i in range(nrToGet):
                    obj.got.append(self.theBuffer.pop(0))
                if self.monitored:
                    self.bufferMon.observe(y=self.nrBuffered,
                                           t=whichSim.now())
                whichSim._e._post(obj, at=whichSim._t, prior=1)
                self._servePutQ(whichSim)
        else:  # items to get determined by filtfunc
            movCand = filtfunc(self.theBuffer)
            if movCand:  # get succeeded
                whichSim._e._post(obj, at=whichSim._t, prior=1)
                obj.got = movCand[:]
                for item in movCand:
                    self.theBuffer.remove(item)
                if self.monitored:
                    self.bufferMon.observe(y=self.nrBuffered,
                                           t=whichSim.now())
                self._servePutQ(whichSim)
            else:  # get did not succeed, block
                obj._nrToGet = filtfunc
                self.getQ.enterGet(obj)
                # passivate / block queuing 'get' process
                obj._nextTime = None
914