.NET Core下使用Kafka的方法步骤

 更新时间:2021年9月22日 09:59  点击:4240

安装

CentOS安装 kafka

Kafka : http://kafka.apache.org/downloads

ZooLeeper : https://zookeeper.apache.org/releases.html

下载并解压

# 下载,并解压
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1.tgz /data/kafka

# 下载 zookeeper,解压
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper

启动 ZooKeeper

# 复制配置模版
$ cd /data/kafka/conf
$ cp zoo_sample.cfg zoo.cfg

# 看看配置需不需要改
$ vim zoo.cfg

# 命令
$ ./bin/zkServer.sh start  # 启动
$ ./bin/zkServer.sh status  # 状态
$ ./bin/zkServer.sh stop   # 停止
$ ./bin/zkServer.sh restart # 重启

# 使用客户端测试
$ ./bin/zkCli.sh -server localhost:2181
$ quit

启动 Kafka

# 备份配置
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy

# 修改配置
$ vim /data/kafka/config/server.properties

# 集群配置下,每个 broker 的 id 是必须不同的
# broker.id=0

# 监听地址设置(内网)
# listeners=PLAINTEXT://ip:9092

# 对外提供服务的IP、端口
# advertised.listeners=PLAINTEXT://106.75.84.97:9092

# 修改每个topic的默认分区参数num.partitions,默认是1,具体合适的取值需要根据服务器配置进程确定,UCloud.ukafka = 3
# num.partitions=3

# zookeeper 配置
# zookeeper.connect=localhost:2181

# 通过配置启动 kafka
$ ./bin/kafka-server-start.sh config/server.properties&

# 状态查看
$ ps -ef|grep kafka
$ jps

docker下安装Kafka

docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

docker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

介绍

  • Broker:消息中间件处理节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。
  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

kafka partition 和 consumer 数目关系

  • 如果consumer比partition多是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。
  • 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。
  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 快速开始

在 .NET Core 项目中安装组件

Install-Package Confluent.Kafka

开源地址: https://github.com/confluentinc/confluent-kafka-dotnet

添加 IKafkaService 服务接口

public interface IKafkaService
{
  /// <summary>
  /// 发送消息至指定主题
  /// </summary>
  /// <typeparam name="TMessage"></typeparam>
  /// <param name="topicName"></param>
  /// <param name="message"></param>
  /// <returns></returns>
  Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;

  /// <summary>
  /// 从指定主题订阅消息
  /// </summary>
  /// <typeparam name="TMessage"></typeparam>
  /// <param name="topics"></param>
  /// <param name="messageFunc"></param>
  /// <param name="cancellationToken"></param>
  /// <returns></returns>
  Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
}

实现 IKafkaService

public class KafkaService : IKafkaService
{
  public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class
  {
    var config = new ProducerConfig
    {
      BootstrapServers = "127.0.0.1:9092"
    };
    using var producer = new ProducerBuilder<string, string>(config).Build();
    await producer.ProduceAsync(topicName, new Message<string, string>
    {
      Key = Guid.NewGuid().ToString(),
      Value = message.SerializeToJson()
    });
  }

  public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
  {
    var config = new ConsumerConfig
    {
      BootstrapServers = "127.0.0.1:9092",
      GroupId = "crow-consumer",
      EnableAutoCommit = false,
      StatisticsIntervalMs = 5000,
      SessionTimeoutMs = 6000,
      AutoOffsetReset = AutoOffsetReset.Earliest,
      EnablePartitionEof = true
    };
    //const int commitPeriod = 5;
    using var consumer = new ConsumerBuilder<Ignore, string>(config)
               .SetErrorHandler((_, e) =>
               {
                 Console.WriteLine($"Error: {e.Reason}");
               })
               .SetStatisticsHandler((_, json) =>
               {
                 Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
               })
               .SetPartitionsAssignedHandler((c, partitions) =>
               {
                 string partitionsStr = string.Join(", ", partitions);
                 Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
               })
               .SetPartitionsRevokedHandler((c, partitions) =>
               {
                 string partitionsStr = string.Join(", ", partitions);
                 Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
               })
               .Build();
    consumer.Subscribe(topics);
    try
    {
      while (true)
      {
        try
        {
          var consumeResult = consumer.Consume(cancellationToken);
          Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
          if (consumeResult.IsPartitionEOF)
          {
            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
            continue;
          }
          TMessage messageResult = null;
          try
          {
            messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);
          }
          catch (Exception ex)
          {
            var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";
            Console.WriteLine(errorMessage);
            messageResult = null;
          }
          if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
          {
            messageFunc(messageResult);
            try
            {
              consumer.Commit(consumeResult);
            }
            catch (KafkaException e)
            {
              Console.WriteLine(e.Message);
            }
          }
        }
        catch (ConsumeException e)
        {
          Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
      }
    }
    catch (OperationCanceledException)
    {
      Console.WriteLine("Closing consumer.");
      consumer.Close();
    }
    await Task.CompletedTask;
  }
}

注入 IKafkaService ,在需要使用的地方直接调用即可。

public class MessageService : IMessageService, ITransientDependency
{
  private readonly IKafkaService _kafkaService;
  public MessageService(IKafkaService kafkaService)
  {
    _kafkaService = kafkaService;
  }

  public async Task RequestTraceAdded(XxxEventData eventData)
  {
    await _kafkaService.PublishAsync(eventData.TopicName, eventData);
  }
}

以上相当于一个生产者,当我们消息队列发出后,还需一个消费者进行消费,所以可以使用一个控制台项目接收消息来处理业务。

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
  e.Cancel = true;
  cts.Cancel();
};

await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>
{
  // Your logic

  Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");
}, cts.Token);

IKafkaService 中已经写了订阅消息的接口,这里也是注入后直接使用即可。

生产者消费者示例

生产者

static async Task Main(string[] args)
{
  if (args.Length != 2)
  {
    Console.WriteLine("Usage: .. brokerList topicName");
    // 127.0.0.1:9092 helloTopic
    return;
  }

  var brokerList = args.First();
  var topicName = args.Last();

  var config = new ProducerConfig { BootstrapServers = brokerList };

  using var producer = new ProducerBuilder<string, string>(config).Build();

  Console.WriteLine("\n-----------------------------------------------------------------------");
  Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
  Console.WriteLine("-----------------------------------------------------------------------");
  Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
  Console.WriteLine("> key value<Enter>");
  Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
  Console.WriteLine("> value<enter>");
  Console.WriteLine("Ctrl-C to quit.\n");

  var cancelled = false;

  Console.CancelKeyPress += (_, e) =>
  {
    e.Cancel = true;
    cancelled = true;
  };

  while (!cancelled)
  {
    Console.Write("> ");

    var text = string.Empty;

    try
    {
      text = Console.ReadLine();
    }
    catch (IOException)
    {
      break;
    }

    if (string.IsNullOrWhiteSpace(text))
    {
      break;
    }

    var key = string.Empty;
    var val = text;

    var index = text.IndexOf(" ");
    if (index != -1)
    {
      key = text.Substring(0, index);
      val = text.Substring(index + 1);
    }

    try
    {
      var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>
      {
        Key = key,
        Value = val
      });

      Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");
    }
    catch (ProduceException<string, string> e)
    {
      Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
    }
  }
}

消费者

static void Main(string[] args)
{
  if (args.Length != 2)
  {
    Console.WriteLine("Usage: .. brokerList topicName");
    // 127.0.0.1:9092 helloTopic
    return;
  }

  var brokerList = args.First();
  var topicName = args.Last();

  Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

  var cts = new CancellationTokenSource();
  Console.CancelKeyPress += (_, e) =>
  {
    e.Cancel = true;
    cts.Cancel();
  };

  var config = new ConsumerConfig
  {
    BootstrapServers = brokerList,
    GroupId = "consumer",
    EnableAutoCommit = false,
    StatisticsIntervalMs = 5000,
    SessionTimeoutMs = 6000,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnablePartitionEof = true
  };

  const int commitPeriod = 5;

  using var consumer = new ConsumerBuilder<Ignore, string>(config)
             .SetErrorHandler((_, e) =>
             {
               Console.WriteLine($"Error: {e.Reason}");
             })
             .SetStatisticsHandler((_, json) =>
             {
               Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");
               //Console.WriteLine($"Statistics: {json}");
             })
             .SetPartitionsAssignedHandler((c, partitions) =>
             {
               Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
             })
             .SetPartitionsRevokedHandler((c, partitions) =>
             {
               Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
             })
             .Build();
  consumer.Subscribe(topicName);

  try
  {
    while (true)
    {
      try
      {
        var consumeResult = consumer.Consume(cts.Token);

        if (consumeResult.IsPartitionEOF)
        {
          Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

          continue;
        }

        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");

        if (consumeResult.Offset % commitPeriod == 0)
        {
          try
          {
            consumer.Commit(consumeResult);
          }
          catch (KafkaException e)
          {
            Console.WriteLine($"Commit error: {e.Error.Reason}");
          }
        }
      }
      catch (ConsumeException e)
      {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
      }
    }
  }
  catch (OperationCanceledException)
  {
    Console.WriteLine("Closing consumer.");
    consumer.Close();
  }
}

到此这篇关于.NET Core下使用Kafka的方法步骤的文章就介绍到这了,更多相关.NET Core使用Kafka内容请搜索猪先飞以前的文章或继续浏览下面的相关文章希望大家以后多多支持猪先飞!

[!--infotagslink--]

相关文章

  • ASP.NET购物车实现过程详解

    这篇文章主要为大家详细介绍了ASP.NET购物车的实现过程,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-09-22
  • .NET Core下使用Kafka的方法步骤

    这篇文章主要介绍了.NET Core下使用Kafka的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • 在ASP.NET 2.0中操作数据之七十二:调试存储过程

    在开发过程中,使用Visual Studio的断点调试功能可以很方便帮我们调试发现程序存在的错误,同样Visual Studio也支持对SQL Server里面的存储过程进行调试,下面就让我们看看具体的调试方法。...2021-09-22
  • Win10 IIS 安装.net 4.5的方法

    这篇文章主要介绍了Win10 IIS 安装及.net 4.5及Win10安装IIS并配置ASP.NET 4.0的方法,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下...2021-09-22
  • 详解.NET Core 3.0 里新的JSON API

    这篇文章主要介绍了详解.NET Core 3.0 里新的JSON API,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • ASP.NET Core根据环境变量支持多个 appsettings.json配置文件

    这篇文章主要介绍了ASP.NET Core根据环境变量支持多个 appsettings.json配置文件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • .net数据库操作框架SqlSugar的简单入门

    这篇文章主要介绍了.net数据库操作框架SqlSugar的简单入门,帮助大家更好的理解和学习使用.net技术,感兴趣的朋友可以了解下...2021-09-22
  • 记一次EFCore类型转换错误及解决方案

    这篇文章主要介绍了记一次EFCore类型转换错误及解决方案,帮助大家更好的理解和学习使用asp.net core,感兴趣的朋友可以了解下...2021-09-22
  • 详解ASP.NET Core 中基于工厂的中间件激活的实现方法

    这篇文章主要介绍了ASP.NET Core 中基于工厂的中间件激活的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-09-22
  • C#使用Ado.Net更新和添加数据到Excel表格的方法

    这篇文章主要介绍了C#使用Ado.Net更新和添加数据到Excel表格的方法,较为详细的分析了OLEDB的原理与使用技巧,可实现较为方便的操作Excel数据,需要的朋友可以参考下...2020-06-25
  • .NET C#利用ZXing生成、识别二维码/条形码

    ZXing是一个开放源码的,用Java实现的多种格式的1D/2D条码图像处理库,它包含了联系到其他语言的端口。这篇文章主要给大家介绍了.NET C#利用ZXing生成、识别二维码/条形码的方法,文中给出了详细的示例代码,有需要的朋友们可以参考借鉴。...2020-06-25
  • asp.net通过消息队列处理高并发请求(以抢小米手机为例)

    这篇文章主要介绍了asp.net通过消息队列处理高并发请求(以抢小米手机为例),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-09-22
  • ASP.NET单选按钮控件RadioButton常用属性和方法介绍

    RadioButton又称单选按钮,其在工具箱中的图标为 ,单选按钮通常成组出现,用于提供两个或多个互斥选项,即在一组单选钮中只能选择一个...2021-09-22
  • ASP.NET 2.0中的数据操作:使用两个DropDownList过滤的主/从报表

    在前面的指南中我们研究了如何显示一个简单的主/从报表, 该报表使用DropDownList和GridView控件, DropDownList填充类别,GridView显示选定类别的产品. 这类报表用于显示具有...2016-05-19
  • 详解.NET Core 使用HttpClient SSL请求出错的解决办法

    这篇文章主要介绍了.NET Core 使用HttpClient SSL请求出错的解决办法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2021-09-22
  • Underscore源码分析

    Underscore 是一个 JavaScript 工具库,它提供了一整套函数式编程的实用功能,但是没有扩展任何 JavaScript 内置对象。这篇文章主要介绍了underscore源码分析相关知识,感兴趣的朋友一起学习吧...2016-01-02
  • Python调用.NET库的方法步骤

    这篇文章主要介绍了Python调用.NET库的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-05-09
  • ASP.NET中iframe框架点击左边页面链接 右边显示链接页面内容

    这篇文章主要介绍了ASP.NET中iframe框架点击左边页面链接,右边显示链接页面内容的实现代码,感兴趣的小伙伴们可以参考一下...2021-09-22
  • 创建一个完整的ASP.NET Web API项目

    ASP.NET Web API具有与ASP.NET MVC类似的编程方式,ASP.NET Web API不仅仅具有一个完全独立的消息处理管道,而且这个管道比为ASP.NET MVC设计的管道更为复杂,功能也更为强大。下面创建一个简单的Web API项目,需要的朋友可以参考下...2021-09-22
  • ASP.NET连接MySql数据库的2个方法及示例

    这篇文章主要介绍了ASP.NET连接MySql数据库的2个方法及示例,使用的是MySQL官方组件和ODBC.NET,需要的朋友可以参考下...2021-09-22