不太清楚结构
async with lock:对于python。
我需要这样做,以便一次只执行一个函数来处理数据库。也就是说,对数据库有独占访问权。有几个功能可以与基础一起工作。
asyncioscheduler.add_job()按计划运行 N 个函数doWork1, doWork2, doWork3等,这些函数又调用函数与数据库 ( query1, query2, query3) 一起工作。问题是,async with lock 可以帮助限制同时执行一个用于处理数据库的函数吗?如果是这样,怎么做?这个想法是为这个任务使用一个队列,但它是可能的并且lock可以帮助简化实现。
下面是正在解决的问题的工作伪代码。要求每单位时间只有一个分配的块工作async with lock:,无论正在执行哪个功能strategyBuy或strategySell这些功能的实例有多少正在运行。所有其他块必须“排队”并按顺序处理。一个问题是否可以 async with lock:帮助特定任务的给定实现?在示例的当前结果中,您可以看到条目Bought2.出现了多次,这表明 for 的代码段async with lock:是 strategyBuy并行执行的,这是不可接受的。理论上Bought2.,只有在数据库中没有记录或没有带有side: 'buy'and的记录时才会显示parent_id is null。在第一次运行时,这些条件都是 True(仅适用于第一个条目)。
import random
import asyncio
import aiosqlite as aiosql
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import uuid
import numpy as np
myState = 0
async def dbConnect():
sqlConn = await aiosql.connect('test.db')
await sqlConn.execute("""
CREATE TABLE IF NOT EXISTS tbl(
id TEXT NOT NULL,
side TEXT NOT NULL,
price NUMERIC NOT NULL,
parent_id TEXT NULL,
buy_ids TEXT NULL
)""")
return sqlConn
async def check():
return (random.uniform(0, 1) < 0.05)
async def fetchPrice():
return random.uniform(0, 1)
async def sell():
pass
async def getOpenBuyOrders(sqlConn):
cursor = await sqlConn.execute(
"""SELECT id, price, parent_id
FROM tbl
WHERE side='buy'
AND parent_id IS NULL""")
return await cursor.fetchall()
async def saveBuyOrder(sqlConn, price):
buyId = uuid.uuid1()
await sqlConn.execute(f"INSERT INTO tbl(id, price, side) VALUES (?, ?, 'buy')", (buyId.hex, price))
await sqlConn.commit()
async def saveSellOrder(sqlConn, price, buyIDs):
sellId = uuid.uuid1()
await sqlConn.execute(f"INSERT INTO tbl (id, price, buy_ids, side) VALUES (?, ?, ?, 'sell')", (sellId.hex, price, str(buyIDs)))
buyIDString = ','.join(['?']*len(buyIDs))
await sqlConn.execute(f"UPDATE tbl SET parent_id = ? WHERE id in (?)", (sellId.hex, buyIDString))
await sqlConn.commit()
async def strategyBuy(sqlConn, i):
lock = asyncio.Lock()
if(await check()):
currentPrice = await fetchPrice()
async with lock:
global myState
myState = myState + 1
openBuyOrders = await getOpenBuyOrders(sqlConn)
if(openBuyOrders):
avgPrice = np.mean([openBuyOrder[1]
for openBuyOrder in openBuyOrders])
if(avgPrice > (currentPrice * 1.05)):
await saveBuyOrder(sqlConn, currentPrice)
print(f"Bought1. Task: {i}, myState:{myState}")
else:
await saveBuyOrder(sqlConn, currentPrice)
print(f"Bought2. Task: {i}, myState:{myState}")
return None
async def strategySell(sqlConn, i):
lock = asyncio.Lock()
if(await check()):
currentPrice = await fetchPrice()
async with lock:
global myState
myState = myState + 1
openBuyOrders = await getOpenBuyOrders(sqlConn)
if(openBuyOrders):
avgPrice = np.mean([openBuyOrder[1]
for openBuyOrder in openBuyOrders])
if((avgPrice * 1.05) < currentPrice):
buyIDs = [openBuyOrder[0]
for openBuyOrder in openBuyOrders]
await saveSellOrder(sqlConn, currentPrice, buyIDs)
print(f"Sold. Task: {i}, myState:{myState}")
return None
async def main():
sqlConn = await dbConnect()
scheduler = AsyncIOScheduler()
for i in range(100):
scheduler.add_job(strategyBuy, 'interval', seconds=10, args=[sqlConn, i],
start_date='2000-01-01 00:00:00', timezone='UTC')
scheduler.add_job(strategySell, 'interval', seconds=10, args=[sqlConn, i],
start_date='2000-01-01 00:00:00', timezone='UTC')
scheduler.start()
if __name__ == "__main__":
loop = asyncio.new_event_loop()
task = loop.create_task(main())
loop.run_forever()
结果
Bought2. Task: 55, myState:15
Bought2. Task: 13, myState:15
Bought2. Task: 80, myState:15
Bought2. Task: 57, myState:15
Bought2. Task: 24, myState:15
Bought2. Task: 44, myState:15
Bought2. Task: 18, myState:15
Bought2. Task: 7, myState:15
Bought2. Task: 49, myState:15
Bought1. Task: 86, myState:19
Bought1. Task: 97, myState:19


