Skip to content

Commit 6216d69

Browse files
docs: Example pipeline with apache flink runner (#24)
1 parent 81e51ae commit 6216d69

3 files changed

Lines changed: 67 additions & 3 deletions

File tree

example/langchain-beam-example/pom.xml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,24 @@
1717
<dependency>
1818
<groupId>org.apache.beam</groupId>
1919
<artifactId>beam-runners-direct-java</artifactId>
20-
<version>2.60.0</version>
20+
<version>2.61.0</version>
2121
</dependency>
2222
<dependency>
2323
<groupId>org.apache.beam</groupId>
2424
<artifactId>beam-sdks-java-core</artifactId>
25-
<version>2.60.0</version>
25+
<version>2.61.0</version>
2626
</dependency>
2727
<!-- https://mvnrepository.com/artifact/io.github.ganeshsivakumar/langchain-beam -->
2828
<dependency>
2929
<groupId>io.github.ganeshsivakumar</groupId>
3030
<artifactId>langchain-beam</artifactId>
3131
<version>0.2.0</version>
3232
</dependency>
33+
<dependency>
34+
<groupId>org.apache.beam</groupId>
35+
<artifactId>beam-runners-flink-1.18</artifactId>
36+
<version>2.61.0</version>
37+
</dependency>
3338
</dependencies>
3439
<build>
3540
<resources>
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.langchainbeam.example;
2+
3+
import org.apache.beam.runners.flink.FlinkPipelineOptions;
4+
import org.apache.beam.runners.flink.FlinkRunner;
5+
import org.apache.beam.sdk.Pipeline;
6+
import org.apache.beam.sdk.io.TextIO;
7+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
8+
import org.apache.beam.sdk.transforms.DoFn;
9+
import org.apache.beam.sdk.transforms.ParDo;
10+
11+
import com.langchainbeam.LangchainBeam;
12+
import com.langchainbeam.LangchainModelHandler;
13+
import com.langchainbeam.model.LangchainBeamOutput;
14+
import com.langchainbeam.model.openai.OpenAiModelOptions;
15+
16+
// Run sentiment analysis pipeline on Apache Flink using flink runner
17+
public class ApacheFlinkExample {
18+
19+
public static void main(String[] args) {
20+
String prompt = "Categorize the product review as Positive or Negative.";
21+
22+
String apiKey = System.getenv("OPENAI_API_KEY");
23+
24+
// Create model options with the model and its parameters
25+
OpenAiModelOptions modelOptions = OpenAiModelOptions.builder()
26+
.modelName("gpt-4o-mini")
27+
.apiKey(apiKey)
28+
.build();
29+
30+
// Create Apache flink pipeline options
31+
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
32+
options.setJobName("product-review-job");
33+
options.setRunner(FlinkRunner.class);
34+
35+
// Set Flink-specific options
36+
options.setParallelism(2);
37+
options.setMaxParallelism(16);
38+
39+
// create beam pipeline
40+
Pipeline p = Pipeline.create(options);
41+
42+
// create model handler
43+
LangchainModelHandler handler = new LangchainModelHandler(modelOptions, prompt);
44+
45+
p.apply(TextIO.read().from("/home/ganesh/Downloads/product_reviews.csv"))// load data
46+
.apply(LangchainBeam.run(handler)) // apply the LangchainBeam transform.
47+
.apply(ParDo.of(new DoFn<LangchainBeamOutput, Void>() {
48+
49+
@ProcessElement
50+
public void processElement(@Element LangchainBeamOutput out) {
51+
System.out
52+
.println("Model Output: " + out.getOutput() + "Input Element " + out.getInputElement());
53+
}
54+
}));
55+
56+
p.run();
57+
}
58+
}

example/langchain-beam-example/src/main/java/com/langchainbeam/example/SentimentAnalysis.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ static void identifySentiment() {
5252

5353
@ProcessElement
5454
public void processElement(@Element LangchainBeamOutput out) {
55-
System.out.println("Model Output: " + out.getOutput());
55+
System.out.println(
56+
"Model Output: " + out.getOutput() + "Product review: " + out.getInputElement());
5657
}
5758
}));
5859

0 commit comments

Comments
 (0)