1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.nio.channels.CancelledKeyException;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.NotYetConnectedException;
27 import java.nio.channels.SelectionKey;
28 import java.nio.channels.Selector;
29 import java.nio.channels.SocketChannel;
30 import java.util.Iterator;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.locks.ReadWriteLock;
37 import java.util.concurrent.locks.ReentrantReadWriteLock;
38
39 import org.jboss.netty.buffer.ChannelBuffer;
40 import org.jboss.netty.buffer.ChannelBufferFactory;
41 import org.jboss.netty.channel.Channel;
42 import org.jboss.netty.channel.ChannelException;
43 import org.jboss.netty.channel.ChannelFuture;
44 import org.jboss.netty.channel.MessageEvent;
45 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
46 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
47 import org.jboss.netty.logging.InternalLogger;
48 import org.jboss.netty.logging.InternalLoggerFactory;
49 import org.jboss.netty.util.ThreadRenamingRunnable;
50 import org.jboss.netty.util.internal.IoWorkerRunnable;
51 import org.jboss.netty.util.internal.LinkedTransferQueue;
52
53
54
55
56
57
58
59
60
61 class NioWorker implements Runnable {
62
63 private static final InternalLogger logger =
64 InternalLoggerFactory.getInstance(NioWorker.class);
65
66 private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
67
68 static final int CLEANUP_INTERVAL = 256;
69
70 private final int bossId;
71 private final int id;
72 private final Executor executor;
73 private boolean started;
74 private volatile Thread thread;
75 volatile Selector selector;
76 private final AtomicBoolean wakenUp = new AtomicBoolean();
77 private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
78 private final Object startStopLock = new Object();
79 private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
80 private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
81 private volatile int cancelledKeys;
82
83 private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
84 private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
85
86 NioWorker(int bossId, int id, Executor executor) {
87 this.bossId = bossId;
88 this.id = id;
89 this.executor = executor;
90 }
91
92 void register(NioSocketChannel channel, ChannelFuture future) {
93
94 boolean server = !(channel instanceof NioClientSocketChannel);
95 Runnable registerTask = new RegisterTask(channel, future, server);
96 Selector selector;
97
98 synchronized (startStopLock) {
99 if (!started) {
100
101 try {
102 this.selector = selector = Selector.open();
103 } catch (Throwable t) {
104 throw new ChannelException(
105 "Failed to create a selector.", t);
106 }
107
108
109 String threadName =
110 (server ? "New I/O server worker #"
111 : "New I/O client worker #") + bossId + '-' + id;
112
113 boolean success = false;
114 try {
115 executor.execute(
116 new IoWorkerRunnable(
117 new ThreadRenamingRunnable(this, threadName)));
118 success = true;
119 } finally {
120 if (!success) {
121
122 try {
123 selector.close();
124 } catch (Throwable t) {
125 logger.warn("Failed to close a selector.", t);
126 }
127 this.selector = selector = null;
128
129 }
130 }
131 } else {
132
133 selector = this.selector;
134 }
135
136 assert selector != null && selector.isOpen();
137
138 started = true;
139 boolean offered = registerTaskQueue.offer(registerTask);
140 assert offered;
141 }
142
143 if (wakenUp.compareAndSet(false, true)) {
144 selector.wakeup();
145 }
146 }
147
148 public void run() {
149 thread = Thread.currentThread();
150
151 boolean shutdown = false;
152 Selector selector = this.selector;
153 for (;;) {
154 wakenUp.set(false);
155
156 if (CONSTRAINT_LEVEL != 0) {
157 selectorGuard.writeLock().lock();
158
159
160 selectorGuard.writeLock().unlock();
161 }
162
163 try {
164 SelectorUtil.select(selector);
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194 if (wakenUp.get()) {
195 selector.wakeup();
196 }
197
198 cancelledKeys = 0;
199 processRegisterTaskQueue();
200 processWriteTaskQueue();
201 processSelectedKeys(selector.selectedKeys());
202
203
204
205
206
207
208 if (selector.keys().isEmpty()) {
209 if (shutdown ||
210 executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
211
212 synchronized (startStopLock) {
213 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
214 started = false;
215 try {
216 selector.close();
217 } catch (IOException e) {
218 logger.warn(
219 "Failed to close a selector.", e);
220 } finally {
221 this.selector = null;
222 }
223 break;
224 } else {
225 shutdown = false;
226 }
227 }
228 } else {
229
230 shutdown = true;
231 }
232 } else {
233 shutdown = false;
234 }
235 } catch (Throwable t) {
236 logger.warn(
237 "Unexpected exception in the selector loop.", t);
238
239
240
241 try {
242 Thread.sleep(1000);
243 } catch (InterruptedException e) {
244
245 }
246 }
247 }
248 }
249
250 private void processRegisterTaskQueue() throws IOException {
251 for (;;) {
252 final Runnable task = registerTaskQueue.poll();
253 if (task == null) {
254 break;
255 }
256
257 task.run();
258 cleanUpCancelledKeys();
259 }
260 }
261
262 private void processWriteTaskQueue() throws IOException {
263 for (;;) {
264 final Runnable task = writeTaskQueue.poll();
265 if (task == null) {
266 break;
267 }
268
269 task.run();
270 cleanUpCancelledKeys();
271 }
272 }
273
274 private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
275 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
276 SelectionKey k = i.next();
277 i.remove();
278 try {
279 int readyOps = k.readyOps();
280 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
281 if (!read(k)) {
282
283 continue;
284 }
285 }
286 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
287 writeFromSelectorLoop(k);
288 }
289 } catch (CancelledKeyException e) {
290 close(k);
291 }
292
293 if (cleanUpCancelledKeys()) {
294 break;
295 }
296 }
297 }
298
299 private boolean cleanUpCancelledKeys() throws IOException {
300 if (cancelledKeys >= CLEANUP_INTERVAL) {
301 cancelledKeys = 0;
302 selector.selectNow();
303 return true;
304 }
305 return false;
306 }
307
308 private boolean read(SelectionKey k) {
309 final SocketChannel ch = (SocketChannel) k.channel();
310 final NioSocketChannel channel = (NioSocketChannel) k.attachment();
311
312 final ReceiveBufferSizePredictor predictor =
313 channel.getConfig().getReceiveBufferSizePredictor();
314 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
315
316 int ret = 0;
317 int readBytes = 0;
318 boolean failure = true;
319
320 ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
321 try {
322 while ((ret = ch.read(bb)) > 0) {
323 readBytes += ret;
324 if (!bb.hasRemaining()) {
325 break;
326 }
327 }
328 failure = false;
329 } catch (ClosedChannelException e) {
330
331 } catch (Throwable t) {
332 fireExceptionCaught(channel, t);
333 }
334
335 if (readBytes > 0) {
336 bb.flip();
337
338 final ChannelBufferFactory bufferFactory =
339 channel.getConfig().getBufferFactory();
340 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
341 buffer.setBytes(0, bb);
342 buffer.writerIndex(readBytes);
343
344 recvBufferPool.release(bb);
345
346
347 predictor.previousReceiveBufferSize(readBytes);
348
349
350 fireMessageReceived(channel, buffer);
351 } else {
352 recvBufferPool.release(bb);
353 }
354
355 if (ret < 0 || failure) {
356 close(channel, succeededFuture(channel));
357 return false;
358 }
359
360 return true;
361 }
362
363 private void close(SelectionKey k) {
364 NioSocketChannel ch = (NioSocketChannel) k.attachment();
365 close(ch, succeededFuture(ch));
366 }
367
368 void writeFromUserCode(final NioSocketChannel channel) {
369 if (!channel.isConnected()) {
370 cleanUpWriteBuffer(channel);
371 return;
372 }
373
374 if (scheduleWriteIfNecessary(channel)) {
375 return;
376 }
377
378
379
380 if (channel.writeSuspended) {
381 return;
382 }
383
384 if (channel.inWriteNowLoop) {
385 return;
386 }
387
388 write0(channel);
389 }
390
391 void writeFromTaskLoop(final NioSocketChannel ch) {
392 if (!ch.writeSuspended) {
393 write0(ch);
394 }
395 }
396
397 void writeFromSelectorLoop(final SelectionKey k) {
398 NioSocketChannel ch = (NioSocketChannel) k.attachment();
399 ch.writeSuspended = false;
400 write0(ch);
401 }
402
403 private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) {
404 final Thread currentThread = Thread.currentThread();
405 final Thread workerThread = thread;
406 if (currentThread != workerThread) {
407 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
408 boolean offered = writeTaskQueue.offer(channel.writeTask);
409 assert offered;
410 }
411
412 if (!(channel instanceof NioAcceptedSocketChannel) ||
413 ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
414 final Selector workerSelector = selector;
415 if (workerSelector != null) {
416 if (wakenUp.compareAndSet(false, true)) {
417 workerSelector.wakeup();
418 }
419 }
420 } else {
421
422
423
424
425
426
427
428
429
430 }
431
432 return true;
433 }
434
435 return false;
436 }
437
438 private void write0(NioSocketChannel channel) {
439 boolean open = true;
440 boolean addOpWrite = false;
441 boolean removeOpWrite = false;
442
443 long writtenBytes = 0;
444
445 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
446 final SocketChannel ch = channel.socket;
447 final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
448 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
449 synchronized (channel.writeLock) {
450 channel.inWriteNowLoop = true;
451 for (;;) {
452 MessageEvent evt = channel.currentWriteEvent;
453 SendBuffer buf;
454 if (evt == null) {
455 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
456 removeOpWrite = true;
457 channel.writeSuspended = false;
458 break;
459 }
460
461 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
462 } else {
463 buf = channel.currentWriteBuffer;
464 }
465
466 ChannelFuture future = evt.getFuture();
467 try {
468 long localWrittenBytes = 0;
469 for (int i = writeSpinCount; i > 0; i --) {
470 localWrittenBytes = buf.transferTo(ch);
471 if (localWrittenBytes != 0) {
472 writtenBytes += localWrittenBytes;
473 break;
474 }
475 if (buf.finished()) {
476 break;
477 }
478 }
479
480 if (buf.finished()) {
481
482 buf.release();
483 channel.currentWriteEvent = null;
484 channel.currentWriteBuffer = null;
485 evt = null;
486 buf = null;
487 future.setSuccess();
488 } else {
489
490 addOpWrite = true;
491 channel.writeSuspended = true;
492
493 if (localWrittenBytes > 0) {
494
495 future.setProgress(
496 localWrittenBytes,
497 buf.writtenBytes(), buf.totalBytes());
498 }
499 break;
500 }
501 } catch (AsynchronousCloseException e) {
502
503 } catch (Throwable t) {
504 buf.release();
505 channel.currentWriteEvent = null;
506 channel.currentWriteBuffer = null;
507 buf = null;
508 evt = null;
509 future.setFailure(t);
510 fireExceptionCaught(channel, t);
511 if (t instanceof IOException) {
512 open = false;
513 close(channel, succeededFuture(channel));
514 }
515 }
516 }
517 channel.inWriteNowLoop = false;
518 }
519
520 fireWriteComplete(channel, writtenBytes);
521
522 if (open) {
523 if (addOpWrite) {
524 setOpWrite(channel);
525 } else if (removeOpWrite) {
526 clearOpWrite(channel);
527 }
528 }
529 }
530
531 private void setOpWrite(NioSocketChannel channel) {
532 Selector selector = this.selector;
533 SelectionKey key = channel.socket.keyFor(selector);
534 if (key == null) {
535 return;
536 }
537 if (!key.isValid()) {
538 close(key);
539 return;
540 }
541
542
543
544 synchronized (channel.interestOpsLock) {
545 int interestOps = channel.getRawInterestOps();
546 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
547 interestOps |= SelectionKey.OP_WRITE;
548 key.interestOps(interestOps);
549 channel.setRawInterestOpsNow(interestOps);
550 }
551 }
552 }
553
554 private void clearOpWrite(NioSocketChannel channel) {
555 Selector selector = this.selector;
556 SelectionKey key = channel.socket.keyFor(selector);
557 if (key == null) {
558 return;
559 }
560 if (!key.isValid()) {
561 close(key);
562 return;
563 }
564
565
566
567 synchronized (channel.interestOpsLock) {
568 int interestOps = channel.getRawInterestOps();
569 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
570 interestOps &= ~SelectionKey.OP_WRITE;
571 key.interestOps(interestOps);
572 channel.setRawInterestOpsNow(interestOps);
573 }
574 }
575 }
576
577 void close(NioSocketChannel channel, ChannelFuture future) {
578 boolean connected = channel.isConnected();
579 boolean bound = channel.isBound();
580 try {
581 channel.socket.close();
582 cancelledKeys ++;
583
584 if (channel.setClosed()) {
585 future.setSuccess();
586 if (connected) {
587 fireChannelDisconnected(channel);
588 }
589 if (bound) {
590 fireChannelUnbound(channel);
591 }
592
593 cleanUpWriteBuffer(channel);
594 fireChannelClosed(channel);
595 } else {
596 future.setSuccess();
597 }
598 } catch (Throwable t) {
599 future.setFailure(t);
600 fireExceptionCaught(channel, t);
601 }
602 }
603
604 private void cleanUpWriteBuffer(NioSocketChannel channel) {
605 Exception cause = null;
606 boolean fireExceptionCaught = false;
607
608
609 synchronized (channel.writeLock) {
610 MessageEvent evt = channel.currentWriteEvent;
611 if (evt != null) {
612
613
614 if (channel.isOpen()) {
615 cause = new NotYetConnectedException();
616 } else {
617 cause = new ClosedChannelException();
618 }
619
620 ChannelFuture future = evt.getFuture();
621 channel.currentWriteBuffer.release();
622 channel.currentWriteBuffer = null;
623 channel.currentWriteEvent = null;
624 evt = null;
625 future.setFailure(cause);
626 fireExceptionCaught = true;
627 }
628
629 Queue<MessageEvent> writeBuffer = channel.writeBuffer;
630 if (!writeBuffer.isEmpty()) {
631
632
633 if (cause == null) {
634 if (channel.isOpen()) {
635 cause = new NotYetConnectedException();
636 } else {
637 cause = new ClosedChannelException();
638 }
639 }
640
641 for (;;) {
642 evt = writeBuffer.poll();
643 if (evt == null) {
644 break;
645 }
646 evt.getFuture().setFailure(cause);
647 fireExceptionCaught = true;
648 }
649 }
650 }
651
652 if (fireExceptionCaught) {
653 fireExceptionCaught(channel, cause);
654 }
655 }
656
657 void setInterestOps(
658 NioSocketChannel channel, ChannelFuture future, int interestOps) {
659 boolean changed = false;
660 try {
661
662
663 synchronized (channel.interestOpsLock) {
664 Selector selector = this.selector;
665 SelectionKey key = channel.socket.keyFor(selector);
666
667 if (key == null || selector == null) {
668
669
670 channel.setRawInterestOpsNow(interestOps);
671 return;
672 }
673
674
675 interestOps &= ~Channel.OP_WRITE;
676 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
677
678 switch (CONSTRAINT_LEVEL) {
679 case 0:
680 if (channel.getRawInterestOps() != interestOps) {
681 key.interestOps(interestOps);
682 if (Thread.currentThread() != thread &&
683 wakenUp.compareAndSet(false, true)) {
684 selector.wakeup();
685 }
686 changed = true;
687 }
688 break;
689 case 1:
690 case 2:
691 if (channel.getRawInterestOps() != interestOps) {
692 if (Thread.currentThread() == thread) {
693 key.interestOps(interestOps);
694 changed = true;
695 } else {
696 selectorGuard.readLock().lock();
697 try {
698 if (wakenUp.compareAndSet(false, true)) {
699 selector.wakeup();
700 }
701 key.interestOps(interestOps);
702 changed = true;
703 } finally {
704 selectorGuard.readLock().unlock();
705 }
706 }
707 }
708 break;
709 default:
710 throw new Error();
711 }
712
713 if (changed) {
714 channel.setRawInterestOpsNow(interestOps);
715 }
716 }
717
718 future.setSuccess();
719 if (changed) {
720 fireChannelInterestChanged(channel);
721 }
722 } catch (CancelledKeyException e) {
723
724 ClosedChannelException cce = new ClosedChannelException();
725 future.setFailure(cce);
726 fireExceptionCaught(channel, cce);
727 } catch (Throwable t) {
728 future.setFailure(t);
729 fireExceptionCaught(channel, t);
730 }
731 }
732
733 private final class RegisterTask implements Runnable {
734 private final NioSocketChannel channel;
735 private final ChannelFuture future;
736 private final boolean server;
737
738 RegisterTask(
739 NioSocketChannel channel, ChannelFuture future, boolean server) {
740
741 this.channel = channel;
742 this.future = future;
743 this.server = server;
744 }
745
746 public void run() {
747 SocketAddress localAddress = channel.getLocalAddress();
748 SocketAddress remoteAddress = channel.getRemoteAddress();
749 if (localAddress == null || remoteAddress == null) {
750 if (future != null) {
751 future.setFailure(new ClosedChannelException());
752 }
753 close(channel, succeededFuture(channel));
754 return;
755 }
756
757 try {
758 if (server) {
759 channel.socket.configureBlocking(false);
760 }
761
762 synchronized (channel.interestOpsLock) {
763 channel.socket.register(
764 selector, channel.getRawInterestOps(), channel);
765 }
766 if (future != null) {
767 channel.setConnected();
768 future.setSuccess();
769 }
770 } catch (IOException e) {
771 if (future != null) {
772 future.setFailure(e);
773 }
774 close(channel, succeededFuture(channel));
775 if (!(e instanceof ClosedChannelException)) {
776 throw new ChannelException(
777 "Failed to register a socket to the selector.", e);
778 }
779 }
780
781 if (!server) {
782 if (!((NioClientSocketChannel) channel).boundManually) {
783 fireChannelBound(channel, localAddress);
784 }
785 fireChannelConnected(channel, remoteAddress);
786 }
787 }
788 }
789 }