Spark DataFrame

# Spark DataFrame basics: build a DataFrame from an in-memory RDD of Rows,
# then explore a CSV dataset (iris.csv) with selection, grouping,
# aggregation, and a train/test split.
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as fn

spark = SparkSession.builder.appName('test').getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.shuffle.partitions", 6)

# Create a DataFrame from a Python list via an RDD of Row objects.
l = [('Ankit', 25), ('Jalfaizy', 22), ('saurabh', 20), ('Bala', 26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = spark.createDataFrame(people)

# Load the iris dataset; header=true uses the first CSV line as column names.
# (Original one-liner had a broken continuation: `format("csv"). \ option(...)`.)
df = spark.read.format("csv") \
    .option("header", "true") \
    .load("iris.csv")

df.printSchema()
df.show(10)
df.count()
df.columns
df.withColumn('newWidth', df.SepalWidth * 2).show()
df.drop('Name').show()
df.describe().show()
df.describe('Name').show()  # categorical column
df.select('Name', 'SepalLength').show()
df.select('Name').distinct().count()

# Grouped aggregation: groupby(colname).agg({'col': 'fun', 'col2': 'fun2'})
df.groupby('Name').agg({'SepalWidth': 'mean', 'SepalLength': 'max'}).show()

# Custom aggregation functions
# NOTE(review): iris.csv may not have an 'id' column — verify before running.
df.agg(fn.count('SepalWidth').alias('width_count'),
       fn.countDistinct('id').alias('distinct_id_count')).collect()

# Split the dataset into two parts.
# NOTE(review): the original snippet was truncated here ("[Read More]");
# the 0.4 ratio is reconstructed to complete the 60/40 split.
trainDF, testDF = df.randomSplit([0.6, 0.4])

Spark Streaming

from pyspark.sql import SparkSession
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext




if __name__ == '__main__':
    # Streaming word-count over a Kafka topic using the legacy
    # direct-stream API (pyspark.streaming.kafka, Spark 2.x era).
    topic = "test"
    spark = SparkSession.builder.appName("Python Spark ").master("local[2]").getOrCreate()
    sc = spark.sparkContext
    # 10-second micro-batches.
    ssc = StreamingContext(sc, 10)
    stream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": "127.0.0.1:9092"})
    # Each Kafka record arrives as a (key, value) pair; keep the message value.
    messages = stream.map(lambda record: record[1])
    word_counts = (messages
                   .flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    word_counts.pprint()

    ssc.start()
    ssc.awaitTermination()

Spark SQL

from pyspark.sql import SparkSession


if __name__ == '__main__':
    # Read the `baidu` table from a local MySQL database over JDBC
    # and display its contents.
    spark = SparkSession.builder.appName("Python Spark ").master("local[2]").getOrCreate()
    reader = (spark.read.format("jdbc")
              .option("url", "jdbc:mysql://localhost:3306/video")
              .option("driver", "com.mysql.jdbc.Driver")
              .option("dbtable", "baidu")
              .option("user", "root")
              .option("password", "123456"))
    table_df = reader.load()
    table_df.show()