1、工程概况
1.1、除 JDK 自带的库之外,只使用了一个第三方库:ojdbc14.jar(用于操作 Oracle10g [x86])
2、package dr.Client; --> 用于测试的 客户端代码:
Tclient.java
package dr.Client;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.util.concurrent.Future;import utils.TsomeOperate;public class Tclient{ // http://yunhaifeiwu.iteye.com/blog/1714664 public static void main(String[] args) throws Exception { AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); //FuturefutureConn = client.connect(new InetSocketAddress("localhost", 9888)); Future futureConn = client.connect(new InetSocketAddress("192.168.1.233", 9888)); futureConn.get(); // Future .get();等待异步事件的完成 Future futureWrite = client.write(ByteBuffer.wrap("testAA".getBytes())); int iWritten = futureWrite.get(); TsomeOperate.LogConsole("Client send ["+iWritten+"] bytes ."); ByteBuffer bufRead = ByteBuffer.allocate(256); Future futureRead = client.read(bufRead); int iRead = futureRead.get(); TsomeOperate.LogConsole("Client recv ["+iRead+"] bytes : "+bufRead); TsomeOperate.LogConsole("\t "+bufRead.capacity()); //byte[] bytesRead = new byte[iRead+1]; byte[] bytesRead = new byte[256]; bufRead.position(0); // ZC: 我擦,这步操作,搞死我了(一直没想到会需要这步操作...) bufRead.get(bytesRead, 0, iRead); bytesRead[iRead] = (byte)0; TsomeOperate.LogConsole("\t "+new String(bytesRead)); TsomeOperate.LogConsole("\t "+new String(bytesRead, 0, iRead)); bufRead.put(iRead, (byte)0); TsomeOperate.LogConsole("\t "+new String(bufRead.array(), 0, iRead)); Thread.sleep(1000*2); }}
3、package dr.DataSet; --> 打包SQL查询结果
TdrDataSet.java
package dr.DataSet;import java.io.ByteArrayOutputStream;import java.io.InputStream;import java.lang.reflect.Field;import java.math.BigDecimal;import java.sql.Blob;import java.sql.ResultSet;import java.util.Iterator;import java.util.Map.Entry;import jdbcOper.TjdbcOperB;import utils.TsomeOperate;import java.util.TreeMap;public class TdrDataSet{ public static void main(String[] args) throws Exception { TjdbcOperB jdbcB = new TjdbcOperB(); //ResultSet rs = jdbcB.SelectGet("select * from PwDevExMaster", null); //ResultSet rs = jdbcB.SelectGet("select * from file_tbl where rownum<5 order by file_id", null); //ResultSet rs = jdbcB.SelectGet("select * from file_tbl where rownum<5", null); //ResultSet rs = jdbcB.SelectGet("select rownum from file_tbl", null); //byte[] bytes = Pack(rs); //ResultSet rs = jdbcB.SelectGet("select * from file_tbl where rownum<5 order by file_id", null); //ResultSet rs = jdbcB.SelectGet("select * from file_tbl order by file_id", null); ResultSet rs = jdbcB.SelectGet("select * from pwfiles", null); boolean b = false; long lTime01 = System.currentTimeMillis(); while (rs.next()) {// if (! 
b)// {// byte[] bytes = new byte[255];// b = true;// InputStream instream = rs.getBinaryStream(2);// int iRead = instream.read(bytes);// System.out.println("iRead : "+iRead);// for (int i=0; i0)// System.out.println(lLenBlob+" , "+bytes.length); // *** //Blob bb = rs.getBlob(4); //InputStream instream = bb.getBinaryStream();// InputStream instream = rs.getBinaryStream(2);// if (instream != null)// {// byte[] bytes = new byte[1024 * 1024];// int iRead01 = 0;// while (true)// {// int iRead = instream.read(bytes);// if (iRead == -1)// break;// else// iRead01 += iRead;// }// //System.out.println("iRead01 : "+iRead01);// } // *** //double d = rs.getDouble(1); BigDecimal bd = rs.getBigDecimal(1); double d = bd.doubleValue(); } long lTime02 = System.currentTimeMillis(); System.out.println("time : "+(lTime02 - lTime01)); String strTest = "蒲州变"; byte[] bytes = strTest.getBytes("utf-8"); for (int i=0; i tm = new TreeMap (); Class clazz = Class.forName("java.sql.Types"); Field[] fields = clazz.getDeclaredFields(); for (int i=0; i > it = tm.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); System.out.println(entry.getValue() +" : "+entry.getKey()); } } // *** public static byte[] Pack(ResultSet _rs) throws Exception { if (! _rs.first()) return null; int iFirstRowNum = _rs.getRow(); if (! _rs.last()) return null; int iLastRowNum = _rs.getRow(); int iRowCnt = iLastRowNum - iFirstRowNum + 1; int iColumnCnt = _rs.getMetaData().getColumnCount(); ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); byte[] bytes4 = new byte[4]; // (A)、结果集 总长度(先空着) byteArray.write(bytes4); // (B)、数据共有多少列 TsomeOperate.Integer2ByteArray(iColumnCnt, bytes4, 0); byteArray.write(bytes4); // (C)、数据共有多少行 TsomeOperate.Integer2ByteArray(iRowCnt, bytes4, 0); byteArray.write(bytes4); // *** *** *** // [1]、DataSet头(字段类型 + 字段名称) // (1.1)、字段类型 int[] iiFieldType = new int[iColumnCnt]; for (int i=0; i
4、package dr.Server; --> 服务端程序入口
Tserver.java
package dr.Server;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.SocketOption;import java.nio.*;import java.nio.channels.*;import java.util.concurrent.*;import dr.Server.CompletionHandler.TacceptHandlerSrv;import dr.Server.CompletionHandler.TreadHandlerSrv;import dr.Server.CompletionHandler.TwriteHandlerSrv;import dr.Server.RemoteClient.TremoteClient;import dr.Server.RemoteClient.TremoteClients;import utils.TsomeOperate;import java.net.StandardSocketOptions;public class Tserver{ public final static int PORT = 9888; public boolean FbStarted = false; AsynchronousChannelGroup FasynchronousChannelGroup = null; int FiThreadPoolSize = 100; // ZC: 这个参数的含义,不太确定是什么意思... AsynchronousServerSocketChannel FserverSocketChannel = null; int FiBacklog = 100; // ZC: 这个参数的含义,不确定是否就是 等待accept的最大数量(相当于Windows中开的accpet的线程数) TremoteClients Fclients = null; public Tserver() throws Exception { FasynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), FiThreadPoolSize); FserverSocketChannel = AsynchronousServerSocketChannel.open(FasynchronousChannelGroup); // 通过SocketOption类设置一些TCP选项 FserverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR,true); FserverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); // 下面这样设置,会将 其它形式的IP地址的连接 都拒绝... 
//FserverSocketChannel.bind(new InetSocketAddress("localhost", PORT), FiBacklog); FserverSocketChannel.bind(new InetSocketAddress(PORT), FiBacklog); FbStarted = true; // *** Fclients = new TremoteClients(); } public void PendingAccept() { if (FbStarted && FserverSocketChannel.isOpen()) { FserverSocketChannel.accept(Fclients, new TacceptHandlerSrv(this)); } else { throw new IllegalStateException("Controller has been closed"); } } public void PendingRead(TremoteClient _client) { if (FbStarted && FserverSocketChannel.isOpen()) { TsomeOperate.LogConsole("PendingRead - (1)"); _client.Fskt.read(ByteBuffer.wrap(_client.FbytesRecv, _client.FiRecv, _client.FbytesRecv.length - _client.FiRecv), _client, new TreadHandlerSrv(this)); TsomeOperate.LogConsole("PendingRead - (2)"); } else { throw new IllegalStateException("Controller has been closed"); } } // 投递 带TimeOut的 接收操作 public void PendingRead_timeout(TremoteClient _client) { if (FbStarted && FserverSocketChannel.isOpen()) { _client.Fskt.read(ByteBuffer.wrap(_client.FbytesRecv, _client.FiRecv, _client.FbytesRecv.length - _client.FiRecv), 1, TimeUnit.MILLISECONDS, _client, new TreadHandlerSrv(this)); } else { throw new IllegalStateException("Controller has been closed"); } } public void PendingWrite(TremoteClient _client) { if (FbStarted && FserverSocketChannel.isOpen()) { int iLen = _client.FbytesSend.length - _client.FiHasSendLen; _client.Fskt.write(ByteBuffer.wrap(_client.FbytesSend, _client.FiHasSendLen, iLen), _client, new TwriteHandlerSrv(this)); } else { throw new IllegalStateException("Controller has been closed"); } } public void Send(byte[] _buf, TremoteClient _client) { _client.FbytesSend = _buf; _client.FiHasSendLen = 0; PendingWrite(_client); } // *** 反射,提供函数调用 public static void PendingAccept_r(Object _obj) { try { Class clazz = Tserver.class; Method method = clazz.getMethod("PendingAccept"); method.invoke(_obj); } catch (Exception e) { e.printStackTrace(); } } // 投递接收操作的 2个函数的函数名 public static final String 
METHODNAME_PEND_READ = "PendingRead"; public static final String METHODNAME_PEND_READ_TIMEOUT = "PendingRead_timeout"; public static final String METHODNAME_PEND_WRITE = "PendingWrite"; public static void PendingReadWrite_r(String _strMethodName, Object _obj, TremoteClient _client) { Class clazz = Tserver.class; try { Method method = clazz.getMethod(_strMethodName, TremoteClient.class); method.invoke(_obj, _client); } catch (Exception e) { e.printStackTrace(); } } public static void Send_r(Object _obj, byte[] _buf, TremoteClient _client) { try { Class clazz = Tserver.class; Method method = clazz.getMethod("Send", byte[].class, TremoteClient.class); //method.invoke(_obj, new Object[]{_buf}, _client); method.invoke(_obj, _buf, _client); } catch (Exception e) { e.printStackTrace(); } } // *** *** *** *** *** *** *** *** *** *** *** public static void main(String args[]) throws Exception { TsomeOperate.LogConsole("main in <<=="); Tserver server = new Tserver(); server.PendingAccept(); // ZC: Why只能投递一个Accept未决操作? //server.PendingAccept(); while (true) { TsomeOperate.LogConsole("main thread"); Thread.sleep(5000); } //TsomeOperate.LogConsole("main out ==>>"); }}
5、package dr.Server.CompletionHandler; --> TCP异步网络事件 的 回调函数
5.1、TacceptHandlerSrv.java
package dr.Server.CompletionHandler;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import dr.Server.Tserver;import dr.Server.RemoteClient.TremoteClient;import dr.Server.RemoteClient.TremoteClients;import utils.TsomeOperate;public class TacceptHandlerSrv implements CompletionHandler{ Object FaioServer = null; Object Fclients = null; public TacceptHandlerSrv(Object _obj) { FaioServer = _obj; } @Override public void completed(AsynchronousSocketChannel _rstNewSkt, TremoteClients _attachment) { try { String strRemoteAddress = _rstNewSkt.getRemoteAddress().toString(); TsomeOperate.LogConsole("("+Thread.currentThread().getId()+")Accept connection from " + strRemoteAddress); //TsomeOperate.LogConsole("TaioAcceptHandler.completed - 1 : "+_attachment); TremoteClient client = _attachment.ClientNew(strRemoteAddress); client.FlLastUse = System.currentTimeMillis(); client.FclientsOwner = _attachment; client.Fskt = _rstNewSkt; TsomeOperate.LogConsole("TaioAcceptHandler.completed - 2"); Tserver.PendingReadWrite_r("PendingRead", FaioServer, client); TsomeOperate.LogConsole("TaioAcceptHandler.completed - 3"); } catch (Exception ex) { ex.printStackTrace(); } finally { Tserver.PendingAccept_r(FaioServer); } } @Override public void failed(Throwable exc, TremoteClients _attachment) { TsomeOperate.LogConsole("TacceptHandlerSrv failed : "); exc.printStackTrace(); Tserver.PendingAccept_r(FaioServer); }}
5.2、TreadHandlerSrv.java
package dr.Server.CompletionHandler;import java.io.IOException;import java.lang.reflect.Method;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import dr.Server.Tserver;import dr.Server.RemoteClient.TremoteClient;import utils.TsomeOperate;public class TreadHandlerSrv implements CompletionHandler{ public static void main(String[] args) { } Object FaioServer = null; public TreadHandlerSrv(Object _obj) { FaioServer = _obj; } @Override public void completed(Integer _iRst, TremoteClient _attachment) { // 参数_iRst: 接收到多少byte的数据 TsomeOperate.LogConsole("TaioReadHandler.completed - _iRst : "+_iRst); if (_iRst < 0) { TsomeOperate.LogConsole("对方关闭socket"); try { _attachment.Fskt.close(); // 去掉对应的客户端信息 _attachment.DelSelf(); } catch (IOException e) { e.printStackTrace(); } } else if (_iRst > 0) { _attachment.FlLastUse = System.currentTimeMillis(); if (_iRst < _attachment.FbytesRecv.length) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// e.printStackTrace();// } //_attachment.FbufRecv.Append(_attachment.FbytesRecv, _iRst); // some 操作 ...(业务逻辑) // (1)接收数据 _attachment.Recv(_iRst); // (2)处理数据 _attachment.MsgHandle(_attachment); // 继续投递 接收操作(非timeout) Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ, FaioServer, _attachment); // 继续投递 timeout的接收操作 //Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ_TIMEOUT, FaioServer, _attachment); } else// if (_iRst == _attachment.FbytesRecv.length) { // some 操作 ...(业务逻辑) // (1)接收数据 _attachment.Recv(_iRst); // (2)处理数据 _attachment.MsgHandle(_attachment); // ZC: 这里,接收的数据长度正好等于缓冲区的长度==>无法判断是否接收完毕了==>就使用“投递超时接收操作”的方式来做后续处理和判断 // ZC: 我记得WindowsIOCP的处理方式就是这样的,具体以后再说 // 继续投递 接收操作(非timeout) Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ, FaioServer, _attachment); // 继续投递 timeout的接收操作 //Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ_TIMEOUT, FaioServer, _attachment); } } } @Override public void failed(Throwable _ex, TremoteClient _attachment) { try {// Class 
clazz = Class.forName("java.nio.channels.InterruptedByTimeoutException");// if (_ex.getClass() == clazz)// {// // some 操作 ...(业务逻辑)// // (2)处理数据// _attachment.MsgHandle(_attachment);// // // 继续投递 接收操作(非timeout)// Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ, FaioServer, _attachment);// // // ZC: 操作超时之后,再投递read操作的话,会报错:// // ZC: “java.lang.IllegalStateException: Reading not allowed due to timeout or cancellation”// // ZC: 于是,投递失败 --> 没有接收操作 --> 收不到数据 --> ...// // ZC: 于是暂时,改变策略... 这里不投递了... 也不再投递 timeout的接收操作了...// }// else { TsomeOperate.LogConsole("TaioReadHandler.failed : "+_ex.getClass()); _ex.printStackTrace(); _attachment.Fskt.close(); // 去掉对应的客户端信息 _attachment.DelSelf(); } } catch (Exception e1) { e1.printStackTrace(); } }}
5.3、TwriteHandlerSrv.java --> 目前发送数据没有采用回调函数的方式,因此这个类暂时没有被使用;以后若要使用,还需要先测试一下
package dr.Server.CompletionHandler;

import java.nio.channels.CompletionHandler;

import dr.Server.Tserver;
import dr.Server.RemoteClient.TremoteClient;
import utils.TsomeOperate;

/**
 * Completion handler for asynchronous writes. It advances the client's sent-byte
 * counter and keeps posting writes until the whole send buffer has gone out.
 */
public class TwriteHandlerSrv implements CompletionHandler<Integer, TremoteClient>
{
    // The Tserver instance, held as Object and driven through Tserver's reflective helpers.
    Object FaioServer = null;

    /**
     * @param _obj the server instance that follow-up writes will be posted on
     */
    public TwriteHandlerSrv(Object _obj)
    {
        FaioServer = _obj;
    }

    /**
     * Called when a write finishes.
     *
     * @param _iRst       number of bytes written by this operation
     * @param _attachment the client whose send buffer is being written
     */
    @Override
    public void completed(Integer _iRst, TremoteClient _attachment)
    {
        _attachment.FiHasSendLen += _iRst;

        int iTotal = _attachment.FbytesSend.length;
        if (_attachment.FiHasSendLen > iTotal)
        {
            // More bytes reported than the buffer holds: only logged, nothing else is done.
            TsomeOperate.LogConsole("TaioWriteHandlerSrv.completed FiSendLen > FbufSend.length");
        }
        else if (_attachment.FiHasSendLen == iTotal)
        {
            TsomeOperate.LogConsole("TaioWriteHandlerSrv.completed 发送完毕");
        }
        else
        {
            // Partial write: post another write for the remainder.
            Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_WRITE, FaioServer, _attachment);
        }
    }

    /**
     * Called when a write fails; the cause is only printed.
     */
    @Override
    public void failed(Throwable _ex, TremoteClient _attachment)
    {
        _ex.printStackTrace();
    }
}
6、package dr.Server.RemoteClient; --> 用于管理各个连接到Server的Client的信息,以及 处理接收到的TCP包
6.1、TremoteClient.java
package dr.Server.RemoteClient;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import dr.DataSet.TdrDataSet;import utils.TsomeOperate;public class TremoteClient{ public long FlLastUse = 0;//System.currentTimeMillis();// 最后一次使用的时间([1]accept/[2]异步接收/[3]同步发送) // *** public TremoteClients FclientsOwner = null; public String FstrRemoteAddress = null; // *** public AsynchronousSocketChannel Fskt = null; public byte[] FbytesRecv = null; public int FiRecv = 0; // 总共接收了多少字节的数据 public int FiFirstPktLen = 0; // 第1个TCP包的长度(0:无效) // 接收到的数据的 暂存缓冲// public Tbuffer FbufRecv = new Tbuffer(); // *** // 每次最多发送多少byte的数据 // ZC: 这个值得取值,还有待商榷(评测),以后再说吧... //public final int SEND_LEN = 1024; public byte[] FbytesSend = null; public int FiHasSendLen = 0; // 已经发送了多少byte的数据了 public TremoteClient(int _iRecvBufLen) { FbytesRecv = new byte[_iRecvBufLen]; } public int DelSelf() { if (FclientsOwner != null) FclientsOwner.ClientDel(this); return 0; } public int SendSync(AsynchronousSocketChannel _skt, byte[] _buf, int _iSendLen) throws Exception { FlLastUse = System.currentTimeMillis(); int iSend = _skt.write(ByteBuffer.wrap(_buf, 0, _iSendLen)).get(); return iSend; } // 接收 数据包 public synchronized void Recv(int _iRecv) { FiRecv += _iRecv; if (FiRecv >= FbytesRecv.length) throw new RuntimeException("接收缓冲区 满了"); } final int TCP_PACKET_HEADER_LEN = 4 * 3; // TCP数据 操作类型:高16位:高一级类型; 低16位:低一级类型 // (正值)正常的 C/S之间的业务逻辑数据 final int OP_TYPE_SQL = 0x00010000; final int OP_TYPE_PUSH = 0x00020000; final int OP_TYPE_HEARTBEAT = 0x00030000; // (负值)C/S之间的 管理数据 final int OP_TYPE_MANAGE = 0x80000000; final int OP_TYPE_MANAGE_CLIENTS = 0x80000001; // c请求s,所有客户端的socket信息(简单) final int OP_TYPE_MANAGE_SOCKET_REQ = 0x80000010; // c-->s,s-->c, 某个客户端的详细socket信息 (request) (搬运工) final int OP_TYPE_MANAGE_SOCKET_RES = 0x80000020; // (response) (搬运工) final int OP_TYPE_MANAGE_BLOCK_REQ = 0x80000030; // c-->s,s-->c, 某个客户端的内存block信息 (request) (搬运工) final int OP_TYPE_MANAGE_BLOCK_RES = 0x80000040; // 
(response) (搬运工) final int OP_TYPE_MANAGE_SQL = 0x80000002; // 我的SQL语句操作 public void MsgHandle(TremoteClient _client) { TsomeOperate.LogConsole("MsgHandle - FiRecv : "+FiRecv); if (FiRecv < TCP_PACKET_HEADER_LEN) return; if ( (FiFirstPktLen != 0) && (FiRecv < FiFirstPktLen) ) return; int iPktLen = TsomeOperate.ByteArray2Integer(FbytesRecv, 0); int iPktIdx = TsomeOperate.ByteArray2Integer(FbytesRecv, 4); int iPktType = TsomeOperate.ByteArray2Integer(FbytesRecv, 8); TsomeOperate.LogConsole("接收的TCP包总长 : "+iPktLen); TsomeOperate.LogConsole("接收的TCP包序号 : "+iPktIdx); TsomeOperate.LogConsole("接收的TCP包类型 : "+Integer.toHexString(iPktType)); FiFirstPktLen = iPktLen; if (FiRecv < iPktLen) return; FiFirstPktLen = 0; // ZC: 这里肯定会处理一个TCP包了,于是将FiFirstPktLen设为0 if ((iPktType & OP_TYPE_SQL) == OP_TYPE_SQL) { String strContent = new String(FbytesRecv, TCP_PACKET_HEADER_LEN, iPktLen - TCP_PACKET_HEADER_LEN); TsomeOperate.LogConsole("\tRecv msg : "+strContent); System.arraycopy(FbytesRecv, iPktLen, FbytesRecv, 0, FiRecv - iPktLen); SendTest(iPktIdx); } else if ((iPktType & OP_TYPE_MANAGE) == OP_TYPE_MANAGE) { MsgHandle_Manage(iPktType, _client, FbytesRecv, TCP_PACKET_HEADER_LEN, iPktLen - TCP_PACKET_HEADER_LEN); } //else if (iPktType == OP_TYPE_HEARTBEAT) //{} // TCP包处理完毕 FiRecv -= iPktLen; if (FiRecv != 0) { // 剩余数据的移动(如果有的话) System.arraycopy(FbytesRecv, iPktLen, FbytesRecv, 0, FiRecv); } } @SuppressWarnings("finally") public void MsgHandle_Manage(int _iOpType, TremoteClient _client, byte[] _bytes, int _iOffset, int _iLen) { AsynchronousSocketChannel skt = null; int iSendLen = 0; byte[] bytesSend = new byte[1024*1]; if (_iOpType == OP_TYPE_MANAGE_CLIENTS) { byte[] bytesClients = FclientsOwner.ClientsMsg(this); int iPktlen = TCP_PACKET_HEADER_LEN + bytesClients.length; int iPktIdx = 0; int iPktType= _iOpType; TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0); TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4); TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8); 
System.arraycopy(bytesClients, 0, bytesSend, TCP_PACKET_HEADER_LEN, bytesClients.length); skt = _client.Fskt; iSendLen = iPktlen; } else if ( (_iOpType == OP_TYPE_MANAGE_SOCKET_REQ) | (_iOpType == OP_TYPE_MANAGE_BLOCK_REQ) ) { // ClientA向Server请求ClientB的信息 // ClientA发来Server 的信息:ClientB的RemoteAddress信息 // Server 发去ClientB的信息:ClientA的RemoteAddress信息 int iLenAddressB = TsomeOperate.ByteArray2Integer(_bytes, _iOffset); String strAddressB = new String(_bytes, _iOffset + 4, iLenAddressB - 4); // *** TremoteClient clientB = FclientsOwner.ClientGet(strAddressB); if (clientB == null) { // 未找到 相应目标机的信息 // 返回(长度[负值]+信息) strAddressB = " Dest machine \""+strAddressB+"\" not found ."; byte[] bytesMsgB = strAddressB.getBytes(); int iPktlen = TCP_PACKET_HEADER_LEN + (4 + bytesMsgB.length); int iPktIdx = 0; int iPktType= _iOpType + 0x10; TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0); TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4); TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8); TsomeOperate.Integer2ByteArray(-bytesMsgB.length, bytesSend, TCP_PACKET_HEADER_LEN); System.arraycopy(bytesMsgB, 0, bytesSend, TCP_PACKET_HEADER_LEN + 4, bytesMsgB.length); skt = _client.Fskt; iSendLen = iPktlen; } else { // 转发,后面加上一段 byte[] bytesAddressA = _client.FstrRemoteAddress.getBytes(); int iPktlen = TCP_PACKET_HEADER_LEN + iLenAddressB + (4+bytesAddressA.length); int iPktIdx = 0; int iPktType= _iOpType; TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0); TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4); TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8); // *** System.arraycopy(_bytes, _iOffset, bytesSend, TCP_PACKET_HEADER_LEN, iLenAddressB); // *** TsomeOperate.Integer2ByteArray((bytesAddressA.length + 4), bytesSend, TCP_PACKET_HEADER_LEN + iLenAddressB); System.arraycopy(bytesAddressA, 0, bytesSend, TCP_PACKET_HEADER_LEN + iLenAddressB + 4, bytesAddressA.length); skt = clientB.Fskt; iSendLen = iPktlen; } } else if ( (_iOpType == OP_TYPE_MANAGE_SOCKET_RES) | 
(_iOpType == OP_TYPE_MANAGE_BLOCK_RES) ) { // ClientA向Server请求ClientB的信息 // ClientB发来Server 的信息:ClientB的 socket/内存block 详细信息(长度+内容[ip信息+mac信息]) + ClientA的RemoteAddress信息(长度+内容) // Server 发去ClientA的信息:ClientB的 socket/内存block 详细信息(长度+内容[ip信息+mac信息]) + ClientB的RemoteAddress信息(长度+内容) int iLenMsgB = TsomeOperate.ByteArray2Integer(_bytes, _iOffset); int iLenAddressB = TsomeOperate.ByteArray2Integer(_bytes, _iOffset + iLenMsgB); int iLenAddressA = TsomeOperate.ByteArray2Integer(_bytes, _iOffset + iLenMsgB + iLenAddressB); String strAddressA = new String(_bytes, _iOffset + iLenMsgB + iLenAddressB + 4, iLenAddressA - 4); byte[] bytesSend01 = new byte[TCP_PACKET_HEADER_LEN + iLenMsgB + 32]; int iPktlen = TCP_PACKET_HEADER_LEN + iLenMsgB + iLenAddressB; int iPktIdx = 0; int iPktType= _iOpType; TsomeOperate.Integer2ByteArray(iPktlen, bytesSend01, 0); TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend01, 4); TsomeOperate.Integer2ByteArray(iPktType, bytesSend01, 8); // *** System.arraycopy(_bytes, _iOffset, bytesSend01, TCP_PACKET_HEADER_LEN, iLenMsgB); // *** System.arraycopy(_bytes, _iOffset + iLenMsgB, bytesSend01, TCP_PACKET_HEADER_LEN + iLenMsgB, iLenAddressB); TremoteClient clientA = FclientsOwner.ClientGet(strAddressA); try { SendSync(clientA.Fskt, bytesSend01, iPktlen); } catch (Exception e) { e.printStackTrace(); } finally { return; } } else if (_iOpType == OP_TYPE_MANAGE_SQL) { int iLenSql = TsomeOperate.ByteArray2Integer(_bytes, _iOffset); String strSql = new String(_bytes, _iOffset + 4, iLenSql - 4); // *** byte[] bytesRtn = TdrDataSet.SelectZ(strSql); int iPktlen = TCP_PACKET_HEADER_LEN + bytesRtn.length; int iPktIdx = 0; int iPktType= OP_TYPE_MANAGE_SQL; TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0); TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4); TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8); if (bytesRtn != null) { try { SendSync(_client.Fskt, bytesSend, TCP_PACKET_HEADER_LEN); SendSync(_client.Fskt, bytesRtn, bytesRtn.length); } catch 
(Exception e) { e.printStackTrace(); } finally { return; } } } else // 具体如何处理,再议 { String str = "Unsupport operate"; int iPktlen = TCP_PACKET_HEADER_LEN + str.length(); int iPktIdx = 0; int iPktType= _iOpType; TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0); TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4); TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8); // *** System.arraycopy(str.getBytes(), 0, bytesSend, 0, str.length()); skt = _client.Fskt; iSendLen = iPktlen; } try { SendSync(skt, bytesSend, iSendLen); } catch (Exception e) { e.printStackTrace(); } }// *** int[] Fnum01 = new int[10]; byte[] FbytesSend01 = new byte[1024 * 1024 * 2]; public int SendTest(int _iPktIdx) { int iGroup = FbytesSend01.length / 10; for (int i=0; i=0; j--) { if (Fnum01[j] > 9) { if (j != 0) Fnum01[j-1] ++; } } for (int j=0; j<=9; j++) { FbytesSend01[i*10 + j] = (byte)(48+Fnum01[j]); } } int iYuShu = FbytesSend01.length % 10; if (iYuShu != 0) for (int i=0; i
6.2、TremoteClients.java
package dr.Server.RemoteClient;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import java.util.Map.Entry;import utils.TsomeOperate;public class TremoteClients{// 该类用于 服务端 管理 远程客户端的信息 // *** *** *** //HashMap?HashTable? HashMapFhash = new HashMap (); //HashSet Fset = new HashSet (); //TreeSet Fset = new TreeSet (); public final int RECV_BUF_LEN = 1024 * 4; public TremoteClient ClientNew(String _strRemoteAddress) { TremoteClient rst = new TremoteClient(RECV_BUF_LEN); rst.FstrRemoteAddress = _strRemoteAddress; boolean bNotAlreadyContain = ClientAdd(_strRemoteAddress, rst); if (bNotAlreadyContain) return rst; else { rst = null; return null; } } synchronized boolean ClientAdd(String _strRemoteAddress, TremoteClient _client) { TremoteClient rtn = Fhash.put(_strRemoteAddress, _client); if (rtn == _client) return false; return true; } synchronized boolean ClientDel(TremoteClient _client) { TremoteClient rtn = Fhash.remove(_client.FstrRemoteAddress); if (rtn == null) return false; return true; } synchronized byte[] ClientsMsg(TremoteClient _client) { long l = System.currentTimeMillis(); byte[] byte8 = new byte[8]; TsomeOperate.Long2ByteArray(l, byte8, 0); ByteArrayOutputStream ba = new ByteArrayOutputStream(); try { ba.write(byte8); Iterator > it = Fhash.entrySet().iterator(); while (it.hasNext()) { Entry entry = it.next(); String str; TremoteClient client = entry.getValue(); if (client == _client) str = entry.getKey() + " <=="; else str = entry.getKey(); TsomeOperate.Long2ByteArray(client.FlLastUse, byte8, 0); byte[] byte4 = new byte[4]; TsomeOperate.Integer2ByteArray(str.length(), byte4, 0); ba.write(byte8); // 该TremoteClient 最后一次使用的时间 ba.write(byte4); // 长度 ba.write(str.getBytes()); // 内容 } } catch (IOException e) { e.printStackTrace(); } return ba.toByteArray();// 需要手动 ba.Close() 吗? 
} synchronized TremoteClient ClientGet(String _strRemoteAddress) { return Fhash.get(_strRemoteAddress); } public static void main(String args[]) throws Exception { TsomeOperate.LogConsole(Long.SIZE+""); }}
7、package jdbcOper; --> 基本的JDBC数据库操作,封装了几个类,方便使用
7.1、TjdbcOperA.java
package jdbcOper;import java.sql.*;import java.util.ArrayList;// ZC: 基类(基本全是static方法)public class TjdbcOperA{ public void finalize() { System.out.println("TjdbcOperA.finalize"); //ConnectionClose(); } private static Class Fclazz = null; // 加载驱动 protected static Boolean DriverLoad(String _strDriver) { // _strDriver 类似 "oracle.jdbc.driver.OracleDriver" if (Fclazz == null) { try { Fclazz = Class.forName(_strDriver); } catch (ClassNotFoundException e) { Fclazz = null; e.printStackTrace(); } } return (Fclazz != null); } private static Connection Fconn = null; protected static Connection ConnectionOpen(String _strUrl, String _strUserName, String _strPassword) { // _strUrl 类似 "jdbc:oracle:thin:@localhost:1521:ORCLHSP" // _strUserName 类似 "scott" // _strPassword 类似 "tiger" if (Fconn == null) { try { Fconn = DriverManager.getConnection(_strUrl, _strUserName, _strPassword); } catch (SQLException e) { Fconn = null; e.printStackTrace(); } } return Fconn; } protected static void ConnectionClose() { if (Fconn != null) { try { Fconn.close(); } catch (SQLException e) { e.printStackTrace(); } Fconn = null; } } // ZC: 单条的 insert语句 // ZC: 返回 成功的SQL语句条数 protected static int InsertSingle(Connection _conn, String _strInsert, String[] _strsParameter) { int iRtn = 0; PreparedStatement ps = null; try { _conn.setAutoCommit(true); ps = _conn.prepareStatement(_strInsert); if (_strsParameter != null) { for (int i=0; i<_strsParameter.length; i++) ps.setString(i+1, _strsParameter[i]); } iRtn = ps.executeUpdate(); } catch (SQLException e) { e.printStackTrace(); } finally { if (ps != null) { try { ps.close(); } catch (SQLException e) { //e.printStackTrace(); } // ZC: (1)、若 上面关闭成功了,这里 赋值为 null,没有任何效果 // ZC: (2)、若 上面关闭失败,这里 赋值为 null,以便让垃圾回收机制来处理 // ZC: 这里的 ps是局部变量,函数退出后,栈空间回收,也没有变量再指向 PreparedStatement实例 了,应该不用显示的赋值为null了? // ZC: 观察到ps的关闭需要调用close()函数,是否 和 普通的类实例不同?函数退出时 不会自动关闭? // ZC: 然而,记得在哪里见过函数退出后 还能保持局部变量的 机制,是在 C#里还是在java里? 
ps = null; } } return iRtn; } // ZC: executeUpdate() 执行单条 insert/delete/update语句 // ZC: 返回 成功的SQL语句条数 protected static int ExecuteUpdate_Single(Connection _conn, String _str, String[] _strsParameter) { PreparedStatement ps = null; try { _conn.setAutoCommit(true); ps = _conn.prepareStatement(_str); for (int i=0; i<_strsParameter.length; i++) ps.setString(i+1, _strsParameter[i]); int iRtn = ps.executeUpdate(); return iRtn; } catch (SQLException e) { e.printStackTrace(); return 0; } } protected static int ExecuteUpdate_SingleNoParameter(Connection _conn, String _str) { PreparedStatement ps = null; try { _conn.setAutoCommit(true); ps = _conn.prepareStatement(_str); int iRtn = ps.executeUpdate(); return iRtn; } catch (SQLException e) { e.printStackTrace(); return 0; } } // ZC: 使用的是 单个 PreparedStatement (多个 PreparedStatement 怎么耍?) // ZC: executeUpdate() 执行多条 insert/delete/update语句 // ZC: 返回 成功的SQL语句条数(ZC: 如果是insert/delete/update语句的组合,那返回的条数如何计算?加起来?) protected static int ExecuteUpdate_Multi(Connection _conn, String[] _strsSql, String[][] _strssParameter) { int iRtn = 0; PreparedStatement ps = null; try { _conn.setAutoCommit(false); for (int i=0; i<_strsSql.length; i++) { if (_strssParameter[i] != null) { ps = _conn.prepareStatement(_strsSql[i]); for (int j=0; j<_strssParameter[i].length; j++) ps.setString(j+1, _strssParameter[i][j]); // ZC: 每次得到 PreparedStatement,都要 执行executeUpdate(),不然 它只会执行会有一条语句 iRtn += ps.executeUpdate(); // ZC: 每次获得的都是 新的实例,记得关闭 ps.close(); ps = null; } } _conn.commit(); } catch (SQLException e) { e.printStackTrace(); try { iRtn = 0; // ZC: 如果回滚也异常了,那怎么办?已经执行的语句 就没办法撤回了? // ZC: 还是说不执行commit()的话,SQL语句默认没有提交?可能是的. 
_conn.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } finally { if (ps != null) {// try {// ps.close();// } catch (SQLException e) {// e.printStackTrace();// } // 上面已经有关闭的代码了,这里就不必再关闭了,赋值为null后若有需要 就交给JVM处理吧 ps = null; } } return iRtn; } // ZC: 貌似 PreparedStatement.addBatch(); 和 PreparedStatement.addBatch(sql); 不能组合使用。 public static int Batch_SameSQL(Connection _conn, String _strSql, String[][] _strssParameter) { int iRtn = 0; PreparedStatement ps = null; try { _conn.setAutoCommit(false); ps = _conn.prepareStatement(_strSql); for (int i=0; i<_strssParameter.length; i++) { if (_strssParameter[i] != null) { for (int j=0; j<_strssParameter[i].length; j++) ps.setString(j+1, _strssParameter[i][j]); ps.addBatch(); } } int[] iRtns = ps.executeBatch(); for (int i=0; i∵ PreparedStatement.addBatch(); 和 PreparedStatement.addBatch(sql); 不能组合使用。 // ZC: PreparedStatement.addBatch(sql); 测试下来,貌似只能加入不带参数的SQL语句 // ZC: 貌似 一旦执行了"PreparedStatement.addBatch(sql2);","Connection.prepareStatement(sql1)"中传入的参数sql1就无效了... public static int Batch_DifferentSQLs(Connection _conn, ArrayList _listSql) { int iRtn = 0;// PreparedStatement ps = null; Statement st = null; try {// _conn.setAutoCommit(false);// ps = _conn.prepareStatement(_listSql.get(0));// // for (int i=0; i<_listSql.size(); i++)// ps.addBatch(_listSql.get(i));// // int[] iRtns = ps.executeBatch();// for (int i=0; i
7.2、TjdbcOperB.java
package jdbcOper;

import java.sql.*;
import java.util.ArrayList;

/**
 * Instance-level wrapper around the JDBC helpers inherited from {@link TjdbcOperA}.
 *
 * Every public method loads the configured driver, opens a connection with the
 * configured URL/credentials and forwards to the inherited helper of the same name.
 * The update-style methods return 0 (and {@link #SelectGet} returns null) when the
 * driver cannot be loaded or the connection cannot be opened.
 *
 * NOTE(review): this class assumes ConnectionOpen() hands back a fresh connection on
 * every call, so each connection is released once the operation is finished — confirm
 * against TjdbcOperA.
 */
public class TjdbcOperB extends TjdbcOperA
{
    // The settings below could equally live in a properties file or an XML file.
    // NOTE(review): the credentials are hard-coded in source; consider externalising them.

    // (1) Oracle configuration
    String FstrDriver = "oracle.jdbc.driver.OracleDriver";
    String FstrConnUrl = "jdbc:oracle:thin:@192.168.1.201:1521:ZheJiang";
    String FstrConnUsername = "testWFpas"; // alternative account: "WenZhouPDPas"
    String FstrConnPassword = "dongruisoft.com";

    // (2) MySQL configuration (alternative, disabled)
    //   String FstrDriver = "com.mysql.jdbc.Driver";
    //   String FstrConnUrl = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8";
    //   String FstrConnUsername = "root";
    //   String FstrConnPassword = "";

    /** Handle of the query opened by the last successful {@link #SelectGet}; null when none is open. */
    TselectResult FselRst = null;

    /** Connection backing {@link #FselRst}; kept so that {@link #SelectClose} can release it. */
    private Connection FconnSelect = null;

    /**
     * Loads the configured driver and opens a connection with the configured settings.
     *
     * @return the connection, or null when the driver cannot be loaded or the
     *         connection cannot be opened
     */
    private Connection OpenConfiguredConnection()
    {
        if (! DriverLoad(FstrDriver))
            return null;
        return ConnectionOpen(FstrConnUrl, FstrConnUsername, FstrConnPassword);
    }

    /**
     * Releases a connection without letting a failure escape to the caller.
     *
     * What close() does with a transaction that is still active is
     * implementation-defined, so anything the helper left uncommitted is rolled
     * back explicitly first (a no-op after a successful commit).
     *
     * @param _conn connection to release; null is ignored
     */
    private void CloseConnectionQuietly(Connection _conn)
    {
        if (_conn == null)
            return;
        try
        {
            if (! _conn.isClosed())
            {
                if (! _conn.getAutoCommit())
                    _conn.rollback();
                _conn.close();
            }
        }
        catch (SQLException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Runs one parameterised insert through the inherited InsertSingle helper.
     *
     * @param _strInsert     SQL text with '?' placeholders
     * @param _strsParameter values for the placeholders
     * @return the inherited helper's result, or 0 when no connection is available
     */
    public int InsertSingle(String _strInsert, String[] _strsParameter)
    {
        Connection conn = OpenConfiguredConnection();
        if (conn == null)
            return 0;
        try
        {
            return super.InsertSingle(conn, _strInsert, _strsParameter);
        }
        finally
        {
            CloseConnectionQuietly(conn);
        }
    }

    /**
     * Runs several insert/delete/update statements through the inherited
     * ExecuteUpdate_Multi helper.
     *
     * @param _strSql        SQL texts, one per statement
     * @param _strsParameter parameter values, one array per statement
     * @return the inherited helper's result, or 0 when no connection is available
     */
    public int ExecuteUpdate_Multi(String[] _strSql, String[][] _strsParameter)
    {
        Connection conn = OpenConfiguredConnection();
        if (conn == null)
            return 0;
        try
        {
            return super.ExecuteUpdate_Multi(conn, _strSql, _strsParameter);
        }
        finally
        {
            CloseConnectionQuietly(conn);
        }
    }

    /**
     * Opens a query and returns its result set. The statement, result set and
     * connection stay open until {@link #SelectClose} is called.
     *
     * NOTE(review): calling this again before SelectClose() overwrites the stored
     * handle, so the earlier query can no longer be closed through this object.
     *
     * @param _strSql        SQL text of the query
     * @param _strsParameter values for the placeholders; main() passes null for a
     *                       query without parameters
     * @return the result set, or null when no connection is available or the
     *         inherited helper returns no handle
     */
    public ResultSet SelectGet(String _strSql, String[] _strsParameter)
    {
        Connection conn = OpenConfiguredConnection();
        if (conn == null)
            return null;

        TselectResult selRst = super.SelectGet(conn, _strSql, _strsParameter);
        if (selRst == null)
        {
            // Nothing to hand out, so the connection would otherwise be orphaned.
            CloseConnectionQuietly(conn);
            return null;
        }

        FselRst = selRst;
        FconnSelect = conn;
        return selRst.Frs;
    }

    /**
     * Closes the query opened by {@link #SelectGet} and releases its connection.
     * Does nothing when no query is open.
     */
    public void SelectClose()
    {
        try
        {
            if (FselRst != null)
                super.SelectClose(FselRst);
        }
        finally
        {
            FselRst = null;
            CloseConnectionQuietly(FconnSelect);
            FconnSelect = null;
        }
    }

    /**
     * Runs a list of SQL statements through the inherited Batch_DifferentSQLs helper.
     *
     * @param _listSql SQL texts to execute
     * @return the inherited helper's result, or 0 when no connection is available
     */
    public int Batch_DifferentSQLs(ArrayList<String> _listSql)
    {
        Connection conn = OpenConfiguredConnection();
        if (conn == null)
            return 0;
        try
        {
            return super.Batch_DifferentSQLs(conn, _listSql);
        }
        finally
        {
            CloseConnectionQuietly(conn);
        }
    }

    /**
     * Runs one parameterised insert/delete/update statement through the inherited
     * ExecuteUpdate_Single helper.
     *
     * @param _str           SQL text with '?' placeholders
     * @param _strsParameter values for the placeholders
     * @return the inherited helper's result, or 0 when no connection is available
     */
    public int ExecuteUpdate_Single(String _str, String[] _strsParameter)
    {
        Connection conn = OpenConfiguredConnection();
        if (conn == null)
            return 0;
        try
        {
            return super.ExecuteUpdate_Single(conn, _str, _strsParameter);
        }
        finally
        {
            CloseConnectionQuietly(conn);
        }
    }

    /**
     * Runs one insert/delete/update statement that has no placeholders through the
     * inherited ExecuteUpdate_SingleNoParameter helper.
     *
     * @param _str SQL text
     * @return the inherited helper's result, or 0 when no connection is available
     */
    public int ExecuteUpdate_SingleNoParameter(String _str)
    {
        Connection conn = OpenConfiguredConnection();
        if (conn == null)
            return 0;
        try
        {
            return super.ExecuteUpdate_SingleNoParameter(conn, _str);
        }
        finally
        {
            CloseConnectionQuietly(conn);
        }
    }

    /** Manual smoke test: prints two columns of every row in PwDevExMaster. */
    public static void main(String[] args)
    {
        TjdbcOperB jdbcB = new TjdbcOperB();

        // Example 1 (disabled): single insert
        //   String[] strsParameter = {"A2", "1"};
        //   int iRtn = jdbcB.InsertSingle("insert into PwDevExMaster values (?,?)", strsParameter);
        //   System.out.println(iRtn);

        // Example 2 (disabled): update + delete + insert in one call
        //   String strUpdate01 = "update PwDevExMaster set JgLx=50 where MainObjId=?";
        //   String[] strsParaUpdate01 = {"A5"};
        //   String strDelete01 = "delete from PwDevExMaster where MainObjId=?";
        //   String[] strsParaDelete01 = {"A5"};
        //   String strInsert02 = "insert into PwDevExMaster values (?,?)";
        //   String[] strsParaInsert02 = {"A5", "200"};
        //   String[] strsSql = {strUpdate01, strDelete01, strInsert02};
        //   String[][] strssParameter = {strsParaUpdate01, strsParaDelete01, strsParaInsert02};
        //   int iRtn = jdbcB.ExecuteUpdate_Multi(strsSql, strssParameter);
        //   System.out.println(iRtn);

        try
        {
            ResultSet rs = jdbcB.SelectGet("select * from PwDevExMaster", null);
            if (rs == null)
            {
                // SelectGet reports "no connection / no result" as null.
                System.err.println("SelectGet returned no result set");
            }
            else
            {
                while (rs.next())
                {
                    System.out.println(rs.getString("MainObjId")+" , "+rs.getString("jglx"));
                }
            }
        }
        catch (SQLException e)
        {
            e.printStackTrace();
        }
        finally
        {
            jdbcB.SelectClose();
        }
    }
}
7.3、TselectResult.java
package jdbcOper;

import java.sql.*;

/**
 * Plain holder that carries a prepared statement and a result set together,
 * so both can be passed around (and later closed) as one handle.
 */
public class TselectResult
{
    /** Prepared statement of the query; presumably the one that produced {@link #Frs} — confirm against TjdbcOperA. */
    PreparedStatement Fps = null;

    /** Result set of the query. */
    ResultSet Frs = null;
}
8、package utils; --> 小工具类
TsomeOperate.java
package utils;public class TsomeOperate{ public static void main(String[] args) {}// *** public static int ByteArray2Integer(byte[] _bytes, int _iBeginOffset) { int i01 = _bytes[_iBeginOffset + 0] & 0xFF; int i02 = (_bytes[_iBeginOffset + 1] << 8) & 0xFF00; int i03 = (_bytes[_iBeginOffset + 2] << 16) & 0xFF0000; int i04 = (_bytes[_iBeginOffset + 3] << 24) & 0xFF000000; return (i01 | i02 | i03 | i04); } public static int Integer2ByteArray(int _i, byte[] _bytes, int _iBeginOffset) { _bytes[_iBeginOffset + 0] = (byte)(_i & 0xFF); _bytes[_iBeginOffset + 1] = (byte)((_i >> 8) & 0xFF); _bytes[_iBeginOffset + 2] = (byte)((_i >> 16) & 0xFF); _bytes[_iBeginOffset + 3] = (byte)((_i >> 24) & 0xFF); return 0; } public static int Long2ByteArray(long _l, byte[] _bytes, int _iBeginOffset) { _bytes[_iBeginOffset + 0] = (byte)(_l & 0xFF); _bytes[_iBeginOffset + 1] = (byte)((_l >> 8) & 0xFF); _bytes[_iBeginOffset + 2] = (byte)((_l >> 16) & 0xFF); _bytes[_iBeginOffset + 3] = (byte)((_l >> 24) & 0xFF); _bytes[_iBeginOffset + 4] = (byte)((_l >> 32) & 0xFF); _bytes[_iBeginOffset + 5] = (byte)((_l >> 40) & 0xFF); _bytes[_iBeginOffset + 6] = (byte)((_l >> 48) & 0xFF); _bytes[_iBeginOffset + 7] = (byte)((_l >> 56) & 0xFF); return 0; } public static int Double2ByteArray(double _d, byte[] _bytes, int _iBeginOffset) { long l = Double.doubleToLongBits(_d); _bytes[_iBeginOffset + 0] = (byte)(l & 0xFF); _bytes[_iBeginOffset + 1] = (byte)((l >> 8) & 0xFF); _bytes[_iBeginOffset + 2] = (byte)((l >> 16) & 0xFF); _bytes[_iBeginOffset + 3] = (byte)((l >> 24) & 0xFF); _bytes[_iBeginOffset + 4] = (byte)((l >> 32) & 0xFF); _bytes[_iBeginOffset + 5] = (byte)((l >> 40) & 0xFF); _bytes[_iBeginOffset + 6] = (byte)((l >> 48) & 0xFF); _bytes[_iBeginOffset + 7] = (byte)((l >> 56) & 0xFF); return 0; } public static void LogConsole(String _str) { System.out.println(_str); } public static void LogFile(String _str) {}}
9、
10、