JAVA Netty实现聊天室+私聊功能的示例代码

 更新时间:2020年8月20日 10:09  点击:1546

功能介绍

使用Netty框架实现聊天室功能,服务器可监控客户端上下限状态,消息转发。同时实现了点对点私聊功能。技术点我都在代码中做了备注,这里不再重复写了。希望能给想学习netty的同学一点参考。

服务器代码

服务器入口代码

package nio.test.netty.groupChat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * netty群聊 服务器端
 * @author zhang
 *
 */
public class NettyChatServer {
	private int port;
	
	public NettyChatServer(int port){
		this.port = port;
	}
	
	//初始化 netty服务器
	private void init() throws Exception{
		EventLoopGroup boss = new NioEventLoopGroup(1);
		EventLoopGroup work = new NioEventLoopGroup(16);
		try {
			ServerBootstrap boot = new ServerBootstrap();
			boot.group(boss,work);
			boot.channel(NioServerSocketChannel.class);//设置boss selector建立channel使用的对象
			boot.option(ChannelOption.SO_BACKLOG, 128);//boss 等待连接的 队列长度
			boot.childOption(ChannelOption.SO_KEEPALIVE, true); //让客户端保持长期活动状态
			boot.childHandler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//从channel中获取pipeline 并往里边添加Handler
					ChannelPipeline pipeline = ch.pipeline();
					pipeline.addLast("encoder",new StringEncoder());
					pipeline.addLast("decoder",new StringDecoder());
					pipeline.addLast(new ServerMessageHandler());//自定义Handler来处理消息
				}
			});
			System.out.println("服务器开始启动...");
			//绑定端口 
			ChannelFuture channelFuture = boot.bind(port).sync();
			channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {

				@Override
				public void operationComplete(Future<? super Void> future)
						throws Exception {
					if(future.isSuccess()){
						System.out.println("服务器正在启动...");
					}
					if(future.isDone()){
						System.out.println("服务器启动成功...OK");
					}
					
				}
			});
			//监听channel关闭
			channelFuture.channel().closeFuture().sync();
			channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {

				@Override
				public void operationComplete(Future<? super Void> future)
						throws Exception {
					if(future.isCancelled()){
						System.out.println("服务器正在关闭..");
					}
					if(future.isCancellable()){
						System.out.println("服务器已经关闭..OK");
					}
					
				}
			});
			
		}finally{
			boss.shutdownGracefully();
			work.shutdownGracefully();
		}
	}
	/**
	 * 启动服务器 main 函数
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		new NettyChatServer(9090).init();

	}

}

服务器端消息处理Handler

package nio.test.netty.groupChat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * 自定义 服务器端消息处理Handler
 * @author zhang
 *
 */
public class ServerMessageHandler extends SimpleChannelInboundHandler<String>{
	/**
	 * 管理全局的channel
	 * GlobalEventExecutor.INSTANCE 全局事件监听器
	 * 一旦将channel 加入 ChannelGroup 就不要用手动去
	 * 管理channel的连接失效后移除操作,他会自己移除
	 */
	private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
	/**
	 * 为了实现私聊功能,这里key存储用户的唯一标识,
	 * 我保存 客户端的端口号
	 * 当然 这个集合也需要自己去维护 用户的上下线 不能像 ChannelGroup那样自己去维护
	 */
	private static Map<String,Channel> all = new HashMap<String,Channel>();
	
	private SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	/**
	 * 处理收到的消息
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg)
			throws Exception {
		Channel channel = ctx.channel();
		/**
		 * 这里简单判断 如果内容里边包含#那么就是私聊
		 */
		if(msg.contains("#")){
			String id = msg.split("#")[0];
			String body = msg.split("#")[1];
			Channel userChannel = all.get(id);
			String key = channel.remoteAddress().toString().split(":")[1];
			userChannel.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+key+" 说 : "+body);
			return;
		}
		
		//判断当前消息是不是自己发送的
		for(Channel c : channels){
			String addr = c.remoteAddress().toString();
			if(channel !=c){
				c.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 说 : "+msg);
			}else{
				c.writeAndFlush(sf.format(new Date())+"\n 【自己】 "+addr+" 说 : "+msg);
			}
		}
		
	}
	/**
	 * 建立连接以后第一个调用的方法
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		Channel channel = ctx.channel();
		String addr = channel.remoteAddress().toString();
		/**
		 * 这里 ChannelGroup 底层封装会遍历给所有的channel发送消息
		 * 
		 */
		channels.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 加入聊天室 ");
		channels.add(channel);
		String key = channel.remoteAddress().toString().split(":")[1];
		all.put(key, channel);
	}
	/**
	 * channel连接状态就绪以后调用
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		String addr = ctx.channel().remoteAddress().toString();
		System.out.println(sf.format(new Date())+" \n【用户】 "+addr+" 上线 ");
	}
	/**
	 * channel连接状态断开后触发
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		String addr = ctx.channel().remoteAddress().toString();
		System.out.println(sf.format(new Date())+" \n【用户】 "+addr+" 下线 ");
		//下线移除
		String key = ctx.channel().remoteAddress().toString().split(":")[1];
		all.remove(key);
	}
	/**
	 * 连接发生异常时触发
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		//System.out.println("连接发生异常!");
		ctx.close();
	}
	/**
	 * 断开连接会触发该消息
	 * 同时当前channel 也会自动从ChannelGroup中被移除
	 */
	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		Channel channel = ctx.channel();
		String addr = channel.remoteAddress().toString();
		/**
		 * 这里 ChannelGroup 底层封装会遍历给所有的channel发送消息
		 * 
		 */
		channels.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 离开了 ");
		//打印 ChannelGroup中的人数
		System.out.println("当前在线人数是:"+channels.size());
		System.out.println("all:"+all.size());
	}

	
}

客户端主方法代码

package nio.test.netty.groupChat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.Scanner;

public class NettyChatClient {

	private String ip;
	
	private int port;
	
	public NettyChatClient(String ip,int port){
		this.ip = ip;
		this.port = port;
	}
	/**
	 * 初始化客户
	 */
	private void init() throws Exception{
		//创建监听事件的监听器
		EventLoopGroup work = new NioEventLoopGroup();
		try {
			Bootstrap boot = new Bootstrap();
			boot.group(work);
			boot.channel(NioSocketChannel.class);
			boot.handler(new ChannelInitializer<NioSocketChannel>() {

				@Override
				protected void initChannel(NioSocketChannel ch)
						throws Exception {
					ChannelPipeline pipeline = ch.pipeline();
					pipeline.addLast("encoder",new StringEncoder());
					pipeline.addLast("decoder",new StringDecoder());
					pipeline.addLast(new ClientMessageHandler());
					
				}
			});
			
			ChannelFuture channelFuture = boot.connect(ip, port).sync();
			channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {

				@Override
				public void operationComplete(Future<? super Void> future)
						throws Exception {
					if(future.isSuccess()){
						System.out.println("客户端启动中...");
					}
					if(future.isDone()){
						System.out.println("客户端启动成功...OK!");
					}
				}
			});
			System.out.println(channelFuture.channel().localAddress().toString());
			System.out.println("#################################################");
			System.out.println("~~~~~~~~~~~~~~端口号#消息内容~~这样可以给单独一个用户发消息~~~~~~~~~~~~~~~~~~");
			System.out.println("#################################################");
			
			/**
			 * 这里用控制台输入数据
			 */
			Channel channel = channelFuture.channel();
			//获取channel
			Scanner scanner = new Scanner(System.in);
			while(scanner.hasNextLine()){
				String str = scanner.nextLine();
				channel.writeAndFlush(str+"\n");
			}
			channelFuture.channel().closeFuture().sync();
			scanner.close();
		} finally {
			work.shutdownGracefully();
		}
	}
	
	/**
	 * 主方法入口
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{

		new NettyChatClient("127.0.0.1",9090).init();
	}

}

客户端消息处理Handler

package nio.test.netty.groupChat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * 客户点消息处理 Handler
 * @author zhang
 *
 */
public class ClientMessageHandler extends SimpleChannelInboundHandler<String> {

	/**
	 * 处理收到的消息
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg)
			throws Exception {
		System.out.println(msg);
		
	}
	/**
	 * 连接异常后触发
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
		
	}
}

测试结果

启动了四个客户端 服务器端日志效果如下:


客户端一端日志:


客户端二日志:


客户端三日志:


客户端四日志:


现在在客户端四发送消息:


每个客户端都可以收到消息:




软化关闭客户端客户端三:

服务器日志:


其他客户端日志:




发送私聊消息:


这个客户端收不到消息


到此这篇关于JAVA Netty实现聊天室+私聊功能的示例代码的文章就介绍到这了,更多相关JAVA Netty聊天室内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!

[!--infotagslink--]

相关文章