Skip to content

Commit 24f90cc

Browse files
committed
fix(risingwave): normalize timestamp handling and add GetTableColumns
1 parent 32c16f3 commit 24f90cc

8 files changed

Lines changed: 621 additions & 160 deletions

File tree

cmd/substreams-sink-sql/from_proto_generate_csv.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1248,7 +1248,13 @@ func (g *protoAwareCSVGenerator) walkMessageAndCollectRows(dm *dynamic.Message,
12481248
}
12491249
}
12501250
}
1251-
assignRowValue(rowValues, columnIndex, colName, fv)
1251+
converted, err := sql.NormalizeValue(fd, fv)
1252+
if err != nil {
1253+
g.releaseRowBuffer(tableName, rowValues)
1254+
releaseAccumulated()
1255+
return nil, fmt.Errorf("normalizing field %q: %w", fd.GetName(), err)
1256+
}
1257+
assignRowValue(rowValues, columnIndex, colName, converted)
12521258
}
12531259
}
12541260

cmd/substreams-sink-sql/from_proto_generate_csv_comprehensive_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/risingwave"
1919
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
2020
"github.com/streamingfast/substreams-sink-sql/internal/timefmt"
21+
pbSchema "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1"
2122
"github.com/stretchr/testify/assert"
2223
"github.com/stretchr/testify/require"
2324
"go.uber.org/zap"
@@ -598,6 +599,80 @@ func TestCSVRowFormattingRisingWaveTimestamp(t *testing.T) {
598599
assert.Equal(t, expected, csvString, "RisingWave CSV should use canonical timestamp layout")
599600
}
600601

602+
func TestCSVSemanticTimestampNormalization(t *testing.T) {
603+
field := &descriptor.FieldDescriptorProto{
604+
Name: proto.String("event_time"),
605+
Number: proto.Int32(1),
606+
Type: descriptor.FieldDescriptorProto_TYPE_STRING.Enum(),
607+
Options: &descriptor.FieldOptions{},
608+
}
609+
if err := proto.SetExtension(field.Options, pbSchema.E_Field, &pbSchema.Column{SemanticType: proto.String(string(sql.SemanticBlockTimestamp))}); err != nil {
610+
t.Fatalf("set field extension: %v", err)
611+
}
612+
613+
msgOpts := &descriptor.MessageOptions{}
614+
if err := proto.SetExtension(msgOpts, pbSchema.E_Table, &pbSchema.Table{Name: "events"}); err != nil {
615+
t.Fatalf("set table extension: %v", err)
616+
}
617+
618+
msgProto := &descriptor.DescriptorProto{
619+
Name: proto.String("Event"),
620+
Field: []*descriptor.FieldDescriptorProto{field},
621+
Options: msgOpts,
622+
}
623+
fileProto := &descriptor.FileDescriptorProto{
624+
Name: proto.String("semantic_event.proto"),
625+
Package: proto.String("test.semantic"),
626+
MessageType: []*descriptor.DescriptorProto{msgProto},
627+
}
628+
629+
fd, err := desc.CreateFileDescriptor(fileProto)
630+
require.NoError(t, err)
631+
md := fd.FindMessage("test.semantic.Event")
632+
require.NotNil(t, md)
633+
634+
zlog := zap.NewNop()
635+
sch, err := schema.NewSchema("test_schema", md, true, zlog)
636+
require.NoError(t, err)
637+
638+
dialect, err := risingwave.NewDialectRisingwave(sch.Name, sch.TableRegistry, zlog)
639+
require.NoError(t, err)
640+
641+
gen := &protoAwareCSVGenerator{
642+
schema: sch,
643+
dialect: dialect,
644+
logger: zlog,
645+
useProtoOptions: true,
646+
}
647+
648+
dm := dynamic.NewMessage(md)
649+
const rawValue = "2025-09-18T18:24:17.806811415Z"
650+
require.NoError(t, dm.TrySetFieldByName("event_time", rawValue))
651+
652+
rows, err := gen.walkMessageAndCollectRows(dm, 12345, time.Unix(0, 0), nil)
653+
require.NoError(t, err)
654+
require.Len(t, rows, 1)
655+
require.Len(t, rows[0].rows, 1)
656+
values := rows[0].rows[0]
657+
require.GreaterOrEqual(t, len(values), 3)
658+
659+
eventTime, ok := values[2].(time.Time)
660+
if !ok {
661+
t.Fatalf("expected time.Time, got %T", values[2])
662+
}
663+
wantTime, err := timefmt.ParseTimestamp(rawValue)
664+
require.NoError(t, err)
665+
if !eventTime.Equal(wantTime) {
666+
t.Fatalf("unexpected normalization: got %v want %v", eventTime, wantTime)
667+
}
668+
669+
table := sch.TableRegistry["events"]
670+
require.NotNil(t, table)
671+
formatted := string(gen.formatRowForCSV(values, table))
672+
expectedEvent := timefmt.FormatRisingWave(wantTime)
673+
assert.Contains(t, formatted, expectedEvent, "CSV output should contain normalized event_time")
674+
}
675+
601676
// TestNullHandling validates NULL value handling in CSV
602677
func TestNullHandling(t *testing.T) {
603678
gen := &protoAwareCSVGenerator{

db_changes/db/dialect_risingwave.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,21 @@ func (d RisingwaveDialect) ParseDatetimeNormalization(value string) string {
247247
return escapeStringValue(value)
248248
}
249249

250+
func (d RisingwaveDialect) GetTableColumns(db *sql.DB, schemaName, tableName string) ([]*sql.ColumnType, error) {
251+
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0",
252+
EscapeIdentifier(schemaName),
253+
EscapeIdentifier(tableName),
254+
)
255+
256+
rows, err := db.Query(query)
257+
if err != nil {
258+
return nil, fmt.Errorf("querying table structure: %w", err)
259+
}
260+
defer rows.Close()
261+
262+
return rows.ColumnTypes()
263+
}
264+
250265
func (d RisingwaveDialect) DriverSupportRowsAffected() bool {
251266
return true
252267
}

db_proto/sql/database.go

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -105,23 +105,23 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamic.Mes
105105
}
106106

107107
d.logger.Debug("Walking message descriptor", zap.String("message_descriptor_name", md.GetName()), zap.Any("table_info", tableInfo))
108-
// Keep the actual PK value handy so we don't rely on slice indexes later
109-
var primaryKeyValue any
110-
if tableInfo != nil {
111-
if table := dialect.GetTable(tableInfo.Name); table != nil {
112-
if table.PrimaryKey != nil {
113-
// Robust PK retrieval: always use the original protobuf field name
114-
// from the descriptor (not the possibly renamed SQL column name).
115-
pkFieldName := table.PrimaryKey.FieldDescriptor.GetName()
116-
pkValue := dm.GetFieldByName(pkFieldName)
117-
if pkValue == nil {
118-
return 0, fmt.Errorf("missing primary key field %q for table %q", pkFieldName, tableInfo.Name)
119-
}
120-
fieldValues = append(fieldValues, pkValue)
121-
primaryKeyValue = pkValue
122-
}
123-
}
124-
}
108+
// Keep the actual PK value handy so we don't rely on slice indexes later
109+
var primaryKeyValue any
110+
if tableInfo != nil {
111+
if table := dialect.GetTable(tableInfo.Name); table != nil {
112+
if table.PrimaryKey != nil {
113+
// Robust PK retrieval: always use the original protobuf field name
114+
// from the descriptor (not the possibly renamed SQL column name).
115+
pkFieldName := table.PrimaryKey.FieldDescriptor.GetName()
116+
pkValue := dm.GetFieldByName(pkFieldName)
117+
if pkValue == nil {
118+
return 0, fmt.Errorf("missing primary key field %q for table %q", pkFieldName, tableInfo.Name)
119+
}
120+
fieldValues = append(fieldValues, pkValue)
121+
primaryKeyValue = pkValue
122+
}
123+
}
124+
}
125125

126126
totalSqlDuration := time.Duration(0)
127127

@@ -131,17 +131,17 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamic.Mes
131131

132132
var childs []*dynamic.Message
133133

134-
for _, fd := range dm.GetKnownFields() {
135-
// Skip the PK using descriptor equality, resilient to column renames
136-
if tableInfo != nil {
137-
if table := dialect.GetTable(tableInfo.Name); table != nil && table.PrimaryKey != nil {
138-
if fd == table.PrimaryKey.FieldDescriptor {
139-
continue
140-
}
141-
}
142-
}
143-
fv := dm.GetField(fd)
144-
if v, ok := fv.([]interface{}); ok {
134+
for _, fd := range dm.GetKnownFields() {
135+
// Skip the PK using descriptor equality, resilient to column renames
136+
if tableInfo != nil {
137+
if table := dialect.GetTable(tableInfo.Name); table != nil && table.PrimaryKey != nil {
138+
if fd == table.PrimaryKey.FieldDescriptor {
139+
continue
140+
}
141+
}
142+
}
143+
fv := dm.GetField(fd)
144+
if v, ok := fv.([]interface{}); ok {
145145
// Check if this is an array of messages or native values
146146
if len(v) > 0 {
147147
if _, ok := v[0].(*dynamic.Message); ok {
@@ -154,8 +154,16 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamic.Mes
154154
childs = append(childs, fm)
155155
}
156156
} else {
157-
// Array of native values - add as a single field value (the array itself)
158-
fieldValues = append(fieldValues, fv)
157+
// Array of native values - normalize each element before storing
158+
normalized := make([]interface{}, len(v))
159+
for idx, elem := range v {
160+
converted, err := NormalizeValue(fd, elem)
161+
if err != nil {
162+
return 0, fmt.Errorf("normalizing repeated field %q: %w", fd.GetName(), err)
163+
}
164+
normalized[idx] = converted
165+
}
166+
fieldValues = append(fieldValues, normalized)
159167
}
160168
}
161169
} else if fm, ok := fv.(*dynamic.Message); ok {
@@ -164,7 +172,11 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamic.Mes
164172
}
165173
childs = append(childs, fm) //need to be handled after current message inserted
166174
} else {
167-
fieldValues = append(fieldValues, fv)
175+
converted, err := NormalizeValue(fd, fv)
176+
if err != nil {
177+
return 0, fmt.Errorf("normalizing field %q: %w", fd.GetName(), err)
178+
}
179+
fieldValues = append(fieldValues, converted)
168180
}
169181
}
170182

@@ -179,22 +191,22 @@ func (d *BaseDatabase) WalkMessageDescriptorAndInsertWithDialect(dm *dynamic.Mes
179191
fmt.Println("field values:", fieldValues)
180192
return 0, fmt.Errorf("inserting into table %q: %w", table.Name, err)
181193
}
182-
if len(childs) > 0 && d.useProtoOptions {
183-
if table.PrimaryKey == nil {
184-
return 0, fmt.Errorf("table %q has no primary key and has %d associated children table", table.Name, len(childs))
185-
}
186-
// Use the actual primary key value we fetched above. Using the
187-
// descriptor index to compute its position within fieldValues is
188-
// incorrect because fieldValues does not mirror the proto field
189-
// ordering (we prepend block metadata, optionally add parent id,
190-
// and we skipped the PK when iterating fields). This previously
191-
// caused index out-of-range panics like: index 8 with length 6.
192-
id := primaryKeyValue
193-
p = &Parent{
194-
field: strings.ToLower(md.GetName()),
195-
id: id,
196-
}
197-
}
194+
if len(childs) > 0 && d.useProtoOptions {
195+
if table.PrimaryKey == nil {
196+
return 0, fmt.Errorf("table %q has no primary key and has %d associated children table", table.Name, len(childs))
197+
}
198+
// Use the actual primary key value we fetched above. Using the
199+
// descriptor index to compute its position within fieldValues is
200+
// incorrect because fieldValues does not mirror the proto field
201+
// ordering (we prepend block metadata, optionally add parent id,
202+
// and we skipped the PK when iterating fields). This previously
203+
// caused index out-of-range panics like: index 8 with length 6.
204+
id := primaryKeyValue
205+
p = &Parent{
206+
field: strings.ToLower(md.GetName()),
207+
id: id,
208+
}
209+
}
198210
totalSqlDuration += time.Since(insertStartAt)
199211
}
200212
}

0 commit comments

Comments
 (0)