Spring XD jdbc(source) -> jdbc(sink)

Refresh

November 2018

Views

1.5k time

1

I'm trying to configure streams like below (directly pipe from jdbc source to jdbc sink)

xd> stream create test2 --definition "output:jdbc --split=true --username=test --password=test --driverClassName=com.mysql.jdbc.Driver --url=jdbc:mysql://dbhost:3306/test --query='select id,name from test' | input:jdbc --username=test --password=test --driverClassName=com.mysql.jdbc.Driver --tableName=test2 --columns=id,name --url=jdbc:mysql://dbhost:3306/test" --deploy

but occurs ClassCastException.

Caused by: java.lang.ClassCastException: org.springframework.util.LinkedCaseInsensitiveMap cannot be cast to java.lang.String at org.springframework.xd.jdbc.JdbcMessagePayloadTransformer.transformPayload(JdbcMessagePayloadTransformer.java:39) at org.springframework.integration.transformer.AbstractPayloadTransformer.doTransform(AbstractPayloadTransformer.java:33) at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:33) ... 147 more

MySQL connection and table schema seems properly configured. connectivity already confirmed.

CREATE TABLE test (id int, name varchar(20)); CREATE TABLE test2 (id int, name varchar(20))

Spring XD version is 1.1.0.BUILD-20141103.163150-1-dist from zip below.

http://repo.spring.io/libs-snapshot/org/springframework/xd/spring-xd/1.1.0.BUILD-SNAPSHOT/spring-xd-1.1.0.BUILD-20141103.163150-1-dist.zip

I want to store the payload data to target sink table, but is the functionality still experimental?
or, Is it the stream problem, for example just racks some kind of conversion?

2 answers

3

Like Gary said, the sink expects a JSON document (there is an outstanding JIRA ticket to improve the sink to also accept a Map). For now you can have the source produce a JSON document instead of a Map using --outputType=application/json. Here is an example I just tried:

stream create jdbcCp --definition "source:jdbc --query='select id,name,year from myfiles where status = 0' --maxRowsPerPoll=10 --update='update myfiles set status = 1 where id in (:id)' --outputType=application/json | sink:jdbc --tableName=newfiles --columns=id,name,year"  --deploy

The JIRA issue is https://jira.spring.io/browse/XD-2250

1

See the documentation - the sink expects a simple String payload or a JSON String that will be converted to a Map. The source produces a List of Maps (or individual Maps when split is true).

Currently there is no option for the sink to handle a Map directly; you could add a transform module to convert the Map to JSON or create a custom sink.

We should probably change the sink to accept a Map.