`
nanjingjiangbiao_T
  • 浏览: 2592783 次
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

如何解决Java Socket通信技术收发线程互斥的问题

 
阅读更多
Java Socket通信技术在很长的时间里都在使用,在不少的程序员眼中都有很多高的评价。那么下面我们就看看如何才能掌握这门复杂的编程语言,希望大家在今后的Java Socket通信技术使用中有所收获。
下面就是Java Socket通信技术在解决收发线程互斥的代码介绍。
1. packagecom.bill99.svr; 
2. importjava.io.IOException; 
3. importjava.io.InputStream; 
4. importjava.io.OutputStream; 
5. importjava.net.InetSocketAddress; 
6. importjava.net.Socket; 
7. importjava.net.SocketException; 
8. importjava.net.SocketTimeoutException; 
9. importjava.text.SimpleDateFormat; 
10. importjava.util.Date; 
11. importjava.util.Properties; 
12. importjava.util.Timer; 
13. importjava.util.TimerTask; 
14. importjava.util.concurrent.ConcurrentHashMap; 
15. importjava.util.concurrent.TimeUnit; 
16. importjava.util.concurrent.locks.Condition; 
17. importjava.util.concurrent.locks.ReentrantLock; 
18. importorg.apache.log4j.Logger; 
19. /** 
20. *<p>title:socket通信包装类</p> 
21. *<p>Description:</p> 
22. *<p>CopyRight:CopyRight(c)2009</p> 
23. *<p>Company:99bill.com</p> 
24. *<p>Createdate:2009-10-14</P> 
25. *authorsunnylocus<Ahref="mailto:sunnylocus@163.com"> 
26. </A>*v0.102009-10-14初类 
27. *v0.112009-11-12对命令收发逻辑及收发线程互斥机制进行了优化,
处理命令速度由原来8~16/秒提高到25~32/
28. */publicclassSocketConnection{ 
29. privatevolatileSocketsocket; 
30. privateinttimeout=1000*10;//超时时间,初始值10 
31. privatebooleanisLaunchHeartcheck=false;//是否已启动心跳检测 
32. privatebooleanisNetworkConnect=false;//网络是否已连接 
33. privatestaticStringhost=""; 
34. privatestaticintport; 
35. staticInputStreaminStream=null; 
36. staticOutputStreamoutStream=null; 
37. privatestaticLoggerlog=Logger.getLogger
(SocketConnection.class);
38. privatestaticSocketConnectionsocketConnection=null; 
39. privatestaticjava.util.TimerheartTimer=null;  
40. //privatefinalMap<String,Object>recMsgMap=Collections.
synchronizedMap(newHashMap
<String,Object>());
41. privatefinalConcurrentHashMap<String,Object>recMsgMap
=
newConcurrentHashMap<String,Object>();
42. privatestaticThreadreceiveThread=null; 
43. privatefinalReentrantLocklock=newReentrantLock(); 
44. privateSocketConnection(){ 
45. Propertiesconf=newProperties(); 
46. try{ 
47. conf.load(SocketConnection.class.getResourceAsStream
("test.conf"));
48. this.timeout=Integer.valueOf(conf.getProperty("timeout")); 
49. init(conf.getProperty("ip"),Integer.valueOf
(conf.getProperty("port")));
50. }catch(IOExceptione){ 
51. log.fatal("socket初始化异常!",e); 
52. thrownewRuntimeException("socket初始化异常,请检查配置参数"); 
53. } 
54. } 
55. /** 
56. *单态模式 
57. */ 
58. publicstaticSocketConnectiongetInstance(){ 
59. if(socketConnection==null){ 
60. synchronized(SocketConnection.class){ 
61. if(socketConnection==null){ 
62. socketConnection=newSocketConnection(); 
63. returnsocketConnection; 
64. } 
65. } 
66. } 
67. returnsocketConnection; 
68. } 
69. privatevoidinit(Stringhost,intport)throwsIOException{ 
70. InetSocketAddressaddr=newInetSocketAddress(host,port); 
71. socket=newSocket(); 
72. synchronized(this){ 
73. log.info("【准备与"+addr+"建立连接】"); 
74. socket.connect(addr,timeout); 
75. log.info("【与"+addr+"连接已建立】"); 
76. inStream=socket.getInputStream(); 
77. outStream=socket.getOutputStream(); 
78. socket.setTcpNoDelay(true);//数据不作缓冲,立即发送 
79. socket.setSoLinger(true,0);//socket关闭时,立即释放资源 
80. socket.setKeepAlive(true); 
81. socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输 
82. isNetworkConnect=true; 
83. receiveThread=newThread(newReceiveWorker()); 
84. receiveThread.start(); 
85. SocketConnection.host=host; 
86. SocketConnection.port=port; 
87. if(!isLaunchHeartcheck) 
88. launchHeartcheck(); 
89. } 
90. } 
91. /** 
92. *心跳包检测 
93. */ 
94. privatevoidlaunchHeartcheck(){ 
95. if(socket==null) 
96. thrownewIllegalStateException("socketisnot
established!");
97. heartTimer=newTimer(); 
98. isLaunchHeartcheck=true; 
99. heartTimer.schedule(newTimerTask(){ 
100. publicvoidrun(){ 
101. StringmsgStreamNo=StreamNoGenerator.getStreamNo("kq"); 
102. intmstType=9999;//999-心跳包请求 
103. SimpleDateFormatdateformate=newSimpleDateFormat
("yyyyMMddHHmmss");
104. StringmsgDateTime=dateformate.format(newDate()); 
105. intmsgLength=38;//消息头长度 
106. Stringcommandstr="00"+msgLength+mstType+msgStreamNo; 
107. log.info("心跳检测包->IVR"+commandstr); 
108. intreconnCounter=1; 
109. while(true){ 
110. StringresponseMsg=null; 
111. try{ 
112. responseMsg=readReqMsg(commandstr); 
113. }catch(IOExceptione){ 
114. log.error("IO流异常",e); 
115. reconnCounter++; 
116. } 
117. if(responseMsg!=null){ 
118. log.info("心跳响应包<-IVR"+responseMsg); 
119. reconnCounter=1; 
120. break; 
121. }else{ 
122. reconnCounter++; 
123. } 
124. if(reconnCounter>3){//重连次数已达三次,判定网络连接中断,
重新建立连接。连接未被建立时不释放锁
125. reConnectToCTCC();break; 
126. } 
127. } 
128. } 
129. },1000*60*1,1000*60*2); 
130. } 
131. /** 
132. *重连与目标IP建立重连 
133. */ 
134. privatevoidreConnectToCTCC(){ 
135. newThread(newRunnable(){ 
136. publicvoidrun(){ 
137. log.info("重新建立与"+host+":"+port+"的连接"); 
138. //清理工作,中断计时器,中断接收线程,恢复初始变量 
139. heartTimer.cancel(); 
140. isLaunchHeartcheck=false; 
141. isNetworkConnect=false; 
142. receiveThread.interrupt(); 
143. try{ 
144. socket.close(); 
145. }catch(IOExceptione1){log.error("重连时,关闭socket
接发生IO流异常",e1);}
146. //---------------- 
147. synchronized(this){ 
148. for(;;){ 
149. try{ 
150. Thread.currentThread(); 
151. Thread.sleep(1000*1); 
152. init(host,port); 
153. this.notifyAll(); 
154. break; 
155. }catch(IOExceptione){ 
156. log.error("重新建立连接未成功",e); 
157. }catch(InterruptedExceptione){ 
158. log.error("重连线程中断",e); 
159. } 
160. } 
161. } 
162. } 
163. }).start(); 
164. } 
165. /** 
166. *发送命令并接受响应 
167. *@paramrequestMsg 
168. *@return 
169. *@throwsSocketTimeoutException 
170. *@throwsIOException 
171. */ 
172. publicStringreadReqMsg(StringrequestMsg)throwsIOException{ 
173. if(requestMsg==null){ 
174. returnnull; 
175. } 
176. if(!isNetworkConnect){ 
177. synchronized(this){ 
178. try{ 
179. this.wait(1000*5);//等待5秒,如果网络还没有恢复,抛出IO流异常 
180. if(!isNetworkConnect){ 
181. thrownewIOException("网络连接中断!"); 
182. } 
183. }catch(InterruptedExceptione){ 
184. log.error("发送线程中断",e); 
185. } 
186. } 
187. } 
188. StringmsgNo=requestMsg.substring(8,8+24);//读取流水号 
189. outStream=socket.getOutputStream(); 
190. outStream.write(requestMsg.getBytes()); 
191. outStream.flush(); 
192. Conditionmsglock=lock.newCondition();//消息锁 
193. //注册等待接收消息 
194. recMsgMap.put(msgNo,msglock); 
195. try{ 
196. lock.lock(); 
197. msglock.await(timeout,TimeUnit.MILLISECONDS); 
198. }catch(InterruptedExceptione){ 
199. log.error("发送线程中断",e); 
200. }finally{ 
201. lock.unlock(); 
202. } 
203. ObjectrespMsg=recMsgMap.remove(msgNo);//响应信息 
204. if(respMsg!=null&&(respMsg!=msglock)){ 
205. //已经接收到消息,注销等待,成功返回消息 
206. return(String)respMsg; 
207. }else{ 
208. log.error(msgNo+"超时,未收到响应消息"); 
209. thrownewSocketTimeoutException(msgNo+"超时,未收到响应消息"); 
210. } 
211. } 
212. publicvoidfinalize(){ 
213. if(socket!=null){ 
214. try{ 
215. socket.close(); 
216. }catch(IOExceptione){ 
217. e.printStackTrace(); 
218. } 
219. } 
220. } 
221. //消息接收线程 
222. privateclassReceiveWorkerimplementsRunnable{ 
223. StringintStr=null; 
224. publicvoidrun(){ 
225. while(!Thread.interrupted()){ 
226. try{ 
227. byte[]headBytes=newbyte[4]; 
228. if(inStream.read(headBytes)==-1){ 
229. log.warn("读到流未尾,对方已关闭流!"); 
230. reConnectToCTCC();//读到流未尾,对方已关闭流 
231. return; 
232. } 
233. byte[]tmp=newbyte[4]; 
234. tmp=headBytes; 
235. StringtempStr=newString(tmp).trim(); 
236. if(tempStr==null||tempStr.equals("")){ 
237. log.error("receivedmessageisnull"); 
238. continue; 
239. } 
240. intStr=newString(tmp); 
241. inttotalLength=Integer.parseInt(intStr); 
242. //---------------- 
243. byte[]msgBytes=newbyte[totalLength-4]; 
244. inStream.read(msgBytes); 
245. StringresultMsg=newString(headBytes)+new
String(msgBytes);
246. //抽出消息ID 
247. StringmsgNo=resultMsg.substring(8,8+24); 
248. Conditionmsglock=(Condition)recMsgMap.get(msgNo); 
249. if(msglock==null){ 
250. log.warn(msgNo+"序号可能已被注销!响应消息丢弃"); 
251. recMsgMap.remove(msgNo); 
252. continue; 
253. } 
254. recMsgMap.put(msgNo,resultMsg); 
255. try{ 
256. lock.lock(); 
257. msglock.signalAll(); 
258. }finally{ 
259. lock.unlock(); 
260. } 
261. }catch(SocketExceptione){ 
262. log.error("服务端关闭socket",e); 
263. reConnectToCTCC(); 
264. }catch(IOExceptione){ 
265. log.error("接收线程读取响应数据时发生IO流异常",e); 
266. }catch(NumberFormatExceptione){ 
267. log.error("收到没良心包,Stringint异常,异常字符:"+intStr); 
268. } 
269. } 
270. } 
271. } 
272. }
以上就是对Java Socket通信技术中收发线程互斥的详细解决方法。希望大家有所领悟。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics