Java实现心跳机制的方法

 更新时间:2020年7月9日 22:58  点击:1808

一、心跳机制简介

     在分布式系统中,分布在不同主机上的节点需要检测其他节点的状态,如服务器节点需要检测从节点是否失效。为了检测对方节点的有效性,每隔固定时间就发送一个固定信息给对方,对方回复一个固定信息,如果长时间没有收到对方的回复,则断开与对方的连接。

     发包方既可以是服务端,也可以是客户端,这要看具体实现。因为是每隔固定时间发送一次,类似心跳,所以发送的固定信息称为心跳包。心跳包一般为比较小的包,可根据具体实现。心跳包主要应用于长连接的保持与短线链接。

      一般而言,应该客户端主动向服务器发送心跳包,因为服务器向客户端发送心跳包会影响服务器的性能。

二、心跳机制实现方式

    心跳机制有两种实现方式,一种基于TCP自带的心跳包,TCP的SO_KEEPALIVE选项可以,系统默认的默认跳帧频率为2小时,超过2小时后,本地的TCP 实现会发送一个数据包给远程的 Socket. 如果远程Socket 没有发回响应, TCP实现就会持续尝试 11 分钟, 直到接收到响应为止。 否则就会自动断开Socket连接。但TCP自带的心跳包无法检测比较敏感地知道对方的状态,默认2小时的空闲时间,对于大多数的应用而言太长了。可以手工开启KeepAlive功能并设置合理的KeepAlive参数。

    另一种在应用层自己进行实现,基本步骤如下:

Client使用定时器,不断发送心跳;
Server收到心跳后,回复一个包;
Server为每个Client启动超时定时器,如果在指定时间内没有收到Client的心跳包,则Client失效。

三、Java实现心跳机制

    这里基于Java实现的简单RPC框架实现心跳机制。Java实现代码如下所示:

    心跳客户端类:

public class HeartbeatClient implements Runnable {
 
 private String serverIP = "127.0.0.1";
 private int serverPort = 8089;
 private String nodeID = UUID.randomUUID().toString();
 private boolean isRunning = true;
 // 最近的心跳时间
 private long lastHeartbeat;
 // 心跳间隔时间
 private long heartBeatInterval = 10 * 1000;
 
 public void run() {
 try {
  while (isRunning) {
  HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));
  long startTime = System.currentTimeMillis();
  // 是否达到发送心跳的周期时间
  if (startTime - lastHeartbeat > heartBeatInterval) {
   System.out.println("send a heart beat");
   lastHeartbeat = startTime;
 
   HeartbeatEntity entity = new HeartbeatEntity();
   entity.setTime(startTime);
   entity.setNodeID(nodeID);
 
   // 向服务器发送心跳,并返回需要执行的命令
   Cmder cmds = handler.sendHeartBeat(entity);
 
   if (!processCommand(cmds))
   continue;
  }
  }
 } catch (Exception e) {
  e.printStackTrace();
 }
 }
 
 private boolean processCommand(Cmder cmds) {
 // ...
 return true;
 }
 
}

      心跳包实体类:

public class HeartbeatEntity implements Serializable {
 
 private long time;
 private String nodeID;
 private String error;
 private Map<String, Object> info = new HashMap<String, Object>();
 
 public String getNodeID() {
 return nodeID;
 }
 
 public void setNodeID(String nodeID) {
 this.nodeID = nodeID;
 }
 
 public String getError() {
 return error;
 }
 
 public void setError(String error) {
 this.error = error;
 }
 
 public Map<String, Object> getInfo() {
 return info;
 }
 
 public void setInfo(Map<String, Object> info) {
 this.info = info;
 }
 
 public long getTime() {
 return time;
 }
 
 public void setTime(long time) {
 this.time = time;
 }
}

  服务器接受心跳包返回的命令对象类:

public class Cmder implements Serializable {
 
 private String nodeID;
 private String error;
 private Map<String, Object> info = new HashMap<String, Object>();
 
 public String getNodeID() {
 return nodeID;
 }
 
 public void setNodeID(String nodeID) {
 this.nodeID = nodeID;
 }
 
 public String getError() {
 return error;
 }
 
 public void setError(String error) {
 this.error = error;
 }
 
 public Map<String, Object> getInfo() {
 return info;
 }
 
 public void setInfo(Map<String, Object> info) {
 this.info = info;
 }
}

  RPC服务注册中心:

public class ServiceCenter {
 
 private ExecutorService executor = Executors.newFixedThreadPool(20);
 
 private final ConcurrentHashMap<String, Class> serviceRegistry = new ConcurrentHashMap<String, Class>();
 
 private AtomicBoolean isRunning = new AtomicBoolean(true);
 
 // 服务器监听端口
 private int port = 8089;
 
 // 心跳监听器
 HeartbeatLinstener linstener;
 
 // 单例模式
 private static class SingleHolder {
 private static final ServiceCenter INSTANCE = new ServiceCenter();
 }
 
 private ServiceCenter() {
 }
 
 public static ServiceCenter getInstance() {
 return SingleHolder.INSTANCE;
 }
 
 public void register(Class serviceInterface, Class impl) {
 System.out.println("regeist service " + serviceInterface.getName());
 serviceRegistry.put(serviceInterface.getName(), impl);
 }
 
 public void start() throws IOException {
 ServerSocket server = new ServerSocket();
 server.bind(new InetSocketAddress(port));
 System.out.println("start server");
 linstener = HeartbeatLinstener.getInstance();
 System.out.println("start listen heart beat");
 try {
  while (true) {
  // 1.监听客户端的TCP连接,接到TCP连接后将其封装成task,由线程池执行
  executor.execute(new ServiceTask(server.accept()));
  }
 } finally {
  server.close();
 }
 }
 
 public void stop() {
 isRunning.set(false);
 executor.shutdown();
 }
 
 
 public boolean isRunning() {
 return isRunning.get();
 }
 
 public int getPort() {
 return port;
 }
 
 public void settPort(int port) {
 this.port = port;
 }
 
 public ConcurrentHashMap<String, Class> getServiceRegistry() {
 return serviceRegistry;
 }
 
 private class ServiceTask implements Runnable {
 Socket clent = null;
 
 public ServiceTask(Socket client) {
  this.clent = client;
 }
 
 public void run() {
  ObjectInputStream input = null;
  ObjectOutputStream output = null;
  try {
  // 2.将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果
  input = new ObjectInputStream(clent.getInputStream());
  String serviceName = input.readUTF();
  String methodName = input.readUTF();
  Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
  Object[] arguments = (Object[]) input.readObject();
  Class serviceClass = serviceRegistry.get(serviceName);
  if (serviceClass == null) {
   throw new ClassNotFoundException(serviceName + " not found");
  }
  Method method = serviceClass.getMethod(methodName, parameterTypes);
  Object result = method.invoke(serviceClass.newInstance(), arguments);
 
  // 3.将执行结果反序列化,通过socket发送给客户端
  output = new ObjectOutputStream(clent.getOutputStream());
  output.writeObject(result);
  } catch (Exception e) {
  e.printStackTrace();
  } finally {
  if (output != null) {
   try {
   output.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  if (input != null) {
   try {
   input.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  if (clent != null) {
   try {
   clent.close();
   } catch (IOException e) {
   e.printStackTrace();
   }
  }
  }
 
 }
 }
}

  心跳监听类:

package com.cang.heartbeat;
 
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
 
/**
 * 心跳监听保存信息
 *
 * @author cang
 * @create_time 2016-09-28 11:40
 */
public class HeartbeatLinstener {
 
 private ExecutorService executor = Executors.newFixedThreadPool(20);
 
 private final ConcurrentHashMap<String, Object> nodes = new ConcurrentHashMap<String, Object>();
 private final ConcurrentHashMap<String, Long> nodeStatus = new ConcurrentHashMap<String, Long>();
 
 private long timeout = 10 * 1000;
 
 // 服务器监听端口
 private int port = 8089;
 
 // 单例模式
 private static class SingleHolder {
 private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();
 }
 
 private HeartbeatLinstener() {
 }
 
 public static HeartbeatLinstener getInstance() {
 return SingleHolder.INSTANCE;
 }
 
 public ConcurrentHashMap<String, Object> getNodes() {
 return nodes;
 }
 
 public void registerNode(String nodeId, Object nodeInfo) {
 nodes.put(nodeId, nodeInfo);
 nodeStatus.put(nodeId, System.currentTimeMillis());
 }
 
 public void removeNode(String nodeID) {
 if (nodes.containsKey(nodeID)) {
  nodes.remove(nodeID);
 }
 }
 
 // 检测节点是否有效
 public boolean checkNodeValid(String key) {
 if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;
 if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;
 return true;
 }
 
 // 删除所有失效节点
 public void removeInValidNode() {
 Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();
 while (it.hasNext()) {
  Map.Entry<String, Long> e = it.next();
  if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {
  nodes.remove(e.getKey());
  }
 }
 }
 
}

  心跳处理类接口:

public interface HeartbeatHandler {
 public Cmder sendHeartBeat(HeartbeatEntity info);
}

      心跳处理实现类:

public class HeartbeatHandlerImpl implements HeartbeatHandler {
 public Cmder sendHeartBeat(HeartbeatEntity info) {
 HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();
 
 // 添加节点
 if (!linstener.checkNodeValid(info.getNodeID())) {
  linstener.registerNode(info.getNodeID(), info);
 }
 
 // 其他操作
 Cmder cmder = new Cmder();
 cmder.setNodeID(info.getNodeID());
 // ...
 
 System.out.println("current all the nodes: ");
 Map<String, Object> nodes = linstener.getNodes();
 for (Map.Entry e : nodes.entrySet()) {
  System.out.println(e.getKey() + " : " + e.getValue());
 }
 System.out.println("hadle a heartbeat");
 return cmder;
 }
}

  测试类:

public class HeartbeatTest {
 
 public static void main(String[] args) {
 new Thread(new Runnable() {
  public void run() {
  try {
   ServiceCenter serviceServer = ServiceCenter.getInstance();
   serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);
   serviceServer.start();
  } catch (IOException e) {
   e.printStackTrace();
  }
  }
 }).start();
 Thread client1 = new Thread(new HeartbeatClient());
 client1.start();
 Thread client2 = new Thread(new HeartbeatClient());
 client2.start();
 }
}

四、总结

    上面的代码还有很多不足的地方,希望有空能进行改善:

  •  配置为硬编码;
  •  命令类Cmder没有实际实现,返回的Cmder对象没有实际进行处理;

其他小问题就暂时不管了,希望以后能重写上面的代码。

以上就是Java实现心跳机制的方法的详细内容,更多关于Java实现心跳机制的资料请关注猪先飞其它相关文章!

[!--infotagslink--]

相关文章

  • Java实现经典游戏复杂迷宫

    这篇文章主要介绍了如何利用java语言实现经典《复杂迷宫》游戏,文中采用了swing技术进行了界面化处理,感兴趣的小伙伴可以动手试一试...2022-02-01
  • java 运行报错has been compiled by a more recent version of the Java Runtime

    java 运行报错has been compiled by a more recent version of the Java Runtime (class file version 54.0)...2021-04-01
  • 在java中获取List集合中最大的日期时间操作

    这篇文章主要介绍了在java中获取List集合中最大的日期时间操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-08-15
  • php语言实现redis的客户端

    php语言实现redis的客户端与服务端有一些区别了因为前面介绍过服务端了这里我们来介绍客户端吧,希望文章对各位有帮助。 为了更好的了解redis协议,我们用php来实现...2016-11-25
  • jQuery+jRange实现滑动选取数值范围特效

    有时我们在页面上需要选择数值范围,如购物时选取价格区间,购买主机时自主选取CPU,内存大小配置等,使用直观的滑块条直接选取想要的数值大小即可,无需手动输入数值,操作简单又方便。HTML首先载入jQuery库文件以及jRange相关...2015-03-15
  • 教你怎么用Java获取国家法定节假日

    这篇文章主要介绍了教你怎么用Java获取国家法定节假日,文中有非常详细的代码示例,对正在学习java的小伙伴们有非常好的帮助,需要的朋友可以参考下...2021-04-23
  • Java如何发起http请求的实现(GET/POST)

    这篇文章主要介绍了Java如何发起http请求的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-03-31
  • 浅谈Java与C#的一些细微差别

    说起C#和Java这两门语言(语法,数据类型 等),个人以为,大概有90%以上的相似,甚至可以认为几乎一样。但是在工作中,我也发现了一些细微的差别...2020-06-25
  • JS实现的简洁纵向滑动菜单(滑动门)效果

    本文实例讲述了JS实现的简洁纵向滑动菜单(滑动门)效果。分享给大家供大家参考,具体如下:这是一款纵向布局的CSS+JavaScript滑动门代码,相当简洁的手法来实现,如果对颜色不满意,你可以试着自己修改CSS代码,这个滑动门将每一...2015-10-21
  • 解决Java处理HTTP请求超时的问题

    这篇文章主要介绍了解决Java处理HTTP请求超时的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-03-29
  • java 判断两个时间段是否重叠的案例

    这篇文章主要介绍了java 判断两个时间段是否重叠的案例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-08-15
  • jQuery+slidereveal实现的面板滑动侧边展出效果

    我们借助一款jQuery插件:slidereveal.js,可以使用它控制面板左右侧滑出与隐藏等效果,项目地址:https://github.com/nnattawat/slideReveal。如何使用首先在页面中加载jquery库文件和slidereveal.js插件。复制代码 代码如...2015-03-15
  • java 画pdf用itext调整表格宽度、自定义各个列宽的方法

    这篇文章主要介绍了java 画pdf用itext调整表格宽度、自定义各个列宽的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-01-31
  • 超简洁java实现双色球若干注随机号码生成(实例代码)

    这篇文章主要介绍了超简洁java实现双色球若干注随机号码生成(实例代码),本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-04-02
  • PHP+jQuery翻板抽奖功能实现

    翻板抽奖的实现流程:前端页面提供6个方块,用数字1-6依次表示6个不同的方块,当抽奖者点击6个方块中的某一块时,方块翻转到背面,显示抽奖中奖信息。看似简单的一个操作过程,却包含着WEB技术的很多知识面,所以本文的读者应该熟...2015-10-21
  • Java生成随机姓名、性别和年龄的实现示例

    这篇文章主要介绍了Java生成随机姓名、性别和年龄的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-10-01
  • SQLMAP结合Meterpreter实现注入渗透返回shell

    sqlmap 是一个自动SQL 射入工具。它是可胜任执行一个广泛的数据库管理系统后端指印, 检索遥远的DBMS 数据库等,下面我们来看一个学习例子。 自己搭建一个PHP+MYSQ...2016-11-25
  • java正则表达式判断前端参数修改表中另一个字段的值

    这篇文章主要介绍了java正则表达式判断前端参数修改表中另一个字段的值,需要的朋友可以参考下...2021-05-07
  • Java使用ScriptEngine动态执行代码(附Java几种动态执行代码比较)

    这篇文章主要介绍了Java使用ScriptEngine动态执行代码,并且分享Java几种动态执行代码比较,需要的朋友可以参考下...2021-04-15
  • Java开发实现人机猜拳游戏

    这篇文章主要介绍了Java开发实现人机猜拳游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2020-08-03