网站顶部怎么做新浪链接,做交互的网站,wordpress怎么给栏目添加tdk,简述网站建设的基本流程图前言
上篇[【从入门到放弃-Java】并发编程-NIO使用]()简单介绍了nio的基础使用#xff0c;本篇将深入源码分析nio中channel的实现。
简介
channel即通道#xff0c;可以用来读、写数据#xff0c;它是全双工的可以同时用来读写操作。这也是它与stream流的最大区别。
cha…前言
上篇[【从入门到放弃-Java】并发编程-NIO使用]()简单介绍了nio的基础使用本篇将深入源码分析nio中channel的实现。
简介
channel即通道可以用来读、写数据它是全双工的可以同时用来读写操作。这也是它与stream流的最大区别。
channel需要与buffer配合使用channel通道的一端是buffer一端是数据源实体如文件、socket等。在nio中通过channel的不同实现来处理 不同实体与数据buffer中的数据传输。
channel接口
package java.nio.channels;import java.io.IOException;
import java.io.Closeable;/*** A nexus for I/O operations.** p A channel represents an open connection to an entity such as a hardware* device, a file, a network socket, or a program component that is capable of* performing one or more distinct I/O operations, for example reading or* writing.** p A channel is either open or closed. A channel is open upon creation,* and once closed it remains closed. Once a channel is closed, any attempt to* invoke an I/O operation upon it will cause a {link ClosedChannelException}* to be thrown. Whether or not a channel is open may be tested by invoking* its {link #isOpen isOpen} method.** p Channels are, in general, intended to be safe for multithreaded access* as described in the specifications of the interfaces and classes that extend* and implement this interface.*** author Mark Reinhold* author JSR-51 Expert Group* since 1.4*/public interface Channel extends Closeable {/*** Tells whether or not this channel is open.** return tttrue/tt if, and only if, this channel is open*/public boolean isOpen();/*** Closes this channel.** p After a channel is closed, any further attempt to invoke I/O* operations upon it will cause a {link ClosedChannelException} to be* thrown.** p If this channel is already closed then invoking this method has no* effect.** p This method may be invoked at any time. If some other thread has* already invoked it, however, then another invocation will block until* the first invocation is complete, after which it will return without* effect. /p** throws IOException If an I/O error occurs*/public void close() throws IOException;}
常见的channel实现有
FileChannel文件读写数据通道SocketChannelTCP读写网络数据通道ServerSocketChannel服务端网络数据读写通道可以监听TCP连接。对每一个新进来的连接都会创建一个SocketChannel。DatagramChannelUDP读写网络数据通道
FileChannel FileChannel是一个抽象类它继承了AbstractInterruptibleChannel类并实现了 SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel接口。 具体的实现类主要是sun.nio.ch.FileChannelImpl。下面详细分析下FileChannelImpl中每个方法的具体实现。
open
private FileChannelImpl(FileDescriptor var1, String var2, boolean var3, boolean var4, boolean var5, Object var6) {//主要记载操作系统维护的文件描述符this.fd var1;//是否可读this.readable var3;//是否可写this.writable var4;//是否以追加的方式打开this.append var5;this.parent var6;this.path var2;//底层使用native的read和write来处理文件的this.nd new FileDispatcherImpl(var5);
}//FileInputStream::getChannel 调用 FileChannelImpl.open(fd, path, true, false, this) 获取只读channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, Object var4) {return new FileChannelImpl(var0, var1, var2, var3, false, var4);
}//FileOutputStream::getChannel 调用 FileChannelImpl.open(fd, path, false, true, append, this) 获取只写channel
public static FileChannel open(FileDescriptor var0, String var1, boolean var2, boolean var3, boolean var4, Object var5) {return new FileChannelImpl(var0, var1, var2, var3, var4, var5);
}
private FileChannelImpl(FileDescriptor fd, String path, boolean readable,boolean writable, boolean direct, Object parent)
{this.fd fd;//是否可读this.readable readable;//是否可写this.writable writable;//对于从流创建的channel在结束时要做不同的清理动作openJDK中才有sun的jdk中没有this.parent parent;//源文件的paththis.path path;//是否使用DirectIOthis.direct direct;this.nd new FileDispatcherImpl();if (direct) {assert path ! null;this.alignment nd.setDirectIO(fd, path);} else {this.alignment -1;}//当parent不存在时则注册一个cleaner否则交由parent做清理动作。// Register a cleaning action if and only if there is no parent// as the parent will take care of closing the file descriptor.// FileChannel is used by the LambdaMetaFactory so a lambda cannot// be used here hence we use a nested class instead.this.closer parent ! null ? null :CleanerFactory.cleaner().register(this, new Closer(fd));
}// Used by FileInputStream.getChannel(), FileOutputStream.getChannel
// and RandomAccessFile.getChannel()
public static FileChannel open(FileDescriptor fd, String path,boolean readable, boolean writable,boolean direct, Object parent)
{return new FileChannelImpl(fd, path, readable, writable, direct, parent);
}
open方法主要是返回一个新new的FileChannelImpl对象初始化时设置fileDescriptor、readable、writable、append、parent、path等属性看变量名很容易理解在此不赘述变量含义。
read
//实现自SeekableByteChannel接口的方法将文件中的内容读取到给定的byteBuffer中
public int read(ByteBuffer dst) throws IOException {//保证读写时channel处于开启状态ensureOpen();//判断是否可读if (!readable)throw new NonReadableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);int n 0;int ti -1;try {//开始阻塞并注册为Interruptible可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中并返回索引方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合方便管理用来发送信号ti threads.add();if (!isOpen())return 0;do {//当未被系统中断即读取完毕或channel未被关闭则一直读将内容写入到byteBufferdst中n IOUtil.read(fd, dst, -1, direct, alignment, nd);} while ((n IOStatus.INTERRUPTED) isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束释放锁endBlocking(n 0);assert IOStatus.check(n);}}
}//实现自ScatteringByteChannel接口的方法将文件中的内容依次读取到给定的byteBuffer数组中。
public long read(ByteBuffer[] dsts, int offset, int length)throws IOException
{if ((offset 0) || (length 0) || (offset dsts.length - length))throw new IndexOutOfBoundsException();//保证读写时channel处于开启状态ensureOpen();//判断是否可读if (!readable)throw new NonReadableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);long n 0;int ti -1;try {//开始阻塞并注册为Interruptible可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中并返回索引方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合方便管理用来发送信号ti threads.add();if (!isOpen())return 0;do {//当未被系统中断即读取完毕或channel未被关闭则一直读将内容写入到byteBufferdst中n IOUtil.read(fd, dsts, offset, length,direct, alignment, nd);} while ((n IOStatus.INTERRUPTED) isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束释放锁endBlocking(n 0);assert IOStatus.check(n);}}
}
write
//实现自SeekableByteChannel接口的方法将byteBuffer中的内容写入到文件中
public int write(ByteBuffer src) throws IOException {//保证写时channel处于开启状态ensureOpen();//判断是否可写if (!writable)throw new NonWritableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);int n 0;int ti -1;try {//开始阻塞并注册为Interruptible可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中并返回索引方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合方便管理用来发送信号ti threads.add();if (!isOpen())return 0;do {//当未被系统中断即写入完毕或channel未被关闭则一直写将内容写入到文件中n IOUtil.write(fd, src, -1, direct, alignment, nd);} while ((n IOStatus.INTERRUPTED) isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束释放锁assert IOStatus.check(n);}}
}//实现自GatheringByteChannel接口的方法将byteBuffer数组中的内容依次写入到文件中
public long write(ByteBuffer[] srcs, int offset, int length)throws IOException
{if ((offset 0) || (length 0) || (offset srcs.length - length))throw new IndexOutOfBoundsException();//保证写时channel处于开启状态ensureOpen();//判断是否可写if (!writable)throw new NonWritableChannelException();synchronized (positionLock) {if (direct)Util.checkChannelPositionAligned(position(), alignment);long n 0;int ti -1;try {//开始阻塞并注册为Interruptible可以被中断beginBlocking();//将当前线程添加到NativeThreadSet中并返回索引方便后续操作。//NativeThreadSet是一个线程安全的本地线程集合方便管理用来发送信号ti threads.add();if (!isOpen())return 0;do {//当未被系统中断即写入完毕或channel未被关闭则一直写将内容写入到文件中n IOUtil.write(fd, srcs, offset, length,direct, alignment, nd);} while ((n IOStatus.INTERRUPTED) isOpen());return IOStatus.normalize(n);} finally {//把当前线程从set中移出threads.remove(ti);//结束释放锁assert IOStatus.check(n);}}
}
position
//实现自SeekableByteChannel接口的方法获取当前channel的position
public long position() throws IOException {ensureOpen();synchronized (positionLock) {long p -1;int ti -1;try {beginBlocking();ti threads.add();if (!isOpen())return 0;boolean append fdAccess.getAppend(fd);do {//append模式下position在channel的末尾// in append-mode then position is advanced to end before writingp (append) ? nd.size(fd) : nd.seek(fd, -1);} while ((p IOStatus.INTERRUPTED) isOpen());return IOStatus.normalize(p);} finally {threads.remove(ti);endBlocking(p -1);assert IOStatus.check(p);}}
}//实现自SeekableByteChannel接口的方法设置当前channel的position为newPosition
public FileChannel position(long newPosition) throws IOException {ensureOpen();if (newPosition 0)throw new IllegalArgumentException();synchronized (positionLock) {long p -1;int ti -1;try {beginBlocking();ti threads.add();if (!isOpen())return null;do {//设置当前position为newPositionp nd.seek(fd, newPosition);} while ((p IOStatus.INTERRUPTED) isOpen());return this;} finally {threads.remove(ti);endBlocking(p -1);assert IOStatus.check(p);}}
}
size
实现自SeekableByteChannel接口的方法返回当前实体文件的大小
truncate
实现自SeekableByteChannel接口的方法用来截取文件至newSize大小
force
实现自SeekableByteChannel接口的方法用来将channel中尚未写入磁盘的数据强制落盘
transferTo
将fileChannel中的数据传递至另一个channel
transferFrom
从其它channel读取数据至fileChannel
SocketChannel open
/*** Opens a socket channel.** p The new channel is created by invoking the {link* java.nio.channels.spi.SelectorProvider#openSocketChannel* openSocketChannel} method of the system-wide default {link* java.nio.channels.spi.SelectorProvider} object. /p** return A new socket channel** throws IOException* If an I/O error occurs*/
public static SocketChannel open() throws IOException {return SelectorProvider.provider().openSocketChannel();
}
open方法是调用SelectorProvider中实现了java.nio.channels.spi.SelectorProvider#openSocketChannel的方法底层实际是new SocketChannelImpl调用native方法创建socket
connect
public boolean connect(SocketAddress sa) throws IOException {//校验Address是否合法InetSocketAddress isa Net.checkAddress(sa);//获取系统安全管理器SecurityManager sm System.getSecurityManager();if (sm ! null)//校验IP和端口是否被允许连接sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());InetAddress ia isa.getAddress();//如果是本机地址则获取本机的hostif (ia.isAnyLocalAddress())ia InetAddress.getLocalHost();try {//加读锁readLock.lock();try {//加写锁writeLock.lock();try {int n 0;//是否阻塞boolean blocking isBlocking();try {//开启connect前的校验并设置为ST_CONNECTIONPENDING如果blocking是true 即阻塞模式则记录当前线程的ID以便接收信号处理。beginConnect(blocking, isa);do {//调用native connect方法n Net.connect(fd, ia, isa.getPort());} while (n IOStatus.INTERRUPTED isOpen());} finally {//结束连接endConnect(blocking, (n 0));}assert IOStatus.check(n);return n 0;} finally {//释放写锁writeLock.unlock();}} finally {//释放读锁readLock.unlock();}} catch (IOException ioe) {// connect failed, close the channelclose();throw SocketExceptions.of(ioe, isa);}
}
configureBlocking
实现自SelectableChannel的接口方法调用native方法设置socket的阻塞状态
register
在AbstractSelectableChannel中定义注册要监听的事件。
public final SelectionKey register(Selector sel, int ops, Object att)throws ClosedChannelException
{if ((ops ~validOps()) ! 0)throw new IllegalArgumentException();if (!isOpen())throw new ClosedChannelException();synchronized (regLock) {if (isBlocking())throw new IllegalBlockingModeException();synchronized (keyLock) {// re-check if channel has been closedif (!isOpen())throw new ClosedChannelException();SelectionKey k findKey(sel);if (k ! null) {k.attach(att);k.interestOps(ops);} else {// 向Selector中注册事件// New registrationk ((AbstractSelector)sel).register(this, ops, att);addKey(k);}return k;}}
}
read
//实现自ReadableByteChannel接口的方法从socket中读取数据至ByteBuffer
Override
public int read(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);readLock.lock();try {boolean blocking isBlocking();int n 0;try {//检查channel是否开启并已经是connected的状态。如果blocking是true 即阻塞模式则记录当前线程的ID以便接收信号处理。beginRead(blocking);// check if input is shutdownif (isInputClosed)return IOStatus.EOF;//如果是阻塞模式则一直读取直到数据读取完毕非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n IOUtil.read(fd, buf, -1, nd);} while (n IOStatus.INTERRUPTED isOpen());} else {n IOUtil.read(fd, buf, -1, nd);}} finally {endRead(blocking, n 0);if (n 0 isInputClosed)return IOStatus.EOF;}return IOStatus.normalize(n);} finally {readLock.unlock();}
}//实现自ScatteringByteChannel接口的方法从socket中依次读取数据至ByteBuffer数组
Override
public long read(ByteBuffer[] dsts, int offset, int length)throws IOException
{Objects.checkFromIndexSize(offset, length, dsts.length);readLock.lock();try {boolean blocking isBlocking();long n 0;try {beginRead(blocking);// check if input is shutdownif (isInputClosed)return IOStatus.EOF;//如果是阻塞模式则一直读取直到数据读取完毕非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n IOUtil.read(fd, dsts, offset, length, nd);} while (n IOStatus.INTERRUPTED isOpen());} else {n IOUtil.read(fd, dsts, offset, length, nd);}} finally {endRead(blocking, n 0);if (n 0 isInputClosed)return IOStatus.EOF;}return IOStatus.normalize(n);} finally {readLock.unlock();}
}
write
//实现自ReadableByteChannel接口的方法将ByteBuffer中的数据写入socket
Override
public int write(ByteBuffer buf) throws IOException {Objects.requireNonNull(buf);writeLock.lock();try {boolean blocking isBlocking();int n 0;try {beginWrite(blocking);//如果是阻塞模式则一直读取直到数据读取完毕非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n IOUtil.write(fd, buf, -1, nd);} while (n IOStatus.INTERRUPTED isOpen());} else {n IOUtil.write(fd, buf, -1, nd);}} finally {endWrite(blocking, n 0);if (n 0 isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}Override
public long write(ByteBuffer[] srcs, int offset, int length)throws IOException
{Objects.checkFromIndexSize(offset, length, srcs.length);writeLock.lock();try {boolean blocking isBlocking();long n 0;try {beginWrite(blocking);//如果是阻塞模式则一直等待直到数据写入完毕非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n IOUtil.write(fd, srcs, offset, length, nd);} while (n IOStatus.INTERRUPTED isOpen());} else {n IOUtil.write(fd, srcs, offset, length, nd);}} finally {endWrite(blocking, n 0);if (n 0 isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}//实现自ReadableByteChannel接口的方法将ByteBuffer数组中的数据依次写入socket
/*** Writes a byte of out of band data.*/
int sendOutOfBandData(byte b) throws IOException {writeLock.lock();try {boolean blocking isBlocking();int n 0;try {beginWrite(blocking);//如果是阻塞模式则一直等待直到数据写入完毕非阻塞模式则直接调用native方法不需要等待。if (blocking) {do {n sendOutOfBandData(fd, b);} while (n IOStatus.INTERRUPTED isOpen());} else {n sendOutOfBandData(fd, b);}} finally {endWrite(blocking, n 0);if (n 0 isOutputClosed)throw new AsynchronousCloseException();}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}
ServerSocketChannel socket
Override
public ServerSocket socket() {synchronized (stateLock) {if (socket null)socket ServerSocketAdaptor.create(this);return socket;}
}
bind
Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {synchronized (stateLock) {ensureOpen();if (localAddress ! null)throw new AlreadyBoundException();InetSocketAddress isa (local null)? new InetSocketAddress(0): Net.checkAddress(local);SecurityManager sm System.getSecurityManager();if (sm ! null)sm.checkListen(isa.getPort());//绑定前做一些前置处理如将tcp socket文件描述符转换成SDPNetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());//绑定IP和地址Net.bind(fd, isa.getAddress(), isa.getPort());//开始监听设置socket上最多可以挂起backlog个连接若backlog小于1 则默认设置50个Net.listen(fd, backlog 1 ? 50 : backlog);localAddress Net.localAddress(fd);}return this;
}
accept
Override
public SocketChannel accept() throws IOException {acceptLock.lock();try {int n 0;FileDescriptor newfd new FileDescriptor();InetSocketAddress[] isaa new InetSocketAddress[1];boolean blocking isBlocking();try {begin(blocking);do {//阻塞等待接收客户端链接n accept(this.fd, newfd, isaa);} while (n IOStatus.INTERRUPTED isOpen());} finally {end(blocking, n 0);assert IOStatus.check(n);}if (n 1)return null;//新接收的socket初始设置为阻塞模式因此非阻塞模式的每次需要显示设置// newly accepted socket is initially in blocking modeIOUtil.configureBlocking(newfd, true);InetSocketAddress isa isaa[0];//用新接收的socket创建SocketChannelSocketChannel sc new SocketChannelImpl(provider(), newfd, isa);// check permitted to accept connections from the remote addressSecurityManager sm System.getSecurityManager();if (sm ! null) {try {sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());} catch (SecurityException x) {sc.close();throw x;}}return sc;} finally {acceptLock.unlock();}
}
ServerSocketChannel并没有read和write方法只是继承了AbstractSelectableChannel以便在selector中使用
DatagramChannel open
public DatagramChannelImpl(SelectorProvider sp)throws IOException
{super(sp);ResourceManager.beforeUdpCreate();try {//如果不支持IPv6则使用IPv4this.family Net.isIPv6Available()? StandardProtocolFamily.INET6: StandardProtocolFamily.INET;//设置非流式的sockettcp是流模式协议udp是数据报模式协议this.fd Net.socket(family, false);this.fdVal IOUtil.fdVal(fd);} catch (IOException ioe) {ResourceManager.afterUdpClose();throw ioe;}
}
receive
public SocketAddress receive(ByteBuffer dst) throws IOException {if (dst.isReadOnly())throw new IllegalArgumentException(Read-only buffer);readLock.lock();try {boolean blocking isBlocking();int n 0;ByteBuffer bb null;try {SocketAddress remote beginRead(blocking, false);boolean connected (remote ! null);SecurityManager sm System.getSecurityManager();if (connected || (sm null)) {// connected or no security managerdo {n receive(fd, dst, connected);} while ((n IOStatus.INTERRUPTED) isOpen());if (n IOStatus.UNAVAILABLE)return null;} else {// Cannot receive into users buffer when running with a// security manager and not connectedbb Util.getTemporaryDirectBuffer(dst.remaining());for (;;) {do {n receive(fd, bb, connected);} while ((n IOStatus.INTERRUPTED) isOpen());if (n IOStatus.UNAVAILABLE)return null;InetSocketAddress isa (InetSocketAddress)sender;try {sm.checkAccept(isa.getAddress().getHostAddress(),isa.getPort());} catch (SecurityException se) {// Ignore packetbb.clear();n 0;continue;}bb.flip();dst.put(bb);break;}}//sender:发送方地址 Set by receive0 (## ugh)assert sender ! null;return sender;} finally {if (bb ! null)Util.releaseTemporaryDirectBuffer(bb);endRead(blocking, n 0);assert IOStatus.check(n);}} finally {readLock.unlock();}
}
send
public int send(ByteBuffer src, SocketAddress target)throws IOException
{Objects.requireNonNull(src);InetSocketAddress isa Net.checkAddress(target, family);writeLock.lock();try {boolean blocking isBlocking();int n 0;try {//当connect后remote会设置为连接的地址SocketAddress remote beginWrite(blocking, false);if (remote ! null) {// connectedif (!target.equals(remote)) {throw new AlreadyConnectedException();}do {n IOUtil.write(fd, src, -1, nd);} while ((n IOStatus.INTERRUPTED) isOpen());} else {// not connectedSecurityManager sm System.getSecurityManager();if (sm ! null) {InetAddress ia isa.getAddress();if (ia.isMulticastAddress()) {sm.checkMulticast(ia);} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());}}do {n send(fd, src, isa);} while ((n IOStatus.INTERRUPTED) isOpen());}} finally {endWrite(blocking, n 0);assert IOStatus.check(n);}return IOStatus.normalize(n);} finally {writeLock.unlock();}
}
connect
Override
public DatagramChannel connect(SocketAddress sa) throws IOException {InetSocketAddress isa Net.checkAddress(sa, family);SecurityManager sm System.getSecurityManager();if (sm ! null) {InetAddress ia isa.getAddress();if (ia.isMulticastAddress()) {sm.checkMulticast(ia);} else {sm.checkConnect(ia.getHostAddress(), isa.getPort());sm.checkAccept(ia.getHostAddress(), isa.getPort());}}readLock.lock();try {writeLock.lock();try {synchronized (stateLock) {ensureOpen();if (state ST_CONNECTED)throw new AlreadyConnectedException();int n Net.connect(family,fd,isa.getAddress(),isa.getPort());if (n 0)throw new Error(); // Cant happen// connectedremoteAddress isa;state ST_CONNECTED;// refresh local addresslocalAddress Net.localAddress(fd);// flush any packets already received.boolean blocking isBlocking();if (blocking) {IOUtil.configureBlocking(fd, false);}try {ByteBuffer buf ByteBuffer.allocate(100);while (receive(buf) ! null) {buf.clear();}} finally {if (blocking) {IOUtil.configureBlocking(fd, true);}}}} finally {writeLock.unlock();}} finally {readLock.unlock();}return this;
}
udp是数据报模式的协议是没有connect的。这里的connect实际上是在底层忽略了与其他地址的数据传输。 在connect后就可以像socketChannel似得使用read和write了
总结
本文学习了各种channel的实现主要是对底层native方法的一些封装针对不同属性的实体文件、socket使用对应的channel与byteBuffer传输数据。再通过byteBuffer与byte数据进行转换。 channel的实现中封装了大量的native方法重要的底层实现全在native中后续可以深入学习下。
本文中出现的byteBuffer和selector将在接下来的文章中单独分析。
原文链接 本文为云栖社区原创内容未经允许不得转载。