1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.nio.channels.CancelledKeyException;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.DatagramChannel;
27 import java.nio.channels.NotYetConnectedException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.util.Iterator;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.locks.ReadWriteLock;
37 import java.util.concurrent.locks.ReentrantReadWriteLock;
38
39 import org.jboss.netty.buffer.ChannelBufferFactory;
40 import org.jboss.netty.channel.Channel;
41 import org.jboss.netty.channel.ChannelException;
42 import org.jboss.netty.channel.ChannelFuture;
43 import org.jboss.netty.channel.MessageEvent;
44 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
45 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
46 import org.jboss.netty.logging.InternalLogger;
47 import org.jboss.netty.logging.InternalLoggerFactory;
48 import org.jboss.netty.util.ThreadRenamingRunnable;
49 import org.jboss.netty.util.internal.LinkedTransferQueue;
50
51
52
53
54
55
56
57
58
59
60
61 class NioDatagramWorker implements Runnable {
62
63
64
65 private static final InternalLogger logger = InternalLoggerFactory
66 .getInstance(NioDatagramWorker.class);
67
68
69
70
71 private final int id;
72
73
74
75
76 private final int bossId;
77
78
79
80
81
82 private final Executor executor;
83
84
85
86
87 private boolean started;
88
89
90
91
92
93 private volatile Thread thread;
94
95
96
97
98 volatile Selector selector;
99
100
101
102
103
104
105
106 private final AtomicBoolean wakenUp = new AtomicBoolean();
107
108
109
110
111 private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
112
113
114
115
116 private final Object startStopLock = new Object();
117
118
119
120
121 private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
122
123
124
125
126 private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
127
128 private volatile int cancelledKeys;
129
130 private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
131
132
133
134
135
136
137
138
139
/**
 * Creates a worker that is not started yet; the loop is launched lazily by the
 * first call to {@code register(...)}.
 *
 * @param bossId   first number used in the worker thread name
 * @param id       second number used in the worker thread name
 * @param executor executor that will run this worker's loop
 */
NioDatagramWorker(final int bossId, final int id, final Executor executor) {
    this.executor = executor;
    this.id = id;
    this.bossId = bossId;
}
145
146
147
148
149
150
151
152
153 void register(final NioDatagramChannel channel, final ChannelFuture future) {
154 final Runnable channelRegTask = new ChannelRegistionTask(channel,
155 future);
156 Selector selector;
157
158 synchronized (startStopLock) {
159 if (!started) {
160
161 try {
162 this.selector = selector = Selector.open();
163 } catch (final Throwable t) {
164 throw new ChannelException("Failed to create a selector.",
165 t);
166 }
167
168 boolean success = false;
169 try {
170
171 executor.execute(new ThreadRenamingRunnable(this,
172 "New I/O datagram worker #" + bossId + "'-'" + id));
173 success = true;
174 } finally {
175 if (!success) {
176 try {
177
178 selector.close();
179 } catch (final Throwable t) {
180 logger.warn("Failed to close a selector.", t);
181 }
182 this.selector = selector = null;
183
184 }
185 }
186 } else {
187
188 selector = this.selector;
189 }
190 assert selector != null && selector.isOpen();
191
192 started = true;
193
194
195 boolean offered = registerTaskQueue.offer(channelRegTask);
196 assert offered;
197 }
198
199 if (wakenUp.compareAndSet(false, true)) {
200 selector.wakeup();
201 }
202 }
203
204
205
206
207 public void run() {
208
209 thread = Thread.currentThread();
210
211 final Selector selector = this.selector;
212 boolean shutdown = false;
213
214 for (;;) {
215 wakenUp.set(false);
216
217 if (NioProviderMetadata.CONSTRAINT_LEVEL != 0) {
218 selectorGuard.writeLock().lock();
219
220 selectorGuard.writeLock().unlock();
221 }
222
223 try {
224 SelectorUtil.select(selector);
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 if (wakenUp.get()) {
255 selector.wakeup();
256 }
257
258 cancelledKeys = 0;
259 processRegisterTaskQueue();
260 processWriteTaskQueue();
261 processSelectedKeys(selector.selectedKeys());
262
263
264
265
266
267
268
269 if (selector.keys().isEmpty()) {
270 if (shutdown || executor instanceof ExecutorService &&
271 ((ExecutorService) executor).isShutdown()) {
272 synchronized (startStopLock) {
273 if (registerTaskQueue.isEmpty() &&
274 selector.keys().isEmpty()) {
275 started = false;
276 try {
277 selector.close();
278 } catch (IOException e) {
279 logger.warn("Failed to close a selector.",
280 e);
281 } finally {
282 this.selector = null;
283 }
284 break;
285 } else {
286 shutdown = false;
287 }
288 }
289 } else {
290
291 shutdown = true;
292 }
293 } else {
294 shutdown = false;
295 }
296 } catch (Throwable t) {
297 logger.warn("Unexpected exception in the selector loop.", t);
298
299
300
301 try {
302 Thread.sleep(1000);
303 } catch (InterruptedException e) {
304
305 }
306 }
307 }
308 }
309
310
311
312
313
314 private void processRegisterTaskQueue() throws IOException {
315 for (;;) {
316 final Runnable task = registerTaskQueue.poll();
317 if (task == null) {
318 break;
319 }
320
321 task.run();
322 cleanUpCancelledKeys();
323 }
324 }
325
326
327
328
329 private void processWriteTaskQueue() throws IOException {
330 for (;;) {
331 final Runnable task = writeTaskQueue.poll();
332 if (task == null) {
333 break;
334 }
335
336 task.run();
337 cleanUpCancelledKeys();
338 }
339 }
340
341 private void processSelectedKeys(final Set<SelectionKey> selectedKeys) throws IOException {
342 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
343 SelectionKey k = i.next();
344 i.remove();
345 try {
346 int readyOps = k.readyOps();
347 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
348 if (!read(k)) {
349
350 continue;
351 }
352 }
353 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
354 writeFromSelectorLoop(k);
355 }
356 } catch (CancelledKeyException e) {
357 close(k);
358 }
359
360 if (cleanUpCancelledKeys()) {
361 break;
362 }
363 }
364 }
365
366 private boolean cleanUpCancelledKeys() throws IOException {
367 if (cancelledKeys >= NioWorker.CLEANUP_INTERVAL) {
368 cancelledKeys = 0;
369 selector.selectNow();
370 return true;
371 }
372 return false;
373 }
374
375
376
377
378
379
380
381
382 private boolean read(final SelectionKey key) {
383 final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
384 ReceiveBufferSizePredictor predictor =
385 channel.getConfig().getReceiveBufferSizePredictor();
386 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
387 final DatagramChannel nioChannel = (DatagramChannel) key.channel();
388
389
390
391
392
393 final ByteBuffer byteBuffer = ByteBuffer.allocate(
394 predictor.nextReceiveBufferSize()).order(bufferFactory.getDefaultOrder());
395
396 boolean failure = true;
397 SocketAddress remoteAddress = null;
398 try {
399
400
401 remoteAddress = nioChannel.receive(byteBuffer);
402 failure = false;
403 } catch (ClosedChannelException e) {
404
405 } catch (Throwable t) {
406 fireExceptionCaught(channel, t);
407 }
408
409 if (remoteAddress != null) {
410
411 byteBuffer.flip();
412
413 int readBytes = byteBuffer.remaining();
414 if (readBytes > 0) {
415
416 predictor.previousReceiveBufferSize(readBytes);
417
418
419 fireMessageReceived(
420 channel, bufferFactory.getBuffer(byteBuffer), remoteAddress);
421 }
422 }
423
424 if (failure) {
425 close(channel, succeededFuture(channel));
426 return false;
427 }
428
429 return true;
430 }
431
432 private void close(SelectionKey k) {
433 final NioDatagramChannel ch = (NioDatagramChannel) k.attachment();
434 close(ch, succeededFuture(ch));
435 }
436
437 void writeFromUserCode(final NioDatagramChannel channel) {
438
439
440
441
442
443 if (!channel.isOpen()) {
444 cleanUpWriteBuffer(channel);
445 return;
446 }
447
448 if (scheduleWriteIfNecessary(channel)) {
449 return;
450 }
451
452
453
454 if (channel.writeSuspended) {
455 return;
456 }
457
458 if (channel.inWriteNowLoop) {
459 return;
460 }
461
462 write0(channel);
463 }
464
465 void writeFromTaskLoop(final NioDatagramChannel ch) {
466 if (!ch.writeSuspended) {
467 write0(ch);
468 }
469 }
470
471 void writeFromSelectorLoop(final SelectionKey k) {
472 NioDatagramChannel ch = (NioDatagramChannel) k.attachment();
473 ch.writeSuspended = false;
474 write0(ch);
475 }
476
477 private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) {
478 final Thread workerThread = thread;
479 if (workerThread == null || Thread.currentThread() != workerThread) {
480 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
481
482 boolean offered = writeTaskQueue.offer(channel.writeTask);
483 assert offered;
484 }
485
486 final Selector selector = this.selector;
487 if (selector != null) {
488 if (wakenUp.compareAndSet(false, true)) {
489 selector.wakeup();
490 }
491 }
492 return true;
493 }
494
495 return false;
496 }
497
498 private void write0(final NioDatagramChannel channel) {
499
500 boolean addOpWrite = false;
501 boolean removeOpWrite = false;
502
503 long writtenBytes = 0;
504
505 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
506 final DatagramChannel ch = channel.getDatagramChannel();
507 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
508 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
509 synchronized (channel.writeLock) {
510
511 channel.inWriteNowLoop = true;
512
513
514 for (;;) {
515 MessageEvent evt = channel.currentWriteEvent;
516 SendBuffer buf;
517 if (evt == null) {
518 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
519 removeOpWrite = true;
520 channel.writeSuspended = false;
521 break;
522 }
523
524 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
525 } else {
526 buf = channel.currentWriteBuffer;
527 }
528
529 try {
530 long localWrittenBytes = 0;
531 SocketAddress raddr = evt.getRemoteAddress();
532 if (raddr == null) {
533 for (int i = writeSpinCount; i > 0; i --) {
534 localWrittenBytes = buf.transferTo(ch);
535 if (localWrittenBytes != 0) {
536 writtenBytes += localWrittenBytes;
537 break;
538 }
539 if (buf.finished()) {
540 break;
541 }
542 }
543 } else {
544 for (int i = writeSpinCount; i > 0; i --) {
545 localWrittenBytes = buf.transferTo(ch, raddr);
546 if (localWrittenBytes != 0) {
547 writtenBytes += localWrittenBytes;
548 break;
549 }
550 if (buf.finished()) {
551 break;
552 }
553 }
554 }
555
556 if (localWrittenBytes > 0 || buf.finished()) {
557
558 buf.release();
559 ChannelFuture future = evt.getFuture();
560 channel.currentWriteEvent = null;
561 channel.currentWriteBuffer = null;
562 evt = null;
563 buf = null;
564 future.setSuccess();
565 } else {
566
567 addOpWrite = true;
568 channel.writeSuspended = true;
569 break;
570 }
571 } catch (final AsynchronousCloseException e) {
572
573 } catch (final Throwable t) {
574 buf.release();
575 ChannelFuture future = evt.getFuture();
576 channel.currentWriteEvent = null;
577 channel.currentWriteBuffer = null;
578 buf = null;
579 evt = null;
580 future.setFailure(t);
581 fireExceptionCaught(channel, t);
582 }
583 }
584 channel.inWriteNowLoop = false;
585 }
586
587 fireWriteComplete(channel, writtenBytes);
588
589 if (addOpWrite) {
590 setOpWrite(channel);
591 } else if (removeOpWrite) {
592 clearOpWrite(channel);
593 }
594 }
595
596 private void setOpWrite(final NioDatagramChannel channel) {
597 Selector selector = this.selector;
598 SelectionKey key = channel.getDatagramChannel().keyFor(selector);
599 if (key == null) {
600 return;
601 }
602 if (!key.isValid()) {
603 close(key);
604 return;
605 }
606
607
608
609 synchronized (channel.interestOpsLock) {
610 int interestOps = channel.getRawInterestOps();
611 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
612 interestOps |= SelectionKey.OP_WRITE;
613 key.interestOps(interestOps);
614 channel.setRawInterestOpsNow(interestOps);
615 }
616 }
617 }
618
619 private void clearOpWrite(NioDatagramChannel channel) {
620 Selector selector = this.selector;
621 SelectionKey key = channel.getDatagramChannel().keyFor(selector);
622 if (key == null) {
623 return;
624 }
625 if (!key.isValid()) {
626 close(key);
627 return;
628 }
629
630
631
632 synchronized (channel.interestOpsLock) {
633 int interestOps = channel.getRawInterestOps();
634 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
635 interestOps &= ~SelectionKey.OP_WRITE;
636 key.interestOps(interestOps);
637 channel.setRawInterestOpsNow(interestOps);
638 }
639 }
640 }
641
642 static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
643 boolean connected = channel.isConnected();
644 try {
645 channel.getDatagramChannel().disconnect();
646 future.setSuccess();
647 if (connected) {
648 fireChannelDisconnected(channel);
649 }
650 } catch (Throwable t) {
651 future.setFailure(t);
652 fireExceptionCaught(channel, t);
653 }
654 }
655
656 void close(final NioDatagramChannel channel,
657 final ChannelFuture future) {
658 boolean connected = channel.isConnected();
659 boolean bound = channel.isBound();
660 try {
661 channel.getDatagramChannel().close();
662 cancelledKeys ++;
663
664 if (channel.setClosed()) {
665 future.setSuccess();
666 if (connected) {
667 fireChannelDisconnected(channel);
668 }
669 if (bound) {
670 fireChannelUnbound(channel);
671 }
672
673 cleanUpWriteBuffer(channel);
674 fireChannelClosed(channel);
675 } else {
676 future.setSuccess();
677 }
678 } catch (Throwable t) {
679 future.setFailure(t);
680 fireExceptionCaught(channel, t);
681 }
682 }
683
684 private void cleanUpWriteBuffer(final NioDatagramChannel channel) {
685 Exception cause = null;
686 boolean fireExceptionCaught = false;
687
688
689 synchronized (channel.writeLock) {
690 MessageEvent evt = channel.currentWriteEvent;
691 if (evt != null) {
692
693
694 if (channel.isOpen()) {
695 cause = new NotYetConnectedException();
696 } else {
697 cause = new ClosedChannelException();
698 }
699
700 ChannelFuture future = evt.getFuture();
701 channel.currentWriteBuffer.release();
702 channel.currentWriteBuffer = null;
703 channel.currentWriteEvent = null;
704 evt = null;
705 future.setFailure(cause);
706 fireExceptionCaught = true;
707 }
708
709 Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
710 if (!writeBuffer.isEmpty()) {
711
712
713 if (cause == null) {
714 if (channel.isOpen()) {
715 cause = new NotYetConnectedException();
716 } else {
717 cause = new ClosedChannelException();
718 }
719 }
720
721 for (;;) {
722 evt = writeBuffer.poll();
723 if (evt == null) {
724 break;
725 }
726 evt.getFuture().setFailure(cause);
727 fireExceptionCaught = true;
728 }
729 }
730 }
731
732 if (fireExceptionCaught) {
733 fireExceptionCaught(channel, cause);
734 }
735 }
736
737 void setInterestOps(final NioDatagramChannel channel,
738 ChannelFuture future, int interestOps) {
739
740 boolean changed = false;
741 try {
742
743
744 synchronized (channel.interestOpsLock) {
745 final Selector selector = this.selector;
746 final SelectionKey key = channel.getDatagramChannel().keyFor(selector);
747
748 if (key == null || selector == null) {
749
750
751 channel.setRawInterestOpsNow(interestOps);
752 return;
753 }
754
755
756 interestOps &= ~Channel.OP_WRITE;
757 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
758
759 switch (NioProviderMetadata.CONSTRAINT_LEVEL) {
760 case 0:
761 if (channel.getRawInterestOps() != interestOps) {
762
763 key.interestOps(interestOps);
764
765
766
767 if (Thread.currentThread() != thread &&
768 wakenUp.compareAndSet(false, true)) {
769 selector.wakeup();
770 }
771 changed = true;
772 }
773 break;
774 case 1:
775 case 2:
776 if (channel.getRawInterestOps() != interestOps) {
777 if (Thread.currentThread() == thread) {
778
779
780 key.interestOps(interestOps);
781 changed = true;
782 } else {
783
784
785 selectorGuard.readLock().lock();
786 try {
787 if (wakenUp.compareAndSet(false, true)) {
788 selector.wakeup();
789 }
790 key.interestOps(interestOps);
791 changed = true;
792 } finally {
793 selectorGuard.readLock().unlock();
794 }
795 }
796 }
797 break;
798 default:
799 throw new Error();
800 }
801 if (changed) {
802 channel.setRawInterestOpsNow(interestOps);
803 }
804 }
805
806 future.setSuccess();
807 if (changed) {
808 fireChannelInterestChanged(channel);
809 }
810 } catch (final CancelledKeyException e) {
811
812 ClosedChannelException cce = new ClosedChannelException();
813 future.setFailure(cce);
814 fireExceptionCaught(channel, cce);
815 } catch (final Throwable t) {
816 future.setFailure(t);
817 fireExceptionCaught(channel, t);
818 }
819 }
820
821
822
823
824
825 private final class ChannelRegistionTask implements Runnable {
826 private final NioDatagramChannel channel;
827
828 private final ChannelFuture future;
829
830 ChannelRegistionTask(final NioDatagramChannel channel,
831 final ChannelFuture future) {
832 this.channel = channel;
833 this.future = future;
834 }
835
836
837
838
839
840
841 public void run() {
842 final SocketAddress localAddress = channel.getLocalAddress();
843 if (localAddress == null) {
844 if (future != null) {
845 future.setFailure(new ClosedChannelException());
846 }
847 close(channel, succeededFuture(channel));
848 return;
849 }
850
851 try {
852 synchronized (channel.interestOpsLock) {
853 channel.getDatagramChannel().register(
854 selector, channel.getRawInterestOps(), channel);
855 }
856 if (future != null) {
857 future.setSuccess();
858 }
859 } catch (final ClosedChannelException e) {
860 if (future != null) {
861 future.setFailure(e);
862 }
863 close(channel, succeededFuture(channel));
864 throw new ChannelException(
865 "Failed to register a socket to the selector.", e);
866 }
867 }
868 }
869 }