-
Notifications
You must be signed in to change notification settings - Fork 187
Expand file tree
/
Copy pathvalidate_brokerage_and_data_provider_yaml.py
More file actions
380 lines (330 loc) · 13.2 KB
/
validate_brokerage_and_data_provider_yaml.py
File metadata and controls
380 lines (330 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# This script scans the YAML file to find all the supported brokerages and data providers.
# It then tries to deploy an algorithm with each one.
# Deployments that omit some `required` properties are tested to ensure they fail.
# Deployments with all `required` properties are tested to ensure they return {'success': True}.
from base64 import b64encode
from hashlib import sha256
from time import time, sleep
import requests
import os
# Inputs:
# QuantConnect API credentials, read from the environment so they are
# never committed to source control.
USER_ID = os.getenv('QUANTCONNECT_USER_ID')
API_TOKEN = os.getenv('QUANTCONNECT_API_TOKEN')
# Root URL of the QuantConnect REST API (v2).
BASE_URL = 'https://www.quantconnect.com/api/v2/'
# Raw OpenAPI YAML that declares every brokerage and data provider the
# /live/create endpoint supports.
YAML_URL = 'https://raw.githubusercontent.com/QuantConnect/Documentation/refs/heads/master/QuantConnect-Platform-2.0.0.yaml'
# Brokerage payload used when the model under test is a data provider.
DEFAULT_BROKERAGE = {
    'id': 'QuantConnectBrokerage'
}
# Data-provider payload used when the model under test is a brokerage.
DEFAULT_DATA_PROVIDERS = {
    'QuantConnectBrokerage': {
        'id': 'QuantConnectBrokerage'
    }
}
# Placeholder values, keyed by the YAML `type`, used to fill required
# properties that have no enum and no explicit override.
DEFAULT_VALUE_BY_TYPE = {
    'string': ' ',
    'integer': 1,
    'number': 1.0,
    'boolean': True,
    'array': []
}
# Hand-picked values for properties whose generic default above is
# rejected by the API.
VALUE_OVERRIDE_BY_PROPERTY = {
    # For TT brokerage, cash is required and passing an empty
    # list throws an error.
    'cash': [{'amount': 100_000, 'currency': 'USD'}],
    'ib-weekly-restart-utc-time': '12:00:00',
}
def get_headers():
    """Build authenticated request headers for the QuantConnect API.

    The API token is hashed together with the current UNIX timestamp and
    sent, along with the user id, as an HTTP Basic credential.
    """
    # Timestamp must match the one used to salt the token hash.
    timestamp = str(int(time()))
    stamped_token = f'{API_TOKEN}:{timestamp}'.encode('utf-8')
    # Hash the time-stamped API token.
    token_digest = sha256(stamped_token).hexdigest()
    credentials = f'{USER_ID}:{token_digest}'.encode('utf-8')
    encoded = b64encode(credentials).decode('ascii')
    # Assemble the headers dictionary.
    return {
        'Authorization': f'Basic {encoded}',
        'Timestamp': timestamp,
    }
def post(endpoint, payload):
    """POST *payload* as JSON to *endpoint* and return the decoded JSON reply."""
    # Echo live-deployment payloads to aid debugging of failed runs.
    if endpoint == '/live/create':
        print(payload)
    url = f'{BASE_URL}{endpoint}'
    response = requests.post(url, headers=get_headers(), json=payload)
    return response.json()
def prepare_live_payload():
    """Create a throwaway project, compile it and pick a free live node.

    Returns:
        A (project_id, compile_id, node_id) tuple ready to be passed to
        the /live/create endpoint.
    """
    # Create a project.
    project = post('/projects/create', {'name': 'TEST Project', 'language': 'Py'})
    project_id = project['projects'][0]['projectId']
    # Compile it and wait for the job to leave the queue.
    compile_id = post('/compile/create', {'projectId': project_id})['compileId']
    wait_for_compile_to_complete(project_id, compile_id)
    # Pick a live node: the last idle CPU-only node in the list.
    live_nodes = post('/projects/nodes/read', {'projectId': project_id})['nodes']['live']
    idle_cpu_nodes = [n for n in live_nodes if not (n['hasGpu'] or n['busy'])]
    node_id = idle_cpu_nodes[-1]['id']
    return project_id, compile_id, node_id
def wait_for_compile_to_complete(project_id, compile_id, max_attempts=10,
                                 poll_seconds=2):
    """Poll the compile job until it leaves the queue.

    Args:
        project_id: Id of the project being compiled.
        compile_id: Id of the compile job to poll.
        max_attempts: Number of polls before giving up (default 10).
        poll_seconds: Seconds to sleep between polls (default 2).

    Raises:
        RuntimeError: If the job is still queued after *max_attempts*
            polls.
    """
    for _ in range(max_attempts):
        response = post(
            '/compile/read', {'projectId': project_id, 'compileId': compile_id}
        )
        if response['state'] != 'InQueue':
            return  # Done.
        sleep(poll_seconds)
    # A bare `assert False` would be stripped under `python -O`, silently
    # letting a stuck compile fall through; raise explicitly instead.
    raise RuntimeError('Compile job stuck in queue.')
def create_live_algorithm(
        project_id, compile_id, node_id, brokerage, data_providers):
    """Deploy a live algorithm with the given brokerage and data-provider settings."""
    request_body = {
        'projectId': project_id,
        'compileId': compile_id,
        'nodeId': node_id,
        'versionId': '-1',
        'brokerage': brokerage,
        'dataProviders': data_providers,
    }
    return post('/live/create', request_body)
class Property:
    """One property of a brokerage/data-provider schema parsed from the YAML."""

    def __init__(self, name):
        # Property name as it appears in the YAML schema.
        self.name = name
        # Allowed enum values, populated while parsing the schema.
        self.enums = []
        # YAML `type` of the property. Initialized to None so that reading
        # `p.type` on a property whose schema omitted `type:` yields a clear
        # sentinel instead of raising AttributeError.
        self.type = None

    def set_type(self, type_):
        """Record the YAML `type` (e.g. 'string', 'integer') of this property."""
        self.type = type_

    def add_enum(self, enum):
        """Append one allowed enum value."""
        self.enums.append(enum)
# Define a class for each brokerage setting.
class Brokerage:
    """Mutable model of one brokerage schema parsed from the YAML file."""

    def __init__(self, schema_name):
        # Name of the schema in the OpenAPI components section.
        self.schema_name = schema_name
        # All parsed properties, in schema order.
        self.properties = []
        # Names of the properties the API marks as required.
        self.required_properties = []

    def add_property(self, name):
        """Start tracking a new property with the given name."""
        self.properties.append(Property(name))

    def set_property_type(self, type_):
        """Set the YAML type of the most recently added property."""
        latest = self.properties[-1]
        latest.set_type(type_)

    def make_property_required(self, property_name):
        """Mark *property_name* as required for deployments."""
        self.required_properties.append(property_name)

    def add_property_enum(self, enum):
        """Record one enum value on the most recently added property."""
        latest = self.properties[-1]
        latest.add_enum(enum)
class DataProvider(Brokerage):
    """A data-provider schema: a Brokerage plus its request-payload key.

    *key* is the property name under `dataProviders` in the
    CreateLiveAlgorithmRequest payload.
    """

    def __init__(self, key, schema_name):
        # Reuse the Brokerage initializer instead of duplicating its
        # attribute assignments, so the two classes cannot drift apart.
        super().__init__(schema_name)
        self.key = key
def indents(line):
    """Return the number of leading whitespace characters in *line*."""
    without_indent = line.lstrip()
    return len(line) - len(without_indent)
def get_schema_name(line):
    """Extract a schema name from a YAML `$ref` line.

    Example: converts
        - $ref: '#/components/schemas/BybitBrokerageAndDataProviderSettings'
    into 'BybitBrokerageAndDataProviderSettings'.
    """
    # Keep the text after the last '/', drop trailing whitespace, then
    # strip the closing quote character.
    tail = line.rsplit('/', 1)[-1]
    return tail.rstrip()[:-1]
def inherit_properties(parent_model, child_model):
    """Copy the parent schema's (required) properties onto *child_model*.

    Used to resolve YAML `allOf` inheritance: the child keeps its own
    entries and gains the parent's.
    """
    child_model.properties += parent_model.properties
    child_model.required_properties += parent_model.required_properties
# Define a method to get all the brokerage and data providers the
# `/live/create` endpoint supports.
def get_supported_models(yaml):
    """Scan the OpenAPI YAML for the CreateLiveAlgorithmRequest schema.

    Args:
        yaml: The spec as a list of lines (a re-iterable sequence, not a
            one-shot generator).

    Returns:
        A (brokerages, data_providers) tuple of Brokerage/DataProvider
        models. Each model holds only its schema name at this point;
        call `load_schema` afterwards to populate its properties.
    """
    # Get the list of brokerages and data providers that the
    # CreateLiveAlgorithmRequest supports.
    parsing_request = False
    parsing_brokerage_list = False
    parsing_data_providers_list = False
    brokerages = []
    data_providers = []
    for line in yaml:
        # When you hit the CreateLiveAlgorithmRequest schema, start
        # processing it.
        if line.strip() == 'CreateLiveAlgorithmRequest:':
            parsing_request = True
            continue
        # If you haven't started processing the
        # CreateLiveAlgorithmRequest schema yet, just continue to the
        # next line.
        if not parsing_request:
            continue
        # Get the number of leading white space in this line of the
        # YAML.
        line_indents = indents(line)
        # When you hit the "brokerage" property of the
        # CreateLiveAlgorithmRequest, start inspecting each line.
        if 'brokerage:' in line:
            parsing_brokerage_list = True
            brokerage_indents = line_indents
            continue
        # If you are processing the "brokerage" property...
        if parsing_brokerage_list:
            # When the brokerages property closes, stop inspecting
            # lines.
            if line_indents <= brokerage_indents:
                parsing_brokerage_list = False
            # Otherwise, record each brokerage that the endpoint
            # supports.
            elif '$ref' in line:
                schema_name = get_schema_name(line)
                brokerages.append(Brokerage(schema_name))
        # When you hit the "dataProviders" property of the
        # CreateLiveAlgorithmRequest, start inspecting each line.
        if 'dataProviders:' in line:
            parsing_data_providers_list = True
            data_provider_indents = line_indents
            continue
        # If you are processing the "dataProviders" property...
        if parsing_data_providers_list:
            # When the dataProviders property closes, stop inspecting
            # lines (and stop scanning entirely: dataProviders is the
            # last section this function needs).
            if line_indents <= data_provider_indents:
                parsing_data_providers_list = False
                break
            # A line indented exactly 4 columns past `dataProviders:`
            # names the provider key used in the request payload.
            elif line_indents == data_provider_indents + 4:
                key = line.strip()[:-1]
            # Otherwise, record each data provider that the endpoint
            # supports.
            # NOTE(review): assumes a key line always precedes the first
            # `$ref` line; otherwise `key` would be unbound here — confirm
            # against the spec layout.
            elif '$ref' in line:
                schema_name = get_schema_name(line)
                data_providers.append(DataProvider(key, schema_name))
    return brokerages, data_providers
# Define a method to parse the properties of each brokerage and data
# provider.
def load_schema(model, yaml):
    """Populate *model* with the properties of its schema from the YAML.

    Scans *yaml* for the section named `model.schema_name`, recording each
    property (name, type, enums) and which ones are required. YAML `allOf`
    references are resolved recursively, copying the parent schema's
    properties onto *model*.

    Args:
        model: A Brokerage or DataProvider whose `schema_name` is set.
        yaml: The spec as a list of lines. Must be re-iterable: the
            recursive `allOf` resolution iterates it again from the top.
    """
    parsing_parent_schema = False
    parsing_schema = False
    parsing_required = False
    parsing_properties = False
    for line in yaml:
        # When you hit the schema name, start processing it.
        if line.strip() == f'{model.schema_name}:':
            parsing_schema = True
            schema_indents = indents(line)
            continue
        # If you haven't started processing the schema yet, just
        # continue to the next line.
        if not parsing_schema:
            continue
        # Get the number of leading white space in this line of the
        # YAML.
        line_indents = indents(line)
        # When you reach the end of the schema definition, stop.
        if line_indents <= schema_indents:
            break
        # From here on, compare against the stripped text; the raw
        # indentation is already captured in `line_indents`.
        line = line.strip()
        if line == 'allOf:':
            parsing_parent_schema = True
            continue
        if line == 'required:':
            parsing_required = True
            continue
        if line == 'properties:':
            parsing_required = False
            parsing_properties = True
            property_indents = line_indents
            continue
        if parsing_parent_schema:
            if '$ref:' in line:
                # Resolve the referenced parent schema recursively and
                # copy its (required) properties onto this model.
                parent_model = Brokerage(get_schema_name(line))
                load_schema(parent_model, yaml)
                inherit_properties(parent_model, model)
            else:
                parsing_parent_schema = False
            continue
        if parsing_required:
            # Required entries look like `- name`; keep the last token.
            property_name = line.split(' ')[-1]
            model.make_property_required(property_name)
        if parsing_properties:
            # When we reach the end of the properties, stop.
            # NOTE(review): relies on `property_indents` having been set by
            # a preceding `properties:` line — confirm the spec always
            # emits one before indented property lines.
            if line_indents <= property_indents:
                parsing_enums = False
                break
            # If this line holds the name of a property...
            if line_indents == property_indents + 2:
                # Add the property to the model (strip the trailing ':').
                property_name = line[:-1]
                model.add_property(property_name)
                # Set the `parsing_enums` flag for this property.
                parsing_enums = False
            # If this line holds the type of a property...
            elif line.split(' ')[0] == 'type:':
                model.set_property_type(line.split(' ')[1])
            elif line.split(' ')[0] == 'enum:':
                parsing_enums = True
            # NOTE(review): `parsing_enums` is only bound after the first
            # property-name line; an enum entry appearing before any
            # property would raise NameError — presumed impossible in
            # well-formed schemas.
            elif parsing_enums:
                enum = line.split(' ')[-1]
                model.add_property_enum(enum)
if __name__ == '__main__':
    # Schemas that cannot be exercised end-to-end right now. Hoisted out
    # of the loop so the list is built once.
    SCHEMAS_TO_SKIP = [
        # Wolverine brokerage is currently under maintenance.
        'WolverineSettings',
        # Need permission to use RBI brokerage.
        'RBIBrokerageSettings'
    ]
    # Read the YAML file.
    yaml = requests.get(YAML_URL).text.splitlines()
    # Get the brokerage and data providers that the /live/create
    # endpoint supports.
    brokerages, data_providers = get_supported_models(yaml)
    for model_type, models in zip(['brokerage', 'data provider'], [brokerages, data_providers]):
        for j, model in enumerate(models):
            if model.schema_name in SCHEMAS_TO_SKIP:
                continue
            print(f'{j+1}/{len(models)}:', model.schema_name)
            project_id, compile_id, node_id = prepare_live_payload()
            load_schema(model, yaml)
            # Define a payload for the model settings using only the
            # required properties.
            minimal_model_payload = {}
            for p in model.properties:
                if p.name not in model.required_properties:
                    continue
                # Prefer a real enum value, then a hand-picked override,
                # then a generic placeholder for the property's type.
                if p.enums:
                    value = p.enums[0]
                elif p.name in VALUE_OVERRIDE_BY_PROPERTY:
                    value = VALUE_OVERRIDE_BY_PROPERTY[p.name]
                else:
                    value = DEFAULT_VALUE_BY_TYPE[p.type]
                minimal_model_payload[p.name] = value
            # Ensure the endpoint fails when we omit each of the required
            # properties.
            for p_name in model.required_properties:
                model_payload = {
                    k: v
                    for k, v in minimal_model_payload.items() if k != p_name
                }
                print(f'> {model_type} payload:', model_payload)
                # Use dedicated names for the request arguments so we do
                # not clobber the `brokerages`/`data_providers` model
                # lists returned by `get_supported_models`.
                if model_type == 'brokerage':
                    brokerage_payload = model_payload
                    data_providers_payload = DEFAULT_DATA_PROVIDERS
                else:
                    brokerage_payload = DEFAULT_BROKERAGE
                    data_providers_payload = {model.key: model_payload}
                response = create_live_algorithm(
                    project_id, compile_id, node_id, brokerage_payload,
                    data_providers_payload
                )
                print('--> response:', response)
                assert not response['success'], "Trim down required properties"
            # Ensure the endpoint succeeds when we include all the
            # required properties.
            print(f'> {model_type} payload:', minimal_model_payload)
            if model_type == 'brokerage':
                brokerage_payload = minimal_model_payload
                data_providers_payload = DEFAULT_DATA_PROVIDERS
            else:
                brokerage_payload = DEFAULT_BROKERAGE
                data_providers_payload = {model.key: minimal_model_payload}
            response = create_live_algorithm(
                project_id, compile_id, node_id, brokerage_payload,
                data_providers_payload
            )
            print('--> response:', response, '\n')
            assert response['success'], "Expand required properties"
            # Stop the live algorithm.
            post('/live/update/stop', {'projectId': project_id})
            # Delete the project to clean up.
            post('/projects/delete', {'projectId': project_id})