uokadaの見逃し三振は嫌いです

ここで述べられていることは私の個人的な意見に基づくものであり、私が所属する組織には一切の関係はありません。

embulk-output-orcを作った

techplay.jp
togetter.com

6月に開催されたPresto meetupで登壇したときに夏休みに作るわ〜と言ってしまったので宣言通り作ってみた。

embulk-output-orc | RubyGems.org | your community gem host

現状、マルチスレッドでちゃんと動かないバグを回避するために一時的な実装を入れています。

https://github.com/yuokada/embulk-output-orc/blob/0.2.2/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
github.com

diff --git a/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java b/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
index c0ef4d8..d9352d9 100644
--- a/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
+++ b/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
@@ -34,6 +34,7 @@ import org.embulk.util.aws.credentials.AwsCredentialsTask;
 import org.joda.time.DateTimeZone;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -265,6 +266,7 @@ public class OrcOutputPlugin
     {
         private final PageReader reader;
         private final Writer writer;
+        private final ArrayList<VectorizedRowBatch> rowBatches = new ArrayList<>();
 
         public OrcTransactionalPageOutput(PageReader reader, Writer writer, PluginTask task)
         {
@@ -288,11 +290,8 @@ public class OrcOutputPlugin
                 );
                 i++;
             }
-            try {
-                writer.addRowBatch(batch);
-            }
-            catch (IOException e) {
-                Throwables.propagate(e);
+            synchronized (this) {
+                rowBatches.add(batch);
             }
         }
 
@@ -300,6 +299,9 @@ public class OrcOutputPlugin
         public void finish()
         {
             try {
+                for (VectorizedRowBatch batch : rowBatches) {
+                    writer.addRowBatch(batch);
+                }
                 writer.close();
             }
             catch (IOException e) {

Javaをこれまで全く書いてこなかったのでマルチスレッド周りのバグ対応の知見がなくこんなパッチになっています。
そして、この実装を取り入れた副作用として大量のレコード(自分の環境だとデフォルトの設定にでだいたい2~300万レコード以上)を処理するときに、java.lang.OutOfMemoryError: GC Overhead limit exceeded が発生して処理が落ちます。

自分の用途だと今のところ検証目的で100万程度までの処理が多いのでこの実装で間に合っていますし、 v0.2.0の実装でシングルスレッドで使えばレコードが増えても問題なく動くので今のところこれに落ち着きました。

ただ、ちゃんとバグ修正はしたいなと思っているので年内にでもちゃんと時間取って対応して1千万レコードでも1億レコードでも問題なく処理できるようにしたいとなと考えてます。

orcのなかでスレッドセーフじゃないクラスがいくつかあり、#addRowBatchからこれらを呼んでいるからかなというステータスでその先は調査中です。

orcフォーマットを軽く試す分には問題なく使えるレベルにはなっているので是非使って見てください。