Skip to content

简单实现await和async理解其原理

约 4731 字大约 16 分钟

NET

2026-01-14

await async

用于实现异步编程,以接近同步的代码的写法编写非阻塞的异步操作,适用于IO密集型任务

async

用于修饰方法,表示该方法内部包含异步操作,并允许使用 await

实际的线程任务之类的都是await实现的,async只是告诉编辑器 函数内部有await,要把他编译为状态机。
因为async,awaitC# 5.0发布的,为兼容之前使用了await作为变量/方法名的代码

await

用于"等待"一个异步操作,但是不会阻塞线程

我们正常理解await的逻辑是阻塞等待这里执行完成后再往后执行,但是恰恰相反!await其实不是等待,反而是暂停、挂起。释放当前线程,让线程取执行其他任务

把async修饰的块的await后的代码包装成一个块(Continuation)

捋一捋逻辑:

public async Task<string> FetchDataAsync()
{
    Console.WriteLine("1. 开始请求");
    string data = await HttpClient.GetStringAsync("https://api.example.com"); // 关键点
    Console.WriteLine("3. 请求完成");
    return data;
}
  1. 执行Console.WriteLine("1. 开始请求"); 输出 1. 开始请求
  2. 遇到await后正常来说会返回一个Task,但是按逻辑理会分两种情况
    • GetStringAsync()秒执行完成 返回的task已经是Completed状态,就不会暂停切换线程,而是直接执行后续的代码
    • GetStringAsync()如果是一个耗时操作,返回的task是未完成的,则会把await后面的代码封装成一个Continuation,然后立即返回,同时释放他占用的线程。 等待GetStringAsync执行完成后task的IsCompleted = true,再由task调度器分配线程给他执行刚才没执行的Continuation(就是await后的代码)

await的使用

注意事项

不建议在 catch/finally,不能在lock中使用await
因为根据await的原理,他会分割函数,但是catch/finally依赖连续性, lock依赖确定性的执行上下文,await会分割逻辑导致这些的不确定性!
虽然C# 6.0以后能编译器能做到在catch/finally中使用

并发异步操作的异常处理

var task1 = httpClient1.GetStringAsync("http://www.github.com");
var task2 = httpClient2.GetStringAsync("http://www.google.com");

var str1 = await task1;
var str2 = await task2;

这样些的代码,如果task1发生了异常会导致task2无法被执行会丢失。

逻辑:

  1. task1,2 同时执行(并发),独立的异步操作
  2. 又有两种情况
    • task1成功 -> 继续执行task2
    • task1发生异常 -> task1抛出异常,那么task2永远不会被执行
      • task2被吞掉,永远无法被执行
      • 但是task依然存在状态机,严重可能会导致进程崩溃

正常的写法:

var task1 = httpClient1.GetStringAsync("http://www.github.com");
var task2 = httpClient2.GetStringAsync("http://www.google.com");

try
{
    string[] results = await Task.WhenAll(task1, task2);
    string str1 = results[0];
    string str2 = results[1];
}
catch (Exception ex)// 这里的 ex 是一个 AggregateException! 会把 task1 和 task2 的所有异常都收集起来
{
    foreach (var inner in ex.InnerExceptions)
    {
    }
}



/******.NET8中可以这么写******/
await foreach (var result in GetResultsAsync())
{
    Console.WriteLine(result);
}

async IAsyncEnumerable<string> GetResultsAsync()
{
    var task1 = httpClient1.GetStringAsync("...");
    var task2 = httpClient2.GetStringAsync("...");

    var completedTask = await Task.WhenAny(task1, task2);
    yield return await completedTask; //这个完成了 直接返回 yield;下次来从这里往后执行

    var remainingTask = completedTask == task1 ? task2 : task1;
    yield return await remainingTask;//在等待剩下那个
}
//解析
//核心思想是 异步流式处理,手动模拟异步迭代器的状态机
//使用IAsyncEnumerable和yield return 来模拟状态机的形式
//做到那个任务先完成先输出哪个,不用等全部完成才输出
  • 所有 Task 都会被 await 到
  • 所有异常都会被收集到 AggregateException.InnerExceptions

自定义await

主要分几步:

  1. 实现自己的线程池
  2. 实现带上下文的线程池
  3. 使用实现的线程池来实现自己的Task
  4. 实现await、async

重点!!! 理解await与async的原理,以及 异步迭代器 async iterators

自定义线程池

线程池就是将线程添加到一个队列中

 public static class MyThreadPool_v1
 {
     static void Main(string[] args)
     {
         AsyncLocal<int> asyncLocal = new();
         for (int i = 0; i < 1000; i++)
         {
             var tempIndex = i;
             MyThreadPool_v1.QueueUserWorkItem(delegate
             {
                 Console.WriteLine($" {tempIndex} .");
                 Thread.Sleep(500);
             });
         }
     }
      
     private static readonly BlockingCollection<Action> s_workItems = new();
     public static void QueueUserWorkItem(Action action) => s_workItems.Add(action);
     static MyThreadPool_v1()
     {
         //线程数
         for (int i = 0; i < Environment.ProcessorCount; i++)
         {
             new Thread(() =>
             {
                 while (true)
                 {
                     Action action = s_workItems.Take();
                     action();
                 }
             })
             { IsBackground = true }.Start();
         }
     }
 }

BlockingCollection

线程安全且自带阻塞的队列

线程安全集合类

前后台线程

net中分前台线程与后台线程 区别:是否让创建的所有线程都退出

  • 前台线程
    • 当主方法结束时,前台线程后等待
  • 后台线程不会等待

说人话就是退出程序时,前台线程会阻止进程退出,后台线程会跟随程序一起销毁退出

程序(进程)会等到所有前台线程都跑完才真正退出;

后台线程跑不跑完无所谓,一旦所有前台线程结束了,后台线程会被强制终止(不等它执行完)。

new Thread(()=>
{ 
	//....
})
{ 
	IsBackgroud  = true 
}.Start();

带线程上下文的线程池

这样就有了一个基于线程实现的线程池,但是这样的线程池不能携带参数并且在线程池中共享 net中有线程执行上下文的对象: ExecutionContext

引入上下文对象:

static void Main(string[] args)
{
    //专门为**异步流程(async/await)**设计的“上下文局部变量”或叫“环境数据”(ambient data)
    AsyncLocal<int> asyncLocal = new();
    for (int i = 0; i < 1000; i++)
    {
	    asyncLocal.Value = i;
        MyThreadPool_v2.QueueUserWorkItem(delegate
        {
            Console.WriteLine($" {asyncLocal.Value} ");
            Thread.Sleep(500);
        });
    }
    Console.ReadLine();
}


public static class MyThreadPool_v2
{
    private static readonly BlockingCollection<(Action, ExecutionContext?)> s_workItems = new();

    //ExecutionContext.Capture() 抓取当前上下文对象
    public static void QueueUserWorkItem(Action action) => s_workItems.Add((action, ExecutionContext.Capture()));

    static MyThreadPool_v2()
    {
        //线程数
        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            new Thread(() =>
            {
                while (true)
                {
                    (Action workItem, ExecutionContext? executionContext) = s_workItems.Take();
                    if (executionContext is null)
                        workItem();
                    else
                    {
                        ExecutionContext.Run(executionContext, _ => workItem(), null);
                    }
                }
            })
            { IsBackground = true }.Start();
        }
    }
}

实现自己的Task

在带线程上下文的线程池的基础上,我们可以扩展实现自己的task类

task 任务,说得通俗点他就是安排线程干什么事情的清单。

它不是线程本身,但类似线程池
比如线程是找个人来帮你做事,做完就完事。 thread
线程池呢就是一个包工头 找了几个人来干活,把活儿给他们 他们谁闲着谁就干。 1个thread/线程池

task呢 就是有一堆活 需要找人干 干完了或者失败告诉你,你可以随时让他们不干了,这个人没干了的活给其他闲着的人干。他也是在线程池中拿线程来干活

对比线程池

  • 有自己的task对象,能随时知道执行完成没,结果是什么,有没有异常
  • 支持ContinueWith 一个完成了继续完成后面的
  • 有CancellationToken,随时取消停止执行
  • 有统一的异常包装 AggregateException

Task可以说是一个包装的很好的线程池

实现自己的Task:

  public class MyTask
  {
      //Task只是核心 只是内存的一个数据结构 目的是作为线程池的分层

      private bool _completed;
      private Exception? _excption;
      private Action? _continuation;
      private ExecutionContext? _executionContext;

      public bool IsCompleted
      {
          get
          {
              lock (this)
              {
                  return _completed;
              }
          }

      }

      public void SetResult() => Complete(null);
      public void SetException(Exception exception) => Complete(exception);

      private void Complete(Exception? exception)
      {
          lock (this)
          {
              if (_completed) throw new InvalidOperationException("Task already completed.");

              _completed = true;
              _excption = exception;

              if (_continuation is not null)
              {
                  MyThreadPool_v2.QueueUserWorkItem(() =>
                  {
                      if (_executionContext is null)
                      {
                          _continuation();
                      }
                      else
                      {
                          ExecutionContext.Run(_executionContext, _ => _continuation(), null);
                      }
                  });
              }
          }
      }

      //等待任务完成
      public void Wait()
      {
          ManualResetEventSlim? mres = null;

          lock (this)
          {
              if (!_completed)
              {
                  mres = new ManualResetEventSlim();
                  ContinueWith(mres.Set);
              }
          }
          mres?.Wait();
          if (_excption is not null)
          {
              ExceptionDispatchInfo.Throw(_excption); //抛出所有的异常 叠加 不覆盖
              //throw _excption;
          }
      }

      public void ContinueWith(Action action)
      {
          lock (this)
          {
              if (_completed) MyThreadPool_v2.QueueUserWorkItem(action);
              else
              {
                  _continuation = action;
                  _executionContext = ExecutionContext.Capture();
              }
          }
      }

      public static MyTask Run(Action action)
      {
          MyTask task = new MyTask();
          MyThreadPool_v2.QueueUserWorkItem(() =>
          {
              try
              {
                  action();
              }
              catch (Exception ex)
              {
                  task.SetException(ex);
                  return;
              }
              task.SetResult();
          });
          return task;
      }
      
	   public static MyTask WhenAll(List<MyTask> tasks)
	   {
		    MyTask myTask = new();
		
		    if (tasks.Count == 0)
		        myTask.SetResult();
		    else
		    {
		        int remaining = tasks.Count;
		        Action continuation = () =>
		        {
		            if (Interlocked.Decrement(ref remaining) == 0)
		            {
		                myTask.SetResult();
		            }
		        };
		
		        foreach (var task in tasks)
		        {
		            task.ContinueWith(continuation);
		        }
		
		    }
		    return myTask;
		}
  }
  
  
  
static void Main(string[] args)
{
     AsyncLocal<int> asyncLocal = new();
     List<MyTask> tasks = new();
     for (int i = 0; i < 100; i++)
     {
         asyncLocal.Value = i;
         //tasks中的delegate在添加到线程池中就以及开始执行了,并且根据MyTask的实现 使用的线程MyThreadPool_v2来做的
         tasks.Add(MyTask.Run(delegate
         {
             Console.WriteLine($" {asyncLocal.Value} ");
             Thread.Sleep(900);
         }));
     }
     MyTask.WhenAll(tasks).Wait(); //等待所有任务完成 
     //以上就完成了一个task的任务执行,等待
     
     //以及基础的任务接任务ContinueWith
    Console.Write("Hello, ");
    MyTask.Delay(2000).ContinueWith(delegate
    {
        Console.Write("World!");
        MyTask.Delay(1000).ContinueWith(delegate
       {
           Console.Write(" And Coder !!!");
       });
    });
    
    Console.ReadLine();//异步的工作需等待线程执行完成 防止主程序退出.
    
      /******************************************************/
}

考虑ContinueWith调用等待及退出线程的问题

为什么要考虑Console.ReadLine();//异步的工作需等待线程执行完成 防止主程序退出? 明明我们已经是后台线程了??

正常来说使用MyThreadPool_v2实现的任务执行会受到IsBackground的影响,比如设置为false代表了前台线程会等待任务执行完成才退出。
但是!!!我们实现的 Delay 没有走线程池 他是使用的timertimer也是后台池线程 还没有来得及触发回调去调用线程池执行任务,主线程已经退出了
可以将ContinueWith返回mytask,这样可以使用wait来等待 就不会退出了

尝试wait ContinueWith:

 public MyTask ContinueWith(Action action)
 {
     MyTask task = new();
     Action callback = () =>
     {
         try
         {
             action();
         }
         catch (Exception ex)
         {
             task.SetException(ex);
             return;
         }
         task.SetResult();
     };

     lock (this)
     {
         if (_completed) MyThreadPool_v2.QueueUserWorkItem(callback);
         else
         {
             _continuation = callback;
             _executionContext = ExecutionContext.Capture();
         }
     }
     return task;
 }
 
 
main()
{
 //完成ContinueWith的扩展
  Console.Write("Hello, ");
    MyTask.Delay(2000).ContinueWith(delegate
    {
        Console.Write("World!");
        MyTask.Delay(1000).ContinueWith(delegate
       {
           Console.Write(" And Coder !!!");
       });
    }).Wait();
}

从结果来看并没有我们预想的输出,只输出了 Hello, World!就退出了,为什么?

拆分代码来看.Wait();只对 MyTask.Delay(2000).ContinueWith(delegate{ }).Wait()的内部负责
比如 MyTask.Delay(1000).ContinueWith(delegate { Console.Write(" And Coder !!!"); });属于内部的子任务 他只等待delay(2000)的任务

所以我们需要更改一下他的结构,让任务链条一样 一个一个执行完成

考虑链式调用内部的问题

由于目前是 Action ,无返回的委托 对链式调用不友好 所以可以扩展重载有参委托,把任务返回来调用

 public MyTask ContinueWith(Func<MyTask> action)
 {
     MyTask task = new();

     Action callback = () =>
     {
         try
         {
             MyTask next = action();
             next.ContinueWith(() =>
             {
                 if (next._excption is not null)
                 {
                     task.SetException(next._excption);
                 }
                 else
                 {
                     task.SetResult();
                 }
             });
         }
         catch (Exception ex)
         {
             task.SetException(ex);
             return;
         }
     };

     lock (this)
     {
         if (_completed) MyThreadPool_v2.QueueUserWorkItem(callback);
         else
         {
             _continuation = callback;
             _executionContext = ExecutionContext.Capture();
         }
     }
     return task;
 }
 
main()
{
    Console.Write("Hello, ");
    MyTask.Delay(2000).ContinueWith(delegate
    {
        Console.Write("World!");
        return MyTask.Delay(1000).ContinueWith(delegate
       {
           Console.Write(" And Coder !!!");
       });
    });
}

达到了我们预期的输出 Hello, World! And Coder !!! 这样就可以返回任务 等待每个任务执行完成后再退出!

简洁的串行链式

但是这样写跟我们日常的写法好像不一样,我们日常以及预期的写法是这样:

Console.Write("Hello, ");
MyTask.Delay(2000).ContinueWith(delegate
{
    Console.Write("World!");
}).ContinueWith(delegate
{
    Console.Write(" And ");
}).ContinueWith(delegate
{
    Console.Write(" Code !");
}).Wait();

但是这样写无法满足在后面的ContinueWith也Delay(2000),输出Hello, 后除了第一个World! 后面的都是一起执行,并不会等待!类似这样:

for (int i = 0; ; i++)           
{
    MyTask.Delay(1000); //想要的其实是  await MyTask.Delay(1000); 这种效果
    Console.WriteLine(i);
}
//会无限的输出累加,并不会等待!!!

我们需要一个什么? await! 想实现这个需要知道一个东西:

异步迭代器 async iterators

C# 2.0 就有了

   IEnumerable<int> Count(int count)
   {
       for (int i = 0; i < count; i++)
       {
           yield return i;
       }
   }

   foreach (var i in Count(10))
   {
       Console.WriteLine(i);
   }
   //输出0,1,2,3,....9

这是一个迭代器的写法,主要依赖于 yield return。惰性求值

的语法糖糖,生成一个可枚举的序列,而不是集合
编译器会将其转换为一个状态机,实现懒加载,每次调用的时候才会触发拿下一个值
每次执行到 yield return i; 就会暂停方法,并且返回i给调用者,调用者想要下一个元素的时候,会冲上次暂停的地方继续执行 不会从头开始执行 就有点等待的感觉了

原理,编译器做了什么? :

private class CountEnumerable : IEnumerable<int>, IEnumerator<int>
{
	private int count;
    private int state = -1;     // 记录当前状态 怎么执行
    private int i;              // 保存局部变量

    public bool MoveNext()
    {
        switch (state)
        {
            case -1:   // 第一次调用
                i = 0;
                state = 0;
                goto case 0;
            case 0:
                if (i < count)
                {
                    Current = i;     // 返回当前值
                    i++;
                    state = 0;
                    return true;     // 还有下一个
                }
                state = 1;
                return false;        // 结束了
            case 1:
                return false;
        }
    }

    public int Current { get; private set; }
    // ... 其他接口实现
}

foreach隐式执行了:

var e = Count(10).GetEnumerator();
while (e.MoveNext())
{
    int i = e.Current;
    Console.WriteLine(i);
}

总结:yield return会将一个普通函数变成可以暂停/恢复的状态机;每次yield return就暂停,下次MoveNext从暂停的地方再恢复! 学会了 yield return,就已经理解了 async/await 80% 的原理了!

注!!!!!

的编译器中编译器中,用于支持支持实现迭代器(iterators)与异步方法(async methods)的逻辑90%是相同的! 可能在细节上有些差异,但是本质上都是在实现一个状态机(保存执行的状态),这个状态机允许函数在某个点暂停、退出、重新进入并恢复到之前的状态继续执行 真正不同的地方在于,是谁在调用 MoveNext() ?

  • 比如foreach逻辑中隐式调用的enumerator.MoveNext()
  • 异步函数中 await 完成Task后,自动调用的continuation,或者我们实现的mytask里.ContinueWith(MoveNext)

实现await/async

就可以使用 yield return 的特性和原理来实现这个功能。其实是利用的编译器的特性来实现的

  1. 给 MyTask 扩展一个迭代器
public class MyTask
{

	//其他已实现的...


	//mytask异步迭代器 虽然是异步 但是是串行的一个接一个
	public static MyTask Iterate(IEnumerable<MyTask> tasks)
	{
	    MyTask t = new();
	
	    IEnumerator<MyTask> e = tasks.GetEnumerator();
	
	    void MoveNext()
	    {
	        try
	        {
	            if (e.MoveNext())
	            {
	                MyTask currentTask = (MyTask)e.Current;
	                currentTask.ContinueWith(MoveNext);
	                return;
	            }
	        }
	        catch (Exception ex)
	        {
	            t.SetException(ex);
	            return;
	        }
	        t.SetResult();
	    }
	
	    MoveNext();
	
	    return t;
	}
}

 async static Task Main(string[] args)
 {
     IEnumerable<MyTask> PrintAsync()
     {
         for (int i = 0; ; i++)
         {
             yield return MyTask.Delay(1000);
             Console.WriteLine(i);
         }
     }

     MyTask.Iterate(PrintAsync()).Wait();

输出: 每秒一个累加! 达成我们的目标

实现语法糖awaiter

public class MyTask
{
	//其他已实现的...
	
	
	 public struct Awaiter(MyTask t) : INotifyCompletion
	 {
	     public Awaiter GetAwaiter() => this;
	     public bool IsCompleted => t.IsCompleted;
	     public void GetResult() => t.Wait();
	     public void OnCompleted(Action continuation) => t.ContinueWith(continuation);
	 }
	
	 public Awaiter GetAwaiter() => new Awaiter(this);
 }

await 就是状态机的暂停点

完整代码:

https://www.youtube.com/watch?v=R-z2Hv-7nxk&t=415s