-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathmaster_contract.py
More file actions
90 lines (76 loc) · 3.69 KB
/
master_contract.py
File metadata and controls
90 lines (76 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
import requests
import json
from models import Instrument
from extensions import db
from datetime import datetime
from sqlalchemy import Index, inspect
def index_exists(engine, index_name):
    """Report whether the Instrument table already has an index with this name.

    Args:
        engine: SQLAlchemy engine passed to ``inspect`` for introspection.
        index_name: Name of the index to look for.

    Returns:
        True if one of the indexes reported for ``Instrument.__tablename__``
        is called ``index_name``; False otherwise.
    """
    reported_indexes = inspect(engine).get_indexes(Instrument.__tablename__)
    return any(entry['name'] == index_name for entry in reported_indexes)
def _is_wanted_instrument(instrument):
    """Return True if a raw scrip-master record should be stored.

    Every non-NSE record is kept as is. An NSE record is kept only when its
    symbol contains ``-EQ`` or ``-BE``, or its instrument type is ``AMXIDX``.
    """
    if instrument['exch_seg'] != 'NSE':
        return True
    symbol = instrument['symbol']
    return (
        '-EQ' in symbol
        or '-BE' in symbol
        or instrument['instrumenttype'] == 'AMXIDX'
    )


def _to_instrument(instrument):
    """Build an ``Instrument`` row from one raw scrip-master record.

    ``strike`` is divided by 100. ``tick_size`` is divided by 10,000,000 for
    the ``CDS`` segment and by 100 for every other segment.
    """
    tick_divisor = 10000000 if instrument['exch_seg'] == 'CDS' else 100
    return Instrument(
        token=instrument['token'],
        symbol=instrument['symbol'],
        name=instrument['name'],
        expiry=instrument['expiry'],
        strike=float(instrument['strike']) / 100,
        lotsize=int(instrument['lotsize']),
        instrumenttype=instrument['instrumenttype'],
        exch_seg=instrument['exch_seg'],
        tick_size=float(instrument['tick_size']) / tick_divisor,
    )


def download_and_store_json(app):
    """Download the scrip master JSON and reload the Instrument table from it.

    Steps, all executed inside ``app.app_context()``:

    1. Download the JSON (with a timeout and an HTTP status check) and keep a
       temporary copy under ``/tmp``.
    2. Filter the records and convert them to ``Instrument`` objects.
    3. Drop and recreate the Instrument table.
    4. Bulk insert the converted rows and commit.
    5. Create the lookup indexes that do not exist yet.

    The payload is fully parsed and converted *before* the table is dropped,
    so a failed download, an unexpected payload shape or a malformed record
    leaves the previously stored data untouched.

    Args:
        app: Application object providing ``app_context()``.

    Returns:
        None. Any exception is caught, the session is rolled back and the
        error is printed; the temporary JSON file is removed in every case.
    """
    tmp_folder = "/tmp"
    json_file_path = os.path.join(tmp_folder, "OpenAPIScripMaster.json")
    url = "https://margincalculator.angelbroking.com/OpenAPI_File/files/OpenAPIScripMaster.json"

    with app.app_context():
        try:
            print("Master Contract Download Started")

            # 1. Download the JSON. Without a timeout a stalled connection
            #    would block forever; without the status check an HTTP error
            #    page would be treated as instrument data.
            response = requests.get(url, timeout=60)
            response.raise_for_status()

            # Keep a temporary on-disk copy and load the data back from it.
            with open(json_file_path, 'w', encoding='utf-8') as json_file:
                json.dump(response.json(), json_file)
            with open(json_file_path, 'r', encoding='utf-8') as json_file:
                instruments_data = json.load(json_file)

            # 2. Filter and convert while the old table is still intact, so
            #    bad input cannot leave the table empty.
            instrument_objects = [
                _to_instrument(instrument)
                for instrument in instruments_data
                if _is_wanted_instrument(instrument)
            ]
            if not instrument_objects:
                # An empty feed would otherwise wipe the existing data.
                raise ValueError("Master contract contained no instruments")

            # 3. Clear existing data (drop and recreate the Instrument table
            #    only). checkfirst avoids an error when the table is missing.
            Instrument.__table__.drop(db.engine, checkfirst=True)
            Instrument.__table__.create(db.engine)

            # 4. Bulk insert all filtered instruments at once.
            db.session.bulk_save_objects(instrument_objects)
            db.session.commit()

            # 5. Create indexes on `symbol`, `token`, `expiry`, and
            #    `exch_seg` if they don't exist.
            index_mappings = [
                ('idx_symbol', Instrument.symbol),
                ('idx_token', Instrument.token),
                ('idx_expiry', Instrument.expiry),
                ('idx_exch_seg', Instrument.exch_seg),
            ]
            for index_name, column in index_mappings:
                if not index_exists(db.engine, index_name):
                    Index(index_name, column).create(db.engine)

            print(f"Filtered bulk data successfully downloaded and stored with indexes at {datetime.now()}")
        except Exception as e:
            # Top-level boundary of the job: discard any half-finished
            # transaction so the session stays usable, then report.
            db.session.rollback()
            print(f"Error occurred: {e}")
        finally:
            # Remove the temporary JSON file on success and on failure.
            if os.path.exists(json_file_path):
                os.remove(json_file_path)
                print(f"Temporary JSON file deleted: {json_file_path}")