博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
TCP中间件_java_server
阅读量:4322 次
发布时间:2019-06-06

本文共 46537 字,大约阅读时间需要 155 分钟。

1、工程概况

  1.1、都是使用的默认的库,就用了一个第3方库:ojdbc14.jar(用于操作Oracle10g[x86])

2、package dr.Client;  -->  用于测试的 客户端代码:

  Tclient.java

package dr.Client;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.util.concurrent.Future;import utils.TsomeOperate;public class Tclient{    // http://yunhaifeiwu.iteye.com/blog/1714664    public static void main(String[] args) throws Exception    {        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();        //Future
futureConn = client.connect(new InetSocketAddress("localhost", 9888)); Future
futureConn = client.connect(new InetSocketAddress("192.168.1.233", 9888)); futureConn.get(); // Future
.get();等待异步事件的完成 Future
futureWrite = client.write(ByteBuffer.wrap("testAA".getBytes())); int iWritten = futureWrite.get(); TsomeOperate.LogConsole("Client send ["+iWritten+"] bytes ."); ByteBuffer bufRead = ByteBuffer.allocate(256); Future
futureRead = client.read(bufRead); int iRead = futureRead.get(); TsomeOperate.LogConsole("Client recv ["+iRead+"] bytes : "+bufRead); TsomeOperate.LogConsole("\t "+bufRead.capacity()); //byte[] bytesRead = new byte[iRead+1]; byte[] bytesRead = new byte[256]; bufRead.position(0); // ZC: 我擦,这步操作,搞死我了(一直没想到会需要这步操作...) bufRead.get(bytesRead, 0, iRead); bytesRead[iRead] = (byte)0; TsomeOperate.LogConsole("\t "+new String(bytesRead)); TsomeOperate.LogConsole("\t "+new String(bytesRead, 0, iRead)); bufRead.put(iRead, (byte)0); TsomeOperate.LogConsole("\t "+new String(bufRead.array(), 0, iRead)); Thread.sleep(1000*2); }}

3、package dr.DataSet;  -->  打包SQL查询结果

  TdrDataSet.java

package dr.DataSet;import java.io.ByteArrayOutputStream;import java.io.InputStream;import java.lang.reflect.Field;import java.math.BigDecimal;import java.sql.Blob;import java.sql.ResultSet;import java.util.Iterator;import java.util.Map.Entry;import jdbcOper.TjdbcOperB;import utils.TsomeOperate;import java.util.TreeMap;public class TdrDataSet{    public static void main(String[] args) throws Exception    {        TjdbcOperB jdbcB = new TjdbcOperB();        //ResultSet rs = jdbcB.SelectGet("select * from PwDevExMaster", null);        //ResultSet rs = jdbcB.SelectGet("select * from file_tbl where rownum<5 order by file_id", null);        //ResultSet rs = jdbcB.SelectGet("select * from file_tbl where rownum<5", null);        //ResultSet rs = jdbcB.SelectGet("select rownum from file_tbl", null);        //byte[] bytes = Pack(rs);                //ResultSet rs = jdbcB.SelectGet("select * from file_tbl where rownum<5 order by file_id", null);        //ResultSet rs = jdbcB.SelectGet("select * from file_tbl order by file_id", null);        ResultSet rs = jdbcB.SelectGet("select * from pwfiles", null);        boolean b = false;        long lTime01 = System.currentTimeMillis();        while (rs.next())        {//            if (! b)//            {//                byte[] bytes = new byte[255];//                b = true;//                InputStream instream = rs.getBinaryStream(2);//                int iRead = instream.read(bytes);//                System.out.println("iRead : "+iRead);//                for (int i=0; i
0)// System.out.println(lLenBlob+" , "+bytes.length); // *** //Blob bb = rs.getBlob(4); //InputStream instream = bb.getBinaryStream();// InputStream instream = rs.getBinaryStream(2);// if (instream != null)// {// byte[] bytes = new byte[1024 * 1024];// int iRead01 = 0;// while (true)// {// int iRead = instream.read(bytes);// if (iRead == -1)// break;// else// iRead01 += iRead;// }// //System.out.println("iRead01 : "+iRead01);// } // *** //double d = rs.getDouble(1); BigDecimal bd = rs.getBigDecimal(1); double d = bd.doubleValue(); } long lTime02 = System.currentTimeMillis(); System.out.println("time : "+(lTime02 - lTime01)); String strTest = "蒲州变"; byte[] bytes = strTest.getBytes("utf-8"); for (int i=0; i
tm = new TreeMap
(); Class
clazz = Class.forName("java.sql.Types"); Field[] fields = clazz.getDeclaredFields(); for (int i=0; i
> it = tm.entrySet().iterator(); while (it.hasNext()) { Entry
entry = it.next(); System.out.println(entry.getValue() +" : "+entry.getKey()); } } // *** public static byte[] Pack(ResultSet _rs) throws Exception { if (! _rs.first()) return null; int iFirstRowNum = _rs.getRow(); if (! _rs.last()) return null; int iLastRowNum = _rs.getRow(); int iRowCnt = iLastRowNum - iFirstRowNum + 1; int iColumnCnt = _rs.getMetaData().getColumnCount(); ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); byte[] bytes4 = new byte[4]; // (A)、结果集 总长度(先空着) byteArray.write(bytes4); // (B)、数据共有多少列 TsomeOperate.Integer2ByteArray(iColumnCnt, bytes4, 0); byteArray.write(bytes4); // (C)、数据共有多少行 TsomeOperate.Integer2ByteArray(iRowCnt, bytes4, 0); byteArray.write(bytes4); // *** *** *** // [1]、DataSet头(字段类型 + 字段名称) // (1.1)、字段类型 int[] iiFieldType = new int[iColumnCnt]; for (int i=0; i

4、package dr.Server;  -->  服务端程序入口

  Tserver.java

package dr.Server;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.SocketOption;import java.nio.*;import java.nio.channels.*;import java.util.concurrent.*;import dr.Server.CompletionHandler.TacceptHandlerSrv;import dr.Server.CompletionHandler.TreadHandlerSrv;import dr.Server.CompletionHandler.TwriteHandlerSrv;import dr.Server.RemoteClient.TremoteClient;import dr.Server.RemoteClient.TremoteClients;import utils.TsomeOperate;import java.net.StandardSocketOptions;public class Tserver{    public final static int PORT = 9888;            public boolean FbStarted = false;        AsynchronousChannelGroup FasynchronousChannelGroup = null;    int FiThreadPoolSize = 100; // ZC: 这个参数的含义,不太确定是什么意思...    AsynchronousServerSocketChannel FserverSocketChannel = null;    int FiBacklog = 100; // ZC: 这个参数的含义,不确定是否就是 等待accept的最大数量(相当于Windows中开的accpet的线程数)        TremoteClients Fclients = null;    public Tserver() throws Exception    {        FasynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), FiThreadPoolSize);        FserverSocketChannel = AsynchronousServerSocketChannel.open(FasynchronousChannelGroup);        // 通过SocketOption类设置一些TCP选项        FserverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR,true);        FserverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024);                // 下面这样设置,会将 其它形式的IP地址的连接 都拒绝...        //FserverSocketChannel.bind(new InetSocketAddress("localhost", PORT), FiBacklog);        FserverSocketChannel.bind(new InetSocketAddress(PORT), FiBacklog);                FbStarted = true;            // ***        Fclients = new TremoteClients();    }        public void PendingAccept()    {         if (FbStarted && FserverSocketChannel.isOpen())        {            FserverSocketChannel.accept(Fclients, new TacceptHandlerSrv(this));        } else {            throw new IllegalStateException("Controller has been closed");        }    }        public void PendingRead(TremoteClient _client)    {        if (FbStarted && FserverSocketChannel.isOpen())        {            TsomeOperate.LogConsole("PendingRead - (1)");            _client.Fskt.read(ByteBuffer.wrap(_client.FbytesRecv, _client.FiRecv, _client.FbytesRecv.length - _client.FiRecv),                    _client, new TreadHandlerSrv(this));            TsomeOperate.LogConsole("PendingRead - (2)");        } else {            throw new IllegalStateException("Controller has been closed");        }    }        // 投递 带TimeOut的 接收操作    public void PendingRead_timeout(TremoteClient _client)    {        if (FbStarted && FserverSocketChannel.isOpen())        {            _client.Fskt.read(ByteBuffer.wrap(_client.FbytesRecv, _client.FiRecv, _client.FbytesRecv.length - _client.FiRecv),                    1, TimeUnit.MILLISECONDS, _client, new TreadHandlerSrv(this));        } else {            throw new IllegalStateException("Controller has been closed");        }    }        public void PendingWrite(TremoteClient _client)    {        if (FbStarted && FserverSocketChannel.isOpen())        {            int iLen = _client.FbytesSend.length - _client.FiHasSendLen;            _client.Fskt.write(ByteBuffer.wrap(_client.FbytesSend, _client.FiHasSendLen, iLen), _client, new TwriteHandlerSrv(this));        } else {            throw new IllegalStateException("Controller has been closed");        }    }        public void Send(byte[] _buf, TremoteClient _client)    {        _client.FbytesSend = _buf;        _client.FiHasSendLen = 0;                PendingWrite(_client);    }    // *** 反射,提供函数调用    public static void PendingAccept_r(Object _obj)    {        try {            Class
clazz = Tserver.class; Method method = clazz.getMethod("PendingAccept"); method.invoke(_obj); } catch (Exception e) { e.printStackTrace(); } } // 投递接收操作的 2个函数的函数名 public static final String METHODNAME_PEND_READ = "PendingRead"; public static final String METHODNAME_PEND_READ_TIMEOUT = "PendingRead_timeout"; public static final String METHODNAME_PEND_WRITE = "PendingWrite"; public static void PendingReadWrite_r(String _strMethodName, Object _obj, TremoteClient _client) { Class
clazz = Tserver.class; try { Method method = clazz.getMethod(_strMethodName, TremoteClient.class); method.invoke(_obj, _client); } catch (Exception e) { e.printStackTrace(); } } public static void Send_r(Object _obj, byte[] _buf, TremoteClient _client) { try { Class
clazz = Tserver.class; Method method = clazz.getMethod("Send", byte[].class, TremoteClient.class); //method.invoke(_obj, new Object[]{_buf}, _client); method.invoke(_obj, _buf, _client); } catch (Exception e) { e.printStackTrace(); } } // *** *** *** *** *** *** *** *** *** *** *** public static void main(String args[]) throws Exception { TsomeOperate.LogConsole("main in <<=="); Tserver server = new Tserver(); server.PendingAccept(); // ZC: Why只能投递一个Accept未决操作? //server.PendingAccept(); while (true) { TsomeOperate.LogConsole("main thread"); Thread.sleep(5000); } //TsomeOperate.LogConsole("main out ==>>"); }}

5、package dr.Server.CompletionHandler;  -->  TCP异步网络事件 的 回调函数

  5.1、TacceptHandlerSrv.java

package dr.Server.CompletionHandler;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import dr.Server.Tserver;import dr.Server.RemoteClient.TremoteClient;import dr.Server.RemoteClient.TremoteClients;import utils.TsomeOperate;public class TacceptHandlerSrv implements CompletionHandler
{ Object FaioServer = null; Object Fclients = null; public TacceptHandlerSrv(Object _obj) { FaioServer = _obj; } @Override public void completed(AsynchronousSocketChannel _rstNewSkt, TremoteClients _attachment) { try { String strRemoteAddress = _rstNewSkt.getRemoteAddress().toString(); TsomeOperate.LogConsole("("+Thread.currentThread().getId()+")Accept connection from " + strRemoteAddress); //TsomeOperate.LogConsole("TaioAcceptHandler.completed - 1 : "+_attachment); TremoteClient client = _attachment.ClientNew(strRemoteAddress); client.FlLastUse = System.currentTimeMillis(); client.FclientsOwner = _attachment; client.Fskt = _rstNewSkt; TsomeOperate.LogConsole("TaioAcceptHandler.completed - 2"); Tserver.PendingReadWrite_r("PendingRead", FaioServer, client); TsomeOperate.LogConsole("TaioAcceptHandler.completed - 3"); } catch (Exception ex) { ex.printStackTrace(); } finally { Tserver.PendingAccept_r(FaioServer); } } @Override public void failed(Throwable exc, TremoteClients _attachment) { TsomeOperate.LogConsole("TacceptHandlerSrv failed : "); exc.printStackTrace(); Tserver.PendingAccept_r(FaioServer); }}

  5.2、TreadHandlerSrv.java

package dr.Server.CompletionHandler;import java.io.IOException;import java.lang.reflect.Method;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import dr.Server.Tserver;import dr.Server.RemoteClient.TremoteClient;import utils.TsomeOperate;public class TreadHandlerSrv implements CompletionHandler
{ public static void main(String[] args) { } Object FaioServer = null; public TreadHandlerSrv(Object _obj) { FaioServer = _obj; } @Override public void completed(Integer _iRst, TremoteClient _attachment) { // 参数_iRst: 接收到多少byte的数据 TsomeOperate.LogConsole("TaioReadHandler.completed - _iRst : "+_iRst); if (_iRst < 0) { TsomeOperate.LogConsole("对方关闭socket"); try { _attachment.Fskt.close(); // 去掉对应的客户端信息 _attachment.DelSelf(); } catch (IOException e) { e.printStackTrace(); } } else if (_iRst > 0) { _attachment.FlLastUse = System.currentTimeMillis(); if (_iRst < _attachment.FbytesRecv.length) {// try {// Thread.sleep(1000);// } catch (InterruptedException e) {// e.printStackTrace();// } //_attachment.FbufRecv.Append(_attachment.FbytesRecv, _iRst); // some 操作 ...(业务逻辑) // (1)接收数据 _attachment.Recv(_iRst); // (2)处理数据 _attachment.MsgHandle(_attachment); // 继续投递 接收操作(非timeout) Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ, FaioServer, _attachment); // 继续投递 timeout的接收操作 //Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ_TIMEOUT, FaioServer, _attachment); } else// if (_iRst == _attachment.FbytesRecv.length) { // some 操作 ...(业务逻辑) // (1)接收数据 _attachment.Recv(_iRst); // (2)处理数据 _attachment.MsgHandle(_attachment); // ZC: 这里,接收的数据长度正好等于缓冲区的长度==>无法判断是否接收完毕了==>就使用“投递超时接收操作”的方式来做后续处理和判断 // ZC: 我记得WindowsIOCP的处理方式就是这样的,具体以后再说 // 继续投递 接收操作(非timeout) Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ, FaioServer, _attachment); // 继续投递 timeout的接收操作 //Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ_TIMEOUT, FaioServer, _attachment); } } } @Override public void failed(Throwable _ex, TremoteClient _attachment) { try {// Class
clazz = Class.forName("java.nio.channels.InterruptedByTimeoutException");// if (_ex.getClass() == clazz)// {// // some 操作 ...(业务逻辑)// // (2)处理数据// _attachment.MsgHandle(_attachment);// // // 继续投递 接收操作(非timeout)// Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_READ, FaioServer, _attachment);// // // ZC: 操作超时之后,再投递read操作的话,会报错:// // ZC: “java.lang.IllegalStateException: Reading not allowed due to timeout or cancellation”// // ZC: 于是,投递失败 --> 没有接收操作 --> 收不到数据 --> ...// // ZC: 于是暂时,改变策略... 这里不投递了... 也不再投递 timeout的接收操作了...// }// else { TsomeOperate.LogConsole("TaioReadHandler.failed : "+_ex.getClass()); _ex.printStackTrace(); _attachment.Fskt.close(); // 去掉对应的客户端信息 _attachment.DelSelf(); } } catch (Exception e1) { e1.printStackTrace(); } }}

  5.3、TwriteHandlerSrv.java  -->  暂时 发送数据没有使用回调函数的方式,∴这个类暂时也没有使用到,以后要用的话 还要测试下

package dr.Server.CompletionHandler;import java.nio.channels.CompletionHandler;import dr.Server.Tserver;import dr.Server.RemoteClient.TremoteClient;import utils.TsomeOperate;public class TwriteHandlerSrv implements CompletionHandler
{ Object FaioServer = null; public TwriteHandlerSrv(Object _obj) { FaioServer = _obj; } @Override public void completed(Integer _iRst, TremoteClient _attachment) { _attachment.FiHasSendLen += _iRst; if (_attachment.FiHasSendLen > _attachment.FbytesSend.length) { TsomeOperate.LogConsole("TaioWriteHandlerSrv.completed FiSendLen > FbufSend.length"); } else if (_attachment.FiHasSendLen == _attachment.FbytesSend.length) { TsomeOperate.LogConsole("TaioWriteHandlerSrv.completed 发送完毕"); } else { Tserver.PendingReadWrite_r(Tserver.METHODNAME_PEND_WRITE, FaioServer, _attachment); } } @Override public void failed(Throwable _ex, TremoteClient _attachment) { _ex.printStackTrace(); }}

6、package dr.Server.RemoteClient;  -->  用于管理各个连接到Server的Client的信息,以及 处理接收到的TCP包

  6.1、TremoteClient.java

package dr.Server.RemoteClient;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import dr.DataSet.TdrDataSet;import utils.TsomeOperate;public class TremoteClient{    public long FlLastUse = 0;//System.currentTimeMillis();// 最后一次使用的时间([1]accept/[2]异步接收/[3]同步发送)    // ***    public TremoteClients FclientsOwner = null;    public String FstrRemoteAddress = null;    // ***    public AsynchronousSocketChannel Fskt = null;    public byte[] FbytesRecv = null;    public int FiRecv = 0; // 总共接收了多少字节的数据    public int FiFirstPktLen = 0; // 第1个TCP包的长度(0:无效)    // 接收到的数据的 暂存缓冲//    public Tbuffer FbufRecv = new Tbuffer();    // ***        // 每次最多发送多少byte的数据    // ZC: 这个值得取值,还有待商榷(评测),以后再说吧...    //public final int SEND_LEN = 1024;    public byte[] FbytesSend = null;    public int FiHasSendLen = 0; // 已经发送了多少byte的数据了        public TremoteClient(int _iRecvBufLen)    {        FbytesRecv = new byte[_iRecvBufLen];    }        public int DelSelf()    {        if (FclientsOwner != null)            FclientsOwner.ClientDel(this);        return 0;    }        public int SendSync(AsynchronousSocketChannel _skt, byte[] _buf, int _iSendLen) throws Exception    {        FlLastUse = System.currentTimeMillis();        int iSend = _skt.write(ByteBuffer.wrap(_buf, 0, _iSendLen)).get();        return iSend;    }    // 接收 数据包    public synchronized void Recv(int _iRecv)    {        FiRecv += _iRecv;        if (FiRecv >= FbytesRecv.length)            throw new RuntimeException("接收缓冲区 满了");    }        final int TCP_PACKET_HEADER_LEN = 4 * 3;        // TCP数据 操作类型:高16位:高一级类型; 低16位:低一级类型    // (正值)正常的 C/S之间的业务逻辑数据    final int OP_TYPE_SQL =          0x00010000;    final int OP_TYPE_PUSH =      0x00020000;    final int OP_TYPE_HEARTBEAT = 0x00030000;    // (负值)C/S之间的 管理数据    final int OP_TYPE_MANAGE =      0x80000000;    final int OP_TYPE_MANAGE_CLIENTS = 0x80000001; // c请求s,所有客户端的socket信息(简单)    final int OP_TYPE_MANAGE_SOCKET_REQ =  0x80000010; // c-->s,s-->c, 某个客户端的详细socket信息  (request)    (搬运工)    final int OP_TYPE_MANAGE_SOCKET_RES =  0x80000020; //                                        (response) (搬运工)    final int OP_TYPE_MANAGE_BLOCK_REQ =   0x80000030; // c-->s,s-->c, 某个客户端的内存block信息    (request)   (搬运工)    final int OP_TYPE_MANAGE_BLOCK_RES =   0x80000040; //                                        (response) (搬运工)    final int OP_TYPE_MANAGE_SQL =       0x80000002;       // 我的SQL语句操作            public void MsgHandle(TremoteClient _client)    {        TsomeOperate.LogConsole("MsgHandle -  FiRecv : "+FiRecv);        if (FiRecv < TCP_PACKET_HEADER_LEN)            return;        if ( (FiFirstPktLen != 0) && (FiRecv < FiFirstPktLen) )            return;                int iPktLen  = TsomeOperate.ByteArray2Integer(FbytesRecv, 0);        int iPktIdx  = TsomeOperate.ByteArray2Integer(FbytesRecv, 4);        int iPktType = TsomeOperate.ByteArray2Integer(FbytesRecv, 8);        TsomeOperate.LogConsole("接收的TCP包总长 : "+iPktLen);        TsomeOperate.LogConsole("接收的TCP包序号 : "+iPktIdx);        TsomeOperate.LogConsole("接收的TCP包类型 : "+Integer.toHexString(iPktType));                FiFirstPktLen = iPktLen;        if (FiRecv < iPktLen)            return;        FiFirstPktLen = 0; // ZC: 这里肯定会处理一个TCP包了,于是将FiFirstPktLen设为0                if ((iPktType & OP_TYPE_SQL) == OP_TYPE_SQL)        {            String strContent = new String(FbytesRecv, TCP_PACKET_HEADER_LEN, iPktLen - TCP_PACKET_HEADER_LEN);            TsomeOperate.LogConsole("\tRecv msg : "+strContent);            System.arraycopy(FbytesRecv, iPktLen, FbytesRecv, 0, FiRecv - iPktLen);                        SendTest(iPktIdx);        }        else if ((iPktType & OP_TYPE_MANAGE) == OP_TYPE_MANAGE)        {            MsgHandle_Manage(iPktType, _client,                    FbytesRecv, TCP_PACKET_HEADER_LEN, iPktLen - TCP_PACKET_HEADER_LEN);        }        //else if (iPktType == OP_TYPE_HEARTBEAT)        //{}                // TCP包处理完毕        FiRecv -= iPktLen;        if (FiRecv != 0)        {            // 剩余数据的移动(如果有的话)            System.arraycopy(FbytesRecv, iPktLen, FbytesRecv, 0, FiRecv);        }    }    @SuppressWarnings("finally")    public void MsgHandle_Manage(int _iOpType, TremoteClient _client,            byte[] _bytes, int _iOffset, int _iLen)    {        AsynchronousSocketChannel skt = null;        int iSendLen = 0;        byte[] bytesSend = new byte[1024*1];                if (_iOpType == OP_TYPE_MANAGE_CLIENTS)        {            byte[] bytesClients = FclientsOwner.ClientsMsg(this);            int iPktlen = TCP_PACKET_HEADER_LEN + bytesClients.length;            int iPktIdx = 0;            int iPktType= _iOpType;            TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0);            TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4);            TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8);            System.arraycopy(bytesClients, 0, bytesSend, TCP_PACKET_HEADER_LEN, bytesClients.length);                        skt = _client.Fskt;            iSendLen = iPktlen;        }        else if ( (_iOpType == OP_TYPE_MANAGE_SOCKET_REQ) | (_iOpType == OP_TYPE_MANAGE_BLOCK_REQ) )        {            // ClientA向Server请求ClientB的信息            // ClientA发来Server 的信息:ClientB的RemoteAddress信息            // Server 发去ClientB的信息:ClientA的RemoteAddress信息            int iLenAddressB = TsomeOperate.ByteArray2Integer(_bytes, _iOffset);            String strAddressB = new String(_bytes, _iOffset + 4, iLenAddressB - 4);                        // ***            TremoteClient clientB = FclientsOwner.ClientGet(strAddressB);            if (clientB == null)            {                // 未找到 相应目标机的信息                // 返回(长度[负值]+信息)                strAddressB = " Dest machine \""+strAddressB+"\" not found .";                byte[] bytesMsgB = strAddressB.getBytes();                                int iPktlen = TCP_PACKET_HEADER_LEN + (4 + bytesMsgB.length);                int iPktIdx = 0;                int iPktType= _iOpType + 0x10;                                TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0);                TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4);                TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8);                                TsomeOperate.Integer2ByteArray(-bytesMsgB.length, bytesSend, TCP_PACKET_HEADER_LEN);                System.arraycopy(bytesMsgB, 0, bytesSend, TCP_PACKET_HEADER_LEN + 4, bytesMsgB.length);                                skt = _client.Fskt;                iSendLen = iPktlen;            }            else            {                // 转发,后面加上一段                byte[] bytesAddressA = _client.FstrRemoteAddress.getBytes();                                int iPktlen = TCP_PACKET_HEADER_LEN + iLenAddressB + (4+bytesAddressA.length);                int iPktIdx = 0;                int iPktType= _iOpType;                                TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0);                TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4);                TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8);                // ***                System.arraycopy(_bytes, _iOffset, bytesSend, TCP_PACKET_HEADER_LEN, iLenAddressB);                // ***                TsomeOperate.Integer2ByteArray((bytesAddressA.length + 4), bytesSend, TCP_PACKET_HEADER_LEN + iLenAddressB);                System.arraycopy(bytesAddressA, 0, bytesSend, TCP_PACKET_HEADER_LEN + iLenAddressB + 4, bytesAddressA.length);                                skt = clientB.Fskt;                iSendLen = iPktlen;            }        }        else if ( (_iOpType == OP_TYPE_MANAGE_SOCKET_RES) | (_iOpType == OP_TYPE_MANAGE_BLOCK_RES) )        {            // ClientA向Server请求ClientB的信息            // ClientB发来Server 的信息:ClientB的 socket/内存block 详细信息(长度+内容[ip信息+mac信息]) + ClientA的RemoteAddress信息(长度+内容)            // Server 发去ClientA的信息:ClientB的 socket/内存block 详细信息(长度+内容[ip信息+mac信息]) + ClientB的RemoteAddress信息(长度+内容)            int iLenMsgB = TsomeOperate.ByteArray2Integer(_bytes, _iOffset);            int iLenAddressB = TsomeOperate.ByteArray2Integer(_bytes, _iOffset + iLenMsgB);            int iLenAddressA = TsomeOperate.ByteArray2Integer(_bytes, _iOffset + iLenMsgB + iLenAddressB);                        String strAddressA = new String(_bytes, _iOffset + iLenMsgB + iLenAddressB + 4, iLenAddressA - 4);                        byte[] bytesSend01 = new byte[TCP_PACKET_HEADER_LEN + iLenMsgB + 32];            int iPktlen = TCP_PACKET_HEADER_LEN + iLenMsgB + iLenAddressB;            int iPktIdx = 0;            int iPktType= _iOpType;                        TsomeOperate.Integer2ByteArray(iPktlen, bytesSend01, 0);            TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend01, 4);            TsomeOperate.Integer2ByteArray(iPktType, bytesSend01, 8);            // ***            System.arraycopy(_bytes, _iOffset, bytesSend01, TCP_PACKET_HEADER_LEN, iLenMsgB);            // ***            System.arraycopy(_bytes, _iOffset + iLenMsgB, bytesSend01, TCP_PACKET_HEADER_LEN + iLenMsgB, iLenAddressB);                        TremoteClient clientA = FclientsOwner.ClientGet(strAddressA);            try {                SendSync(clientA.Fskt, bytesSend01, iPktlen);            } catch (Exception e) {                e.printStackTrace();            } finally {                return;            }        }        else if (_iOpType == OP_TYPE_MANAGE_SQL)        {            int iLenSql = TsomeOperate.ByteArray2Integer(_bytes, _iOffset);            String strSql = new String(_bytes, _iOffset + 4, iLenSql - 4);            // ***            byte[] bytesRtn = TdrDataSet.SelectZ(strSql);            int iPktlen = TCP_PACKET_HEADER_LEN + bytesRtn.length;            int iPktIdx = 0;            int iPktType= OP_TYPE_MANAGE_SQL;            TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0);            TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4);            TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8);            if (bytesRtn != null)            {                try {                    SendSync(_client.Fskt, bytesSend, TCP_PACKET_HEADER_LEN);                    SendSync(_client.Fskt, bytesRtn, bytesRtn.length);                } catch (Exception e) {                    e.printStackTrace();                } finally {                    return;                }            }        }        else // 具体如何处理,再议        {            String str = "Unsupport operate";            int iPktlen = TCP_PACKET_HEADER_LEN + str.length();            int iPktIdx = 0;            int iPktType= _iOpType;                        TsomeOperate.Integer2ByteArray(iPktlen, bytesSend, 0);            TsomeOperate.Integer2ByteArray(iPktIdx, bytesSend, 4);            TsomeOperate.Integer2ByteArray(iPktType, bytesSend, 8);            // ***            System.arraycopy(str.getBytes(), 0, bytesSend, 0, str.length());                        skt = _client.Fskt;            iSendLen = iPktlen;        }                try {            SendSync(skt, bytesSend, iSendLen);        } catch (Exception e) {            e.printStackTrace();        }    }// ***        int[] Fnum01 = new int[10];    byte[] FbytesSend01 = new byte[1024 * 1024 * 2];        public int SendTest(int _iPktIdx)    {        int iGroup = FbytesSend01.length / 10;        for (int i=0; i
=0; j--) { if (Fnum01[j] > 9) { if (j != 0) Fnum01[j-1] ++; } } for (int j=0; j<=9; j++) { FbytesSend01[i*10 + j] = (byte)(48+Fnum01[j]); } } int iYuShu = FbytesSend01.length % 10; if (iYuShu != 0) for (int i=0; i

  6.2、TremoteClients.java

package dr.Server.RemoteClient;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import java.util.Map.Entry;import utils.TsomeOperate;public class TremoteClients{// 该类用于 服务端 管理 远程客户端的信息    // *** *** ***    //HashMap?HashTable?    HashMap
Fhash = new HashMap
(); //HashSet
Fset = new HashSet
(); //TreeSet
Fset = new TreeSet
(); public final int RECV_BUF_LEN = 1024 * 4; public TremoteClient ClientNew(String _strRemoteAddress) { TremoteClient rst = new TremoteClient(RECV_BUF_LEN); rst.FstrRemoteAddress = _strRemoteAddress; boolean bNotAlreadyContain = ClientAdd(_strRemoteAddress, rst); if (bNotAlreadyContain) return rst; else { rst = null; return null; } } synchronized boolean ClientAdd(String _strRemoteAddress, TremoteClient _client) { TremoteClient rtn = Fhash.put(_strRemoteAddress, _client); if (rtn == _client) return false; return true; } synchronized boolean ClientDel(TremoteClient _client) { TremoteClient rtn = Fhash.remove(_client.FstrRemoteAddress); if (rtn == null) return false; return true; } synchronized byte[] ClientsMsg(TremoteClient _client) { long l = System.currentTimeMillis(); byte[] byte8 = new byte[8]; TsomeOperate.Long2ByteArray(l, byte8, 0); ByteArrayOutputStream ba = new ByteArrayOutputStream(); try { ba.write(byte8); Iterator
> it = Fhash.entrySet().iterator(); while (it.hasNext()) { Entry
entry = it.next(); String str; TremoteClient client = entry.getValue(); if (client == _client) str = entry.getKey() + " <=="; else str = entry.getKey(); TsomeOperate.Long2ByteArray(client.FlLastUse, byte8, 0); byte[] byte4 = new byte[4]; TsomeOperate.Integer2ByteArray(str.length(), byte4, 0); ba.write(byte8); // 该TremoteClient 最后一次使用的时间 ba.write(byte4); // 长度 ba.write(str.getBytes()); // 内容 } } catch (IOException e) { e.printStackTrace(); } return ba.toByteArray();// 需要手动 ba.Close() 吗? } synchronized TremoteClient ClientGet(String _strRemoteAddress) { return Fhash.get(_strRemoteAddress); } public static void main(String args[]) throws Exception { TsomeOperate.LogConsole(Long.SIZE+""); }}

7、package jdbcOper;  -->  基本的JDBC数据库操作,分装了及各类 方便使用

  7.1、TjdbcOperA.java

package jdbcOper;import java.sql.*;import java.util.ArrayList;// ZC: 基类(基本全是static方法)public class TjdbcOperA{    public void finalize()    {        System.out.println("TjdbcOperA.finalize");        //ConnectionClose();    }        private static Class
Fclazz = null; // 加载驱动 protected static Boolean DriverLoad(String _strDriver) { // _strDriver 类似 "oracle.jdbc.driver.OracleDriver" if (Fclazz == null) { try { Fclazz = Class.forName(_strDriver); } catch (ClassNotFoundException e) { Fclazz = null; e.printStackTrace(); } } return (Fclazz != null); } private static Connection Fconn = null; protected static Connection ConnectionOpen(String _strUrl, String _strUserName, String _strPassword) { // _strUrl 类似 "jdbc:oracle:thin:@localhost:1521:ORCLHSP" // _strUserName 类似 "scott" // _strPassword 类似 "tiger" if (Fconn == null) { try { Fconn = DriverManager.getConnection(_strUrl, _strUserName, _strPassword); } catch (SQLException e) { Fconn = null; e.printStackTrace(); } } return Fconn; } protected static void ConnectionClose() { if (Fconn != null) { try { Fconn.close(); } catch (SQLException e) { e.printStackTrace(); } Fconn = null; } } // ZC: 单条的 insert语句 // ZC: 返回 成功的SQL语句条数 protected static int InsertSingle(Connection _conn, String _strInsert, String[] _strsParameter) { int iRtn = 0; PreparedStatement ps = null; try { _conn.setAutoCommit(true); ps = _conn.prepareStatement(_strInsert); if (_strsParameter != null) { for (int i=0; i<_strsParameter.length; i++) ps.setString(i+1, _strsParameter[i]); } iRtn = ps.executeUpdate(); } catch (SQLException e) { e.printStackTrace(); } finally { if (ps != null) { try { ps.close(); } catch (SQLException e) { //e.printStackTrace(); } // ZC: (1)、若 上面关闭成功了,这里 赋值为 null,没有任何效果 // ZC: (2)、若 上面关闭失败,这里 赋值为 null,以便让垃圾回收机制来处理 // ZC: 这里的 ps是局部变量,函数退出后,栈空间回收,也没有变量再指向 PreparedStatement实例 了,应该不用显示的赋值为null了? // ZC: 观察到ps的关闭需要调用close()函数,是否 和 普通的类实例不同?函数退出时 不会自动关闭? // ZC: 然而,记得在哪里见过函数退出后 还能保持局部变量的 机制,是在 C#里还是在java里? ps = null; } } return iRtn; } // ZC: executeUpdate() 执行单条 insert/delete/update语句 // ZC: 返回 成功的SQL语句条数 protected static int ExecuteUpdate_Single(Connection _conn, String _str, String[] _strsParameter) { PreparedStatement ps = null; try { _conn.setAutoCommit(true); ps = _conn.prepareStatement(_str); for (int i=0; i<_strsParameter.length; i++) ps.setString(i+1, _strsParameter[i]); int iRtn = ps.executeUpdate(); return iRtn; } catch (SQLException e) { e.printStackTrace(); return 0; } } protected static int ExecuteUpdate_SingleNoParameter(Connection _conn, String _str) { PreparedStatement ps = null; try { _conn.setAutoCommit(true); ps = _conn.prepareStatement(_str); int iRtn = ps.executeUpdate(); return iRtn; } catch (SQLException e) { e.printStackTrace(); return 0; } } // ZC: 使用的是 单个 PreparedStatement (多个 PreparedStatement 怎么耍?) // ZC: executeUpdate() 执行多条 insert/delete/update语句 // ZC: 返回 成功的SQL语句条数(ZC: 如果是insert/delete/update语句的组合,那返回的条数如何计算?加起来?) protected static int ExecuteUpdate_Multi(Connection _conn, String[] _strsSql, String[][] _strssParameter) { int iRtn = 0; PreparedStatement ps = null; try { _conn.setAutoCommit(false); for (int i=0; i<_strsSql.length; i++) { if (_strssParameter[i] != null) { ps = _conn.prepareStatement(_strsSql[i]); for (int j=0; j<_strssParameter[i].length; j++) ps.setString(j+1, _strssParameter[i][j]); // ZC: 每次得到 PreparedStatement,都要 执行executeUpdate(),不然 它只会执行会有一条语句 iRtn += ps.executeUpdate(); // ZC: 每次获得的都是 新的实例,记得关闭 ps.close(); ps = null; } } _conn.commit(); } catch (SQLException e) { e.printStackTrace(); try { iRtn = 0; // ZC: 如果回滚也异常了,那怎么办?已经执行的语句 就没办法撤回了? // ZC: 还是说不执行commit()的话,SQL语句默认没有提交?可能是的. _conn.rollback(); } catch (SQLException e1) { e1.printStackTrace(); } } finally { if (ps != null) {// try {// ps.close();// } catch (SQLException e) {// e.printStackTrace();// } // 上面已经有关闭的代码了,这里就不必再关闭了,赋值为null后若有需要 就交给JVM处理吧 ps = null; } } return iRtn; } // ZC: 貌似 PreparedStatement.addBatch(); 和 PreparedStatement.addBatch(sql); 不能组合使用。 public static int Batch_SameSQL(Connection _conn, String _strSql, String[][] _strssParameter) { int iRtn = 0; PreparedStatement ps = null; try { _conn.setAutoCommit(false); ps = _conn.prepareStatement(_strSql); for (int i=0; i<_strssParameter.length; i++) { if (_strssParameter[i] != null) { for (int j=0; j<_strssParameter[i].length; j++) ps.setString(j+1, _strssParameter[i][j]); ps.addBatch(); } } int[] iRtns = ps.executeBatch(); for (int i=0; i
∵ PreparedStatement.addBatch(); 和 PreparedStatement.addBatch(sql); 不能组合使用。 // ZC: PreparedStatement.addBatch(sql); 测试下来,貌似只能加入不带参数的SQL语句 // ZC: 貌似 一旦执行了"PreparedStatement.addBatch(sql2);","Connection.prepareStatement(sql1)"中传入的参数sql1就无效了... public static int Batch_DifferentSQLs(Connection _conn, ArrayList
_listSql) { int iRtn = 0;// PreparedStatement ps = null; Statement st = null; try {// _conn.setAutoCommit(false);// ps = _conn.prepareStatement(_listSql.get(0));// // for (int i=0; i<_listSql.size(); i++)// ps.addBatch(_listSql.get(i));// // int[] iRtns = ps.executeBatch();// for (int i=0; i

  7.2、TjdbcOperB.java

package jdbcOper;import java.sql.*;import java.util.ArrayList;// ZC: 子类(没有static方法)public class TjdbcOperB extends TjdbcOperA{    // 下面的这些变量,(1)可以写在代码里,(2)也可以写在属性文件里,(3)或者xml文件里 等等    //*    // (1)、Oracle相关配置    String FstrDriver = "oracle.jdbc.driver.OracleDriver";    String FstrConnUrl =  "jdbc:oracle:thin:@192.168.1.201:1521:ZheJiang";    String FstrConnUsername = "testWFpas";//"WenZhouPDPas";    //String FstrConnUsername = "WenZhouPDPas";    String FstrConnPassword = "dongruisoft.com";    //*/    /*    // (2)、MySQL相关配置    String FstrDriver = "com.mysql.jdbc.Driver";    String FstrConnUrl =  "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8";    String FstrConnUsername = "root";    String FstrConnPassword = "";    //*/        public int InsertSingle(String _strInsert, String[] _strsParameter)    {        if (! DriverLoad(FstrDriver))            return 0;                Connection conn = ConnectionOpen(FstrConnUrl, FstrConnUsername, FstrConnPassword);        if (conn == null)            return 0;                return super.InsertSingle(conn, _strInsert, _strsParameter);    }        public int ExecuteUpdate_Multi(String[] _strSql, String[][] _strsParameter)    {        if (! DriverLoad(FstrDriver))            return 0;                Connection conn = ConnectionOpen(FstrConnUrl, FstrConnUsername, FstrConnPassword);        if (conn == null)            return 0;                return super.ExecuteUpdate_Multi(conn, _strSql, _strsParameter);    }            TselectResult FselRst = null;        public ResultSet SelectGet(String _strSql, String[] _strsParameter)    {        if (! DriverLoad(FstrDriver))            return null;                Connection conn = ConnectionOpen(FstrConnUrl, FstrConnUsername, FstrConnPassword);        if (conn == null)            return null;                FselRst = super.SelectGet(conn, _strSql, _strsParameter);        return FselRst.Frs;    }        public void SelectClose()    {        if (FselRst == null)            return;                super.SelectClose(FselRst);        FselRst = null;    }        public int Batch_DifferentSQLs(ArrayList
_listSql) { if (! DriverLoad(FstrDriver)) return 0; Connection conn = ConnectionOpen(FstrConnUrl, FstrConnUsername, FstrConnPassword); if (conn == null) return 0; return super.Batch_DifferentSQLs(conn, _listSql); } public int ExecuteUpdate_Single(String _str, String[] _strsParameter) { if (! DriverLoad(FstrDriver)) return 0; Connection conn = ConnectionOpen(FstrConnUrl, FstrConnUsername, FstrConnPassword); if (conn == null) return 0; return super.ExecuteUpdate_Single(conn, _str, _strsParameter); } public int ExecuteUpdate_SingleNoParameter(String _str) { if (! DriverLoad(FstrDriver)) return 0; Connection conn = ConnectionOpen(FstrConnUrl, FstrConnUsername, FstrConnPassword); if (conn == null) return 0; return super.ExecuteUpdate_SingleNoParameter(conn, _str); } public static void main(String[] args) { TjdbcOperB jdbcB = new TjdbcOperB(); /* String[] strsParameter = {"A2", "1"}; int iRtn = jdbcB.InsertSingle("insert into PwDevExMaster values (?,?)", strsParameter); System.out.println(iRtn); //*/ /* String strUpdate01 = "update PwDevExMaster set JgLx=50 where MainObjId=?"; String[] strsParaUpdate01 = {"A5"}; String strDelete01 = "delete from PwDevExMaster where MainObjId=?"; String[] strsParaDelete01 = {"A5"}; String strInsert02 = "insert into PwDevExMaster values (?,?)"; String[] strsParaInsert02 = {"A5", "200"}; String[] strsSql = {strUpdate01, strDelete01, strInsert02}; String[][] strssParameter = {strsParaUpdate01, strsParaDelete01, strsParaInsert02}; int iRtn = jdbcB.ExecuteUpdate_Multi(strsSql, strssParameter); System.out.println(iRtn); //*/ try { ResultSet rs = jdbcB.SelectGet("select * from PwDevExMaster", null); while (rs.next()) { System.out.println(rs.getString("MainObjId")+" , "+rs.getString("jglx")); } } catch (SQLException e) { e.printStackTrace(); } finally { jdbcB.SelectClose(); } jdbcB = null; }}

  7.3、TselectResult.java

package jdbcOper;import java.sql.*;public class TselectResult{    PreparedStatement Fps;    ResultSet Frs;}

8、package utils;  -->  小工具类

  TsomeOperate.java

package utils;public class TsomeOperate{    public static void main(String[] args)    {}// ***        public static int ByteArray2Integer(byte[] _bytes, int _iBeginOffset)    {        int i01 = _bytes[_iBeginOffset + 0] & 0xFF;        int i02 = (_bytes[_iBeginOffset + 1] << 8)  & 0xFF00;        int i03 = (_bytes[_iBeginOffset + 2] << 16) & 0xFF0000;        int i04 = (_bytes[_iBeginOffset + 3] << 24) & 0xFF000000;        return (i01 | i02 | i03 | i04);    }        public static int Integer2ByteArray(int _i, byte[] _bytes, int _iBeginOffset)    {        _bytes[_iBeginOffset + 0] = (byte)(_i & 0xFF);        _bytes[_iBeginOffset + 1] = (byte)((_i >> 8) & 0xFF);        _bytes[_iBeginOffset + 2] = (byte)((_i >> 16) & 0xFF);        _bytes[_iBeginOffset + 3] = (byte)((_i >> 24) & 0xFF);        return 0;    }        public static int Long2ByteArray(long _l, byte[] _bytes, int _iBeginOffset)    {        _bytes[_iBeginOffset + 0] = (byte)(_l & 0xFF);        _bytes[_iBeginOffset + 1] = (byte)((_l >> 8) & 0xFF);        _bytes[_iBeginOffset + 2] = (byte)((_l >> 16) & 0xFF);        _bytes[_iBeginOffset + 3] = (byte)((_l >> 24) & 0xFF);                _bytes[_iBeginOffset + 4] = (byte)((_l >> 32) & 0xFF);        _bytes[_iBeginOffset + 5] = (byte)((_l >> 40) & 0xFF);        _bytes[_iBeginOffset + 6] = (byte)((_l >> 48) & 0xFF);        _bytes[_iBeginOffset + 7] = (byte)((_l >> 56) & 0xFF);                return 0;    }        public static int Double2ByteArray(double _d, byte[] _bytes, int _iBeginOffset)    {        long l = Double.doubleToLongBits(_d);        _bytes[_iBeginOffset + 0] = (byte)(l & 0xFF);        _bytes[_iBeginOffset + 1] = (byte)((l >> 8) & 0xFF);        _bytes[_iBeginOffset + 2] = (byte)((l >> 16) & 0xFF);        _bytes[_iBeginOffset + 3] = (byte)((l >> 24) & 0xFF);                _bytes[_iBeginOffset + 4] = (byte)((l >> 32) & 0xFF);        _bytes[_iBeginOffset + 5] = (byte)((l >> 40) & 0xFF);        _bytes[_iBeginOffset + 6] = (byte)((l >> 48) & 0xFF);        _bytes[_iBeginOffset + 7] = (byte)((l >> 56) & 0xFF);                return 0;    }        public static void LogConsole(String _str)    {        System.out.println(_str);    }        public static void LogFile(String _str)    {}}

 

9、

10、

 

转载于:https://www.cnblogs.com/CodeHouseZ/p/6144125.html

你可能感兴趣的文章
GridView数据导入Excel/Excel数据读入GridView
查看>>
566. Reshape the Matrix
查看>>
python数据结构与算法之搜索
查看>>
(最小点覆盖) poj 2226
查看>>
(树形DP) poj 3659
查看>>
获取类的属性名和值
查看>>
python对json的操作总结
查看>>
学习进度表第十一周
查看>>
js屏蔽回车键
查看>>
Memcached通用类(基于enyim.com Memcached Client)
查看>>
c#组元(Tuple)的使用
查看>>
【NO.66】转 Yahoo的军规条例
查看>>
vim基础学习之搜索功能
查看>>
session和cookie
查看>>
tftp 开发板ping不通PC机
查看>>
未在本地计算机上注册“Microsoft.Jet.OLEDB.4.0”
查看>>
Erdos
查看>>
docker初学
查看>>
面向对象的程序第三次实验作业
查看>>
Windows 安装启动apache时出现错误的解决方法
查看>>