RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 1599981
Accepted
D .Stark
D .Stark
Asked:2024-11-16 23:31:09 +0000 UTC2024-11-16 23:31:09 +0000 UTC 2024-11-16 23:31:09 +0000 UTC

具有异步等待更改功能的 C# 线程安全字典

  • 772

任务是创建一个线程安全的字典,允许您异步等待键更新值。此外,如果在值中使用特殊集合,则会跟踪集合本身内的更改。如果在用户调用等待更新方法之前添加或更新了值,它将立即返回。如何优化该字典以使其在高负载环境中工作?我将很高兴收到任何改进建议(包括那些与性能改进无关的建议)。

public enum CollectionChangedAction {
    ItemAdded, 
    ItemRemoved, 
    ItemUpdated,
    Reset
}

public class CollectionChangedEventArgs : EventArgs {

    public CollectionChangedAction Action { get; private set; }

    public CollectionChangedEventArgs(CollectionChangedAction action) => Action = action;
}

public interface IObservableCollection {
    public event EventHandler<CollectionChangedEventArgs> CollectionChanged;
}

public class AsyncDictionary<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>> {

    private class DefaultEqualityComparer<T> : IEqualityComparer<T> {
        public static DefaultEqualityComparer<T> Instance { get; } = new DefaultEqualityComparer<T>();
        private DefaultEqualityComparer() { }

        public bool Equals(T x, T y) {
            if (default(T) == null) return ReferenceEquals(x, y);
            return EqualityComparer<T>.Default.Equals(x, y);
        }

        public int GetHashCode(T obj) {
            if (default(T) == null)
                return obj != null ? RuntimeHelpers.GetHashCode(obj) : 0;
            return EqualityComparer<T>.Default.GetHashCode(obj);
        }
    }

    /// <summary>
    /// Управляет подпиской на события изменения коллекции.
    /// </summary>
    private class CollectionChangedHandlerController {

        private EventHandler<CollectionChangedEventArgs> EventHandler;

        /// <summary>
        /// Подключенная наблюдаемая коллекция.
        /// </summary>
        public IObservableCollection Collection { get; private set; }

        public CollectionChangedHandlerController(IObservableCollection collection) {
            Collection = collection;
        }

        public void AddHandler(EventHandler<CollectionChangedEventArgs> eventHandler) {
            if (eventHandler == null)
                throw new ArgumentNullException(nameof(eventHandler));
            if (EventHandler != null)
                throw new InvalidOperationException("Обработчик уже зарегистрирован для данного события.");

            if (Collection != null) {
                EventHandler = eventHandler;
                Collection.CollectionChanged += eventHandler;
            }
        }

        public void RemoveHandler() {
            if (EventHandler != null) {
                Collection.CollectionChanged -= EventHandler;
                EventHandler = null;
            }
        }
    }

    private readonly bool IsObservableCollectionInValue;
    private readonly ConcurrentDictionary<TKey, TValue> InnerDictionary;
    private readonly ConcurrentDictionary<TKey, CollectionChangedHandlerController> InnerCollectionsDictionary;
    private readonly ConcurrentDictionary<TKey, TaskCompletionSource> Awaiters = new();
    private readonly IEqualityComparer<TValue> ValueEqualityComparer = DefaultEqualityComparer<TValue>.Instance;

    public int Count => IsObservableCollectionInValue ? InnerCollectionsDictionary.Count : InnerDictionary.Count;

    public bool IsEmpty => IsObservableCollectionInValue ? InnerCollectionsDictionary.IsEmpty : InnerDictionary.IsEmpty;

    /// <summary>
    /// Типы отслеживаемых изменений коллекции.
    /// </summary>
    public CollectionChangedAction[] TrackedChangeActions { get; private set; }

    public AsyncDictionary() {
        IsObservableCollectionInValue = typeof(TValue).IsAssignableTo(typeof(IObservableCollection));
        if (!IsObservableCollectionInValue) InnerDictionary = new();
        else {
            InnerCollectionsDictionary = new();
            TrackedChangeActions = [
                CollectionChangedAction.ItemAdded,
                CollectionChangedAction.ItemRemoved,
                CollectionChangedAction.ItemUpdated,
                CollectionChangedAction.Reset];
        }
    }

    /// <summary>
    /// Инициализирует новый экземпляр класса <see cref="AsyncDictionary{TKey, TValue}"/>, 
    /// задавая отслеживаемые типы изменений коллекции.
    /// </summary>
    /// <param name="trackedChangeActions">
    /// Массив отслеживаемых типов изменений, происходящих в коллекции.
    /// </param>
    public AsyncDictionary(CollectionChangedAction[] trackedChangeActions) : this() {
        if (trackedChangeActions == null)
            throw new ArgumentNullException(nameof(trackedChangeActions));
        if (!IsObservableCollectionInValue)
            throw new NotSupportedException("Отслеживание изменений поддерживается только для значений, реализующих интерфейс IObservableCollection.");

        TrackedChangeActions = trackedChangeActions.Distinct().ToArray();
    }

    /// <summary>
    /// Инициализирует новый экземпляр класса <see cref="AsyncDictionary{TKey, TValue}"/>, 
    /// копируя элементы из указанного словаря.
    /// </summary>
    /// <param name="valueEqualityComparer">
    /// Необязательный компаратор значений для определения необходимости уведомлений об обновлениях. 
    /// Если не указан, используется компаратор по умолчанию.
    /// </param>
    public AsyncDictionary(IDictionary<TKey, TValue> dictionary, IEqualityComparer<TValue> valueEqualityComparer = null) : this() {
        if (dictionary == null)
            throw new ArgumentNullException(nameof(dictionary));

        InnerDictionary = new ConcurrentDictionary<TKey, TValue>(dictionary);
        foreach (var key in dictionary.Keys) {
            var completionSource = new TaskCompletionSource();
            completionSource.SetResult();
            Awaiters.TryAdd(key, completionSource);
        }

        if (valueEqualityComparer != null)
            ValueEqualityComparer = valueEqualityComparer;
    }

    /// <summary>
    /// Добавляет новый элемент или обновляет существующий элемент в словаре.
    /// </summary>
    public void AddOrUpdate(TKey key, TValue value) {
        if (IsObservableCollectionInValue) {
            void onCollectionChanged(object sender, CollectionChangedEventArgs e) {
                if (TrackedChangeActions.Contains(e.Action) &&
                    Awaiters.TryGetValue(key, out var completionSource) &&
                    !completionSource.Task.IsCompleted) {
                    completionSource.SetResult();
                }
            };

            var newValue = new CollectionChangedHandlerController((IObservableCollection)value);
            newValue.AddHandler(onCollectionChanged);
            InnerCollectionsDictionary.AddOrUpdate(key, newValue, (key, existingValue) => {
                existingValue.RemoveHandler();
                return newValue;
            });
        } else {
            bool newValueProduced = true;
            InnerDictionary.AddOrUpdate(key, value, (key, existingValue) => {
                newValueProduced = !ValueEqualityComparer.Equals(value, existingValue);
                return value;
            });

            if (newValueProduced)
                Awaiters.GetOrAdd(key, _ => new TaskCompletionSource()).TrySetResult();
        }
    }

    /// <summary>
    /// Пытается получить значение из словаря по ключу.
    /// </summary>
    public bool TryGetValue(TKey key, out TValue value) {
        if (IsObservableCollectionInValue) {
            if (InnerCollectionsDictionary.TryGetValue(key, out var handlerController)) {
                value = (TValue)handlerController.Collection;
                return true;
            }
            value = default;
            return false;
        } else {
            return InnerDictionary.TryGetValue(key, out value);
        }
    }

    /// <summary>
    /// Пытается удалить элемент из словаря по ключу.
    /// </summary>
    public bool TryRemove(TKey key) {
        if (IsObservableCollectionInValue) {
            if (!InnerCollectionsDictionary.TryRemove(key, out var handlerController))
                return false;
            handlerController.RemoveHandler();
        } else if (!InnerDictionary.TryRemove(key, out _)) {
            return false;
        }

        if (Awaiters.TryRemove(key, out var completionSource))
            completionSource.TrySetCanceled();
        return true;
    }

    /// <summary>
    /// Очищает словарь, удаляя все элементы.
    /// </summary>
    public void Clear() {
        if (IsObservableCollectionInValue) {
            var handlerControllers = InnerCollectionsDictionary.Values.ToArray();
            InnerCollectionsDictionary.Clear();
            foreach (var handlerController in handlerControllers)
                handlerController.RemoveHandler();
        } else {
            InnerDictionary.Clear();
        }

        var completionSources = Awaiters.Values.ToArray();
        Awaiters.Clear();
        foreach (var completionSource in completionSources)
            completionSource.TrySetCanceled();
    }

    /// <summary>
    /// Асинхронно ожидает изменения значения для указанного ключа.
    /// </summary>
    /// <param name="key">Ключ отслеживаемого значения.</param>
    /// <param name="cancellationToken">Токен отмены операции.</param>
    /// <returns>Обновлённое значение, связанное с указанным ключом.</returns>
    public async ValueTask<TValue> WaitForUpdate(TKey key, CancellationToken cancellationToken) {
        var completionSource = Awaiters.GetOrAdd(key, _ => new TaskCompletionSource());
        await completionSource.Task.WaitAsync(cancellationToken);

        Awaiters.TryUpdate(key, new TaskCompletionSource(), completionSource);

        TValue value;
        if (IsObservableCollectionInValue) {
            if (!InnerCollectionsDictionary.TryGetValue(key, out var handlerController))
                throw new OperationCanceledException();
            value = (TValue)handlerController.Collection;
        } else {
            if (!InnerDictionary.TryGetValue(key, out value))
                throw new OperationCanceledException();
        }

        return value;
    }

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() {
        if (IsObservableCollectionInValue) {
            return InnerCollectionsDictionary.ToDictionary(pair => pair.Key,
                pair => (TValue)pair.Value.Collection).GetEnumerator();
        }

        return InnerDictionary.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
c#
  • 1 1 个回答
  • 54 Views

1 个回答

  • Voted
  1. Best Answer
    aepot
    2024-11-17T04:25:54Z2024-11-17T04:25:54Z

    字典不应该关心其含义的内容。例如,其他人应该这样做Channel<T>。

    假设有一条这样的消息TextMessage

    public interface IMessage { }
    
    public class GenericMessage<T> : IMessage
    {
        public T Body { get; init; } 
    }
    
    public class TextMessage : GenericMessage<string> { }
    

    然后你就可以“跪下”写下面的类了

    public class MessageDispatcher
    {
        private readonly ConcurrentDictionary<Type, object> _channels = new();
    
        public ChannelReader<T> GetConsumer<T>() where T : IMessage
        {
            if (_channels.ContainsKey(typeof(T)))
                throw new InvalidOperationException($"Consumer for {typeof(T).Name} is already registered");
            var channel = Channel.CreateUnbounded<T>();
            _channels.TryAdd(typeof(T), channel.Writer);
            // если канал закрывается, следующая строчка удалит его из словаря автоматически
            channel.Reader.Completion.ContinueWith(_ => _channels.Remove(typeof(T), out object _));
            return channel.Reader;
        }
    
        public ChannelWriter<T> GetProducer<T>() where T: IMessage
        {
            return _channels.TryGetValue(typeof(T), out object instance)
                ? (ChannelWriter<T>)instance
                : throw new InvalidOperationException($"Consumer for {typeof(T).Name} is not registered");
        }
    }
    

    当然,我不确定他的逻辑是否100%适合你,但作为例子应该是正常的。

    internal class Program
    {
        static async Task Main(string[] args)
        {
            MessageDispatcher dispatcher = new();
            var reader = dispatcher.GetConsumer<TextMessage>();
            Task consumerTask = ConsumeTextMessageAsync(reader);
    
            var writer = dispatcher.GetProducer<TextMessage>();
            writer.TryWrite(new TextMessage { Body = "Hello" });
            writer.TryWrite(new TextMessage { Body = "World" });
            writer.Complete();
    
            await consumerTask;
            try
            {
                writer = dispatcher.GetProducer<TextMessage>();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    
        static async Task ConsumeTextMessageAsync(ChannelReader<TextMessage> reader)
        {
            await foreach (TextMessage message in reader.ReadAllAsync())
            {
                Console.WriteLine($"Message received: {message.Body}");
            }
        }
    }
    

    我正在启动

    Message received: Hello
    Message received: World
    Consumer for TextMessage is not registered
    

    它似乎有效,错误处理也有效。

    也就是说,你可以剪切和继承任何类型的消息,并且每种类型都会有自己的通道,可以根据同一消息的类型来创建通道。

    当然,上面的代码实现了“一个消费者 = 尽可能多的生产者”的方案,但本质上,通道在技术上允许您创建任意数量的消费者,以及创建任意数量的生产者,例如,平衡消费者之间的负载。通道是一个强大的东西,并且有非常严肃的表现。

    • 2

相关问题

  • 使用嵌套类导出 xml 文件

  • 分层数据模板 [WPF]

  • 如何在 WPF 中为 ListView 手动创建列?

  • 在 2D 空间中,Collider 2D 挂在玩家身上,它对敌人的重量相同,我需要它这样当它们碰撞时,它们不会飞向不同的方向。统一

  • 如何在 c# 中使用 python 神经网络来创建语音合成?

  • 如何知道类中的方法是否属于接口?

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    我看不懂措辞

    • 1 个回答
  • Marko Smith

    请求的模块“del”不提供名为“default”的导出

    • 3 个回答
  • Marko Smith

    "!+tab" 在 HTML 的 vs 代码中不起作用

    • 5 个回答
  • Marko Smith

    我正在尝试解决“猜词”的问题。Python

    • 2 个回答
  • Marko Smith

    可以使用哪些命令将当前指针移动到指定的提交而不更改工作目录中的文件?

    • 1 个回答
  • Marko Smith

    Python解析野莓

    • 1 个回答
  • Marko Smith

    问题:“警告:检查最新版本的 pip 时出错。”

    • 2 个回答
  • Marko Smith

    帮助编写一个用值填充变量的循环。解决这个问题

    • 2 个回答
  • Marko Smith

    尽管依赖数组为空,但在渲染上调用了 2 次 useEffect

    • 2 个回答
  • Marko Smith

    数据不通过 Telegram.WebApp.sendData 发送

    • 1 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5