Skip to content

Commit 78140f9

Browse files
committed
Further optimizations
1 parent 74bbb16 commit 78140f9

12 files changed

Lines changed: 1014 additions & 177 deletions

File tree

QuantileFlow/ddsketch/core.py

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
"""Core DDSketch implementation."""
1+
"""Core DDSketch implementation.
2+
3+
Optimized for high throughput to match or exceed Datadog's implementation.
4+
"""
25

36
from typing import Literal, Union
47
from .mapping.logarithmic import LogarithmicMapping
@@ -8,6 +11,7 @@
811
from .storage.contiguous import ContiguousStorage
912
from .storage.sparse import SparseStorage
1013

14+
1115
class DDSketch:
1216
"""
1317
DDSketch implementation for quantile approximation with relative-error guarantees.
@@ -21,6 +25,9 @@ class DDSketch:
2125
by Charles Masson, Jee E. Rim and Homin K. Lee
2226
"""
2327

28+
__slots__ = ('relative_accuracy', 'cont_neg', 'mapping', 'positive_store',
29+
'negative_store', 'count', 'zero_count', '_min', '_max', '_sum')
30+
2431
def __init__(
2532
self,
2633
relative_accuracy: float,
@@ -54,7 +61,6 @@ def __init__(
5461
self.relative_accuracy = relative_accuracy
5562
self.cont_neg = cont_neg
5663

57-
5864
# Initialize mapping scheme
5965
if mapping_type == 'logarithmic':
6066
self.mapping = LogarithmicMapping(relative_accuracy)
@@ -71,31 +77,47 @@ def __init__(
7177
self.positive_store = SparseStorage(strategy=bucket_strategy)
7278
self.negative_store = SparseStorage(strategy=bucket_strategy) if cont_neg else None
7379

74-
self.count = 0
75-
self.zero_count = 0
80+
self.count = 0.0
81+
self.zero_count = 0.0
82+
83+
# Summary stats (like Datadog)
84+
self._min = float('+inf')
85+
self._max = float('-inf')
86+
self._sum = 0.0
7687

77-
def insert(self, value: Union[int, float]) -> None:
88+
def insert(self, value: Union[int, float], weight: float = 1.0) -> None:
7889
"""
7990
Insert a value into the sketch.
8091
8192
Args:
8293
value: The value to insert.
94+
weight: The weight of the value (default 1.0).
8395
8496
Raises:
8597
ValueError: If value is negative and cont_neg is False.
8698
"""
8799
if value > 0:
88-
bucket_idx = self.mapping.compute_bucket_index(value)
89-
self.positive_store.add(bucket_idx)
100+
self.positive_store.add(self.mapping.compute_bucket_index(value), weight)
90101
elif value < 0:
91102
if self.cont_neg:
92-
bucket_idx = self.mapping.compute_bucket_index(-value)
93-
self.negative_store.add(bucket_idx)
103+
self.negative_store.add(self.mapping.compute_bucket_index(-value), weight)
94104
else:
95105
raise ValueError("Negative values not supported when cont_neg is False")
96106
else:
97-
self.zero_count += 1
98-
self.count += 1
107+
self.zero_count += weight
108+
109+
# Track summary stats
110+
self.count += weight
111+
self._sum += value * weight
112+
if value < self._min:
113+
self._min = value
114+
if value > self._max:
115+
self._max = value
116+
117+
# Alias for compatibility with Datadog's API
118+
def add(self, value: Union[int, float], weight: float = 1.0) -> None:
119+
"""Alias for insert() to match Datadog's API."""
120+
self.insert(value, weight)
99121

100122
def delete(self, value: Union[int, float]) -> None:
101123
"""
@@ -125,6 +147,7 @@ def delete(self, value: Union[int, float]) -> None:
125147

126148
if deleted:
127149
self.count -= 1
150+
self._sum -= value
128151

129152
def quantile(self, q: float) -> float:
130153
"""
@@ -146,32 +169,52 @@ def quantile(self, q: float) -> float:
146169

147170
rank = q * (self.count - 1)
148171

149-
if self.cont_neg:
150-
neg_count = self.negative_store.total_count
172+
if self.cont_neg and self.negative_store is not None:
173+
neg_count = self.negative_store.count
151174
if rank < neg_count:
152-
# Handle negative values
153-
curr_count = 0
154-
if self.negative_store.min_index is not None:
155-
for idx in range(self.negative_store.max_index, self.negative_store.min_index - 1, -1):
156-
bucket_count = self.negative_store.get_count(idx)
157-
curr_count += bucket_count
158-
if curr_count > rank:
159-
return -self.mapping.compute_value_from_index(idx)
175+
# Handle negative values - use reversed rank
176+
reversed_rank = neg_count - rank - 1
177+
key = self.negative_store.key_at_rank(reversed_rank, lower=False)
178+
return -self.mapping.compute_value_from_index(key)
160179
rank -= neg_count
161180

162181
if rank < self.zero_count:
163-
return 0
182+
return 0.0
164183
rank -= self.zero_count
165184

166-
curr_count = 0
167-
if self.positive_store.min_index is not None:
168-
for idx in range(self.positive_store.min_index, self.positive_store.max_index + 1):
169-
bucket_count = self.positive_store.get_count(idx)
170-
curr_count += bucket_count
171-
if curr_count > rank:
172-
return self.mapping.compute_value_from_index(idx)
173-
174-
return float('inf')
185+
# Use key_at_rank for consistency with storage implementation
186+
key = self.positive_store.key_at_rank(rank)
187+
return self.mapping.compute_value_from_index(key)
188+
189+
# Alias for Datadog compatibility
190+
def get_quantile_value(self, quantile: float) -> float:
191+
"""Alias for quantile() to match Datadog's API."""
192+
try:
193+
return self.quantile(quantile)
194+
except ValueError:
195+
return None
196+
197+
@property
198+
def avg(self) -> float:
199+
"""Return the exact average of values added to the sketch."""
200+
if self.count == 0:
201+
return 0.0
202+
return self._sum / self.count
203+
204+
@property
205+
def sum(self) -> float:
206+
"""Return the exact sum of values added to the sketch."""
207+
return self._sum
208+
209+
@property
210+
def min(self) -> float:
211+
"""Return the minimum value added to the sketch."""
212+
return self._min
213+
214+
@property
215+
def max(self) -> float:
216+
"""Return the maximum value added to the sketch."""
217+
return self._max
175218

176219
def merge(self, other: 'DDSketch') -> None:
177220
"""
@@ -185,12 +228,20 @@ def merge(self, other: 'DDSketch') -> None:
185228
"""
186229
if self.relative_accuracy != other.relative_accuracy:
187230
raise ValueError("Cannot merge sketches with different relative accuracies")
231+
232+
if other.count == 0:
233+
return
188234

189235
self.positive_store.merge(other.positive_store)
190-
if self.cont_neg and other.cont_neg:
236+
if self.cont_neg and other.cont_neg and other.negative_store is not None:
191237
self.negative_store.merge(other.negative_store)
192-
elif other.cont_neg and sum(other.negative_store.counts.values()) > 0:
238+
elif other.cont_neg and other.negative_store is not None and other.negative_store.count > 0:
193239
raise ValueError("Cannot merge sketch containing negative values when cont_neg is False")
194240

195241
self.zero_count += other.zero_count
196-
self.count += other.count
242+
self.count += other.count
243+
self._sum += other._sum
244+
if other._min < self._min:
245+
self._min = other._min
246+
if other._max > self._max:
247+
self._max = other._max

QuantileFlow/ddsketch/mapping/logarithmic.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,40 @@
33
import math
44
from .base import MappingScheme
55

6+
67
class LogarithmicMapping(MappingScheme):
8+
"""
9+
A memory-optimal KeyMapping that uses logarithmic mapping.
10+
11+
Given a targeted relative accuracy, it requires the least number of keys
12+
to cover a given range of values.
13+
"""
714
__slots__ = ('relative_accuracy', 'gamma', 'multiplier')
815

916
def __init__(self, relative_accuracy: float):
1017
self.relative_accuracy = relative_accuracy
1118
self.gamma = (1 + relative_accuracy) / (1 - relative_accuracy)
1219
self.multiplier = 1 / math.log(self.gamma)
20+
21+
def key(self, value: float) -> int:
22+
"""Alias for compute_bucket_index for Datadog API compatibility."""
23+
return self.compute_bucket_index(value)
24+
25+
def value(self, key: int) -> float:
26+
"""Alias for compute_value_from_index for Datadog API compatibility."""
27+
return self.compute_value_from_index(key)
1328

1429
def compute_bucket_index(self, value: float) -> int:
15-
# ceil(log_gamma(value) = ceil(log(value) / log(gamma))
30+
"""Compute the bucket index for a given value.
31+
32+
ceil(log_gamma(value)) = ceil(log(value) / log(gamma))
33+
"""
1634
return math.ceil(math.log(value) * self.multiplier)
1735

1836
def compute_value_from_index(self, index: int) -> float:
19-
# Return geometric mean of bucket boundaries
20-
# This ensures the relative error is bounded by relative_accuracy
37+
"""Compute the representative value for a given bucket index.
38+
39+
Returns the geometric mean of bucket boundaries to ensure
40+
the relative error is bounded by relative_accuracy.
41+
"""
2142
return math.pow(self.gamma, index) * (2.0 / (1.0 + self.gamma))

0 commit comments

Comments
 (0)