
How do I create a whole file in a custom processor?

asked 2017-10-27 13:02:07 -0500

metadaddy

I want to create a whole file record in a custom processor. How do I do it?


1 Answer


answered 2017-10-27 13:09:42 -0500

metadaddy

updated 2017-11-02 17:53:48 -0500

Subclass com.streamsets.pipeline.api.FileRef to provide an input stream for your data. A common approach is to write a temporary file in the custom processor and have your FileRef return a FileInputStream over it, but the stream can come from almost any source.
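As a rough illustration of the temporary-file approach, here is a plain-Java sketch with no StreamSets dependency. The class TempFileStaging and its methods are hypothetical names for this example; a file-backed FileRef would do the equivalent of open() inside its createInputStream() implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of the StreamSets API): stages bytes in a
// temporary file and hands out new streams over it, as a file-backed
// FileRef's createInputStream() might.
public class TempFileStaging {
  private final Path path;

  public TempFileStaging(byte[] data) throws IOException {
    // Unique file in the default temporary-file directory
    this.path = Files.createTempFile("wholefile-", ".tmp");
    Files.write(path, data);
    // Fallback cleanup only: the file is removed when the JVM exits
    path.toFile().deleteOnExit();
  }

  // Each call opens a new stream positioned at the start of the file
  public InputStream open() throws IOException {
    return new FileInputStream(path.toFile());
  }

  // File size in bytes
  public long size() throws IOException {
    return Files.size(path);
  }

  // Read a stream to the end (Java 8 compatible, no readAllBytes())
  public static byte[] readAll(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) != -1) {
      out.write(buf, 0, n);
    }
    return out.toByteArray();
  }
}
```

Opening a new FileInputStream per call means every reader starts at the beginning of the file. Since deleteOnExit() only cleans up at JVM shutdown, a long-running processor would probably want to delete the temporary file itself once it is no longer needed.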

Here's a simple example that uses a string in memory:

StringFileRef.java

package com.streamsets.stage.lib.sample;

import com.streamsets.pipeline.api.FileRef;
import com.streamsets.pipeline.api.Stage;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;

public class StringFileRef extends FileRef {
  // This FileRef only offers plain InputStreams to its consumers
  private static final Set<Class<? extends AutoCloseable>> SUPPORTED_STREAM_CLASSES =
      Collections.singleton(InputStream.class);

  private final String data;

  public StringFileRef(String data) {
    super(data.length());
    this.data = data;
  }

  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public <T extends AutoCloseable> Set<Class<T>> getSupportedStreamClasses() {
    return (Set) SUPPORTED_STREAM_CLASSES;
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T extends AutoCloseable> T createInputStream(
      Stage.Context context, Class<T> aClass
  ) throws IOException {
    // Return a fresh stream over the UTF-8 bytes of the string on every call
    return (T) new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
  }
}
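Because createInputStream() builds a new ByteArrayInputStream on each call, every caller gets an independent stream that starts at the first byte, and draining one stream does not affect another. The plain-Java sketch below demonstrates that property without any StreamSets dependency; InMemoryStreamSource is a hypothetical stand-in for StringFileRef, and it encodes the string once up front rather than on every call, a minor variation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical stand-in (no StreamSets dependency) for the
// createInputStream() logic in StringFileRef: every call wraps the same
// bytes in a new ByteArrayInputStream.
public class InMemoryStreamSource implements Supplier<InputStream> {
  private final byte[] bytes;

  public InMemoryStreamSource(String data) {
    // The Charset overload does not throw UnsupportedEncodingException
    this.bytes = data.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public InputStream get() {
    // New stream object, own read position, shared read-only bytes
    return new ByteArrayInputStream(bytes);
  }

  // Drain a stream completely and decode it as UTF-8 (Java 8 compatible)
  public static String readAll(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) != -1) {
      out.write(buf, 0, n);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }
}
```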

In the custom processor:

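import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.SingleLaneRecordProcessor;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public abstract class SampleProcessor extends SingleLaneRecordProcessor {
  public static final String DATA = "The quick brown fox jumps over the lazy dog";

  ...

  @Override
  protected void process(Record record, SingleLaneBatchMaker batchMaker) throws StageException {
    // Emit the input record unchanged
    batchMaker.addRecord(record);

    // Create a new record, using the content of field '/a' as the
    // filename and some dummy data as the file content.
    Record newRecord = getContext().createRecord(record);

    // The whole file record carries the FileRef in /fileRef...
    Map<String, Field> root = new HashMap<>();
    root.put("fileRef", Field.create(new StringFileRef(DATA)));
    newRecord.set("/", Field.create(root));

    // ...and the file's metadata in /fileInfo
    Map<String, Field> fileInfo = new HashMap<>();
    // Size in bytes of what the FileRef's stream returns
    fileInfo.put("size", Field.create(DATA.getBytes(StandardCharsets.UTF_8).length));
    fileInfo.put("filename", Field.create(record.get("/a").getValueAsString()));
    newRecord.set("/fileInfo", Field.create(fileInfo));

    batchMaker.addRecord(newRecord);
  }
}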
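The size entry in /fileInfo should reflect the number of bytes the FileRef's stream delivers. For the ASCII DATA string above, String.length() happens to give the same value (43), but for non-ASCII text it counts UTF-16 code units rather than bytes. A small sketch that computes the UTF-8 byte count instead (WholeFileSize is a hypothetical helper name, not a StreamSets API):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not a StreamSets API.
public final class WholeFileSize {
  private WholeFileSize() {}

  // Bytes produced by encoding the string as UTF-8, which is what
  // StringFileRef's stream returns; can be larger than data.length()
  public static int utf8ByteCount(String data) {
    return data.getBytes(StandardCharsets.UTF_8).length;
  }
}
```

For example, utf8ByteCount("Z\u00fcrich") is 7, while "Z\u00fcrich".length() is 6, because ü takes two bytes in UTF-8.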

Example pipeline: [screenshot not available]

Output:

$ ls /tmp/out/2017-10-27-18/
sdc-24e75fba-bd00-42fd-80c3-1f591e200ca6-1  sdc-24e75fba-bd00-42fd-80c3-1f591e200ca6-3
sdc-24e75fba-bd00-42fd-80c3-1f591e200ca6-2
$ cat /tmp/out/2017-10-27-18/sdc-24e75fba-bd00-42fd-80c3-1f591e200ca6-1
The quick brown fox jumps over the lazy dog

See Fun with FileRefs – Manipulating Whole File Data for a more complete example.
