Kafka在ASP.Net Core上的应用
2020/2/26 17:16:27
本文主要是介绍Kafka在ASP.Net Core上的应用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
ASP.Net Core中使用的最多的是Confluent.Kafka这个包,以下用实例展示应用1.下载Nuget包
首先是下载Confluent.Kafka这个包
2.创建Producer消息生产者
发送者
public class KafkaProducer { public static async Task SendAsync<T>(string topic, T value) where T: KafkaMessage { var config = new ProducerConfig { BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers };//服务器IP ProducerBuilder<Null, string> producerBuilder = new ProducerBuilder<Null, string>(config); using (var p = producerBuilder.Build()) { try { var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = JsonConvert.SerializeObject(value) }); Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'"); } catch (ProduceException<Null, string> e) { Console.WriteLine($"Delivery failed: {e.Error.Reason}"); } } } } 复制代码
其中要注意的一点ProducerBuilder<TKey,TValue>中的TValue类型只能是Confluent.Kafka.Null, int, long, string, float, double, byte[]. 这7种类型, 否则在调用producerBuilder.Build()时会抛出 ArgumentNullException(Key serializer not specified and there is no default serializer defined for type {typeof(TKey).Name})
消息体中包含你的消息必须的内容
public class KafkaMessage { } 复制代码
3.创建Consumer消息消费者
总消费类
public class KafkaConsumer<T> where T : KafkaMessage { public string Topic { get; set; } public string ConsumerGroup { get; set; } public void Subscribe(Action<T> dealMessage) { var config = new ConsumerConfig { GroupId = ConsumerGroup, BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers, AutoOffsetReset = AutoOffsetReset.Latest }; Task.Run(() => { var builder = new ConsumerBuilder<string, string>(config); using (var consumer = builder.Build()) { consumer.Subscribe(Topic); while (true) { var result = consumer.Consume(); try { var message = JsonConvert.DeserializeObject<T>(result.Value); dealMessage(message); } catch (Exception) { Console.WriteLine($"Topic : {result.Topic}, Message : {result.Value}"); } } } }); } } 复制代码
子消费类
interface ITestKafkaConsumer { void DealMessage(TestKafkaEntity message); void Subscribe(); } public class TestKafkaConsumer : ITestKafkaConsumer { private KafkaConsumer<TestKafkaEntity> consumer { get; set; } public TestKafkaConsumer() { consumer = new KafkaConsumer<TestKafkaEntity> { Topic = "test", ConsumerGroup = "console-consumer-63873", }; } public void DealMessage(TestKafkaEntity message) { Console.WriteLine("-------------------------------------------------------------"); Console.WriteLine("这是一个消费者!!!" + message.ConsumerValue); Console.WriteLine("-------------------------------------------------------------"); } public void Subscribe() { consumer.Subscribe(DealMessage); } } 复制代码
通过回调方法的方式, 将子消息类中的方法传入总消息类中
4.注入消费者
在Startup.cs类中的ConfigureServices方法中注入子消费类:
public void ConfigureServices(IServiceCollection services) { services.AddSingleton<ITestKafkaConsumer, TestKafkaConsumer>(); } 复制代码
然后在Program.cs类中的Main方法启动消费者:
public static void Main(string[] args) { var hostBuilder = CreateHostBuilder(args); var host = hostBuilder.Build(); using (var scope = host.Services.CreateScope()) { var testConsumer = scope.ServiceProvider.GetService<ITestKafkaConsumer>(); testConsumer.Subscribe(); } host.Run(); ; } 复制代码
结果展示:
以上就是kafka在ASP.Net Core中的简单实现
这篇关于Kafka在ASP.Net Core上的应用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2022-03-01沐雪多租宝商城源码从.NetCore3.1升级到.Net6的步骤
- 2024-05-08首个适配Visual Studio平台的国产智能编程助手CodeGeeX正式上线!C#程序员必备效率神器!
- 2024-03-30C#设计模式之十六迭代器模式(Iterator Pattern)【行为型】
- 2024-03-29c# datetime tryparse
- 2024-02-21list find index c#
- 2024-01-24convert toint32 c#
- 2024-01-24Advanced .Net Debugging 1:你必须知道的调试工具
- 2024-01-24.NET集成IdGenerator生成分布式全局唯一ID
- 2024-01-23用CI/CD工具Vela部署Elasticsearch + C# 如何使用
- 2024-01-23.NET开源的简单、快速、强大的前后端分离后台权限管理系统