Performing MongoDB Lookups in Pentaho Data Integration

In a recent project, one of the requirements was to load detailed data from MongoDB for a large number of IDs.
Although the MongoDB Input step is great for reading vast amounts of data from a MongoDB collection, there is no easy way to perform lookups by passing stream fields in as query parameters. It is possible to pass variables into the MongoDB query, so a transformation built around the MongoDB Input step could be run from a job in a loop, as shown below. However, that means executing the transformation once for every single ID we need details for. This may work for small data volumes, but it does not scale if you need to perform many lookups.
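For example, the query expression in the MongoDB Input step could reference a named parameter that the job sets on each iteration (the parameter name PRM_ORDERNUMBER is only an illustration):

{ "ORDERNUMBER" : ${PRM_ORDERNUMBER} }

The job loop would then copy the next order number into PRM_ORDERNUMBER and run the transformation again, which is exactly the per-ID overhead we want to avoid.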
Luckily, PDI offers the User Defined Java Class (UDJC) step, which allows Java code to be compiled and executed at runtime within a transformation. Even better: before the MongoDB Input step appeared in PDI, Matt Casters wrote a blog post about how to read data from a MongoDB collection. The code in his post served as the starting point for reading data with a parameterized query:

import com.mongodb.Mongo;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.ReadPreference;
import com.mongodb.QueryBuilder;

private Mongo m;
private DB db;
private DBCollection coll;
private String inputField;
private String jsonField;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  Object[] r = getRow();
  if (r == null) {
    setOutputDone();
    return false;
  }
  if (first) {
    // Resolve the step parameters once, on the first row
    inputField = getParameter("PRM_INPUT");
    jsonField = getParameter("PRM_JSON");
    first = false;
  }
  Object[] row = createOutputRow(r, data.outputRowMeta.size());
  // Read the lookup key (the order number) from the incoming stream field
  Long details = get(Fields.In, inputField).getInteger(r);
  String json = "";
  try {
    // Look up the matching document and pass its JSON down the stream
    DBObject obj = coll.findOne(QueryBuilder.start("ORDERNUMBER").is(details).get());
    json = obj.toString();
    get(Fields.Out, jsonField).setValue(row, json);
    putRow(data.outputRowMeta, row);
  } catch (java.lang.Exception e) {
    // findOne() returns null when nothing matches; route such rows to error handling
    putError(data.outputRowMeta, row, 1, "No data found for input row", "json", "unknown order number");
  }
  return true;
}

public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface)
{
  try {
    // Open the connection once per transformation run, not once per row
    m = new Mongo("localhost", 27017);
    db = m.getDB("SalesData");
    //db.setReadPreference(ReadPreference.secondary());
    //db.setReadPreference(ReadPreference.secondaryPreferred());
    //db.authenticate("foo", "bar".toCharArray());
    coll = db.getCollection("Sales");
    return parent.initImpl(stepMetaInterface, stepDataInterface);
  } catch (Exception e) {
    logError("Error connecting to MongoDB: ", e);
    return false;
  }
}
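Before wiring this into a UDJC step, it can help to verify the query standalone. Below is a minimal sketch using the same legacy MongoDB Java driver API; the host, port, database, and collection names mirror the init() code above, and the order number 10100 is just a placeholder value:

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.QueryBuilder;

public class MongoLookupTest {
  public static void main(String[] args) throws Exception {
    Mongo m = new Mongo("localhost", 27017); // same connection as in init()
    try {
      DB db = m.getDB("SalesData");
      DBCollection coll = db.getCollection("Sales");
      // The same query the UDJC step issues for each incoming row
      DBObject doc = coll.findOne(QueryBuilder.start("ORDERNUMBER").is(10100L).get());
      System.out.println(doc == null ? "no match" : doc.toString());
    } finally {
      m.close();
    }
  }
}

If this prints a document, the connection settings and field name are correct and can be copied into the UDJC step as-is.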

Get the example transformations here. 'tr_write_sales_data_to_mongodb' reads data from the 'sales_data.csv' file in your PDI samples directory and loads it into your MongoDB database (installation instructions for Ubuntu can be found here). 'tr_read_sales_details_from_mongodb' fetches the order details from MongoDB for every order number in your BA Server's Hypersonic SampleData database. You'll need to add the MongoDB Java driver jar to the libext folder of your PDI installation to be able to run the second transformation.
Since this functionality may come in handy in many cases, expect a full-blown PDI MongoDB lookup plugin in the near future.