Reading Data from an External Partitioned Hive Table in Scalding


In our team much of our data is stored as a set of daily acquired folders accessed through Hive. In some of our Scalding jobs we needed to process the content of these files.

Since the files live on the file system, it is possible to create a MultiSourceTap joining all the partitions constituting the table, but that implies a certain amount of work.

We therefore looked for a more practical solution to the problem. It turned out that the Cascading project contains a (very basic) Hive connector called cascading-hive. It allows access to Hive storage files and can also query HCatalog to retrieve all the files where a Hive table's data is stored.

We decided to add a Scalding wrapper around this connector so we could use it in our jobs. The result is the HiveSource tap available in the scalding-taps project. As I mentioned before, the code of the Scalding tap and of the underlying cascading-hive adapter is quite basic, but it has proven very useful for my team.

To use it in your project, first add conjars to your repositories.

In Maven:


  <repository>
    <id>conjars.org</id>
    <url>http://conjars.org/repo</url>
  </repository>

and then add the dependencies on scalding-taps and cascading-hive:

In Maven:

 
 <dependency>
    <groupId>io.scalding</groupId>
    <artifactId>scalding-taps_2.10</artifactId>
    <version>0.6</version>
 </dependency>

 <dependency>
   <groupId>com.ebay</groupId>
   <artifactId>cascading-hive</artifactId>
   <version>0.0.2-SNAPSHOT</version>
   <exclusions>
   <!-- Need to exclude them and add them as runtime dependencies to avoid problems with overlapping configuration files in the three jars -->
     <exclusion>
       <groupId>org.datanucleus</groupId>
       <artifactId>datanucleus-api-jdo</artifactId>
     </exclusion>
     <exclusion>
       <groupId>org.datanucleus</groupId>
       <artifactId>datanucleus-core</artifactId>
     </exclusion>
     <exclusion>
       <groupId>org.datanucleus</groupId>
       <artifactId>datanucleus-rdbms</artifactId>
     </exclusion>
   </exclusions>
 </dependency>
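If you build with sbt instead of Maven, the equivalent configuration would be roughly the following (a sketch: the coordinates are copied from the Maven snippets above, and the exclusion rule mirrors the datanucleus exclusions):

```scala
// build.sbt — hypothetical sbt equivalent of the Maven configuration above
resolvers += "conjars" at "http://conjars.org/repo"

libraryDependencies ++= Seq(
  "io.scalding" %% "scalding-taps" % "0.6",
  "com.ebay" % "cascading-hive" % "0.0.2-SNAPSHOT" excludeAll
    ExclusionRule(organization = "org.datanucleus")
)
```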

The exclusion of the datanucleus libraries is required to avoid problems when deploying the job as a single fat jar: each datanucleus jar tries to load its configuration from a file with the same name, which would then be incorrectly loaded when the three jars are merged (see here for more details).

An example of usage is:


 HiveSource("nowtv_box_user_dashboard", SinkMode.KEEP)
   .withHCatScheme(osvInputScheme())

 

The withHCatScheme method generates a HiveSource that retrieves the data and file schema (format and fields) from HCatalog. It is also possible to override the schema retrieved from HCatalog (for example, to provide your own field names):


 HiveSource("nowtv_box_user_dashboard", SinkMode.KEEP)
   .withHCatScheme(osvInputScheme(fields = USER_SCHEMA))

 

There are methods already available in the io.scalding.taps.hive package object to generate OSV, CSV and TSV file formats.
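For example, assuming the helper names follow the same pattern as osvInputScheme (csvInputScheme here is my guess based on that naming convention), a CSV-backed table could be read with:

```scala
import cascading.tap.SinkMode
import io.scalding.taps.hive._

// Hypothetical: csvInputScheme is assumed to follow the osvInputScheme naming,
// and USER_SCHEMA is assumed to be a Fields instance listing the column names.
val source = HiveSource("nowtv_box_user_dashboard", SinkMode.KEEP)
  .withHCatScheme(csvInputScheme(fields = USER_SCHEMA))
```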

At runtime you need to add the datanucleus jars (datanucleus-api-jdo-3.2.2.jar, datanucleus-core-3.2.2.jar, datanucleus-rdbms-3.2.2.jar) and the hive-site.xml file to the job classpath.

What we currently do is package all those files under a 'lib' directory and then add it to the HADOOP_CLASSPATH environment variable of the Scalding job.
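For example, a launcher script shipped in the tar.gz could look like this (a sketch: the jar name, job class and script layout are placeholders matching the assembly described later in this post):

```shell
#!/bin/sh
# Hypothetical launcher script: exposes the packaged runtime jars and
# hive-site.xml (unpacked under ./lib) through HADOOP_CLASSPATH.
DIR=$(cd "$(dirname "$0")" && pwd)

export HADOOP_CLASSPATH="$DIR/lib/*:$DIR/lib"

# com.example.MyHiveJob and the jar name are placeholders for your actual job.
exec hadoop jar "$DIR"/*-with-dependencies.jar \
  com.twitter.scalding.Tool com.example.MyHiveJob --hdfs "$@"
```

Note that `$DIR/lib` is listed as well as `$DIR/lib/*` so that hive-site.xml (a plain file, not a jar) is picked up from the directory itself.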

This is the snippet of the pom.xml file we use to create the fat jar and the tar.gz package:


<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <executions>
    <execution>
      <id>create-uber-jar</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </execution>
    <execution>
      <id>create-distrib</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
      <inherited>false</inherited>
      <configuration>
        <descriptors>
          <descriptor>src/assembly/distribution.xml</descriptor>
        </descriptors>
      </configuration>
    </execution>
  </executions>
</plugin>

This is the distribution.xml file we use to create the package:


<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
  <id>entitlementIndex</id>
  <formats>
    <format>tar.gz</format>
  </formats>
  <fileSets>
    <fileSet>
      <directory>${project.build.directory}</directory>
      <!--<outputDirectory>/${project.artifactId}-${project.version}/</outputDirectory>-->
      <outputDirectory>/</outputDirectory>
      <includes>
        <include>*-with-dependencies.jar</include>
      </includes>
    </fileSet>
    <fileSet>
      <directory>${basedir}/src/main/scripts</directory>
      <!--<outputDirectory>/${project.artifactId}-${project.version}/</outputDirectory>-->
      <outputDirectory>/</outputDirectory>
      <includes>
        <include>*</include>
      </includes>
      <fileMode>0777</fileMode>
    </fileSet>
    <fileSet>
      <directory>runtime-lib</directory>
      <!--<outputDirectory>/${project.artifactId}-${project.version}/lib</outputDirectory>-->
      <outputDirectory>/lib</outputDirectory>
      <includes>
        <include>*.jar</include>
        <include>hive-site.xml</include>
      </includes>
    </fileSet>
  </fileSets>
</assembly>
        
    

 
