外观
简单实现await和async理解其原理
await async
用于实现异步编程,以接近同步的代码的写法编写非阻塞的异步操作,适用于IO密集型任务
async
用于修饰方法,表示该方法内部包含异步操作,并允许使用 await。
注
实际的线程任务之类的都是await实现的,async只是告诉编辑器 函数内部有await,要把他编译为状态机。
因为async,await是C# 5.0发布的,为兼容之前使用了await作为变量/方法名的代码
await
用于"等待"一个异步操作,但是不会阻塞线程
注
我们正常理解await的逻辑是阻塞等待这里执行完成后再往后执行,但是恰恰相反!await其实不是等待,反而是暂停、挂起。释放当前线程,让线程取执行其他任务。
把async修饰的块的await后的代码包装成一个块(Continuation)
捋一捋逻辑:
public async Task<string> FetchDataAsync()
{
Console.WriteLine("1. 开始请求");
string data = await HttpClient.GetStringAsync("https://api.example.com"); // 关键点
Console.WriteLine("3. 请求完成");
return data;
}- 执行
Console.WriteLine("1. 开始请求");输出1. 开始请求 - 遇到
await后正常来说会返回一个Task,但是按逻辑理会分两种情况GetStringAsync()秒执行完成 返回的task已经是Completed状态,就不会暂停切换线程,而是直接执行后续的代码GetStringAsync()如果是一个耗时操作,返回的task是未完成的,则会把await后面的代码封装成一个Continuation,然后立即返回,同时释放他占用的线程。 等待GetStringAsync执行完成后task的IsCompleted = true,再由task调度器分配线程给他执行刚才没执行的Continuation(就是await后的代码)
await的使用
注意事项
不建议在 catch/finally,不能在lock中使用await
因为根据await的原理,他会分割函数,但是catch/finally依赖连续性, lock依赖确定性的执行上下文,await会分割逻辑导致这些的不确定性!
虽然C# 6.0以后能编译器能做到在catch/finally中使用
并发异步操作的异常处理
var task1 = httpClient1.GetStringAsync("http://www.github.com");
var task2 = httpClient2.GetStringAsync("http://www.google.com");
var str1 = await task1;
var str2 = await task2;这样些的代码,如果task1发生了异常会导致task2无法被执行会丢失。
逻辑:
- task1,2 同时执行(并发),独立的异步操作
- 又有两种情况
- task1成功 -> 继续执行task2
- task1发生异常 -> task1抛出异常,那么task2永远不会被执行
- task2被吞掉,永远无法被执行
- 但是task依然存在状态机,严重可能会导致进程崩溃
正常的写法:
var task1 = httpClient1.GetStringAsync("http://www.github.com");
var task2 = httpClient2.GetStringAsync("http://www.google.com");
try
{
string[] results = await Task.WhenAll(task1, task2);
string str1 = results[0];
string str2 = results[1];
}
catch (Exception ex)// 这里的 ex 是一个 AggregateException! 会把 task1 和 task2 的所有异常都收集起来
{
foreach (var inner in ex.InnerExceptions)
{
}
}
/******.NET8中可以这么写******/
await foreach (var result in GetResultsAsync())
{
Console.WriteLine(result);
}
async IAsyncEnumerable<string> GetResultsAsync()
{
var task1 = httpClient1.GetStringAsync("...");
var task2 = httpClient2.GetStringAsync("...");
var completedTask = await Task.WhenAny(task1, task2);
yield return await completedTask; //这个完成了 直接返回 yield;下次来从这里往后执行
var remainingTask = completedTask == task1 ? task2 : task1;
yield return await remainingTask;//在等待剩下那个
}
//解析
//核心思想是 异步流式处理,手动模拟异步迭代器的状态机
//使用IAsyncEnumerable和yield return 来模拟状态机的形式
//做到那个任务先完成先输出哪个,不用等全部完成才输出- 所有 Task 都会被 await 到
- 所有异常都会被收集到 AggregateException.InnerExceptions
自定义await
主要分几步:
- 实现自己的线程池
- 实现带上下文的线程池
- 使用实现的线程池来实现自己的Task
- 实现await、async
重点!!! 理解await与async的原理,以及 异步迭代器 async iterators
自定义线程池
线程池就是将线程添加到一个队列中
public static class MyThreadPool_v1
{
static void Main(string[] args)
{
AsyncLocal<int> asyncLocal = new();
for (int i = 0; i < 1000; i++)
{
var tempIndex = i;
MyThreadPool_v1.QueueUserWorkItem(delegate
{
Console.WriteLine($" {tempIndex} .");
Thread.Sleep(500);
});
}
}
private static readonly BlockingCollection<Action> s_workItems = new();
public static void QueueUserWorkItem(Action action) => s_workItems.Add(action);
static MyThreadPool_v1()
{
//线程数
for (int i = 0; i < Environment.ProcessorCount; i++)
{
new Thread(() =>
{
while (true)
{
Action action = s_workItems.Take();
action();
}
})
{ IsBackground = true }.Start();
}
}
}BlockingCollection
线程安全且自带阻塞的队列
前后台线程
net中分前台线程与后台线程 区别:是否让创建的所有线程都退出
- 前台线程
- 当主方法结束时,前台线程后等待
- 后台线程不会等待
说人话就是退出程序时,前台线程会阻止进程退出,后台线程会跟随程序一起销毁退出
程序(进程)会等到所有前台线程都跑完才真正退出;
后台线程跑不跑完无所谓,一旦所有前台线程结束了,后台线程会被强制终止(不等它执行完)。
new Thread(()=>
{
//....
})
{
IsBackgroud = true
}.Start();带线程上下文的线程池
这样就有了一个基于线程实现的线程池,但是这样的线程池不能携带参数并且在线程池中共享 net中有线程执行上下文的对象: ExecutionContext
引入上下文对象:
static void Main(string[] args)
{
//专门为**异步流程(async/await)**设计的“上下文局部变量”或叫“环境数据”(ambient data)
AsyncLocal<int> asyncLocal = new();
for (int i = 0; i < 1000; i++)
{
asyncLocal.Value = i;
MyThreadPool_v2.QueueUserWorkItem(delegate
{
Console.WriteLine($" {asyncLocal.Value} ");
Thread.Sleep(500);
});
}
Console.ReadLine();
}
public static class MyThreadPool_v2
{
private static readonly BlockingCollection<(Action, ExecutionContext?)> s_workItems = new();
//ExecutionContext.Capture() 抓取当前上下文对象
public static void QueueUserWorkItem(Action action) => s_workItems.Add((action, ExecutionContext.Capture()));
static MyThreadPool_v2()
{
//线程数
for (int i = 0; i < Environment.ProcessorCount; i++)
{
new Thread(() =>
{
while (true)
{
(Action workItem, ExecutionContext? executionContext) = s_workItems.Take();
if (executionContext is null)
workItem();
else
{
ExecutionContext.Run(executionContext, _ => workItem(), null);
}
}
})
{ IsBackground = true }.Start();
}
}
}实现自己的Task
在带线程上下文的线程池的基础上,我们可以扩展实现自己的task类
task 任务,说得通俗点他就是安排线程干什么事情的清单。
它不是线程本身,但类似线程池
比如线程是找个人来帮你做事,做完就完事。 thread
线程池呢就是一个包工头 找了几个人来干活,把活儿给他们 他们谁闲着谁就干。 1个thread/线程池task呢 就是有一堆活 需要找人干 干完了或者失败告诉你,你可以随时让他们不干了,这个人没干了的活给其他闲着的人干。他也是在线程池中拿线程来干活
对比线程池
- 有自己的task对象,能随时知道执行完成没,结果是什么,有没有异常
- 支持ContinueWith 一个完成了继续完成后面的
- 有CancellationToken,随时取消停止执行
- 有统一的异常包装
AggregateException
Task可以说是一个包装的很好的线程池
实现自己的Task:
public class MyTask
{
//Task只是核心 只是内存的一个数据结构 目的是作为线程池的分层
private bool _completed;
private Exception? _excption;
private Action? _continuation;
private ExecutionContext? _executionContext;
public bool IsCompleted
{
get
{
lock (this)
{
return _completed;
}
}
}
public void SetResult() => Complete(null);
public void SetException(Exception exception) => Complete(exception);
private void Complete(Exception? exception)
{
lock (this)
{
if (_completed) throw new InvalidOperationException("Task already completed.");
_completed = true;
_excption = exception;
if (_continuation is not null)
{
MyThreadPool_v2.QueueUserWorkItem(() =>
{
if (_executionContext is null)
{
_continuation();
}
else
{
ExecutionContext.Run(_executionContext, _ => _continuation(), null);
}
});
}
}
}
//等待任务完成
public void Wait()
{
ManualResetEventSlim? mres = null;
lock (this)
{
if (!_completed)
{
mres = new ManualResetEventSlim();
ContinueWith(mres.Set);
}
}
mres?.Wait();
if (_excption is not null)
{
ExceptionDispatchInfo.Throw(_excption); //抛出所有的异常 叠加 不覆盖
//throw _excption;
}
}
public void ContinueWith(Action action)
{
lock (this)
{
if (_completed) MyThreadPool_v2.QueueUserWorkItem(action);
else
{
_continuation = action;
_executionContext = ExecutionContext.Capture();
}
}
}
public static MyTask Run(Action action)
{
MyTask task = new MyTask();
MyThreadPool_v2.QueueUserWorkItem(() =>
{
try
{
action();
}
catch (Exception ex)
{
task.SetException(ex);
return;
}
task.SetResult();
});
return task;
}
public static MyTask WhenAll(List<MyTask> tasks)
{
MyTask myTask = new();
if (tasks.Count == 0)
myTask.SetResult();
else
{
int remaining = tasks.Count;
Action continuation = () =>
{
if (Interlocked.Decrement(ref remaining) == 0)
{
myTask.SetResult();
}
};
foreach (var task in tasks)
{
task.ContinueWith(continuation);
}
}
return myTask;
}
}
static void Main(string[] args)
{
AsyncLocal<int> asyncLocal = new();
List<MyTask> tasks = new();
for (int i = 0; i < 100; i++)
{
asyncLocal.Value = i;
//tasks中的delegate在添加到线程池中就以及开始执行了,并且根据MyTask的实现 使用的线程MyThreadPool_v2来做的
tasks.Add(MyTask.Run(delegate
{
Console.WriteLine($" {asyncLocal.Value} ");
Thread.Sleep(900);
}));
}
MyTask.WhenAll(tasks).Wait(); //等待所有任务完成
//以上就完成了一个task的任务执行,等待
//以及基础的任务接任务ContinueWith
Console.Write("Hello, ");
MyTask.Delay(2000).ContinueWith(delegate
{
Console.Write("World!");
MyTask.Delay(1000).ContinueWith(delegate
{
Console.Write(" And Coder !!!");
});
});
Console.ReadLine();//异步的工作需等待线程执行完成 防止主程序退出.
/******************************************************/
}考虑ContinueWith调用等待及退出线程的问题
为什么要考虑Console.ReadLine();//异步的工作需等待线程执行完成 防止主程序退出? 明明我们已经是后台线程了??
正常来说使用MyThreadPool_v2实现的任务执行会受到
IsBackground的影响,比如设置为false代表了前台线程会等待任务执行完成才退出。
但是!!!我们实现的Delay没有走线程池 他是使用的timer,timer也是后台池线程 还没有来得及触发回调去调用线程池执行任务,主线程已经退出了
可以将ContinueWith返回mytask,这样可以使用wait来等待 就不会退出了
尝试wait ContinueWith:
public MyTask ContinueWith(Action action)
{
MyTask task = new();
Action callback = () =>
{
try
{
action();
}
catch (Exception ex)
{
task.SetException(ex);
return;
}
task.SetResult();
};
lock (this)
{
if (_completed) MyThreadPool_v2.QueueUserWorkItem(callback);
else
{
_continuation = callback;
_executionContext = ExecutionContext.Capture();
}
}
return task;
}
main()
{
//完成ContinueWith的扩展
Console.Write("Hello, ");
MyTask.Delay(2000).ContinueWith(delegate
{
Console.Write("World!");
MyTask.Delay(1000).ContinueWith(delegate
{
Console.Write(" And Coder !!!");
});
}).Wait();
}从结果来看并没有我们预想的输出,只输出了 Hello, World!就退出了,为什么?
拆分代码来看
.Wait();只对MyTask.Delay(2000).ContinueWith(delegate{ }).Wait()的内部负责
比如MyTask.Delay(1000).ContinueWith(delegate { Console.Write(" And Coder !!!"); });属于内部的子任务 他只等待delay(2000)的任务
所以我们需要更改一下他的结构,让任务链条一样 一个一个执行完成
考虑链式调用内部的问题
由于目前是 Action ,无返回的委托 对链式调用不友好 所以可以扩展重载有参委托,把任务返回来调用
public MyTask ContinueWith(Func<MyTask> action)
{
MyTask task = new();
Action callback = () =>
{
try
{
MyTask next = action();
next.ContinueWith(() =>
{
if (next._excption is not null)
{
task.SetException(next._excption);
}
else
{
task.SetResult();
}
});
}
catch (Exception ex)
{
task.SetException(ex);
return;
}
};
lock (this)
{
if (_completed) MyThreadPool_v2.QueueUserWorkItem(callback);
else
{
_continuation = callback;
_executionContext = ExecutionContext.Capture();
}
}
return task;
}
main()
{
Console.Write("Hello, ");
MyTask.Delay(2000).ContinueWith(delegate
{
Console.Write("World!");
return MyTask.Delay(1000).ContinueWith(delegate
{
Console.Write(" And Coder !!!");
});
});
}达到了我们预期的输出 Hello, World! And Coder !!! 这样就可以返回任务 等待每个任务执行完成后再退出!
简洁的串行链式
但是这样写跟我们日常的写法好像不一样,我们日常以及预期的写法是这样:
Console.Write("Hello, ");
MyTask.Delay(2000).ContinueWith(delegate
{
Console.Write("World!");
}).ContinueWith(delegate
{
Console.Write(" And ");
}).ContinueWith(delegate
{
Console.Write(" Code !");
}).Wait();但是这样写无法满足在后面的ContinueWith也Delay(2000),输出Hello, 后除了第一个World! 后面的都是一起执行,并不会等待!类似这样:
for (int i = 0; ; i++)
{
MyTask.Delay(1000); //想要的其实是 await MyTask.Delay(1000); 这种效果
Console.WriteLine(i);
}
//会无限的输出累加,并不会等待!!!我们需要一个什么? await! 想实现这个需要知道一个东西:
异步迭代器 async iterators
C# 2.0 就有了
IEnumerable<int> Count(int count)
{
for (int i = 0; i < count; i++)
{
yield return i;
}
}
foreach (var i in Count(10))
{
Console.WriteLine(i);
}
//输出0,1,2,3,....9这是一个迭代器的写法,主要依赖于 yield return。惰性求值
的语法糖糖,生成一个可枚举的序列,而不是集合
编译器会将其转换为一个状态机,实现懒加载,每次调用的时候才会触发拿下一个值
每次执行到yield return i;就会暂停方法,并且返回i给调用者,调用者想要下一个元素的时候,会冲上次暂停的地方继续执行 不会从头开始执行 就有点等待的感觉了
原理,编译器做了什么? :
private class CountEnumerable : IEnumerable<int>, IEnumerator<int>
{
private int count;
private int state = -1; // 记录当前状态 怎么执行
private int i; // 保存局部变量
public bool MoveNext()
{
switch (state)
{
case -1: // 第一次调用
i = 0;
state = 0;
goto case 0;
case 0:
if (i < count)
{
Current = i; // 返回当前值
i++;
state = 0;
return true; // 还有下一个
}
state = 1;
return false; // 结束了
case 1:
return false;
}
}
public int Current { get; private set; }
// ... 其他接口实现
}foreach隐式执行了:
var e = Count(10).GetEnumerator();
while (e.MoveNext())
{
int i = e.Current;
Console.WriteLine(i);
}总结:yield return会将一个普通函数变成可以暂停/恢复的状态机;每次yield return就暂停,下次MoveNext从暂停的地方再恢复! 学会了 yield return,就已经理解了 async/await 80% 的原理了!
注!!!!!
的编译器中编译器中,用于支持支持实现迭代器(iterators)与异步方法(async methods)的逻辑90%是相同的! 可能在细节上有些差异,但是本质上都是在实现一个状态机(保存执行的状态),这个状态机允许函数在某个点暂停、退出、重新进入并恢复到之前的状态继续执行 真正不同的地方在于,是谁在调用 MoveNext() ?
- 比如
foreach逻辑中隐式调用的enumerator.MoveNext() - 异步函数中
await完成Task后,自动调用的continuation,或者我们实现的mytask里.ContinueWith(MoveNext)
实现await/async
就可以使用 yield return 的特性和原理来实现这个功能。其实是利用的编译器的特性来实现的
- 给 MyTask 扩展一个迭代器
public class MyTask
{
//其他已实现的...
//mytask异步迭代器 虽然是异步 但是是串行的一个接一个
public static MyTask Iterate(IEnumerable<MyTask> tasks)
{
MyTask t = new();
IEnumerator<MyTask> e = tasks.GetEnumerator();
void MoveNext()
{
try
{
if (e.MoveNext())
{
MyTask currentTask = (MyTask)e.Current;
currentTask.ContinueWith(MoveNext);
return;
}
}
catch (Exception ex)
{
t.SetException(ex);
return;
}
t.SetResult();
}
MoveNext();
return t;
}
}
async static Task Main(string[] args)
{
IEnumerable<MyTask> PrintAsync()
{
for (int i = 0; ; i++)
{
yield return MyTask.Delay(1000);
Console.WriteLine(i);
}
}
MyTask.Iterate(PrintAsync()).Wait();输出: 每秒一个累加! 达成我们的目标
实现语法糖awaiter
public class MyTask
{
//其他已实现的...
public struct Awaiter(MyTask t) : INotifyCompletion
{
public Awaiter GetAwaiter() => this;
public bool IsCompleted => t.IsCompleted;
public void GetResult() => t.Wait();
public void OnCompleted(Action continuation) => t.ContinueWith(continuation);
}
public Awaiter GetAwaiter() => new Awaiter(this);
}await 就是状态机的暂停点
完整代码:
using System.Collections;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
namespace DeepNet
{
internal class Program
{
#region 自定义线程池V1
/*************************************自定义线程池****************************************************/
/// <summary>
/// 自定义线程池v1
/// 只能执行工作项,无法携带参数
/// </summary>
public static class MyThreadPool_v1
{
//call
/*
static void Main(string[] args)
{
AsyncLocal<int> asyncLocal = new();
for (int i = 0; i < 1000; i++)
{
var tempIndex = i;
MyThreadPool_v1.QueueUserWorkItem(delegate
{
Console.WriteLine($" {tempIndex} .");
Thread.Sleep(500);
});
}
}
*/
private static readonly BlockingCollection<Action> s_workItems = new();
public static void QueueUserWorkItem(Action action) => s_workItems.Add(action);
static MyThreadPool_v1()
{
//线程数
for (int i = 0; i < Environment.ProcessorCount; i++)
{
new Thread(() =>
{
while (true)
{
Action action = s_workItems.Take();
action();
}
})
{ IsBackground = true }.Start();
}
}
}
#endregion
async static Task Main(string[] args)
{
async Task PrintAsync()
{
for (int i = 0; ; i++)
{
await MyTask.Delay(1000);
Console.WriteLine(i);
}
}
PrintAsync().Wait();
return;
/********************void ContinueWith 执行等待的问题**********************************/
//Console.Write("Hello, ");
//MyTask.Delay(2000).ContinueWith(() =>
//{
// Console.WriteLine("World!");
//});
//Console.ReadLine(); //异步工作,防止主线程退出
//ContinueWith是void时,正常来说使用MyThreadPool_v2实现的任务执行会受到IsBackground的影响,比如设置为false代表了前台线程会等待任务执行完成才退出。
//但是!!!!!!!!!我们实现的Delay没有走线程池 他是使用的timer,timer也是后台池线程 还没有来得及触发回调去调用线程池执行任务,主线程已经退出了
//可以将ContinueWith返回mytask,这样可以使用wait来等待 就不会退出了
/******************************************************/
/********************MyTask ContinueWith 使用Wait()**********************************/
//完成ContinueWith的扩展
//Console.Write("Hello, ");
//MyTask.Delay(2000).ContinueWith(delegate
//{
// Console.Write("World!");
// MyTask.Delay(1000).ContinueWith(delegate
// {
// Console.Write(" And Coder !!!");
// });
//}).Wait();
/******************************************************/
/*********************链式*********************************/
//Console.Write("Hello, ");
//MyTask.Delay(2000).ContinueWith(delegate
//{
// Console.Write("World!");
// return MyTask.Delay(1000).ContinueWith(delegate
// {
// Console.Write(" And Coder !!!");
// });
//}).Wait();
/* 如何修改为链式调动 这样写他其实不会等待上一个任务完成才执行下一个任务 而是直接注册了多个回调函数 到时候Delay时间一到就会同时触发这些回调函数的执行 */
Console.Write("Hello, ");
MyTask.Delay(2000).ContinueWith(delegate
{
Console.Write("World!");
}).ContinueWith(delegate
{
Console.Write(" And ");
}).ContinueWith(delegate
{
Console.Write(" Code !");
}).Wait();
///****** 类比这种写法 ******/
//for (int i = 0; ; i++)
//{
// MyTask.Delay(1000); //想要的其实是 await MyTask.Delay(1000); 这种效果
// Console.WriteLine(i);
//}
/******************************************************/
return;
//专门为**异步流程(async/await)**设计的“上下文局部变量”或叫“环境数据”(ambient data)
AsyncLocal<int> asyncLocal = new();
List<MyTask> tasks = new();
for (int i = 0; i < 100; i++)
{
asyncLocal.Value = i;
//tasks中的delegate在添加到线程池中就以及开始执行了,并且根据MyTask的实现 使用的线程MyThreadPool_v2来做的
tasks.Add(MyTask.Run(delegate
{
Console.WriteLine($" {asyncLocal.Value} ");
Thread.Sleep(900);
}));
}
//MyTask.WhenAll(tasks).Wait(); //等待所有任务完成
Console.ReadLine();
}
}
/*************************************自定义携带上下文对象的线程池****************************************************/
/*
{
//专门为**异步流程(async/await)**设计的“上下文局部变量”或叫“环境数据”(ambient data)
AsyncLocal<int> asyncLocal = new();
for (int i = 0; i < 1000; i++)
{
asyncLocal.Value = i;
MyThreadPool_v2.QueueUserWorkItem(delegate
{
Console.WriteLine($" {asyncLocal.Value} ");
Thread.Sleep(500);
});
}
Console.ReadLine();
}
*/
/// <summary>
/// 携带参数
/// 引入执行上下文的概念 ExecutionContext
/// </summary>
public static class MyThreadPool_v2
{
private static readonly BlockingCollection<(Action, ExecutionContext?)> s_workItems = new();
//ExecutionContext.Capture() 抓取当前上下文对象
public static void QueueUserWorkItem(Action action) => s_workItems.Add((action, ExecutionContext.Capture()));
static MyThreadPool_v2()
{
//线程数
for (int i = 0; i < Environment.ProcessorCount; i++)
{
new Thread(() =>
{
while (true)
{
(Action workItem, ExecutionContext? executionContext) = s_workItems.Take();
if (executionContext is null)
workItem();
else
{
//ExecutionContext.Run(executionContext, state => ((Action)state!).Invoke(), workItem);
ExecutionContext.Run(executionContext, _ => workItem(), null);
}
}
})
{ IsBackground = true }.Start();
}
}
}
public class MyTask
{
//Task只是核心 只是内存的一个数据结构 目的是作为线程池的分层
private bool _completed;
private Exception? _excption;
private Action? _continuation;
private ExecutionContext? _executionContext;
public bool IsCompleted
{
get
{
lock (this)
{
return _completed;
}
}
}
public void SetResult() => Complete(null);
public void SetException(Exception exception) => Complete(exception);
private void Complete(Exception? exception)
{
lock (this)
{
if (_completed) throw new InvalidOperationException("Task already completed.");
_completed = true;
_excption = exception;
if (_continuation is not null)
{
MyThreadPool_v2.QueueUserWorkItem(() =>
{
if (_executionContext is null)
{
_continuation();
}
else
{
ExecutionContext.Run(_executionContext, _ => _continuation(), null);
}
});
}
}
}
//等待任务完成
public void Wait()
{
ManualResetEventSlim? mres = null;
lock (this)
{
if (!_completed)
{
mres = new ManualResetEventSlim();
ContinueWith(mres.Set);
}
}
mres?.Wait();
if (_excption is not null)
{
ExceptionDispatchInfo.Throw(_excption); //抛出所有的异常 叠加 不覆盖
//throw _excption;
}
}
public MyTask ContinueWith(Action action)
{
MyTask task = new();
Action callback = () =>
{
try
{
action();
}
catch (Exception ex)
{
task.SetException(ex);
return;
}
task.SetResult();
};
lock (this)
{
if (_completed) MyThreadPool_v2.QueueUserWorkItem(callback);
else
{
_continuation = callback;
_executionContext = ExecutionContext.Capture();
}
}
return task;
}
public MyTask ContinueWith(Func<MyTask> action)
{
MyTask task = new();
Action callback = () =>
{
try
{
MyTask next = action();
next.ContinueWith(() =>
{
if (next._excption is not null)
{
task.SetException(next._excption);
}
else
{
task.SetResult();
}
});
}
catch (Exception ex)
{
task.SetException(ex);
return;
}
};
lock (this)
{
if (_completed) MyThreadPool_v2.QueueUserWorkItem(callback);
else
{
_continuation = callback;
_executionContext = ExecutionContext.Capture();
}
}
return task;
}
public static MyTask Run(Action action)
{
MyTask task = new MyTask();
MyThreadPool_v2.QueueUserWorkItem(() =>
{
try
{
action();
}
catch (Exception ex)
{
task.SetException(ex);
return;
}
task.SetResult();
});
return task;
}
/// <summary>
/// 等待所有任务执行完成
/// </summary>
/// <param name="tasks"></param>
/// <returns></returns>
public static MyTask WhenAll(List<MyTask> tasks)
{
MyTask myTask = new();
if (tasks.Count == 0)
myTask.SetResult();
else
{
int remaining = tasks.Count;
Action continuation = () =>
{
if (Interlocked.Decrement(ref remaining) == 0)
{
myTask.SetResult();
}
};
foreach (var task in tasks)
{
task.ContinueWith(continuation);
}
}
return myTask;
}
public static MyTask Delay(int time)
{
MyTask myTask = new();
new Timer(_ => myTask.SetResult()).Change(time, -1);//定义一个触发器 延迟指定时间(time) 不重复(-1)的执行一次任务(myTask.SetResult())
return myTask;
}
public static MyTask Iterate(IEnumerable<MyTask> tasks)
{
MyTask t = new();
IEnumerator<MyTask> e = tasks.GetEnumerator();
void MoveNext()
{
try
{
if (e.MoveNext())
{
MyTask currentTask = (MyTask)e.Current;
currentTask.ContinueWith(MoveNext);
return;
}
}
catch (Exception ex)
{
t.SetException(ex);
return;
}
t.SetResult();
}
MoveNext();
return t;
}
public struct Awaiter(MyTask t) : INotifyCompletion
{
public Awaiter GetAwaiter() => this;
public bool IsCompleted => t.IsCompleted;
public void GetResult() => t.Wait();
public void OnCompleted(Action continuation) => t.ContinueWith(continuation);
}
public Awaiter GetAwaiter() => new Awaiter(this);
}
}https://www.youtube.com/watch?v=R-z2Hv-7nxk&t=415s
