संपादित करें 2018-01-27:
यह पता चला है कि यह समस्या DirectRunner से संबंधित है। यदि आप DataflowRunner का उपयोग करके समान पाइपलाइन चलाते हैं, तो आपको ऐसे बैच प्राप्त करने चाहिए जो वास्तव में 1,000 रिकॉर्ड तक हों। DirectRunner हमेशा ग्रुपिंग ऑपरेशन के बाद आकार 1 के बंडल बनाता है।
मूल उत्तर:
अपाचे बीम के जेडीबीसीआईओ का उपयोग कर क्लाउड डेटाबेस में लिखते समय मैंने एक ही समस्या में भाग लिया है। समस्या यह है कि जबकि जेडीबीसीआईओ एक बैच में 1,000 रिकॉर्ड तक लिखने का समर्थन करता है, मैंने वास्तव में इसे एक समय में 1 से अधिक पंक्ति लिखने में कभी नहीं देखा है (मुझे स्वीकार करना होगा:यह हमेशा विकास वातावरण में डायरेक्टरनर का उपयोग कर रहा था)।
इसलिए मैंने जेडीबीसीआईओ में एक फीचर जोड़ा है जहां आप अपने डेटा को एक साथ समूहीकृत करके और प्रत्येक समूह को एक बैच के रूप में लिखकर बैचों के आकार को स्वयं नियंत्रित कर सकते हैं। नीचे अपाचे बीम के मूल वर्डकाउंट उदाहरण के आधार पर इस सुविधा का उपयोग करने का एक उदाहरण दिया गया है।
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
JdbcIO की सामान्य लेखन-विधि के साथ अंतर नई विधि writeIterable()
है जो एक PCollection<Iterable<RowT>>
. लेता है PCollection<RowT>
. के बजाय इनपुट के रूप में . प्रत्येक Iterable को डेटाबेस में एक बैच के रूप में लिखा जाता है।
इस जोड़ के साथ JdbcIO का संस्करण यहां पाया जा सकता है:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
ऊपर दिए गए उदाहरण वाला पूरा उदाहरण प्रोजेक्ट यहां पाया जा सकता है:https://github.com/ ओलाव्लोइट/स्पैनर-बीम-उदाहरण
(इसे परियोजना में शामिल करने के लिए अपाचे बीम पर एक पुल अनुरोध भी लंबित है)