Skip to content

Commit dcd05ad

Browse files
committed
Fixed Reflection bug
1 parent eb8ba72 commit dcd05ad

File tree

3 files changed

+64
-20
lines changed

3 files changed

+64
-20
lines changed

classes/transports/timescaledb.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,13 @@ class timescaledb(transport_base):
273273
schema_needs_refresh: bool = True # flag to indicate if ORM schema refresh is needed after reconnect or column changes
274274
current_metric_count: int = 0
275275

276+
# pushover notification settings for stale data alerts.
277+
# These are optional and can be configured by the user if they want
278+
# to receive pushover notifications when stale data is detected.
279+
enable_pushover: bool = False
280+
pushover_token: str = ""
281+
pushover_user: str = ""
282+
276283
def __init__(self, settings: SectionProxy) -> None:
277284
"""
278285
Initialize the TimescaleDB transport bridge.
@@ -1085,11 +1092,25 @@ def _validate_wide_row(self, row: dict) -> bool:
10851092

10861093
# metadata keys like m_time and device_info_id are excluded in _cache_wide_table_columns
10871094
extra_keys: set = set(row) - self._wide_columns
1095+
fewer_keys: set = self._wide_columns - set(row)
10881096

10891097
if extra_keys:
1090-
self._log.error( f"Wide-table schema mismatch; unknown columns: {sorted(extra_keys)}" )
1091-
msg: str= f"Unknown columns: {sorted(extra_keys)}"
1092-
raise ValueError(msg)
1098+
self._log.info(f"New metrics detected: {extra_keys}. Triggering resync...")
1099+
1100+
# 1. Trigger the sync logic to update the table schema and internal column cache.
1101+
self._sync_single_table_schema()
1102+
1103+
# 2. Re-check: Did the resync actually add the keys?
1104+
# (In case the DB hasn't been updated yet)
1105+
still_extra = set(row) - self._wide_columns - {"m_time", "device_info_id"}
1106+
if still_extra:
1107+
msg = f"Database schema is still missing columns after resync: {sorted(still_extra)}"
1108+
self._log.error(msg)
1109+
raise ValueError(msg)
1110+
elif fewer_keys:
1111+
self._log.warning( f"Wide-table schema mismatch; missing keys in scrape data: {sorted(fewer_keys)}" )
1112+
msg: str= f"Missing columns: {sorted(fewer_keys)}"
1113+
10931114
else:
10941115
return True
10951116

@@ -1111,7 +1132,7 @@ def _sync_single_table_schema(self) -> None:
11111132
else:
11121133
self._log.warning(f"No existing table found for {table_name} during resync. Continuing with new reflection.")
11131134

1114-
# 2. Reflect the NEW structure from the database
1135+
# 2. Reflect the new structure from the database
11151136
new_table = Table(
11161137
table_name,
11171138
Base.metadata,
@@ -1263,14 +1284,15 @@ def _flush_worker(self) -> None:
12631284
self._flush_batch_narrow(narrow_data, device_info_id, timestamp, session)
12641285
if self.wide_table_flag:
12651286
if valid_row:
1266-
stmt: Insert = pg_insert(DeviceMetricsWide).values(**wide_data)
1287+
target_table: Table = Base.metadata.tables[DeviceMetricsWide.__tablename__]
1288+
stmt: Insert = pg_insert(target_table).values(**wide_data)
12671289
session.execute(stmt)
12681290

12691291
self._commit_transport_state(transport_name, metrics_only, timestamp, is_stale=False)
12701292
self._log.debug(f"data write complete from [{transport_name}] to timescaledb bridge")
12711293
except (SQLAlchemyError, ValueError) as e1:
12721294
session.rollback()
1273-
self._log.warning(f"metrics data write failed.{e1}")
1295+
self._log.error(f"metrics data write failed.{e1}")
12741296

12751297
# Only backlog if setting enabled and DB is down
12761298
with self._reconnect_lock:
@@ -2618,6 +2640,7 @@ def _get_dynamic_settings(self) -> dict:
26182640

26192641
return self.performance_tiers["tier_low"] # Fallback default
26202642

2643+
26212644
def _view_exists(self, session: Session, view_name: str) -> bool:
26222645
"""Check to see if a continuous aggregate exists in the catalog.
26232646
Parameters:
@@ -2626,9 +2649,20 @@ def _view_exists(self, session: Session, view_name: str) -> bool:
26262649
Returns:
26272650
bool: True if the view exists, False otherwise.
26282651
"""
2629-
check_sql: TextClause = text("SELECT 1 FROM timescaledb_information.continuous_aggregates WHERE view_name = :name")
2630-
return session.execute(check_sql, {"name": view_name}).fetchone() is not None
2652+
check_sql = text("SELECT 1 FROM timescaledb_information.continuous_aggregates WHERE view_name = :name")
26312653

2654+
try:
2655+
# Keep ONLY the risky operation in the try block
2656+
result = session.execute(check_sql, {"name": view_name}).fetchone()
2657+
except Exception as e:
2658+
# Handle the error and clean up
2659+
self._log.error(f"Error checking view existence for {view_name}: {e}")
2660+
session.rollback()
2661+
return False
2662+
else:
2663+
# The 'else' block runs ONLY if the try block succeeded
2664+
session.rollback() # Resets state for your next AUTOCOMMIT call
2665+
return result is not None
26322666

26332667
def _drop_all_continuous_aggregates(self, session: Session) -> None:
26342668
"""
@@ -2776,7 +2810,7 @@ def _refresh_single_rollup(self, session: Session, view_name: str, start_offset:
27762810
self._log.info(f"Starting {mode} refresh for {view_name}...")
27772811

27782812
# AUTOCOMMIT is mandatory for CALL refresh_continuous_aggregate
2779-
with session.connection().execution_options(isolation_level="AUTOCOMMIT") as conn:
2813+
with self.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
27802814
conn.execute(text(f"SET LOCAL work_mem = '{r_settings['work_mem']}';"))
27812815
try:
27822816
if force_full:

classes/transports/transport_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class transport_base:
8080
on_message : Callable[["transport_base", registry_map_entry, str], None] = None
8181
''' callback, on message received '''
8282

83-
request_upstream_reconnect: Callable[[], None] | None = None # callback for reconnect.
83+
request_upstream_reconnect: Callable[[str], None] | None = None # callback for reconnect.
8484

8585
_log : logging.Logger = None
8686

protocol_gateway.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class Protocol_Gateway:
126126
_logging_initialized = False
127127

128128
@classmethod
129-
def _setup_logging(cls, cfg) -> None:
129+
def _setup_logging(cls, cfg: ConfigParser) -> None:
130130
"""
131131
created to eliminate multi gig log file sizes in docker as logging.StreamHandler(sys.stdout) results in a
132132
json file that can quickly grow quite large.
@@ -138,7 +138,7 @@ def _setup_logging(cls, cfg) -> None:
138138
return
139139

140140
# Read logging config
141-
level_name: str = cfg.get("logging", "level", fallback="INFO").upper()
141+
level_name: str = cfg.get("logging", "level", fallback="INFO").strip().upper()
142142
level: int = getattr(logging, level_name, logging.INFO)
143143

144144
log_dir = Path(cfg.get("logging", "log_dir", fallback="logs"))
@@ -190,7 +190,7 @@ def _setup_logging(cls, cfg) -> None:
190190
handler.setFormatter(formatter)
191191

192192
# ---- Root logger wiring ----
193-
root = logging.getLogger()
193+
root: logging.Logger = logging.getLogger()
194194
root.setLevel(level)
195195
root.handlers.clear()
196196
root.addHandler(handler)
@@ -203,7 +203,7 @@ def _setup_logging(cls, cfg) -> None:
203203

204204
cls._logging_initialized = True
205205

206-
__log = None
206+
__log : logging.Logger
207207
# log level, available log levels are CRITICAL, FATAL, ERROR, WARNING, INFO, DEBUG, EXCEPTION
208208
__log_level = "DEBUG"
209209

@@ -213,12 +213,12 @@ def _setup_logging(cls, cfg) -> None:
213213
__transports : list[transport_base] = []
214214
''' transport_base is for type hinting. this can be any transport'''
215215

216-
config_file : str
216+
config_file : Path
217217

218218
# Simple read completion tracking
219219
__read_completion_tracker : dict[str, bool] = {}
220220
''' Track which transports have completed their current read cycle '''
221-
__read_tracker_lock : threading.Lock = None
221+
__read_tracker_lock : threading.Lock
222222

223223
# Concurrency control
224224
__enable_concurrency : bool = False
@@ -245,14 +245,15 @@ def __init__(self, config_file : str):
245245
if alternate_cfg.is_file():
246246
self.config_file = alternate_cfg
247247
else:
248+
self.__log.warning(f"Config file not found {alternate_cfg}, using default: {default_cfg}")
248249
self.config_file = default_cfg
249250

250251
#pymodbus_log = logging.getLogger('pymodbus')
251252
#pymodbus_log.setLevel(logging.DEBUG)
252253
#pymodbus_log.addHandler(handler)
253254

254255
self.__settings = CustomConfigParser()
255-
self.__settings.read(self.config_file)
256+
self.__settings.read(self.config_file.as_posix())
256257

257258
self._setup_logging(self.__settings)
258259
self.__log: logging.Logger = logging.getLogger(__name__)
@@ -261,15 +262,17 @@ def __init__(self, config_file : str):
261262
self.__log_level = self.__settings.get("general","log_level", fallback="INFO")
262263

263264
# Read concurrency setting - default to sequential (disabled) for better stability
264-
self.__enable_concurrency = self.__settings.getboolean("general", "enable_concurrency", fallback=False)
265+
self.__enable_concurrency = bool(self.__settings.getboolean("general", "enable_concurrency", fallback=False))
265266
self.__log.info(f"Concurrency mode: {'Concurrent' if self.__enable_concurrency else 'Sequential'}")
266267

267268
# Read sequential delay setting
268-
self.__sequential_delay = self.__settings.getfloat("general", "sequential_delay", fallback=1.0)
269+
self.__sequential_delay = float(
270+
self.__settings.getfloat("general", "sequential_delay", fallback=1.0) or 1.0
271+
)
269272
if not self.__enable_concurrency:
270273
self.__log.info(f"Sequential delay between transports: {self.__sequential_delay} seconds")
271274

272-
log_level = getattr(logging, self.__log_level, logging.INFO)
275+
log_level = getattr(logging, str(self.__log_level), logging.INFO)
273276
self.__log.setLevel(log_level)
274277
self.__log.info("Loading...")
275278

@@ -362,6 +365,13 @@ def _process_transport_read(self, transport) -> None:
362365
except Exception as err:
363366
self.__log.exception(f"Error processing transport {transport.transport_name} and {err}")
364367
# traceback.print_exc()
368+
# Errno 104 - Connection reset by peer (common for MQTT disconnects)
369+
# Errno 32 - Broken pipe (common for MQTT disconnects)
370+
# Errno 110 - Connection timed out (common for network issues)
371+
if err == 'Errno 104' or err == 'Errno 32' or err == 'Errno 110':
372+
# traceback.print_exc()
373+
transport.connect()
374+
self.__log.warning(f"Attempting reconnect for {transport.transport_name}")
365375
self._mark_read_complete(transport)
366376

367377
def _mark_read_complete(self, transport) -> None:

0 commit comments

Comments
 (0)