Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,24 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]


def groupby_attr(test=None):
  """Group the grocery rows by their ``recipe`` attribute.

  Args:
    test: Optional callable that receives the grouped PCollection (used to
      attach assertions). When omitted, the grouped elements are printed.
  """
  with beam.Pipeline() as p:
    # [START groupby_attr]
    grouped = (
        p
        | beam.Create(GROCERY_LIST)
        | beam.GroupBy('recipe'))
    # [END groupby_attr]

    # Both branches add steps to the pipeline, so they have to stay inside the
    # `with` block: the pipeline is run when the block exits.
    if test:
      test(grouped)
    else:
      _ = grouped | beam.Map(print)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we've fixed this, we should

Comment thread
damccorm marked this conversation as resolved.
Outdated


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,23 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]


def groupby_attr_expr(test=None):
  """Group the grocery rows by ``recipe`` and by a computed ``is_berry`` key.

  Args:
    test: Optional callable that receives the grouped PCollection (used to
      attach assertions). When omitted, the grouped elements are printed.
  """
  with beam.Pipeline() as p:
    # NOTE(review): keep this marker and its [END] twin -- the review thread
    # attached to this line reads "We should not remove these attributes".
    # [START groupby_attr_expr]
    grouped = (
        p
        | beam.Create(GROCERY_LIST)
        | beam.GroupBy('recipe', is_berry=lambda x: 'berry' in x.fruit))
    # [END groupby_attr_expr]

    # Both branches add steps to the pipeline, so they have to stay inside the
    # `with` block: the pipeline is run when the block exits.
    if test:
      test(grouped)
    else:
      _ = grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@

import apache_beam as beam


def groupby_expr(test=None):
  """Group a list of fruit names by their first letter.

  Args:
    test: Optional callable that receives the grouped PCollection (used to
      attach assertions). When omitted, the grouped elements are printed.
  """
  with beam.Pipeline() as p:
    # [START groupby_expr]
    grouped = (
        p
        | beam.Create(
            ['strawberry', 'raspberry', 'blueberry', 'blackberry', 'banana'])
        | beam.GroupBy(lambda s: s[0]))
    # [END groupby_expr]

    # Both branches add steps to the pipeline, so they have to stay inside the
    # `with` block: the pipeline is run when the block exits.
    if test:
      test(grouped)
    else:
      _ = grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,26 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]


def expr_aggregate(test=None):
  """Group the grocery rows by ``recipe`` and sum two values per group.

  Each group gets ``total_quantity`` (sum of the ``quantity`` field) and
  ``price`` (sum of ``quantity * unit_price`` computed per row).

  Args:
    test: Optional callable that receives the grouped PCollection (used to
      attach assertions). When omitted, the grouped elements are printed.
  """
  with beam.Pipeline() as p:
    # [START expr_aggregate]
    grouped = (
        p
        | beam.Create(GROCERY_LIST)
        | beam.GroupBy('recipe')
            .aggregate_field('quantity', sum, 'total_quantity')
            .aggregate_field(
                lambda x: x.quantity * x.unit_price, sum, 'price'))
    # [END expr_aggregate]

    # Both branches add steps to the pipeline, so they have to stay inside the
    # `with` block: the pipeline is run when the block exits.
    if test:
      test(grouped)
    else:
      _ = grouped | beam.Map(print)


if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]


def global_aggregate(test=None):
with beam.Pipeline() as p:
# [START global_aggregate]
Expand All @@ -57,11 +57,13 @@ def global_aggregate(test=None):
| beam.GroupBy().aggregate_field(
'unit_price', min, 'min_price').aggregate_field(
'unit_price', MeanCombineFn(), 'mean_price').aggregate_field(
'unit_price', max, 'max_price')
| beam.Map(print))
'unit_price', max, 'max_price'))
# [END global_aggregate]
if test:
test(grouped)
else:
grouped | beam.Map(print)



if __name__ == '__main__':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,49 @@

import apache_beam as beam

# [START groupby_table]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not remove this either

# Sample input rows for the aggregation snippet below. Every row carries the
# same four fields: recipe, fruit, quantity and unit_price.
GROCERY_LIST = [
beam.Row(recipe='pie', fruit='raspberry', quantity=1, unit_price=3.50),
beam.Row(recipe='pie', fruit='blackberry', quantity=1, unit_price=4.00),
beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=2.00),
beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
beam.Row(recipe='pie', fruit='strawberry', quantity=3, unit_price=1.50),
]
# [END groupby_table]


def simple_aggregate(test=None):
def to_grocery_row(x):
  """Build a ``beam.Row`` with the four grocery fields from one record.

  The input shape is detected in this order:
    1. an object exposing ``recipe``, ``fruit``, ``quantity`` and
       ``unit_price`` attributes -- its values are copied into a new row;
    2. a ``dict`` keyed by those same four names;
    3. anything else -- read positionally as
       ``(recipe, fruit, quantity, unit_price)``.
  """
  field_names = ('recipe', 'fruit', 'quantity', 'unit_price')

  # Attribute-style record (e.g. something that is already row-like).
  if all(hasattr(x, name) for name in field_names):
    return beam.Row(**{name: getattr(x, name) for name in field_names})

  # Mapping-style record.
  if isinstance(x, dict):
    return beam.Row(**{name: x[name] for name in field_names})

  # Positional record: index explicitly instead of iterating, so only
  # ``x[0]``..``x[3]`` are ever touched.
  recipe, fruit, quantity, unit_price = x[0], x[1], x[2], x[3]
  return beam.Row(
      recipe=recipe, fruit=fruit, quantity=quantity, unit_price=unit_price)

with beam.Pipeline() as p:
# [START simple_aggregate]
grouped = (
p
| beam.Create(GROCERY_LIST)
| 'ToGroceryRows' >> beam.Map(to_grocery_row)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above, this is redundant; I don't think we need it.

| beam.GroupBy('fruit').aggregate_field(
'quantity', sum, 'total_quantity')
| beam.Map(print))
'quantity', sum, 'total_quantity'))
# [END simple_aggregate]

if test:
test(grouped)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,17 @@

import apache_beam as beam


def groupby_two_exprs(test=None):
  """Group fruit names by two computed keys: first letter and berry-ness.

  Args:
    test: Optional callable that receives the grouped PCollection (used to
      attach assertions). When omitted, the grouped elements are printed.
  """
  with beam.Pipeline() as p:
    # [START groupby_two_exprs]
    grouped = (
        p
        | beam.Create(
            ['strawberry', 'raspberry', 'blueberry', 'blackberry', 'banana'])
        | beam.GroupBy(letter=lambda s: s[0], is_berry=lambda s: 'berry' in s))
    # [END groupby_two_exprs]

    # Both branches add steps to the pipeline, so they have to stay inside the
    # `with` block: the pipeline is run when the block exits.
    if test:
      test(grouped)
    else:
      _ = grouped | beam.Map(print)


if __name__ == '__main__':
groupby_two_exprs()

Loading