Skip to content

Commit 8d7c1f6

Browse files
committed
feat: add DuckDB chunking file lock test and improve connection handling
1 parent 5b54b00 commit 8d7c1f6

4 files changed

Lines changed: 69 additions & 2 deletions

File tree

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Test DuckDB chunking file lock (issue #717)
2+
# When using chunk_size with DuckDB target, ClearTableForChunkLoadWithRange
3+
# must close its connection before chunk tasks open theirs, otherwise
4+
# DuckDB's exclusive file lock causes "Could not set lock on file" errors.
5+
6+
steps:
7+
# 1. Create source table in Postgres
8+
- id: setup
9+
connection: postgres
10+
query: |
11+
DROP TABLE IF EXISTS public.test_duckdb_chunk_lock;
12+
CREATE TABLE public.test_duckdb_chunk_lock AS
13+
SELECT generate_series AS id, 'name_' || generate_series AS name
14+
FROM generate_series(1, 1000);
15+
16+
- log: "Created source table with 1000 rows"
17+
18+
# 2. Run replication with chunking to DuckDB target
19+
- replication:
20+
source: postgres
21+
target: DUCKDB
22+
defaults:
23+
mode: full-refresh
24+
target_options:
25+
use_bulk: false
26+
env:
27+
SLING_THREADS: 1
28+
streams:
29+
public.test_duckdb_chunk_lock:
30+
object: main.test_duckdb_chunk_lock
31+
primary_key: [id]
32+
update_key: id
33+
source_options:
34+
chunk_size: 250
35+
on_failure: abort
36+
37+
- log: "Replication with chunking completed"
38+
39+
# 3. Verify all 1000 rows arrived
40+
- connection: DUCKDB
41+
query: SELECT count(*) as cnt FROM main.test_duckdb_chunk_lock
42+
into: result
43+
44+
- log: "DuckDB row count: {store.result[0].cnt}"
45+
46+
- check: int_parse(store.result[0].cnt) == 1000
47+
failure_message: "Expected 1000 rows, got {store.result[0].cnt}"
48+
49+
- log: "SUCCESS: DuckDB chunking with chunk_size works (issue #717)"
50+
51+
# 4. Cleanup
52+
- connection: DUCKDB
53+
query: DROP TABLE IF EXISTS main.test_duckdb_chunk_lock;
54+
55+
- connection: postgres
56+
query: DROP TABLE IF EXISTS public.test_duckdb_chunk_lock;

cmd/sling/tests/suite.cli.yaml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2103,4 +2103,14 @@
21032103
run: 'sling run -d -r cmd/sling/tests/replications/r.108.oracle_chunk_custom_sql.yaml'
21042104
streams: 2
21052105
output_contains:
2106-
- 'execution succeeded'
2106+
- 'execution succeeded'
2107+
2108+
# DuckDB file locking prevents concurrent chunk writes (issue #717)
2109+
# When using chunk_size with DuckDB target, multiple threads try to open the same
2110+
# .duckdb file simultaneously, causing "Could not set lock on file" errors.
2111+
- id: 219
2112+
name: 'DuckDB chunking file lock (issue #717)'
2113+
run: 'sling run -d -p cmd/sling/tests/pipelines/p.19.duckdb_chunk_lock.yaml'
2114+
group: duckdb
2115+
output_contains:
2116+
- 'SUCCESS: DuckDB chunking with chunk_size works (issue #717)'

core/sling/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ func (cfg *Config) ClearTableForChunkLoadWithRange() (err error) {
317317
if err != nil {
318318
return g.Error(err, "could not connect to target conn for preparing final table for chunk loading")
319319
}
320+
defer dbConn.Close()
320321

321322
switch cfg.Mode {
322323
case FullRefreshMode:

core/sling/replication.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,7 @@ func (rd *ReplicationConfig) ProcessChunks() (err error) {
768768
if chunkExpr != "" {
769769
// no update_key needed for chunking by expression
770770
} else if stream.config.UpdateKey == "" {
771-
return g.Error(err, "did not provide update_key for stream chunking: %s", stream.name)
771+
return g.Error("did not provide update_key for stream chunking: %s", stream.name)
772772
} else if stream.config.Mode == IncrementalMode {
773773
// need to get the max value target side if the table exists
774774
var tempCfg Config

0 commit comments

Comments
 (0)