put a default type hint for PythonCallableSource · tomstepp/apache-beam@2fce769

Original file line numberDiff line numberDiff line change

@@ -77,6 +77,10 @@ private PythonExternalTransform(String fullyQualifiedName, String expansionServi

7777

this.expansionService = expansionService;

7878

this.kwargsMap = new TreeMap<>();

7979

this.typeHints = new HashMap<>();

80+

// TODO(BEAM-14458): remove a default type hint for PythonCallableSource when BEAM-14458 is

81+

// resolved

82+

this.typeHints.put(

83+

PythonCallableSource.class, Schema.FieldType.logicalType(new PythonCallable()));

8084

argsArray = new Object[] {};

8185

}

8286

@@ -204,14 +208,12 @@ Row buildOrGetKwargsRow() {

204208

// Types that are not one of following are considered custom types.

205209

// * Java primitives

206210

// * Type String

207-

// * Type PythonCallableSource

208211

// * Any Type explicitly annotated by withTypeHint()

209212

// * Type Row

210213

private boolean isCustomType(java.lang.Class<?> type) {

211214

boolean val =

212215

!(ClassUtils.isPrimitiveOrWrapper(type)

213216

|| type == String.class

214-

|| type == PythonCallableSource.class

215217

|| typeHints.containsKey(type)

216218

|| Row.class.isAssignableFrom(type));

217219

return val;

@@ -268,8 +270,6 @@ private Schema generateSchemaDirectly(

268270

if (field instanceof Row) {

269271

// Rows are used as is but other types are converted to proper field types.

270272

builder.addRowField(fieldName, ((Row) field).getSchema());

271-

} else if (field instanceof PythonCallableSource) {

272-

builder.addField(fieldName, Schema.FieldType.logicalType(new PythonCallable()));

273273

} else if (typeHints.containsKey(field.getClass())) {

274274

builder.addField(fieldName, typeHints.get(field.getClass()));

275275

} else {