Java Socket通信技术在很长的时间里都在使用,在不少的程序员眼中都有很多高的评价。那么下面我们就看看如何才能掌握这门复杂的编程语言,希望大家在今后的Java Socket通信技术使用中有所收获。
下面就是Java Socket通信技术在解决收发线程互斥的代码介绍。
1. packagecom.bill99.svr;
2. importjava.io.IOException;
3. importjava.io.InputStream;
4. importjava.io.OutputStream;
5. importjava.net.InetSocketAddress;
6. importjava.net.Socket;
7. importjava.net.SocketException;
8. importjava.net.SocketTimeoutException;
9. importjava.text.SimpleDateFormat;
10. importjava.util.Date;
11. importjava.util.Properties;
12. importjava.util.Timer;
13. importjava.util.TimerTask;
14. importjava.util.concurrent.ConcurrentHashMap;
15. importjava.util.concurrent.TimeUnit;
16. importjava.util.concurrent.locks.Condition;
17. importjava.util.concurrent.locks.ReentrantLock;
18. importorg.apache.log4j.Logger;
19. /**
20. *<p>title:socket通信包装类</p>
21. *<p>Description:</p>
22. *<p>CopyRight:CopyRight(c)2009</p>
23. *<p>Company:99bill.com</p>
24. *<p>Createdate:2009-10-14</P>
25. *authorsunnylocus<Ahref="mailto:sunnylocus@163.com">
26. </A>*v0.102009-10-14初类
27. *v0.112009-11-12对命令收发逻辑及收发线程互斥机制进行了优化,
处理命令速度由原来8~16个/秒提高到25~32个/秒
28. */publicclassSocketConnection{
29. privatevolatileSocketsocket;
30. privateinttimeout=1000*10;//超时时间,初始值10秒
31. privatebooleanisLaunchHeartcheck=false;//是否已启动心跳检测
32. privatebooleanisNetworkConnect=false;//网络是否已连接
33. privatestaticStringhost="";
34. privatestaticintport;
35. staticInputStreaminStream=null;
36. staticOutputStreamoutStream=null;
37. privatestaticLoggerlog=Logger.getLogger
(SocketConnection.class);
38. privatestaticSocketConnectionsocketConnection=null;
39. privatestaticjava.util.TimerheartTimer=null;
40. //privatefinalMap<String,Object>recMsgMap=Collections.
synchronizedMap(newHashMap<String,Object>());
41. privatefinalConcurrentHashMap<String,Object>recMsgMap
=newConcurrentHashMap<String,Object>();
42. privatestaticThreadreceiveThread=null;
43. privatefinalReentrantLocklock=newReentrantLock();
44. privateSocketConnection(){
45. Propertiesconf=newProperties();
46. try{
47. conf.load(SocketConnection.class.getResourceAsStream
("test.conf"));
48. this.timeout=Integer.valueOf(conf.getProperty("timeout"));
49. init(conf.getProperty("ip"),Integer.valueOf
(conf.getProperty("port")));
50. }catch(IOExceptione){
51. log.fatal("socket初始化异常!",e);
52. thrownewRuntimeException("socket初始化异常,请检查配置参数");
53. }
54. }
55. /**
56. *单态模式
57. */
58. publicstaticSocketConnectiongetInstance(){
59. if(socketConnection==null){
60. synchronized(SocketConnection.class){
61. if(socketConnection==null){
62. socketConnection=newSocketConnection();
63. returnsocketConnection;
64. }
65. }
66. }
67. returnsocketConnection;
68. }
69. privatevoidinit(Stringhost,intport)throwsIOException{
70. InetSocketAddressaddr=newInetSocketAddress(host,port);
71. socket=newSocket();
72. synchronized(this){
73. log.info("【准备与"+addr+"建立连接】");
74. socket.connect(addr,timeout);
75. log.info("【与"+addr+"连接已建立】");
76. inStream=socket.getInputStream();
77. outStream=socket.getOutputStream();
78. socket.setTcpNoDelay(true);//数据不作缓冲,立即发送
79. socket.setSoLinger(true,0);//socket关闭时,立即释放资源
80. socket.setKeepAlive(true);
81. socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输
82. isNetworkConnect=true;
83. receiveThread=newThread(newReceiveWorker());
84. receiveThread.start();
85. SocketConnection.host=host;
86. SocketConnection.port=port;
87. if(!isLaunchHeartcheck)
88. launchHeartcheck();
89. }
90. }
91. /**
92. *心跳包检测
93. */
94. privatevoidlaunchHeartcheck(){
95. if(socket==null)
96. thrownewIllegalStateException("socketisnot
established!");
97. heartTimer=newTimer();
98. isLaunchHeartcheck=true;
99. heartTimer.schedule(newTimerTask(){
100. publicvoidrun(){
101. StringmsgStreamNo=StreamNoGenerator.getStreamNo("kq");
102. intmstType=9999;//999-心跳包请求
103. SimpleDateFormatdateformate=newSimpleDateFormat
("yyyyMMddHHmmss");
104. StringmsgDateTime=dateformate.format(newDate());
105. intmsgLength=38;//消息头长度
106. Stringcommandstr="00"+msgLength+mstType+msgStreamNo;
107. log.info("心跳检测包->IVR"+commandstr);
108. intreconnCounter=1;
109. while(true){
110. StringresponseMsg=null;
111. try{
112. responseMsg=readReqMsg(commandstr);
113. }catch(IOExceptione){
114. log.error("IO流异常",e);
115. reconnCounter++;
116. }
117. if(responseMsg!=null){
118. log.info("心跳响应包<-IVR"+responseMsg);
119. reconnCounter=1;
120. break;
121. }else{
122. reconnCounter++;
123. }
124. if(reconnCounter>3){//重连次数已达三次,判定网络连接中断,
重新建立连接。连接未被建立时不释放锁
125. reConnectToCTCC();break;
126. }
127. }
128. }
129. },1000*60*1,1000*60*2);
130. }
131. /**
132. *重连与目标IP建立重连
133. */
134. privatevoidreConnectToCTCC(){
135. newThread(newRunnable(){
136. publicvoidrun(){
137. log.info("重新建立与"+host+":"+port+"的连接");
138. //清理工作,中断计时器,中断接收线程,恢复初始变量
139. heartTimer.cancel();
140. isLaunchHeartcheck=false;
141. isNetworkConnect=false;
142. receiveThread.interrupt();
143. try{
144. socket.close();
145. }catch(IOExceptione1){log.error("重连时,关闭socket连
接发生IO流异常",e1);}
146. //----------------
147. synchronized(this){
148. for(;;){
149. try{
150. Thread.currentThread();
151. Thread.sleep(1000*1);
152. init(host,port);
153. this.notifyAll();
154. break;
155. }catch(IOExceptione){
156. log.error("重新建立连接未成功",e);
157. }catch(InterruptedExceptione){
158. log.error("重连线程中断",e);
159. }
160. }
161. }
162. }
163. }).start();
164. }
165. /**
166. *发送命令并接受响应
167. *@paramrequestMsg
168. *@return
169. *@throwsSocketTimeoutException
170. *@throwsIOException
171. */
172. publicStringreadReqMsg(StringrequestMsg)throwsIOException{
173. if(requestMsg==null){
174. returnnull;
175. }
176. if(!isNetworkConnect){
177. synchronized(this){
178. try{
179. this.wait(1000*5);//等待5秒,如果网络还没有恢复,抛出IO流异常
180. if(!isNetworkConnect){
181. thrownewIOException("网络连接中断!");
182. }
183. }catch(InterruptedExceptione){
184. log.error("发送线程中断",e);
185. }
186. }
187. }
188. StringmsgNo=requestMsg.substring(8,8+24);//读取流水号
189. outStream=socket.getOutputStream();
190. outStream.write(requestMsg.getBytes());
191. outStream.flush();
192. Conditionmsglock=lock.newCondition();//消息锁
193. //注册等待接收消息
194. recMsgMap.put(msgNo,msglock);
195. try{
196. lock.lock();
197. msglock.await(timeout,TimeUnit.MILLISECONDS);
198. }catch(InterruptedExceptione){
199. log.error("发送线程中断",e);
200. }finally{
201. lock.unlock();
202. }
203. ObjectrespMsg=recMsgMap.remove(msgNo);//响应信息
204. if(respMsg!=null&&(respMsg!=msglock)){
205. //已经接收到消息,注销等待,成功返回消息
206. return(String)respMsg;
207. }else{
208. log.error(msgNo+"超时,未收到响应消息");
209. thrownewSocketTimeoutException(msgNo+"超时,未收到响应消息");
210. }
211. }
212. publicvoidfinalize(){
213. if(socket!=null){
214. try{
215. socket.close();
216. }catch(IOExceptione){
217. e.printStackTrace();
218. }
219. }
220. }
221. //消息接收线程
222. privateclassReceiveWorkerimplementsRunnable{
223. StringintStr=null;
224. publicvoidrun(){
225. while(!Thread.interrupted()){
226. try{
227. byte[]headBytes=newbyte[4];
228. if(inStream.read(headBytes)==-1){
229. log.warn("读到流未尾,对方已关闭流!");
230. reConnectToCTCC();//读到流未尾,对方已关闭流
231. return;
232. }
233. byte[]tmp=newbyte[4];
234. tmp=headBytes;
235. StringtempStr=newString(tmp).trim();
236. if(tempStr==null||tempStr.equals("")){
237. log.error("receivedmessageisnull");
238. continue;
239. }
240. intStr=newString(tmp);
241. inttotalLength=Integer.parseInt(intStr);
242. //----------------
243. byte[]msgBytes=newbyte[totalLength-4];
244. inStream.read(msgBytes);
245. StringresultMsg=newString(headBytes)+new
String(msgBytes);
246. //抽出消息ID
247. StringmsgNo=resultMsg.substring(8,8+24);
248. Conditionmsglock=(Condition)recMsgMap.get(msgNo);
249. if(msglock==null){
250. log.warn(msgNo+"序号可能已被注销!响应消息丢弃");
251. recMsgMap.remove(msgNo);
252. continue;
253. }
254. recMsgMap.put(msgNo,resultMsg);
255. try{
256. lock.lock();
257. msglock.signalAll();
258. }finally{
259. lock.unlock();
260. }
261. }catch(SocketExceptione){
262. log.error("服务端关闭socket",e);
263. reConnectToCTCC();
264. }catch(IOExceptione){
265. log.error("接收线程读取响应数据时发生IO流异常",e);
266. }catch(NumberFormatExceptione){
267. log.error("收到没良心包,String转int异常,异常字符:"+intStr);
268. }
269. }
270. }
271. }
272. }
以上就是对Java Socket通信技术中收发线程互斥的详细解决方法。希望大家有所领悟。
分享到:
相关推荐
g: 利用互斥量来解决线程同步互斥问题 h: problem1 生产者消费者问题 (1生产者 1消费者 1缓冲区) problem1 more 生产者消费者问题 (1生产者 2消费者 4缓冲区) problem2 读者与写着问题 I: 信号量 semaphore ...
解决多线程编程中的同步互斥问题
易语言 线程池 多线程 线程互斥 易语言线程互斥的解决办法
本代码是用JAVA实现的生产者与消费者的问题,线程间的同步与互斥功能
这个多线程例子。C#的。主要讲述线程互斥的问题 。只是个简单的例子。
一、题目: 创建线程,利用互斥实现线程共享变量通信 二、目的 掌握线程创建和终止,加深对线程和进程概念的理解,会用同步与互斥方法实现线程之间的通信。 三、内容和要求 软件界面上点“创建线程” 按钮,创建三个...
操作系统实验 多线程同步与互斥 java编写 可动态创建
C#多线程互斥实例 多线程获取同一变量(不重复)。是一个很好的学习例子
C 创建线程互斥对象的实例源码下载,声明线程函数,创建线程,程序睡眠,释放互斥对象,设置事件对象为无信号状态,生成控制台程序,仅供参考。
4个线程互斥类,代码例子,完整的工程,详细说明了4中互斥对象的作用,对学习多线程编程的朋友很有用处............
线程互斥测试2!
C#.NET多线程实例6个(包括多线程基本使用,多线程互斥等全部多线程使用实例)
操作系统实验(三)线程的互斥 操作系统实验(三)线程的互斥
Java语言中的线程同步互斥研究
C# 多线程互斥 两个线程交替工作 C#多线程互斥,两个线程交替工作,如上图所示,挺有意思的。
Java线程间同步互斥,在实际的编程中,经常要处理线程间的同步互斥问题。Java 语言内在强大的多线程支持使得处理这类问题变得相对来说比较简单。本例将模仿经典的线程同步互斥例子——生产者和消费者问题,来演示 ...
一个多线程访问的同一个资源,java synchronized互斥锁的用法,android和此用法一致。
线程中的互斥问题,包括代码!!近似恩唢呐架飞往速度法wehfsdfhak
编写程序实现并发线程之间的同步和互斥问题 线程间的互斥:并发执行的线程共享某些类临界资源,对临界资源的访问应当采取互斥的机制。 线程间的同步:并发执行的线程间通常存在相互制约的关系,线程必须遵循一定的...
Java实现的进程同步与互斥(PV) Hao语言