RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 1607798
Accepted
Alexey Trukhanov
Alexey Trukhanov
Asked:2025-02-27 17:12:09 +0000 UTC2025-02-27 17:12:09 +0000 UTC 2025-02-27 17:12:09 +0000 UTC

按时间序列范围以最快的速度过滤动态数据

  • 772

笔记

在此任务中,您可以使用任何数据类型和模块,包括来自第三方开发人员的数据类型和模块(numpy, pandas, sorted containers等等)。主要标准是执行速度。

鉴于:

我们的逻辑的输入是连续提供相同结构的字典,其中包含某个事件的信息(参数)。在这些字典中,几个值是类型float(我们称之为键param_1, param_2...),其中一个值是该事件的时间戳(我们称之为键dt)。我们将这本词典分析成不同的列表。每个参数都在单独的列表中,时间序列也在单独的列表中。因此,在不同列表中的同一索引下存在有关同一事件的信息。我们知道时间戳是不递减的,这意味着下一个时间戳可以等于或大于前一个时间戳。相邻标记之间的时间增量是有条件随机的,可以是微秒,可以是几分钟,也可以有条件地不存在。

我们还给出了一个时间间隔(我们称之为duration)

任务

为了进一步处理,我们需要上述列表包含有关duration最后一个传入事件内的事件的信息。所有早于此时间的事件均被删除。

我的决定

我们接受吧pandas.Dataframe。事件到达——我们将字典分布在数据框的各列中。dt使用 mask按列进行过滤dt > время последнего события - duration。我们将数据框分成列表。

为了清楚起见,我编写了一个流程模拟器:

import time
import datetime
import random
import pandas as pd

random.seed(123)
duration = 30
df = pd.DataFrame(columns=['Param_1', 'Param_2', 'Param_3', 'dt'])

for _ in range(10000):
    # получили словарь
    input_dict = {'Param_1': random.random(),
                  'Param_2': random.random(),
                  'Param_3': random.random(),
                  'dt': datetime.datetime.now()}

    # добавили в датафрейм
    df.loc[0 if df.empty else df.index[-1]+1] = input_dict.values()

    # отфильтровали
    df = df.loc[df['dt'].gt(df.dt.iloc[-1] - datetime.timedelta(seconds=duration))]

    # разобрали по спискам
    param_1 = df.Param_1.to_list()
    param_2 = df.Param_2.to_list()
    param_3 = df.Param_3.to_list()
    dt = df.dt.to_list()

    # это рандомная задержка, чтобы эмулировать случайную разницу между событиями
    time.sleep(random.random() * 3)

问题

还有其他更快的选择吗?我将非常感激任何有关代码优化的评论。

python
  • 3 3 个回答
  • 142 Views

3 个回答

  • Voted
  1. Best Answer
    MBo
    2025-02-27T18:09:54Z2025-02-27T18:09:54Z

    像 Pandas 这样的库可能会更快,并且算法上可以使用双向队列来实现这一点。collections.deque

    当添加到末尾时,它们将从记录的开头被删除(您自己删除它们),直到时间大于 lasttime-duration

    from collections import deque
    from time import time, sleep
    from random import randint
    
    duration = 1
    
    d = deque()
    for i in range(100):
        sleep(randint(1,10)/100)
        t = time()
        while d and t-d[0]>duration:
            d.popleft()
        d.append(t)
        print(len(d))
    
    • 4
  2. CrazyElf
    2025-02-27T18:05:50Z2025-02-27T18:05:50Z

    注意:deque MBo 答案中的队列比Pandas此任务中的队列运行速度快 10 倍,最好使用它。本质上,这就是我在答案末尾写到的环形缓冲区。


    我对你的代码做了一些调整(关闭它time.sleep并减少它duration)。相当多的时间(这在很大程度上取决于哪部分数据落入间隔)都被这段代码占用了:

        # разобрали по спискам
        param_1 = df.Param_1.to_list()
        param_2 = df.Param_2.to_list()
        param_3 = df.Param_3.to_list()
        dt = df.dt.to_list()
    

    因此代码本身相当快,但是列的转换list会耗费大量时间。考虑一下您是否需要输出列表。如果你从Pandas里面拿走它numpy.array,那么根本不需要花费时间:

        # разобрали по спискам
        param_1 = df.Param_1.values
        param_2 = df.Param_2.values
        param_3 = df.Param_3.values
        dt = df.dt.values
    

    顺便说一句,这种转变根本没有必要:

    df.loc[len(df)] = [val for val in input_dict.values()]
    

    您可以直接获取它并进行分配:

    df.loc[len(df)] = input_dict.values()
    

    从下面开始,我会尝试从日期移动到timestamp,也许这样会更快。否则,我不认为代码有那么严重的不理想或特别需要返工。

    您还可以尝试使用某种内存数据库。但不知道会不会更快。

    因此,如果您以完全算法最优的方式处理它,那么您可能可以创建所需数量的环形缓冲区(根据所需列表的数量)并通过它们移动数据,只需移动开始和结束指针即可。但再说一遍,从他们那里获取列表并非易事。 )

    • 2
  3. Alexey Trukhanov
    2025-03-03T18:03:33Z2025-03-03T18:03:33Z

    同事们,我已经找到了解决这个问题的方法,到目前为止,它是所有提出的解决方案中最快的(有保留,请参阅测试末尾的注释)。特别感谢@MBo 提供的提示collections.deque和@CrazyElf 提供的宝贵建议和想法。您给了我当前解决方案的想法。我正在发布测试测量结果和我的版本。

    测试数据

    由于此逻辑适用于已填充的数据集,为了实验的纯粹性,我创建了具有时间序列的起始测试列表。

    import time
    import datetime
    import random
    import pandas as pd
    from collections import deque
    import numpy as np
    from bisect import bisect
    
    # Количество повторов в тестах
    iterations = 60
    
    # Частота значений в тестовом временном ряде
    data_test_frequency = 1
    
    # Частота значений в поступающих данных
    stream_frequency = 5
    
    # Длинна временного ряда, под который обрезаем датасет
    duration = 32 * 60
    
    # Здесь храним время тестов
    total_time_first = total_time_second = total_time_third = total_time_fourth = total_time_fifth = 0
    
    # Функция, генерирующая тестовый таймлайн и списки
    def test_data(dur=duration, ds=data_test_frequency):
    
        random.seed(123)
        np.random.seed(123)
        start_timestamp = timestamp = datetime.datetime.now()
        timeline = [timestamp]
    
        while start_timestamp - timestamp <= datetime.timedelta(seconds=dur):
            timeline.append(timestamp := timestamp - datetime.timedelta(seconds=random.random()*ds))
    
        return (timeline[::-1],
                np.random.rand(len(timeline)).tolist(),
                np.random.rand(len(timeline)).tolist(),
                np.random.rand(len(timeline)).tolist()
                )
    

    第一个解决方案

    通过熊猫,正如问题中所说,我不会重复代码。

    一次迭代的平均时间:0.0057396014531453455

    第二种解决方案

    我们创建了四个对象deque。正如@MBo 的回答所指出的,我们从它们中删除初始元素,直到初始元素包含在我们的条件中。将对象转换deque为列表。

    ldt, l1, l2, l3 = test_data()
    
    deq_dt = deque(ldt)
    deq_par1 = deque(l1)
    deq_par2 = deque(l2)
    deq_par3 = deque(l3)
    for _ in range(iterations):
        # получили словарь
        input_dict = {'Param_1': random.random(),
                      'Param_2': random.random(),
                      'Param_3': random.random(),
                      'dt': datetime.datetime.now()}
    
        
        start_time = time.time()
    
        # Начало измеряемого блока
        # В дальнейших примерах буду приводить только этот блок
    
        deq_dt.append(input_dict['dt'])
        deq_par1.append(input_dict['Param_1'])
        deq_par2.append(input_dict['Param_2'])
        deq_par3.append(input_dict['Param_3'])
    
        while deq_dt and input_dict['dt'] - deq_dt[0] > datetime.timedelta(seconds=duration):
            deq_dt.popleft()
            deq_par1.popleft()
            deq_par2.popleft()
            deq_par3.popleft()
    
        lst_dt = list(deq_dt)
        lst_par1 = list(deq_par1)
        lst_par2 = list(deq_par2)
        lst_par3 = list(deq_par3)
        #------Конец измеряемого блока
    
        print(len(lst_dt), second:=time.time() - start_time)
        total_time_second += second
    
        time.sleep(random.random() * stream_frequency)
    print(f'All deque mean time: {total_time_second/iterations}')
    print('df/all_deque', total_time_first/total_time_second)
    

    一次迭代的平均时间:0.00020250876744588216

    使用 Pandas 的版本与使用 的版本的时间比率deque:28.342483762729824

    在不同的测试设置下,利润波动从 25 倍到 65 倍

    第三种选择

    我们创建一个对象deque和四个基础列表。在我们删除元素的同一循环中,deque我们启动一个计数器来计算删除的对象的数量。循环结束后,该计数器将包含从中取出切片的列表的索引,以截断条件中未包含的时间戳。我们取四片。

        counter = 0
        deq_dt.append(input_dict['dt'])
        lst_dt_c.append(input_dict['dt'])
        lst_par1_c.append(input_dict['Param_1'])
        lst_par2_c.append(input_dict['Param_2'])
        lst_par3_c.append(input_dict['Param_3'])
    
        while deq_dt and input_dict['dt'] - deq_dt[0] > datetime.timedelta(seconds=duration):
            counter += 1
            deq_dt.popleft()
    
        lst_dt_c = lst_dt_c[counter:]
        lst_par1_c = lst_par1_c[counter:]
        lst_par2_c = lst_par2_c[counter:]
        lst_par3_c = lst_par3_c[counter:]
    

    一次迭代的平均时间:0.00015288591384887695

    deque全部变量与计数器变量的时间比率deque:1.3245743989603638

    在不同的测试设置下,利润在 20% 到 40% 之间波动

    *我还尝试不在时间线上输入列表,而是将对象转换deque为列表 - 这个选项结果比较慢,但请参阅测试末尾的注释

    第四种选择

    从之前的版本可以看出,这个问题已经简化为尽快找到满足指定的首个元素的索引duration。我假设这是一项典型的任务,事实上,确实numpy.array有一个函数(由于添加元素然后将其转换为原始列表的np.searchsorted成本过高,因此测试失败),并且对于列表,有一个标准模块,其中有一个同名的函数,它接受一个排序列表和一些元素,并返回输入列表中可以放置此元素同时保持排序的索引。这对我们来说很合适。numpy.arraybisect

        lst_dt_c.append(input_dict['dt'])
        lst_par1_c.append(input_dict['Param_1'])
        lst_par2_c.append(input_dict['Param_2'])
        lst_par3_c.append(input_dict['Param_3'])
    
        counter = bisect(lst_dt_c, input_dict['dt']-datetime.timedelta(seconds=duration))
    
        lst_dt_c = lst_dt_c[counter:]
        lst_par1_c = lst_par1_c[counter:]
        lst_par2_c = lst_par2_c[counter:]
        lst_par3_c = lst_par3_c[counter:]
    

    一次迭代的平均时间:0.00014625787734985353

    前一个变体与该变体的时间比率bisect:1.045317466786209

    在不同的测试设置下,利润波动范围为 3% 至 10%

    笔记

    结果在很大程度上取决于时间线的结构、频率和对象的数量。例如,对象越多,从deque到 的转换list与切片的损失就越大;如果对象数量较少,则差异不那么明显。另外,如果在某个时候输入数据的频率变高并且要切割的对象数量变小(或者在多次迭代中根本没有切割任何对象),那么bisect我认为带有计数器的方法会开始获胜,因为bisect在任何情况下它都会寻找索引,并且计数器会在第一次检查时立即失败。因此,尽管我尝试为工作数据选择测试参数,但只有当我将此逻辑附加到工作流程中时,哪个选项是最好的才会变得清晰。

    非常感谢您读到最后! :)

    • 1

相关问题

  • 是否可以以某种方式自定义 QTabWidget?

  • telebot.anihelper.ApiException 错误

  • Python。检查一个数字是否是 3 的幂。输出 无

  • 解析多个响应

  • 交换两个数组的元素,以便它们的新内容也反转

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    我看不懂措辞

    • 1 个回答
  • Marko Smith

    请求的模块“del”不提供名为“default”的导出

    • 3 个回答
  • Marko Smith

    "!+tab" 在 HTML 的 vs 代码中不起作用

    • 5 个回答
  • Marko Smith

    我正在尝试解决“猜词”的问题。Python

    • 2 个回答
  • Marko Smith

    可以使用哪些命令将当前指针移动到指定的提交而不更改工作目录中的文件?

    • 1 个回答
  • Marko Smith

    Python解析野莓

    • 1 个回答
  • Marko Smith

    问题:“警告:检查最新版本的 pip 时出错。”

    • 2 个回答
  • Marko Smith

    帮助编写一个用值填充变量的循环。解决这个问题

    • 2 个回答
  • Marko Smith

    尽管依赖数组为空,但在渲染上调用了 2 次 useEffect

    • 2 个回答
  • Marko Smith

    数据不通过 Telegram.WebApp.sendData 发送

    • 1 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5