Skip to content

Commit a2aa4f2

Browse files
committed
Add random reads measurements to batch store bench
1 parent 0576718 commit a2aa4f2

File tree

1 file changed

+356
-0
lines changed

1 file changed

+356
-0
lines changed

bench/batch_store_cframes.py

Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
#######################################################################
2+
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
3+
# All rights reserved.
4+
#
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
#######################################################################
7+
8+
from __future__ import annotations
9+
10+
import argparse
11+
import math
12+
import pathlib
13+
import random
14+
import sys
15+
import time
16+
from bisect import bisect_right
17+
18+
import blosc2
19+
from blosc2._msgpack_utils import msgpack_packb
20+
21+
22+
URLPATH = "bench_batch_store_cframes.b2b"
23+
DEFAULT_NFRAMES = 1_000
24+
DEFAULT_NELEMENTS = 1_000
25+
DEFAULT_NBATCHES = 1_000
26+
_DICT_CODECS = {blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC}
27+
28+
29+
def build_parser() -> argparse.ArgumentParser:
30+
parser = argparse.ArgumentParser(
31+
description="Build or read an on-disk BatchStore containing batches of Blosc2 CFrames.",
32+
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
33+
)
34+
parser.add_argument("--urlpath", type=str, default=None, help="Path to the BatchStore file.")
35+
parser.add_argument(
36+
"--nframes-per-batch", type=int, default=DEFAULT_NFRAMES, help="Number of CFrames stored in each batch."
37+
)
38+
parser.add_argument(
39+
"--nelements-per-frame",
40+
type=int,
41+
default=DEFAULT_NELEMENTS,
42+
help="Number of array elements stored in each frame.",
43+
)
44+
parser.add_argument("--nbatches", type=int, default=DEFAULT_NBATCHES, help="Number of batches to append.")
45+
parser.add_argument(
46+
"--nframes-per-block",
47+
type=int,
48+
default=None,
49+
help="Maximum number of frames per internal block. Default is automatic inference.",
50+
)
51+
parser.add_argument("--codec", type=str, default="ZSTD", choices=[codec.name for codec in blosc2.Codec])
52+
parser.add_argument("--clevel", type=int, default=5)
53+
parser.add_argument("--seed", type=int, default=None, help="Optional RNG seed for reproducible random reads.")
54+
parser.add_argument(
55+
"--random-read",
56+
type=int,
57+
default=1,
58+
help="Read N random serialized CFrames and report timing. When passed explicitly, reads an existing store.",
59+
)
60+
parser.add_argument(
61+
"--random-read-cframe",
62+
type=int,
63+
default=0,
64+
help=(
65+
"Read N random single frames by fetching a stored CFrame and deserializing it "
66+
"with blosc2.ndarray_from_cframe(). Requires --urlpath."
67+
),
68+
)
69+
parser.add_argument(
70+
"--random-read-element",
71+
type=int,
72+
default=0,
73+
help=(
74+
"Read N random single elements by fetching a random frame, unpacking its CFrame, "
75+
"and indexing a random element. Requires --urlpath."
76+
),
77+
)
78+
parser.add_argument(
79+
"--use-dict",
80+
action="store_true",
81+
help="Enable dictionaries for codecs that support them (ZSTD, LZ4, LZ4HC).",
82+
)
83+
return parser
84+
85+
86+
def make_batch(nframes: int, frame: bytes) -> list[bytes]:
87+
return [frame] * nframes
88+
89+
90+
def format_size(nbytes: int) -> str:
91+
units = ("B", "KiB", "MiB", "GiB", "TiB")
92+
size = float(nbytes)
93+
unit = units[0]
94+
for candidate in units:
95+
unit = candidate
96+
if size < 1024 or candidate == units[-1]:
97+
break
98+
size /= 1024
99+
if unit == "B":
100+
return f"{nbytes} bytes ({nbytes} {unit})"
101+
return f"{nbytes} bytes ({size:.2f} {unit})"
102+
103+
104+
def format_count(value: int) -> str:
105+
return f"{value:_} ({value:.2e}, 2**{math.log2(value):.3f})"
106+
107+
108+
def print_store_counts(store: blosc2.BatchStore) -> None:
109+
total_frames = sum(len(batch) for batch in store)
110+
print(f" total frames: {format_count(total_frames)}")
111+
if total_frames == 0:
112+
print(" total elements: 0")
113+
return
114+
115+
first_frame = store[0][0]
116+
array = blosc2.ndarray_from_cframe(first_frame)
117+
nelements_per_frame = math.prod(array.shape)
118+
total_elements = total_frames * nelements_per_frame
119+
print(f" nelements per frame: {nelements_per_frame}")
120+
print(f" total elements: {format_count(total_elements)}")
121+
122+
123+
def sample_random_reads(store: blosc2.BatchStore, nreads: int, rng: random.Random) -> list[tuple[int, int, int, int]]:
124+
batch_lengths = [len(batch) for batch in store]
125+
total_frames = sum(batch_lengths)
126+
if total_frames == 0:
127+
return []
128+
129+
prefix = [0]
130+
for length in batch_lengths:
131+
prefix.append(prefix[-1] + length)
132+
133+
sample_size = min(nreads, total_frames)
134+
flat_indices = rng.sample(range(total_frames), sample_size)
135+
results: list[tuple[int, int, int, int]] = []
136+
137+
for flat_index in flat_indices:
138+
batch_index = bisect_right(prefix, flat_index) - 1
139+
frame_index = flat_index - prefix[batch_index]
140+
t0 = time.perf_counter_ns()
141+
frame = store[batch_index][frame_index]
142+
elapsed_ns = time.perf_counter_ns() - t0
143+
results.append((batch_index, frame_index, len(frame), elapsed_ns))
144+
145+
return results
146+
147+
148+
def print_random_read_stats(store: blosc2.BatchStore, nreads: int, rng: random.Random) -> None:
149+
samples = sample_random_reads(store, nreads, rng)
150+
if not samples:
151+
print("random scalar reads: store is empty")
152+
return
153+
154+
timings_ns = [elapsed_ns for _, _, _, elapsed_ns in samples]
155+
print(f"random scalar reads: {len(samples)}")
156+
print(f" mean: {sum(timings_ns) / len(timings_ns) / 1_000:.2f} us")
157+
print(f" min: {min(timings_ns) / 1_000:.2f} us")
158+
print(f" max: {max(timings_ns) / 1_000:.2f} us")
159+
batch_index, frame_index, frame_len, elapsed_ns = samples[0]
160+
print(
161+
f" first sample: store[{batch_index}][{frame_index}] -> {frame_len} bytes "
162+
f"in {elapsed_ns / 1_000:.2f} us"
163+
)
164+
165+
166+
def sample_random_cframe_reads(
167+
store: blosc2.BatchStore, nreads: int, rng: random.Random
168+
) -> list[tuple[int, int, tuple[int, ...], int]]:
169+
batch_lengths = [len(batch) for batch in store]
170+
total_frames = sum(batch_lengths)
171+
if total_frames == 0:
172+
return []
173+
174+
prefix = [0]
175+
for length in batch_lengths:
176+
prefix.append(prefix[-1] + length)
177+
178+
sample_size = min(nreads, total_frames)
179+
flat_indices = rng.sample(range(total_frames), sample_size)
180+
results: list[tuple[int, int, tuple[int, ...], int]] = []
181+
182+
for flat_index in flat_indices:
183+
batch_index = bisect_right(prefix, flat_index) - 1
184+
frame_index = flat_index - prefix[batch_index]
185+
t0 = time.perf_counter_ns()
186+
frame = store[batch_index][frame_index]
187+
array = blosc2.ndarray_from_cframe(frame)
188+
elapsed_ns = time.perf_counter_ns() - t0
189+
results.append((batch_index, frame_index, array.shape, elapsed_ns))
190+
191+
return results
192+
193+
194+
def print_random_cframe_read_stats(store: blosc2.BatchStore, nreads: int, rng: random.Random) -> None:
195+
samples = sample_random_cframe_reads(store, nreads, rng)
196+
if not samples:
197+
print("random cframe reads: store is empty")
198+
return
199+
200+
timings_ns = [elapsed_ns for _, _, _, elapsed_ns in samples]
201+
print(f"random cframe reads: {len(samples)}")
202+
print(f" mean: {sum(timings_ns) / len(timings_ns) / 1_000:.2f} us")
203+
print(f" min: {min(timings_ns) / 1_000:.2f} us")
204+
print(f" max: {max(timings_ns) / 1_000:.2f} us")
205+
batch_index, frame_index, shape, elapsed_ns = samples[0]
206+
print(f" first sample: store[{batch_index}][{frame_index}] -> shape={shape} in {elapsed_ns / 1_000:.2f} us")
207+
208+
209+
def sample_random_element_reads(
210+
store: blosc2.BatchStore, nreads: int, rng: random.Random
211+
) -> list[tuple[int, int, int, int | float | bool, int]]:
212+
batch_lengths = [len(batch) for batch in store]
213+
total_frames = sum(batch_lengths)
214+
if total_frames == 0:
215+
return []
216+
217+
prefix = [0]
218+
for length in batch_lengths:
219+
prefix.append(prefix[-1] + length)
220+
221+
samples: list[tuple[int, int, int, int | float | bool, int]] = []
222+
for _ in range(nreads):
223+
flat_index = rng.randrange(total_frames)
224+
batch_index = bisect_right(prefix, flat_index) - 1
225+
frame_index = flat_index - prefix[batch_index]
226+
t0 = time.perf_counter_ns()
227+
frame = store[batch_index][frame_index]
228+
array = blosc2.ndarray_from_cframe(frame)
229+
element_index = rng.randrange(array.shape[0])
230+
value = array[element_index].item()
231+
elapsed_ns = time.perf_counter_ns() - t0
232+
samples.append((batch_index, frame_index, element_index, value, elapsed_ns))
233+
return samples
234+
235+
236+
def print_random_element_read_stats(store: blosc2.BatchStore, nreads: int, rng: random.Random) -> None:
237+
samples = sample_random_element_reads(store, nreads, rng)
238+
if not samples:
239+
print("random element reads: store is empty")
240+
return
241+
242+
timings_ns = [elapsed_ns for *_, elapsed_ns in samples]
243+
print(f"random element reads: {len(samples)}")
244+
print(f" mean: {sum(timings_ns) / len(timings_ns) / 1_000:.2f} us")
245+
print(f" min: {min(timings_ns) / 1_000:.2f} us")
246+
print(f" max: {max(timings_ns) / 1_000:.2f} us")
247+
batch_index, frame_index, element_index, value, elapsed_ns = samples[0]
248+
print(
249+
f" first sample: store[{batch_index}][{frame_index}][{element_index}] -> {value!r} "
250+
f"in {elapsed_ns / 1_000:.2f} us"
251+
)
252+
253+
254+
def main() -> None:
255+
parser = build_parser()
256+
args = parser.parse_args()
257+
random_read_requested = any(arg == "--random-read" or arg.startswith("--random-read=") for arg in sys.argv[1:])
258+
259+
if args.nframes_per_batch <= 0:
260+
parser.error("--nframes-per-batch must be > 0")
261+
if args.nelements_per_frame <= 0:
262+
parser.error("--nelements-per-frame must be > 0")
263+
if args.nbatches <= 0:
264+
parser.error("--nbatches must be > 0")
265+
if args.nframes_per_block is not None and args.nframes_per_block <= 0:
266+
parser.error("--nframes-per-block must be > 0")
267+
if args.random_read <= 0:
268+
parser.error("--random-read must be > 0")
269+
if args.random_read_cframe < 0:
270+
parser.error("--random-read-cframe must be >= 0")
271+
if args.random_read_element < 0:
272+
parser.error("--random-read-element must be >= 0")
273+
if not 0 <= args.clevel <= 9:
274+
parser.error("--clevel must be between 0 and 9")
275+
if (random_read_requested or args.random_read_cframe > 0 or args.random_read_element > 0) and args.urlpath is None:
276+
parser.error("--random-read, --random-read-cframe and --random-read-element require --urlpath")
277+
278+
codec = blosc2.Codec[args.codec]
279+
use_dict = args.use_dict and codec in _DICT_CODECS
280+
total_frames = args.nframes_per_batch * args.nbatches
281+
total_elements = total_frames * args.nelements_per_frame
282+
rng = random.Random(args.seed)
283+
284+
if args.use_dict and not use_dict:
285+
print(f"Codec {codec.name} does not support use_dict; disabling it.")
286+
287+
if random_read_requested or args.random_read_cframe > 0 or args.random_read_element > 0:
288+
store = blosc2.open(args.urlpath, mode="r")
289+
if not isinstance(store, blosc2.BatchStore):
290+
raise TypeError(f"{args.urlpath!r} is not a BatchStore")
291+
print("Reading on-disk BatchStore with CFrame payloads")
292+
print(f" urlpath: {args.urlpath}")
293+
print(f" seed: {args.seed}")
294+
print_store_counts(store)
295+
print()
296+
print(store.info)
297+
print()
298+
if random_read_requested:
299+
print_random_read_stats(store, args.random_read, rng)
300+
if args.random_read_cframe > 0:
301+
if random_read_requested:
302+
print()
303+
print_random_cframe_read_stats(store, args.random_read_cframe, rng)
304+
if args.random_read_element > 0:
305+
if random_read_requested or args.random_read_cframe > 0:
306+
print()
307+
print_random_element_read_stats(store, args.random_read_element, rng)
308+
return
309+
310+
cparams = blosc2.CParams(codec=codec, clevel=args.clevel, use_dict=use_dict)
311+
312+
urlpath = args.urlpath or URLPATH
313+
blosc2.remove_urlpath(urlpath)
314+
source = blosc2.full(args.nelements_per_frame, 3)
315+
frame = source.to_cframe()
316+
msgpack_frame = msgpack_packb(frame)
317+
318+
print("Building on-disk BatchStore with CFrame payloads")
319+
print(f" urlpath: {urlpath}")
320+
print(f" nbatches: {args.nbatches}")
321+
print(f" nframes per batch: {args.nframes_per_batch}")
322+
print(f" nelements per frame: {args.nelements_per_frame}")
323+
print(f" nframes per block: {args.nframes_per_block}")
324+
print(f" total frames: {format_count(total_frames)}")
325+
print(f" total elements: {format_count(total_elements)}")
326+
print(f" cframe bytes per frame: {len(frame)}")
327+
print(f" msgpack bytes per frame: {len(msgpack_frame)}")
328+
print(f" codec: {codec.name}")
329+
print(f" clevel: {args.clevel}")
330+
print(f" use_dict: {use_dict}")
331+
print(f" seed: {args.seed}")
332+
333+
with blosc2.BatchStore(
334+
storage=blosc2.Storage(urlpath=urlpath, mode="w", contiguous=True),
335+
cparams=cparams,
336+
items_per_block=args.nframes_per_block,
337+
) as store:
338+
batch = make_batch(args.nframes_per_batch, frame)
339+
for _ in range(args.nbatches):
340+
store.append(batch)
341+
print()
342+
print(store.info)
343+
uncompressed_nbytes = store.nbytes
344+
345+
size_nbytes = pathlib.Path(urlpath).stat().st_size
346+
print(f"store file size: {format_size(size_nbytes)}")
347+
print(
348+
f"average compressed bytes per frame: {size_nbytes / total_frames:.2f} "
349+
f"({uncompressed_nbytes / total_frames:.2f} uncompressed)"
350+
)
351+
print()
352+
print_random_read_stats(blosc2.open(urlpath, mode="r"), args.random_read, rng)
353+
354+
355+
if __name__ == "__main__":
356+
main()

0 commit comments

Comments
 (0)