余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

asp.net core supersocket介绍以及源码分析(asp.net core oauth2.0)

xiyangw 2023-02-28 12:00 1360 浏览 0 评论

跟物联网设备常用的通信协议有TCP,MQTT.今天我们介绍的是TCP连接,TCP连接程序的组件有Supersocket,dotnetty.Supersocket相信搞过.net的朋友应该都知道,dotnetty是有微软Azure从java平台下移植过来的一个高性能、异步事件驱动的 NIO 框架,Kafka和RocketMQ等消息中间件、ElasticSearch开源搜索引擎、大数据处理Hadoop的RPC框架Avro、分布式通信框架Dubbo,都使用了Netty,Netty的资料很多,有兴趣的可以搜一下,dotnetty和Netty语法基本一致,所以资料可以互相参考,今天我们介绍的是.net下supersocket的使用以及源码分析

SuperSocket 可以和 ASP.NET Core 网站一起同时运行。你需要做的是将 SuperSocket 注册到 ASP.NET Core 网站的host builder中去, 同时将服务器的选项放到配置文件中或者通过代码定义。

  //don't forget the usings
    using SuperSocket;
    using SuperSocket.ProtoBase;

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder.UseStartup<Startup>();
            })
            .AsSuperSocketHostBuilder<TextPackageInfo, LinePipelineFilter>()
            .UsePackageHandler(async (s, p) =>
            {
                // echo message back to client
                await s.SendAsync(Encoding.UTF8.GetBytes(p.Text + "\r\n"));
            });

同时将服务器的配置选项放到配置文件 "appsettings.json" 中去:

    {
        "Logging": {
            "LogLevel": {
            "Default": "Information",
            "Microsoft": "Warning",
            "Microsoft.Hosting.Lifetime": "Information"
            }
        },
        "serverOptions": {
            "name": "TestServer",
            "listeners": [
                {
                    "ip": "Any",
                    "port": 4040
                }
            ]
        },
        "AllowedHosts": "*"
    }

上面代码就可以实现supersocket和asp.net core (.net core3,.net5,.net6)的集成,可以支持最新的.NET6,当然DotNetty也支持.Net 6

今天我们分析源码的版本是supersocket V2.0的版本,跟之前.net freamwork下有很大的区别

SuperSocket 请求处理模型示意图


SuperSocket 请求处理模型示意图


源码主线流程分析


主流程图

1分析的入口就从SuperSocketService这个类开始,他集成自IHostedService, IServer,因此这个类的StartAsync就是执行的入口

 async Task<bool> IServer.StartAsync()
        {
            await StartAsync(CancellationToken.None);
            return true;
        }



2.通过channelCreatorFactory创建监听器,监听代码如下


  public bool Start()
        {
            var options = Options;

            try
            {
                if (options.Security != SslProtocols.None && options.CertificateOptions != null)
                {
                    options.CertificateOptions.EnsureCertificate();
                }

                var listenEndpoint = options.GetListenEndPoint();
                var listenSocket = _listenSocket = new Socket(listenEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                
                listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
                listenSocket.LingerState = new LingerOption(false, 0);

                if (options.NoDelay)
                    listenSocket.NoDelay = true;
                
                listenSocket.Bind(listenEndpoint);
                listenSocket.Listen(options.BackLog);

                IsRunning = true;

                _cancellationTokenSource = new CancellationTokenSource();

                KeepAccept(listenSocket).DoNotAwait();
                return true;
            }
            catch (Exception e)
            {
                _logger.LogError(e, #34;The listener[{this.ToString()}] failed to start.");
                return false;
            }
        }

3.开启异步接收accept线程


        private async Task KeepAccept(Socket listenSocket)
        {
            while (!_cancellationTokenSource.IsCancellationRequested)
            {
                try
                {
                    var client = await listenSocket.AcceptAsync().ConfigureAwait(false);
                    OnNewClientAccept(client);
                }
                catch (Exception e)
                {
                    if (e is ObjectDisposedException || e is NullReferenceException)
                        break;
                    
                    if (e is SocketException se)
                    {
                        var errorCode = se.ErrorCode;

                        //The listen socket was closed
                        if (errorCode == 125 || errorCode == 89 || errorCode == 995 || errorCode == 10004 || errorCode == 10038)
                        {
                            break;
                        }
                    }
                    
                    _logger.LogError(e, #34;Listener[{this.ToString()}] failed to do AcceptAsync");
                    continue;
                }
            }

            _stopTaskCompletionSource.TrySetResult(true);
        }

4 当有连接进来后,执行执行注册的事件

listener.NewClientAccepted += OnNewClientAccept;


5.accepted后就创建新的Channel,session


     private void AcceptNewChannel(IChannel channel)
        {
            var session = _sessionFactory.Create() as AppSession;
            HandleSession(session, channel).DoNotAwait();
        }

6 channel.Start(),

  public override void Start()
        {
            _readsTask = ProcessReads();
            _sendsTask = ProcessSends();
            WaitHandleClosing();
        }


7.将从内核里读的socket数据异步写到 Pipe中

           protected virtual async Task ProcessReads()
        {
            var pipe = In;

            Task writing = FillPipeAsync(pipe.Writer);
            Task reading = ReadPipeAsync(pipe.Reader);

            await Task.WhenAll(reading, writing);
        }

8当Pipe有数据写入后,通知Pipe读线程去解析数据,这里通知用的方法是ManualResetValueTaskSourceCore,写线程写入数据后执行 _taskSourceCore.SetResult(target);

就会触发读线程去读,读的时候会根据你设置的协议模版去解析,这个过程会去处理粘包和拆包的过程,

因为Pipe是可以定向的从流中取部分数据的

内置的协议模板如下;

TerminatorReceiveFilter (SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter, SuperSocket.SocketBase)

CountSpliterReceiveFilter (SuperSocket.Facility.Protocol.CountSpliterReceiveFilter, SuperSocket.Facility)

FixedSizeReceiveFilter (SuperSocket.Facility.Protocol.FixedSizeReceiveFilter, SuperSocket.Facility)

BeginEndMarkReceiveFilter (SuperSocket.Facility.Protocol.BeginEndMarkReceiveFilter, SuperSocket.Facility)

FixedHeaderReceiveFilter (SuperSocket.Facility.Protocol.FixedHeaderReceiveFilter, SuperSocket.Facility)


9 读取到的数据解析成packageInfo 后继续往下执行

      await foreach (var p in packageChannel.RunAsync())
                {
                    if(_packageHandlingContextAccessor!=null)
                    {
                        _packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
                    }
                    await packageHandlingScheduler.HandlePackage(session, p);
                }

10再执行到我们定义的command即可

        ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package)
        {
            return HandlePackage(session, PackageMapper.Map(package));
        }

11 执行我们预制的command代码


    [Command("add")]
    public class ADD : IAsyncCommand<StringPackageInfo>
    {
        public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package)
        {
            var result = package.Parameters
                .Select(p => int.Parse(p))
                .Sum();

            await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "\r\n"));
        }
    }

相关推荐

排序算法--归并排序_归并排序例题讲解

原理如图所示(先分割再合并):归并排序代码工作原理:1、申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列2、设定两个指针,最初位置分别为两个已经排序序列的起始位置3、比较两个指针所...

八大排序算法-归并排序_归并排序 算法

算法思想归并排序分为三个步骤:1.分解:将数列分解成n个子数列。(如果是将数列分成2个子数列则为2路归并)2.治理:对每个子数列进行排序操作3.合并:将两个排好序的子数列进行合并生成新的数列算法实现P...

高级排序之归并排序、希尔排序_希尔排序和归并排序区别

前言继上次排序算法简单排序算法之冒泡、插入和选择排序-Java实现版后,本文学习高级排序算法——归并排序、希尔排序,快速排序将在后续更新。本文实现代码调用方法,部分来自前一个文章:简单排序算法之冒泡、...

Excel办公应用:按合并单元格排序的三大方法

1.按姓名对科目排序重点:在"C2"中输入公式=IF(A2<>"",1,C1+1),然后下拉填充。2.按姓名添加连续序号(方法一)重点:选择"A2:A11"单元格区域,在编辑栏中输入公...

快速排序 Vs. 归并排序 Vs. 堆排序——谁才是最强的排序算法

知乎上有一个问题是这样的:堆排序是渐进最优的比较排序算法,达到了O(nlgn)这一下界,而快排有一定的可能性会产生最坏划分,时间复杂度可能为O(n^2),那为什么快排在实际使用中通常优于堆排序?昨天刚...

归并排序思路图解 #归并排序_归并排序百度百科

排序算法1.图解。OK,让它排一下。看好了,要开始排了。能看出来像递归吗?肯定算法难,但是这个次数非常的多,不用管次数。这个是帝规,就是递归。这是并,这是并,这是两个有序数,组合成一个最后的大的有序数...

排序算法学习——归并排序_归并排序算法稳定吗

我们先看归并排序的定义归并排序是建立在归并操作上的一种有效的排序算法,该算法是采用分治法(DivideandConquer)的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每...

动画|经典的归并排序究竟怎么玩儿?

作者|菠了个菜责编|郭芮由于LeetCode上的算法题很多涉及到一些基础的数据结构,为了更好的理解后续更新的一些复杂题目的动画,推出一个新系列——《图解数据结构》,主要使用动画来描述常见的数据...

Excel中,多列数据统一排名,Rank函数直接搞定

Rank实现多列联合排序排序,那太简单啦,Excel中,升序降序,一个按键就可以。但,那是针对单列情况,若需要联合多列数据进行排序呢?如下图所示,需要对1、3、5列进行统一排序,咋弄嘞?联合排序案例先...

【数据结构与算法】归并排序_数据结构中归并排序

归并排序是建立在归并操作的一种高效的排序方法,该方法采用了分治的思想,比较适用于处理较大规模的数据,但比较耗内存,今天我们聊聊归并排序排序思想一天,小一尘和慧能坐在石头上,眺望着远方师傅,我听山下的柳...

C++基础算法:归并排序_经典排序算法-----归并排序(c语言实现)

归并排序(MergeSort)是建立在归并操作上的一种有效,稳定的排序算法,该算法是采用分治法(DivideandConquer)的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列。...

马士兵说之归并排序_马士兵教育的内推是真的

大家对于排序应该是挺熟悉的吧,马士兵老师特意为排序出了一波视频,当然文章是转自博客园的,马士兵老师的视频观看请点击下方的了解更多概要本章介绍排序算法中的归并排序。内容包括:1.归并排序介绍2.归并...

C++快速排序和归并排序_c++快速排序sort

快速排序每一轮挑选一个基准元素(随机选择,编程时一般选取第一个),并让比它大或小的元素移动到基准元素的两边,把数列拆解成了两个部分。而后对这两部分分别进行快速排序。时间复杂度:O(nlogn),辅助空...

经典的排序算法——归并排序_归并排序算法步骤

归并排序(MergeSort)是一种基于分治策略的高效排序算法。它将原始数组不断地分割成两个子数组,直到每个子数组只剩下一个元素为止(即基本有序),然后再通过合并已排序的子数组来最终得到完全有序的大...

归并排序_归并排序c++实现

归并排序概念:归并排序中涉及到一个概念就是分而治之,总序列化成小序列,将小序列排序好,利用排序好的小序列,再归并排序成原来要排序的序列。所以排序前先要分:functiondivide(arr){...

取消回复欢迎 发表评论: