# Spark Streaming word count over a Kafka topic.

from pyspark.sql import SparkSession
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext




if __name__ == '__main__':
    # Word count over a Kafka topic using the Spark Streaming DStream API.
    # NOTE(review): pyspark.streaming.kafka.KafkaUtils was removed in Spark 3.0;
    # this script requires Spark 2.x with the spark-streaming-kafka-0-8
    # package on the classpath — confirm the target Spark version.
    topic = "test"                    # Kafka topic to consume
    brokers = "127.0.0.1:9092"        # Kafka bootstrap broker list
    batch_interval = 10               # micro-batch length, in seconds

    # local[2]: need at least two threads so processing is not starved
    # while the stream is being consumed.
    spark = SparkSession.builder \
        .appName("Python Spark ") \
        .master("local[2]") \
        .getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, batch_interval)

    # Direct (receiver-less) stream; each record is a (key, message) pair.
    kvs = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda kv: kv[1])  # keep only the message payload

    # Classic word count, computed independently for each micro-batch.
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()  # print the first elements of each batch to stdout

    ssc.start()
    ssc.awaitTermination()  # block until the streaming job is stopped