Starting pipelines by label using curl POST (REST API)

asked 2019-12-12 09:02:59 -0500

Fabio Baldaconi Roldao

updated 2019-12-12 10:08:36 -0500

iamontheinet

Good Morning,

I need to know whether there is any way to run "curl -X POST" against a LABEL that is applied to several pipelines. For example, a POST for the label "1D" would start every pipeline tagged with it. I can't tell whether this is exposed through the REST API. If it isn't, is there another way to start pipelines by label?

Example from the documentation for starting pipelines:

#curl -X POST http://<SDCEdge_hostname>:<port>/rest/v1/pipelines/start

Example from the documentation for starting a specific pipeline:

#curl -X POST http://<SDCEdge_hostname>:<port>/rest/v1/pipeline/<pipelineID>/start -H 'Content-Type: application/json;charset=UTF-8' --data-binary '{"<parameter_name1>":"<parameter_value1>","<parameter_name2>":"<parameter_value2>"}'
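For reference, the single-pipeline start call can also be issued from Python instead of curl. This is a minimal sketch using only the standard library; it builds the request but does not send it. The host, pipeline ID, parameter names and credentials are hypothetical placeholders. The `X-Requested-By` header, HTTP basic credentials and `rev=0` query parameter mirror what the Airflow answer below sends to Data Collector; whether your installation accepts basic auth is an assumption to check.

```python
import base64
import json
import urllib.request


def build_start_request(sdc_url, pipeline_id, runtime_params, username, password, rev=0):
    """Build (but do not send) a POST that starts one pipeline with runtime parameters."""
    url = f"{sdc_url}/rest/v1/pipeline/{pipeline_id}/start?rev={rev}"
    # Runtime parameters go in the JSON body, like curl's --data-binary
    body = json.dumps(runtime_params).encode("utf-8")
    # HTTP basic credentials, equivalent to requests' auth=(user, password)
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    headers = {
        "Content-Type": "application/json;charset=UTF-8",
        "X-Requested-By": "script",  # same header the Airflow answer sends
        "Authorization": f"Basic {token}",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


# Hypothetical host, pipeline ID, parameters and credentials, for illustration only
req = build_start_request(
    "http://sdc.example.com:18630",
    "myPipelineId",
    {"param1": "value1", "param2": "value2"},
    "admin",
    "admin",
)
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```

Building the request separately from sending it makes it easy to inspect the URL, headers and body before pointing it at a real Data Collector.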

1 Answer

answered 2019-12-18 13:13:33 -0500

metadaddy

Here is the final code, run from Apache Airflow. It:

  • Builds a list of the IDs of all pipelines with the specified label
  • Deletes the Avro files in the repository (Google Cloud Storage) by running the command stored in each pipeline's "Description" field
  • Starts each pipeline in that list
  • Monitors all the pipelines until every one reaches status FINISHED

# Import libraries
import os
import time
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Define some basic arguments, applied to every task in the DAG
default_args = {
  'owner': 'ExampleOwner',
  'email': ['owner@example.com'],
  'depends_on_past': False,
  'start_date': datetime(2019, 12, 13),
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1
  }

# Name the DAG and define when it will be executed.
# schedule_interval is a DAG argument, so it belongs here, not in default_args.
dag = DAG(
      dag_id='streamset_start_label',
      default_args=default_args,
      schedule_interval='@daily',
      catchup=False
  )

def streamset_start_label():
    USERNAME='#####'
    PASSWORD='#####'
    SDC_URL='http://1.2.3.4:18630'
    LABEL='test'

    # GET the list of pipelines with label = test
    resp = requests.get(f'{SDC_URL}/rest/v1/pipelines',
                        params={'label': LABEL},
                        auth=(USERNAME, PASSWORD),
                        headers={'X-Requested-By': 'script'})
    resp.raise_for_status()
    pipelines = resp.json()
    print(pipelines)

    # For each pipeline: run the delete command (GCS) stored in its
    # Description field, then start the pipeline by its ID
    finalizaron=[]
    pipelines_names=[]
    for pipeline in pipelines:
        # os.system runs the description as a shell command and returns its exit status
        delete = os.system(pipeline["description"])
        print('Delete', delete)
        if delete != 0:
            raise RuntimeError(f'Delete command failed for pipeline {pipeline["pipelineId"]}')
        print('Deleted Google Storage files for pipelineID: {}'.format(pipeline["pipelineId"]))
        response = requests.post(f'{SDC_URL}/rest/v1/pipeline/{pipeline["pipelineId"]}/start?rev=0',
                                 auth=(USERNAME, PASSWORD),
                                 headers={'X-Requested-By': 'script'})
        response.raise_for_status()
        response = response.json()
        pipelines_names.append(pipeline["pipelineId"])
        print(f'{response["pipelineId"]} status is {response["status"]}')

    # Wait until every started pipeline reports status FINISHED.
    # Assumed terminal failure states: without this check a failed
    # pipeline would keep the loop polling forever.
    FAILED = {'START_ERROR', 'RUN_ERROR', 'STOP_ERROR', 'STOPPED', 'KILLED'}
    while len(finalizaron) != len(pipelines_names):
        time.sleep(30)  # pause between polls instead of hammering the REST API
        lista_status=[]
        # GET the status of all pipelines
        pipelines_status = requests.get(f'{SDC_URL}/rest/v1/pipelines/status',
                auth=(USERNAME, PASSWORD),
                headers={'X-Requested-By': 'script'}).json()
        print('pipelines_status is:')
        print(pipelines_status)
        for ppl in pipelines_names:
            if ppl in pipelines_status:
                status = pipelines_status[ppl]["status"]
                lista_status.append(status)
                print('Pipeline Status {} is {}'.format(ppl, status))
                if status in FAILED:
                    raise RuntimeError(f'Pipeline {ppl} ended with status {status}')
        finalizaron = [s for s in lista_status if s == 'FINISHED']
    print('All finished')

# Define the task the DAG will perform
dag_task_1 = PythonOperator(
  task_id='dag_task_1',
  python_callable=streamset_start_label,
  dag=dag)
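The monitoring step can also be pulled out into a helper with an overall timeout, so the DAG task cannot wait indefinitely and the logic can be tested without a running Data Collector. This is a sketch under stated assumptions: `wait_for_pipelines` is a hypothetical helper, `FAILED_STATES` is an assumed set of terminal failure statuses, and the one-hour default timeout is arbitrary. `fetch_status` is any callable returning a dict shaped like the `/rest/v1/pipelines/status` response used above.

```python
import time

# Assumed terminal failure states, based on StreamSets pipeline status names
FAILED_STATES = {"START_ERROR", "RUN_ERROR", "STOP_ERROR", "STOPPED", "KILLED"}


def wait_for_pipelines(pipeline_ids, fetch_status, poll_seconds=30, timeout_seconds=3600,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll until every pipeline is FINISHED (hypothetical helper).

    fetch_status() must return {pipeline_id: {"status": ...}, ...}.
    Raises RuntimeError on a failure state and TimeoutError on timeout.
    """
    deadline = clock() + timeout_seconds
    while True:
        statuses = fetch_status()
        # Missing pipelines get status None and are simply waited on
        current = {pid: statuses.get(pid, {}).get("status") for pid in pipeline_ids}
        failed = {pid: s for pid, s in current.items() if s in FAILED_STATES}
        if failed:
            raise RuntimeError(f"Pipelines failed: {failed}")
        if all(s == "FINISHED" for s in current.values()):
            return current
        if clock() >= deadline:
            raise TimeoutError(f"Still waiting after {timeout_seconds}s: {current}")
        sleep(poll_seconds)


# Offline usage with a fake status source (no Data Collector needed)
responses = iter([
    {"p1": {"status": "RUNNING"}, "p2": {"status": "STARTING"}},
    {"p1": {"status": "FINISHED"}, "p2": {"status": "RUNNING"}},
    {"p1": {"status": "FINISHED"}, "p2": {"status": "FINISHED"}},
])
result = wait_for_pipelines(["p1", "p2"], lambda: next(responses), sleep=lambda s: None)
print(result)  # {'p1': 'FINISHED', 'p2': 'FINISHED'}
```

Inside the Airflow task, `fetch_status` would be a small lambda wrapping the same `requests.get(f'{SDC_URL}/rest/v1/pipelines/status', ...)` call as the answer, and `pipeline_ids` would be `pipelines_names`. Injecting `sleep` and `clock` is only there so tests run instantly.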