Skip to content
Merged
165 changes: 0 additions & 165 deletions sdks/python/apache_beam/internal/metrics/cells.py

This file was deleted.

77 changes: 0 additions & 77 deletions sdks/python/apache_beam/internal/metrics/cells_test.py

This file was deleted.

44 changes: 0 additions & 44 deletions sdks/python/apache_beam/internal/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,13 @@
from typing import Type
from typing import Union

from apache_beam.internal.metrics.cells import HistogramCellFactory
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricUpdater
from apache_beam.metrics.metric import Metrics as UserMetrics
from apache_beam.metrics.metricbase import Histogram
from apache_beam.metrics.metricbase import MetricName

if TYPE_CHECKING:
from apache_beam.metrics.cells import MetricCell
from apache_beam.metrics.cells import MetricCellFactory
from apache_beam.utils.histogram import BucketType

# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
Expand Down Expand Up @@ -82,46 +78,6 @@ def counter(
MetricName(namespace=None, name=None, urn=urn, labels=labels),
process_wide=process_wide)

@staticmethod
def histogram(
namespace: Union[Type, str],
name: str,
bucket_type: 'BucketType',
logger: Optional['MetricLogger'] = None) -> 'Metrics.DelegatingHistogram':
"""Obtains or creates a Histogram metric.

Args:
namespace: A class or string that gives the namespace to a metric
name: A string that gives a unique name to a metric
bucket_type: A type of bucket used in a histogram. A subclass of
apache_beam.utils.histogram.BucketType
logger: MetricLogger for logging locally aggregated metric

Returns:
A Histogram object.
"""
namespace = UserMetrics.get_namespace(namespace)
return Metrics.DelegatingHistogram(
MetricName(namespace, name), bucket_type, logger)

class DelegatingHistogram(Histogram):
"""Metrics Histogram that Delegates functionality to MetricsEnvironment."""
def __init__(
self,
metric_name: MetricName,
bucket_type: 'BucketType',
logger: Optional['MetricLogger']) -> None:
super().__init__(metric_name)
self.metric_name = metric_name
self.cell_type = HistogramCellFactory(bucket_type)
self.logger = logger
self.updater = MetricUpdater(self.cell_type, self.metric_name)

def update(self, value: object) -> None:
self.updater(value)
if self.logger:
self.logger.update(self.cell_type, self.metric_name, value)


class MetricLogger(object):
"""Simple object to locally aggregate and log metrics."""
Expand Down
Loading
Loading