# Generated from dhairya/scenario_template (Python, 126 lines, 4.0 KiB).
#!/usr/bin/env python

# coding: utf-8

# In[1]:

from datetime import datetime
import pandas as pd
from tms_data_interface import SQLQueryInterface
# SQL query that computes per-trade running buy/sell and market metrics for
# the scenario. It is a str.format template with two placeholders:
#   trade_10m_v3  - name of the trade table to read
#   time_window_s - look-back length in whole seconds, used for window_start
#
# NOTE(review): every aggregate below uses
# ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so the metrics are running
# values since the first row of the (limited) one-day slice, NOT values inside
# [window_start, window_end]; the window bounds are only reported. If the
# engine supports offset frames (e.g. RANGE BETWEEN INTERVAL '<n>' SECOND
# PRECEDING AND CURRENT ROW) that would bound them to the window - confirm.
# NOTE(review): ROWS frames ordered only by date_time are nondeterministic for
# trades sharing a timestamp.
# NOTE(review): participant_volume is the buy side only, while total_volume
# counts both sides - confirm this is the intended ratio.
query_template = """
WITH trade_data AS (
    SELECT
        trader_id,
        date_time,
        trade_price,
        trade_volume,
        -- Start of the look-back window that ends at this trade
        date_add('second', -{time_window_s}, date_time) AS window_start,
        date_time AS window_end,
        trade_side
    FROM {trade_10m_v3}
    -- Between yesterday's date and today's date (inclusive)
    WHERE date_time BETWEEN date_add('day', -1, current_date) AND current_date
    -- Arbitrary subset (no ORDER BY) kept small for testing
    LIMIT 10000
),

aggregated_trades AS (
    SELECT
        td.trader_id,
        td.window_start,
        td.window_end,
        -- Per-trader running buy volume, sell volume and trade count
        SUM(CASE WHEN td.trade_side = 'buy' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS buy_volume,
        SUM(CASE WHEN td.trade_side = 'sell' THEN td.trade_volume ELSE 0 END)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sell_volume,
        -- Market-wide (all traders) running volume and price range
        SUM(td.trade_volume)
            OVER (ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_volume,
        MAX(td.trade_price)
            OVER (ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS highest_price,
        MIN(td.trade_price)
            OVER (ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS lowest_price,
        COUNT(*)
            OVER (PARTITION BY td.trader_id ORDER BY td.date_time
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS number_of_trades
    FROM trade_data td
)

SELECT
    window_start AS start_time,
    window_end AS end_time,
    trader_id AS "Participant",
    lowest_price AS min_price,
    highest_price AS max_price,
    -- DOUBLE casts: integer division would truncate, DECIMAL division loses scale
    CAST(highest_price - lowest_price AS DOUBLE)
        / NULLIF(CAST(lowest_price AS DOUBLE), 0) * 100 AS "Price Change (%)",
    buy_volume AS participant_volume,
    total_volume,
    CAST(buy_volume AS DOUBLE)
        / NULLIF(CAST(total_volume AS DOUBLE), 0) * 100 AS "Volume (%)"
FROM aggregated_trades
WHERE buy_volume > 0 OR sell_volume > 0
-- Make the choice of the returned rows deterministic
ORDER BY trader_id, window_end
LIMIT 1000
"""
class Scenario:
    """Trade scenario that reports per-participant volume and price metrics.

    ``logic`` formats the module-level ``query_template``, runs it through
    ``SQLQueryInterface.execute_raw`` and returns the rows as a pandas
    DataFrame with a few fixed scenario columns appended.
    """

    # Created once when the class body executes (i.e. at import time) and
    # shared by every Scenario instance.
    seq = SQLQueryInterface(schema="trade_schema")

    # Output column names, in the same order as the SELECT list of the query.
    _RESULT_COLUMNS = (
        'START_DATE_TIME',
        'END_DATE_TIME',
        'FOCAL_ID',
        'MIN_PRICE',
        'MAX_PRICE',
        'PRICE_CHANGE (%)',
        'PARTICIPANT_VOLUME',
        'TOTAL_VOLUME',
        'VOLUME (%)',
    )

    def logic(self, **kwargs):
        """Run the scenario query and return the resulting DataFrame.

        Keyword Args:
            validation_window: Look-back window in milliseconds. It is
                converted to whole seconds (truncated) and substituted for
                ``time_window_s`` in the query.

        Returns:
            pandas.DataFrame with the nine query columns plus
            ``PARTICIPANT_VOLUME_PCT``, ``Segment``, ``SAR_FLAG`` and ``Risk``.

        Raises:
            TypeError: if ``validation_window`` is missing or None.
        """
        validation_window = kwargs.get('validation_window')
        if validation_window is None:
            # Fail with a clear message instead of "NoneType / int".
            raise TypeError(
                "logic() requires the 'validation_window' keyword argument "
                "(milliseconds)"
            )
        # The query works in seconds; the caller supplies milliseconds.
        time_window_s = int(validation_window / 1000)

        query_start_time = datetime.now()
        print("Query start time:", query_start_time)

        row_list = self.seq.execute_raw(query_template.format(
            trade_10m_v3="trade_10m_v3",
            time_window_s=time_window_s,
        ))

        final_scenario_df = pd.DataFrame(row_list, columns=list(self._RESULT_COLUMNS))

        final_scenario_df['PARTICIPANT_VOLUME_PCT'] = self._volume_pct(
            final_scenario_df['PARTICIPANT_VOLUME'],
            final_scenario_df['TOTAL_VOLUME'],
        )

        # Fixed scenario metadata columns.
        final_scenario_df['Segment'] = 'Default'
        final_scenario_df['SAR_FLAG'] = 'N'
        final_scenario_df['Risk'] = 'Low Risk'

        print("Query end time:", datetime.now())
        return final_scenario_df

    @staticmethod
    def _volume_pct(participant_volume, total_volume):
        """Return ``participant_volume / total_volume * 100`` element-wise.

        Both series are coerced to numeric first, so driver values such as
        Decimal or None do not break the arithmetic. Rows whose total is 0
        become NaN, mirroring the SQL ``NULLIF(total_volume, 0)``, instead of
        ``inf`` or a DivisionByZero error.
        """
        participant = pd.to_numeric(participant_volume, errors='coerce')
        total = pd.to_numeric(total_volume, errors='coerce')
        return participant / total.where(total != 0) * 100
# In[3]:

# Notebook demo cell: build one Scenario and run it with a 100000 ms (100 s)
# validation window. The returned DataFrame is not kept.
# NOTE(review): this runs at module top level, so importing the module
# executes the scenario query; consider an `if __name__ == "__main__":` guard.
DEMO_VALIDATION_WINDOW_MS = 100000
scenario = Scenario()
scenario.logic(validation_window=DEMO_VALIDATION_WINDOW_MS)


# In[ ]: