Python Kafka, Spark Streaming, MongoDB connection


Hello

I am creating a streaming job, but I get no error message and nothing is recorded in Mongo.

I have tried several kinds of connection. From the command line, the producer and consumer can produce and consume the topic normally.
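To double-check the broker outside Spark, a quick round trip with kafka-python can confirm that messages flow end to end (a minimal sketch; the broker address and topic are the same ones used in the code below, and 'smoke-test-group' is a hypothetical group id):

from json import dumps, loads
from kafka import KafkaProducer, KafkaConsumer

# Send one JSON test message to the topic.
producer = KafkaProducer(bootstrap_servers=['kafka02.medp.pt:6667'],
                         value_serializer=lambda v: dumps(v).encode('utf-8'))
producer.send('edp_ignite_teste', {'ping': 'pong'})
producer.flush()

# Read it back with a throwaway group so offsets are not shared
# with the application's 'ignite-group'.
check = KafkaConsumer('edp_ignite_teste',
                      bootstrap_servers=['kafka02.medp.pt:6667'],
                      auto_offset_reset='earliest',
                      group_id='smoke-test-group',
                      consumer_timeout_ms=10000,
                      value_deserializer=lambda x: loads(x.decode('utf-8')))
for msg in check:
    print(msg.value)
    break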

Thank you in advance!

Here is my code:

from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads
from pyspark.sql import SparkSession, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # needed for createDirectStream below
import logging

spark = SparkSession.builder \
     .appName("edp_ignite_streaming") \
     .master("yarn") \
     .config("spark.mongodb.input.uri", "mongodb://172.29.0.22:27017/ignite.DDATransaction") \
     .config("spark.mongodb.output.uri", "mongodb://172.29.0.22:27017/ignite.DDATransaction") \
     .getOrCreate()

# Without basicConfig, the debug messages below are silently dropped.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('ignite_consumer.py')

logger.debug("TEST TO CHECK THAT THE LOGGER IS WORKING. DULOREAN")

print("INICIANDO CONTEXTO SPARK ")
sc = spark.sparkContext
print("INICIANDO CONTEXTO SPARK STREAMING")
ssc = StreamingContext(sc, 3)
print("INICIANDO CONTEXTO SPARK SQL")

#sqlContext = SQLContext(sc)

logger.debug("CONSUMINDO O TOPICO")
consumer = KafkaConsumer('edp_ignite_teste',
                         bootstrap_servers=['kafka02.medp.pt:6667'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='ignite-group',
                         value_deserializer=lambda x: loads(x.decode('utf-8')))

consumerKStream = KafkaUtils.createDirectStream(
    ssc,
    topics=['edp_ignite_teste'],
    kafkaParams={"metadata.broker.list": 'kafka02.medp.pt:6667'})

# streamingDataFrame is not defined anywhere above (see the sketch after the code).
streamingDataFrame.writeStream \
     .format("kafka") \
     .option("topic", "edp_ignite_topic_1") \
     .option("kafka.bootstrap.servers", "kafka01.medp.pt:6667") \
     .start()

# DStreams have no show(); pprint() prints the first elements of each micro-batch.
consumerKStream.pprint()

client = MongoClient('172.29.0.22:27017')
collection = client.ignite.DDATransaction

logger.debug("ITERANDO A LISTA PARA PERSISTENCIA. DULOREAN")
for message in consumer:
    message = message.value
    collection.insert_one(message)
    print('{} added to {}'.format(message, collection))



df = spark.read.format('com.stratio.datasource.mongodb').options(host='172.29.0.22:27017', database='ignite', collection='DDATransaction').load()
df.select("column1").collect()

df.select("column1").write.format("com.stratio.datasource.mongodb").mode('overwrite').options(host='172.29.0.22:27017', database='ignite', collection='DDATransaction').save()
dfView = spark.read.format('com.stratio.datasource.mongodb').options(host='172.29.0.22:27017', database='ignite', collection='DDATransaction').load()
dfView.show()

logger.debug("TEST DATAFRAME. DULOREAN")

people = spark.createDataFrame([("Bilbo Baggins",  50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
   ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"])

people.write.format("mongo").mode("append").save()
people.write.format("mongo").mode("append").option("database","igniteDfTeste").option("collection", "contatos").save()

logger.debug("GRAVACAO MONGO. DULOREAN")
print("Save conta records in mongodb")
df_str.write.format("mongo").mode("append").option("database","ignite").option("collection", "DDATransaction").save()

logger.debug("ENVIO PARA OUTRO TOPICO KAFKA. DULOREAN")

streamingDataFrame.writeStream \
     .format("kafka") \
     .option("topic", "edp_ignite_topic_1") \
     .option("kafka.bootstrap.servers", "kafka01.medp.pt:6667") \
     .option("checkpointLocation", "hdfs://no01.medp.pt/checkpointkfk/") \
     .start()


ssc.start()
ssc.awaitTermination()
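For reference, streamingDataFrame above is never created. With Structured Streaming (Spark 2.2+), a streaming DataFrame reading from the topic would typically be built like this (a minimal sketch, assuming the spark-sql-kafka-0-10 package is on the classpath):

streamingDataFrame = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "kafka02.medp.pt:6667") \
     .option("subscribe", "edp_ignite_teste") \
     .load()

Note that this is a different API from the DStream created with KafkaUtils; the code currently mixes both in one job.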
  • Does your command-line consumer stay running while you run the application? If so, are both in the same consumer group?
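Regarding the comment: Kafka consumers that share a group_id split a topic's partitions between them, so if a command-line consumer is left running in the same group ('ignite-group'), the application may receive nothing. A distinct group id lets the application read independently (a sketch; 'ignite-group-app' is a hypothetical name):

consumer = KafkaConsumer('edp_ignite_teste',
                         bootstrap_servers=['kafka02.medp.pt:6667'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='ignite-group-app',  # hypothetical: not shared with the CLI consumer
                         value_deserializer=lambda x: loads(x.decode('utf-8')))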

No answers
