外观
上下位机通讯异步
上下位机通讯时有一个核心需求。上位机请求下位机时 下位机的硬件移动需要时间,由于协议之类的原因请求之后会立即返回结果,这个结果告知我们通讯是否成功不是硬件操作完成的结果。
开始我是用Prism 事件总线(IEventAggregator)来通知结果,但是这样写太累了
捋一捋逻辑:
我们想达到的效果
await device.MoveToAsync(1000); // 看起来像同步
MessageBox.Show("终于到位了!");但实际上它会立即返回调用是否成功, 我们想达到的效果是硬件完成动作后才返回
TaskCompletionSource
我们可以通过异步task的方式来实现,主要依赖TaskCompletionSource
TaskCompletionSource<T>是 .NET 中用于手动控制Task<T>生命周期的一个关键类,常用于将基于回调、事件或非 Task 异步模型的代码桥接到基于async/await的异步编程模型中。当无法直接使用
async/await(例如调用一个只提供回调接口的 API),但又希望对外暴露一个Task<T>供调用方await,就可以使用TaskCompletionSource<T>。
| 方法 | 作用 |
|---|---|
SetResult(T) | 将 Task 置为 成功完成 状态,并设置结果 |
SetException(Exception) | 将 Task 置为 失败 状态 |
SetCanceled() | 将 Task 置为 已取消 状态(需配合 CancellationToken 使用更佳) |
TrySetResult(T) 等 Try* 方法 | 线程安全地尝试设置状态,若 Task 已完成则返回 false |
方案1
简单清晰的一种方式
private readonly ConcurrentDictionary<int, TaskCompletionSource<Response>> _pending
= new ConcurrentDictionary<int, TaskCompletionSource<Response>>();
//发送命令
public async Task SendCommandAsync(Command cmd)
{
var tcs = new TaskCompletionSource();
_pending[cmd.Id] = tcs;
Send(cmd);
return await tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(15));
}
//下位机串口处理完成后的返回事件
//只需要在其中监听 把对于命令id的task设置的结果设置就行
public void OnResponse(Response resp)
{
if (_pending.TryRemove(resp.CmdId, out var tcs))
tcs.SetResult(resp);
}方案2
支持多指令并发,但是每个指令需要有自己的id 排队、超时管理、重发
public class DeviceService
{
// 每个请求的完整信息
private sealed class PendingRequest
{
public int CmdId { get; }
public string RawCommand { get; }
public TaskCompletionSource<DeviceResponse> Tcs { get; }
public CancellationTokenSource Cts { get; } = new();
public PendingRequest(int cmdId, string rawCommand)
{
CmdId = cmdId;
RawCommand = rawCommand;
Tcs = new TaskCompletionSource<DeviceResponse>();
}
}
private int _nextCmdId = 1;
private readonly Channel<PendingRequest> _requestChannel = Channel.CreateUnbounded<PendingRequest>();
private readonly ConcurrentDictionary<int, PendingRequest> _pending = new();
public DeviceService()
{
// 启动一个后台线程/任务,专门负责从 Channel 取命令并发送
Task.Run(WorkerLoopAsync);
}
// 调用
public async Task<DeviceResponse> SendAsync(string command, TimeSpan? timeout = null)
{
int cmdId = Interlocked.Increment(ref _nextCmdId);
string fullCmd = $"{command} ID={cmdId}"; // 假设协议支持 ID
var request = new PendingRequest(cmdId, fullCmd);
// 设置超时
timeout ??= TimeSpan.FromSeconds(10);
request.Cts.Token.Register(() => request.Tcs.TrySetException(
new TimeoutException($"命令超时: {command}")));
_requestChannel.Writer.TryWrite(request); // 立刻返回,排队
_pending[cmdId] = request;
SendImmediately(fullCmd); // 或者也丢给 Worker 慢慢发,看你协议
return await request.Tcs.Task;
}
// 下位机收到回应时调用这
public void OnResponseReceived(DeviceResponse response)
{
if (_pending.TryRemove(response.CmdId, out var req))
{
req.Cts.Cancel(); // 取消超时计时器
if (response.IsSuccess)
req.Tcs.TrySetResult(response);
else
req.Tcs.TrySetException(new DeviceException(response.ErrorMsg));
}
}
// 可选:真正的发送循环(支持自动重发、流控等)
private async Task WorkerLoopAsync()
{
await foreach (var req in _requestChannel.Reader.ReadAllAsync())
{
// 可以在这里加发送前延迟、流量控制、自动重试等
SendImmediately(req.RawCommand);
}
}
private void SendImmediately(string cmd) => _serialPort.WriteLine(cmd);
}第三种
支持大量设备 每个指令都是一个独立类,业务逻辑清晰 统一的日志、超时设置
// 底层通信服务(方案一/三的核心,负责TCS匹配)
public class DeviceService
{
private readonly ConcurrentDictionary<int, TaskCompletionSource<DeviceResponse>> _pending = new();
private int _cmdId = 0;
public async Task<DeviceResponse> ExecuteCommandAsync(string command, TimeSpan timeout)
{
int cmdId = Interlocked.Increment(ref _cmdId);
var tcs = new TaskCompletionSource<DeviceResponse>();
_pending[cmdId] = tcs;
// 发送命令(假设协议带cmdId)
SendToHardware($"{command} [ID:{cmdId}]");
try
{
return await tcs.Task.WaitAsync(timeout);
}
finally
{
_pending.TryRemove(cmdId, out _);
}
}
// 下位机回包时调用(串口/TCP 接收线程)
public void OnHardwareResponse(DeviceResponse response)
{
if (response.TryGetCmdId(out int cmdId) &&
_pending.TryRemove(cmdId, out var tcs))
{
if (response.IsSuccess)
tcs.TrySetResult(response);
else
tcs.TrySetException(new DeviceExecuteException(response.ErrorCode, response.Message));
}
}
}// 操作接口(带返回值)
public interface IDeviceOperation<out TResult>
{
Task<TResult> ExecuteAsync(DeviceService device, CancellationToken ct = default);
}
//移动操作
public class MoveToOperation : IDeviceOperation<MoveResult>
{
private readonly double _position;
public MoveToOperation(double position) => _position = position;
public async Task<MoveResult> ExecuteAsync(DeviceService device, CancellationToken ct = default)
{
var resp = await device.ExecuteCommandAsync($"MOVE {_position}", TimeSpan.FromSeconds(30));
// 这里可以把硬件的原始响应转成业务想要的强类型结果
return new MoveResult
{
Success = resp.IsSuccess,
ActualPosition = resp.GetValue<double>("POS"),
ErrorCode = resp.ErrorCode,
Message = resp.Message
};
}
}
public class MoveResult // 业务层想要的结果类型
{
public bool Success { get; set; }
public double ActualPosition { get; set; }
public int ErrorCode { get; set; }
public string Message { get; set; }
}// 统一执行器 (加日志、统一超时、异常包装等)
// 在DeviceService的基础上包了一层,用于处理日志异常等统一管理的东西
public class DeviceOperationExecutor
{
private readonly DeviceService _device;
public DeviceOperationExecutor(DeviceService device)
{
_device = device;
}
public async Task<TResult> ExecuteAsync<TResult>(
IDeviceOperation<TResult> operation,
CancellationToken ct = default)
{
string opName = operation.GetType().Name;
// _logger.LogInformation("开始执行 {Operation} 参数:{Params}", opName, ...);
try
{
// 真正执行,并等待硬件结果
return await operation.ExecuteAsync(_device, ct);
}
catch (TimeoutException)
{
throw new DeviceTimeoutException(opName);
}
catch (DeviceExecuteException ex)
{
// 可以在这里统一转成业务异常,或记录硬件错误码
throw;
}
catch (Exception ex)
{
// 未知异常
throw new DeviceOperationFailedException(opName, ex);
}
finally
{
}
}
}调用
public class MainViewModel : BindableBase
{
private readonly DeviceOperationExecutor _executor;
private readonly DeviceService _device; // 可选,如果你想直接用底层
public MainViewModel(
DeviceOperationExecutor executor,
DeviceService device)
{
_executor = executor;
_device = device;
// 命令绑定
MoveCommand = new AsyncCommand(MoveToAsync);
ReadTempCommand = new AsyncCommand(ReadTemperatureAsync);
}
// Prism 的 IAsyncCommand(推荐 Prism 9+ 的 AsyncDelegateCommand)
public IAsyncCommand MoveCommand { get; }
// 命令执行方法(最常见的地方)
private async Task MoveToAsync()
{
try
{
// 传入一个具体的操作对象,await 它,得到硬件真实结果
MoveResult result = await _executor.ExecuteAsync(
new MoveToOperation(1000.0)); // 位置 1000mm
// 现在 result 就是下位机真正执行完后的结果(成功/失败、实际位置、错误码等)
if (result.Success)
{
StatusMessage = $"移动成功!实际位置:{result.ActualPosition:F2} mm";
// 更新 UI、进度条等
}
else
{
StatusMessage = $"移动失败!错误码:{result.ErrorCode} - {result.Message}";
// 弹窗提示或记录日志
}
}
catch (DeviceTimeoutException ex)
{
StatusMessage = $"超时:{ex.Message}";
}
catch (DeviceExecuteException ex)
{
StatusMessage = $"硬件错误:{ex.ErrorCode} - {ex.Message}";
}
catch (Exception ex)
{
StatusMessage = $"未知异常:{ex.Message}";
}
}
// 另一个例子:读取温度
private async Task ReadTemperatureAsync()
{
double temp = await _executor.ExecuteAsync(new ReadTemperatureOperation());
CurrentTemperature = temp;
// UI 绑定自动更新
}
}