Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3c09615
feat(segment_membership): Daily Snowflake-backed per-env segment counts
khvn26 May 8, 2026
48afbd0
fix(segment_membership): Bump segment-list query counts for membershi…
khvn26 May 8, 2026
1145359
ci(segment_membership): Pin flagsmith-sql-flag-engine to CodeArtifact…
khvn26 May 9, 2026
4775bf6
ci(segment_membership): Authenticate Poetry installs against CodeArti…
khvn26 May 9, 2026
0176170
fix(segment_membership): Derive IDENTITIES.id from UUID bytes, not MD5
khvn26 May 9, 2026
a394246
feat(segment_membership): Observability for backfill and refresh
khvn26 May 9, 2026
b02f3d8
chore(segment_membership): Local smoke-test harness
khvn26 May 10, 2026
cf4c71a
Revert "chore(segment_membership): Local smoke-test harness"
khvn26 May 10, 2026
b0d4cf2
test(segment_membership): Tighten mappers + migration tests
khvn26 May 10, 2026
15d2c65
refactor(segment_membership): Migrate PoC from Snowflake to ClickHouse
khvn26 May 13, 2026
6e1584b
fix(segment_membership): Make IDENTITIES.id fit UInt64 and FROM claus…
khvn26 May 13, 2026
9957bd8
Merge remote-tracking branch 'origin/main' into feat/segment-membersh…
khvn26 May 15, 2026
280df5e
chore(api): Bump flagsmith-common to 3.9.1
khvn26 May 15, 2026
efbb257
chore(segment_membership): Wire flagsmith-sql-flag-engine via CodeArt…
khvn26 May 18, 2026
cdf7cb8
chore(segment_membership): TODOs at private-dep CodeArtifact wiring
khvn26 May 18, 2026
e03e3ce
feat(segment_membership): Accept CLICKHOUSE_URL DSN
khvn26 May 18, 2026
e22e241
refactor(segment_membership): Promote is_clickhouse_configured to a s…
khvn26 May 18, 2026
da03928
docs(segment_membership): Trim verbose comments and docstrings
khvn26 May 18, 2026
6d892f8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 18, 2026
fab9692
docs(segment_membership): Refresh events catalogue line numbers
khvn26 May 18, 2026
fe11760
Merge branch 'main' into feat/segment-membership-counts
khvn26 May 19, 2026
2f63f6d
docs(CodeArtifact): Reference flagsmith-sql-flag-engine#6 from TODOs
khvn26 May 19, 2026
0e3b5ca
refactor(segment_membership): Rename model to SegmentMembershipCount
khvn26 May 19, 2026
40555d3
refactor(segment_membership): Route ClickHouse via Django's DATABASES
khvn26 May 19, 2026
7ff1bb3
refactor(segment_membership): Key IDENTITIES by (environment_id, iden…
khvn26 May 19, 2026
88057f2
refactor(segment_membership): Drop redundant DISTINCT, stream project…
khvn26 May 19, 2026
8c07763
refactor(segment_membership): Dispatch refresh per project as backfil…
khvn26 May 19, 2026
4d6c9eb
docs(segment_membership): Frame refresh timeout as a slot-reclaim bac…
khvn26 May 19, 2026
d3dd988
test(segment_membership): Hardcode URL paths instead of reverse()
khvn26 May 19, 2026
aa92c73
test(segment_membership): Trim verbose GWT comments
khvn26 May 19, 2026
f60e516
test(segment_membership): Hardcode last_synced_at literal in field test
khvn26 May 19, 2026
8e6981c
refactor(segment_membership): Tighten refresh timeout from 30m to 10m
khvn26 May 19, 2026
1a48ea9
chore(deps): Switch flagsmith-sql-flag-engine to public PyPI
khvn26 May 20, 2026
83587ea
add `CLICKHOUSE_URL`
khvn26 May 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/update-flagsmith-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ defaults:
run:
working-directory: api

permissions:
contents: read

jobs:
update_server_defaults:
runs-on: depot-ubuntu-latest
Expand Down
36 changes: 36 additions & 0 deletions api/app/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.db.models import Model

AnalyticsDatabaseName = Literal["analytics"]
ClickHouseDatabaseName = Literal["clickhouse"]


class AnalyticsRouter:
Expand Down Expand Up @@ -38,3 +39,38 @@ def allow_migrate(self, db: str, app_label: str, **hints: Any) -> bool | None:
if db == "analytics":
return app_label in self.route_app_labels
return None


class ClickHouseRouter:
    """Database router that ties the `clickhouse` app to the `clickhouse` alias.

    Reads and writes for models whose app label is listed in
    `route_app_labels` go to the `"clickhouse"` database. Relations that
    involve such models are rejected, and migrations for those apps are
    allowed on the `"clickhouse"` alias only. For everything else each hook
    returns `None`, i.e. it expresses no opinion.
    """

    route_app_labels = ["clickhouse"]

    def db_for_read(
        self, model: type[Model], **hints: Any
    ) -> ClickHouseDatabaseName | None:
        """Return `"clickhouse"` for models of a routed app, otherwise `None`."""
        is_routed = model._meta.app_label in self.route_app_labels
        return "clickhouse" if is_routed else None

    def db_for_write(
        self, model: type[Model], **hints: Any
    ) -> ClickHouseDatabaseName | None:
        """Return `"clickhouse"` for models of a routed app, otherwise `None`."""
        is_routed = model._meta.app_label in self.route_app_labels
        return "clickhouse" if is_routed else None

    def allow_relation(self, obj1: Model, obj2: Model, **hints: Any) -> bool | None:
        """Return `False` when either object belongs to a routed app, else `None`."""
        # ClickHouse has no FKs and we don't expose CH-app models, so any
        # relation involving this app is forbidden.
        involves_routed_app = any(
            obj._meta.app_label in self.route_app_labels for obj in (obj1, obj2)
        )
        return False if involves_routed_app else None

    def allow_migrate(self, db: str, app_label: str, **hints: Any) -> bool | None:
        """Decide whether `app_label` may be migrated on the `db` alias.

        On the `"clickhouse"` alias only routed apps migrate. On any other
        alias routed apps are vetoed and all other apps get `None`.
        """
        is_routed_app = app_label in self.route_app_labels
        if db == "clickhouse":
            return is_routed_app
        return False if is_routed_app else None
38 changes: 38 additions & 0 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
"features.workflows.core",
"features.release_pipelines.core",
"segments",
"segment_membership",
"clickhouse",
"app",
"e2etests",
"simple_history",
Expand Down Expand Up @@ -1439,3 +1441,39 @@
PYLON_IDENTITY_VERIFICATION_SECRET = env.str("PYLON_IDENTITY_VERIFICATION_SECRET", None)

OSIC_UPDATE_BATCH_SIZE = env.int("OSIC_UPDATE_BATCH_SIZE", default=500)

# ClickHouse backs the segment_membership backfill and refresh tasks. Set
# CLICKHOUSE_URL (DSN form), or CLICKHOUSE_HOST plus any of the discrete
# fields, to enable it. Discrete settings override the matching field parsed
# from the URL.
# Every setting below is None when its environment variable is unset.
CLICKHOUSE_URL = env.str("CLICKHOUSE_URL", default=None)
CLICKHOUSE_HOST = env.str("CLICKHOUSE_HOST", default=None)
CLICKHOUSE_PORT = env.int("CLICKHOUSE_PORT", default=None)
CLICKHOUSE_USER = env.str("CLICKHOUSE_USER", default=None)
CLICKHOUSE_PASSWORD = env.str("CLICKHOUSE_PASSWORD", default=None)
CLICKHOUSE_DATABASE = env.str("CLICKHOUSE_DATABASE", default=None)
CLICKHOUSE_SECURE = env.bool("CLICKHOUSE_SECURE", default=None)

# Enabled as soon as either a DSN or a host is provided; the remaining
# discrete fields do not enable ClickHouse on their own.
CLICKHOUSE_ENABLED = bool(CLICKHOUSE_URL or CLICKHOUSE_HOST)

# Always installed: the router fences the `clickhouse` app's migrations off
# the default Postgres database whether or not a CH alias is configured.
DATABASE_ROUTERS.append("app.routers.ClickHouseRouter")

# The `clickhouse` database alias is only registered when ClickHouse is enabled.
if CLICKHOUSE_ENABLED:
_clickhouse_db: dict[str, Any] = {
"ENGINE": "clickhouse_backend.backend",
"HOST": CLICKHOUSE_HOST,
"PORT": CLICKHOUSE_PORT,
"USER": CLICKHOUSE_USER,
"PASSWORD": CLICKHOUSE_PASSWORD,
"NAME": CLICKHOUSE_DATABASE,
"OPTIONS": {
# The DSN and the TLS flag are handed to the backend as-is; either may be
# None when the corresponding environment variable is unset.
"dsn": CLICKHOUSE_URL,
"secure": CLICKHOUSE_SECURE,
"settings": {
# ClickHouse Cloud 25.12 requires this for `JSON`-column DDL.
"allow_experimental_json_type": 1,
},
},
}
DATABASES["clickhouse"] = _clickhouse_db # type: ignore[assignment]
Empty file added api/clickhouse/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions api/clickhouse/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


# Django app config for the `clickhouse` app.
class ClickHouseConfig(AppConfig):
name = "clickhouse"
# Set explicitly to the same value as `name`.
# NOTE(review): presumably the label database routers match on — confirm.
label = "clickhouse"
28 changes: 28 additions & 0 deletions api/clickhouse/migrations/0001_create_identities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from django.db import migrations


# DDL for the IDENTITIES table: one row per (environment_id, identifier),
# stored in a ReplacingMergeTree versioned by `inserted_at`. `IF NOT EXISTS`
# keeps the statement safe to run when the table is already present.
_SCHEMA_DDL = """\
CREATE TABLE IF NOT EXISTS IDENTITIES (
environment_id String,
-- (environment_id, identifier) is the natural unique key in Flagsmith's
-- identity model — dedupes ReplacingMergeTree without a synthetic id.
identifier String,
identity_key String,
-- Stored per top-level key as typed subcolumns; SQL NULL for empty traits.
traits JSON,
-- ReplacingMergeTree version column; most-recent insert wins per PK.
inserted_at DateTime DEFAULT now()
)
ENGINE = ReplacingMergeTree(inserted_at)
ORDER BY (environment_id, identifier)
"""


# Initial migration of the `clickhouse` app: creates the IDENTITIES table.
class Migration(migrations.Migration):
# ClickHouse has no transactional DDL.
atomic = False
initial = True

operations = [
# Forward: run the CREATE TABLE statement. Reverse: drop the table if present.
migrations.RunSQL(_SCHEMA_DDL, reverse_sql="DROP TABLE IF EXISTS IDENTITIES"),
]
Empty file.
Empty file added api/clickhouse/models.py
Empty file.
8 changes: 7 additions & 1 deletion api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ dependencies = [
"drf-writable-nested>=0.6.2,<0.7.0",
"django-filter>=2.4.0,<2.5.0",
"flagsmith-flag-engine>=10.1.0,<11.0.0",
"flagsmith-sql-flag-engine>=0.1.0,<0.2.0",
"django-clickhouse-backend>=1.4,<2.0",
"boto3>=1.35.95,<1.36.0",
"slack-sdk>=3.9.0,<3.10.0",
"asgiref>=3.8.1,<3.9.0",
Expand Down Expand Up @@ -71,7 +73,7 @@ dependencies = [
"hubspot-api-client>=12.0.0,<13.0.0",
"djangorestframework-dataclasses>=1.3.1,<2.0.0",
"pyotp>=2.9.0,<3.0.0",
"flagsmith-common[common-core,flagsmith-schemas,task-processor]>=3.9.0,<4",
"flagsmith-common[common-core,flagsmith-schemas,task-processor]>=3.9.1,<4",
"django-stubs>=5.1.3,<6.0.0",
"tzdata>=2024.1,<2025.0.0",
"djangorestframework-simplejwt>=5.5.1,<6.0.0",
Expand Down Expand Up @@ -541,6 +543,10 @@ ignore_missing_imports = true
module = ["openfeature_flagsmith.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["clickhouse_backend.*", "clickhouse_driver.*"]
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = ["scim.*"]
ignore_missing_imports = true
Expand Down
12 changes: 12 additions & 0 deletions api/scripts/run-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ run_task_processor() {
if [ -n "$TASK_PROCESSOR_DATABASE_URL" ] || [ -n "$TASK_PROCESSOR_DATABASE_NAME" ]; then
waitfordb --waitfor 30 --migrations --database task_processor
fi
if [ -n "$CLICKHOUSE_URL" ] || [ -n "$CLICKHOUSE_HOST" ]; then
waitfordb --waitfor 30 --migrations --database clickhouse
fi
exec flagsmith start \
--bind 0.0.0.0:8000 \
--access-logfile $ACCESS_LOG_LOCATION \
Expand All @@ -68,6 +71,12 @@ migrate_task_processor_db(){
fi
python manage.py migrate --database task_processor
}
migrate_clickhouse_db(){
    # Apply migrations to the `clickhouse` database alias, but only when
    # ClickHouse is configured through CLICKHOUSE_URL or CLICKHOUSE_HOST.
    # With neither set this is a successful no-op.
    if [ -n "$CLICKHOUSE_URL" ] || [ -n "$CLICKHOUSE_HOST" ]; then
        python manage.py migrate --database clickhouse
    fi
}
bootstrap(){
python manage.py bootstrap
}
Expand All @@ -81,17 +90,20 @@ if [ "$1" = "migrate" ]; then
migrate
migrate_analytics_db
migrate_task_processor_db
migrate_clickhouse_db
elif [ "$1" = "serve" ]; then
serve
elif [ "$1" = "run-task-processor" ]; then
migrate
migrate_analytics_db
migrate_task_processor_db
migrate_clickhouse_db
run_task_processor
elif [ "$1" = "migrate-and-serve" ]; then
migrate
migrate_analytics_db
migrate_task_processor_db
migrate_clickhouse_db
bootstrap
serve
else
Expand Down
Empty file.
6 changes: 6 additions & 0 deletions api/segment_membership/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from core.apps import BaseAppConfig


# Django app config for the `segment_membership` app.
class SegmentMembershipConfig(BaseAppConfig):
name = "segment_membership"
# Marks this class as the app's default config for Django's app loading.
default = True
43 changes: 43 additions & 0 deletions api/segment_membership/mappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from decimal import Decimal

from flagsmith_schemas import dynamodb

# (environment_id, identifier, identity_key, traits)
ClickHouseIdentityRow = tuple[str, str, str, dict[str, object] | None]


def map_identity_document_to_clickhouse_row(
    env_key: str,
    identity_doc: dynamodb.Identity,
) -> ClickHouseIdentityRow:
    """Build the IDENTITIES row tuple for one Dynamo identity document.

    The row is `(environment_id, identifier, identity_key, traits)`:
    `env_key` is passed through as the environment id, `identifier` and
    `composite_key` are read from the document, and `identity_traits` is
    flattened into a `{trait_key: value}` dict. A missing or empty trait
    list yields `None` in the traits slot.
    """
    identifier = identity_doc["identifier"]
    identity_key = identity_doc["composite_key"]
    trait_docs = identity_doc.get("identity_traits")
    if trait_docs:
        return (env_key, identifier, identity_key, _flatten_traits(trait_docs))
    return (env_key, identifier, identity_key, None)


def _coerce_trait_value(value: object) -> object:
    """Narrow a `Decimal` trait value to `int` or `float`; pass others through.

    boto3 hands us `Decimal` for numbers; narrowing lets the JSON column
    store a typed numeric subcolumn instead of failing to serialise.
    Whole-valued decimals become `int`, all other decimals become `float`.
    """
    if not isinstance(value, Decimal):
        return value
    is_whole_number = value == value.to_integral_value()
    return int(value) if is_whole_number else float(value)


def _flatten_traits(
    identity_traits: list[dynamodb.Trait],
) -> dict[str, object]:
    """Collapse a list of trait documents into a `{trait_key: value}` dict.

    Each value goes through `_coerce_trait_value`; a trait without a
    `trait_value` entry maps to `None`. When a key repeats, the last
    occurrence wins.
    """
    flattened: dict[str, object] = {}
    for trait in identity_traits:
        key = trait["trait_key"]
        flattened[key] = _coerce_trait_value(trait.get("trait_value"))
    return flattened
24 changes: 24 additions & 0 deletions api/segment_membership/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import prometheus_client

# Metrics are global. Per-project / per-env drill-down lives in CH's
# `system.query_log` (via `log_comment`) and in the structlog events.

# Counter: identities copied from Dynamo into ClickHouse by the backfill task.
flagsmith_segment_membership_backfill_identities_total = prometheus_client.Counter(
    name="flagsmith_segment_membership_backfill_identities_total",
    documentation="Total identities mirrored from Dynamo to ClickHouse by the segment-membership backfill task across all environments.",
)

# Histogram: wall-clock duration of one environment's backfill.
flagsmith_segment_membership_backfill_duration_seconds = prometheus_client.Histogram(
    name="flagsmith_segment_membership_backfill_duration_seconds",
    documentation="Duration of a segment-membership backfill for one environment.",
)

# Histogram: wall-clock duration of one project's count-refresh run.
flagsmith_segment_membership_refresh_duration_seconds = prometheus_client.Histogram(
    name="flagsmith_segment_membership_refresh_duration_seconds",
    documentation="Duration of a single segment-membership count-refresh run for one project.",
)

# Counter: refresh runs that ended in failure, whatever the cause.
flagsmith_segment_membership_refresh_failures_total = prometheus_client.Counter(
    name="flagsmith_segment_membership_refresh_failures_total",
    documentation="Total segment-membership refresh runs that failed for any reason.",
)
57 changes: 57 additions & 0 deletions api/segment_membership/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Generated by Django 5.2.13 on 2026-05-08 22:03

import django.db.models.deletion
from django.db import migrations, models


# Initial migration of the `segment_membership` app: creates the
# SegmentMembershipCount table with one row per (segment, environment) pair.
class Migration(migrations.Migration):

initial = True

# The new table has foreign keys into both of these apps.
dependencies = [
("environments", "0037_add_uuid_field"),
("segments", "0030_add_default_to_segment_version"),
]

operations = [
migrations.CreateModel(
name="SegmentMembershipCount",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("count", models.PositiveIntegerField()),
("last_synced_at", models.DateTimeField()),
(
"environment",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
# "+" tells Django not to create a reverse accessor on Environment.
related_name="+",
to="environments.environment",
),
),
(
"segment",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="membership_counts",
to="segments.segment",
),
),
],
options={
"constraints": [
# At most one cached count per (segment, environment) pair.
models.UniqueConstraint(
fields=("segment", "environment"),
name="segment_membership_count_unique_segment_environment",
)
],
},
),
]
Empty file.
29 changes: 29 additions & 0 deletions api/segment_membership/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from django.db import models

from environments.models import Environment
from segments.models import Segment


class SegmentMembershipCount(models.Model):
"""Cached identity-match count for one (segment, environment) pair."""

# Deleting the segment deletes its cached counts; they are reachable from
# a segment through `membership_counts`.
segment = models.ForeignKey(
Segment,
on_delete=models.CASCADE,
related_name="membership_counts",
)
# Deleting the environment deletes its cached counts; "+" means Django
# creates no reverse accessor on Environment.
environment = models.ForeignKey(
Environment,
on_delete=models.CASCADE,
related_name="+",
)
count = models.PositiveIntegerField()
# NOTE(review): presumably the time `count` was last recomputed — confirm
# against the refresh task that writes it.
last_synced_at = models.DateTimeField()

class Meta:
constraints = [
# At most one row per (segment, environment) pair.
models.UniqueConstraint(
fields=["segment", "environment"],
name="segment_membership_count_unique_segment_environment",
),
]
Loading
Loading