alexander-lazarin's picture
Initial commit
39cdc3e
import os
import time
import csv
import tempfile
from io import StringIO
from clickhouse_driver import Client
from clickhouse_driver.errors import Error as ClickHouseError
import gradio as gr
try:
from google.colab import userdata
DB_PASSWORD = userdata.get('FASHION_CH_PASS')
except:
DB_PASSWORD = os.environ['FASHION_CH_PASS']
class DBConnectionManager:
def __init__(self, max_connection_age=3600): # 1 hour max connection age
self.client = None
self.last_connection_time = None
self.max_connection_age = max_connection_age
def get_connection(self):
if self.client is None or self._should_refresh_connection():
self._create_connection()
return self.client
def _create_connection(self):
if self.client:
self.close_connection()
self.client = Client(
host='rc1d-a93v7vf0pjfr6e2o.mdb.yandexcloud.net',
port=9440,
user='user1',
password=DB_PASSWORD,
database='db1',
secure=True,
ca_certs='./RootCA.pem'
)
self.last_connection_time = time.time()
def _should_refresh_connection(self):
if self.last_connection_time is None:
return True
return (time.time() - self.last_connection_time) > self.max_connection_age
def execute_query(self, query, params=None):
max_retries = 3
for attempt in range(max_retries):
try:
connection = self.get_connection()
return connection.execute(query, params)
except ClickHouseError as e:
print(f"Database error on attempt {attempt + 1}: {e}")
if attempt < max_retries - 1:
print("Retrying...")
self._create_connection() # Force a new connection
else:
raise
def close_connection(self):
if self.client:
self.client.disconnect()
self.client = None
self.last_connection_time = None
db_manager = DBConnectionManager()
def get_category_a_options():
query = "SELECT DISTINCT category_a FROM datamart_lamoda_grownups_mc_4 ORDER BY category_a"
result = db_manager.execute_query(query)
return [row[0] for row in result]
def update_category_3(category_a):
print("entering update_category_a")
try:
query = """
SELECT DISTINCT category_3
FROM datamart_lamoda_grownups_mc_4
WHERE category_a in %(category_a)s
ORDER BY category_3
"""
result = db_manager.execute_query(query, {'category_a': category_a})
category_3_options = [row[0] for row in result]
return gr.CheckboxGroup(choices=category_3_options, label="Category 3", interactive=True)
except Exception as e:
print(f"Error updating category 3: {e}")
return gr.CheckboxGroup(choices=[], label="Category 3", interactive=True)
def update_category_4(category_a, category_3):
print("entering update_category_4")
try:
query = """
SELECT DISTINCT category_4
FROM datamart_lamoda_grownups_mc_4
WHERE 1=1
and category_a in %(category_a)s
and category_3 in %(category_3)s
ORDER BY category_4
"""
result = db_manager.execute_query(query, {'category_a': category_a, 'category_3': category_3})
category_4_options = [row[0] for row in result]
return gr.CheckboxGroup(choices=category_4_options, label="Category 4", interactive=True)
except Exception as e:
print(f"Error updating category 4: {e}")
return gr.CheckboxGroup(choices=[], label="Category 4", interactive=True)
def generate_csv_report(category_a, category_3, category_4):
query = """
WITH
sku_prices AS (
SELECT
id_product_money,
category_a,
category_3,
category_4,
avg(avg_price) AS sku_avg_price
FROM datamart_lamoda_grownups_mc_4
WHERE price_tier in ('Medium', 'High', 'Premium')
AND category_a IN %(category_a)s
AND category_3 IN %(category_3)s
AND category_4 IN %(category_4)s
GROUP BY all
),
median_prices AS (
SELECT
category_a,
category_3,
category_4,
median(sku_avg_price) AS median_price
FROM sku_prices
GROUP BY all
),
main AS(
SELECT
d.category_a AS category_a,
d.category_3 AS category_3,
d.category_4 AS category_4,
500*round(0.2*median_price/500) AS step,
step*round(sku_avg_price/step) AS price_group,
turnover,
total_sales,
d.id_product_money AS id_product_money,
avg_price,
season,
brand_rank,
stock_increase,
initial_stock,
days_in_stock,
total_days_in_period,
median_price,
brand_name,
is_first_week,
week_start_date
FROM datamart_lamoda_grownups_mc_4 d
LEFT JOIN median_prices m
USING (category_a, category_3, category_4)
LEFT JOIN sku_prices s
ON d.id_product_money = s.id_product_money
WHERE price_tier in ('Medium', 'High', 'Premium')
AND d.category_a IN %(category_a)s
AND d.category_3 IN %(category_3)s
AND d.category_4 IN %(category_4)s
)
SELECT
year(week_start_date)::text AS YEAR,
min(week_start_date)::text AS min_week_start_date,
max(week_start_date)::text AS max_week_start_date,
category_a,
category_3,
category_4,
brand_name,
price_group,
sum(turnover) as sum_turnover,
sum(total_sales) as count_sales,
count(distinct id_product_money) as count_sku,
round(avg(avg_price)) as avg_price1,
round(sum_turnover/IF(count_sales=0,NULL,count_sales)) as avg_price2,
uniqExactIf(id_product_money, season = 'AW 23') as count_sku_aw_23,
uniqExactIf(id_product_money, season = 'SS 24') as count_sku_ss_24,
sumIf(turnover, season = 'AW 23') as sum_turnover_aw_23,
sumIf(turnover, season = 'SS 24') as sum_turnover_ss_24,
round(sum_turnover/IF(count_sku=0,NULL,count_sku)) as turnover_per_sku,
round(sum_turnover_aw_23/IF(count_sku_aw_23=0,NULL,count_sku_aw_23)) as turnover_per_sku_aw_23,
round(sum_turnover_ss_24/IF(count_sku_ss_24=0,NULL,count_sku_ss_24)) as turnover_per_sku_ss_24,
round(1.0*sumIf(turnover, brand_rank <= 10)/IF(sum_turnover=0,NULL,sum_turnover), 3) as top10_turnover_share,
round(1.0*sumIf(turnover, brand_name = 'BLCV')/IF(sum_turnover=0,NULL,sum_turnover), 3) as blcv_turnover_share,
round(1.0*count_sales/IF((sum(stock_increase) + max(initial_stock))=0,NULL,(sum(stock_increase) + sumIf(initial_stock, is_first_week=1))), 3) as sales_through_rate,
round(1.0*uniqExactIf(id_product_money, total_sales > 0)/IF(count_sku=0,NULL,count_sku), 3) as sold_sku_share,
round(1.0*sum(days_in_stock)/IF(sum(total_days_in_period)=0,NULL,sum(total_days_in_period)), 3) as availability_index,
max(median_price) AS median_price
FROM main
GROUP BY all
ORDER BY all
"""
params = {
'category_a': category_a,
'category_3': category_3,
'category_4': category_4
}
try:
print('trying to run the query')
result = db_manager.execute_query(query, params)
# Create a CSV string
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.csv', encoding='utf-8') as temp_file:
csv_writer = csv.writer(temp_file)
# Write header
header = [
"YEAR", "min_week_start_date", "max_week_start_date", "category_a", "category_3", "category_4",
"brand_name", "price_group", "sum_turnover", "count_sales", "count_sku", "avg_price1",
"avg_price2", "count_sku_aw_23", "count_sku_ss_24", "sum_turnover_aw_23", "sum_turnover_ss_24",
"turnover_per_sku", "turnover_per_sku_aw_23", "turnover_per_sku_ss_24", "top10_turnover_share",
"blcv_turnover_share", "sales_through_rate", "sold_sku_share", "availability_index", "median_price"
]
csv_writer.writerow(header)
# Write data rows
csv_writer.writerows(result)
return temp_file.name
except Exception as e:
print(f"Error generating CSV report: {e}")
return None
def download_csv(category_a, category_3, category_4):
csv_content = generate_csv_report(category_a, category_3, category_4)
if csv_content:
return csv_content
else:
raise gr.Error("Error generating CSV report. Please try again.")
def interface():
with gr.Blocks() as demo:
category_a_options = get_category_a_options()
category_a = gr.CheckboxGroup(choices=category_a_options, label="Category A")
category_3 = gr.CheckboxGroup(choices=[], label="Category 3", interactive=True)
category_4 = gr.CheckboxGroup(choices=[], label="Category 4", interactive=True)
download_button = gr.Button("Download CSV Report")
csv_output = gr.File(label="CSV Report")
category_a.change(
fn=update_category_3,
inputs=[category_a],
outputs=[category_3]
)
category_3.change(
fn=update_category_4,
inputs=[category_a, category_3],
outputs=[category_4]
)
download_button.click(
fn=download_csv,
inputs=[category_a, category_3, category_4],
outputs=[csv_output]
)
return demo
def cleanup_temp_files():
temp_dir = tempfile.gettempdir()
for filename in os.listdir(temp_dir):
if filename.endswith('.csv'):
file_path = os.path.join(temp_dir, filename)
try:
os.remove(file_path)
except Exception as e:
print(f"Error deleting temporary file {file_path}: {e}")
if __name__ == "__main__":
demo = interface()
demo.launch(debug=True)
db_manager.close_connection() # Close the connection when the app exits
cleanup_temp_files()