socket客户端监控案例

java socket 监控案例包含

1.客户端
 1.1 客户端心跳线程
 1.2 客户端消息发送线程
 1.3 客户端消息接收线程 
 1.4 客户端启动,关闭 方法
 1.5 接收处理器接口(心跳接收,消息接收)


2.服务端
    2.1 服务端连接看门狗线程
    2.2 服务端接收线程
    2.3 服务端启动,关闭方法
    2.4 接收处理器接口(心跳接收,消息接收)

3.心跳包
    3.1 包含的时间戳和id字段

4.普通消息
    4.1 消息正文,消息类型

5.异常处理
    5.1套接字异常的处理办法


服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151

package client;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by sheng on 17/12/4.
 *
 * Socket monitoring server. One "connection watchdog" thread accepts clients, and
 * every accepted client gets its own "receive watchdog" thread that reads serialized
 * objects (heartbeats and messages) and dispatches them to an {@link ObjectAction}.
 * A client that delivers nothing for longer than the receive timeout is disconnected.
 */
public class Server {

    /**
     * Implement this interface to process an object sent by a client and return an object.
     */
    public interface ObjectAction {
        /**
         * @param rev    the object received from the client
         * @param server the server that received it
         * @return the result of processing {@code rev}
         */
        Object doAction(Object rev, Server server);
    }

    /**
     * Heartbeat handler; also the fallback when no handler is mapped for a received type.
     * It casts the received object to {@link KeepAlive}, so any other type results in a
     * {@link ClassCastException}.
     */
    public final class DefaultObjectAction implements ObjectAction {
        @Override
        public Object doAction(Object rev, Server server) {
            KeepAlive keepAlive = (KeepAlive) rev;
            System.out.println("接收数据,心跳包处理:\t" + keepAlive.toString());
            // Heartbeat parsing would go here.
            return keepAlive;
        }
    }

    /**
     * Handler for ordinary {@link Message} packets: logs the message and returns it.
     */
    public class MsgObjectAction implements ObjectAction {
        @Override
        public Object doAction(Object obj, Server server) {
            Message message = (Message) obj;
            System.out.println("接收数据,消息包处理:\t" + message.toString());
            // TODO parse the message
            return message;
        }
    }

    /** Starts a server on port 65432. */
    public static void main(String[] args) {
        int port = 65432;
        Server server = new Server(port);
        server.start();
    }

    private int port;
    // Run-state flag shared by the accept thread and all receive threads.
    private volatile boolean running = false;
    // Receive timeout in milliseconds.
    private long receiveTimeDelay = 3000;
    // Maps a received object's class to the handler that processes it.
    private ConcurrentHashMap<Class<?>, ObjectAction> actionMapping =
            new ConcurrentHashMap<Class<?>, ObjectAction>();
    private Thread connWatchDog;
    // Listening socket; kept in a field so stop() can close it and unblock accept().
    private volatile ServerSocket serverSocket;

    /**
     * @param port the TCP port to listen on once {@link #start()} is called
     */
    public Server(int port) {
        this.port = port;
    }

    /** Starts the accept thread; does nothing if the server is already running. */
    public void start() {
        if (running) {
            return;
        }
        running = true;
        connWatchDog = new Thread(new ConnWatchDog());
        connWatchDog.start();
    }

    /**
     * Stops the server. Clears the run flag and closes the listening socket, which makes
     * a blocked accept() fail so the accept thread can finish on its own. This replaces
     * the deprecated Thread.stop(), which is unsafe and unsupported on recent JDKs.
     */
    public void stop() {
        running = false;
        closeServerSocket(serverSocket);
    }

    /**
     * Registers the handler used for received objects of the given class.
     *
     * @param cls    class of the received objects
     * @param action handler invoked for objects of that class
     */
    public void addActionMap(Class<?> cls, ObjectAction action) {
        actionMapping.put(cls, action);
    }

    /** Closes the given listening socket, if any; a close failure is only logged. */
    private static void closeServerSocket(ServerSocket ss) {
        if (ss == null) {
            return;
        }
        try {
            ss.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accept loop: opens the listening socket and starts one receive thread per client.
     */
    class ConnWatchDog implements Runnable {
        @Override
        public void run() {
            ServerSocket ss = null;
            try {
                System.out.println("server initial....");
                ss = new ServerSocket(port, 5);
                serverSocket = ss;
                while (running) {
                    Socket s = ss.accept();
                    System.out.println("server accept one client ....");
                    new Thread(new ReceiveWatchDogs(s)).start();
                }
            } catch (IOException e) {
                // When stop() closed the socket, the failure is the expected shutdown
                // signal; only a failure while still running is a real error.
                if (running) {
                    e.printStackTrace();
                    Server.this.stop();
                }
            } finally {
                // Always release the listening socket opened by this thread.
                closeServerSocket(ss);
            }
        }
    }

    /**
     * Per-client receive loop: polls the socket for serialized objects, dispatches them,
     * and disconnects the client when nothing arrives within the receive timeout.
     */
    class ReceiveWatchDogs implements Runnable {
        Socket s;
        boolean run = true;
        long lastReceiveTime = System.currentTimeMillis();
        // Last heartbeat received from this client; null until the first one arrives.
        KeepAlive lastKeepAlive;

        public ReceiveWatchDogs(Socket s) {
            this.s = s;
        }

        @Override
        public void run() {
            while (running && run) {
                if (System.currentTimeMillis() - lastReceiveTime > receiveTimeDelay) {
                    overThis(lastKeepAlive);
                } else {
                    try {
                        InputStream in = s.getInputStream();
                        if (in.available() > 0) {
                            // The client opens a new ObjectOutputStream for every object
                            // it sends, so a new ObjectInputStream is needed per object.
                            ObjectInputStream ois = new ObjectInputStream(in);
                            Object obj = ois.readObject();
                            lastReceiveTime = System.currentTimeMillis();
                            System.out.println("server receive... :\t" + obj);
                            // Install the built-in handlers only when nothing is mapped
                            // yet, so handlers added via addActionMap are not overwritten.
                            if (obj instanceof Message) {
                                actionMapping.putIfAbsent(obj.getClass(), new MsgObjectAction());
                            } else if (obj instanceof KeepAlive) {
                                this.lastKeepAlive = (KeepAlive) obj;
                                actionMapping.putIfAbsent(obj.getClass(), new DefaultObjectAction());
                            }

                            ObjectAction oa = actionMapping.get(obj.getClass());
                            oa = oa == null ? new DefaultObjectAction() : oa;
                            oa.doAction(obj, Server.this);
                        } else {
                            Thread.sleep(10);
                        }
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever owns this thread.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        overThis(this.lastKeepAlive);
                    } catch (Exception e) {
                        e.printStackTrace();
                        overThis(this.lastKeepAlive);
                    }
                }
            }
            if (run) {
                // The loop ended because the server stopped, not because overThis() ran:
                // release the client socket instead of leaking it.
                closeSocket();
            }
        }

        /** Closes this client's socket, if any; a close failure is only logged. */
        private void closeSocket() {
            if (s != null) {
                try {
                    s.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Ends this client's session: stops the loop, closes the socket and logs that
         * the client went offline.
         *
         * @param keepAlive last heartbeat received from the client; may be null when the
         *                  client never sent one before timing out or failing
         */
        private void overThis(KeepAlive keepAlive) {
            run = false;
            closeSocket();
            // Guard against a client that disconnected before its first heartbeat.
            String clientId = keepAlive == null ? "unknown" : keepAlive.getId();
            System.out.println("client " + clientId + " 离线");
            if (s != null) {
                System.out.println("client connect over time...." + s.getRemoteSocketAddress());
            }
        }

    }
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package client;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.net.UnknownHostException;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RunnableFuture;

/**
 * Created by sheng on 17/12/13.
 *
 * Socket monitoring client. It can send heartbeat packets and message packets; when no
 * data has been sent for keepAliveDelay milliseconds it automatically sends a
 * {@link KeepAlive} object (heartbeat) to the server.
 *
 * Three threads are started by {@link #start(KeepAlive)}: a heartbeat sender, a receiver
 * for objects sent back by the server, and a sender for messages typed on the console.
 */
public class Client {

    /**
     * Implement this interface to process an object sent back by the server.
     */
    public interface ObjectAction {
        /**
         * @param obj    the object received from the server
         * @param client the client that received it
         */
        void doAction(Object obj, Client client);
    }

    /**
     * Heartbeat handler; also the fallback when no handler is mapped for a received type.
     * It casts the received object to {@link KeepAlive}.
     */
    public static final class DefaultObjectAction implements ObjectAction {
        @Override
        public void doAction(Object obj, Client client) {
            KeepAlive alive = (KeepAlive) obj;
            System.out.println("接收数据,心跳包处理:\t" + alive.toString());
        }
    }

    /**
     * Handler for ordinary {@link Message} packets: logs the message.
     */
    public static class MsgObjectAction implements ObjectAction {
        @Override
        public void doAction(Object obj, Client client) {
            Message message = (Message) obj;
            System.out.println("接收数据,消息包处理:\t" + message.toString());
        }
    }

    /** Connects to 127.0.0.1:65432 and starts the client threads. */
    public static void main(String[] args) throws UnknownHostException, IOException {
        String serverIp = "127.0.0.1";
        int port = 65432;
        Client client = new Client(serverIp, port);
        KeepAlive keepAlive = new KeepAlive("2", "2", "2");
        client.start(keepAlive);
    }

    private String serverIp;
    private int port;
    private volatile Socket socket;
    // Connection state; volatile because it is read and written by several threads.
    private volatile boolean running = false;
    // Time of the last send; volatile because sender threads share it.
    private volatile long lastSendTime;

    // Maps a received object's class to the handler that processes it.
    private ConcurrentHashMap<Class<?>, ObjectAction> actionMapping =
            new ConcurrentHashMap<Class<?>, ObjectAction>();

    /**
     * @param serverIp address of the server to connect to
     * @param port     TCP port of the server
     */
    public Client(String serverIp, int port) {
        this.serverIp = serverIp;
        this.port = port;
    }

    /**
     * Connects to the server and starts the heartbeat, receive and console-send threads.
     * Does nothing if the client is already running.
     *
     * @param keepAlive the heartbeat object that is sent periodically
     * @throws UnknownHostException if the server address cannot be resolved
     * @throws IOException          if the connection cannot be established
     */
    public void start(KeepAlive keepAlive) throws UnknownHostException, IOException {
        if (running) {
            return;
        }
        socket = new Socket(serverIp, port);
        System.out.println("本地端口:" + socket.getLocalPort());
        lastSendTime = System.currentTimeMillis();
        running = true;
        // Keeps the long-lived connection open by sending a heartbeat when idle.
        new Thread(new KeepAliveWatchDog(keepAlive)).start();
        // Receives and processes objects sent back by the server.
        new Thread(new ReceiveWatchDog()).start();
        // Sends messages typed on the console.
        new Thread(new SendMsgPacket()).start();
    }

    /**
     * Stops the client: clears the run flag so the worker loops end, and closes the
     * socket so the connection is released.
     */
    public void stop() {
        running = false;
        Socket current = socket;
        if (current != null) {
            try {
                current.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers the handler used for received objects of the given class.
     *
     * @param cls    class of the objects to handle
     * @param action handler invoked for objects of that class
     */
    public void addActionMap(Class<? extends Object> cls, ObjectAction action) {
        actionMapping.put(cls, action);
    }

    /**
     * Serializes one object to the server and records the send time.
     * Synchronized because the heartbeat thread and the console thread both call it;
     * unsynchronized writes could interleave on the shared socket stream.
     *
     * @param obj the object to send
     * @throws IOException if writing to the socket fails
     */
    public synchronized void sendObject(Object obj) throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
        oos.writeObject(obj);

        System.out.println("client send..:\t" + obj);
        oos.flush();
        // Any successful send counts as activity, so the heartbeat is only sent when
        // the connection has really been idle.
        lastSendTime = System.currentTimeMillis();
    }

    /**
     * Logs a worker failure and stops the client. The stack trace is skipped when the
     * client is already stopped, because failures are then just a consequence of the
     * socket having been closed by stop().
     */
    private void stopAfterFailure(Exception e) {
        if (running) {
            e.printStackTrace();
        }
        stop();
    }

    /**
     * Heartbeat sender: sends the heartbeat object whenever nothing has been sent for
     * longer than keepAliveDelay milliseconds.
     */
    class KeepAliveWatchDog implements Runnable {
        KeepAlive keepAlive;

        public KeepAliveWatchDog(KeepAlive keepAlive) {
            this.keepAlive = keepAlive;
        }

        long checkDelay = 10;
        long keepAliveDelay = 2000;

        @Override
        public void run() {
            while (running) {
                if (System.currentTimeMillis() - lastSendTime > keepAliveDelay) {
                    try {
                        KeepAlive alive = this.keepAlive;
                        // sendObject refreshes lastSendTime on success.
                        Client.this.sendObject(alive);
                        // Install the built-in handler only if none is mapped, so a
                        // handler added via addActionMap is not overwritten.
                        actionMapping.putIfAbsent(alive.getClass(), new DefaultObjectAction());
                    } catch (IOException e) {
                        stopAfterFailure(e);
                    }
                } else {
                    try {
                        // Sleep for checkDelay milliseconds between idle checks.
                        Thread.sleep(checkDelay);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever owns this thread.
                        Thread.currentThread().interrupt();
                        stopAfterFailure(e);
                    }
                }
            }
        }
    }

    /**
     * Console sender: reads tokens from standard input and sends each one as a
     * {@link Message}; the token "cancel" stops the client instead of being sent.
     */
    class SendMsgPacket implements Runnable {
        boolean isrunning = true;

        @Override
        public void run() {
            // Read console input. The scanner is not closed on purpose: closing it
            // would close System.in.
            Scanner input = new Scanner(System.in);
            // Check the flags first so a stopped client does not block on more input.
            while (isrunning && running && input.hasNext()) {
                String str = input.next();
                if ("cancel".equals(str)) {
                    isrunning = false;
                    Client.this.stop();
                    // Do not send "cancel" itself; the socket is closed by now.
                    break;
                }
                try {
                    Message msg = new Message(str, 0);
                    Client.this.sendObject(msg);
                    actionMapping.putIfAbsent(msg.getClass(), new MsgObjectAction());
                } catch (IOException e) {
                    stopAfterFailure(e);
                }
            }
        }
    }

    /**
     * Receiver: polls the socket for serialized objects sent by the server and hands
     * each one to the handler mapped for its class.
     */
    class ReceiveWatchDog implements Runnable {
        @Override
        public void run() {
            while (running) {
                try {
                    InputStream in = socket.getInputStream();
                    if (in.available() > 0) {
                        ObjectInputStream ois = new ObjectInputStream(in);
                        Object obj = ois.readObject();
                        System.out.println("client receive ....:\t" + obj);
                        ObjectAction oa = actionMapping.get(obj.getClass());
                        oa = oa == null ? new DefaultObjectAction() : oa;
                        oa.doAction(obj, Client.this);
                    } else {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    stopAfterFailure(e);
                } catch (Exception e) {
                    stopAfterFailure(e);
                }
            }
        }
    }


}

心跳包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package client;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Message object used to keep the connection alive (the heartbeat object).
 * Carries an id, an ip, a mac and the time at which the object was constructed.
 */
public class KeepAlive implements Serializable {
    private String id;
    private String ip;
    // Construction time, formatted as "yyyy-MM-dd HH:mm:ss".
    private String timestamp = currentTime();
    private String mac;

    /**
     * @param id  identifier carried by the heartbeat
     * @param ip  ip value carried by the heartbeat
     * @param mac mac value carried by the heartbeat
     */
    public KeepAlive(String id, String ip, String mac) {
        this.id = id;
        this.ip = ip;
        this.mac = mac;
    }

    private static final long serialVersionUID = -2813120366138988480L;

    /** Formats the current time; a new formatter is created on every call. */
    private static String currentTime() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return format.format(new Date());
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getIp() {
        return this.ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getMac() {
        return this.mac;
    }

    public void setMac(String mac) {
        this.mac = mac;
    }

    public String getTimestamp() {
        return this.timestamp;
    }

    /**
     * Overridden for testing purposes only. The text starts with the time of the call
     * (not the stored timestamp), followed by id, ip and mac separated by "||".
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return currentTime() + "||" + this.id + "||" + this.ip + "||" + this.mac;
    }

}

普通消息类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package client;

import java.io.Serializable;

/**
 * Created by sheng on 17/12/14.
 *
 * Ordinary message packet: a text body plus an integer message type.
 */
public class Message implements Serializable {
    private static final long serialVersionUID = -2813120366138988480L;

    private String msg;
    private Integer type;

    /** Creates a message whose body and type are both null. */
    public Message() {
    }

    /**
     * @param msg  message body
     * @param type message type
     */
    public Message(String msg, Integer type) {
        this.msg = msg;
        this.type = type;
    }

    public String getMsg() {
        return this.msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Integer getType() {
        return this.type;
    }

    public void setType(Integer type) {
        this.type = type;
    }

    /** Renders the message as {@code Message{msg='<body>', type=<type>}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{");
        text.append("msg='").append(this.msg).append('\'');
        text.append(", type=").append(this.type);
        text.append('}');
        return text.toString();
    }
}

异常处理

socket 异常往往由连接超时引起。如果连接已经断开,继续操作 socket 会抛出 IOException;只需在异常处理中关闭(释放)该 socket,并让对应线程的 run() 方法结束即可。

write by shengfq 17/12/13.