Hello
I am creating a streaming job, but I get no error message and nothing is recorded in Mongo.
I have tried several types of connection. From the command line, a producer and a consumer can produce to and consume from the topic normally.
Thank you in advance!
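For reference, the command-line check is roughly this round trip in kafka-python; the payload is just an illustrative example:
from kafka import KafkaProducer, KafkaConsumer
from json import dumps, loads

producer = KafkaProducer(
    bootstrap_servers=['kafka02.medp.pt:6667'],
    value_serializer=lambda v: dumps(v).encode('utf-8'))
producer.send('edp_ignite_teste', {'ping': 'pong'})  # illustrative payload
producer.flush()

consumer = KafkaConsumer(
    'edp_ignite_teste',
    bootstrap_servers=['kafka02.medp.pt:6667'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,  # stop iterating after 10s of silence
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    print(message.value)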
Here is my code:
from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # needed for createDirectStream below
import logging
spark = SparkSession.builder \
.appName("edp_ignite_streaming") \
.master("yarn") \
.config("spark.mongodb.input.uri", "mongodb://172.29.0.22:27017/ignite.DDATransaction") \
.config("spark.mongodb.output.uri", "mongodb://172.29.0.22:27017/ignite.DDATransaction") \
.getOrCreate()
logger = logging.getLogger('ignite_consumer.py')
logger.debug("TESTE PARA VER SE O LOGGER ESTA FUNCIONANDO. DULOREAN")
print("INICIANDO CONTEXTO SPARK ")
sc = spark.sparkContext
print("INICIANDO CONTEXTO SPARK STREAMING")
ssc = StreamingContext(sc, 3)
print("INICIANDO CONTEXTO SPARK SQL")
#sqlContext = SQLContext(sc)
logger.debug("CONSUMINDO O TOPICO")
consumer = KafkaConsumer(
    'edp_ignite_teste',
    bootstrap_servers=['kafka02.medp.pt:6667'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='ignite-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))
consumerKStream = KafkaUtils.createDirectStream(
    ssc,
    topics=['edp_ignite_teste'],
    kafkaParams={"metadata.broker.list": 'kafka02.medp.pt:6667'})
# DStreams have no show(); pprint() prints the first elements of each micro-batch
consumerKStream.pprint()
client = MongoClient('172.29.0.22:27017')
collection = client.ignite.DDATransaction
logger.debug("ITERANDO A LISTA PARA PERSISTENCIA. DULOREAN")
# NOTE: without consumer_timeout_ms this iteration blocks indefinitely,
# so nothing below this loop runs while messages keep arriving
for message in consumer:
    message = message.value
    collection.insert_one(message)
    print('{} added to {}'.format(message, collection))
df = spark.read.format('com.stratio.datasource.mongodb') \
    .options(host='172.29.0.22:27017', database='ignite', collection='DDATransaction') \
    .load()
df.select("column1").collect()
df.select("column1").write.format("com.stratio.datasource.mongodb").mode('overwrite') \
    .options(host='172.29.0.22:27017', database='ignite', collection='DDATransaction') \
    .save()
dfView = spark.read.format('com.stratio.datasource.mongodb') \
    .options(host='172.29.0.22:27017', database='ignite', collection='DDATransaction') \
    .load()
dfView.show()
logger.debug("DATAFRAME DE TESTE. DULOREAN")
people = spark.createDataFrame(
    [("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
     ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)],
    ["name", "age"])
people.write.format("mongo").mode("append").save()
people.write.format("mongo").mode("append").option("database","igniteDfTeste").option("collection", "contatos").save()
logger.debug("GRAVACAO MONGO. DULOREAN")
print("Save conta records in mongodb")
df_str.write.format("mongo").mode("append").option("database","ignite").option("collection", "DDATransaction").save()
logger.debug("ENVIO PARA OUTRO TOPICO KAFKA. DULOREAN")
streamingDataFrame = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka02.medp.pt:6667") \
    .option("subscribe", "edp_ignite_teste") \
    .load()
streamingDataFrame.writeStream \
    .format("kafka") \
    .option("topic", "edp_ignite_topic_1") \
    .option("kafka.bootstrap.servers", "kafka01.medp.pt:6667") \
    .option("checkpointLocation", "hdfs://no01.medp.pt/checkpointkfk/") \
    .start()
ssc.start()
ssc.awaitTermination()
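For reference, this is the shape of the Kafka-to-Mongo path I am trying to get to, using only Structured Streaming. A sketch, assuming Spark 2.4+ for foreachBatch and the spark-sql-kafka-0-10 and MongoDB Spark connector packages on the classpath:
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka02.medp.pt:6667") \
    .option("subscribe", "edp_ignite_teste") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS value")

def write_batch_to_mongo(batch_df, batch_id):
    # foreachBatch hands over each micro-batch as a static DataFrame,
    # so the batch writer of the Mongo connector can be used here
    batch_df.write.format("mongo").mode("append") \
        .option("database", "ignite") \
        .option("collection", "DDATransaction") \
        .save()

query = streaming_df.writeStream \
    .foreachBatch(write_batch_to_mongo) \
    .option("checkpointLocation", "hdfs://no01.medp.pt/checkpointkfk2/") \
    .start()  # checkpoint path is illustrative; each query needs its own
query.awaitTermination()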
Does your command-line consumer stay running while you're running the application? If so, are both in the same consumer group?
– Caio Augusto Papai
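On that group point: a consumer in its own group receives every message independently, while two consumers sharing a group split the partitions between them. A minimal sketch with kafka-python; the group ID here is illustrative:
from kafka import KafkaConsumer
from json import loads

app_consumer = KafkaConsumer(
    'edp_ignite_teste',
    bootstrap_servers=['kafka02.medp.pt:6667'],
    group_id='ignite-group-app',  # distinct from any console consumer's group (illustrative)
    auto_offset_reset='earliest',
    value_deserializer=lambda x: loads(x.decode('utf-8')))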