@@ -62,6 +62,26 @@ def request_stop(self):
6262 if self ._t :
6363 self ._t = None
6464
65+ def _process_data_from_queue (self ) -> None :
66+ """Process data from queue."""
67+ try :
68+ data , timestamp = self ._queue .get (True , self ._timeout_for_get_from_queue )
69+ try :
70+ self .logger .log (level = TRACE , msg = f'notifying { self ._observer } ({ repr (data )} )' )
71+ except ReferenceError :
72+ self ._request_end .set () # self._observer is no more valid.
73+ try :
74+ if self ._observer_self :
75+ self ._observer (self ._observer_self , data , timestamp )
76+ else :
77+ self ._observer (data , timestamp )
78+ except ReferenceError :
79+ self ._request_end .set () # self._observer is no more valid.
80+ except Exception as ex :
81+ self ._handle_unexpected_error_from_observer (exception = ex , data = data , timestamp = timestamp )
82+ except queue .Empty :
83+ pass # No incoming data within self._timeout_for_get_from_queue
84+
6585 @tracked_thread .log_exit_exception
6686 def _loop_for_observer (self ):
6787 """
@@ -74,23 +94,7 @@ def _loop_for_observer(self):
7494 while not self ._request_end .is_set ():
7595 if next (heartbeat ):
7696 logging .getLogger ("moler_threads" ).debug ("ALIVE" )
77- try :
78- data , timestamp = self ._queue .get (True , self ._timeout_for_get_from_queue )
79- try :
80- self .logger .log (level = TRACE , msg = f'notifying { self ._observer } ({ repr (data )} )' )
81- except ReferenceError :
82- self ._request_end .set () # self._observer is no more valid.
83- try :
84- if self ._observer_self :
85- self ._observer (self ._observer_self , data , timestamp )
86- else :
87- self ._observer (data , timestamp )
88- except ReferenceError :
89- self ._request_end .set () # self._observer is no more valid.
90- except Exception as ex :
91- self ._handle_unexpected_error_from_observer (exception = ex , data = data , timestamp = timestamp )
92- except queue .Empty :
93- pass # No incoming data within self._timeout_for_get_from_queue
97+ self ._process_data_from_queue ()
9498 self ._observer = None
9599 self ._observer_self = None
96100 logging .getLogger ("moler_threads" ).debug ("EXIT" )
0 commit comments