余晖落尽暮晚霞,黄昏迟暮远山寻
本站
当前位置:网站首页 > 编程知识 > 正文

asp.net core supersocket介绍以及源码分析(asp.net core oauth2.0)

xiyangw 2023-02-28 12:00 1131 浏览 0 评论

跟物联网设备常用的通信协议有TCP,MQTT.今天我们介绍的是TCP连接,TCP连接程序的组件有Supersocket,dotnetty.Supersocket相信搞过.net的朋友应该都知道,dotnetty是有微软Azure从java平台下移植过来的一个高性能、异步事件驱动的 NIO 框架,Kafka和RocketMQ等消息中间件、ElasticSearch开源搜索引擎、大数据处理Hadoop的RPC框架Avro、分布式通信框架Dubbo,都使用了Netty,Netty的资料很多,有兴趣的可以搜一下,dotnetty和Netty语法基本一致,所以资料可以互相参考,今天我们介绍的是.net下supersocket的使用以及源码分析

SuperSocket 可以和 ASP.NET Core 网站一起同时运行。你需要做的是将 SuperSocket 注册到 ASP.NET Core 网站的host builder中去, 同时将服务器的选项放到配置文件中或者通过代码定义。

  //don't forget the usings
    using SuperSocket;
    using SuperSocket.ProtoBase;

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder.UseStartup<Startup>();
            })
            .AsSuperSocketHostBuilder<TextPackageInfo, LinePipelineFilter>()
            .UsePackageHandler(async (s, p) =>
            {
                // echo message back to client
                await s.SendAsync(Encoding.UTF8.GetBytes(p.Text + "\r\n"));
            });

同时将服务器的配置选项放到配置文件 "appsettings.json" 中去:

    {
        "Logging": {
            "LogLevel": {
            "Default": "Information",
            "Microsoft": "Warning",
            "Microsoft.Hosting.Lifetime": "Information"
            }
        },
        "serverOptions": {
            "name": "TestServer",
            "listeners": [
                {
                    "ip": "Any",
                    "port": 4040
                }
            ]
        },
        "AllowedHosts": "*"
    }

上面代码就可以实现supersocket和asp.net core (.net core3,.net5,.net6)的集成,可以支持最新的.NET6,当然DotNetty也支持.Net 6

今天我们分析源码的版本是supersocket V2.0的版本,跟之前.net freamwork下有很大的区别

SuperSocket 请求处理模型示意图


SuperSocket 请求处理模型示意图


源码主线流程分析


主流程图

1分析的入口就从SuperSocketService这个类开始,他集成自IHostedService, IServer,因此这个类的StartAsync就是执行的入口

 async Task<bool> IServer.StartAsync()
        {
            await StartAsync(CancellationToken.None);
            return true;
        }



2.通过channelCreatorFactory创建监听器,监听代码如下


  public bool Start()
        {
            var options = Options;

            try
            {
                if (options.Security != SslProtocols.None && options.CertificateOptions != null)
                {
                    options.CertificateOptions.EnsureCertificate();
                }

                var listenEndpoint = options.GetListenEndPoint();
                var listenSocket = _listenSocket = new Socket(listenEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                
                listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
                listenSocket.LingerState = new LingerOption(false, 0);

                if (options.NoDelay)
                    listenSocket.NoDelay = true;
                
                listenSocket.Bind(listenEndpoint);
                listenSocket.Listen(options.BackLog);

                IsRunning = true;

                _cancellationTokenSource = new CancellationTokenSource();

                KeepAccept(listenSocket).DoNotAwait();
                return true;
            }
            catch (Exception e)
            {
                _logger.LogError(e, #34;The listener[{this.ToString()}] failed to start.");
                return false;
            }
        }

3.开启异步接收accept线程


        private async Task KeepAccept(Socket listenSocket)
        {
            while (!_cancellationTokenSource.IsCancellationRequested)
            {
                try
                {
                    var client = await listenSocket.AcceptAsync().ConfigureAwait(false);
                    OnNewClientAccept(client);
                }
                catch (Exception e)
                {
                    if (e is ObjectDisposedException || e is NullReferenceException)
                        break;
                    
                    if (e is SocketException se)
                    {
                        var errorCode = se.ErrorCode;

                        //The listen socket was closed
                        if (errorCode == 125 || errorCode == 89 || errorCode == 995 || errorCode == 10004 || errorCode == 10038)
                        {
                            break;
                        }
                    }
                    
                    _logger.LogError(e, #34;Listener[{this.ToString()}] failed to do AcceptAsync");
                    continue;
                }
            }

            _stopTaskCompletionSource.TrySetResult(true);
        }

4 当有连接进来后,执行执行注册的事件

listener.NewClientAccepted += OnNewClientAccept;


5.accepted后就创建新的Channel,session


     private void AcceptNewChannel(IChannel channel)
        {
            var session = _sessionFactory.Create() as AppSession;
            HandleSession(session, channel).DoNotAwait();
        }

6 channel.Start(),

  public override void Start()
        {
            _readsTask = ProcessReads();
            _sendsTask = ProcessSends();
            WaitHandleClosing();
        }


7.将从内核里读的socket数据异步写到 Pipe中

           protected virtual async Task ProcessReads()
        {
            var pipe = In;

            Task writing = FillPipeAsync(pipe.Writer);
            Task reading = ReadPipeAsync(pipe.Reader);

            await Task.WhenAll(reading, writing);
        }

8当Pipe有数据写入后,通知Pipe读线程去解析数据,这里通知用的方法是ManualResetValueTaskSourceCore,写线程写入数据后执行 _taskSourceCore.SetResult(target);

就会触发读线程去读,读的时候会根据你设置的协议模版去解析,这个过程会去处理粘包和拆包的过程,

因为Pipe是可以定向的从流中取部分数据的

内置的协议模板如下;

TerminatorReceiveFilter (SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter, SuperSocket.SocketBase)

CountSpliterReceiveFilter (SuperSocket.Facility.Protocol.CountSpliterReceiveFilter, SuperSocket.Facility)

FixedSizeReceiveFilter (SuperSocket.Facility.Protocol.FixedSizeReceiveFilter, SuperSocket.Facility)

BeginEndMarkReceiveFilter (SuperSocket.Facility.Protocol.BeginEndMarkReceiveFilter, SuperSocket.Facility)

FixedHeaderReceiveFilter (SuperSocket.Facility.Protocol.FixedHeaderReceiveFilter, SuperSocket.Facility)


9 读取到的数据解析成packageInfo 后继续往下执行

      await foreach (var p in packageChannel.RunAsync())
                {
                    if(_packageHandlingContextAccessor!=null)
                    {
                        _packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
                    }
                    await packageHandlingScheduler.HandlePackage(session, p);
                }

10再执行到我们定义的command即可

        ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package)
        {
            return HandlePackage(session, PackageMapper.Map(package));
        }

11 执行我们预制的command代码


    [Command("add")]
    public class ADD : IAsyncCommand<StringPackageInfo>
    {
        public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package)
        {
            var result = package.Parameters
                .Select(p => int.Parse(p))
                .Sum();

            await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "\r\n"));
        }
    }

相关推荐

辞旧迎新,新手使用Containerd时的几点须知

相信大家在2020年岁末都被Kubernetes即将抛弃Docker的消息刷屏了。事实上作为接替Docker运行时的Containerd在早在Kubernetes1.7时就能直接与Kubelet集成使...

分布式日志系统ELK+skywalking分布式链路完整搭建流程

开头在分布式系统中,日志跟踪是一件很令程序员头疼的问题,在遇到生产问题时,如果是多节点需要打开多节点服务器去跟踪问题,如果下游也是多节点且调用多个服务,那就更麻烦,再者,如果没有分布式链路,在生产日志...

Linux用户和用户组管理

1、用户账户概述-AAA介绍AAA指的是Authentication、Authorization、Accounting,即认证、授权和审计。?认证:验证用户是否可以获得权限,是3A的第一步,即验证身份...

linux查看最后N条日志

其实很简单,只需要用到tail这个命令tail-100catalina.out输入以上命令,就能列出catalina.out的最后100行。...

解决linux系统日志时间错误的问题

今天发现一台虚拟机下的系统日志:/var/log/messages,文件时间戳不对,跟正常时间差了12个小时。按网上说的执行了servicersyslogrestart重启syslog服务,还是不...

全程软件测试(六十二):软件测试工作如何运用Linux—读书笔记

从事过软件测试的小伙们就会明白会使用Linux是多么重要的一件事,工作时需要用到,面试时会被问到,简历中需要写到。对于软件测试人员来说,不需要你多么熟练使用Linux所有命令,也不需要你对Linux...

Linux运维之为Nginx添加错误日志(error_log)配置

Nginx错误日志信息介绍配置记录Nginx的错误信息是调试Nginx服务的重要手段,属于核心功能模块(nginx_core_module)的参数,该参数名字为error_log,可以放在不同的虚机主...

Linux使用swatchdog实时监控日志文件的变化

1.前言本教程主要讲解在Linux系统中如何使用swatchdog实时监控日志文件的变化。swatchdog(SimpleWATCHDOG)是一个简单的Perl脚本,用于监视类Unix系统(比如...

syslog服务详解

背景:需求来自于一个客户想将服务器的日志转发到自己的日志服务器上,所以希望我们能提供这个转发的功能,同时还要满足syslog协议。1什么是syslog服务1.1syslog标准协议如下图这里的fa...

linux日志文件的管理、备份及日志服务器的搭建

日志文件存放目录:/var/log[root@xinglog]#cd/var/log[root@xinglog]#lsmessages:系统日志secure:登录日志———————————...

运维之日志管理简介

日志简介在运维过程中,日志是必不可少的东西,通过日志可以快速发现问题所在。日志分类日志分类,对不同的日志进行不同维度的分析。操作系统日志操作系统是基础,应用都是在其之上;操作系统日志的分析,可以反馈出...

Apache Log4j 爆核弹级漏洞,Spring Boot 默认日志框架就能完美躲过

这两天沸沸扬扬的Log4j2漏洞门事件炒得热火朝天:突发!ApacheLog4j2报核弹级漏洞。。赶紧修复!!|Java技术栈|Java|SpringBoot|Spring...

Linux服务器存在大量log日志,如何快速定位错误?

来源:blog.csdn.net/nan1996jiang/articlep/details/109550303针对大量log日志快速定位错误地方tail/head简单命令使用:附加针对大量log日志...

Linux中查看日志文件的正确姿势,求你别tail走天下了!

作为一个后端开发工程师,在Linux中查看查看文件内容是基本操作了。尤其是通常要分析日志文件排查问题,那么我们应该如何正确打开日志文件呢?对于我这种小菜鸡来说,第一反应就是cat,tail,vi(或...

分享几款常用的付费日志系统,献给迷茫的你!

概述在前一篇文章中,我们分享了几款免费的日志服务器。他们各有各的特点,但是大家有不同的需求,有时免费的服务器不能满足大家的需要,下面推荐几款付费的日志服务器。1.Nagios日志服务器Nagio...

取消回复欢迎 发表评论: