# Spaces status: Runtime error
# File size: 10,477 bytes; revision 39cdc3e
# (The "1 ... 279" line-number gutter that followed here was file-viewer residue, not file content.)
import os
import time
import csv
import tempfile
from io import StringIO
from clickhouse_driver import Client
from clickhouse_driver.errors import Error as ClickHouseError
import gradio as gr
# Resolve the ClickHouse password. The Colab secret store is tried first; if
# the import or the lookup fails for any reason, the value is read from the
# FASHION_CH_PASS environment variable instead (KeyError if it is not set).
try:
    from google.colab import userdata
    DB_PASSWORD = userdata.get('FASHION_CH_PASS')
except Exception:
    # ``except Exception`` rather than a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are not swallowed by the fallback.
    DB_PASSWORD = os.environ['FASHION_CH_PASS']
class DBConnectionManager:
    """Hold one ClickHouse ``Client`` and recreate it when it becomes too old.

    The client is created lazily on the first call to ``get_connection`` and
    replaced once more than ``max_connection_age`` seconds have passed since it
    was created. ``execute_query`` additionally replaces the client and retries
    when the driver reports an error.
    """

    def __init__(self, max_connection_age=3600):  # 1 hour max connection age
        """Create a manager without opening any connection yet.

        Args:
            max_connection_age: Seconds after which the client is recreated.
        """
        # Current client, or None when no client has been created / it was closed.
        self.client = None
        # ``time.time()`` value recorded when the current client was created.
        self.last_connection_time = None
        self.max_connection_age = max_connection_age

    def get_connection(self):
        """Return the current client, creating or refreshing it when needed."""
        if self.client is None or self._should_refresh_connection():
            self._create_connection()
        return self.client

    def _create_connection(self):
        """Close any existing client and build a fresh one."""
        if self.client:
            self.close_connection()
        self.client = Client(
            host='rc1d-a93v7vf0pjfr6e2o.mdb.yandexcloud.net',
            port=9440,
            user='user1',
            password=DB_PASSWORD,
            database='db1',
            secure=True,
            ca_certs='./RootCA.pem'
        )
        self.last_connection_time = time.time()

    def _should_refresh_connection(self):
        """Return True when no client was ever created or the current one is too old."""
        if self.last_connection_time is None:
            return True
        return (time.time() - self.last_connection_time) > self.max_connection_age

    def execute_query(self, query, params=None):
        """Run ``query`` with ``params``, retrying on driver errors.

        Up to three attempts are made. After a failed attempt the client is
        replaced before the next try; the error of the last attempt is re-raised.

        Args:
            query: SQL text passed to the client's ``execute``.
            params: Optional parameters passed through to ``execute``.

        Returns:
            Whatever the client's ``execute`` returns.

        Raises:
            ClickHouseError: If every attempt fails.
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                connection = self.get_connection()
                return connection.execute(query, params)
            except ClickHouseError as e:
                print(f"Database error on attempt {attempt + 1}: {e}")
                if attempt < max_retries - 1:
                    print("Retrying...")
                    self._create_connection()  # Force a new connection
                else:
                    raise

    def close_connection(self):
        """Disconnect the current client, if any, and forget it.

        The manager state is reset even when ``disconnect()`` raises, so a
        failed disconnect can never leave a stale client behind; the exception
        itself still propagates to the caller.
        """
        if self.client:
            try:
                self.client.disconnect()
            finally:
                self.client = None
                self.last_connection_time = None
# Module-level manager instance; the query helpers in this file call
# db_manager.execute_query(). No connection is opened until the first query.
db_manager = DBConnectionManager()
def get_category_a_options():
    """Return the distinct ``category_a`` values of the datamart, in query order."""
    rows = db_manager.execute_query(
        "SELECT DISTINCT category_a FROM datamart_lamoda_grownups_mc_4 ORDER BY category_a"
    )
    options = []
    for row in rows:
        # Each row holds a single selected column.
        options.append(row[0])
    return options
def update_category_3(category_a):
    """Rebuild the "Category 3" checkbox group for the selected ``category_a`` values.

    Args:
        category_a: Values currently selected in the "Category A" group.

    Returns:
        A ``gr.CheckboxGroup`` whose choices are the distinct ``category_3``
        values found for the selection. The choices are empty when nothing is
        selected or when the lookup fails.
    """
    print("entering update_category_3")
    if not category_a:
        # Nothing selected: there is nothing to look up, so skip the query.
        return gr.CheckboxGroup(choices=[], label="Category 3", interactive=True)
    try:
        query = """
SELECT DISTINCT category_3
FROM datamart_lamoda_grownups_mc_4
WHERE category_a in %(category_a)s
ORDER BY category_3
"""
        result = db_manager.execute_query(query, {'category_a': category_a})
        category_3_options = [row[0] for row in result]
        return gr.CheckboxGroup(choices=category_3_options, label="Category 3", interactive=True)
    except Exception as e:
        # Best effort: a failed lookup leaves the group empty instead of
        # surfacing an error in the UI.
        print(f"Error updating category 3: {e}")
        return gr.CheckboxGroup(choices=[], label="Category 3", interactive=True)
def update_category_4(category_a, category_3):
    """Rebuild the "Category 4" checkbox group for the current selections.

    Args:
        category_a: Values currently selected in the "Category A" group.
        category_3: Values currently selected in the "Category 3" group.

    Returns:
        A ``gr.CheckboxGroup`` whose choices are the distinct ``category_4``
        values matching both selections. The choices are empty when either
        selection is empty or when the lookup fails.
    """
    print("entering update_category_4")
    if not category_a or not category_3:
        # Both filters are required by the query; with either one empty there
        # is nothing to look up, so skip the query.
        return gr.CheckboxGroup(choices=[], label="Category 4", interactive=True)
    try:
        query = """
SELECT DISTINCT category_4
FROM datamart_lamoda_grownups_mc_4
WHERE 1=1
and category_a in %(category_a)s
and category_3 in %(category_3)s
ORDER BY category_4
"""
        result = db_manager.execute_query(query, {'category_a': category_a, 'category_3': category_3})
        category_4_options = [row[0] for row in result]
        return gr.CheckboxGroup(choices=category_4_options, label="Category 4", interactive=True)
    except Exception as e:
        # Best effort: a failed lookup leaves the group empty instead of
        # surfacing an error in the UI.
        print(f"Error updating category 4: {e}")
        return gr.CheckboxGroup(choices=[], label="Category 4", interactive=True)
# Column names written as the first CSV row. The order mirrors the column
# order of the final SELECT in _REPORT_QUERY.
_REPORT_HEADER = [
    "YEAR", "min_week_start_date", "max_week_start_date", "category_a", "category_3", "category_4",
    "brand_name", "price_group", "sum_turnover", "count_sales", "count_sku", "avg_price1",
    "avg_price2", "count_sku_aw_23", "count_sku_ss_24", "sum_turnover_aw_23", "sum_turnover_ss_24",
    "turnover_per_sku", "turnover_per_sku_aw_23", "turnover_per_sku_ss_24", "top10_turnover_share",
    "blcv_turnover_share", "sales_through_rate", "sold_sku_share", "availability_index", "median_price"
]

# Report query, parameterised by the three category selections.
# sales_through_rate: the zero guard tests the very expression used as the
# divisor (sum(stock_increase) + sumIf(initial_stock, is_first_week=1)), so
# the guard and the division cannot disagree.
_REPORT_QUERY = """
WITH
sku_prices AS (
SELECT
id_product_money,
category_a,
category_3,
category_4,
avg(avg_price) AS sku_avg_price
FROM datamart_lamoda_grownups_mc_4
WHERE price_tier in ('Medium', 'High', 'Premium')
AND category_a IN %(category_a)s
AND category_3 IN %(category_3)s
AND category_4 IN %(category_4)s
GROUP BY all
),
median_prices AS (
SELECT
category_a,
category_3,
category_4,
median(sku_avg_price) AS median_price
FROM sku_prices
GROUP BY all
),
main AS(
SELECT
d.category_a AS category_a,
d.category_3 AS category_3,
d.category_4 AS category_4,
500*round(0.2*median_price/500) AS step,
step*round(sku_avg_price/step) AS price_group,
turnover,
total_sales,
d.id_product_money AS id_product_money,
avg_price,
season,
brand_rank,
stock_increase,
initial_stock,
days_in_stock,
total_days_in_period,
median_price,
brand_name,
is_first_week,
week_start_date
FROM datamart_lamoda_grownups_mc_4 d
LEFT JOIN median_prices m
USING (category_a, category_3, category_4)
LEFT JOIN sku_prices s
ON d.id_product_money = s.id_product_money
WHERE price_tier in ('Medium', 'High', 'Premium')
AND d.category_a IN %(category_a)s
AND d.category_3 IN %(category_3)s
AND d.category_4 IN %(category_4)s
)
SELECT
year(week_start_date)::text AS YEAR,
min(week_start_date)::text AS min_week_start_date,
max(week_start_date)::text AS max_week_start_date,
category_a,
category_3,
category_4,
brand_name,
price_group,
sum(turnover) as sum_turnover,
sum(total_sales) as count_sales,
count(distinct id_product_money) as count_sku,
round(avg(avg_price)) as avg_price1,
round(sum_turnover/IF(count_sales=0,NULL,count_sales)) as avg_price2,
uniqExactIf(id_product_money, season = 'AW 23') as count_sku_aw_23,
uniqExactIf(id_product_money, season = 'SS 24') as count_sku_ss_24,
sumIf(turnover, season = 'AW 23') as sum_turnover_aw_23,
sumIf(turnover, season = 'SS 24') as sum_turnover_ss_24,
round(sum_turnover/IF(count_sku=0,NULL,count_sku)) as turnover_per_sku,
round(sum_turnover_aw_23/IF(count_sku_aw_23=0,NULL,count_sku_aw_23)) as turnover_per_sku_aw_23,
round(sum_turnover_ss_24/IF(count_sku_ss_24=0,NULL,count_sku_ss_24)) as turnover_per_sku_ss_24,
round(1.0*sumIf(turnover, brand_rank <= 10)/IF(sum_turnover=0,NULL,sum_turnover), 3) as top10_turnover_share,
round(1.0*sumIf(turnover, brand_name = 'BLCV')/IF(sum_turnover=0,NULL,sum_turnover), 3) as blcv_turnover_share,
round(1.0*count_sales/IF((sum(stock_increase) + sumIf(initial_stock, is_first_week=1))=0,NULL,(sum(stock_increase) + sumIf(initial_stock, is_first_week=1))), 3) as sales_through_rate,
round(1.0*uniqExactIf(id_product_money, total_sales > 0)/IF(count_sku=0,NULL,count_sku), 3) as sold_sku_share,
round(1.0*sum(days_in_stock)/IF(sum(total_days_in_period)=0,NULL,sum(total_days_in_period)), 3) as availability_index,
max(median_price) AS median_price
FROM main
GROUP BY all
ORDER BY all
"""


def _write_report_csv(rows):
    """Write the report header and ``rows`` to a new temporary .csv file; return its path."""
    # newline='' is what the csv module requires of files it writes to, so the
    # writer's own line terminator is not translated a second time.
    # delete=False keeps the file on disk after it is closed so the returned
    # path stays usable by the caller.
    temp_file = tempfile.NamedTemporaryFile(
        mode='w+', delete=False, suffix='.csv', encoding='utf-8', newline=''
    )
    try:
        with temp_file:
            csv_writer = csv.writer(temp_file)
            csv_writer.writerow(_REPORT_HEADER)
            csv_writer.writerows(rows)
    except Exception:
        # Do not leave a half-written file behind when writing fails.
        try:
            os.remove(temp_file.name)
        except OSError:
            pass
        raise
    return temp_file.name


def generate_csv_report(category_a, category_3, category_4):
    """Run the report query for the selected categories and dump it to a CSV file.

    Args:
        category_a: Selected ``category_a`` values.
        category_3: Selected ``category_3`` values.
        category_4: Selected ``category_4`` values.

    Returns:
        Path of the temporary CSV file (header row plus one row per result
        row), or ``None`` when the query or the file write fails; the error is
        printed in that case.
    """
    params = {
        'category_a': category_a,
        'category_3': category_3,
        'category_4': category_4
    }
    try:
        print('trying to run the query')
        result = db_manager.execute_query(_REPORT_QUERY, params)
        return _write_report_csv(result)
    except Exception as e:
        print(f"Error generating CSV report: {e}")
        return None
def download_csv(category_a, category_3, category_4):
    """Build the CSV report for the given selections and return its file path.

    Raises:
        gr.Error: When ``generate_csv_report`` yields no result.
    """
    report_path = generate_csv_report(category_a, category_3, category_4)
    if not report_path:
        raise gr.Error("Error generating CSV report. Please try again.")
    return report_path
def interface():
    """Assemble the Gradio Blocks UI and return it.

    The layout is three checkbox groups (Category A, Category 3, Category 4),
    a download button and a file output. Changing Category A refreshes the
    Category 3 choices, changing Category 3 refreshes the Category 4 choices,
    and the button produces the CSV report for the current selections.
    """
    with gr.Blocks() as demo:
        # Category A choices are fetched from the database while the UI is built.
        top_level_choices = get_category_a_options()

        category_a = gr.CheckboxGroup(choices=top_level_choices, label="Category A")
        category_3 = gr.CheckboxGroup(choices=[], label="Category 3", interactive=True)
        category_4 = gr.CheckboxGroup(choices=[], label="Category 4", interactive=True)
        download_button = gr.Button("Download CSV Report")
        csv_output = gr.File(label="CSV Report")

        # Cascading filters: each level repopulates the one below it.
        category_a.change(fn=update_category_3, inputs=[category_a], outputs=[category_3])
        category_3.change(fn=update_category_4, inputs=[category_a, category_3], outputs=[category_4])

        download_button.click(
            fn=download_csv,
            inputs=[category_a, category_3, category_4],
            outputs=[csv_output],
        )
    return demo
def cleanup_temp_files(temp_dir=None, suffix='.csv'):
    """Delete files whose name ends in ``suffix`` from a temporary directory.

    Deletion is best effort: a file that cannot be removed is reported with a
    printed message and the sweep continues.

    NOTE(review): with the default arguments this removes every ``.csv`` file
    in the system temporary directory, not only files written by this
    application - confirm that this is acceptable where the app is deployed.

    Args:
        temp_dir: Directory to sweep. Defaults to ``tempfile.gettempdir()``.
        suffix: File-name suffix selecting the files to delete.
    """
    if temp_dir is None:
        temp_dir = tempfile.gettempdir()
    for filename in os.listdir(temp_dir):
        if not filename.endswith(suffix):
            continue
        file_path = os.path.join(temp_dir, filename)
        # A real directory can never be removed by os.remove(); skip it instead
        # of reporting a spurious error. Symlinks are still removed.
        if os.path.isdir(file_path) and not os.path.islink(file_path):
            continue
        try:
            os.remove(file_path)
        except OSError as e:
            print(f"Error deleting temporary file {file_path}: {e}")
if __name__ == "__main__":
    demo = interface()
    try:
        demo.launch(debug=True)
    finally:
        # Runs even when launch() ends with an exception or Ctrl+C, so the
        # database connection and the temporary CSV files are always released.
        db_manager.close_connection()  # Close the connection when the app exits
        cleanup_temp_files()