Opass: Analysis and Optimization of Parallel Data Access on Distributed File Systems

In this paper, we study parallel data access on distributed file systems, e.g., the Hadoop file system. Our experiments show that parallel data read requests often access data remotely and in an imbalanced fashion, which results in serious disk-access and data-transfer contention on certain cluster/storage nodes. We conduct a complete analysis of how these remote and imbalanced read patterns occur and how they are affected by the size of the cluster. We then propose a novel method to Optimize Parallel data Access on distributed file Systems, referred to as Opass. Opass maps the data read requests issued by parallel applications onto a graph data structure whose edge weights encode the demands of data locality and load capacity. To achieve the maximum degree of data locality and balanced access, we propose new matching-based algorithms that match processes to data based on the configuration of this graph. Our proposed method can benefit parallel data-intensive analysis under different parallel data access strategies. Experiments are conducted on PRObE's Marmot 128-node cluster testbed, and the results from both benchmarks and well-known parallel applications show the performance benefits and scalability of Opass.

Spark Code Analysis - TaskScheduler

The DAGScheduler groups the tasks of a single stage into a TaskSet, which is then submitted to TaskSchedulerImpl, an implementation of TaskScheduler. The TaskScheduler manages multiple TaskSets, while a TaskSetManager manages the tasks within one TaskSet. The actual launching of tasks is carried out by the SchedulerBackend.
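
A rough illustration using only the public RDD API (a sketch, not the scheduler internals; the local[4] master URL is an assumption for a local run): a simple one-stage job produces a TaskSet with one task per partition.

import org.apache.spark.{SparkConf, SparkContext}

object TaskSetSizeDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TaskSetSizeDemo").setMaster("local[4]"))
    // map is pipelined with count, so the DAGScheduler builds a single stage;
    // the stage's TaskSet contains one task per partition (4 tasks here).
    val rdd = sc.parallelize(1 to 1000, numSlices = 4).map(_ * 2)
    println(rdd.count())
    sc.stop()
  }
}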

Spark Code Analysis - DAGScheduler

A DAG, a directed acyclic graph, is a directed graph with no directed cycles. In Spark, the DAGScheduler manages the user-submitted jobs, the stages of each job, and the tasks of each stage. A job is submitted as a final RDD together with its dependency tree. A stage is the transformation from a parent RDD to a child RDD, and a task is the operation on each partition in an RDD transformation. The DAGScheduler finally submits the missing tasks to the TaskScheduler based on the task dependency tree. In DAGScheduler, the event-processing loop runs as a background thread, which answers user requests and executor events in the function onReceive (at the bottom of the file).

In Spark, two types of dependency are defined (in the NSDI 2012 RDD paper):

We found it both sufficient and useful to classify dependencies into two types: narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD; and wide dependencies, where multiple child partitions may depend on it.

This distinction is useful for two reasons. First, narrow dependencies allow for pipelined execution on one cluster node, which can compute all the parent partitions. For example, one can apply a map followed by a filter on an element-by-element basis. In contrast, wide dependencies require data from all parent partitions to be available and to be shuffled across the nodes using a MapReduce-like operation. Second, recovery after a node failure is more efficient with a narrow dependency, as only the lost parent partitions need to be recomputed, and they can be recomputed in parallel on different nodes. In contrast, in a lineage graph with wide dependencies, a single failed node might cause the loss of some partition from all the ancestors of an RDD, requiring a complete re-execution.
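
To make the distinction concrete, here is a minimal sketch using the public RDD API (the master URL and the toy data are assumptions): map creates a narrow, one-to-one dependency, while reduceByKey creates a wide, shuffle dependency and hence a stage boundary.

import org.apache.spark.{SparkConf, SparkContext}

object DependencyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DependencyDemo").setMaster("local[2]"))
    val base = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

    val mapped  = base.map { case (k, v) => (k, v * 10) } // narrow: each child partition reads one parent partition
    val reduced = mapped.reduceByKey(_ + _)               // wide: child partitions read many parent partitions (shuffle)

    println(mapped.dependencies.head)  // a OneToOneDependency
    println(reduced.dependencies.head) // a ShuffleDependency
    println(reduced.toDebugString)     // the shuffle introduces a new stage
    sc.stop()
  }
}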

A brief summary of the function-call diagram of DAGScheduler is shown here:

Spark DAGScheduler

handleJobSubmitted and handleTaskCompletion are the most important events processed by DAGScheduler. After all the dependency calculation (and failure handling), the DAGScheduler submits the missing tasks to the TaskScheduler through the function submitMissingTasks.

Spark Code Analysis - SparkContext

SparkContext

SparkContext is the main entry point of Spark. It provides the interfaces to storage systems such as HDFS and Tachyon.
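
A minimal sketch of SparkContext as the entry point (the HDFS URI below is a placeholder for your own NameNode and path):

import org.apache.spark.{SparkConf, SparkContext}

object ContextDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ContextDemo")
    val sc = new SparkContext(conf)
    // SparkContext is the handle through which external storage is reached.
    val lines = sc.textFile("hdfs://namenode:9000/wordcount/input")
    println(lines.count())
    sc.stop()
  }
}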

Spark Code Analysis - SparkEnv

SparkEnv holds all the runtime environment objects for a running Spark instance (either master or worker). Its complete API is here.

SparkEnv

The ExecutorEnv is created by CoarseGrainedExecutorBackend (in standalone mode).

The DriverEnv is created by SparkContext.
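
As a small sketch (SparkEnv is a developer API, so the fields shown are assumptions that may differ between versions), driver-side code can peek at the environment that SparkContext created:

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object SparkEnvPeek {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkEnvPeek").setMaster("local[2]"))
    // On the driver, SparkEnv.get returns the driver environment created by SparkContext;
    // on an executor it returns the executor environment created by the backend.
    val env = SparkEnv.get
    println(env.blockManager) // block storage service
    println(env.serializer)   // serializer for task and shuffle data
    sc.stop()
  }
}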

Spark Code Analysis - The Worker

Spark has 3 main components:

  • The Master manages the Workers.
  • The Worker waits for commands from the Master.
  • SparkSubmit provides the main gateway for launching a Spark application.

In Spark standalone mode, the Master and Worker communicate through Akka. The components of a Spark Worker look like this:

Spark Worker

At startup, the Worker registers with the Master:

15/02/10 12:29:57 INFO Worker: Registered signal handlers for [TERM, HUP, INT]
15/02/10 12:29:57 WARN Utils: Your hostname, zhou resolves to a loopback address: 127.0.1.1; using 10.173.214.158 instead (on interface eth0)
15/02/10 12:29:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/02/10 12:29:57 INFO SecurityManager: Changing view acls to: jan
15/02/10 12:29:57 INFO SecurityManager: Changing modify acls to: jan
15/02/10 12:29:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jan); users with modify permissions: Set(jan)
15/02/10 12:29:58 INFO Slf4jLogger: Slf4jLogger started
15/02/10 12:29:58 INFO Remoting: Starting remoting
15/02/10 12:29:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@10.173.214.158:55939]
15/02/10 12:29:58 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkWorker@10.173.214.158:55939]
15/02/10 12:29:58 INFO Utils: Successfully started service 'sparkWorker' on port 55939.
15/02/10 12:29:58 INFO Worker: Starting Spark worker 10.173.214.158:55939 with 8 cores, 14.7 GB RAM
15/02/10 12:29:58 INFO Worker: Running Spark version 1.3.0-SNAPSHOT
15/02/10 12:29:58 INFO Worker: Spark home: /home/jan/projects/spark
15/02/10 12:29:58 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/02/10 12:29:58 INFO WorkerWebUI: Started WorkerWebUI at http://10.173.214.158:8081
15/02/10 12:29:58 INFO Worker: Connecting to master akka.tcp://sparkMaster@zhou:7077/user/Master...
15/02/10 12:29:58 INFO Worker: Successfully registered with master spark://zhou:7077

When a job launches, the Worker launches the executor and the corresponding driver1:

15/02/10 12:30:05 INFO ExecutorRunner: Launch command: "java" "-cp" ":/home/jan/projects/spark/sbin/../conf:/home/jan/projects/spark/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop1.0.4.jar" "-XX:MaxPermSize=128m" "-Dspark.driver.port=35922" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://sparkDriver@10.173.214.158:35922/user/CoarseGrainedScheduler" "0" "10.173.214.158" "8" "app-20150210123005-0000" "akka.tcp://sparkWorker@10.173.214.158:55939/user/Worker"

The complete communication between Worker and Master is handled in the function receiveWithLogging.

CommandUtils.scala is the common code used by DriverRunner and ExecutorRunner, covering the transfer of the Driver/Executor jar(s) and the startup of the processes. Each Driver/Executor runs as an independent process.


  1. It seems that the DriverRunner did nothing in this run, and I do not know what this driver is used for. There appear to be two parts to the driver: 1) the master part, started from SparkContext; 2) the worker part, which here starts from nothing.

First look at Spark

Spark, started at UC Berkeley AMPLab in 2009, is a fast and general cluster computing system for Big Data.

Build From Source

The Spark source contains a shell script, build/mvn, which automatically downloads compatible versions of Scala and Maven and compiles the source code1.

git clone https://github.com/apache/spark
cd spark
build/mvn -DskipTests clean package

Launch Master & Slaves

The Spark configuration files are located in conf. To launch the master and slaves:

sbin/start-all.sh

Running the Examples

To run examples provided in Scala:

bin/run-example SparkPi 10

The run-example script runs the examples bundled with Spark by calling spark-submit.

Spark Submit Jobs

To submit jobs provided in Python:

bin/spark-submit examples/src/main/python/pi.py 10

To submit jobs provided in Java/Scala:

bin/spark-submit --class org.apache.spark.examples.SparkPi examples/target/spark-examples_*.jar 10

For more information about spark-submit, see this.

Code Structure

Spark has 3 main components:

  • The Master manages the Workers.
  • The Worker waits for commands from the Master.
  • SparkSubmit provides the main gateway for launching a Spark application.

Each component has its own main function. The communication between Master and Worker is provided by Akka. Each example also has a main function, which is where the example code starts. So there are four main functions in total. Take SparkPi as an example:

package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
  • The code above first creates a ParallelCollectionRDD via parallelize, which holds the slices to be processed.
  • The map call then creates a new MapPartitionsRDD. The MapPartitionsRDD stores the parent ParallelCollectionRDD and the map function, but nothing is “computed” yet.
  • Only when the reduce action is called are the map and reduce jobs actually scheduled, as the sketch below illustrates.
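
A small sketch of this laziness (the println inside map is only for demonstration; on a cluster its output appears in the executor logs rather than on the driver):

import org.apache.spark.{SparkConf, SparkContext}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyDemo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 10, 2)
    // Nothing runs yet: map only records the function and the parent RDD.
    val doubled = rdd.map { x => println("computing " + x); x * 2 }
    // The action triggers the DAGScheduler and TaskScheduler; only now do the prints happen.
    println(doubled.reduce(_ + _))
    sc.stop()
  }
}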

More examples about RDDs can be found here.


  1. When compiling the source code, including “clean” is recommended. Without “clean” you will do an incremental compile, and you are then likely to encounter compile errors or function-not-found errors at runtime.

An Efficient Page-level FTL to Optimize Address Translation in Flash Memory

Flash-based solid state disks (SSDs) have become very popular in consumer and enterprise storage markets due to their high performance, low energy consumption, shock resistance, and small size. However, increasing SSD capacity imposes great pressure on performing efficient logical-to-physical address translation in a page-level flash translation layer (FTL). Existing schemes usually employ an on-board RAM cache for storing mapping information, called the mapping cache, to speed up address translation. Since only a fraction of the mapping table can be cached at a time due to limited cache space, a large number of extra operations in flash memory are required for cache management and garbage collection, degrading performance and lifetime. In this paper, we first apply analytic models to investigate the key factors that incur extra operations. Then, we propose an efficient page-level FTL, named TPFTL, which employs two-level LRU lists to organize cached mapping entries and minimize extra operations. Inspired by the models, we design a workload-adaptive loading policy combined with an efficient replacement policy to increase the cache hit ratio while reducing writebacks of replaced dirty entries. Finally, we evaluate TPFTL using extensive trace-driven simulations. Our evaluation results show that, compared to state-of-the-art FTLs, TPFTL reduces the random writes caused by address translation by an average of 62% and improves response time by up to 24%.

PERP: Attacking the balance among energy, performance and recovery in storage systems

Most recently, an important metric called “energy proportionality” was presented as a guideline for energy-efficient systems (Barroso and Hölzle, 2007); it advocates that energy consumption should be in proportion to system performance/utilization. However, this tradeoff metric is only defined for normal mode, where the system is functioning without node failures. When a node failure occurs, the system enters degradation mode, during which node reconstruction is initiated. This very process needs to wake/spin up a number of disks and takes a substantial amount of I/O bandwidth, which compromises not only energy efficiency but also performance. Moreover, replication-based storage systems such as the Google File System (Ghemawat et al., 2003) and the Hadoop Distributed File System (Borthakur, 2007) are adopting a recovery policy that defines a deadline for recovery rather than simply recovering the data as soon as possible. Given this flexibility in recovery time, it becomes possible to reduce energy consumption with respect to the performance and recovery requirements. This raises a natural problem: how to balance performance, energy, and recovery in degradation mode for an energy-efficient storage system? Without considering the I/O bandwidth contention between recovery and foreground performance, we find that current energy-proportional solutions cannot answer this question accurately. This paper presents a mathematical model named Perfect Energy, Recovery and Performance (PERP), which provides guidelines for provisioning the number of active nodes as well as the assigned recovery bandwidth at each time slot with respect to the performance and recovery constraints. To utilize PERP in storage systems, we take data layouts into consideration and propose a node-selection algorithm named the “Gradual Increase/Decrease” Algorithm (GIA) to select the active nodes based on PERP results. We apply PERP and GIA to current popular power-proportional layouts and test their effectiveness on a 25-node in-house CASS cluster. Experimental results validate that, while meeting both performance and recovery constraints, PERP helps realize 25% power savings compared with the maximum-recovery policy from Sierra (Thereska et al., 2011) and 20% power savings compared with the recovery-group policy from Rabbit (Amur et al., 2010).

[BIB]

How to compile a Hadoop Program

Before compiling your first Hadoop program, please see the instructions on how to run the WordCount example.

You can get the WordCount example code from GitHub (make sure you get a compatible version):

wget https://github.com/apache/hadoop-common/raw/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java

Optionally you can change package org.apache.hadoop.examples; to package org.janzhou;.

Set the HADOOP_CLASSPATH:

export HADOOP_CLASSPATH=$(bin/hadoop classpath)

Compile:

javac -classpath ${HADOOP_CLASSPATH} -d WordCount/ WordCount.java

Create JAR:

jar -cvf WordCount.jar -C WordCount/ .

Run:

bin/hadoop jar WordCount.jar org.janzhou.wordcount /wordcount/input /wordcount/output

Using com.sun.tools.javac.Main

You normally invoke javac from the command line, but you can also invoke the compiler from within a Java program: use the com.sun.tools.javac.Main class located in ${JAVA_HOME}/lib/tools.jar and pass it an array of Strings equivalent to the command-line parameters.

See the MapReduce Tutorial.

Set environment variables:

export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar

Compile WordCount.java and create a jar:

bin/hadoop com.sun.tools.javac.Main -d WordCount/ WordCount.java 
jar -cvf WordCount.jar -C WordCount/ .

Makefile

It is also nice to have a Makefile that does this automatically for you.

Here is a simple example:

HADOOP = ${HOME}/hadoop-2.5.1/bin/hadoop
 
APP = WordCount
SRC = src/*.java 
OUT = out
  
$(APP): $(SRC) 
	mkdir -p $(OUT) 
	javac -classpath `$(HADOOP) classpath` -d $(OUT) $(SRC) 
	jar -cvf $(APP).jar -C $(OUT) .

clean: 
	rm -rf $(OUT) *.jar

You can find more comprehensive examples from: https://github.com/janzhou/hadoop-example

How to install and run Hadoop (the Troubleshooting Version)

  • http://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
  • http://hadoop.apache.org/docs/r2.5.1/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml
  • http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
  • http://stackoverflow.com/questions/17975144/only-one-datanode-can-run-in-a-multinode-hadoop-setup
  • http://wiki.apache.org/hadoop/ConnectionRefused
  • http://blog.cloudera.com/blog/2009/08/hadoop-default-ports-quick-reference/
  • http://stackoverflow.com/questions/20171455/java-net-connectexception-connection-refused-error-when-running-hive

How to run Hadoop WordCount.java Map-Reduce Program

Hadoop comes with a set of demonstration programs, which are located here. One of them is WordCount.java, which computes the word frequency of all text files found in the HDFS directory you ask it to process. Follow the Hadoop Tutorial to run the example.

Creating a working directory for your data:

bin/hdfs dfs -mkdir /wordcount

Copy Data Files to HDFS:

bin/hdfs dfs -copyFromLocal /path/to/your/data /wordcount/input

Running WordCount:

bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.1.jar wordcount /wordcount/input /wordcount/output

View the Results:

bin/hdfs dfs -cat /wordcount/output/part-r-00000

Download the Results:

bin/hdfs dfs -copyToLocal /wordcount/output/part-r-00000 .

TOP Conference Programs for Computer Architectures

How to Write a Research Paper

Research is hard. When doing research, you should start by finding a good research topic that truly interests you. However, finding a good research topic is beyond the scope of this article; here I mainly focus on writing.

Writing skill is essential to producing a good-quality paper. The writing techniques used in a paper depend on the specific topic and solution the paper presents. However, all papers in computer engineering are expected to follow the general style below.

Title

The title of a paper should be chosen carefully and directly tell what you are doing in the paper.

Abstract

The abstract should be one ‘small’ paragraph summarizing your paper. However, a good abstract should include all the important information the paper is going to present:

  1. Background.
  2. Limitations of state-of-the-art solutions.
  3. Your proposal.
  4. The expected outcome.
  5. Why your solution is going to work.
  6. Summary of the experimental results.

Introduction

The introduction of a paper should not exceed two pages and should include:

  1. What is the problem? What is the background/root of the problem?
  2. Why is the problem important?
  3. What are the state-of-the-art solutions?
  4. What are the limitations of these solutions? This is also where the subproblems are highlighted.
  5. What is your solution? Why is it going to work?
  6. What is your contribution? It should solve the subproblems.

Background and Motivation

There is not enough space in the Introduction for a detailed analysis of the problem you are going to solve. In this section, you should discuss the problem in detail and give concrete examples that illustrate it. The aspects discussed here should already have been highlighted in the Introduction.

Your Solution

This is the answer to your motivation and to the problem analyzed in detail in the Motivation. You should explain why you arrived at this solution and why it is going to work.

Implementation

How do you conduct the experiments?

Experiments Results

  1. Why do you run these experiments?
  2. How do you obtain the results?
  3. Why do the results look the way they do?

Summarize the state-of-the-art solutions and explain how your solution differs from them. This should make clear why your contribution is indeed a contribution.

Conclusion

  1. Restate the topic.
  2. Restate your solution.
  3. Briefly summarize your main points.
  4. Explain the experiments.
  5. Explain the results.

Other Resources

For more detailed guidance, see the handouts from The Writing Center at UNC Chapel Hill.

Applying KNN To MNIST Dataset

MNIST is a set of handwritten digit images. You can download it using a Makefile:

%.gz:
	wget http://yann.lecun.com/exdb/mnist/$*.gz

%.idx: %.gz
	gzip -d $*.gz
	mv $* $*.idx

prepare: t10k-images-idx3-ubyte.idx t10k-labels-idx1-ubyte.idx train-images-idx3-ubyte.idx train-labels-idx1-ubyte.idx

The size of each image is 28x28 pixels.

The IDX File Format

The MNIST dataset is stored in the IDX file format. The basic format is:

magic number 
size in dimension 0 
size in dimension 1 
size in dimension 2 
..... 
size in dimension N 
data

The first 4 bytes of an IDX file are the magic number:

typedef struct magic_number_s {
  char byte1;       // always 0
  char byte2;       // always 0
  char type;
  char dimentions;
} magic_number_t;

The first 2 bytes of the magic number are always 0. The third byte encodes the type of the data:

0x08: unsigned byte 
0x09: signed byte 
0x0B: short (2 bytes) 
0x0C: int (4 bytes) 
0x0D: float (4 bytes) 
0x0E: double (8 bytes)

For the MNIST dataset, the type is unsigned byte.

The 4th byte encodes the number of dimensions of the vector/matrix. For images, the number of dimensions is 3; for labels, it is 1.

The handwritten digit images are formatted as:

typedef struct images_s{
  magic_number_t magic_number;
  unsigned int number_of_images;
  unsigned int number_of_rows;
  unsigned int number_of_columns;
  unsigned char images[1]; // 28x28x[number of images]
} images_t;

The size of a single image in the MNIST dataset is 28*28 bytes. Get the image data at a given index with:

#define get_img(head, index) ((head) + 28*28*(index))

The labels, which give the digit for each handwritten image, are formatted as:

typedef struct labels_s{
  magic_number_t magic_number;
  unsigned int number_of_items;
  unsigned char labels[1]; // [number of labels]
} labels_t;

Big Endian

The MNIST dataset is stored in big-endian byte order. The counts, such as the number of items (an unsigned int), need to be converted to little-endian to be read correctly on a little-endian machine.

// The sizes in each dimension are 4-byte integers (MSB first, high endian, like in most non-Intel processors).
int is_bigendian() {
  int i = 1;
  char *p = (char *)&i;

  if (p[0] == 1)
    return 0;
  else
    return 1;
}

unsigned int bit32conversion(unsigned int num) {
  return ((num>>24)&0xff) | ((num<<8)&0xff0000) | ((num>>8)&0xff00) | ((num<<24)&0xff000000);
}

if(!is_bigendian()) {
  number_of_items = bit32conversion(number_of_items);
}

The Distance of Images

There are multiple ways to calculate the distance between two images; one simple way is to use the Euclidean distance. The function below returns the squared Euclidean distance, which is sufficient for ranking neighbors.

// squared Euclidean distance between two 28x28 images
double distance(unsigned char img1[], unsigned char img2[]) {
  int i,j;
  double sum = 0, value;
  for(i = 0, j= 28*28; i < j; i++) {
    value = img1[i] - img2[i];
    sum = sum + value*value;
  }
  return sum;
}

Multi-thread Design

Finding the k nearest neighbors in the training set for every image in the test set is a time-consuming process. We can design a multi-threaded program in which each thread processes only a subset of the test set. The per-thread working structure can be designed as:

// the k-nearest neighbors is here.
typedef struct knn_s{
  int index;
  double distance;
} knn_t;

typedef struct cfg_s{
  knn_t * knn;
  unsigned int number_of_neighbor;
  unsigned int k;
  unsigned int train_start, train_stop, test_start, test_stop;
  unsigned int total;
  unsigned int * hit;
  unsigned int thread;
} cfg_t;

With this multi-threaded design, the program can utilize all the cores of the CPUs.

Performance Evaluation

The performance metric we evaluate here is the hit ratio: the percentage of test images for which the program predicts the correct digit.

k hit rate (%)
1 97.91
2 96.27
3 97.05
4 96.82
5 96.88
6 96.77
7 96.94
8 96.70
9 96.59
10 96.65

According to this table, the best hit rate is obtained when k = 1 (97.91%).

The complete source code is available at: https://github.com/janzhou/cap5610

NNThroughputBenchmark

NNThroughputBenchmark is one of the earliest NameNode Benchmarks.

It was first described in HDFS Scalability: The Limits to Growth:

In order to measure the name-node performance, I implemented a benchmark called NNThroughputBenchmark, which now is a standard part of the HDFS code base.

NNThroughputBenchmark is a single-node benchmark, which starts a name-node and runs a series of client threads on the same node. Each client repetitively performs the same name-node operation by directly calling the name-node method implementing this operation. Then the benchmark measures the number of operations performed by the name-node per second.

The reason for running clients locally rather than remotely from different nodes is to avoid any communication overhead caused by RPC connections and serialization, and thus reveal the upper bound of pure name-node performance.

Run it using the hadoop command:

hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark

Then you will see the available operations and options:

Usage: NNThroughputBenchmark
    -op all <other ops options> | 
    -op create [-threads T] [-files N] [-filesPerDir P] [-close] | 
    -op mkdirs [-threads T] [-dirs N] [-dirsPerDir P] | 
    -op open [-threads T] [-files N] [-filesPerDir P] [-useExisting] | 
    -op delete [-threads T] [-files N] [-filesPerDir P] [-useExisting] | 
    -op fileStatus [-threads T] [-files N] [-filesPerDir P] [-useExisting] | 
    -op rename [-threads T] [-files N] [-filesPerDir P] [-useExisting] | 
    -op blockReport [-datanodes T] [-reports N] [-blocksPerReport B] [-blocksPerFile F] | 
    -op replication [-datanodes T] [-nodesToDecommission D] [-nodeReplicationLimit C] [-totalBlocks B] [-replication R] | 
    -op clean | 
         [-keepResults] | [-logLevel L] | [-UGCacheRefreshCount G]

Run with the option -op all and you will get results like:

14/04/25 15:22:26 FATAL namenode.NNThroughputBenchmark: Log level = ERROR
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Starting 22 replication(s).
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Starting benchmark: clean
14/04/25 15:22:26 FATAL namenode.NNThroughputBenchmark: Log level = ERROR
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Starting 1 clean(s).
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- create inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFiles = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrThreads = 3
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFilesPerDir = 4
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- create stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 403
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 24.81389578163772
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 105
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- mkdirs inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrDirs = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrThreads = 3
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrDirsPerDir = 2
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- mkdirs stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 349
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 28.653295128939828
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 88
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- open inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFiles = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrThreads = 3
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFilesPerDir = 4
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- open stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 16
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 625.0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 1
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- delete inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFiles = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrThreads = 3
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFilesPerDir = 4
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- delete stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 366
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 27.3224043715847
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 91
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- fileStatus inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFiles = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrThreads = 3
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFilesPerDir = 4
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- fileStatus stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 3
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 3333.3333333333335
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- rename inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFiles = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrThreads = 3
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: nrFilesPerDir = 4
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- rename stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 349
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 28.653295128939828
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 89
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- blockReport inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: reports = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: datanodes = 3 (0, 0, 0)
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: blocksPerReport = 100
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: blocksPerFile = 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- blockReport stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 10
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 20
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 500.0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 6
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- replication inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: numOpsRequired = 22
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: datanodes = 3 (0, 0, 0)
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: decommissioned datanodes = 1
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: datanode replication limit = 100
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: total blocks = 100
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- replication stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 0.0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: decommissioned blocks = 0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: pending replications = 0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: replications per sec: 0.0
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: 
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- clean inputs ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Remove directory /nnThroughputBenchmark
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: --- clean stats  ---
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: # operations: 1
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Elapsed Time: 38
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark:  Ops per sec: 26.31578947368421
14/04/25 15:22:26 INFO namenode.NNThroughputBenchmark: Average Time: 35

Related issues:

  1. HDFS-5068
  2. HDFS-5675

On Balance among Energy, Performance and Recovery in Storage Systems

With the increasing size of clusters as well as the increasing capacity of each storage node, current storage systems are spending more time on recovery. When a node failure happens, the system enters degradation mode, in which node reconstruction/block recovery is initiated. This very process needs to wake up a number of disks and takes a substantial amount of I/O bandwidth, which compromises not only energy efficiency but also performance. This raises a natural problem: how to balance performance, energy, and recovery in degradation mode for an energy-efficient storage system? Without considering the I/O bandwidth contention between recovery and foreground performance, we find that current energy-proportional solutions cannot answer this question accurately. This paper presents a mathematical model called Perfect Energy, Reliability, and Performance (PERP), which provides guidelines for provisioning the number of active nodes and the recovery speed at each time slot with respect to the performance and recovery constraints. We apply our model to practical data layouts and test its effectiveness on our 25-node CASS cluster. Experimental results validate that our model helps realize 25% energy savings while meeting both performance and recovery constraints, and the savings are expected to increase with a larger number of nodes.

[BIB] [PDF]

JSON-RPC over Golang Websocket

Basic ideas

JSON-RPC is a lightweight remote procedure call protocol. A JSON-RPC request is a single object serialized using JSON, a lightweight data-interchange format most commonly used in web applications to send data from the server to the browser. Typically JSON data is transferred using Ajax1, but WebSocket represents the next evolutionary step in web communication: it supports two-way communication, providing bi-directional, full-duplex channels over a single TCP socket2. This allows us to implement JSON-RPC in a bi-directional way.

Golang jsonrpc server & client

Golang’s jsonrpc package (net/rpc/jsonrpc) is an implementation of the JSON-RPC protocol. Here is an example for the jsonrpc package.

First, let us look at a jsonrpc sample in pure Go.

Jquery-jsonrpc

jquery-jsonrpc is a JSON-RPC 2.0 compatible client library and jQuery (1.4, 1.5, and 1.6 compatible) plugin, but it does not implement the JSON-RPC server side. I wrote a jsonrpc library that implements both the server and the client once a websocket is created.

Golang Websocket

Golang has a third-party websocket implementation that we can use. You need to get the websocket package first:

go get code.google.com/p/go.net/websocket

Then you can try these examples:

The complete example code is here.

Removing ^M Characters In Vim

If you edit files in gedit or Notepad, ^M characters may be inserted. After that, you cannot simply remove ^M in Vim with the following commands:

%s/^M//g
%s/\^M//g
%s/^V^M//g
%s/C-vC-m//g

These fail because the pattern is not found.

^M in Vim can be manipulated as the \r character, which is read as a carriage return. Doing a replace for \r characters will remove the ^M:

%s/\r//g

Your file may also contain \0, the null byte. Remove those too:

%s/\0//g

\n represents a newline. Replacing it will give you a single-line file:

%s/\n//g

For more information:

" " (ASCII 32 (0x20)), an ordinary space.
"\t" (ASCII 9 (0x09)), a tab.
"\n" (ASCII 10 (0x0A)), a new line (line feed).
"\r" (ASCII 13 (0x0D)), a carriage return.
"\0" (ASCII 0 (0x00)), the NUL-byte.
"\x0B" (ASCII 11 (0x0B)), a vertical tab.

MIND: A Black-Box Energy Consumption Model for Disk Arrays

Energy consumption is becoming a growing concern in data centers. Many energy-conservation techniques have been proposed to address this problem. However, an integrated method is still needed to evaluate the energy efficiency of storage systems and of various power-conservation techniques. Extensive measurements of different workloads on storage systems are often very time-consuming and require expensive equipment. We have analyzed changing characteristics, such as power and performance, of stand-alone disks and RAID arrays, and then defined MIND as a black-box power model for RAID arrays. MIND is devised to quantitatively measure the power consumption of redundant disk arrays running different workloads in a variety of execution modes. In MIND, we define five modes (idle, standby, and several types of access) and four actions to precisely characterize the power states and transitions of RAID arrays. In addition, we develop corresponding metrics for each mode and action, and then integrate the model and a measurement algorithm into a popular trace tool, blktrace. With these features, we are able to run different I/O traces on large-scale storage systems with power-conservation techniques. Accurate energy-consumption and performance statistics are then collected to evaluate the energy efficiency of storage-system designs and power-conservation techniques. Our experiments, running both synthetic and real-world workloads on enterprise RAID arrays, show that MIND can estimate the power consumption of disk arrays with an error rate of less than 2%.

[BIB] [PDF]

TRACER: A Trace Replay Tool to Evaluate Energy-Efficiency of Mass Storage Systems

Improving the energy efficiency of mass storage systems has become an important and pressing research issue in large HPC centers and data centers. New energy-conservation techniques for storage systems constantly spring up; however, there is a lack of a systematic and uniform way to accurately evaluate energy-efficient storage systems and objectively compare a wide range of energy-saving techniques. This research presents a new integrated scheme, called TRACER, for evaluating the energy efficiency of mass storage systems and judging energy-saving techniques. The TRACER scheme consists of a toolkit used to measure the energy efficiency of storage systems as well as performance and energy metrics. In addition, TRACER contains a novel and accurate workload-control module to acquire power as it varies with workload mode and I/O load intensity. The workload generator in TRACER facilitates a block-level trace-replay mechanism. The main goal of the workload-control module is to select a certain percentage (e.g., anywhere from 10% to 100%) of trace entries from a real-world I/O trace file uniformly and to replay the filtered trace entries to reach any level of I/O load intensity. TRACER is experimentally validated on a general RAID-5 enterprise disk array. Our experiments demonstrate that energy-efficient mass storage systems can be accurately evaluated at full scale by TRACER. We applied TRACER to investigate the impact of workload mode and load intensity on the energy efficiency of storage devices. This work shows that TRACER can enable storage-system developers to evaluate energy-efficiency designs for storage systems.

[BIB] [PDF]