cancel
Showing results for 
Search instead for 
Did you mean: 

Head's Up! These forums are read-only. All users and content have migrated. Please join us at community.neo4j.com.

APOC procedure works in browser but not found with Kafka streams

I have a Cypher that calls the apoc.cypher.doIt procedure. It works fine when run in the browser. However, when it is configured to run from Kafka streams, it reports the error:

2019-10-16 15:47:23.049+0000 ERROR Error processing 1 messages
ErrorData(originalTopic=platforms.kubernetes.rolebindings, timestamp=1571239718430, partition=0, offset=0, exception=org.neo4j.graphdb.QueryExecutionException: There is no procedure with the name `apoc.cypher.doIt` registered for this database instance. Please ensure you've spelled the procedure name correctly and that the procedure is properly deployed.

In the Neo4j.conf file, the APOC procedures were set to unrestricted and the Cypher for one of the Kafka topics calls an APOC procedure, as in this fragment:

dbms.security.procedures.unrestricted=algo.*,apoc.*
streams.sink.topic.cypher.platforms.kubernetes.rolebindings=call apoc.cypher.doIt("....

This Cypher was run several times in the browser to test it before configuring it for use with Kafka streams. And it still runs in the browser on the same database where Kafka streams is reporting it as not found.

Is there some additional configuration needed to enable the APOC procedure to be found from Kafka Streams?

2 REPLIES 2

Can you show you entire cypher query? It may be throwing an error based on the quotes.

@david.fauth Its a bit long. Here is the version that works fine and finds the APOC procedure when run in the browser:

WITH [{ name:"RiptideTraining", namespace:"riptide-training", cluster:"kubernetes-ctc-core-nonprod", memberships:[ { name:"henry", type:"User", role:"admin" }, { name:"svcacct-1", type:"ServiceAccount", role:"admin", namespace:"sa-namespace" }, { name : "OSFI_K8S_CSG_ADMIN", role : "admin", type : "Group" },{ name : "system:authenticated", role : "view", type : "SystemGroup" } ] },
      { name:"RiptideTraining", namespace:"riptide-training", cluster:"kubernetes-ctc-core-nonprod", memberships:[ { name:"henry", type:"User", role:"admin" }, { name:"svcacct-1", type:"ServiceAccount", role:"developer", namespace:"sa-namespace" } ] }
      ] AS events
UNWIND events AS event
call apoc.cypher.doIt("
WITH $event as event, randomUUID() AS uuid
MERGE (namespace:PlatformsContainersNamespace {Name:event.namespace,Cluster:event.cluster})
MERGE (cluster:PlatformsContainersCluster {Name:event.cluster})
MERGE (namespace)-[:CreatedIn]-(cluster)
FOREACH (perm in event.memberships |
      FOREACH ( _ IN case when perm.type = 'User' then [1] else [] end |
        MERGE (who:AccountUser {Name:perm.name}) 
        MERGE (who)-[r:hasAccess]-(namespace) 
          ON CREATE set r.Runid = uuid, r.Role = perm.role
          ON MATCH  set r.Runid = uuid, r.Role = perm.role
      )
      FOREACH ( _ IN case when perm.type = 'ServiceAccount' then [1] else [] end |
        MERGE (who:PlatformsContainersServiceAccount {Name:perm.name,Namespace:perm.namespace,Cluster:event.cluster}) 
        MERGE (who)-[r:hasAccess]-(namespace) 
          ON CREATE set r.Runid = uuid, r.Role = perm.role
          ON MATCH  set r.Runid = uuid, r.Role = perm.role
        MERGE (createnamespace:PlatformsContainersNamespace {Name:perm.namespace,Cluster:event.cluster})
        MERGE (who)-[r2:createdIn]-(createnamespace)
      )
      FOREACH ( _ IN case when perm.type = 'Group' then [1] else [] end |
        MERGE (who:SecurityADGroup {Name:perm.name})
        MERGE (who)-[r:hasAccess]-(namespace) 
          ON CREATE set r.Runid = uuid, r.Role = perm.role
          ON MATCH  set r.Runid = uuid, r.Role = perm.role
      )
      FOREACH ( _ IN case when perm.type = 'SystemGroup' then [1] else [] end |
        MERGE (who:PlatformsContainersSystemGroup  {Name:perm.name,Cluster:event.cluster}) 
        MERGE (who)-[r:hasAccess]-(namespace) 
          ON CREATE set r.Runid = uuid, r.Role = perm.role
          ON MATCH  set r.Runid = uuid, r.Role = perm.role
      )
)
WITH namespace, uuid
MATCH ()-[rd1:hasAccess]-(namespace) WHERE rd1.Runid <> uuid DELETE rd1
",{event:event}) yield value
return value

Here is how that was configure in the neo4j.config file to obtain the data from a kafka topic rather than hard coded for the browser testing:

streams.sink.topic.cypher.platforms.kubernetes.rolebindings=call apoc.cypher.doIt(" WITH $event as event, randomUUID() AS uuid MERGE (namespace:PlatformsContainersNamespace {Name:event.namespace,Cluster:event.cluster}) MERGE (cluster:PlatformsContainersCluster {Name:event.cluster}) MERGE (namespace)-[:CreatedIn]-(cluster) FOREACH (perm in event.memberships | FOREACH ( _ IN case when perm.type = 'User' then [1] else [] end | MERGE (who:AccountUser {Name:perm.name}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role ) FOREACH ( _ IN case when perm.type = 'ServiceAccount' then [1] else [] end | MERGE (who:PlatformsContainersServiceAccount {Name:perm.name,Namespace:perm.namespace,Cluster:event.cluster}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role MERGE (createnamespace:PlatformsContainersNamespace {Name:perm.namespace,Cluster:event.cluster}) MERGE (who)-[r2:createdIn]-(createnamespace) ) FOREACH ( _ IN case when perm.type = 'Group' then [1] else [] end | MERGE (who:SecurityADGroup {Name:perm.name}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role ) FOREACH ( _ IN case when perm.type = 'SystemGroup' then [1] else [] end | MERGE (who:PlatformsContainersSystemGroup {Name:perm.name,Cluster:event.cluster}) MERGE (who)-[r:hasAccess]-(namespace) ON CREATE set r.Runid = uuid, r.Role = perm.role ON MATCH set r.Runid = uuid, r.Role = perm.role ) ) WITH namespace, uuid MATCH ()-[rd1:hasAccess]-(namespace) WHERE rd1.Runid <> uuid DELETE rd1 ",{event:event}) yield value return value