Skip to content

上下位机通讯异步

约 1495 字大约 5 分钟

NET框架

2026-01-12

上下位机通讯时有一个核心需求。上位机请求下位机时 下位机的硬件移动需要时间,由于协议之类的原因请求之后会立即返回结果,这个结果告知我们通讯是否成功不是硬件操作完成的结果。

开始我是用Prism 事件总线(IEventAggregator)来通知结果,但是这样写太累了

捋一捋逻辑:

我们想达到的效果

await device.MoveToAsync(1000); // 看起来像同步 
MessageBox.Show("终于到位了!");

但实际上它会立即返回调用是否成功, 我们想达到的效果是硬件完成动作后才返回

TaskCompletionSource

我们可以通过异步task的方式来实现,主要依赖TaskCompletionSource

TaskCompletionSource<T> 是 .NET 中用于手动控制 Task<T> 生命周期的一个关键类,常用于将基于回调、事件或非 Task 异步模型的代码桥接到基于 async/await 的异步编程模型中。

当无法直接使用 async/await(例如调用一个只提供回调接口的 API),但又希望对外暴露一个 Task<T> 供调用方 await,就可以使用 TaskCompletionSource<T>

方法作用
SetResult(T)Task 置为 成功完成 状态,并设置结果
SetException(Exception)Task 置为 失败 状态
SetCanceled()Task 置为 已取消 状态(需配合 CancellationToken 使用更佳)
TrySetResult(T)Try* 方法线程安全地尝试设置状态,若 Task 已完成则返回 false

方案1

简单清晰的一种方式

private readonly ConcurrentDictionary<int, TaskCompletionSource<Response>> _pending 
    = new ConcurrentDictionary<int, TaskCompletionSource<Response>>();

//发送命令
public async Task SendCommandAsync(Command cmd) 
{ 
	var tcs = new TaskCompletionSource(); 
	_pending[cmd.Id] = tcs; 
	Send(cmd); 
	return await tcs.Task.TimeoutAfter(TimeSpan.FromSeconds(15)); 
}

//下位机串口处理完成后的返回事件
//只需要在其中监听 把对于命令id的task设置的结果设置就行
public void OnResponse(Response resp) 
{ 
	if (_pending.TryRemove(resp.CmdId, out var tcs)) 
		tcs.SetResult(resp); 
}

方案2

支持多指令并发,但是每个指令需要有自己的id 排队、超时管理、重发

public class DeviceService
{
    // 每个请求的完整信息
    private sealed class PendingRequest
    {
        public int CmdId { get; }
        public string RawCommand { get; }
        public TaskCompletionSource<DeviceResponse> Tcs { get; }
        public CancellationTokenSource Cts { get; } = new();

        public PendingRequest(int cmdId, string rawCommand)
        {
            CmdId = cmdId;
            RawCommand = rawCommand;
            Tcs = new TaskCompletionSource<DeviceResponse>();
        }
    }

    private int _nextCmdId = 1;
    private readonly Channel<PendingRequest> _requestChannel = Channel.CreateUnbounded<PendingRequest>();
    private readonly ConcurrentDictionary<int, PendingRequest> _pending = new();

    public DeviceService()
    {
        // 启动一个后台线程/任务,专门负责从 Channel 取命令并发送
        Task.Run(WorkerLoopAsync);
    }



    // 调用
    public async Task<DeviceResponse> SendAsync(string command, TimeSpan? timeout = null)
    {
        int cmdId = Interlocked.Increment(ref _nextCmdId);
        string fullCmd = $"{command} ID={cmdId}";   // 假设协议支持 ID

        var request = new PendingRequest(cmdId, fullCmd);

        // 设置超时
        timeout ??= TimeSpan.FromSeconds(10);
        request.Cts.Token.Register(() => request.Tcs.TrySetException(
            new TimeoutException($"命令超时: {command}")));

        _requestChannel.Writer.TryWrite(request);   // 立刻返回,排队
        _pending[cmdId] = request;

        SendImmediately(fullCmd); // 或者也丢给 Worker 慢慢发,看你协议

        return await request.Tcs.Task;
    }

    // 下位机收到回应时调用这
    public void OnResponseReceived(DeviceResponse response)
    {
        if (_pending.TryRemove(response.CmdId, out var req))
        {
            req.Cts.Cancel(); // 取消超时计时器
            if (response.IsSuccess)
                req.Tcs.TrySetResult(response);
            else
                req.Tcs.TrySetException(new DeviceException(response.ErrorMsg));
        }
    }

    // 可选:真正的发送循环(支持自动重发、流控等)
    private async Task WorkerLoopAsync()
    {
        await foreach (var req in _requestChannel.Reader.ReadAllAsync())
        {
            // 可以在这里加发送前延迟、流量控制、自动重试等
            SendImmediately(req.RawCommand);
        }
    }

    private void SendImmediately(string cmd) => _serialPort.WriteLine(cmd);
}

第三种

支持大量设备 每个指令都是一个独立类,业务逻辑清晰 统一的日志、超时设置

// 底层通信服务(方案一/三的核心,负责TCS匹配)
public class DeviceService
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<DeviceResponse>> _pending = new();
    private int _cmdId = 0;

    public async Task<DeviceResponse> ExecuteCommandAsync(string command, TimeSpan timeout)
    {
        int cmdId = Interlocked.Increment(ref _cmdId);
        var tcs = new TaskCompletionSource<DeviceResponse>();

        _pending[cmdId] = tcs;

        // 发送命令(假设协议带cmdId)
        SendToHardware($"{command} [ID:{cmdId}]");

        try
        {
            return await tcs.Task.WaitAsync(timeout);
        }
        finally
        {
            _pending.TryRemove(cmdId, out _);
        }
    }

    // 下位机回包时调用(串口/TCP 接收线程)
    public void OnHardwareResponse(DeviceResponse response)
    {
        if (response.TryGetCmdId(out int cmdId) && 
            _pending.TryRemove(cmdId, out var tcs))
        {
            if (response.IsSuccess)
                tcs.TrySetResult(response);
            else
                tcs.TrySetException(new DeviceExecuteException(response.ErrorCode, response.Message));
        }
    }
}
// 操作接口(带返回值)
public interface IDeviceOperation<out TResult>
{
    Task<TResult> ExecuteAsync(DeviceService device, CancellationToken ct = default);
}

//移动操作
public class MoveToOperation : IDeviceOperation<MoveResult>
{
    private readonly double _position;

    public MoveToOperation(double position) => _position = position;

    public async Task<MoveResult> ExecuteAsync(DeviceService device, CancellationToken ct = default)
    {
        var resp = await device.ExecuteCommandAsync($"MOVE {_position}", TimeSpan.FromSeconds(30));

        // 这里可以把硬件的原始响应转成业务想要的强类型结果
        return new MoveResult
        {
            Success = resp.IsSuccess,
            ActualPosition = resp.GetValue<double>("POS"),
            ErrorCode = resp.ErrorCode,
            Message = resp.Message
        };
    }
}

public class MoveResult  // 业务层想要的结果类型
{
    public bool Success { get; set; }
    public double ActualPosition { get; set; }
    public int ErrorCode { get; set; }
    public string Message { get; set; }
}
// 统一执行器 (加日志、统一超时、异常包装等)
// 在DeviceService的基础上包了一层,用于处理日志异常等统一管理的东西
public class DeviceOperationExecutor
{
    private readonly DeviceService _device;

    public DeviceOperationExecutor(DeviceService device)
    {
        _device = device;
    }

    public async Task<TResult> ExecuteAsync<TResult>(
        IDeviceOperation<TResult> operation,
        CancellationToken ct = default)
    {
        string opName = operation.GetType().Name;

        // _logger.LogInformation("开始执行 {Operation} 参数:{Params}", opName, ...);

        try
        {
            // 真正执行,并等待硬件结果
            return await operation.ExecuteAsync(_device, ct);
        }
        catch (TimeoutException)
        {
            throw new DeviceTimeoutException(opName);
        }
        catch (DeviceExecuteException ex)
        {
            // 可以在这里统一转成业务异常,或记录硬件错误码
            throw;
        }
        catch (Exception ex)
        {
            // 未知异常
            throw new DeviceOperationFailedException(opName, ex);
        }
        finally
        {
        }
    }
}

调用

public class MainViewModel : BindableBase
{
    private readonly DeviceOperationExecutor _executor;
    private readonly DeviceService _device;  // 可选,如果你想直接用底层

    public MainViewModel(
        DeviceOperationExecutor executor, 
        DeviceService device) 
    {
        _executor = executor;
        _device = device;

        // 命令绑定
        MoveCommand = new AsyncCommand(MoveToAsync);
        ReadTempCommand = new AsyncCommand(ReadTemperatureAsync);
    }

    // Prism 的 IAsyncCommand(推荐 Prism 9+ 的 AsyncDelegateCommand)
	public IAsyncCommand MoveCommand { get; }
	
	// 命令执行方法(最常见的地方)
	private async Task MoveToAsync()
	{
	    try
	    {
	        // 传入一个具体的操作对象,await 它,得到硬件真实结果
	        MoveResult result = await _executor.ExecuteAsync(
	            new MoveToOperation(1000.0));   // 位置 1000mm
	
	        // 现在 result 就是下位机真正执行完后的结果(成功/失败、实际位置、错误码等)
	        if (result.Success)
	        {
	            StatusMessage = $"移动成功!实际位置:{result.ActualPosition:F2} mm";
	            // 更新 UI、进度条等
	        }
	        else
	        {
	            StatusMessage = $"移动失败!错误码:{result.ErrorCode} - {result.Message}";
	            // 弹窗提示或记录日志
	        }
	    }
	    catch (DeviceTimeoutException ex)
	    {
	        StatusMessage = $"超时:{ex.Message}";
	    }
	    catch (DeviceExecuteException ex)
	    {
	        StatusMessage = $"硬件错误:{ex.ErrorCode} - {ex.Message}";
	    }
	    catch (Exception ex)
	    {
	        StatusMessage = $"未知异常:{ex.Message}";
	    }
	}
	
	// 另一个例子:读取温度
	private async Task ReadTemperatureAsync()
	{
	    double temp = await _executor.ExecuteAsync(new ReadTemperatureOperation());
	    CurrentTemperature = temp;
	    // UI 绑定自动更新
	}
}