ksqlDB HOWTO: Handling Time (9:08, 3 years ago)
ksqlDB HOWTO: Stateful Aggregates (13:56)
ksqlDB HOWTO: Joins (10:23, 3 years ago)
ksqlDB HOWTO: Schema Manipulation (10:56)
ksqlDB HOWTO: Filtering (9:25, 3 years ago)
Exploring the Kafka Connect REST API (20:56)
From Zero to Hero with Kafka Connect (33:49)
Kafka Connect in 60 seconds (1:01, 4 years ago)
Comments
@mirshahrukh2934 · 3 months ago
I have configured Oracle as a source with Kafka using Debezium and LogMiner. It takes the snapshot but doesn't stream any changes made to the database afterwards, and I don't find any consumer offsets related to the data taken through the snapshot. What am I missing here? Can anyone tell me, please? @Robin Moffatt
@lalitshukla6585 · 4 months ago
Thanks for this video, it really helped. Just one thing I'm curious about: is this S3 connector free to use? I mean, is the sink connector open source?
@jorgeluiscarhuaricaaguilar4849 · 4 months ago
Hello Robin, I was trying to follow your video but the Voluble connector is no longer supported, so the step at the 9-minute mark isn't possible to follow. Can you suggest another option to produce data like that connector did? I also tried to use the kafka-console-producer command, but I couldn't get data inserted into my bucket yet.
@rmoff · 4 months ago
Check out shadowtraffic.io - it's what Voluble was initially conceived as.
@walterferreiradossantos2378 · 4 months ago
Robin, I need your help. I'm new to developing with Kafka. I have Kafka Connect with CDC as the source and the JDBC sink connector as the target. I can do an upsert correctly, but I can't get the delete operation to work. Is it possible to use the JDBC sink connector to handle all the operations - insert, update, and delete? Can you help me, please, with an example of Kafka Connect from SQL Server to SQL Server without using Debezium?
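For anyone hitting the same problem: as far as I know the Confluent JDBC sink can handle deletes, but only when the primary key comes from the record key and tombstone records arrive for deleted rows. A rough sketch expressed as a ksqlDB CREATE SINK CONNECTOR, with the connector name, connection details, topic, and id column all invented for illustration:

  CREATE SINK CONNECTOR SINK_MSSQL_01 WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:sqlserver://mssql:1433;databaseName=demo',
    'connection.user'     = 'connect_user',
    'connection.password' = 'connect_pw',
    'topics'              = 'my_table_topic',
    'insert.mode'         = 'upsert',
    -- deletes need the PK taken from the record key plus tombstone handling enabled
    'pk.mode'             = 'record_key',
    'pk.fields'           = 'id',
    'delete.enabled'      = 'true'
  );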
@jeremykenn · 5 months ago
create stream test02 (col1 int, col2 varchar) with (kafka_topic='test02', partitions=1, value_format='AVRO'); only returns col1 and col2 - I don't see the default ROWTIME and ROWKEY. ksqlDB 7.3.2.
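For reference, this looks like expected behaviour rather than something missing: in current ksqlDB versions ROWKEY no longer exists (key columns are declared explicitly) and ROWTIME is a pseudocolumn that DESCRIBE doesn't list but that can be selected. A quick sketch, assuming a ksqlDB 7.x CLI session and the same stream as above:

  CREATE STREAM TEST02 (COL1 INT, COL2 VARCHAR)
    WITH (KAFKA_TOPIC='test02', PARTITIONS=1, VALUE_FORMAT='AVRO');

  -- ROWTIME (and ROWPARTITION / ROWOFFSET) only show up when selected explicitly
  SELECT ROWTIME, COL1, COL2
  FROM TEST02
  EMIT CHANGES;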
@georgelza · 6 months ago
Realized this is an old-ish video... you don't show at any point how you started your kafkacat container; also, of course, kafkacat has since been replaced/renamed to kcat.
@Saikrishnabollabathini · 6 months ago
Hi, is there a way to modify the payload as needed? For example, if the Kafka message is {"C":1, "C2":"Test"}, the Elasticsearch document needs to be created as {"C":1, "searchFields":{"C2":"Test"}}. Is there a way to achieve this?
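One way to do that kind of reshaping before an Elasticsearch sink is ksqlDB's STRUCT constructor; a rough sketch with invented topic and stream names (the back-ticks keep the searchFields name from being upper-cased):

  CREATE STREAM SRC (C INT, C2 VARCHAR)
    WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='JSON');

  -- nest C2 under a searchFields struct in the outgoing value
  CREATE STREAM RESHAPED WITH (KAFKA_TOPIC='reshaped_topic', VALUE_FORMAT='JSON') AS
    SELECT C,
           STRUCT(C2 := C2) AS `searchFields`
    FROM SRC;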
@arahmedrasheed · 6 months ago
So Amazing
@georgelza · 6 months ago
Why, oh why, have I not found this before... it would have saved me so much grief, hehehe... Now to figure out why my local CP Kafka Connect doesn't want to sink to a MongoDB Atlas-hosted collection...
@amuse_me · 6 months ago
Where can I find the documentation that you used? I really appreciate any help you can provide.
@LMGaming0 · 7 months ago
Just wondering if there is already a video on creating a custom partitioner and using it in my connector? I've been struggling today to implement that, since I'm not a Java guy... thanks.
@Pinaldba · 7 months ago
@Robin, I have the Azure Synapse sink JDBC driver kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc, but I still get the error: Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all possible options and none of them worked.
@AjayMatai · 7 months ago
Is there a Kafka connector that would read from Kafka, do some transforms, and write back to another Kafka topic?
@LMGaming0 · 7 months ago
4 years later, still useful.
@shibimukesh9369 · 7 months ago
I tried to flatten the message using the MongoDB sink connector, but I'm getting an error.
@shibimukesh9369 · 7 months ago
How do I handle a nested document? Only the root level is working.
@vivekshah1664 · 7 months ago
Hi Robin, I am a software engineer at a startup. Last year we built a pipeline to sync our Postgres data to Elasticsearch and Cassandra. It was all custom Java code with a lot of operational handling. Thank you for this video; I am planning to use Connect for those pipelines.
@timothyvandermeer9845 · 8 months ago
FANTASTIC. Thank you, brother.
@Abhijit_Patra · 8 months ago
I am trying to do this but I get a null value.
@chintanthakker4129 · 8 months ago
I tried this with the MongoSinkConnector and I'm seeing a failure with this message: DataException: Only Map objects supported in absence of schema [for field insertion], found java.lang.String in requireMap. Correct me if I'm wrong - do I need to set value.converter to JsonConverter?
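That error usually means the connector is being handed a raw String where it needs structured data, so yes - pointing value.converter at JsonConverter (with schemas disabled, since plain JSON carries no embedded schema) is the usual fix. A sketch with made-up connection, database, and collection values:

  CREATE SINK CONNECTOR MONGO_SINK WITH (
    'connector.class'                = 'com.mongodb.kafka.connect.MongoSinkConnector',
    'topics'                         = 'my_topic',
    'connection.uri'                 = 'mongodb://mongo:27017',
    'database'                       = 'demo_db',
    'collection'                     = 'demo_collection',
    'key.converter'                  = 'org.apache.kafka.connect.storage.StringConverter',
    -- hand the connector a parsed Map rather than a raw String
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false'
  );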
@BrunoMendonça-w3m · 8 months ago
I'm attempting to create a stream from a multi-schema topic, similar to the train example at the end of this video. I would then split it into multiple streams/topics for each message type. However, I can't seem to create the stream in a way that it's populated with the messages from the multi-schema topic. Is there an actual example you can reference for this scenario?
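In case it helps anyone else, the usual fan-out pattern (once the topic can be read at all) is one persistent query per message type. This sketch assumes the messages can be deserialised with a single format and share a discriminator field - the names are invented, and it won't apply directly if the topic mixes genuinely different Avro subjects:

  -- read the mixed topic once, with a field that identifies the message type
  CREATE STREAM ALL_EVENTS (MSG_TYPE VARCHAR, PAYLOAD VARCHAR)
    WITH (KAFKA_TOPIC='multi_schema_topic', VALUE_FORMAT='JSON');

  -- one derived stream (and backing topic) per message type
  CREATE STREAM TYPE_A WITH (KAFKA_TOPIC='type_a') AS
    SELECT * FROM ALL_EVENTS WHERE MSG_TYPE = 'A';

  CREATE STREAM TYPE_B WITH (KAFKA_TOPIC='type_b') AS
    SELECT * FROM ALL_EVENTS WHERE MSG_TYPE = 'B';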
@wongsiewwai · 9 months ago
thank you.
@nesreenmohd665 · 9 months ago
Thanks
@sivakumarm1381 · 10 months ago
Thank you, Robin, for the excellent explanation. In all your examples you're using auto.create set to true. I was trying auto.create set to false, where I create the table beforehand, but I get the error below. I have tried every possible option - will you be able to help? Thanks in advance. Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "table_name" is missing and auto-creation is disabled
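For what it's worth, with auto.create=false the JDBC sink looks the table up under the name it derives from the topic (or from table.name.format), and in my experience that lookup has to match the pre-created table exactly, including case on databases such as Oracle. A sketch of the relevant settings, with placeholder connection details and names:

  CREATE SINK CONNECTOR JDBC_SINK_01 WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/demo',
    'connection.user'     = 'connect_user',
    'connection.password' = 'connect_pw',
    'topics'              = 'orders',
    'auto.create'         = 'false',
    'auto.evolve'         = 'false',
    -- must match the pre-created table name exactly, including case
    'table.name.format'   = 'ORDERS'
  );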
@Agrossio · 11 months ago
Great video!! Will this work for processing a CSV with 1,000,000 records? Would it take less than an hour to save it into an Oracle database?
@karthikb.s.k.4486 · 1 year ago
Thank you for the tutorials, Robin. If the schema changes frequently based on business requirements, do we have to drop the stream and recreate it each time? Please let me know.
@raghavankasthuri2908 · 1 year ago
Hi @Robin Moffatt, many thanks. I'd appreciate clarification on whether I'm thinking about a Kafka implementation correctly in the example below. Say a train route is A-B-C-D-E-F-G-H. Train 1 starts at station A and stops only at stations C and F before reaching destination H. If we want the dashboard displays at stations C, F and H to show where the train is (for example, that it has already left station A and is between stations B and C), do we use the Kafka Streams API to display this highly volatile information while the train is on the move? In other words, as Kafka updates the topic ("where is the train currently?"), the implementation should call the subscribe() and poll() methods to pull data from the partition every so often and display a real-time message such as "Train is presently between stations B and C and is running 6 minutes late". I'd much appreciate your confirmation that I'm thinking of the right kind of example.
@rmoff · 1 year ago
Hi, yes, I think you would use some stream processing logic to do this - I guess with some kind of timer to trigger it, since there'd be no event as such until the next one was received from the source.
@raghavankasthuri2908 · 1 year ago
Hi @rmoff, thank you. I'd be grateful if you could confirm that Kafka streaming is really about an event that can change its state/value over a period of time. In other words, the event here is "where is the train?": the dashboard displays "between stations B and C" and later "between C and D", "due to arrive in 5 minutes" and later "due in 2 minutes/1 minute". Am I right in my assumption that Kafka streaming is used for streaming data pertaining to an event whose state and value change every so often? Many thanks.
@RobertMetzger · 1 year ago
@raghavankasthuri2908 You'd probably model this in a way that each train emits events regularly, or when it arrives at a station. Kafka Streams (or any other stream processor) would subscribe to those events and emit arrival-time estimates for all the relevant stations.
@raghavankasthuri2908 · 1 year ago
@RobertMetzger Many thanks, much appreciated.
@dswan01 · 1 year ago
So I assume you can use ksqlDB to transform the data and then pass it to a stream that updates the target database. Is this correct?
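That is the usual pairing, as far as I understand it: a persistent ksqlDB query writes the transformed data to a new topic, and a sink connector pushes that topic to the target database. A minimal sketch with invented stream, column, and connection names (it assumes the Connect worker's converters are set up for Avro/Schema Registry):

  -- continuous transformation into a new, Avro-serialised topic
  CREATE STREAM ORDERS_CLEAN WITH (VALUE_FORMAT='AVRO') AS
    SELECT ID, UCASE(CUSTOMER_NAME) AS CUSTOMER_NAME, AMOUNT
    FROM ORDERS_RAW;

  -- sink the transformed topic to the target database
  CREATE SINK CONNECTOR SINK_ORDERS_CLEAN WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/demo',
    'connection.user'     = 'connect_user',
    'connection.password' = 'connect_pw',
    'topics'              = 'ORDERS_CLEAN',
    'auto.create'         = 'true'
  );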
@nejbahadnane7517 · 1 year ago
Thanks 🙏 great video
@rmoff · 1 year ago
Thanks, glad you liked it :)
@sdon1011 · 1 year ago
Very interesting series of videos, very helpful. A small remark: at 38:58, it seems that the order value being inserted was much higher than the currently displayed maximum (22190899.73 vs 216233.09), and yet the value was not updated.
@shibilpm9873 · 1 year ago
Why is everyone using Confluent Kafka for this? I want to do it in production and Confluent Kafka is not open source. Can anyone suggest an article or video to refer to? I want to load a CSV or JSON file into Kafka as a table.
@hamoudy41 · 1 year ago
My data in Kafka is schema-less (string keys, JSON values). I want to sink it to Postgres. Any tips? Do I need Schema Registry? What converters or transforms (if any) do I need?
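One common approach, as I understand it, is to declare the schema yourself in ksqlDB over the JSON topic and re-serialise it to Avro, so that the JDBC sink (with its value converter set to Avro) can fetch the schema from Schema Registry and build the target table - without a declared schema somewhere, the JDBC sink has nothing to work from. Field names here are invented:

  -- impose a schema on the schema-less JSON topic
  CREATE STREAM ORDERS_JSON (ORDER_ID VARCHAR KEY, AMOUNT DOUBLE, CREATED_AT VARCHAR)
    WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

  -- re-serialise to Avro so downstream converters can pull the schema from Schema Registry
  CREATE STREAM ORDERS_AVRO WITH (KAFKA_TOPIC='orders_avro', VALUE_FORMAT='AVRO') AS
    SELECT * FROM ORDERS_JSON;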
@mukeshdharajiya1261 · 1 year ago
Thank you so much for the excellent explanation 🏆
@rmoff · 1 year ago
Glad it was helpful!
@MohamedAyman-wu6gk · 1 year ago
Hi Robin, I'm working with Strimzi Kafka on OpenShift. When I pulled the images for the ksqlDB server and ksqlDB CLI at version 0.15.0, I got this error when running any command: "The server has encountered an incompatible entry in its log and cannot process further DDL statements. This is most likely due to the service being rolled back to an earlier version." The problem is that when I use newer versions (or the latest), I get the errors below when trying to declare fields as KEY or PRIMARY KEY: "KSQL currently only supports KEY columns named ROWKEY" and "extraneous input 'PRIMARY'". So I'm struggling to deal with keys, and I have to go live soon, so I'd appreciate your support. Thanks in advance.
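"KSQL currently only supports KEY columns named ROWKEY" is, as far as I recall, the message that ksqlDB versions before 0.10 produce, so it may be worth double-checking which server version the CLI is actually connected to; from 0.10 onwards arbitrary key column names and PRIMARY KEY on tables are supported. A sketch with invented topic and column names, assuming the topics already exist:

  -- tables take PRIMARY KEY ...
  CREATE TABLE USERS (
    USER_ID VARCHAR PRIMARY KEY,
    NAME    VARCHAR
  ) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');

  -- ... and streams take KEY
  CREATE STREAM CLICKS (
    USER_ID VARCHAR KEY,
    URL     VARCHAR
  ) WITH (KAFKA_TOPIC='clicks', VALUE_FORMAT='JSON');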
@rmoff · 1 year ago
Hi, a good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. Thanks.
@piusono · 1 year ago
Thanks for the video. Though it is about two years old, I still find it useful. However, any idea why I get the error "Failed to create connector: {"error_code":409,"message":"Connector SINK_FOO_01_0 already exists"}" when I run CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (...) for the first time, following your steps? Everything else worked up to that point.
@rmoff · 1 year ago
Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. Thanks.
@scr33tch · 1 year ago
Thanks Robin. Your videos are by far the best, most detailed Kafka resources.
@rmoff · 1 year ago
Thank you! :)
@marcorossi2268 · 1 year ago
What if I want to shred the JSON into sub-messages and publish each section to a different topic that ends up in a different destination table?
@rmoff · 1 year ago
Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. Thanks.
@Evkayne · 1 year ago
thank you
@TimoBarth-m3b · 1 year ago
Great tutorial and very helpful! Thanks for this
@emmanuelharel · 1 year ago
Hello, does anyone know if it is possible to write to the WAL of a Postgres database from ksqlDB?
@emmanuelharel · 1 year ago
@RobinNMoffatt To implement the outbox pattern from SQL within a transaction. For example, with Postgres it would mean using the pg_logical_emit_messages function to store the message at the same time as storing an entity in a table - but all of this from ksqlDB or Flink SQL. Does that make sense?
@robertoojeda3263 · 1 year ago
Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic holding several messages with different schemas - in other words, one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?
@alexandersk.8963 · 1 year ago
Amazing material, Robin, thank you a lot!
@tanakpek6270 · 1 year ago
Hi Robin, awesome video! I really appreciate this. I was wondering: I applied these two SMTs and had JSON_SR configured everywhere, but the key always got a \" prepended and a "\ appended between the quotes. What is causing this?
@rmoff · 1 year ago
Hi, glad you liked the video! A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. Thanks.
@rushij6874 · 1 year ago
Great video, Robin! I really appreciated the clear and concise way you explained the concepts. Your examples were also really helpful in solidifying my understanding of the Kafka use case. Overall, I think you did an excellent job and I look forward to seeing more of your videos in the future. Keep up the great work!
@mathiasyeremiaaryadi9097 · 1 year ago
Can we make the INSERT query persistent, so that it runs automatically rather than only at the time we write the query?
@bonolomokgadi9025 · 2 years ago
Hi Robin. I'm seeing the error below. The key converter is StringConverter.

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
    at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:357)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: ENTITY_NO
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$transformRecord$0(TransformationChain.java:70)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
    ... 12 more

Below is the config:

"key.ignore": "false",
"transforms": "CastToInt64,CastToString,copyIdToKey,extractKeyFromStruct",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "ENTITY_NO",
"transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field": "ENTITY_NO",
"transforms.CastToInt64.spec": "REQUESTING_ENTITY_NO:int64",
"transforms.CastToInt64.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.CastToString.spec": "REQUESTING_ENTITY_NO:string",
"transforms.CastToString.type": "org.apache.kafka.connect.transforms.Cast$Value",
@carlaguelpa4574 · 2 years ago
@rmoff, thanks for your videos - they're the best! We have a situation: we have implemented Debezium with Kafka and are experiencing a performance issue. When both the source and sink connectors are working at the same time, consumer performance decreases significantly. Here are some numbers for what we have:
- 1 Kafka cluster with 3 nodes (8 GB RAM each, -Xms 4096m, -Xmx 4096m); all topics with replication factor 3
- 1 ZooKeeper cluster with 3 nodes (4 GB RAM each, -Xms 2048m, -Xmx 2048m)
- 1 Connect cluster with 5 nodes (12 GB RAM each, -Xms 6168m, -Xmx 6168m), with 1 partition and 1 task, though we have also tried 1/5/100 partitions and 1/5/15/100 tasks
- 1 source connector per table (4 tables) and 1 sink connector per table (4 tables)
- We are not using Schema Registry
The problem: the target DB has a delay that increases when the load exceeds 750 messages per second (message size = 6 KB). Right now we process 2,000 messages of 6 KB per second, and that was the best performance we got, achieved only by enabling ONLY the sink connectors. When we enable only the sink connectors the performance is good, and the same goes for the source connectors with no sinks - performance is great. Resources are OK; the source and target environments have plenty of CPU and memory. When we watch the messages consumed per second, there is a wait of about 30 seconds, then it consumes some messages, waits, and repeats. Question: what can we do to improve consumer performance? Could you help me?
@NivedSuresh-e9h · 5 days ago
Hey, did you all ever sort this out? If so, could you please share some info on how?
@schoesa · 2 years ago
If I run docker-compose up -d, it hangs every time while downloading the Kafka Connect JDBC plugin from Confluent Hub.
@rmoff · 2 years ago
Hi, the best place to get help is www.confluent.io/en-gb/community/ask-the-community/ :)