diff --git a/sdks/python/apache_beam/internal/metrics/cells.py b/sdks/python/apache_beam/internal/metrics/cells.py deleted file mode 100644 index 4180a5bfb829..000000000000 --- a/sdks/python/apache_beam/internal/metrics/cells.py +++ /dev/null @@ -1,165 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -This file contains internal metric cell classes. A metric cell is used to -accumulate in-memory changes to a metric. It represents a specific metric -in a single context. - -For internal use only. No backwards compatibility guarantees. -""" - -# pytype: skip-file - -from typing import TYPE_CHECKING -from typing import Optional - -from apache_beam.metrics.cells import MetricCell -from apache_beam.metrics.cells import MetricCellFactory -from apache_beam.portability.api import metrics_pb2 -from apache_beam.utils.histogram import Histogram - -if TYPE_CHECKING: - from apache_beam.utils.histogram import BucketType - - -class HistogramCell(MetricCell): - """For internal use only; no backwards-compatibility guarantees. - - Tracks the current value and delta for a histogram metric. - - Each cell tracks the state of a metric independently per context per bundle. - Therefore, each metric has a different cell in each bundle, that is later - aggregated. - - This class is thread safe since underlying histogram object is thread safe. - """ - def __init__(self, bucket_type): - self._bucket_type = bucket_type - self.data = HistogramData.identity_element(bucket_type) - - def reset(self): - self.data = HistogramData.identity_element(self._bucket_type) - - def combine(self, other: 'HistogramCell') -> 'HistogramCell': - result = HistogramCell(self._bucket_type) - result.data = self.data.combine(other.data) - return result - - def update(self, value): - self.data.histogram.record(value) - - def get_cumulative(self) -> 'HistogramData': - return self.data.get_cumulative() - - def to_runner_api_monitoring_info(self, name, transform_id): - from apache_beam.metrics import monitoring_infos - return monitoring_infos.user_histogram( - name.namespace, - name.name, - self.get_cumulative(), - ptransform=transform_id) - - -class HistogramCellFactory(MetricCellFactory): - def __init__(self, bucket_type): - self._bucket_type = bucket_type - - def __call__(self): - return HistogramCell(self._bucket_type) - - def __eq__(self, other): - if not isinstance(other, HistogramCellFactory): - return False - return self._bucket_type == other._bucket_type - - def __hash__(self): - return hash(self._bucket_type) - - -class HistogramResult(object): - def __init__(self, data: 'HistogramData') -> None: - self.data = data - - def __eq__(self, other): - if isinstance(other, HistogramResult): - return self.data == other.data - else: - return False - - def __hash__(self): - return hash(self.data) - - def __repr__(self): - return ''.format( - self.data.histogram.get_percentile_info()) - - @property - def p99(self): - return self.data.histogram.p99() - - @property - def p95(self): - return self.data.histogram.p95() - - @property - def p90(self): - return self.data.histogram.p90() - - -class HistogramData(object): - """For internal use only; no backwards-compatibility guarantees. - - The data structure that holds data about a histogram metric. - - This object is not thread safe, so it's not supposed to be modified - outside the HistogramCell. - """ - def __init__(self, histogram): - self.histogram = histogram - - def __eq__(self, other): - return self.histogram == other.histogram - - def __hash__(self): - return hash(self.histogram) - - def __repr__(self): - return 'HistogramData({})'.format(self.histogram.get_percentile_info()) - - def get_cumulative(self) -> 'HistogramData': - return HistogramData(self.histogram) - - def combine(self, other: Optional['HistogramData']) -> 'HistogramData': - if other is None: - return self - - return HistogramData(self.histogram.combine(other.histogram)) - - @staticmethod - def identity_element(bucket_type) -> 'HistogramData': - return HistogramData(Histogram(bucket_type)) - - def to_proto(self) -> metrics_pb2.HistogramValue: - return self.histogram.to_runner_api() - - @classmethod - def from_proto(cls, proto: metrics_pb2.HistogramValue): - return cls(Histogram.from_runner_api(proto)) - - def get_result(self): - return self.histogram diff --git a/sdks/python/apache_beam/internal/metrics/cells_test.py b/sdks/python/apache_beam/internal/metrics/cells_test.py deleted file mode 100644 index 066dec4a2635..000000000000 --- a/sdks/python/apache_beam/internal/metrics/cells_test.py +++ /dev/null @@ -1,77 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# pytype: skip-file - -import threading -import unittest - -from apache_beam.internal.metrics.cells import HistogramCell -from apache_beam.internal.metrics.cells import HistogramCellFactory -from apache_beam.internal.metrics.cells import HistogramData -from apache_beam.utils.histogram import Histogram -from apache_beam.utils.histogram import LinearBucket - - -class TestHistogramCell(unittest.TestCase): - @classmethod - def _modify_histogram(cls, d): - for i in range(cls.NUM_ITERATIONS): - d.update(i) - - NUM_THREADS = 5 - NUM_ITERATIONS = 100 - - def test_parallel_access(self): - # We create NUM_THREADS threads that concurrently modify the distribution. - threads = [] - bucket_type = LinearBucket(0, 1, 100) - d = HistogramCell(bucket_type) - for _ in range(TestHistogramCell.NUM_THREADS): - t = threading.Thread( - target=TestHistogramCell._modify_histogram, args=(d, )) - threads.append(t) - t.start() - - for t in threads: - t.join() - - histogram = Histogram(bucket_type) - for _ in range(self.NUM_THREADS): - for i in range(self.NUM_ITERATIONS): - histogram.record(i) - - self.assertEqual(d.get_cumulative(), HistogramData(histogram)) - - def test_basic_operations(self): - d = HistogramCellFactory(LinearBucket(0, 1, 10))() - d.update(10) - self.assertEqual( - str(d.get_cumulative()), - 'HistogramData(Total count: 1, P99: >=10, P90: >=10, P50: >=10)') - d.update(0) - self.assertEqual( - str(d.get_cumulative()), - 'HistogramData(Total count: 2, P99: >=10, P90: >=10, P50: 1)') - d.update(5) - self.assertEqual( - str(d.get_cumulative()), - 'HistogramData(Total count: 3, P99: >=10, P90: >=10, P50: 6)') - - -if __name__ == '__main__': - unittest.main() diff --git a/sdks/python/apache_beam/internal/metrics/metric.py b/sdks/python/apache_beam/internal/metrics/metric.py index 19e2694acc8d..6f6788e059bd 100644 --- a/sdks/python/apache_beam/internal/metrics/metric.py +++ b/sdks/python/apache_beam/internal/metrics/metric.py @@ -35,17 +35,13 @@ from typing import Type from typing import Union -from apache_beam.internal.metrics.cells import HistogramCellFactory from apache_beam.metrics import monitoring_infos -from apache_beam.metrics.execution import MetricUpdater from apache_beam.metrics.metric import Metrics as UserMetrics -from apache_beam.metrics.metricbase import Histogram from apache_beam.metrics.metricbase import MetricName if TYPE_CHECKING: from apache_beam.metrics.cells import MetricCell from apache_beam.metrics.cells import MetricCellFactory - from apache_beam.utils.histogram import BucketType # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position @@ -82,46 +78,6 @@ def counter( MetricName(namespace=None, name=None, urn=urn, labels=labels), process_wide=process_wide) - @staticmethod - def histogram( - namespace: Union[Type, str], - name: str, - bucket_type: 'BucketType', - logger: Optional['MetricLogger'] = None) -> 'Metrics.DelegatingHistogram': - """Obtains or creates a Histogram metric. - - Args: - namespace: A class or string that gives the namespace to a metric - name: A string that gives a unique name to a metric - bucket_type: A type of bucket used in a histogram. A subclass of - apache_beam.utils.histogram.BucketType - logger: MetricLogger for logging locally aggregated metric - - Returns: - A Histogram object. - """ - namespace = UserMetrics.get_namespace(namespace) - return Metrics.DelegatingHistogram( - MetricName(namespace, name), bucket_type, logger) - - class DelegatingHistogram(Histogram): - """Metrics Histogram that Delegates functionality to MetricsEnvironment.""" - def __init__( - self, - metric_name: MetricName, - bucket_type: 'BucketType', - logger: Optional['MetricLogger']) -> None: - super().__init__(metric_name) - self.metric_name = metric_name - self.cell_type = HistogramCellFactory(bucket_type) - self.logger = logger - self.updater = MetricUpdater(self.cell_type, self.metric_name) - - def update(self, value: object) -> None: - self.updater(value) - if self.logger: - self.logger.update(self.cell_type, self.metric_name, value) - class MetricLogger(object): """Simple object to locally aggregate and log metrics.""" diff --git a/sdks/python/apache_beam/internal/metrics/metric_test.py b/sdks/python/apache_beam/internal/metrics/metric_test.py deleted file mode 100644 index c547c8c534b1..000000000000 --- a/sdks/python/apache_beam/internal/metrics/metric_test.py +++ /dev/null @@ -1,130 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# pytype: skip-file - -import re -import unittest - -from mock import patch - -import apache_beam as beam -from apache_beam.internal.metrics.cells import HistogramCellFactory -from apache_beam.internal.metrics.metric import Metrics as InternalMetrics -from apache_beam.internal.metrics.metric import MetricLogger -from apache_beam.metrics.execution import MetricsContainer -from apache_beam.metrics.execution import MetricsEnvironment -from apache_beam.metrics.metric import Metrics -from apache_beam.metrics.metric import MetricsFilter -from apache_beam.metrics.metricbase import MetricName -from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner -from apache_beam.runners.worker import statesampler -from apache_beam.utils import counters -from apache_beam.utils.histogram import LinearBucket - - -class MetricLoggerTest(unittest.TestCase): - @patch('apache_beam.internal.metrics.metric._LOGGER') - def test_log_metrics(self, mock_logger): - logger = MetricLogger() - logger.minimum_logging_frequency_msec = -1 - namespace = Metrics.get_namespace(self.__class__) - metric_name = MetricName(namespace, 'metric_logger_test') - logger.update(HistogramCellFactory(LinearBucket(0, 1, 10)), metric_name, 1) - logger.log_metrics() - - class Contains(str): - def __eq__(self, other): - return self in other - - mock_logger.info.assert_called_once_with( - Contains('HistogramData(Total count: 1, P99: 2, P90: 2, P50: 2)')) - - -class MetricsTest(unittest.TestCase): - def test_create_process_wide(self): - sampler = statesampler.StateSampler('', counters.CounterFactory()) - statesampler.set_current_tracker(sampler) - state1 = sampler.scoped_state( - 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) - - try: - sampler.start() - with state1: - urn = "my:custom:urn" - labels = {'key': 'value'} - counter = InternalMetrics.counter( - urn=urn, labels=labels, process_wide=True) - # Test that if process_wide is set, that it will be set - # on the process_wide container. - counter.inc(10) - self.assertTrue(isinstance(counter, Metrics.DelegatingCounter)) - - del counter - - metric_name = MetricName(None, None, urn=urn, labels=labels) - # Expect a value set on the current container. - self.assertEqual( - MetricsEnvironment.process_wide_container().get_counter( - metric_name).get_cumulative(), - 10) - # Expect no value set on the current container. - self.assertEqual( - MetricsEnvironment.current_container().get_counter( - metric_name).get_cumulative(), - 0) - finally: - sampler.stop() - - -class HistogramTest(unittest.TestCase): - def test_histogram(self): - class WordExtractingDoFn(beam.DoFn): - def __init__(self): - super().__init__() - self.word_lengths_dist = InternalMetrics.histogram( - self.__class__, - 'latency_histogram_ms', - LinearBucket(0, 1, num_buckets=10)) - - def process(self, element): - text_line = element.strip() - words = re.findall(r'[\w\']+', text_line, re.UNICODE) - for w in words: - self.word_lengths_dist.update(len(w)) - return words - - with beam.Pipeline(runner=BundleBasedDirectRunner()) as p: - lines = p | 'read' >> beam.Create(["x x x yyyyyy yyyyyy yyyyyy"]) - _ = ( - lines - | 'split' >> - (beam.ParDo(WordExtractingDoFn()).with_output_types(str))) - - result = p.result - - filter = MetricsFilter().with_name('latency_histogram_ms') - query_result = result.metrics().query(filter) - histogram = query_result['histograms'][0].committed - assert histogram._buckets == {1: 3, 6: 3} - assert histogram.total_count() == 6 - assert 1 < histogram.get_linear_interpolation(0.50) < 3 - assert histogram.get_linear_interpolation(0.99) > 3 - - -if __name__ == '__main__': - unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 889d3f1e96e3..f56607e0bf3b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -26,6 +26,7 @@ """ # pytype: skip-file +# pylint: disable=wrong-import-order, wrong-import-position import datetime import decimal @@ -53,12 +54,12 @@ from apache_beam.internal.gcp.json_value import from_json_value from apache_beam.internal.http_client import get_new_http from apache_beam.internal.metrics.metric import MetricLogger -from apache_beam.internal.metrics.metric import Metrics from apache_beam.internal.metrics.metric import ServiceCallMetric from apache_beam.io.gcp import bigquery_avro_tools from apache_beam.io.gcp import resource_identifiers from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.metrics import monitoring_infos +from apache_beam.metrics.metric import Metrics from apache_beam.options import value_provider from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms import DoFn @@ -68,12 +69,13 @@ from apache_beam.utils.histogram import LinearBucket # Protect against environments where bigquery library is not available. -# pylint: disable=wrong-import-order, wrong-import-position try: + from apitools.base.py.exceptions import HttpError + from apitools.base.py.exceptions import HttpForbiddenError from apitools.base.py.transfer import Upload - from apitools.base.py.exceptions import HttpError, HttpForbiddenError - from google.api_core.exceptions import ClientError, GoogleAPICallError from google.api_core.client_info import ClientInfo + from google.api_core.exceptions import ClientError + from google.api_core.exceptions import GoogleAPICallError from google.cloud import bigquery as gcp_bigquery except ImportError: gcp_bigquery = None @@ -418,7 +420,7 @@ def _get_temp_table(self, project_id): def _get_temp_table_project(self, fallback_project_id): """Returns the project ID for temporary table operations. - + If temp_table_ref exists, returns its projectId. Otherwise, returns the fallback_project_id. """ diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index b4703c5b5b96..0eb0e53e1d84 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -34,6 +34,7 @@ from typing import Set from apache_beam.portability.api import metrics_pb2 +from apache_beam.utils.histogram import Histogram try: import cython @@ -903,3 +904,133 @@ def singleton(value: str) -> "BoundedTrieData": @staticmethod def identity_element() -> "BoundedTrieData": return BoundedTrieData() + + +class HistogramCell(MetricCell): + """For internal use only; no backwards-compatibility guarantees. + + Tracks the current value and delta for a histogram metric. + + Each cell tracks the state of a metric independently per context per bundle. + Therefore, each metric has a different cell in each bundle, that is later + aggregated. + + This class is thread safe since underlying histogram object is thread safe. + """ + def __init__(self, bucket_type): + self._bucket_type = bucket_type + self.data = HistogramData.identity_element(bucket_type) + + def reset(self): + self.data = HistogramData.identity_element(self._bucket_type) + + def combine(self, other: 'HistogramCell') -> 'HistogramCell': + result = HistogramCell(self._bucket_type) + result.data = self.data.combine(other.data) + return result + + def update(self, value): + self.data.histogram.record(value) + + def get_cumulative(self) -> 'HistogramData': + return self.data.get_cumulative() + + def to_runner_api_monitoring_info(self, name, transform_id): + # Histogram metric is currently worker-local and internal + # use only. This method should be implemented when runners + # support Histogram metric reporting. + return None + + +class HistogramCellFactory(MetricCellFactory): + def __init__(self, bucket_type): + self._bucket_type = bucket_type + + def __call__(self): + return HistogramCell(self._bucket_type) + + def __eq__(self, other): + if not isinstance(other, HistogramCellFactory): + return False + return self._bucket_type == other._bucket_type + + def __hash__(self): + return hash(self._bucket_type) + + +class HistogramResult(object): + def __init__(self, data: 'HistogramData') -> None: + self.data = data + + def __eq__(self, other): + if isinstance(other, HistogramResult): + return self.data == other.data + else: + return False + + def __hash__(self): + return hash(self.data) + + def __repr__(self): + return ''.format( + self.data.histogram.get_percentile_info()) + + @property + def p99(self): + return self.data.histogram.p99() + + @property + def p95(self): + return self.data.histogram.p95() + + @property + def p90(self): + return self.data.histogram.p90() + + @property + def histogram(self): + return self.data.histogram + + +class HistogramData(object): + """For internal use only; no backwards-compatibility guarantees. + + The data structure that holds data about a histogram metric. + + This object is not thread safe, so it's not supposed to be modified + outside the HistogramCell. + """ + def __init__(self, histogram): + self.histogram = histogram + + def __eq__(self, other): + return self.histogram == other.histogram + + def __hash__(self): + return hash(self.histogram) + + def __repr__(self): + return 'HistogramData({})'.format(self.histogram.get_percentile_info()) + + def get_cumulative(self) -> 'HistogramData': + return HistogramData(self.histogram) + + def combine(self, other: Optional['HistogramData']) -> 'HistogramData': + if other is None: + return self + + return HistogramData(self.histogram.combine(other.histogram)) + + @staticmethod + def identity_element(bucket_type) -> 'HistogramData': + return HistogramData(Histogram(bucket_type)) + + def get_result(self) -> 'HistogramResult': + return HistogramResult(self.get_cumulative()) + + def to_proto(self) -> metrics_pb2.HistogramValue: + return self.histogram.to_runner_api() + + @classmethod + def from_proto(cls, proto: metrics_pb2.HistogramValue): + return cls(Histogram.from_runner_api(proto)) diff --git a/sdks/python/apache_beam/metrics/cells_test.py b/sdks/python/apache_beam/metrics/cells_test.py index 106f7542b230..11ea20ed6f6d 100644 --- a/sdks/python/apache_beam/metrics/cells_test.py +++ b/sdks/python/apache_beam/metrics/cells_test.py @@ -29,10 +29,15 @@ from apache_beam.metrics.cells import DistributionData from apache_beam.metrics.cells import GaugeCell from apache_beam.metrics.cells import GaugeData +from apache_beam.metrics.cells import HistogramCell +from apache_beam.metrics.cells import HistogramCellFactory +from apache_beam.metrics.cells import HistogramData from apache_beam.metrics.cells import StringSetCell from apache_beam.metrics.cells import StringSetData from apache_beam.metrics.cells import _BoundedTrieNode from apache_beam.metrics.metricbase import MetricName +from apache_beam.utils.histogram import Histogram +from apache_beam.utils.histogram import LinearBucket class TestCounterCell(unittest.TestCase): @@ -439,5 +444,51 @@ def test_merge_with_empty_node(self): self.assertFalse(root1._truncated) +class TestHistogramCell(unittest.TestCase): + @classmethod + def _modify_histogram(cls, d): + for i in range(cls.NUM_ITERATIONS): + d.update(i) + + NUM_THREADS = 5 + NUM_ITERATIONS = 100 + + def test_parallel_access(self): + # We create NUM_THREADS threads that concurrently modify the distribution. + threads = [] + bucket_type = LinearBucket(0, 1, 100) + d = HistogramCell(bucket_type) + for _ in range(TestHistogramCell.NUM_THREADS): + t = threading.Thread( + target=TestHistogramCell._modify_histogram, args=(d, )) + threads.append(t) + t.start() + + for t in threads: + t.join() + + histogram = Histogram(bucket_type) + for _ in range(self.NUM_THREADS): + for i in range(self.NUM_ITERATIONS): + histogram.record(i) + + self.assertEqual(d.get_cumulative(), HistogramData(histogram)) + + def test_basic_operations(self): + d = HistogramCellFactory(LinearBucket(0, 1, 10))() + d.update(10) + self.assertEqual( + str(d.get_cumulative()), + 'HistogramData(Total count: 1, P99: >=10, P90: >=10, P50: >=10)') + d.update(0) + self.assertEqual( + str(d.get_cumulative()), + 'HistogramData(Total count: 2, P99: >=10, P90: >=10, P50: 1)') + d.update(5) + self.assertEqual( + str(d.get_cumulative()), + 'HistogramData(Total count: 3, P99: >=10, P90: >=10, P50: 6)') + + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 602cbe93729d..ede0975ddb65 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -42,13 +42,13 @@ from typing import Union from typing import cast -from apache_beam.internal.metrics.cells import HistogramCellFactory -from apache_beam.internal.metrics.cells import HistogramData from apache_beam.metrics import monitoring_infos from apache_beam.metrics.cells import BoundedTrieCell from apache_beam.metrics.cells import CounterCell from apache_beam.metrics.cells import DistributionCell from apache_beam.metrics.cells import GaugeCell +from apache_beam.metrics.cells import HistogramCellFactory +from apache_beam.metrics.cells import HistogramData from apache_beam.metrics.cells import StringSetCell from apache_beam.metrics.cells import StringSetData from apache_beam.runners.worker import statesampler @@ -56,8 +56,8 @@ if TYPE_CHECKING: from apache_beam.metrics.cells import BoundedTrieData - from apache_beam.metrics.cells import GaugeData from apache_beam.metrics.cells import DistributionData + from apache_beam.metrics.cells import GaugeData from apache_beam.metrics.cells import MetricCell from apache_beam.metrics.cells import MetricCellFactory from apache_beam.metrics.metricbase import MetricName diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 4221b36f0b84..a812ef7f3366 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -41,18 +41,22 @@ from typing import Union from apache_beam.metrics import cells +from apache_beam.metrics.cells import HistogramCellFactory from apache_beam.metrics.execution import MetricResult from apache_beam.metrics.execution import MetricUpdater from apache_beam.metrics.metricbase import BoundedTrie from apache_beam.metrics.metricbase import Counter from apache_beam.metrics.metricbase import Distribution from apache_beam.metrics.metricbase import Gauge +from apache_beam.metrics.metricbase import Histogram from apache_beam.metrics.metricbase import MetricName from apache_beam.metrics.metricbase import StringSet if TYPE_CHECKING: from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.metricbase import Metric + from apache_beam.utils.histogram import BucketType + from apache_beam.internal.metrics.metric import MetricLogger __all__ = ['Metrics', 'MetricsFilter', 'Lineage'] @@ -153,6 +157,46 @@ def bounded_trie( namespace = Metrics.get_namespace(namespace) return Metrics.DelegatingBoundedTrie(MetricName(namespace, name)) + @staticmethod + def histogram( + namespace: Union[Type, str], + name: str, + bucket_type: 'BucketType', + logger: Optional['MetricLogger'] = None) -> 'Metrics.DelegatingHistogram': + """Obtains or creates a Histogram metric. + + Args: + namespace: A class or string that gives the namespace to a metric + name: A string that gives a unique name to a metric + bucket_type: A type of bucket used in a histogram. A subclass of + apache_beam.utils.histogram.BucketType + logger: MetricLogger for logging locally aggregated metric + + Returns: + A Histogram object. + """ + namespace = Metrics.get_namespace(namespace) + return Metrics.DelegatingHistogram( + MetricName(namespace, name), bucket_type, logger) + + class DelegatingHistogram(Histogram): + """Metrics Histogram that Delegates functionality to MetricsEnvironment.""" + def __init__( + self, + metric_name: MetricName, + bucket_type: 'BucketType', + logger: Optional['MetricLogger']) -> None: + super().__init__(metric_name) + self.metric_name = metric_name + self.cell_type = HistogramCellFactory(bucket_type) + self.logger = logger + self.updater = MetricUpdater(self.cell_type, self.metric_name) + + def update(self, value: object) -> None: + self.updater(value) + if self.logger: + self.logger.update(self.cell_type, self.metric_name, value) + class DelegatingCounter(Counter): """Metrics Counter that Delegates functionality to MetricsEnvironment.""" def __init__( diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index bdba0512dfa2..bcfe236826ae 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -16,7 +16,7 @@ # # pytype: skip-file - +import re import unittest import hamcrest as hc @@ -33,6 +33,7 @@ from apache_beam.metrics.metric import Metrics from apache_beam.metrics.metric import MetricsFilter from apache_beam.metrics.metricbase import MetricName +from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner from apache_beam.runners.worker import statesampler from apache_beam.testing.metric_result_matchers import DistributionMatcher from apache_beam.testing.metric_result_matchers import MetricResultMatcher @@ -40,6 +41,7 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.utils import counters +from apache_beam.utils.histogram import LinearBucket class NameTest(unittest.TestCase): @@ -285,5 +287,40 @@ def test_add(self): ('sys:', 'seg1.', 'seg2.', 'seg3/', 'part2/', 'part3')}) +class HistogramTest(unittest.TestCase): + def test_histogram(self): + class WordExtractingDoFn(beam.DoFn): + def __init__(self): + super().__init__() + self.word_lengths_dist = Metrics.histogram( + self.__class__, + 'latency_histogram_ms', + LinearBucket(0, 1, num_buckets=10)) + + def process(self, element): + text_line = element.strip() + words = re.findall(r'[\w\']+', text_line, re.UNICODE) + for w in words: + self.word_lengths_dist.update(len(w)) + return words + + with beam.Pipeline(runner=BundleBasedDirectRunner()) as p: + lines = p | 'read' >> beam.Create(["x x x yyyyyy yyyyyy yyyyyy"]) + _ = ( + lines + | 'split' >> + (beam.ParDo(WordExtractingDoFn()).with_output_types(str))) + + result = p.result + + filter = MetricsFilter().with_name('latency_histogram_ms') + query_result = result.metrics().query(filter) + histogram = query_result['histograms'][0].committed.histogram + assert histogram._buckets == {1: 3, 6: 3} + assert histogram.total_count() == 6 + assert 1 < histogram.get_linear_interpolation(0.50) < 3 + assert histogram.get_linear_interpolation(0.99) > 3 + + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index 1d340a9695ed..294bcef039a8 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -27,12 +27,12 @@ from apache_beam.coders import coder_impl from apache_beam.coders import coders -from apache_beam.internal.metrics.cells import HistogramData from apache_beam.metrics.cells import BoundedTrieData from apache_beam.metrics.cells import DistributionData from apache_beam.metrics.cells import DistributionResult from apache_beam.metrics.cells import GaugeData from apache_beam.metrics.cells import GaugeResult +from apache_beam.metrics.cells import HistogramData from apache_beam.metrics.cells import StringSetData from apache_beam.portability import common_urns from apache_beam.portability.api import metrics_pb2 diff --git a/sdks/python/apache_beam/metrics/monitoring_infos_test.py b/sdks/python/apache_beam/metrics/monitoring_infos_test.py index c658cea47a87..c55c11a87286 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos_test.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos_test.py @@ -18,11 +18,11 @@ import unittest -from apache_beam.internal.metrics.cells import HistogramCell -from apache_beam.internal.metrics.cells import HistogramData from apache_beam.metrics import monitoring_infos from apache_beam.metrics.cells import CounterCell from apache_beam.metrics.cells import GaugeCell +from apache_beam.metrics.cells import HistogramCell +from apache_beam.metrics.cells import HistogramData from apache_beam.metrics.cells import StringSetCell from apache_beam.utils.histogram import Histogram from apache_beam.utils.histogram import LinearBucket