6月に開催されたPresto meetupで登壇したときに夏休みに作るわ〜と言ってしまったので宣言通り作ってみた。
embulk-output-orc | RubyGems.org | your community gem host
現状、マルチスレッドでちゃんと動かないバグを回避するために一時的な実装を入れています。
https://github.com/yuokada/embulk-output-orc/blob/0.2.2/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
github.com
diff --git a/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java b/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java index c0ef4d8..d9352d9 100644 --- a/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java +++ b/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java @@ -34,6 +34,7 @@ import org.embulk.util.aws.credentials.AwsCredentialsTask; import org.joda.time.DateTimeZone; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -265,6 +266,7 @@ public class OrcOutputPlugin { private final PageReader reader; private final Writer writer; + private final ArrayList<VectorizedRowBatch> rowBatches = new ArrayList<>(); public OrcTransactionalPageOutput(PageReader reader, Writer writer, PluginTask task) { @@ -288,11 +290,8 @@ public class OrcOutputPlugin ); i++; } - try { - writer.addRowBatch(batch); - } - catch (IOException e) { - Throwables.propagate(e); + synchronized (this) { + rowBatches.add(batch); } } @@ -300,6 +299,9 @@ public class OrcOutputPlugin public void finish() { try { + for (VectorizedRowBatch batch : rowBatches) { + writer.addRowBatch(batch); + } writer.close(); } catch (IOException e) {
Javaをこれまで全く書いてこなかったのでマルチスレッド周りのバグ対応の知見がなくこんなパッチになっています。
そして、この実装を取り入れた副作用として大量のレコード(自分の環境だとデフォルトの設定にでだいたい2~300万レコード以上)を処理するときに、java.lang.OutOfMemoryError: GC Overhead limit exceeded
が発生して処理が落ちます。
自分の用途だと今のところ検証目的で100万程度までの処理が多いのでこの実装で間に合っていますし、
v0.2.0
の実装でシングルスレッドで使えばレコードが増えても問題なく動くので今のところこれに落ち着きました。
ただ、ちゃんとバグ修正はしたいなと思っているので年内にでもちゃんと時間取って対応して1千万レコードでも1億レコードでも問題なく処理できるようにしたいとなと考えてます。
orcのなかでスレッドセーフじゃないクラスがいくつかあり、#addRowBatchからこれらを呼んでいるからかなというステータスでその先は調査中です。
orcフォーマットを軽く試す分には問題なく使えるレベルにはなっているので是非使って見てください。