Do NVTabular and Transformers4Rec support list-of-list features? #803
Replies: 3 comments 1 reply
-
@imran-marwat hello. Support for list-of-list features has not been implemented or tested thoroughly. Although NVTabular can transform lists of lists (you can check the unit tests of the ListSlice op), the schema would also need to represent the list-of-list features properly, and the Transformers4Rec training script would then need to handle the ragged or fixed-size nested lists. This path has not been tested, but you can still try to train the model by following the existing notebook examples. The recommended way to use the Merlin libraries is through the Docker images, which avoid dependency issues with other libraries; you can use one of these images.
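For reference, here is a minimal sketch (toy data, an illustrative column name, CPU dataframe) of the single-level list case that the ListSlice unit tests do cover, together with how you can inspect the schema flags of the output column:

import pandas as pd
import nvtabular as nvt
from merlin.io import Dataset

# Toy single-level list column; nested (list-of-list) columns are the untested case.
df = pd.DataFrame({"item_ids": [[1, 2, 3, 4], [5, 6]]})

pipeline = ["item_ids"] >> nvt.ops.ListSlice(0, 2)  # keep the first two elements of each row
workflow = nvt.Workflow(pipeline)
out = workflow.fit_transform(Dataset(df))

print(out.to_ddf().compute())
print(workflow.output_schema["item_ids"])  # check the is_list / is_ragged properties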
-
There is no example; we did not create one with nested lists. You can try to train the model with your nested columns, but again, this feature has not been tested.
-
Hi, I am using Transformers4Rec to build a session-based recommender model and doing data pre-processing with NVTabular.
I am developing within the nvcr.io/nvidia/merlin/merlin-pytorch Docker container. These are the versions of the packages provided in the container:
!pip show nvtabular
!pip show transformers4rec
Name: nvtabular
Version: 0+untagged.1.g52de1b2
Name: transformers4rec
Version: 0+untagged.1.g73782c9
I am facing an issue when applying a ListSlice op to a "list of lists" feature. I have a raw row-level string feature with example values like
merchandise | branding | marketing
I am converting this to a multi-hot feature by first applying a custom LambdaOp to split the string and then applying Categorify. This results in a list like
[0, 1, 2]
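Roughly, this stage of my workflow looks like the sketch below (the raw column name "tags" and the exact delimiter handling are simplified for illustration):

import nvtabular as nvt

# Split the pipe-delimited string into a list of tokens, then encode each token
# as a category id, which yields a multi-hot list per row.
tags_multihot = (
    ["tags"]
    >> nvt.ops.LambdaOp(lambda col: col.str.split(" | "))
    >> nvt.ops.Categorify()
)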
Downstream in my workflow, I am performing a groupby at the session level, which transforms this feature into a list of lists like:
[[0, 1, 2], [0, 3, 4], [1, 2, 3]]
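Continuing the sketch above, the grouping step is roughly:

# Aggregate the multi-hot feature to session level; the per-row "tags" lists are
# collected into a list-of-list column (named "tags_list" with the default name_sep).
session_features = (
    tags_multihot + ["session_id"]
    >> nvt.ops.Groupby(groupby_cols=["session_id"], aggs={"tags": ["list"]})
)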
When I apply the ListSlice op, I get the error below. Are list-of-list features supported? I could skip the ListSlice op, but would this feature cause a problem when I try to fit and train a model? I'd appreciate any help, as these features are highly relevant to our model.
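The failing step, continuing the sketch above, is roughly the following (the slice range is illustrative, and sample_ds is the NVTabular Dataset built from the raw session rows); the traceback it produces is below.

# Applying ListSlice to the nested "tags_list" column is what triggers the TypeError below.
sliced = session_features["tags_list"] >> nvt.ops.ListSlice(0, 20)

workflow = nvt.Workflow(sliced + session_features["session_id"])
processed_ds = workflow.fit_transform(sample_ds)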
Failed to transform operator <nvtabular.ops.list_slice.ListSlice object at 0x7f158dab5ae0>
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py", line 242, in _run_node_transform
transformed_data = node.op.transform(selection, input_data)
File "/usr/local/lib/python3.10/dist-packages/nvtx/nvtx.py", line 115, in inner
result = func(*args, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/nvtabular/ops/list_slice.py", line 107, in transform
elements = c.elements.values
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 243, in values
return cupy.asarray(self.data_array_view(mode="write"))
File "/usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py", line 155, in data_array_view
return cuda.as_cuda_array(obj).view(self.dtype)
File "/usr/local/lib/python3.10/dist-packages/numba/cuda/api.py", line 74, in as_cuda_array
raise TypeError("obj doesn't implement the cuda array interface.")
TypeError: obj doesn't implement the cuda array interface.
TypeError Traceback (most recent call last)
Cell In[31], line 2
1 # Test subtraction on the grouped data
----> 2 processed_ds = workflow.fit_transform(sample_ds)
File /usr/local/lib/python3.10/dist-packages/nvtabular/workflow/workflow.py:236, in Workflow.fit_transform(self, dataset)
216 def fit_transform(self, dataset: Dataset) -> Dataset:
217 """Convenience method to both fit the workflow and transform the dataset in a single
218 call. Equivalent to calling ``workflow.fit(dataset)`` followed by
219 ``workflow.transform(dataset)``
(...)
234 transform
235 """
--> 236 self.fit(dataset)
237 return self.transform(dataset)
File /usr/local/lib/python3.10/dist-packages/nvtabular/workflow/workflow.py:213, in Workflow.fit(self, dataset)
199 def fit(self, dataset: Dataset) -> "Workflow":
200 """Calculates statistics for this workflow on the input dataset
201
202 Parameters
(...)
211 This Workflow with statistics calculated on it
212 """
--> 213 self.executor.fit(dataset, self.graph)
214 return self
File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:501, in DaskExecutor.fit(self, dataset, graph, refit)
485 dependencies.difference_update(current_phase)
487 # This captures the output dtypes of operators like LambdaOp where
488 # the dtype can't be determined without running the transform
489 # self._transform_impl(dataset, capture_dtypes=True).sample_dtypes()
490 #
491 Dataset(
492 self.transform(
493 dataset.to_ddf(),
494 graph.output_node,
495 graph.output_dtypes,
496 capture_dtypes=True,
497 ),
498 cpu=dataset.cpu,
499 base_dataset=dataset.base_dataset,
500 schema=graph.output_schema,
--> 501 ).sample_dtypes()
502 graph.construct_schema(dataset.schema, preserve_dtypes=True)
504 return graph
File /usr/local/lib/python3.10/dist-packages/merlin/io/dataset.py:1169, in Dataset.sample_dtypes(self, n, annotate_lists)
1162 """Return the real dtypes of the Dataset
1163
1164 Use cached metadata if this operation was
1165 already performed. Otherwise, call down to the
1166 underlying engine for sampling logic.
1167 """
1168 if self._real_meta.get(n, None) is None:
-> 1169 _real_meta = self.engine.sample_data(n=n)
1170 if self.dtypes:
1171 _real_meta = _set_dtypes(_real_meta, self.dtypes)
File /usr/local/lib/python3.10/dist-packages/merlin/io/dataset_engine.py:64, in DatasetEngine.sample_data(self, n)
62 _ddf = self.to_ddf()
63 for partition_index in range(_ddf.npartitions):
---> 64 _head = _ddf.partitions[partition_index].head(n)
65 if len(_head):
66 return _head
File /usr/local/lib/python3.10/dist-packages/dask/dataframe/core.py:1268, in _Frame.head(self, n, npartitions, compute)
1266 # No need to warn if we're already looking at all partitions
1267 safe = npartitions != self.npartitions
-> 1268 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)
File /usr/local/lib/python3.10/dist-packages/dask/dataframe/core.py:1302, in _Frame._head(self, n, npartitions, compute, safe)
1297 result = new_dd_object(
1298 graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
1299 )
1301 if compute:
-> 1302 result = result.compute()
1303 return result
File /usr/local/lib/python3.10/dist-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
290 def compute(self, **kwargs):
291 """Compute this dask collection
292
293 This turns a lazy Dask collection into its in-memory equivalent.
(...)
312 dask.base.compute
313 """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
315 return result
File /usr/local/lib/python3.10/dist-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
596 keys.append(x.dask_keys())
597 postcomputes.append(x.dask_postcompute())
--> 599 results = schedule(dsk, keys, **kwargs)
600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /usr/local/lib/python3.10/dist-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
86 elif isinstance(pool, multiprocessing.pool.Pool):
87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90 pool.submit,
91 pool._max_workers,
92 dsk,
93 keys,
94 cache=cache,
95 get_id=_thread_get_id,
96 pack_exception=pack_exception,
97 **kwargs,
98 )
100 # Cleanup pools associated to dead threads
101 with pools_lock:
File /usr/local/lib/python3.10/dist-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
509 _execute_task(task, data) # Re-execute locally
510 else:
--> 511 raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
513 state["cache"][key] = res
File /usr/local/lib/python3.10/dist-packages/dask/local.py:319, in reraise(exc, tb)
317 if exc.traceback is not tb:
318 raise exc.with_traceback(tb)
--> 319 raise exc
File /usr/local/lib/python3.10/dist-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File /usr/local/lib/python3.10/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/local/lib/python3.10/dist-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File /usr/local/lib/python3.10/dist-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File /usr/local/lib/python3.10/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File /usr/local/lib/python3.10/dist-packages/dask/utils.py:72, in apply(func, args, kwargs)
41 """Apply a function given its positional and keyword arguments.
42
43 Equivalent to ``func(*args, **kwargs)``
(...)
69 >>> dsk = {'task-name': task} # adds the task to a low level Dask task graph
70 """
71 if kwargs:
---> 72 return func(*args, **kwargs)
73 else:
74 return func(*args)
File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:103, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes, strict, target_format)
101 output_data = None
102 for node in nodes:
--> 103 transformed_data = self._execute_node(node, transformable, capture_dtypes, strict)
104 output_data = self._combine_node_outputs(node, transformed_data, output_data)
106 # If there are any additional columns that weren't produced by one of the supplied nodes
107 # we grab them directly from the supplied input data. Normally this would happen on a
108 # per-node basis, but offers a safety net for the multi-node case
File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:125, in LocalExecutor._execute_node(self, node, transformable, capture_dtypes, strict)
123 if "CategorifyTransform" in str(node.op):
124 transform_input = group_values_offsets(transform_input)
--> 125 transform_output = self._run_node_transform(node, transform_input, capture_dtypes, strict)
126 if "CategorifyTransform" in str(node.op):
127 transform_output = TensorTable(transform_output)
File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:255, in LocalExecutor._run_node_transform(self, node, input_data, capture_dtypes, strict)
253 except Exception as exc:
254 LOG.exception("Failed to transform operator %s", node.op)
--> 255 raise exc
File /usr/local/lib/python3.10/dist-packages/merlin/dag/executors.py:242, in LocalExecutor._run_node_transform(self, node, input_data, capture_dtypes, strict)
239 try:
240 # use input_columns to ensure correct grouping (subgroups)
241 selection = node.input_columns.resolve(node.input_schema)
--> 242 transformed_data = node.op.transform(selection, input_data)
244 if transformed_data is None:
245 raise RuntimeError(f"Operator {node.op} didn't return a value during transform")
File /usr/local/lib/python3.10/dist-packages/nvtx/nvtx.py:115, in annotate.__call__.<locals>.inner(*args, **kwargs)
112 @wraps(func)
113 def inner(*args, **kwargs):
114 libnvtx_push_range(self.attributes, self.domain.handle)
--> 115 result = func(*args, **kwargs)
116 libnvtx_pop_range(self.domain.handle)
117 return result
File /usr/local/lib/python3.10/dist-packages/nvtabular/ops/list_slice.py:107, in ListSlice.transform(self, col_selector, df)
105 c = df[col]._column
106 offsets = c.offsets.values
--> 107 elements = c.elements.values
109 threads = 32
110 blocks = (offsets.size + threads - 1) // threads
File /usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py:243, in ColumnBase.values(self)
240 if self.has_nulls():
241 raise ValueError("Column must have no nulls.")
--> 243 return cupy.asarray(self.data_array_view(mode="write"))
File /usr/local/lib/python3.10/dist-packages/cudf/core/column/column.py:155, in ColumnBase.data_array_view(self, mode)
153 else:
154 obj = None
--> 155 return cuda.as_cuda_array(obj).view(self.dtype)
File /usr/local/lib/python3.10/dist-packages/numba/cuda/api.py:74, in as_cuda_array(obj, sync)
64 """Create a DeviceNDArray from any object that implements
65 the :ref:`cuda array interface <cuda-array-interface>`.
66
(...)
71 synchronized.
72 """
73 if not is_cuda_array(obj):
---> 74 raise TypeError("obj doesn't implement the cuda array interface.")
75 else:
76 return from_cuda_array_interface(obj.__cuda_array_interface__,
77 owner=obj, sync=sync)
TypeError: obj doesn't implement the cuda array interface.