#
# @author JBD-2016-07
# http://jbigdata.fr
#
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: xgrep <file> <pattern>", file=sys.stderr)
        sys.exit(-1)

    aSparkContext = SparkContext(appName="xgrep@JBD")
    aFile = aSparkContext.textFile(sys.argv[1])

    # Keep only the lines containing the pattern; the closure over
    # thePattern is serialised and shipped to the executors.
    thePattern = sys.argv[2]
    theErrors = aFile.filter(lambda line: thePattern in line)

    print("Results#SparkContext:")
    print(theErrors)  # prints the RDD object itself, not its contents
    print("Number of {} : {}".format(thePattern, theErrors.count()))
    # for anItem in theErrors.collect():
    #     print(anItem)
    aSparkContext.stop()

    # Same grep with the SparkSession / DataFrame API (pattern hard-coded):
    # aSparkSQLSession = SparkSession.builder.appName("xgrep@JBD").getOrCreate()
    # theLines = aSparkSQLSession.read.text(sys.argv[1])
    # theErrors = theLines.filter("value like '%ERROR%'")
    # print("Results#SparkSQLSession:")
    # print(theLines)
    # print(theErrors)
    # theLines.show()
    # theErrors.show()
    # aSparkSQLSession.stop()

    # Streaming experiment (would need a live SparkContext, so it must
    # run before aSparkContext.stop() if ever uncommented):
    # aStreamingContext = StreamingContext(aSparkContext, 1)
    # aFile = aSparkContext.textFile("hdfs://jbd-vm01.jbdata.fr:9000/test/lstash-00/2016-06-10/jbd-vm01/hadoop-hduser-datanode-jbd-vm01.log")
    # theErrors = aFile.filter(lambda line: "ERROR" in line)

    print("End...")
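
# Example invocation (a sketch; the HDFS path and file name below are
# assumptions for illustration, not part of the original script):
#
#   spark-submit xgrep.py hdfs://namenode:9000/logs/app.log ERROR
#
# This prints the number of lines of app.log containing "ERROR".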
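
# A minimal sketch of the commented SparkSession variant above, but
# parameterised on sys.argv[2] instead of the hard-coded 'ERROR'
# (assumes Spark >= 2.0; Column.contains does a plain substring match):
#
#   from pyspark.sql.functions import col
#
#   aSparkSQLSession = SparkSession.builder.appName("xgrep@JBD").getOrCreate()
#   theLines = aSparkSQLSession.read.text(sys.argv[1])
#   theMatches = theLines.filter(col("value").contains(sys.argv[2]))
#   print("Number of {} : {}".format(sys.argv[2], theMatches.count()))
#   aSparkSQLSession.stop()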