Sink Kafka Topic to Database Table | Build JDBC Sink Connector | Confluent Connector | Kafka Connect

  23,106 views

The Java Tech Learning

1 day ago

This video explains how to sink Kafka topic data to a MySQL table using the Confluent JDBC Sink Connector. It walks through the implementation of creating the Sink Connector config and then deploying it on the Confluent server using Control Center.
Like | Subscribe | Share
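
For reference, a minimal sink connector config along the lines of what the video builds might look like the sketch below. The topic name, table, MySQL URL and credentials are placeholders (not the exact values used in the video), and the Avro converter assumes a Schema Registry is running:

{
  "name": "mysql-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:mysql://localhost:3306/testdb",
    "connection.user": "user",
    "connection.password": "password",
    "insert.mode": "insert",
    "auto.create": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}

With auto.create enabled the connector creates the target table from the record schema if it does not already exist; in production you would usually pre-create the table and set it to false.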

Comments: 75
@wilfredocorchado7379 1 year ago
The best explanation of Kafka Sink Connector I found... Thanks so much.
@javatechlearning 1 year ago
Thank you
@zdeals_official 9 months ago
You must continue your teaching, bro. You are excellent, thanks!
@javatechlearning 9 months ago
Thank you
@amankumar-f4n5r 1 year ago
Nice, found your sink connector video here as well. I really appreciate the effort you put in for us.
@javatechlearning 1 year ago
Thank you 🙏😊
@srinathvk 3 years ago
Nice walkthrough
@muthukumarchellappa7889 6 months ago
Cheers bro!
@AbhishekKumar-fn6sz 2 years ago
Good one, Thank you sir
@javatechlearning 2 years ago
Thanks for your feedback 🙂
@rajkiranboggala7085 9 months ago
Would this work with any RDBMS? Would it work with Sybase ASE as the sink?
@javatechlearning 9 months ago
Yes, this should work with Sybase DB as well. You need to use the Sybase JDBC driver for that.
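
As a rough sketch, only the connection settings would change; the URL below follows the Sybase jConnect driver's format, and the host, port and database are assumptions, so verify them against the driver you actually deploy:

"connection.url": "jdbc:sybase:Tds:sybase-host:5000/mydb",
"connection.user": "user",
"connection.password": "password"

The jConnect driver jar also has to be copied next to the JDBC connector plugin on the Connect worker so the connector can load it.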
@hemalathaa4579 1 month ago
Hi, can you please confirm: 1) Why are you using the Avro converter? If I haven't set any schema for the topic, what should I give here? 2) Are pk.mode and pk.fields mandatory?
@ravi7100 1 year ago
Hi, I have one question... can this Kafka sink connector be used to connect to 100 different databases? Need your input on this.
@bajisyed9225 2 years ago
Hi Vishal, many thanks for uploading the videos related to the JDBC source and sink connectors. They are very helpful. It would also be helpful if you could upload a video on how to load a CSV file into a Kafka topic 🙂
@javatechlearning 2 years ago
Sure, will prepare one on that topic.
@karthikb.s.k.4486 2 years ago
@@javatechlearning Can you do a video on uploading a CSV file?
@mdicopriatama6380 1 year ago
Hi Vishal, great explanation. BTW, I use upsert as the insert.mode parameter, but the values in the destination table are not changing. Is there something I missed?
@babuperumal7664 2 years ago
Please increase the font size so that it can be seen on mobile and all kinds of devices.
@blacksoul2887 1 year ago
Hi Vishal, I am using Apache Kafka in my environment and I need to set up a sink connector to push data from Kafka to an Oracle DB. Will this Confluent sink connector work with Apache Kafka?
@javatechlearning 1 year ago
You can't do it with Apache Kafka. You need to get a Confluent setup, or you can go for Confluent Cloud as well.
@blacksoul2887 1 year ago
@@javatechlearning Okay, so if I don't use Confluent (which is most likely non-compliant in my org), is there any other open-source connector available for Apache Kafka that enables me to push data from Kafka to Oracle?
@fumbling-fingers 1 year ago
Hi, thanks for your helpful guide, but please let me know how we can define the consumer group in this configuration.
@javatechlearning 1 year ago
The consumer group is created automatically based on the connector name.
@fumbling-fingers 1 year ago
@@javatechlearning That's my problem, I can't add more than 1 consumer to a consumer group 😔
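
For readers hitting the same limitation: a sink connector's consumers belong to the group connect-<connector name>, and the number of consumers in that group is driven by tasks.max (bounded by the topic's partition count), not by anything you join manually. A minimal sketch, assuming the topic has at least 3 partitions:

"tasks.max": "3"

With that setting Connect starts 3 sink tasks, i.e. 3 consumers in the connector's group, each reading a subset of the partitions.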
@karthikb.s.k.4486 2 years ago
Nice explanation Vishal. One doubt: if we have 10 fields coming from a Kafka topic and we want to insert only 4 fields into the database, what property is required and how do we do it?
@javatechlearning 2 years ago
I'm afraid that's not how the connector works. It expects that whatever is present in the schema must go to the table. You can keep those fields null in the table.
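
One property worth adding here, since the reply above does not mention it: the Confluent JDBC sink connector supports fields.whitelist, which restricts which fields of the record value are written to the table. A sketch with made-up field names:

"fields.whitelist": "id,name,email,created_at"

With this set, only the listed fields are mapped to columns and the remaining fields of the topic's schema are ignored.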
@Revving4LIFE 2 years ago
Hi Vishal, I am using the JDBC sink connector in the Kafka Confluent tool to load data from a topic to a table in a SQL Server database. Due to a mismatch in length between the source and target fields, I am getting an error which says: "String or binary data would be truncated" for field X in table xyz in database abc (SQL Server). Is there a setting or any JDBC connection URL configuration available in the Kafka connector to allow the data to load into the target table without erroring out the record or routing it to the dead letter topic?
@javatechlearning 2 years ago
What is the field type and the column type in the table?
@PawanK-cu2kt 2 years ago
How can I capture a delete in the source table using the JDBC source connector and then sync the target table using the JDBC sink connector?
@aishaimran5616 2 years ago
How did you install the JDBC sink connector?
@karthikb.s.k.4486 2 years ago
Hi Vishal, is there any parameter in the connector to check how many records are consumed by the JDBC sink connector from a topic?
@javatechlearning 2 years ago
It's a default value which we can override in the connector config: max.poll.records.
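
To spell that out: max.poll.records is a consumer property, so in the connector config it is set through the per-connector client override prefix (available since Kafka 2.3, and only if the worker's connector.client.config.override.policy allows overrides). The value here is just an example:

"consumer.override.max.poll.records": "500"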
@Akash-tq1ui 2 years ago
Can you tell me how to delete data from the topic when the MySQL query executes?
@duongpham2561 2 years ago
Nice walkthrough. Do you have a config file for Oracle?
@javatechlearning 2 years ago
It's shared on my GitHub account. You can find it in the description.
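
If the GitHub repo is not handy, the Oracle-specific part of the sink config is essentially just the connection settings; the host, service name and credentials below are placeholders:

"connection.url": "jdbc:oracle:thin:@//oracle-host:1521/ORCLPDB1",
"connection.user": "user",
"connection.password": "password"

The Oracle JDBC driver (e.g. ojdbc8.jar) must also be available to the connector, typically dropped into the same plugin directory as the JDBC connector.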
@saurabhkar3014 2 years ago
Can we run Confluent on our local Windows without WSL2? Is there any way?
@javatechlearning 2 years ago
No, Confluent can only be run on a Linux-based OS.
@sankha087 2 years ago
So if I need to pull data from multiple Kafka topics and dump them into multiple Oracle tables, can I configure that in one connector JSON file? Please advise.
@javatechlearning 2 years ago
No, in such a case you need to create a connector for each table. You can dump multiple topics' data into a single table, but not into multiple tables.
@sankha087 2 years ago
@@javatechlearning But if I dump multiple topics into a single table, is there a way to distinguish the data from different topics in the table? Can you please share any reference link if you have one?
@javatechlearning 2 years ago
That depends upon the data present, or any differentiating flag field in your topic data. For example, let's say I have three topics, one for each region (APAC, EMEA and AMER); then I should have a region field present in all my topics.
@sankha087 2 years ago
@@javatechlearning Thanks for your reply. The problem is that those topics do not hold similar types of data, so is there a way that I can dump the topic names along with the data into the sink database, so that I can distinguish records by looking at the source topic name?
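
One way to carry the source topic name along with the data, which is not covered in the replies above, is the built-in InsertField transform: it adds the topic name as an extra field on each record before the sink writes it, so each row can be traced back to its topic. The column name below is an assumption, and the target table needs that column (or auto.create/auto.evolve enabled):

"transforms": "addTopic",
"transforms.addTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTopic.topic.field": "source_topic"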
@karthikb.s.k.4486 2 years ago
Hi Vishal, since you uploaded the config manually, say I want to run this JSON config file every 1 hour; how can we do that without a human uploading it?
@javatechlearning 2 years ago
Hi, you do not have to upload it multiple times. You can add the poll interval property to the config to run it every hour.
@karthikb.s.k.4486 2 years ago
@@javatechlearning Thank you for your time. Is a schema a must to store the values in JSON format? What if the schema is not sent by the upstream application?
@javatechlearning 2 years ago
In that case you have to manage the conversion of the upstream data into a schema-compatible object.
@karthikb.s.k.4486 2 years ago
@@javatechlearning Means the upstream has to send the schema in every message? Please correct me.
@javatechlearning 2 years ago
Yes, either they can send it or you can prepare it in your app before dumping to the Kafka topic. Somewhere it should be done.
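
To make that concrete: if Avro/Schema Registry is not used, the JsonConverter with schemas enabled expects every message to carry a schema-plus-payload envelope. The converter settings and the field names in the example are illustrative only:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"

and a message on the topic would then look roughly like:

{
  "schema": {
    "type": "struct",
    "name": "customer",
    "fields": [
      { "field": "id",   "type": "int32",  "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "name": "Tom" }
}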
@ItEngineer1998 7 months ago
Can we do this same thing using Spring Boot?
@javatechlearning 7 months ago
Yes, we can.
@ItEngineer1998 7 months ago
@@javatechlearning How, sir? Can you please provide some example or code?
@shuchikumari8031 2 years ago
Sir, where are you writing all this code, i.e. in which file? If you reply it will be so helpful for us.
@javatechlearning 2 years ago
I'm just using IntelliJ to create the config file.
@shuchikumari8031 2 years ago
@@javatechlearning Sir, if possible, please send the code you wrote in the file.
@javatechlearning 2 years ago
Sure, will send. Ping me your email ID.
@AmitKumar-om4yd 2 years ago
If you got the code, please send it to me also.
@javatechlearning 2 years ago
The Git link is provided in my videos. Please check.
@AmitKumar-om4yd 2 years ago
How can we run it locally?
@javatechlearning 2 years ago
First you need to install Confluent on local WSL2. Watch the installation video.
@rajsidhu2002 6 months ago
How to insert this type of data: { "_id": { "string": "663ba280e41f1aa25e72f32a" }, "Address": { "array": [ { "string": "a" }, { "string": "b" }, { "string": "c" } ] }, "City": { "string": "Norway" }, "FirstName": { "string": "Tom" }, "LastName": { "string": "Cardinal" }, "PersonID": { "int": 1 } }
@javatechlearning 6 months ago
Is this JSON? You can either convert it or create an Avro schema from it and then publish into the topic.
@gurpartapsingh836 6 months ago
Yes, in the JSON there is an Address field which is an array.
@towardsa.i.3216 1 year ago
I deleted the entry from the source but it does not reflect on the sink. In the sink it goes into the DLQ, which says: "Exception chain: java.sql.BatchUpdateException: Incorrect integer value: '' for column 'id' at row 1; java.sql.SQLException: Incorrect integer value: '' for column 'id' at row 1"
@javatechlearning 1 year ago
Check the column data type against the value being passed from the Kafka topic.
@javatechlearning 1 year ago
If you are just doing a POC, why don't you try deleting based on a string? See if that works; if yes, then there is a problem with the datatype of that value field.
@towardsa.i.3216 1 year ago
@@javatechlearning Thanks for the response. I solved the problem. In the source: 1) Tombstones should be true. In the sink: 1) Delete on null should be set to true, 2) pk.mode should be record_key, 3) pk.fields: comma-separated values that uniquely identify the record, 4) the input key format should be Avro.
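
Translating that checklist back into sink connector properties, a rough sketch (the key field name is an assumption and the Schema Registry URL is a placeholder) would be:

"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081"

delete.enabled only works together with pk.mode=record_key, because the connector needs the key of the tombstone record to know which row to delete.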
@yashilohia42 2 months ago
If we want to extract header JSON into the DB table columns, how can this be done?